1 ////////////////////////////////////////////////////////////////////////////
3 // Copyright 2015 Realm Inc.
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
17 ////////////////////////////////////////////////////////////////////////////
19 #ifndef REALM_COORDINATOR_HPP
20 #define REALM_COORDINATOR_HPP
22 #include "shared_realm.hpp"
24 #include <realm/version_id.hpp>
26 #include <condition_variable>
37 class CollectionNotifier;
38 class ExternalCommitHelper;
39 class WeakRealmNotifier;
41 // RealmCoordinator manages the weak cache of Realm instances and communication
42 // between per-thread Realm instances for a given file.
// It derives from std::enable_shared_from_this, and the static lookup functions
// below hand coordinators out as std::shared_ptr.
43 class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {
45 // Get the coordinator for the given path, creating it if necessary
46 static std::shared_ptr<RealmCoordinator> get_coordinator(StringData path);
47 // Get the coordinator for the given config, creating it if necessary
48 static std::shared_ptr<RealmCoordinator> get_coordinator(const Realm::Config&);
49 // Get the coordinator for the given path, or null if there is none
50 static std::shared_ptr<RealmCoordinator> get_existing_coordinator(StringData path);
52 // Get a thread-local shared Realm with the given configuration.
53 // If the Realm is already open on another thread, validates that the given
54 // configuration is compatible with the existing one.
55 std::shared_ptr<Realm> get_realm(Realm::Config config);
// Overload taking no configuration argument.
// NOTE(review): presumably uses the coordinator's stored m_config -- confirm
// against the implementation.
56 std::shared_ptr<Realm> get_realm();
// Returns a copy of the stored configuration (m_config is returned by value).
58 Realm::Config get_config() const { return m_config; }
// Non-throwing read-only accessors for m_schema_version and for individual
// fields of m_config. The two reference-returning accessors expose members of
// m_config directly rather than copies.
60 uint64_t get_schema_version() const noexcept { return m_schema_version; }
61 const std::string& get_path() const noexcept { return m_config.path; }
62 const std::vector<char>& get_encryption_key() const noexcept { return m_config.encryption_key; }
63 bool is_in_memory() const noexcept { return m_config.in_memory; }
65 // To avoid having to re-read and validate the file's schema every time a
66 // new read transaction is begun, RealmCoordinator maintains a cache of the
67 // most recently seen file schema and the range of transaction versions
68 // which it applies to. Note that this schema may not be identical to that
69 // of any Realm instances managed by this coordinator, as individual Realms
70 // may only be using a subset of it.
72 // Get the latest cached schema and the transaction version which it applies
73 // to. Returns false if there is no cached schema.
// NOTE(review): the non-const reference parameters look like the outputs --
// confirm against the implementation.
74 bool get_cached_schema(Schema& schema, uint64_t& schema_version, uint64_t& transaction) const noexcept;
76 // Cache the state of the schema at the given transaction version
77 void cache_schema(Schema const& new_schema, uint64_t new_schema_version,
78 uint64_t transaction_version);
79 // If there is a schema cached for transaction version `previous`, report
80 // that it is still valid at transaction version `next`
81 void advance_schema_cache(uint64_t previous, uint64_t next);
// NOTE(review): judging by the name, drops the cached schema and records
// new_schema_version -- confirm against the implementation.
82 void clear_schema_cache_and_set_schema_version(uint64_t new_schema_version);
85 // Asynchronously call notify() on every Realm instance for this coordinator's
86 // path, including those in other processes
87 void send_commit_notifications(Realm&);
// Also invoked from wait_for_notifiers() while it waits on m_notifier_cv.
88 void wake_up_notifier_worker();
90 // Clear the weak Realm cache for all paths.
91 // Should only be called in test code, as continuing to use the previously
92 // cached instances will have odd results.
93 static void clear_cache();
95 // Clears all caches on existing coordinators
96 static void clear_all_caches();
98 // Verify that there are no Realms open for any paths
99 static void assert_no_open_realms() noexcept;
101 // Explicit constructor/destructor needed for the unique_ptrs to forward-declared types
105 // Called by Realm's destructor to ensure the cache is cleaned up promptly.
106 // Do not call directly.
107 void unregister_realm(Realm* realm);
109 // Called by m_notifier when there's a new commit to send notifications for
112 static void register_notifier(std::shared_ptr<CollectionNotifier> notifier);
114 // Advance the Realm to the most recent transaction version for which all
115 // async work is complete
116 void advance_to_ready(Realm& realm);
118 // Advance the Realm to the most recent transaction version, blocking if
119 // async notifiers are not yet ready for that version.
120 // Returns whether it actually changed the version.
121 bool advance_to_latest(Realm& realm);
123 // Deliver any notifications which are ready for the Realm's version
124 void process_available_async(Realm& realm);
126 // Register a function which is called whenever sync makes a write to the Realm
127 void set_transaction_callback(std::function<void(VersionID, VersionID)>);
129 // Deliver notifications for the Realm, blocking if some aren't ready yet.
130 // The calling Realm must be in a write transaction.
131 void promote_to_write(Realm& realm);
133 // Commit a Realm's current write transaction and send notifications to all
134 // other Realm instances for that path, including in other processes
135 void commit_write(Realm& realm);
// Member template defined out of line later in this header: it locks
// m_notifier_mutex and waits on m_notifier_cv, evaluating wait_predicate as
// part of the wait condition.
// NOTE(review): presumably the returned lock still holds m_notifier_mutex --
// confirm against the definition.
137 template<typename Pred>
138 std::unique_lock<std::mutex> wait_for_notifiers(Pred&& wait_predicate);
// Configuration backing get_config(), get_path(), get_encryption_key() and
// is_in_memory().
141 Realm::Config m_config;
// Schema cache state (see the schema cache description above). The mutex is
// `mutable`, which permits locking it from const member functions.
// NOTE(review): presumably it guards the four fields that follow, and the
// min/max fields hold the transaction version range the cached schema applies
// to -- confirm against the implementation.
143 mutable std::mutex m_schema_cache_mutex;
144 util::Optional<Schema> m_cached_schema;
// -1 converts to the largest uint64_t value.
// NOTE(review): presumably a "no schema version" sentinel -- confirm.
145 uint64_t m_schema_version = -1;
146 uint64_t m_schema_transaction_version_min = 0;
147 uint64_t m_schema_transaction_version_max = 0;
// NOTE(review): presumably m_realm_mutex guards m_weak_realm_notifiers -- confirm.
149 std::mutex m_realm_mutex;
150 std::vector<WeakRealmNotifier> m_weak_realm_notifiers;
// m_notifier_mutex and m_notifier_cv are used together by wait_for_notifiers();
// pin_version() additionally requires m_notifier_mutex to be held by the caller.
152 std::mutex m_notifier_mutex;
153 std::condition_variable m_notifier_cv;
154 std::vector<std::shared_ptr<_impl::CollectionNotifier>> m_new_notifiers;
155 std::vector<std::shared_ptr<_impl::CollectionNotifier>> m_notifiers;
156 VersionID m_notifier_skip_version = {0, 0};
158 // SharedGroup used for actually running async notifiers.
159 // Will have a read transaction iff m_notifiers is non-empty.
160 std::unique_ptr<Replication> m_notifier_history;
161 std::unique_ptr<SharedGroup> m_notifier_sg;
163 // SharedGroup used to advance notifiers in m_new_notifiers to the main shared
164 // group's transaction version.
165 // Will have a read transaction iff m_new_notifiers is non-empty.
166 std::unique_ptr<Replication> m_advancer_history;
167 std::unique_ptr<SharedGroup> m_advancer_sg;
// NOTE(review): presumably holds an exception captured during async notifier
// work so it can be reported later -- confirm against the implementation.
168 std::exception_ptr m_async_error;
// Owned helper of type ExternalCommitHelper, which is only forward-declared in
// this header.
170 std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
// Stored callback with the same signature that set_transaction_callback() accepts.
171 std::function<void(VersionID, VersionID)> m_transaction_callback;
173 std::shared_ptr<SyncSession> m_sync_session;
175 // Must be called with m_notifier_mutex locked
176 void pin_version(VersionID version);
178 void set_config(const Realm::Config&);
179 void create_sync_session();
181 void run_async_notifiers();
182 void open_helper_shared_group();
183 void advance_helper_shared_group_to_latest();
184 void clean_up_dead_notifiers();
186 std::vector<std::shared_ptr<_impl::CollectionNotifier>> notifiers_for_realm(Realm&);
// Out-of-line definition of the RealmCoordinator::wait_for_notifiers member
// template.
//
// Constructs a std::unique_lock on m_notifier_mutex, then waits on
// m_notifier_cv with a by-reference lambda as the wait condition. The lambda
// evaluates wait_predicate() and calls wake_up_notifier_worker().
//
// wait_predicate: a callable invoked with no arguments whose result is tested
// in an `if` condition. It is evaluated by the condition-variable wait, which
// invokes its condition with the lock held.
//
// NOTE(review): the value each lambda branch yields and the function's return
// statement are not visible here; presumably the lambda reports true once
// wait_predicate() holds and the function returns the still-held lock --
// confirm against the full definition.
190 template<typename Pred>
191 std::unique_lock<std::mutex> RealmCoordinator::wait_for_notifiers(Pred&& wait_predicate)
193 std::unique_lock<std::mutex> lock(m_notifier_mutex);
195 m_notifier_cv.wait(lock, [&] {
196 if (wait_predicate())
199 wake_up_notifier_worker();
210 #endif /* REALM_COORDINATOR_HPP */