1 ////////////////////////////////////////////////////////////////////////////
3 // Copyright 2015 Realm Inc.
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
17 ////////////////////////////////////////////////////////////////////////////
19 #include "impl/realm_coordinator.hpp"
21 #include "impl/collection_notifier.hpp"
22 #include "impl/external_commit_helper.hpp"
23 #include "impl/transact_log_handler.hpp"
24 #include "impl/weak_realm_notifier.hpp"
25 #include "binding_context.hpp"
26 #include "object_schema.hpp"
27 #include "object_store.hpp"
31 #include "sync/sync_config.hpp"
32 #include "sync/sync_manager.hpp"
33 #include "sync/sync_session.hpp"
36 #include <realm/group_shared.hpp>
37 #include <realm/lang_bind_helper.hpp>
38 #include <realm/string_data.hpp>
41 #include <unordered_map>
43 using namespace realm;
44 using namespace realm::_impl;
// Process-wide registry mapping a Realm path to a weak reference to its
// coordinator. Every visible access to the map in this file is made with
// s_coordinator_mutex held.
// Both objects are created with `new` and bound to references; nothing in the
// visible code deletes them.
46 static auto& s_coordinator_mutex = *new std::mutex;
47 static auto& s_coordinators_per_path = *new std::unordered_map<std::string, std::weak_ptr<RealmCoordinator>>;
// Looks up the coordinator registered for `path`, creating one when there is
// no live coordinator for it. s_coordinator_mutex is held for the whole call.
// NOTE(review): the return statements are not visible in this excerpt.
49 std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
51 std::lock_guard<std::mutex> lock(s_coordinator_mutex);
// operator[] inserts an empty weak_ptr when `path` has no entry yet, so a
// missing entry and an expired entry both fail the lock() below.
53 auto& weak_coordinator = s_coordinators_per_path[path];
54 if (auto coordinator = weak_coordinator.lock()) {
// No live coordinator: create one and store only a weak reference to it in
// the registry.
58 auto coordinator = std::make_shared<RealmCoordinator>();
59 weak_coordinator = coordinator;
// Obtains the coordinator for `config.path` and applies `config` to it.
// set_config() is invoked with the coordinator's m_realm_mutex held; any
// exception it throws propagates to the caller.
// NOTE(review): the return statement is not visible in this excerpt.
63 std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(const Realm::Config& config)
65 auto coordinator = get_coordinator(config.path);
66 std::lock_guard<std::mutex> lock(coordinator->m_realm_mutex);
67 coordinator->set_config(config);
71 std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
73 std::lock_guard<std::mutex> lock(s_coordinator_mutex);
74 auto it = s_coordinators_per_path.find(path);
75 return it == s_coordinators_per_path.end() ? nullptr : it->second.lock();
// Obtains the sync session for this coordinator's Realm and stores it in
// m_sync_session, then registers a transaction callback on it.
// Throws std::logic_error unless the encryption key in Realm::Config and the
// one in SyncConfig are either both absent or both present and equal.
// NOTE(review): lines are missing from this excerpt right after the signature
// (the embedded numbering jumps from 78 to 84), so any early exit or
// preprocessor guard is not shown here.
78 void RealmCoordinator::create_sync_session()
84 if (!m_config.encryption_key.empty() && !m_config.sync_config->realm_encryption_key) {
85 throw std::logic_error("A realm encryption key was specified in Realm::Config but not in SyncConfig");
86 } else if (m_config.sync_config->realm_encryption_key && m_config.encryption_key.empty()) {
87 throw std::logic_error("A realm encryption key was specified in SyncConfig but not in Realm::Config");
88 } else if (m_config.sync_config->realm_encryption_key &&
89 !std::equal(m_config.sync_config->realm_encryption_key->begin(), m_config.sync_config->realm_encryption_key->end(),
90 m_config.encryption_key.begin(), m_config.encryption_key.end())) {
91 throw std::logic_error("The realm encryption key specified in SyncConfig does not match the one in Realm::Config");
// The session is requested with a copy of the sync config in which
// validate_sync_history has been switched off; m_config itself is untouched.
94 auto sync_config = *m_config.sync_config;
95 sync_config.validate_sync_history = false;
96 m_sync_session = SyncManager::shared().get_session(m_config.path, sync_config);
// The callback captures only a weak reference, so it does nothing if this
// coordinator has already been destroyed when the session invokes it.
98 std::weak_ptr<RealmCoordinator> weak_self = shared_from_this();
99 SyncSession::Internal::set_sync_transact_callback(*m_sync_session,
100 [weak_self](VersionID old_version, VersionID new_version) {
101 if (auto self = weak_self.lock()) {
102 if (self->m_transaction_callback)
103 self->m_transaction_callback(old_version, new_version);
104 if (self->m_notifier)
105 self->m_notifier->notify_others();
// Validates `config` and checks it against the configuration this coordinator
// already holds.
// Throws InvalidEncryptionKeyException for a non-null encryption key whose
// size is not 64, std::logic_error for the option combinations rejected below,
// and MismatchedConfigException when `config` disagrees with m_config.
// get_coordinator(const Realm::Config&) calls this with m_realm_mutex held.
111 void RealmCoordinator::set_config(const Realm::Config& config)
113 if (config.encryption_key.data() && config.encryption_key.size() != 64)
114 throw InvalidEncryptionKeyException();
115 if (config.schema_mode == SchemaMode::Immutable && config.sync_config)
116 throw std::logic_error("Synchronized Realms cannot be opened in immutable mode");
117 if (config.schema_mode == SchemaMode::Additive && config.migration_function)
118 throw std::logic_error("Realms opened in Additive-only schema mode do not use a migration function");
119 if (config.schema_mode == SchemaMode::Immutable && config.migration_function)
120 throw std::logic_error("Realms opened in immutable mode do not use a migration function");
121 if (config.schema_mode == SchemaMode::ReadOnlyAlternative && config.migration_function)
122 throw std::logic_error("Realms opened in read-only mode do not use a migration function");
123 if (config.schema_mode == SchemaMode::Immutable && config.initialization_function)
124 throw std::logic_error("Realms opened in immutable mode do not use an initialization function");
125 if (config.schema_mode == SchemaMode::ReadOnlyAlternative && config.initialization_function)
126 throw std::logic_error("Realms opened in read-only mode do not use an initialization function");
127 if (config.schema && config.schema_version == ObjectStore::NotVersioned)
128 throw std::logic_error("A schema version must be specified when the schema is specified");
// A memory buffer is only accepted when the config is both immutable and
// in-memory, has no path, and has no encryption key.
129 if (!config.realm_data.is_null() && (!config.immutable() || !config.in_memory))
130 throw std::logic_error("In-memory realms initialized from memory buffers can only be opened in read-only mode");
131 if (!config.realm_data.is_null() && !config.path.empty())
132 throw std::logic_error("Specifying both memory buffer and path is invalid");
133 if (!config.realm_data.is_null() && !config.encryption_key.empty())
134 throw std::logic_error("Memory buffers do not support encryption");
135 // ResetFile also won't use the migration function, but specifying one is
136 // allowed to simplify temporarily switching modes during development
// True when every entry in m_weak_realm_notifiers has expired; std::all_of
// also yields true for an empty list.
138 bool no_existing_realm = std::all_of(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers),
139 [](auto& notifier) { return notifier.expired(); });
140 if (no_existing_realm) {
// NOTE(review): the lines between the `if` above and the checks below are not
// visible in this excerpt. The checks below compare `config` with the stored
// m_config and reject any difference.
144 if (m_config.immutable() != config.immutable()) {
145 throw MismatchedConfigException("Realm at path '%1' already opened with different read permissions.", config.path);
147 if (m_config.in_memory != config.in_memory) {
148 throw MismatchedConfigException("Realm at path '%1' already opened with different inMemory settings.", config.path);
150 if (m_config.encryption_key != config.encryption_key) {
151 throw MismatchedConfigException("Realm at path '%1' already opened with a different encryption key.", config.path);
153 if (m_config.schema_mode != config.schema_mode) {
154 throw MismatchedConfigException("Realm at path '%1' already opened with a different schema mode.", config.path);
// The schema version is only compared when `config` supplies a schema and a
// version has already been recorded in m_schema_version.
156 if (config.schema && m_schema_version != ObjectStore::NotVersioned && m_schema_version != config.schema_version) {
157 throw MismatchedConfigException("Realm at path '%1' already opened with different schema version.", config.path);
160 #if REALM_ENABLE_SYNC
161 if (bool(m_config.sync_config) != bool(config.sync_config)) {
162 throw MismatchedConfigException("Realm at path '%1' already opened with different sync configurations.", config.path);
165 if (config.sync_config) {
166 if (m_config.sync_config->user != config.sync_config->user) {
167 throw MismatchedConfigException("Realm at path '%1' already opened with different sync user.", config.path);
169 if (m_config.sync_config->realm_url() != config.sync_config->realm_url()) {
170 throw MismatchedConfigException("Realm at path '%1' already opened with different sync server URL.", config.path);
172 if (m_config.sync_config->transformer != config.sync_config->transformer) {
173 throw MismatchedConfigException("Realm at path '%1' already opened with different transformer.", config.path);
// NOTE(review): the message below reads as if a word is missing (probably
// "with a different sync session encryption key") - confirm before changing,
// since callers or tests may match on the text.
175 if (m_config.sync_config->realm_encryption_key != config.sync_config->realm_encryption_key) {
176 throw MismatchedConfigException("Realm at path '%1' already opened with sync session encryption key.", config.path);
181 // Realm::update_schema() handles complaining about schema mismatches
// Produces a Realm for `config`. The visible code first scans
// m_weak_realm_notifiers for an instance cached for the requested execution
// context, and also contains a path that builds a new instance with
// Realm::make_shared_realm() and registers it in m_weak_realm_notifiers.
// Throws MismatchedConfigException when a cached Realm's schema differs from
// the requested one, and RealmFileException (AccessError) when a
// std::system_error is caught.
// NOTE(review): several lines (branch conditions, `try`, `continue`/`return`
// statements) are not visible in this excerpt, so the exact control flow
// between the steps below cannot be confirmed from here.
185 std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
187 // realm must be declared before lock so that the mutex is released before
188 // we release the strong reference to realm, as Realm's destructor may want
189 // to acquire the same lock
190 std::shared_ptr<Realm> realm;
191 std::unique_lock<std::mutex> lock(m_realm_mutex);
// Take these out of `config` up front: `config` is moved into
// make_shared_realm() further down, while these are handed to update_schema()
// at the end.
195 auto schema = std::move(config.schema);
196 auto migration_function = std::move(config.migration_function);
197 auto initialization_function = std::move(config.initialization_function);
201 AnyExecutionContextID execution_context(config.execution_context);
202 for (auto& cached_realm : m_weak_realm_notifiers) {
203 if (!cached_realm.is_cached_for_execution_context(execution_context))
205 // can be null if we jumped in between ref count hitting zero and
206 // unregister_realm() getting the lock
207 if ((realm = cached_realm.realm())) {
208 // If the file is uninitialized and was opened without a schema,
209 // do the normal schema init
210 if (realm->schema_version() == ObjectStore::NotVersioned)
213 // Otherwise if we have a realm schema it needs to be an exact
214 // match (even having the same properties but in different
215 // orders isn't good enough)
216 if (schema && realm->schema() != *schema)
217 throw MismatchedConfigException("Realm at path '%1' already opened on current thread with different schema.", config.path);
// Computed before `config` is moved from on the next line.
225 bool should_initialize_notifier = !config.immutable() && config.automatic_change_notifications;
226 realm = Realm::make_shared_realm(std::move(config), shared_from_this());
// m_notifier is created at most once per coordinator, and only for a Realm
// that is not immutable and has automatic change notifications enabled.
227 if (!m_notifier && should_initialize_notifier) {
229 m_notifier = std::make_unique<ExternalCommitHelper>(*this);
231 catch (std::system_error const& ex) {
232 throw RealmFileException(RealmFileException::Kind::AccessError, get_path(), ex.code().message(), "");
235 m_weak_realm_notifiers.emplace_back(realm, m_config.cache);
238 if (realm->config().sync_config)
239 create_sync_session();
// NOTE(review): `config` was passed through std::move at the
// make_shared_realm() call above; when that path was taken, reading
// config.schema_version here relies on the move leaving the integer intact -
// confirm.
243 realm->update_schema(std::move(*schema), config.schema_version, std::move(migration_function),
244 std::move(initialization_function));
250 std::shared_ptr<Realm> RealmCoordinator::get_realm()
252 return get_realm(m_config);
// Copies the cached schema, its schema version and the newest transaction
// version recorded for it (m_schema_transaction_version_max) into the
// out-parameters, holding m_schema_cache_mutex while doing so.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably the result reports whether a schema was cached - confirm.
255 bool RealmCoordinator::get_cached_schema(Schema& schema, uint64_t& schema_version,
256 uint64_t& transaction) const noexcept
258 std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
259 if (!m_cached_schema)
261 schema = *m_cached_schema;
262 schema_version = m_schema_version;
263 transaction = m_schema_transaction_version_max;
// Stores `new_schema` and `new_schema_version` as the cached schema and sets
// both ends of the recorded transaction-version range to
// `transaction_version`. Runs under m_schema_cache_mutex.
// NOTE(review): the bodies of the two guards below are not visible in this
// excerpt; presumably they return early so that an older transaction version,
// an empty schema or an unversioned schema does not replace the cache.
267 void RealmCoordinator::cache_schema(Schema const& new_schema, uint64_t new_schema_version,
268 uint64_t transaction_version)
270 std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
271 if (transaction_version < m_schema_transaction_version_max)
273 if (new_schema.empty() || new_schema_version == ObjectStore::NotVersioned)
276 m_cached_schema = new_schema;
277 m_schema_version = new_schema_version;
278 m_schema_transaction_version_min = transaction_version;
279 m_schema_transaction_version_max = transaction_version;
282 void RealmCoordinator::clear_schema_cache_and_set_schema_version(uint64_t new_schema_version)
284 std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
285 m_cached_schema = util::none;
286 m_schema_version = new_schema_version;
// Widens the transaction-version range recorded for the cached schema so that
// it also covers `previous` through `next`: the minimum becomes
// min(previous, current min) and the maximum becomes max(next, current max).
// Runs under m_schema_cache_mutex and asserts that `previous` is not newer
// than the recorded maximum.
// NOTE(review): the bodies of the two guards (no cached schema; `next` older
// than the recorded minimum) are not visible in this excerpt.
289 void RealmCoordinator::advance_schema_cache(uint64_t previous, uint64_t next)
291 std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
292 if (!m_cached_schema)
294 REALM_ASSERT(previous <= m_schema_transaction_version_max);
295 if (next < m_schema_transaction_version_min)
297 m_schema_transaction_version_min = std::min(previous, m_schema_transaction_version_min);
298 m_schema_transaction_version_max = std::max(next, m_schema_transaction_version_max);
// Compiler-generated default constructor, defined out of line in this file.
301 RealmCoordinator::RealmCoordinator() = default;
// Prunes expired entries from s_coordinators_per_path while holding
// s_coordinator_mutex. The sweep covers the whole registry rather than looking
// up a single path.
// NOTE(review): the branch that advances `it` past non-expired entries is not
// visible in this excerpt.
303 RealmCoordinator::~RealmCoordinator()
305 std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
306 for (auto it = s_coordinators_per_path.begin(); it != s_coordinators_per_path.end(); ) {
307 if (it->second.expired()) {
// erase() returns the iterator following the removed element.
308 it = s_coordinators_per_path.erase(it);
316 void RealmCoordinator::unregister_realm(Realm* realm)
318 std::lock_guard<std::mutex> lock(m_realm_mutex);
319 auto new_end = remove_if(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers),
320 [=](auto& notifier) { return notifier.expired() || notifier.is_for_realm(realm); });
321 m_weak_realm_notifiers.erase(new_end, end(m_weak_realm_notifiers));
// Empties the global coordinator registry. While holding s_coordinator_mutex
// it walks every registry entry, resets the coordinator's m_notifier, and
// collects the Realms still reachable through m_weak_realm_notifiers; it then
// clears s_coordinators_per_path. The collected Realms are processed in a
// second loop below.
// NOTE(review): the handling of an expired coordinator and the body of the
// final loop are not visible in this excerpt.
324 void RealmCoordinator::clear_cache()
326 std::vector<WeakRealm> realms_to_close;
328 std::lock_guard<std::mutex> lock(s_coordinator_mutex);
330 for (auto& weak_coordinator : s_coordinators_per_path) {
331 auto coordinator = weak_coordinator.second.lock();
336 coordinator->m_notifier = nullptr;
338 // Gather a list of all of the realms which will be removed
339 for (auto& weak_realm_notifier : coordinator->m_weak_realm_notifiers) {
340 if (auto realm = weak_realm_notifier.realm()) {
341 realms_to_close.push_back(realm);
346 s_coordinators_per_path.clear();
349 // Close all of the previously cached Realms. This can't be done while
350 // s_coordinator_mutex is held as it may try to re-lock it.
351 for (auto& weak_realm : realms_to_close) {
352 if (auto realm = weak_realm.lock()) {
358 void RealmCoordinator::clear_all_caches()
360 std::vector<std::weak_ptr<RealmCoordinator>> to_clear;
362 std::lock_guard<std::mutex> lock(s_coordinator_mutex);
363 for (auto iter : s_coordinators_per_path) {
364 to_clear.push_back(iter.second);
367 for (auto weak_coordinator : to_clear) {
368 if (auto coordinator = weak_coordinator.lock()) {
369 coordinator->clear_cache();
// Asserts (REALM_ASSERT) that the global coordinator registry is empty,
// taking s_coordinator_mutex for the check.
// NOTE(review): lines are missing around the body in this excerpt, so any
// conditional compilation wrapped around the check is not shown.
374 void RealmCoordinator::assert_no_open_realms() noexcept
377 std::lock_guard<std::mutex> lock(s_coordinator_mutex);
378 REALM_ASSERT(s_coordinators_per_path.empty());
// Wakes the notification worker by calling notify_others() on m_notifier.
// NOTE(review): a line is missing before the FIXME in this excerpt, so whether
// m_notifier is checked for null first cannot be seen here.
382 void RealmCoordinator::wake_up_notifier_worker()
385 // FIXME: this wakes up the notification workers for all processes and
386 // not just us. This might be worth optimizing in the future.
387 m_notifier->notify_others();
// Commits the write transaction currently open on `realm` and announces it.
// Preconditions (asserted): the coordinator's config is not immutable and
// `realm` is in a transaction.
// Visible sequence: take m_notifier_mutex, commit, record the committed
// version in m_notifier_skip_version when some notifier in m_notifiers belongs
// to `realm`, then notify the sync session (when built with sync and a session
// exists), the binding context, and finally m_notifier.
// NOTE(review): scope braces and any guards around the last steps are not
// visible in this excerpt.
391 void RealmCoordinator::commit_write(Realm& realm)
393 REALM_ASSERT(!m_config.immutable());
394 REALM_ASSERT(realm.is_in_transaction());
397 // Need to acquire this lock before committing or another process could
398 // perform a write and notify us before we get the chance to set the
400 std::lock_guard<std::mutex> l(m_notifier_mutex);
402 transaction::commit(*Realm::Internal::get_shared_group(realm));
404 // Don't need to check m_new_notifiers because those don't skip versions
405 bool have_notifiers = std::any_of(m_notifiers.begin(), m_notifiers.end(),
406 [&](auto&& notifier) { return notifier->is_for_realm(realm); });
407 if (have_notifiers) {
408 m_notifier_skip_version = Realm::Internal::get_shared_group(realm)->get_version_of_current_transaction();
412 #if REALM_ENABLE_SYNC
413 // Realm could be closed in did_change. So send sync notification first before did_change.
414 if (m_sync_session) {
415 auto& sg = Realm::Internal::get_shared_group(realm);
416 auto version = LangBindHelper::get_version_of_latest_snapshot(*sg);
417 SyncSession::Internal::nonsync_transact_notify(*m_sync_session, version);
420 if (realm.m_binding_context) {
421 realm.m_binding_context->did_change({}, {});
425 m_notifier->notify_others();
// Makes m_advancer_sg hold a read transaction at `versionid`, or keeps an
// older one it already holds, opening the shared group on first use.
// The caller must already hold m_notifier_mutex (checked in debug builds by
// asserting that try_lock() fails).
// If opening or beginning the read fails, the exception is stored in
// m_async_error and the advancer shared group and history are reset.
// NOTE(review): the `try`/`catch` lines and the final `else` are not visible
// in this excerpt.
429 void RealmCoordinator::pin_version(VersionID versionid)
431 REALM_ASSERT_DEBUG(!m_notifier_mutex.try_lock());
436 if (!m_advancer_sg) {
438 std::unique_ptr<Group> read_only_group;
439 Realm::open_with_config(m_config, m_advancer_history, m_advancer_sg, read_only_group, nullptr);
440 REALM_ASSERT(!read_only_group);
441 m_advancer_sg->begin_read(versionid);
444 m_async_error = std::current_exception();
445 m_advancer_sg = nullptr;
446 m_advancer_history = nullptr;
449 else if (m_new_notifiers.empty()) {
450 // If this is the first notifier then we don't already have a read transaction
451 REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Ready);
452 m_advancer_sg->begin_read(versionid);
// Remaining case: a read transaction is already open. It is only restarted
// when `versionid` is older than the version currently held.
455 REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
456 if (versionid < m_advancer_sg->get_version_of_current_transaction()) {
457 // Ensure we're holding a readlock on the oldest version we have a
458 // handover object for, as handover objects don't
459 m_advancer_sg->end_read();
460 m_advancer_sg->begin_read(versionid);
465 void RealmCoordinator::register_notifier(std::shared_ptr<CollectionNotifier> notifier)
467 auto version = notifier->version();
468 auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
470 std::lock_guard<std::mutex> lock(self.m_notifier_mutex);
471 self.pin_version(version);
472 self.m_new_notifiers.push_back(std::move(notifier));
// Drops notifiers that are no longer alive from m_notifiers and
// m_new_notifiers, and ends the corresponding shared group's read transaction
// when a list becomes empty as a result.
// NOTE(review): several lines of the helper lambda (the skip for live
// notifiers, the bookkeeping after a removal, and its return) are not visible
// in this excerpt.
476 void RealmCoordinator::clean_up_dead_notifiers()
// Unordered removal: a dead element is overwritten with the last element and
// the container is shortened by one, so element order is not preserved.
478 auto swap_remove = [&](auto& container) {
479 bool did_remove = false;
480 for (size_t i = 0; i < container.size(); ++i) {
481 if (container[i]->is_alive())
484 // Ensure the notifier is destroyed here even if there's lingering refs
485 // to the async notifier elsewhere
486 container[i]->release_data();
// Skip the move when the dead element is itself the last one.
488 if (container.size() > i + 1)
489 container[i] = std::move(container.back());
490 container.pop_back();
497 if (swap_remove(m_notifiers)) {
498 // Make sure we aren't holding on to read versions needlessly if there
499 // are no notifiers left, but don't close them entirely as opening shared
500 // groups is expensive
501 if (m_notifiers.empty() && m_notifier_sg) {
502 REALM_ASSERT_3(m_notifier_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
503 m_notifier_sg->end_read();
504 m_notifier_skip_version = {0, 0};
// Same idea for the not-yet-run notifiers and the advancer shared group.
507 if (swap_remove(m_new_notifiers) && m_advancer_sg) {
508 REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
509 if (m_new_notifiers.empty()) {
510 m_advancer_sg->end_read();
// Runs the async notifiers, then takes m_realm_mutex and visits every entry in
// m_weak_realm_notifiers.
// NOTE(review): the loop body is not visible in this excerpt.
515 void RealmCoordinator::on_change()
517 run_async_notifiers();
519 std::lock_guard<std::mutex> lock(m_realm_mutex);
520 for (auto& realm : m_weak_realm_notifiers) {
// Helper that advances a SharedGroup through a series of versions while
// keeping one TransactionChangeInfo per step in m_info. current() is the info
// object that notifiers at the present step should register against.
// Usage visible in this file: construct with the notifiers, call
// advance_incremental() per notifier version as needed, then
// advance_to_final() once.
// NOTE(review): a number of lines (access specifiers, the m_sg member,
// initialisers, returns and branch conditions) are not visible in this
// excerpt, so the comments below describe only what is shown.
526 class IncrementalChangeInfo {
// Sorts `notifiers` in place by version and sizes m_info's storage up front.
528 IncrementalChangeInfo(SharedGroup& sg,
529 std::vector<std::shared_ptr<_impl::CollectionNotifier>>& notifiers)
532 if (notifiers.empty())
535 auto cmp = [&](auto&& lft, auto&& rgt) {
536 return lft->version() < rgt->version();
539 // Sort the notifiers by their source version so that we can pull them
540 // all forward to the latest version in a single pass over the transaction log
541 std::sort(notifiers.begin(), notifiers.end(), cmp);
543 // Preallocate the required amount of space in the vector so that we can
544 // safely give out pointers to within the vector
546 for (auto it = notifiers.begin(), next = it + 1; next != notifiers.end(); ++it, ++next) {
550 m_info.reserve(count);
552 m_current = &m_info[0];
555 TransactionChangeInfo& current() const { return *m_current; }
// When `version` differs from the shared group's current version: advances the
// shared group to `version`, collecting changes into the current info, then
// appends a fresh info object (seeded from the current one's requirements and
// taking over its list entries) and makes it current.
557 bool advance_incremental(VersionID version)
559 if (version != m_sg.get_version_of_current_transaction()) {
560 transaction::advance(m_sg, *m_current, version);
562 m_current->table_modifications_needed,
563 m_current->table_moves_needed,
564 std::move(m_current->lists)});
565 m_current = &m_info.back();
// Advances the shared group to `version` and then folds the per-step info
// objects together from newest to oldest.
571 void advance_to_final(VersionID version)
574 transaction::advance(m_sg, nullptr, version);
578 transaction::advance(m_sg, *m_current, version);
580 // We now need to combine the transaction change info objects so that all of
581 // the notifiers see the complete set of changes from their first version to
582 // the most recent one
583 for (size_t i = m_info.size() - 1; i > 0; --i) {
584 auto& cur = m_info[i];
585 if (cur.tables.empty())
587 auto& prev = m_info[i - 1];
588 if (prev.tables.empty()) {
589 prev.tables = cur.tables;
// Merge table-by-table over the common prefix, then copy any extra tables that
// only the newer info has.
593 for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) {
594 prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]});
596 prev.tables.reserve(cur.tables.size());
597 while (prev.tables.size() < cur.tables.size()) {
598 prev.tables.push_back(cur.tables[prev.tables.size()]);
602 // Copy the list change info if there are multiple LinkViews for the same LinkList
// Two list entries are treated as the same list when table, column and row
// indices all match.
603 auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); };
604 for (size_t i = 1; i < m_current->lists.size(); ++i) {
605 for (size_t j = i; j > 0; --j) {
606 if (id(m_current->lists[i]) == id(m_current->lists[j - 1])) {
607 m_current->lists[j - 1].changes->merge(CollectionChangeBuilder{*m_current->lists[i].changes});
614 std::vector<TransactionChangeInfo> m_info;
615 TransactionChangeInfo* m_current = nullptr;
618 } // anonymous namespace
// Brings all registered notifiers up to date. In outline, as far as the
// visible code shows:
//   1. under m_notifier_mutex, drop dead notifiers and make sure the helper
//      shared group is open;
//   2. advance newly registered notifiers (m_new_notifiers) on m_advancer_sg
//      from their source versions to a common target version;
//   3. take a copy of m_notifiers, append the new notifiers to m_notifiers, and
//      advance the copied notifiers on m_notifier_sg to that same version,
//      first stopping at m_notifier_skip_version if one was recorded;
//   4. prepare handover for every notifier, clean up again and wake any
//      waiters on m_notifier_cv.
// NOTE(review): many lines (early returns, lock release/reacquire, branch
// conditions, the calls made inside several loops) are not visible in this
// excerpt; the statement order here is significant and should not be changed
// without the full source.
620 void RealmCoordinator::run_async_notifiers()
622 std::unique_lock<std::mutex> lock(m_notifier_mutex);
624 clean_up_dead_notifiers();
626 if (m_notifiers.empty() && m_new_notifiers.empty()) {
630 if (!m_async_error) {
631 open_helper_shared_group();
635 std::move(m_new_notifiers.begin(), m_new_notifiers.end(), std::back_inserter(m_notifiers));
636 m_new_notifiers.clear();
642 // Advance all of the new notifiers to the most recent version, if any
643 auto new_notifiers = std::move(m_new_notifiers);
644 IncrementalChangeInfo new_notifier_change_info(*m_advancer_sg, new_notifiers);
646 if (!new_notifiers.empty()) {
647 REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
648 REALM_ASSERT_3(m_advancer_sg->get_version_of_current_transaction().version,
649 <=, new_notifiers.front()->version().version);
651 // The advancer SG can be at an older version than the oldest new notifier
652 // if a notifier was added and then removed before it ever got the chance
653 // to run, as we don't move the pin forward when removing dead notifiers
654 transaction::advance(*m_advancer_sg, nullptr, new_notifiers.front()->version());
656 // Advance each of the new notifiers to the latest version, attaching them
657 // to the SG at their handover version. This requires a unique
658 // TransactionChangeInfo for each source version, so that things don't
659 // see changes from before the version they were handed over from.
660 // Each Info has all of the changes between that source version and the
661 // next source version, and they'll be merged together later after
662 // releasing the lock
663 for (auto& notifier : new_notifiers) {
664 new_notifier_change_info.advance_incremental(notifier->version());
665 notifier->attach_to(*m_advancer_sg);
666 notifier->add_required_change_info(new_notifier_change_info.current());
668 new_notifier_change_info.advance_to_final(VersionID{});
670 for (auto& notifier : new_notifiers) {
674 // We want to advance the non-new notifiers to the same version as the
675 // new notifiers to avoid having to merge changes from any new
676 // transaction that happen immediately after this into the new notifier
678 version = m_advancer_sg->get_version_of_current_transaction();
679 m_advancer_sg->end_read();
682 // If we have no new notifiers we want to just advance to the latest
683 // version, but we have to pick a "latest" version while holding the
684 // notifier lock to avoid advancing over a transaction which should be
686 m_advancer_sg->begin_read();
687 version = m_advancer_sg->get_version_of_current_transaction();
688 m_advancer_sg->end_read();
690 REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Ready);
// Consume the recorded skip version: take a local copy and reset the member.
692 auto skip_version = m_notifier_skip_version;
693 m_notifier_skip_version = {0, 0};
695 // Make a copy of the notifiers vector and then release the lock to avoid
696 // blocking other threads trying to register or unregister notifiers while we run them
697 auto notifiers = m_notifiers;
698 m_notifiers.insert(m_notifiers.end(), new_notifiers.begin(), new_notifiers.end());
// When a skip version was recorded, the previously registered notifiers are
// first advanced exactly to it in a separate pass.
701 if (skip_version.version) {
702 REALM_ASSERT(!notifiers.empty());
703 REALM_ASSERT(version >= skip_version);
704 IncrementalChangeInfo change_info(*m_notifier_sg, notifiers);
705 for (auto& notifier : notifiers)
706 notifier->add_required_change_info(change_info.current());
707 change_info.advance_to_final(skip_version);
709 for (auto& notifier : notifiers)
713 for (auto& notifier : notifiers)
714 notifier->prepare_handover();
718 // Advance the non-new notifiers to the same version as we advanced the new
719 // ones to (or the latest if there were no new ones)
720 IncrementalChangeInfo change_info(*m_notifier_sg, notifiers);
721 for (auto& notifier : notifiers) {
722 notifier->add_required_change_info(change_info.current());
724 change_info.advance_to_final(version);
726 // Attach the new notifiers to the main SG and move them to the main list
727 for (auto& notifier : new_notifiers) {
728 notifier->attach_to(*m_notifier_sg);
732 // Change info is now all ready, so the notifiers can now perform their
734 for (auto& notifier : notifiers) {
738 // Reacquire the lock while updating the fields that are actually read on
741 for (auto& notifier : new_notifiers) {
742 notifier->prepare_handover();
744 for (auto& notifier : notifiers) {
745 notifier->prepare_handover();
747 clean_up_dead_notifiers();
748 m_notifier_cv.notify_all();
// Makes sure m_notifier_sg exists and is in a read transaction.
// On first use it is opened with Realm::open_with_config() and a read is
// begun. When it already exists and m_notifiers is empty, only a new read is
// begun.
// On failure the exception is stored in m_async_error and the shared group and
// history are reset.
// NOTE(review): the `try`/`catch` lines are not visible in this excerpt.
751 void RealmCoordinator::open_helper_shared_group()
753 if (!m_notifier_sg) {
755 std::unique_ptr<Group> read_only_group;
756 Realm::open_with_config(m_config, m_notifier_history, m_notifier_sg, read_only_group, nullptr);
757 REALM_ASSERT(!read_only_group);
758 m_notifier_sg->begin_read();
761 // Store the error to be passed to the async notifiers
762 m_async_error = std::current_exception();
763 m_notifier_sg = nullptr;
764 m_notifier_history = nullptr;
767 else if (m_notifiers.empty()) {
768 m_notifier_sg->begin_read();
// Delivers packaged notifications for `realm`, advancing its shared group to
// the version the notifications were prepared for when that version is newer
// than the one the Realm is currently at.
// The notifiers for `realm` are collected while m_notifier_mutex is held.
// NOTE(review): lock handling after packaging, the check that a version is
// available, and the early returns are not visible in this excerpt.
772 void RealmCoordinator::advance_to_ready(Realm& realm)
774 std::unique_lock<std::mutex> lock(m_notifier_mutex);
775 _impl::NotifierPackage notifiers(m_async_error, notifiers_for_realm(realm), this);
777 notifiers.package_and_wait(util::none);
779 auto& sg = Realm::Internal::get_shared_group(realm);
781 auto version = notifiers.version();
783 auto current_version = sg->get_version_of_current_transaction();
784 // Notifications are out of date, so just discard
785 // This should only happen if begin_read() was used to change the
786 // read version outside of our control
787 if (*version < current_version)
789 // While there is a newer version, notifications are for the current
790 // version so just deliver them without advancing
791 if (*version == current_version) {
792 notifiers.deliver(*sg);
793 notifiers.after_advance();
// Remaining case: the packaged version is newer than the current one.
799 transaction::advance(sg, realm.m_binding_context.get(), notifiers);
// Collects the notifiers that belong to `realm`: matching entries from
// m_new_notifiers come first, followed by matching entries from m_notifiers.
// This function takes no lock itself; every caller visible in this file holds
// m_notifier_mutex when calling it.
// NOTE(review): the return statement is not visible in this excerpt.
802 std::vector<std::shared_ptr<_impl::CollectionNotifier>> RealmCoordinator::notifiers_for_realm(Realm& realm)
804 std::vector<std::shared_ptr<_impl::CollectionNotifier>> ret;
805 for (auto& notifier : m_new_notifiers) {
806 if (notifier->is_for_realm(realm))
807 ret.push_back(notifier);
809 for (auto& notifier : m_notifiers) {
810 if (notifier->is_for_realm(realm))
811 ret.push_back(notifier);
// Advances `realm` using notifications packaged for the latest snapshot
// version of its shared group. The final visible statement returns true when
// the shared group's current version differs from the one observed before
// advancing.
// NOTE(review): lock handling after packaging and the return taken when the
// Realm was closed by a callback are not visible in this excerpt.
816 bool RealmCoordinator::advance_to_latest(Realm& realm)
818 using sgf = SharedGroupFriend;
820 auto& sg = Realm::Internal::get_shared_group(realm);
821 std::unique_lock<std::mutex> lock(m_notifier_mutex);
822 _impl::NotifierPackage notifiers(m_async_error, notifiers_for_realm(realm), this);
824 notifiers.package_and_wait(sgf::get_version_of_latest_snapshot(*sg));
// Remember the starting version so the result can report whether it moved.
826 auto version = sg->get_version_of_current_transaction();
827 transaction::advance(sg, realm.m_binding_context.get(), notifiers);
829 // Realm could be closed in the callbacks.
830 if (realm.is_closed())
833 return version != sg->get_version_of_current_transaction();
// Begins a write transaction on `realm` via transaction::begin(), passing the
// notifiers registered for that Realm. Asserts that `realm` is not already in
// a transaction.
// NOTE(review): the lines between building the package and beginning the
// transaction are not visible in this excerpt.
836 void RealmCoordinator::promote_to_write(Realm& realm)
838 REALM_ASSERT(!realm.is_in_transaction());
840 std::unique_lock<std::mutex> lock(m_notifier_mutex);
841 _impl::NotifierPackage notifiers(m_async_error, notifiers_for_realm(realm), this);
844 auto& sg = Realm::Internal::get_shared_group(realm);
845 transaction::begin(sg, realm.m_binding_context.get(), notifiers);
// Delivers whatever async notifications are already available for `realm`
// without advancing it. Asserts that `realm` is not in a write transaction.
// If m_async_error is set, it is passed to every notifier for the Realm via
// deliver_error().
// NOTE(review): early returns, lock release and the conditions around the
// final two loops are not visible in this excerpt.
848 void RealmCoordinator::process_available_async(Realm& realm)
850 REALM_ASSERT(!realm.is_in_transaction());
852 std::unique_lock<std::mutex> lock(m_notifier_mutex);
853 auto notifiers = notifiers_for_realm(realm);
854 if (notifiers.empty())
857 if (auto error = m_async_error) {
859 for (auto& notifier : notifiers)
860 notifier->deliver_error(m_async_error);
864 bool in_read = realm.is_in_read_transaction();
865 auto& sg = Realm::Internal::get_shared_group(realm);
866 auto version = sg->get_version_of_current_transaction();
// Predicate for remove_if: true means "drop this notifier". A notifier is kept
// only if it has run, its version matches the Realm's (not required when the
// Realm is outside a read transaction), and package_for_delivery() succeeds.
// package_for_delivery() is only called when the first two conditions hold.
867 auto package = [&](auto& notifier) {
868 return !(notifier->has_run() && (!in_read || notifier->version() == version) && notifier->package_for_delivery());
870 notifiers.erase(std::remove_if(begin(notifiers), end(notifiers), package), end(notifiers));
873 // no before advance because the Realm is already at the given version,
874 // because we're either sending initial notifications or the write was
875 // done on this Realm instance
877 // Skip delivering if the Realm isn't in a read transaction
879 for (auto& notifier : notifiers)
880 notifier->deliver(*sg);
883 // but still call the change callbacks
884 for (auto& notifier : notifiers)
885 notifier->after_advance();
// Calls create_sync_session() and then stores `fn` in m_transaction_callback,
// which the sync transaction callback installed by create_sync_session()
// invokes with the old and new versions.
// NOTE(review): the end of this function lies beyond the visible excerpt.
888 void RealmCoordinator::set_transaction_callback(std::function<void(VersionID, VersionID)> fn)
890 create_sync_session();
891 m_transaction_callback = std::move(fn);