1 ////////////////////////////////////////////////////////////////////////////
 
   3 // Copyright 2015 Realm Inc.
 
   5 // Licensed under the Apache License, Version 2.0 (the "License");
 
   6 // you may not use this file except in compliance with the License.
 
   7 // You may obtain a copy of the License at
 
   9 // http://www.apache.org/licenses/LICENSE-2.0
 
  11 // Unless required by applicable law or agreed to in writing, software
 
  12 // distributed under the License is distributed on an "AS IS" BASIS,
 
  13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14 // See the License for the specific language governing permissions and
 
  15 // limitations under the License.
 
  17 ////////////////////////////////////////////////////////////////////////////
 
  19 #ifndef REALM_COORDINATOR_HPP
 
  20 #define REALM_COORDINATOR_HPP
 
  22 #include "shared_realm.hpp"
 
  24 #include <realm/version_id.hpp>
 
  26 #include <condition_variable>
 
  37 class CollectionNotifier;
 
  38 class ExternalCommitHelper;
 
  39 class WeakRealmNotifier;
 
  41 // RealmCoordinator manages the weak cache of Realm instances and communication
 
  42 // between per-thread Realm instances for a given file
 
  43 class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {
 
  45     // Get the coordinator for the given path, creating it if neccesary
 
  46     static std::shared_ptr<RealmCoordinator> get_coordinator(StringData path);
 
  47     // Get the coordinator for the given config, creating it if neccesary
 
  48     static std::shared_ptr<RealmCoordinator> get_coordinator(const Realm::Config&);
 
  49     // Get the coordinator for the given path, or null if there is none
 
  50     static std::shared_ptr<RealmCoordinator> get_existing_coordinator(StringData path);
 
  52     // Get a thread-local shared Realm with the given configuration
 
  53     // If the Realm is already open on another thread, validates that the given
 
  54     // configuration is compatible with the existing one
 
  55     std::shared_ptr<Realm> get_realm(Realm::Config config);
 
  56     std::shared_ptr<Realm> get_realm();
 
  58     Realm::Config get_config() const { return m_config; }
 
  60     uint64_t get_schema_version() const noexcept { return m_schema_version; }
 
  61     const std::string& get_path() const noexcept { return m_config.path; }
 
  62     const std::vector<char>& get_encryption_key() const noexcept { return m_config.encryption_key; }
 
  63     bool is_in_memory() const noexcept { return m_config.in_memory; }
 
  65     // To avoid having to re-read and validate the file's schema every time a
 
  66     // new read transaction is begun, RealmCoordinator maintains a cache of the
 
  67     // most recently seen file schema and the range of transaction versions
 
  68     // which it applies to. Note that this schema may not be identical to that
 
  69     // of any Realm instances managed by this coordinator, as individual Realms
 
  70     // may only be using a subset of it.
 
  72     // Get the latest cached schema and the transaction version which it applies
 
  73     // to. Returns false if there is no cached schema.
 
  74     bool get_cached_schema(Schema& schema, uint64_t& schema_version, uint64_t& transaction) const noexcept;
 
  76     // Cache the state of the schema at the given transaction version
 
  77     void cache_schema(Schema const& new_schema, uint64_t new_schema_version,
 
  78                       uint64_t transaction_version);
 
  79     // If there is a schema cached for transaction version `previous`, report
 
  80     // that it is still valid at transaction version `next`
 
  81     void advance_schema_cache(uint64_t previous, uint64_t next);
 
  82     void clear_schema_cache_and_set_schema_version(uint64_t new_schema_version);
 
  85     // Asynchronously call notify() on every Realm instance for this coordinator's
 
  86     // path, including those in other processes
 
  87     void send_commit_notifications(Realm&);
 
  88     void wake_up_notifier_worker();
 
  90     // Clear the weak Realm cache for all paths
 
  91     // Should only be called in test code, as continuing to use the previously
 
  92     // cached instances will have odd results
 
  93     static void clear_cache();
 
  95     // Clears all caches on existing coordinators
 
  96     static void clear_all_caches();
 
  98     // Verify that there are no Realms open for any paths
 
  99     static void assert_no_open_realms() noexcept;
 
 101     // Explicit constructor/destructor needed for the unique_ptrs to forward-declared types
 
 105     // Called by Realm's destructor to ensure the cache is cleaned up promptly
 
 106     // Do not call directly
 
 107     void unregister_realm(Realm* realm);
 
 109     // Called by m_notifier when there's a new commit to send notifications for
 
 112     static void register_notifier(std::shared_ptr<CollectionNotifier> notifier);
 
 114     // Advance the Realm to the most recent transaction version which all async
 
 115     // work is complete for
 
 116     void advance_to_ready(Realm& realm);
 
 118     // Advance the Realm to the most recent transaction version, blocking if
 
 119     // async notifiers are not yet ready for that version
 
 120     // returns whether it actually changed the version
 
 121     bool advance_to_latest(Realm& realm);
 
 123     // Deliver any notifications which are ready for the Realm's version
 
 124     void process_available_async(Realm& realm);
 
 126     // Register a function which is called whenever sync makes a write to the Realm
 
 127     void set_transaction_callback(std::function<void(VersionID, VersionID)>);
 
 129     // Deliver notifications for the Realm, blocking if some aren't ready yet
 
 130     // The calling Realm must be in a write transaction
 
 131     void promote_to_write(Realm& realm);
 
 133     // Commit a Realm's current write transaction and send notifications to all
 
 134     // other Realm instances for that path, including in other processes
 
 135     void commit_write(Realm& realm);
 
 137     template<typename Pred>
 
 138     std::unique_lock<std::mutex> wait_for_notifiers(Pred&& wait_predicate);
 
 141     Realm::Config m_config;
 
 143     mutable std::mutex m_schema_cache_mutex;
 
 144     util::Optional<Schema> m_cached_schema;
 
 145     uint64_t m_schema_version = -1;
 
 146     uint64_t m_schema_transaction_version_min = 0;
 
 147     uint64_t m_schema_transaction_version_max = 0;
 
 149     std::mutex m_realm_mutex;
 
 150     std::vector<WeakRealmNotifier> m_weak_realm_notifiers;
 
 152     std::mutex m_notifier_mutex;
 
 153     std::condition_variable m_notifier_cv;
 
 154     std::vector<std::shared_ptr<_impl::CollectionNotifier>> m_new_notifiers;
 
 155     std::vector<std::shared_ptr<_impl::CollectionNotifier>> m_notifiers;
 
 156     VersionID m_notifier_skip_version = {0, 0};
 
 158     // SharedGroup used for actually running async notifiers
 
 159     // Will have a read transaction iff m_notifiers is non-empty
 
 160     std::unique_ptr<Replication> m_notifier_history;
 
 161     std::unique_ptr<SharedGroup> m_notifier_sg;
 
 163     // SharedGroup used to advance notifiers in m_new_notifiers to the main shared
 
 164     // group's transaction version
 
 165     // Will have a read transaction iff m_new_notifiers is non-empty
 
 166     std::unique_ptr<Replication> m_advancer_history;
 
 167     std::unique_ptr<SharedGroup> m_advancer_sg;
 
 168     std::exception_ptr m_async_error;
 
 170     std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
 
 171     std::function<void(VersionID, VersionID)> m_transaction_callback;
 
 173     std::shared_ptr<SyncSession> m_sync_session;
 
 175     // must be called with m_notifier_mutex locked
 
 176     void pin_version(VersionID version);
 
 178     void set_config(const Realm::Config&);
 
 179     void create_sync_session();
 
 181     void run_async_notifiers();
 
 182     void open_helper_shared_group();
 
 183     void advance_helper_shared_group_to_latest();
 
 184     void clean_up_dead_notifiers();
 
 186     std::vector<std::shared_ptr<_impl::CollectionNotifier>> notifiers_for_realm(Realm&);
 
 190 template<typename Pred>
 
 191 std::unique_lock<std::mutex> RealmCoordinator::wait_for_notifiers(Pred&& wait_predicate)
 
 193     std::unique_lock<std::mutex> lock(m_notifier_mutex);
 
 195     m_notifier_cv.wait(lock, [&] {
 
 196         if (wait_predicate())
 
 199             wake_up_notifier_worker();
 
 210 #endif /* REALM_COORDINATOR_HPP */