added iOS source code
[wl-app.git] / iOS / Pods / Realm / Realm / ObjectStore / src / impl / realm_coordinator.cpp
diff --git a/iOS/Pods/Realm/Realm/ObjectStore/src/impl/realm_coordinator.cpp b/iOS/Pods/Realm/Realm/ObjectStore/src/impl/realm_coordinator.cpp
new file mode 100644 (file)
index 0000000..7135a67
--- /dev/null
@@ -0,0 +1,892 @@
+////////////////////////////////////////////////////////////////////////////
+//
+// Copyright 2015 Realm Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////
+
+#include "impl/realm_coordinator.hpp"
+
+#include "impl/collection_notifier.hpp"
+#include "impl/external_commit_helper.hpp"
+#include "impl/transact_log_handler.hpp"
+#include "impl/weak_realm_notifier.hpp"
+#include "binding_context.hpp"
+#include "object_schema.hpp"
+#include "object_store.hpp"
+#include "schema.hpp"
+
+#if REALM_ENABLE_SYNC
+#include "sync/sync_config.hpp"
+#include "sync/sync_manager.hpp"
+#include "sync/sync_session.hpp"
+#endif
+
+#include <realm/group_shared.hpp>
+#include <realm/lang_bind_helper.hpp>
+#include <realm/string_data.hpp>
+
+#include <algorithm>
+#include <unordered_map>
+
+using namespace realm;
+using namespace realm::_impl;
+
+static auto& s_coordinator_mutex = *new std::mutex;
+static auto& s_coordinators_per_path = *new std::unordered_map<std::string, std::weak_ptr<RealmCoordinator>>;
+
+std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
+{
+    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
+
+    auto& weak_coordinator = s_coordinators_per_path[path];
+    if (auto coordinator = weak_coordinator.lock()) {
+        return coordinator;
+    }
+
+    auto coordinator = std::make_shared<RealmCoordinator>();
+    weak_coordinator = coordinator;
+    return coordinator;
+}
+
+std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(const Realm::Config& config)
+{
+    auto coordinator = get_coordinator(config.path);
+    std::lock_guard<std::mutex> lock(coordinator->m_realm_mutex);
+    coordinator->set_config(config);
+    return coordinator;
+}
+
+std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
+{
+    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
+    auto it = s_coordinators_per_path.find(path);
+    return it == s_coordinators_per_path.end() ? nullptr : it->second.lock();
+}
+
+void RealmCoordinator::create_sync_session()
+{
+#if REALM_ENABLE_SYNC
+    if (m_sync_session)
+        return;
+
+    if (!m_config.encryption_key.empty() && !m_config.sync_config->realm_encryption_key) {
+        throw std::logic_error("A realm encryption key was specified in Realm::Config but not in SyncConfig");
+    } else if (m_config.sync_config->realm_encryption_key && m_config.encryption_key.empty()) {
+        throw std::logic_error("A realm encryption key was specified in SyncConfig but not in Realm::Config");
+    } else if (m_config.sync_config->realm_encryption_key &&
+               !std::equal(m_config.sync_config->realm_encryption_key->begin(), m_config.sync_config->realm_encryption_key->end(),
+                           m_config.encryption_key.begin(), m_config.encryption_key.end())) {
+        throw std::logic_error("The realm encryption key specified in SyncConfig does not match the one in Realm::Config");
+    }
+
+    auto sync_config = *m_config.sync_config;
+    sync_config.validate_sync_history = false;
+    m_sync_session = SyncManager::shared().get_session(m_config.path, sync_config);
+
+    std::weak_ptr<RealmCoordinator> weak_self = shared_from_this();
+    SyncSession::Internal::set_sync_transact_callback(*m_sync_session,
+                                                      [weak_self](VersionID old_version, VersionID new_version) {
+        if (auto self = weak_self.lock()) {
+            if (self->m_transaction_callback)
+                self->m_transaction_callback(old_version, new_version);
+            if (self->m_notifier)
+                self->m_notifier->notify_others();
+        }
+    });
+#endif
+}
+
+void RealmCoordinator::set_config(const Realm::Config& config)
+{
+    if (config.encryption_key.data() && config.encryption_key.size() != 64)
+        throw InvalidEncryptionKeyException();
+    if (config.schema_mode == SchemaMode::Immutable && config.sync_config)
+        throw std::logic_error("Synchronized Realms cannot be opened in immutable mode");
+    if (config.schema_mode == SchemaMode::Additive && config.migration_function)
+        throw std::logic_error("Realms opened in Additive-only schema mode do not use a migration function");
+    if (config.schema_mode == SchemaMode::Immutable && config.migration_function)
+        throw std::logic_error("Realms opened in immutable mode do not use a migration function");
+    if (config.schema_mode == SchemaMode::ReadOnlyAlternative && config.migration_function)
+        throw std::logic_error("Realms opened in read-only mode do not use a migration function");
+    if (config.schema_mode == SchemaMode::Immutable && config.initialization_function)
+        throw std::logic_error("Realms opened in immutable mode do not use an initialization function");
+    if (config.schema_mode == SchemaMode::ReadOnlyAlternative && config.initialization_function)
+        throw std::logic_error("Realms opened in read-only mode do not use an initialization function");
+    if (config.schema && config.schema_version == ObjectStore::NotVersioned)
+        throw std::logic_error("A schema version must be specified when the schema is specified");
+    if (!config.realm_data.is_null() && (!config.immutable() || !config.in_memory))
+        throw std::logic_error("In-memory realms initialized from memory buffers can only be opened in read-only mode");
+    if (!config.realm_data.is_null() && !config.path.empty())
+        throw std::logic_error("Specifying both memory buffer and path is invalid");
+    if (!config.realm_data.is_null() && !config.encryption_key.empty())
+        throw std::logic_error("Memory buffers do not support encryption");
+    // ResetFile also won't use the migration function, but specifying one is
+    // allowed to simplify temporarily switching modes during development
+
+    bool no_existing_realm = std::all_of(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers),
+                                         [](auto& notifier) { return notifier.expired(); });
+    if (no_existing_realm) {
+        m_config = config;
+    }
+    else {
+        if (m_config.immutable() != config.immutable()) {
+            throw MismatchedConfigException("Realm at path '%1' already opened with different read permissions.", config.path);
+        }
+        if (m_config.in_memory != config.in_memory) {
+            throw MismatchedConfigException("Realm at path '%1' already opened with different inMemory settings.", config.path);
+        }
+        if (m_config.encryption_key != config.encryption_key) {
+            throw MismatchedConfigException("Realm at path '%1' already opened with a different encryption key.", config.path);
+        }
+        if (m_config.schema_mode != config.schema_mode) {
+            throw MismatchedConfigException("Realm at path '%1' already opened with a different schema mode.", config.path);
+        }
+        if (config.schema && m_schema_version != ObjectStore::NotVersioned && m_schema_version != config.schema_version) {
+            throw MismatchedConfigException("Realm at path '%1' already opened with different schema version.", config.path);
+        }
+
+#if REALM_ENABLE_SYNC
+        if (bool(m_config.sync_config) != bool(config.sync_config)) {
+            throw MismatchedConfigException("Realm at path '%1' already opened with different sync configurations.", config.path);
+        }
+
+        if (config.sync_config) {
+            if (m_config.sync_config->user != config.sync_config->user) {
+                throw MismatchedConfigException("Realm at path '%1' already opened with different sync user.", config.path);
+            }
+            if (m_config.sync_config->realm_url() != config.sync_config->realm_url()) {
+                throw MismatchedConfigException("Realm at path '%1' already opened with different sync server URL.", config.path);
+            }
+            if (m_config.sync_config->transformer != config.sync_config->transformer) {
+                throw MismatchedConfigException("Realm at path '%1' already opened with different transformer.", config.path);
+            }
+            if (m_config.sync_config->realm_encryption_key != config.sync_config->realm_encryption_key) {
+                throw MismatchedConfigException("Realm at path '%1' already opened with sync session encryption key.", config.path);
+            }
+        }
+#endif
+
+        // Realm::update_schema() handles complaining about schema mismatches
+    }
+}
+
+std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
+{
+    // realm must be declared before lock so that the mutex is released before
+    // we release the strong reference to realm, as Realm's destructor may want
+    // to acquire the same lock
+    std::shared_ptr<Realm> realm;
+    std::unique_lock<std::mutex> lock(m_realm_mutex);
+
+    set_config(config);
+
+    auto schema = std::move(config.schema);
+    auto migration_function = std::move(config.migration_function);
+    auto initialization_function = std::move(config.initialization_function);
+    config.schema = {};
+
+    if (config.cache) {
+        AnyExecutionContextID execution_context(config.execution_context);
+        for (auto& cached_realm : m_weak_realm_notifiers) {
+            if (!cached_realm.is_cached_for_execution_context(execution_context))
+                continue;
+            // can be null if we jumped in between ref count hitting zero and
+            // unregister_realm() getting the lock
+            if ((realm = cached_realm.realm())) {
+                // If the file is uninitialized and was opened without a schema,
+                // do the normal schema init
+                if (realm->schema_version() == ObjectStore::NotVersioned)
+                    break;
+
+                // Otherwise if we have a realm schema it needs to be an exact
+                // match (even having the same properties but in different
+                // orders isn't good enough)
+                if (schema && realm->schema() != *schema)
+                    throw MismatchedConfigException("Realm at path '%1' already opened on current thread with different schema.", config.path);
+
+                return realm;
+            }
+        }
+    }
+
+    if (!realm) {
+        bool should_initialize_notifier = !config.immutable() && config.automatic_change_notifications;
+        realm = Realm::make_shared_realm(std::move(config), shared_from_this());
+        if (!m_notifier && should_initialize_notifier) {
+            try {
+                m_notifier = std::make_unique<ExternalCommitHelper>(*this);
+            }
+            catch (std::system_error const& ex) {
+                throw RealmFileException(RealmFileException::Kind::AccessError, get_path(), ex.code().message(), "");
+            }
+        }
+        m_weak_realm_notifiers.emplace_back(realm, m_config.cache);
+    }
+
+    if (realm->config().sync_config)
+        create_sync_session();
+
+    if (schema) {
+        lock.unlock();
+        realm->update_schema(std::move(*schema), config.schema_version, std::move(migration_function),
+                             std::move(initialization_function));
+    }
+
+    return realm;
+}
+
+std::shared_ptr<Realm> RealmCoordinator::get_realm()
+{
+    return get_realm(m_config);
+}
+
+bool RealmCoordinator::get_cached_schema(Schema& schema, uint64_t& schema_version,
+                                         uint64_t& transaction) const noexcept
+{
+    std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
+    if (!m_cached_schema)
+        return false;
+    schema = *m_cached_schema;
+    schema_version = m_schema_version;
+    transaction = m_schema_transaction_version_max;
+    return true;
+}
+
+void RealmCoordinator::cache_schema(Schema const& new_schema, uint64_t new_schema_version,
+                                    uint64_t transaction_version)
+{
+    std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
+    if (transaction_version < m_schema_transaction_version_max)
+        return;
+    if (new_schema.empty() || new_schema_version == ObjectStore::NotVersioned)
+        return;
+
+    m_cached_schema = new_schema;
+    m_schema_version = new_schema_version;
+    m_schema_transaction_version_min = transaction_version;
+    m_schema_transaction_version_max = transaction_version;
+}
+
+void RealmCoordinator::clear_schema_cache_and_set_schema_version(uint64_t new_schema_version)
+{
+    std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
+    m_cached_schema = util::none;
+    m_schema_version = new_schema_version;
+}
+
+void RealmCoordinator::advance_schema_cache(uint64_t previous, uint64_t next)
+{
+    std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
+    if (!m_cached_schema)
+        return;
+    REALM_ASSERT(previous <= m_schema_transaction_version_max);
+    if (next < m_schema_transaction_version_min)
+        return;
+    m_schema_transaction_version_min = std::min(previous, m_schema_transaction_version_min);
+    m_schema_transaction_version_max = std::max(next, m_schema_transaction_version_max);
+}
+
+RealmCoordinator::RealmCoordinator() = default;
+
+RealmCoordinator::~RealmCoordinator()
+{
+    std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
+    for (auto it = s_coordinators_per_path.begin(); it != s_coordinators_per_path.end(); ) {
+        if (it->second.expired()) {
+            it = s_coordinators_per_path.erase(it);
+        }
+        else {
+            ++it;
+        }
+    }
+}
+
+void RealmCoordinator::unregister_realm(Realm* realm)
+{
+    std::lock_guard<std::mutex> lock(m_realm_mutex);
+    auto new_end = remove_if(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers),
+                             [=](auto& notifier) { return notifier.expired() || notifier.is_for_realm(realm); });
+    m_weak_realm_notifiers.erase(new_end, end(m_weak_realm_notifiers));
+}
+
+void RealmCoordinator::clear_cache()
+{
+    std::vector<WeakRealm> realms_to_close;
+    {
+        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
+
+        for (auto& weak_coordinator : s_coordinators_per_path) {
+            auto coordinator = weak_coordinator.second.lock();
+            if (!coordinator) {
+                continue;
+            }
+
+            coordinator->m_notifier = nullptr;
+
+            // Gather a list of all of the realms which will be removed
+            for (auto& weak_realm_notifier : coordinator->m_weak_realm_notifiers) {
+                if (auto realm = weak_realm_notifier.realm()) {
+                    realms_to_close.push_back(realm);
+                }
+            }
+        }
+
+        s_coordinators_per_path.clear();
+    }
+
+    // Close all of the previously cached Realms. This can't be done while
+    // s_coordinator_mutex is held as it may try to re-lock it.
+    for (auto& weak_realm : realms_to_close) {
+        if (auto realm = weak_realm.lock()) {
+            realm->close();
+        }
+    }
+}
+
+void RealmCoordinator::clear_all_caches()
+{
+    std::vector<std::weak_ptr<RealmCoordinator>> to_clear;
+    {
+        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
+        for (auto iter : s_coordinators_per_path) {
+            to_clear.push_back(iter.second);
+        }
+    }
+    for (auto weak_coordinator : to_clear) {
+        if (auto coordinator = weak_coordinator.lock()) {
+            coordinator->clear_cache();
+        }
+    }
+}
+
+void RealmCoordinator::assert_no_open_realms() noexcept
+{
+#ifdef REALM_DEBUG
+    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
+    REALM_ASSERT(s_coordinators_per_path.empty());
+#endif
+}
+
+void RealmCoordinator::wake_up_notifier_worker()
+{
+    if (m_notifier) {
+        // FIXME: this wakes up the notification workers for all processes and
+        // not just us. This might be worth optimizing in the future.
+        m_notifier->notify_others();
+    }
+}
+
+void RealmCoordinator::commit_write(Realm& realm)
+{
+    REALM_ASSERT(!m_config.immutable());
+    REALM_ASSERT(realm.is_in_transaction());
+
+    {
+        // Need to acquire this lock before committing or another process could
+        // perform a write and notify us before we get the chance to set the
+        // skip version
+        std::lock_guard<std::mutex> l(m_notifier_mutex);
+
+        transaction::commit(*Realm::Internal::get_shared_group(realm));
+
+        // Don't need to check m_new_notifiers because those don't skip versions
+        bool have_notifiers = std::any_of(m_notifiers.begin(), m_notifiers.end(),
+                                          [&](auto&& notifier) { return notifier->is_for_realm(realm); });
+        if (have_notifiers) {
+            m_notifier_skip_version = Realm::Internal::get_shared_group(realm)->get_version_of_current_transaction();
+        }
+    }
+
+#if REALM_ENABLE_SYNC
+    // Realm could be closed in did_change. So send sync notification first before did_change.
+    if (m_sync_session) {
+        auto& sg = Realm::Internal::get_shared_group(realm);
+        auto version = LangBindHelper::get_version_of_latest_snapshot(*sg);
+        SyncSession::Internal::nonsync_transact_notify(*m_sync_session, version);
+    }
+#endif
+    if (realm.m_binding_context) {
+        realm.m_binding_context->did_change({}, {});
+    }
+
+    if (m_notifier) {
+        m_notifier->notify_others();
+    }
+}
+
+void RealmCoordinator::pin_version(VersionID versionid)
+{
+    REALM_ASSERT_DEBUG(!m_notifier_mutex.try_lock());
+    if (m_async_error) {
+        return;
+    }
+
+    if (!m_advancer_sg) {
+        try {
+            std::unique_ptr<Group> read_only_group;
+            Realm::open_with_config(m_config, m_advancer_history, m_advancer_sg, read_only_group, nullptr);
+            REALM_ASSERT(!read_only_group);
+            m_advancer_sg->begin_read(versionid);
+        }
+        catch (...) {
+            m_async_error = std::current_exception();
+            m_advancer_sg = nullptr;
+            m_advancer_history = nullptr;
+        }
+    }
+    else if (m_new_notifiers.empty()) {
+        // If this is the first notifier then we don't already have a read transaction
+        REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Ready);
+        m_advancer_sg->begin_read(versionid);
+    }
+    else {
+        REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
+        if (versionid < m_advancer_sg->get_version_of_current_transaction()) {
+            // Ensure we're holding a readlock on the oldest version we have a
+            // handover object for, as handover objects don't
+            m_advancer_sg->end_read();
+            m_advancer_sg->begin_read(versionid);
+        }
+    }
+}
+
+void RealmCoordinator::register_notifier(std::shared_ptr<CollectionNotifier> notifier)
+{
+    auto version = notifier->version();
+    auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
+    {
+        std::lock_guard<std::mutex> lock(self.m_notifier_mutex);
+        self.pin_version(version);
+        self.m_new_notifiers.push_back(std::move(notifier));
+    }
+}
+
+void RealmCoordinator::clean_up_dead_notifiers()
+{
+    auto swap_remove = [&](auto& container) {
+        bool did_remove = false;
+        for (size_t i = 0; i < container.size(); ++i) {
+            if (container[i]->is_alive())
+                continue;
+
+            // Ensure the notifier is destroyed here even if there's lingering refs
+            // to the async notifier elsewhere
+            container[i]->release_data();
+
+            if (container.size() > i + 1)
+                container[i] = std::move(container.back());
+            container.pop_back();
+            --i;
+            did_remove = true;
+        }
+        return did_remove;
+    };
+
+    if (swap_remove(m_notifiers)) {
+        // Make sure we aren't holding on to read versions needlessly if there
+        // are no notifiers left, but don't close them entirely as opening shared
+        // groups is expensive
+        if (m_notifiers.empty() && m_notifier_sg) {
+            REALM_ASSERT_3(m_notifier_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
+            m_notifier_sg->end_read();
+            m_notifier_skip_version = {0, 0};
+        }
+    }
+    if (swap_remove(m_new_notifiers) && m_advancer_sg) {
+        REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
+        if (m_new_notifiers.empty()) {
+            m_advancer_sg->end_read();
+        }
+    }
+}
+
+void RealmCoordinator::on_change()
+{
+    run_async_notifiers();
+
+    std::lock_guard<std::mutex> lock(m_realm_mutex);
+    for (auto& realm : m_weak_realm_notifiers) {
+        realm.notify();
+    }
+}
+
+namespace {
+class IncrementalChangeInfo {
+public:
+    IncrementalChangeInfo(SharedGroup& sg,
+                          std::vector<std::shared_ptr<_impl::CollectionNotifier>>& notifiers)
+    : m_sg(sg)
+    {
+        if (notifiers.empty())
+            return;
+
+        auto cmp = [&](auto&& lft, auto&& rgt) {
+            return lft->version() < rgt->version();
+        };
+
+        // Sort the notifiers by their source version so that we can pull them
+        // all forward to the latest version in a single pass over the transaction log
+        std::sort(notifiers.begin(), notifiers.end(), cmp);
+
+        // Preallocate the required amount of space in the vector so that we can
+        // safely give out pointers to within the vector
+        size_t count = 1;
+        for (auto it = notifiers.begin(), next = it + 1; next != notifiers.end(); ++it, ++next) {
+            if (cmp(*it, *next))
+                ++count;
+        }
+        m_info.reserve(count);
+        m_info.resize(1);
+        m_current = &m_info[0];
+    }
+
+    TransactionChangeInfo& current() const { return *m_current; }
+
+    bool advance_incremental(VersionID version)
+    {
+        if (version != m_sg.get_version_of_current_transaction()) {
+            transaction::advance(m_sg, *m_current, version);
+            m_info.push_back({
+                m_current->table_modifications_needed,
+                m_current->table_moves_needed,
+                std::move(m_current->lists)});
+            m_current = &m_info.back();
+            return true;
+        }
+        return false;
+    }
+
+    void advance_to_final(VersionID version)
+    {
+        if (!m_current) {
+            transaction::advance(m_sg, nullptr, version);
+            return;
+        }
+
+        transaction::advance(m_sg, *m_current, version);
+
+        // We now need to combine the transaction change info objects so that all of
+        // the notifiers see the complete set of changes from their first version to
+        // the most recent one
+        for (size_t i = m_info.size() - 1; i > 0; --i) {
+            auto& cur = m_info[i];
+            if (cur.tables.empty())
+                continue;
+            auto& prev = m_info[i - 1];
+            if (prev.tables.empty()) {
+                prev.tables = cur.tables;
+                continue;
+            }
+
+            for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) {
+                prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]});
+            }
+            prev.tables.reserve(cur.tables.size());
+            while (prev.tables.size() < cur.tables.size()) {
+                prev.tables.push_back(cur.tables[prev.tables.size()]);
+            }
+        }
+
+        // Copy the list change info if there are multiple LinkViews for the same LinkList
+        auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); };
+        for (size_t i = 1; i < m_current->lists.size(); ++i) {
+            for (size_t j = i; j > 0; --j) {
+                if (id(m_current->lists[i]) == id(m_current->lists[j - 1])) {
+                    m_current->lists[j - 1].changes->merge(CollectionChangeBuilder{*m_current->lists[i].changes});
+                }
+            }
+        }
+    }
+
+private:
+    std::vector<TransactionChangeInfo> m_info;
+    TransactionChangeInfo* m_current = nullptr;
+    SharedGroup& m_sg;
+};
+} // anonymous namespace
+
+void RealmCoordinator::run_async_notifiers()
+{
+    std::unique_lock<std::mutex> lock(m_notifier_mutex);
+
+    clean_up_dead_notifiers();
+
+    if (m_notifiers.empty() && m_new_notifiers.empty()) {
+        return;
+    }
+
+    if (!m_async_error) {
+        open_helper_shared_group();
+    }
+
+    if (m_async_error) {
+        std::move(m_new_notifiers.begin(), m_new_notifiers.end(), std::back_inserter(m_notifiers));
+        m_new_notifiers.clear();
+        return;
+    }
+
+    VersionID version;
+
+    // Advance all of the new notifiers to the most recent version, if any
+    auto new_notifiers = std::move(m_new_notifiers);
+    IncrementalChangeInfo new_notifier_change_info(*m_advancer_sg, new_notifiers);
+
+    if (!new_notifiers.empty()) {
+        REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
+        REALM_ASSERT_3(m_advancer_sg->get_version_of_current_transaction().version,
+                       <=, new_notifiers.front()->version().version);
+
+        // The advancer SG can be at an older version than the oldest new notifier
+        // if a notifier was added and then removed before it ever got the chance
+        // to run, as we don't move the pin forward when removing dead notifiers
+        transaction::advance(*m_advancer_sg, nullptr, new_notifiers.front()->version());
+
+        // Advance each of the new notifiers to the latest version, attaching them
+        // to the SG at their handover version. This requires a unique
+        // TransactionChangeInfo for each source version, so that things don't
+        // see changes from before the version they were handed over from.
+        // Each Info has all of the changes between that source version and the
+        // next source version, and they'll be merged together later after
+        // releasing the lock
+        for (auto& notifier : new_notifiers) {
+            new_notifier_change_info.advance_incremental(notifier->version());
+            notifier->attach_to(*m_advancer_sg);
+            notifier->add_required_change_info(new_notifier_change_info.current());
+        }
+        new_notifier_change_info.advance_to_final(VersionID{});
+
+        for (auto& notifier : new_notifiers) {
+            notifier->detach();
+        }
+
+        // We want to advance the non-new notifiers to the same version as the
+        // new notifiers to avoid having to merge changes from any new
+        // transaction that happen immediately after this into the new notifier
+        // changes
+        version = m_advancer_sg->get_version_of_current_transaction();
+        m_advancer_sg->end_read();
+    }
+    else {
+        // If we have no new notifiers we want to just advance to the latest
+        // version, but we have to pick a "latest" version while holding the
+        // notifier lock to avoid advancing over a transaction which should be
+        // skipped
+        m_advancer_sg->begin_read();
+        version = m_advancer_sg->get_version_of_current_transaction();
+        m_advancer_sg->end_read();
+    }
+    REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Ready);
+
+    auto skip_version = m_notifier_skip_version;
+    m_notifier_skip_version = {0, 0};
+
+    // Make a copy of the notifiers vector and then release the lock to avoid
+    // blocking other threads trying to register or unregister notifiers while we run them
+    auto notifiers = m_notifiers;
+    m_notifiers.insert(m_notifiers.end(), new_notifiers.begin(), new_notifiers.end());
+    lock.unlock();
+
+    if (skip_version.version) {
+        REALM_ASSERT(!notifiers.empty());
+        REALM_ASSERT(version >= skip_version);
+        IncrementalChangeInfo change_info(*m_notifier_sg, notifiers);
+        for (auto& notifier : notifiers)
+            notifier->add_required_change_info(change_info.current());
+        change_info.advance_to_final(skip_version);
+
+        for (auto& notifier : notifiers)
+            notifier->run();
+
+        lock.lock();
+        for (auto& notifier : notifiers)
+            notifier->prepare_handover();
+        lock.unlock();
+    }
+
+    // Advance the non-new notifiers to the same version as we advanced the new
+    // ones to (or the latest if there were no new ones)
+    IncrementalChangeInfo change_info(*m_notifier_sg, notifiers);
+    for (auto& notifier : notifiers) {
+        notifier->add_required_change_info(change_info.current());
+    }
+    change_info.advance_to_final(version);
+
+    // Attach the new notifiers to the main SG and move them to the main list
+    for (auto& notifier : new_notifiers) {
+        notifier->attach_to(*m_notifier_sg);
+        notifier->run();
+    }
+
+    // Change info is now all ready, so the notifiers can now perform their
+    // background work
+    for (auto& notifier : notifiers) {
+        notifier->run();
+    }
+
+    // Reacquire the lock while updating the fields that are actually read on
+    // other threads
+    lock.lock();
+    for (auto& notifier : new_notifiers) {
+        notifier->prepare_handover();
+    }
+    for (auto& notifier : notifiers) {
+        notifier->prepare_handover();
+    }
+    clean_up_dead_notifiers();
+    m_notifier_cv.notify_all();
+}
+
+void RealmCoordinator::open_helper_shared_group()
+{
+    if (!m_notifier_sg) {
+        try {
+            std::unique_ptr<Group> read_only_group;
+            Realm::open_with_config(m_config, m_notifier_history, m_notifier_sg, read_only_group, nullptr);
+            REALM_ASSERT(!read_only_group);
+            m_notifier_sg->begin_read();
+        }
+        catch (...) {
+            // Store the error to be passed to the async notifiers
+            m_async_error = std::current_exception();
+            m_notifier_sg = nullptr;
+            m_notifier_history = nullptr;
+        }
+    }
+    else if (m_notifiers.empty()) {
+        m_notifier_sg->begin_read();
+    }
+}
+
+void RealmCoordinator::advance_to_ready(Realm& realm)
+{
+    std::unique_lock<std::mutex> lock(m_notifier_mutex);
+    _impl::NotifierPackage notifiers(m_async_error, notifiers_for_realm(realm), this);
+    lock.unlock();
+    notifiers.package_and_wait(util::none);
+
+    auto& sg = Realm::Internal::get_shared_group(realm);
+    if (notifiers) {
+        auto version = notifiers.version();
+        if (version) {
+            auto current_version = sg->get_version_of_current_transaction();
+            // Notifications are out of date, so just discard
+            // This should only happen if begin_read() was used to change the
+            // read version outside of our control
+            if (*version < current_version)
+                return;
+            // While there is a newer version, notifications are for the current
+            // version so just deliver them without advancing
+            if (*version == current_version) {
+                notifiers.deliver(*sg);
+                notifiers.after_advance();
+                return;
+            }
+        }
+    }
+
+    transaction::advance(sg, realm.m_binding_context.get(), notifiers);
+}
+
+std::vector<std::shared_ptr<_impl::CollectionNotifier>> RealmCoordinator::notifiers_for_realm(Realm& realm)
+{
+    std::vector<std::shared_ptr<_impl::CollectionNotifier>> ret;
+    for (auto& notifier : m_new_notifiers) {
+        if (notifier->is_for_realm(realm))
+            ret.push_back(notifier);
+    }
+    for (auto& notifier : m_notifiers) {
+        if (notifier->is_for_realm(realm))
+            ret.push_back(notifier);
+    }
+    return ret;
+}
+
+bool RealmCoordinator::advance_to_latest(Realm& realm)
+{
+    using sgf = SharedGroupFriend;
+
+    auto& sg = Realm::Internal::get_shared_group(realm);
+    std::unique_lock<std::mutex> lock(m_notifier_mutex);
+    _impl::NotifierPackage notifiers(m_async_error, notifiers_for_realm(realm), this);
+    lock.unlock();
+    notifiers.package_and_wait(sgf::get_version_of_latest_snapshot(*sg));
+
+    auto version = sg->get_version_of_current_transaction();
+    transaction::advance(sg, realm.m_binding_context.get(), notifiers);
+
+    // Realm could be closed in the callbacks.
+    if (realm.is_closed())
+        return false;
+
+    return version != sg->get_version_of_current_transaction();
+}
+
+void RealmCoordinator::promote_to_write(Realm& realm)
+{
+    REALM_ASSERT(!realm.is_in_transaction());
+
+    std::unique_lock<std::mutex> lock(m_notifier_mutex);
+    _impl::NotifierPackage notifiers(m_async_error, notifiers_for_realm(realm), this);
+    lock.unlock();
+
+    auto& sg = Realm::Internal::get_shared_group(realm);
+    transaction::begin(sg, realm.m_binding_context.get(), notifiers);
+}
+
+void RealmCoordinator::process_available_async(Realm& realm)
+{
+    REALM_ASSERT(!realm.is_in_transaction());
+
+    std::unique_lock<std::mutex> lock(m_notifier_mutex);
+    auto notifiers = notifiers_for_realm(realm);
+    if (notifiers.empty())
+        return;
+
+    if (auto error = m_async_error) {
+        lock.unlock();
+        for (auto& notifier : notifiers)
+            notifier->deliver_error(m_async_error);
+        return;
+    }
+
+    bool in_read = realm.is_in_read_transaction();
+    auto& sg = Realm::Internal::get_shared_group(realm);
+    auto version = sg->get_version_of_current_transaction();
+    auto package = [&](auto& notifier) {
+        return !(notifier->has_run() && (!in_read || notifier->version() == version) && notifier->package_for_delivery());
+    };
+    notifiers.erase(std::remove_if(begin(notifiers), end(notifiers), package), end(notifiers));
+    lock.unlock();
+
+    // no before advance because the Realm is already at the given version,
+    // because we're either sending initial notifications or the write was
+    // done on this Realm instance
+
+    // Skip delivering if the Realm isn't in a read transaction
+    if (in_read) {
+        for (auto& notifier : notifiers)
+            notifier->deliver(*sg);
+    }
+
+    // but still call the change callbacks
+    for (auto& notifier : notifiers)
+        notifier->after_advance();
+}
+
+void RealmCoordinator::set_transaction_callback(std::function<void(VersionID, VersionID)> fn)
+{
+    create_sync_session();
+    m_transaction_callback = std::move(fn);
+}