added iOS source code
[wl-app.git] / iOS / Pods / Realm / Realm / ObjectStore / src / impl / realm_coordinator.cpp
1 ////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright 2015 Realm Inc.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 ////////////////////////////////////////////////////////////////////////////
18
19 #include "impl/realm_coordinator.hpp"
20
21 #include "impl/collection_notifier.hpp"
22 #include "impl/external_commit_helper.hpp"
23 #include "impl/transact_log_handler.hpp"
24 #include "impl/weak_realm_notifier.hpp"
25 #include "binding_context.hpp"
26 #include "object_schema.hpp"
27 #include "object_store.hpp"
28 #include "schema.hpp"
29
30 #if REALM_ENABLE_SYNC
31 #include "sync/sync_config.hpp"
32 #include "sync/sync_manager.hpp"
33 #include "sync/sync_session.hpp"
34 #endif
35
36 #include <realm/group_shared.hpp>
37 #include <realm/lang_bind_helper.hpp>
38 #include <realm/string_data.hpp>
39
40 #include <algorithm>
41 #include <unordered_map>
42
43 using namespace realm;
44 using namespace realm::_impl;
45
46 static auto& s_coordinator_mutex = *new std::mutex;
47 static auto& s_coordinators_per_path = *new std::unordered_map<std::string, std::weak_ptr<RealmCoordinator>>;
48
49 std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
50 {
51     std::lock_guard<std::mutex> lock(s_coordinator_mutex);
52
53     auto& weak_coordinator = s_coordinators_per_path[path];
54     if (auto coordinator = weak_coordinator.lock()) {
55         return coordinator;
56     }
57
58     auto coordinator = std::make_shared<RealmCoordinator>();
59     weak_coordinator = coordinator;
60     return coordinator;
61 }
62
63 std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(const Realm::Config& config)
64 {
65     auto coordinator = get_coordinator(config.path);
66     std::lock_guard<std::mutex> lock(coordinator->m_realm_mutex);
67     coordinator->set_config(config);
68     return coordinator;
69 }
70
71 std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
72 {
73     std::lock_guard<std::mutex> lock(s_coordinator_mutex);
74     auto it = s_coordinators_per_path.find(path);
75     return it == s_coordinators_per_path.end() ? nullptr : it->second.lock();
76 }
77
78 void RealmCoordinator::create_sync_session()
79 {
80 #if REALM_ENABLE_SYNC
81     if (m_sync_session)
82         return;
83
84     if (!m_config.encryption_key.empty() && !m_config.sync_config->realm_encryption_key) {
85         throw std::logic_error("A realm encryption key was specified in Realm::Config but not in SyncConfig");
86     } else if (m_config.sync_config->realm_encryption_key && m_config.encryption_key.empty()) {
87         throw std::logic_error("A realm encryption key was specified in SyncConfig but not in Realm::Config");
88     } else if (m_config.sync_config->realm_encryption_key &&
89                !std::equal(m_config.sync_config->realm_encryption_key->begin(), m_config.sync_config->realm_encryption_key->end(),
90                            m_config.encryption_key.begin(), m_config.encryption_key.end())) {
91         throw std::logic_error("The realm encryption key specified in SyncConfig does not match the one in Realm::Config");
92     }
93
94     auto sync_config = *m_config.sync_config;
95     sync_config.validate_sync_history = false;
96     m_sync_session = SyncManager::shared().get_session(m_config.path, sync_config);
97
98     std::weak_ptr<RealmCoordinator> weak_self = shared_from_this();
99     SyncSession::Internal::set_sync_transact_callback(*m_sync_session,
100                                                       [weak_self](VersionID old_version, VersionID new_version) {
101         if (auto self = weak_self.lock()) {
102             if (self->m_transaction_callback)
103                 self->m_transaction_callback(old_version, new_version);
104             if (self->m_notifier)
105                 self->m_notifier->notify_others();
106         }
107     });
108 #endif
109 }
110
111 void RealmCoordinator::set_config(const Realm::Config& config)
112 {
113     if (config.encryption_key.data() && config.encryption_key.size() != 64)
114         throw InvalidEncryptionKeyException();
115     if (config.schema_mode == SchemaMode::Immutable && config.sync_config)
116         throw std::logic_error("Synchronized Realms cannot be opened in immutable mode");
117     if (config.schema_mode == SchemaMode::Additive && config.migration_function)
118         throw std::logic_error("Realms opened in Additive-only schema mode do not use a migration function");
119     if (config.schema_mode == SchemaMode::Immutable && config.migration_function)
120         throw std::logic_error("Realms opened in immutable mode do not use a migration function");
121     if (config.schema_mode == SchemaMode::ReadOnlyAlternative && config.migration_function)
122         throw std::logic_error("Realms opened in read-only mode do not use a migration function");
123     if (config.schema_mode == SchemaMode::Immutable && config.initialization_function)
124         throw std::logic_error("Realms opened in immutable mode do not use an initialization function");
125     if (config.schema_mode == SchemaMode::ReadOnlyAlternative && config.initialization_function)
126         throw std::logic_error("Realms opened in read-only mode do not use an initialization function");
127     if (config.schema && config.schema_version == ObjectStore::NotVersioned)
128         throw std::logic_error("A schema version must be specified when the schema is specified");
129     if (!config.realm_data.is_null() && (!config.immutable() || !config.in_memory))
130         throw std::logic_error("In-memory realms initialized from memory buffers can only be opened in read-only mode");
131     if (!config.realm_data.is_null() && !config.path.empty())
132         throw std::logic_error("Specifying both memory buffer and path is invalid");
133     if (!config.realm_data.is_null() && !config.encryption_key.empty())
134         throw std::logic_error("Memory buffers do not support encryption");
135     // ResetFile also won't use the migration function, but specifying one is
136     // allowed to simplify temporarily switching modes during development
137
138     bool no_existing_realm = std::all_of(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers),
139                                          [](auto& notifier) { return notifier.expired(); });
140     if (no_existing_realm) {
141         m_config = config;
142     }
143     else {
144         if (m_config.immutable() != config.immutable()) {
145             throw MismatchedConfigException("Realm at path '%1' already opened with different read permissions.", config.path);
146         }
147         if (m_config.in_memory != config.in_memory) {
148             throw MismatchedConfigException("Realm at path '%1' already opened with different inMemory settings.", config.path);
149         }
150         if (m_config.encryption_key != config.encryption_key) {
151             throw MismatchedConfigException("Realm at path '%1' already opened with a different encryption key.", config.path);
152         }
153         if (m_config.schema_mode != config.schema_mode) {
154             throw MismatchedConfigException("Realm at path '%1' already opened with a different schema mode.", config.path);
155         }
156         if (config.schema && m_schema_version != ObjectStore::NotVersioned && m_schema_version != config.schema_version) {
157             throw MismatchedConfigException("Realm at path '%1' already opened with different schema version.", config.path);
158         }
159
160 #if REALM_ENABLE_SYNC
161         if (bool(m_config.sync_config) != bool(config.sync_config)) {
162             throw MismatchedConfigException("Realm at path '%1' already opened with different sync configurations.", config.path);
163         }
164
165         if (config.sync_config) {
166             if (m_config.sync_config->user != config.sync_config->user) {
167                 throw MismatchedConfigException("Realm at path '%1' already opened with different sync user.", config.path);
168             }
169             if (m_config.sync_config->realm_url() != config.sync_config->realm_url()) {
170                 throw MismatchedConfigException("Realm at path '%1' already opened with different sync server URL.", config.path);
171             }
172             if (m_config.sync_config->transformer != config.sync_config->transformer) {
173                 throw MismatchedConfigException("Realm at path '%1' already opened with different transformer.", config.path);
174             }
175             if (m_config.sync_config->realm_encryption_key != config.sync_config->realm_encryption_key) {
176                 throw MismatchedConfigException("Realm at path '%1' already opened with sync session encryption key.", config.path);
177             }
178         }
179 #endif
180
181         // Realm::update_schema() handles complaining about schema mismatches
182     }
183 }
184
185 std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
186 {
187     // realm must be declared before lock so that the mutex is released before
188     // we release the strong reference to realm, as Realm's destructor may want
189     // to acquire the same lock
190     std::shared_ptr<Realm> realm;
191     std::unique_lock<std::mutex> lock(m_realm_mutex);
192
193     set_config(config);
194
195     auto schema = std::move(config.schema);
196     auto migration_function = std::move(config.migration_function);
197     auto initialization_function = std::move(config.initialization_function);
198     config.schema = {};
199
200     if (config.cache) {
201         AnyExecutionContextID execution_context(config.execution_context);
202         for (auto& cached_realm : m_weak_realm_notifiers) {
203             if (!cached_realm.is_cached_for_execution_context(execution_context))
204                 continue;
205             // can be null if we jumped in between ref count hitting zero and
206             // unregister_realm() getting the lock
207             if ((realm = cached_realm.realm())) {
208                 // If the file is uninitialized and was opened without a schema,
209                 // do the normal schema init
210                 if (realm->schema_version() == ObjectStore::NotVersioned)
211                     break;
212
213                 // Otherwise if we have a realm schema it needs to be an exact
214                 // match (even having the same properties but in different
215                 // orders isn't good enough)
216                 if (schema && realm->schema() != *schema)
217                     throw MismatchedConfigException("Realm at path '%1' already opened on current thread with different schema.", config.path);
218
219                 return realm;
220             }
221         }
222     }
223
224     if (!realm) {
225         bool should_initialize_notifier = !config.immutable() && config.automatic_change_notifications;
226         realm = Realm::make_shared_realm(std::move(config), shared_from_this());
227         if (!m_notifier && should_initialize_notifier) {
228             try {
229                 m_notifier = std::make_unique<ExternalCommitHelper>(*this);
230             }
231             catch (std::system_error const& ex) {
232                 throw RealmFileException(RealmFileException::Kind::AccessError, get_path(), ex.code().message(), "");
233             }
234         }
235         m_weak_realm_notifiers.emplace_back(realm, m_config.cache);
236     }
237
238     if (realm->config().sync_config)
239         create_sync_session();
240
241     if (schema) {
242         lock.unlock();
243         realm->update_schema(std::move(*schema), config.schema_version, std::move(migration_function),
244                              std::move(initialization_function));
245     }
246
247     return realm;
248 }
249
250 std::shared_ptr<Realm> RealmCoordinator::get_realm()
251 {
252     return get_realm(m_config);
253 }
254
255 bool RealmCoordinator::get_cached_schema(Schema& schema, uint64_t& schema_version,
256                                          uint64_t& transaction) const noexcept
257 {
258     std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
259     if (!m_cached_schema)
260         return false;
261     schema = *m_cached_schema;
262     schema_version = m_schema_version;
263     transaction = m_schema_transaction_version_max;
264     return true;
265 }
266
267 void RealmCoordinator::cache_schema(Schema const& new_schema, uint64_t new_schema_version,
268                                     uint64_t transaction_version)
269 {
270     std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
271     if (transaction_version < m_schema_transaction_version_max)
272         return;
273     if (new_schema.empty() || new_schema_version == ObjectStore::NotVersioned)
274         return;
275
276     m_cached_schema = new_schema;
277     m_schema_version = new_schema_version;
278     m_schema_transaction_version_min = transaction_version;
279     m_schema_transaction_version_max = transaction_version;
280 }
281
282 void RealmCoordinator::clear_schema_cache_and_set_schema_version(uint64_t new_schema_version)
283 {
284     std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
285     m_cached_schema = util::none;
286     m_schema_version = new_schema_version;
287 }
288
289 void RealmCoordinator::advance_schema_cache(uint64_t previous, uint64_t next)
290 {
291     std::lock_guard<std::mutex> lock(m_schema_cache_mutex);
292     if (!m_cached_schema)
293         return;
294     REALM_ASSERT(previous <= m_schema_transaction_version_max);
295     if (next < m_schema_transaction_version_min)
296         return;
297     m_schema_transaction_version_min = std::min(previous, m_schema_transaction_version_min);
298     m_schema_transaction_version_max = std::max(next, m_schema_transaction_version_max);
299 }
300
301 RealmCoordinator::RealmCoordinator() = default;
302
303 RealmCoordinator::~RealmCoordinator()
304 {
305     std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
306     for (auto it = s_coordinators_per_path.begin(); it != s_coordinators_per_path.end(); ) {
307         if (it->second.expired()) {
308             it = s_coordinators_per_path.erase(it);
309         }
310         else {
311             ++it;
312         }
313     }
314 }
315
316 void RealmCoordinator::unregister_realm(Realm* realm)
317 {
318     std::lock_guard<std::mutex> lock(m_realm_mutex);
319     auto new_end = remove_if(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers),
320                              [=](auto& notifier) { return notifier.expired() || notifier.is_for_realm(realm); });
321     m_weak_realm_notifiers.erase(new_end, end(m_weak_realm_notifiers));
322 }
323
324 void RealmCoordinator::clear_cache()
325 {
326     std::vector<WeakRealm> realms_to_close;
327     {
328         std::lock_guard<std::mutex> lock(s_coordinator_mutex);
329
330         for (auto& weak_coordinator : s_coordinators_per_path) {
331             auto coordinator = weak_coordinator.second.lock();
332             if (!coordinator) {
333                 continue;
334             }
335
336             coordinator->m_notifier = nullptr;
337
338             // Gather a list of all of the realms which will be removed
339             for (auto& weak_realm_notifier : coordinator->m_weak_realm_notifiers) {
340                 if (auto realm = weak_realm_notifier.realm()) {
341                     realms_to_close.push_back(realm);
342                 }
343             }
344         }
345
346         s_coordinators_per_path.clear();
347     }
348
349     // Close all of the previously cached Realms. This can't be done while
350     // s_coordinator_mutex is held as it may try to re-lock it.
351     for (auto& weak_realm : realms_to_close) {
352         if (auto realm = weak_realm.lock()) {
353             realm->close();
354         }
355     }
356 }
357
358 void RealmCoordinator::clear_all_caches()
359 {
360     std::vector<std::weak_ptr<RealmCoordinator>> to_clear;
361     {
362         std::lock_guard<std::mutex> lock(s_coordinator_mutex);
363         for (auto iter : s_coordinators_per_path) {
364             to_clear.push_back(iter.second);
365         }
366     }
367     for (auto weak_coordinator : to_clear) {
368         if (auto coordinator = weak_coordinator.lock()) {
369             coordinator->clear_cache();
370         }
371     }
372 }
373
374 void RealmCoordinator::assert_no_open_realms() noexcept
375 {
376 #ifdef REALM_DEBUG
377     std::lock_guard<std::mutex> lock(s_coordinator_mutex);
378     REALM_ASSERT(s_coordinators_per_path.empty());
379 #endif
380 }
381
382 void RealmCoordinator::wake_up_notifier_worker()
383 {
384     if (m_notifier) {
385         // FIXME: this wakes up the notification workers for all processes and
386         // not just us. This might be worth optimizing in the future.
387         m_notifier->notify_others();
388     }
389 }
390
391 void RealmCoordinator::commit_write(Realm& realm)
392 {
393     REALM_ASSERT(!m_config.immutable());
394     REALM_ASSERT(realm.is_in_transaction());
395
396     {
397         // Need to acquire this lock before committing or another process could
398         // perform a write and notify us before we get the chance to set the
399         // skip version
400         std::lock_guard<std::mutex> l(m_notifier_mutex);
401
402         transaction::commit(*Realm::Internal::get_shared_group(realm));
403
404         // Don't need to check m_new_notifiers because those don't skip versions
405         bool have_notifiers = std::any_of(m_notifiers.begin(), m_notifiers.end(),
406                                           [&](auto&& notifier) { return notifier->is_for_realm(realm); });
407         if (have_notifiers) {
408             m_notifier_skip_version = Realm::Internal::get_shared_group(realm)->get_version_of_current_transaction();
409         }
410     }
411
412 #if REALM_ENABLE_SYNC
413     // Realm could be closed in did_change. So send sync notification first before did_change.
414     if (m_sync_session) {
415         auto& sg = Realm::Internal::get_shared_group(realm);
416         auto version = LangBindHelper::get_version_of_latest_snapshot(*sg);
417         SyncSession::Internal::nonsync_transact_notify(*m_sync_session, version);
418     }
419 #endif
420     if (realm.m_binding_context) {
421         realm.m_binding_context->did_change({}, {});
422     }
423
424     if (m_notifier) {
425         m_notifier->notify_others();
426     }
427 }
428
429 void RealmCoordinator::pin_version(VersionID versionid)
430 {
431     REALM_ASSERT_DEBUG(!m_notifier_mutex.try_lock());
432     if (m_async_error) {
433         return;
434     }
435
436     if (!m_advancer_sg) {
437         try {
438             std::unique_ptr<Group> read_only_group;
439             Realm::open_with_config(m_config, m_advancer_history, m_advancer_sg, read_only_group, nullptr);
440             REALM_ASSERT(!read_only_group);
441             m_advancer_sg->begin_read(versionid);
442         }
443         catch (...) {
444             m_async_error = std::current_exception();
445             m_advancer_sg = nullptr;
446             m_advancer_history = nullptr;
447         }
448     }
449     else if (m_new_notifiers.empty()) {
450         // If this is the first notifier then we don't already have a read transaction
451         REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Ready);
452         m_advancer_sg->begin_read(versionid);
453     }
454     else {
455         REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
456         if (versionid < m_advancer_sg->get_version_of_current_transaction()) {
457             // Ensure we're holding a readlock on the oldest version we have a
458             // handover object for, as handover objects don't
459             m_advancer_sg->end_read();
460             m_advancer_sg->begin_read(versionid);
461         }
462     }
463 }
464
465 void RealmCoordinator::register_notifier(std::shared_ptr<CollectionNotifier> notifier)
466 {
467     auto version = notifier->version();
468     auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
469     {
470         std::lock_guard<std::mutex> lock(self.m_notifier_mutex);
471         self.pin_version(version);
472         self.m_new_notifiers.push_back(std::move(notifier));
473     }
474 }
475
476 void RealmCoordinator::clean_up_dead_notifiers()
477 {
478     auto swap_remove = [&](auto& container) {
479         bool did_remove = false;
480         for (size_t i = 0; i < container.size(); ++i) {
481             if (container[i]->is_alive())
482                 continue;
483
484             // Ensure the notifier is destroyed here even if there's lingering refs
485             // to the async notifier elsewhere
486             container[i]->release_data();
487
488             if (container.size() > i + 1)
489                 container[i] = std::move(container.back());
490             container.pop_back();
491             --i;
492             did_remove = true;
493         }
494         return did_remove;
495     };
496
497     if (swap_remove(m_notifiers)) {
498         // Make sure we aren't holding on to read versions needlessly if there
499         // are no notifiers left, but don't close them entirely as opening shared
500         // groups is expensive
501         if (m_notifiers.empty() && m_notifier_sg) {
502             REALM_ASSERT_3(m_notifier_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
503             m_notifier_sg->end_read();
504             m_notifier_skip_version = {0, 0};
505         }
506     }
507     if (swap_remove(m_new_notifiers) && m_advancer_sg) {
508         REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
509         if (m_new_notifiers.empty()) {
510             m_advancer_sg->end_read();
511         }
512     }
513 }
514
515 void RealmCoordinator::on_change()
516 {
517     run_async_notifiers();
518
519     std::lock_guard<std::mutex> lock(m_realm_mutex);
520     for (auto& realm : m_weak_realm_notifiers) {
521         realm.notify();
522     }
523 }
524
525 namespace {
526 class IncrementalChangeInfo {
527 public:
528     IncrementalChangeInfo(SharedGroup& sg,
529                           std::vector<std::shared_ptr<_impl::CollectionNotifier>>& notifiers)
530     : m_sg(sg)
531     {
532         if (notifiers.empty())
533             return;
534
535         auto cmp = [&](auto&& lft, auto&& rgt) {
536             return lft->version() < rgt->version();
537         };
538
539         // Sort the notifiers by their source version so that we can pull them
540         // all forward to the latest version in a single pass over the transaction log
541         std::sort(notifiers.begin(), notifiers.end(), cmp);
542
543         // Preallocate the required amount of space in the vector so that we can
544         // safely give out pointers to within the vector
545         size_t count = 1;
546         for (auto it = notifiers.begin(), next = it + 1; next != notifiers.end(); ++it, ++next) {
547             if (cmp(*it, *next))
548                 ++count;
549         }
550         m_info.reserve(count);
551         m_info.resize(1);
552         m_current = &m_info[0];
553     }
554
555     TransactionChangeInfo& current() const { return *m_current; }
556
557     bool advance_incremental(VersionID version)
558     {
559         if (version != m_sg.get_version_of_current_transaction()) {
560             transaction::advance(m_sg, *m_current, version);
561             m_info.push_back({
562                 m_current->table_modifications_needed,
563                 m_current->table_moves_needed,
564                 std::move(m_current->lists)});
565             m_current = &m_info.back();
566             return true;
567         }
568         return false;
569     }
570
571     void advance_to_final(VersionID version)
572     {
573         if (!m_current) {
574             transaction::advance(m_sg, nullptr, version);
575             return;
576         }
577
578         transaction::advance(m_sg, *m_current, version);
579
580         // We now need to combine the transaction change info objects so that all of
581         // the notifiers see the complete set of changes from their first version to
582         // the most recent one
583         for (size_t i = m_info.size() - 1; i > 0; --i) {
584             auto& cur = m_info[i];
585             if (cur.tables.empty())
586                 continue;
587             auto& prev = m_info[i - 1];
588             if (prev.tables.empty()) {
589                 prev.tables = cur.tables;
590                 continue;
591             }
592
593             for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) {
594                 prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]});
595             }
596             prev.tables.reserve(cur.tables.size());
597             while (prev.tables.size() < cur.tables.size()) {
598                 prev.tables.push_back(cur.tables[prev.tables.size()]);
599             }
600         }
601
602         // Copy the list change info if there are multiple LinkViews for the same LinkList
603         auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); };
604         for (size_t i = 1; i < m_current->lists.size(); ++i) {
605             for (size_t j = i; j > 0; --j) {
606                 if (id(m_current->lists[i]) == id(m_current->lists[j - 1])) {
607                     m_current->lists[j - 1].changes->merge(CollectionChangeBuilder{*m_current->lists[i].changes});
608                 }
609             }
610         }
611     }
612
613 private:
614     std::vector<TransactionChangeInfo> m_info;
615     TransactionChangeInfo* m_current = nullptr;
616     SharedGroup& m_sg;
617 };
618 } // anonymous namespace
619
620 void RealmCoordinator::run_async_notifiers()
621 {
622     std::unique_lock<std::mutex> lock(m_notifier_mutex);
623
624     clean_up_dead_notifiers();
625
626     if (m_notifiers.empty() && m_new_notifiers.empty()) {
627         return;
628     }
629
630     if (!m_async_error) {
631         open_helper_shared_group();
632     }
633
634     if (m_async_error) {
635         std::move(m_new_notifiers.begin(), m_new_notifiers.end(), std::back_inserter(m_notifiers));
636         m_new_notifiers.clear();
637         return;
638     }
639
640     VersionID version;
641
642     // Advance all of the new notifiers to the most recent version, if any
643     auto new_notifiers = std::move(m_new_notifiers);
644     IncrementalChangeInfo new_notifier_change_info(*m_advancer_sg, new_notifiers);
645
646     if (!new_notifiers.empty()) {
647         REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Reading);
648         REALM_ASSERT_3(m_advancer_sg->get_version_of_current_transaction().version,
649                        <=, new_notifiers.front()->version().version);
650
651         // The advancer SG can be at an older version than the oldest new notifier
652         // if a notifier was added and then removed before it ever got the chance
653         // to run, as we don't move the pin forward when removing dead notifiers
654         transaction::advance(*m_advancer_sg, nullptr, new_notifiers.front()->version());
655
656         // Advance each of the new notifiers to the latest version, attaching them
657         // to the SG at their handover version. This requires a unique
658         // TransactionChangeInfo for each source version, so that things don't
659         // see changes from before the version they were handed over from.
660         // Each Info has all of the changes between that source version and the
661         // next source version, and they'll be merged together later after
662         // releasing the lock
663         for (auto& notifier : new_notifiers) {
664             new_notifier_change_info.advance_incremental(notifier->version());
665             notifier->attach_to(*m_advancer_sg);
666             notifier->add_required_change_info(new_notifier_change_info.current());
667         }
668         new_notifier_change_info.advance_to_final(VersionID{});
669
670         for (auto& notifier : new_notifiers) {
671             notifier->detach();
672         }
673
674         // We want to advance the non-new notifiers to the same version as the
675         // new notifiers to avoid having to merge changes from any new
676         // transaction that happen immediately after this into the new notifier
677         // changes
678         version = m_advancer_sg->get_version_of_current_transaction();
679         m_advancer_sg->end_read();
680     }
681     else {
682         // If we have no new notifiers we want to just advance to the latest
683         // version, but we have to pick a "latest" version while holding the
684         // notifier lock to avoid advancing over a transaction which should be
685         // skipped
686         m_advancer_sg->begin_read();
687         version = m_advancer_sg->get_version_of_current_transaction();
688         m_advancer_sg->end_read();
689     }
690     REALM_ASSERT_3(m_advancer_sg->get_transact_stage(), ==, SharedGroup::transact_Ready);
691
692     auto skip_version = m_notifier_skip_version;
693     m_notifier_skip_version = {0, 0};
694
695     // Make a copy of the notifiers vector and then release the lock to avoid
696     // blocking other threads trying to register or unregister notifiers while we run them
697     auto notifiers = m_notifiers;
698     m_notifiers.insert(m_notifiers.end(), new_notifiers.begin(), new_notifiers.end());
699     lock.unlock();
700
701     if (skip_version.version) {
702         REALM_ASSERT(!notifiers.empty());
703         REALM_ASSERT(version >= skip_version);
704         IncrementalChangeInfo change_info(*m_notifier_sg, notifiers);
705         for (auto& notifier : notifiers)
706             notifier->add_required_change_info(change_info.current());
707         change_info.advance_to_final(skip_version);
708
709         for (auto& notifier : notifiers)
710             notifier->run();
711
712         lock.lock();
713         for (auto& notifier : notifiers)
714             notifier->prepare_handover();
715         lock.unlock();
716     }
717
718     // Advance the non-new notifiers to the same version as we advanced the new
719     // ones to (or the latest if there were no new ones)
720     IncrementalChangeInfo change_info(*m_notifier_sg, notifiers);
721     for (auto& notifier : notifiers) {
722         notifier->add_required_change_info(change_info.current());
723     }
724     change_info.advance_to_final(version);
725
726     // Attach the new notifiers to the main SG and move them to the main list
727     for (auto& notifier : new_notifiers) {
728         notifier->attach_to(*m_notifier_sg);
729         notifier->run();
730     }
731
732     // Change info is now all ready, so the notifiers can now perform their
733     // background work
734     for (auto& notifier : notifiers) {
735         notifier->run();
736     }
737
738     // Reacquire the lock while updating the fields that are actually read on
739     // other threads
740     lock.lock();
741     for (auto& notifier : new_notifiers) {
742         notifier->prepare_handover();
743     }
744     for (auto& notifier : notifiers) {
745         notifier->prepare_handover();
746     }
747     clean_up_dead_notifiers();
748     m_notifier_cv.notify_all();
749 }
750
751 void RealmCoordinator::open_helper_shared_group()
752 {
753     if (!m_notifier_sg) {
754         try {
755             std::unique_ptr<Group> read_only_group;
756             Realm::open_with_config(m_config, m_notifier_history, m_notifier_sg, read_only_group, nullptr);
757             REALM_ASSERT(!read_only_group);
758             m_notifier_sg->begin_read();
759         }
760         catch (...) {
761             // Store the error to be passed to the async notifiers
762             m_async_error = std::current_exception();
763             m_notifier_sg = nullptr;
764             m_notifier_history = nullptr;
765         }
766     }
767     else if (m_notifiers.empty()) {
768         m_notifier_sg->begin_read();
769     }
770 }
771
772 void RealmCoordinator::advance_to_ready(Realm& realm)
773 {
774     std::unique_lock<std::mutex> lock(m_notifier_mutex);
775     _impl::NotifierPackage notifiers(m_async_error, notifiers_for_realm(realm), this);
776     lock.unlock();
777     notifiers.package_and_wait(util::none);
778
779     auto& sg = Realm::Internal::get_shared_group(realm);
780     if (notifiers) {
781         auto version = notifiers.version();
782         if (version) {
783             auto current_version = sg->get_version_of_current_transaction();
784             // Notifications are out of date, so just discard
785             // This should only happen if begin_read() was used to change the
786             // read version outside of our control
787             if (*version < current_version)
788                 return;
789             // While there is a newer version, notifications are for the current
790             // version so just deliver them without advancing
791             if (*version == current_version) {
792                 notifiers.deliver(*sg);
793                 notifiers.after_advance();
794                 return;
795             }
796         }
797     }
798
799     transaction::advance(sg, realm.m_binding_context.get(), notifiers);
800 }
801
802 std::vector<std::shared_ptr<_impl::CollectionNotifier>> RealmCoordinator::notifiers_for_realm(Realm& realm)
803 {
804     std::vector<std::shared_ptr<_impl::CollectionNotifier>> ret;
805     for (auto& notifier : m_new_notifiers) {
806         if (notifier->is_for_realm(realm))
807             ret.push_back(notifier);
808     }
809     for (auto& notifier : m_notifiers) {
810         if (notifier->is_for_realm(realm))
811             ret.push_back(notifier);
812     }
813     return ret;
814 }
815
816 bool RealmCoordinator::advance_to_latest(Realm& realm)
817 {
818     using sgf = SharedGroupFriend;
819
820     auto& sg = Realm::Internal::get_shared_group(realm);
821     std::unique_lock<std::mutex> lock(m_notifier_mutex);
822     _impl::NotifierPackage notifiers(m_async_error, notifiers_for_realm(realm), this);
823     lock.unlock();
824     notifiers.package_and_wait(sgf::get_version_of_latest_snapshot(*sg));
825
826     auto version = sg->get_version_of_current_transaction();
827     transaction::advance(sg, realm.m_binding_context.get(), notifiers);
828
829     // Realm could be closed in the callbacks.
830     if (realm.is_closed())
831         return false;
832
833     return version != sg->get_version_of_current_transaction();
834 }
835
836 void RealmCoordinator::promote_to_write(Realm& realm)
837 {
838     REALM_ASSERT(!realm.is_in_transaction());
839
840     std::unique_lock<std::mutex> lock(m_notifier_mutex);
841     _impl::NotifierPackage notifiers(m_async_error, notifiers_for_realm(realm), this);
842     lock.unlock();
843
844     auto& sg = Realm::Internal::get_shared_group(realm);
845     transaction::begin(sg, realm.m_binding_context.get(), notifiers);
846 }
847
848 void RealmCoordinator::process_available_async(Realm& realm)
849 {
850     REALM_ASSERT(!realm.is_in_transaction());
851
852     std::unique_lock<std::mutex> lock(m_notifier_mutex);
853     auto notifiers = notifiers_for_realm(realm);
854     if (notifiers.empty())
855         return;
856
857     if (auto error = m_async_error) {
858         lock.unlock();
859         for (auto& notifier : notifiers)
860             notifier->deliver_error(m_async_error);
861         return;
862     }
863
864     bool in_read = realm.is_in_read_transaction();
865     auto& sg = Realm::Internal::get_shared_group(realm);
866     auto version = sg->get_version_of_current_transaction();
867     auto package = [&](auto& notifier) {
868         return !(notifier->has_run() && (!in_read || notifier->version() == version) && notifier->package_for_delivery());
869     };
870     notifiers.erase(std::remove_if(begin(notifiers), end(notifiers), package), end(notifiers));
871     lock.unlock();
872
873     // no before advance because the Realm is already at the given version,
874     // because we're either sending initial notifications or the write was
875     // done on this Realm instance
876
877     // Skip delivering if the Realm isn't in a read transaction
878     if (in_read) {
879         for (auto& notifier : notifiers)
880             notifier->deliver(*sg);
881     }
882
883     // but still call the change callbacks
884     for (auto& notifier : notifiers)
885         notifier->after_advance();
886 }
887
888 void RealmCoordinator::set_transaction_callback(std::function<void(VersionID, VersionID)> fn)
889 {
890     create_sync_session();
891     m_transaction_callback = std::move(fn);
892 }