1 /*************************************************************************
3 * Copyright 2016 Realm Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 **************************************************************************/
19 #ifndef REALM_REPLICATION_HPP
20 #define REALM_REPLICATION_HPP
28 #include <realm/util/assert.hpp>
29 #include <realm/util/safe_int_ops.hpp>
30 #include <realm/util/buffer.hpp>
31 #include <realm/util/string_buffer.hpp>
32 #include <realm/impl/cont_transact_hist.hpp>
33 #include <realm/impl/transact_log.hpp>
40 // FIXME: Be careful about the possibility of one modification function being called by another where both do
41 // transaction logging.
43 // FIXME: The current table/subtable selection scheme assumes that a TableRef of a subtable is not accessed after any
44 // modification of one of its ancestor tables.
46 // FIXME: Checking on same Table* requires that ~Table checks and nullifies on match. Another option would be to store
47 // m_selected_table as a TableRef. Yet another option would be to assign unique identifiers to each Table instance via
48 // Allocator. Yet another option would be to explicitly invalidate subtables recursively when parent is modified.
50 /// Replication is enabled by passing an instance of an implementation of this
51 /// class to the SharedGroup constructor.
///
/// The class derives publicly from _impl::TransactLogConvenientEncoder and
/// non-publicly (protected) from _impl::TransactLogStream. Most of its virtual
/// functions are pure, so a concrete subclass has to supply them.
52 class Replication : public _impl::TransactLogConvenientEncoder, protected _impl::TransactLogStream {
// Member type aliases and forward declarations of nested classes.
54 // Be sure to keep this type aligned with what is actually used in
56 using version_type = _impl::History::version_type;
// Stream type accepted by apply_changeset().
57 using InputStream = _impl::NoCopyInputStream;
// Nested classes, declared here only. Interrupted is defined further down in
// this header.
58 class TransactLogApplier;
59 class Interrupted; // Exception type (derives from std::exception)
60 class SimpleIndexTranslator;
/// Pure virtual accessor returning a std::string.
///
/// NOTE(review): judging by the name, this is the filesystem path of the Realm
/// file served by this object -- confirm against the implementations.
62 virtual std::string get_database_path() = 0;
64 /// Called during construction of the associated SharedGroup object.
///
/// Pure virtual: every implementation must provide it.
///
66 /// \param shared_group The associated SharedGroup object.
67 virtual void initialize(SharedGroup& shared_group) = 0;
69 /// Called by the associated SharedGroup object when a session is
70 /// initiated. A *session* is a sequence of temporally overlapping
71 /// accesses to a specific Realm file, where each access consists of a
72 /// SharedGroup object through which the Realm file is open. Session
73 /// initiation occurs during the first opening of the Realm file within such a session.
76 /// Session initiation fails if this function throws.
78 /// \param version The current version of the associated Realm. Out-of-Realm
79 /// history implementation can use this to trim off history entries that
80 /// were successfully added to the history, but for which the corresponding
81 /// subsequent commits on the Realm file failed.
83 /// This function is declared pure virtual, so every implementation must
/// provide it; an implementation with nothing to do at session initiation can
/// supply an empty body.
84 virtual void initiate_session(version_type version) = 0;
86 /// Called by the associated SharedGroup object when a session is
87 /// terminated. See initiate_session() for the definition of a
88 /// session. Session termination occurs upon closing the Realm through the
89 /// last SharedGroup object within the session.
91 /// This function is declared pure virtual and noexcept, so every
/// implementation must provide it and it must not throw; an implementation
/// with nothing to do at session termination can supply an empty body.
92 virtual void terminate_session() noexcept = 0;
94 /// \defgroup replication_transactions
97 /// From the point of view of the Replication class, a transaction is
98 /// initiated when, and only when the associated SharedGroup object calls
99 /// initiate_transact() and the call is successful. The associated
100 /// SharedGroup object must terminate every initiated transaction either by
101 /// calling finalize_commit() or by calling abort_transact(). It may only
102 /// call finalize_commit(), however, after calling prepare_commit(), and
103 /// only when prepare_commit() succeeds. If prepare_commit() fails (i.e.,
104 /// throws) abort_transact() must still be called.
106 /// The associated SharedGroup object is supposed to terminate a transaction
107 /// as soon as possible, and is required to terminate it before attempting
108 /// to initiate a new one.
110 /// initiate_transact() is called by the associated SharedGroup object as
111 /// part of the initiation of a transaction, and at a time where the caller
112 /// has acquired exclusive write access to the local Realm. The Replication
113 /// implementation is allowed to perform "precursor transactions" on the
114 /// local Realm at this time. During the initiated transaction, the
115 /// associated SharedGroup object must inform the Replication object of all
116 /// modifying operations by calling set_value() and friends.
118 /// FIXME: There is currently no way for implementations to perform
119 /// precursor transactions, since a regular transaction would cause a dead
120 /// lock when it tries to acquire a write lock. Consider giving access to
121 /// special non-locking precursor transactions via an extra argument to this function.
124 /// prepare_commit() serves as the first phase of a two-phase commit. This
125 /// function is called by the associated SharedGroup object immediately
126 /// before the commit operation on the local Realm. The associated
127 /// SharedGroup object will then, as the second phase, either call
128 /// finalize_commit() or abort_transact() depending on whether the commit
129 /// operation succeeded or not. The Replication implementation is allowed to
130 /// modify the Realm via the associated SharedGroup object at this time
131 /// (important to in-Realm histories).
133 /// initiate_transact() and prepare_commit() are allowed to block the
134 /// calling thread if, for example, they need to communicate over the
135 /// network. If a calling thread is blocked in one of these functions, it
136 /// must be possible to interrupt the blocking operation by having another
137 /// thread call interrupt(). The contract is as follows: When interrupt() is
138 /// called, then any execution of initiate_transact() or prepare_commit(),
139 /// initiated before the interruption, must complete without blocking, or
140 /// the execution must be aborted by throwing an Interrupted exception. If
141 /// initiate_transact() or prepare_commit() throws Interrupted, it counts as
142 /// a failed operation.
144 /// finalize_commit() is called by the associated SharedGroup object
145 /// immediately after a successful commit operation on the local Realm. This
146 /// happens at a time where modification of the Realm is no longer possible
147 /// via the associated SharedGroup object. In the case of in-Realm
148 /// histories, the changes are automatically finalized as part of the commit
149 /// operation performed by the caller prior to the invocation of
150 /// finalize_commit(), so in that case, finalize_commit() might not need to do anything.
153 /// abort_transact() is called by the associated SharedGroup object to
154 /// terminate a transaction without committing. That is, any transaction
155 /// that is not terminated by finalize_commit() is terminated by
156 /// abort_transact(). This could be due to an explicit rollback, or due to a
157 /// failed commit attempt.
159 /// Note that finalize_commit() and abort_transact() are not allowed to throw.
162 /// \param current_version The version of the snapshot that the current
163 /// transaction is based on.
165 /// \param history_updated Pass true only when the history has already been
166 /// updated to reflect the currently bound snapshot, such as when
167 /// _impl::History::update_early_from_top_ref() was called during the
168 /// transition from a read transaction to the current write transaction.
170 /// \return prepare_commit() returns the version of the new snapshot
171 /// produced by the transaction.
173 /// \throw Interrupted Thrown by initiate_transact() and prepare_commit() if
174 /// a blocking operation was interrupted.
// Non-virtual entry points of the transaction protocol described above. Each
// is defined inline further down in this header. initiate_transact(),
// prepare_commit() and finalize_commit() forward to the do_*() hook of the
// same name (initiate_transact() then also calls reset_selection_caches());
// abort_transact() is documented to call do_abort_transact().
176 void initiate_transact(version_type current_version, bool history_updated);
177 version_type prepare_commit(version_type current_version);
178 void finalize_commit() noexcept;
179 void abort_transact() noexcept;
184 /// Interrupt any blocking call to a function in this class. This function
185 /// may be called asynchronously from any thread, but it may not be called
186 /// from a system signal handler.
188 /// Some of the public function members of this class may block, but only
189 /// when it is explicitly stated in the documentation for those functions.
191 /// FIXME: Currently we do not state blocking behaviour for all the
192 /// functions that can block.
194 /// After any function has returned with an interruption indication, the
195 /// only functions that may safely be called are abort_transact() and the
196 /// destructor. If a client, after having received an interruption
197 /// indication, calls abort_transact() and then clear_interrupt(), it may
198 /// resume normal operation through this Replication object.
// Declared noexcept.
// NOTE(review): presumably forwards to the pure virtual do_interrupt() hook
// declared later in this class, the way clear_interrupt() forwards to
// do_clear_interrupt() -- the body of the inline definition is not visible
// here; confirm.
199 void interrupt() noexcept;
201 /// May be called by a client to reset this Replication object after an
202 /// interrupted transaction. It is not an error to call this function in a
203 /// situation where no interruption has occurred.
///
/// The inline definition further down forwards to do_clear_interrupt().
204 void clear_interrupt() noexcept;
206 /// Apply a changeset to the specified group.
///
/// Static member function: no Replication instance is needed to call it.
///
208 /// \param changeset The changes to be applied.
///
210 /// \param group The destination group to apply the changeset to.
///
212 /// \param logger If specified, and the library was compiled in debug mode,
213 /// then a line describing each individual operation is written to the
214 /// specified logger. Defaults to nullptr.
///
216 /// \throw BadTransactLog If the changeset could not be successfully parsed,
217 /// or ended prematurely.
218 static void apply_changeset(InputStream& changeset, Group& group, util::Logger* logger = nullptr);
220 /// CAUTION: These values are stored in Realm files, so value reassignment is not allowed.
223 /// No history available. No support for either continuous transactions
224 /// or inter-client synchronization.
227 /// Out-of-Realm history supporting continuous transactions.
229 /// NOTE: This history type is no longer in use. The value needs to stay
230 /// reserved in case someone tries to open an old Realm file.
233 /// In-Realm history supporting continuous transactions
234 /// (make_in_realm_history()).
237 /// In-Realm history supporting continuous transactions and client-side
238 /// synchronization protocol (realm::sync::ClientHistory).
241 /// In-Realm history supporting continuous transactions and server-side
242 /// synchronization protocol (realm::_impl::ServerHistory).
246 /// Returns the type of history maintained by this Replication
247 /// implementation, or \ref hist_None if no history is maintained by it.
249 /// This type is used to ensure that all session participants agree on
250 /// history type, and that the Realm file contains a compatible type of
251 /// history, at the beginning of a new session.
253 /// As a special case, if there is no top array (Group::m_top) at the
254 /// beginning of a new session, then the history type is still undecided and
255 /// all history types (as returned by get_history_type()) are therefore
256 /// allowed for the session initiator. Note that this case only arises if
257 /// there was no preceding session, or if no transaction was successfully
258 /// committed during any of the preceding sessions. As soon as a transaction
259 /// is successfully committed, the Realm contains at least a top array, and
260 /// from that point on, the history type is generally fixed, although still
261 /// subject to certain allowed changes (as mentioned below).
263 /// For the sake of backwards compatibility with older Realm files that do
264 /// not store any history type, the following rule shall apply:
266 /// - If the top array of a Realm file (Group::m_top) does not contain a
267 /// history type, because it is too short, it shall be understood as
268 /// implicitly storing the type \ref hist_None.
270 /// Note: In what follows, the meaning of *preceding session* is: The last
271 /// preceding session that modified the Realm by successfully committing a transaction.
274 /// It shall be allowed to switch to a \ref hist_InRealm history if the
275 /// stored history type is \ref hist_None. This can be done simply by adding
276 /// a new history to the Realm file. This is possible because histories of
277 /// this type are transient in nature, and need not survive from one session to the next.
280 /// On the other hand, as soon as a history of type \ref hist_InRealm is
281 /// added to a Realm file, that history type is binding for all subsequent
282 /// sessions. In theory, this constraint is not necessary, and a later
283 /// switch to \ref hist_None would be possible because of the transient
284 /// nature of it, however, because the \ref hist_InRealm history remains in
285 /// the Realm file, there are practical complications, and for that reason,
286 /// such switching shall not be supported.
288 /// The \ref hist_SyncClient history type can only be used if the stored
289 /// history type is also \ref hist_SyncClient, or when there is no top array
290 /// yet. Likewise, the \ref hist_SyncServer history type can only be used if
291 /// the stored history type is also \ref hist_SyncServer, or when there is
292 /// no top array yet. Additionally, when the stored history type is \ref
293 /// hist_SyncClient or \ref hist_SyncServer, then all subsequent sessions
294 /// must have the same type. These restrictions apply because such a history
295 /// needs to be maintained persistently across sessions.
297 /// In general, if there is no stored history type (no top array) at the
298 /// beginning of a new session, or if the stored type disagrees with what is
299 /// returned by get_history_type() (which is possible due to particular
300 /// allowed changes of history type), the actual history type (as returned
301 /// by get_history_type()) used during that session, must be stored in the
302 /// Realm during the first successfully committed transaction in that
303 /// session. But note that there is still no need to expand the top array to
304 /// store the history type \ref hist_None, due to the rule mentioned above.
306 /// This function must return \ref hist_None when, and only when
307 /// get_history() returns null.
///
/// Pure virtual and noexcept: every implementation must provide it, and it
/// must not throw.
308 virtual HistoryType get_history_type() const noexcept = 0;
310 /// Returns the schema version of the history maintained by this Replication
311 /// implementation, or 0 if no history is maintained by it. All session
312 /// participants must agree on history schema version.
///
314 /// Must return 0 if get_history_type() returns \ref hist_None.
///
/// Pure virtual and noexcept: every implementation must provide it, and it
/// must not throw.
315 virtual int get_history_schema_version() const noexcept = 0;
317 /// Implementation may assume that this function is only ever called with a
318 /// stored schema version that is less than what was returned by
319 /// get_history_schema_version().
///
/// \param stored_schema_version The stored schema version to be examined.
///
/// NOTE(review): presumably returns true when a history with that stored
/// schema version can be brought up to date by upgrade_history_schema() --
/// inferred from the name and from the contract documented for
/// upgrade_history_schema(); confirm against the implementations.
320 virtual bool is_upgradable_history_schema(int stored_schema_version) const noexcept = 0;
322 /// The implementation may assume that this function is only ever called if
323 /// is_upgradable_history_schema() was called with the same stored schema
324 /// version, and returned true. This implies that the specified stored
325 /// schema version is always strictly less than what was returned by
326 /// get_history_schema_version().
///
/// \param stored_schema_version The stored schema version to upgrade from.
///
/// Unlike the neighbouring functions, this one is not declared noexcept.
327 virtual void upgrade_history_schema(int stored_schema_version) = 0;
329 /// Returns an object that gives access to the history of changesets in a
330 /// way that allows for continuous transactions to work
331 /// (Group::advance_transact() in particular).
///
333 /// This function must return null when, and only when get_history_type()
334 /// returns \ref hist_None.
///
/// NOTE(review): ownership and lifetime of the returned object are not stated
/// here -- confirm against the implementations before storing the pointer.
335 virtual _impl::History* get_history() = 0;
337 /// Returns false by default, but must return true if, and only if this
338 /// history object represents a session participant that is a sync
339 /// agent. This is used to enforce the "maximum one sync agent per session"
/// rule.
///
/// Unlike most virtual functions of this class, this one is not pure virtual;
/// a default definition is provided inline further down in this header.
341 virtual bool is_sync_agent() const noexcept;
/// Virtual destructor, declared noexcept.
343 virtual ~Replication() noexcept
353 /// do_initiate_transact() is called by initiate_transact(), and likewise
354 /// for do_prepare_commit(), do_finalize_commit(), and do_abort_transact().
356 /// With respect to exception safety, the Replication implementation has two
357 /// options: It can prepare to accept the accumulated changeset in
358 /// do_prepare_commit() by allocating all required resources, and delay the
359 /// actual acceptance to do_finalize_commit(), which requires that the final
360 /// acceptance can be done without any risk of failure. Alternatively, the
361 /// Replication implementation can fully accept the changeset in
362 /// do_prepare_commit() (allowing for failure), and then discard that
363 /// changeset during the next invocation of do_initiate_transact() if
364 /// `current_version` indicates that the previous transaction failed.
// Pure virtual hooks behind initiate_transact(), prepare_commit(),
// finalize_commit() and abort_transact(). The first two may throw; the last
// two are noexcept.
// NOTE(review): the version parameter is named `current_version` in the
// declaration of prepare_commit() but `orig_version` here and in the inline
// definition of prepare_commit() -- consider unifying the name.
366 virtual void do_initiate_transact(version_type current_version, bool history_updated) = 0;
367 virtual version_type do_prepare_commit(version_type orig_version) = 0;
368 virtual void do_finalize_commit() noexcept = 0;
369 virtual void do_abort_transact() noexcept = 0;
// Pure virtual, noexcept hooks. do_clear_interrupt() is what the inline
// clear_interrupt() calls.
// NOTE(review): do_interrupt() presumably plays the same role for interrupt(),
// whose body is not visible here -- confirm.
374 virtual void do_interrupt() noexcept = 0;
376 virtual void do_clear_interrupt() noexcept = 0;
// Gives _impl::TransactReverser access to the non-public members of this class.
378 friend class _impl::TransactReverser;
/// Exception type that, according to the Replication documentation, is thrown
/// by initiate_transact() and prepare_commit() when a blocking operation was
/// interrupted. Derives from std::exception.
382 class Replication::Interrupted : public std::exception {
/// Returns a fixed, statically allocated message string.
384 const char* what() const noexcept override
386 return "Interrupted";
/// Partial Replication implementation that accumulates the transaction log in
/// a util::Buffer<char> member (m_transact_log_buffer).
///
/// It overrides the transaction hooks and the transact-log stream functions,
/// and leaves prepare_changeset() and finalize_changeset() to subclasses. The
/// history-related pure virtuals of Replication are not overridden here
/// either, so this class is still abstract.
391 class TrivialReplication : public Replication {
393 ~TrivialReplication() noexcept
// NOTE(review): Replication declares the same alias with `using`; consider
// matching that style here.
398 typedef Replication::version_type version_type;
/// Stores a copy of \a database_file in m_database_file.
400 TrivialReplication(const std::string& database_file);
/// Pure virtual hooks to be supplied by subclasses.
/// NOTE(review): presumably invoked from do_prepare_commit() and
/// do_finalize_commit() respectively, with \a data / \a size describing the
/// accumulated changeset -- those definitions are not visible here; confirm.
402 virtual version_type prepare_changeset(const char* data, size_t size, version_type orig_version) = 0;
403 virtual void finalize_changeset() noexcept = 0;
/// Static function with the same name as Replication::apply_changeset(), but
/// taking the changeset as a raw byte range and a SharedGroup as the target.
/// \a logger defaults to nullptr.
405 static void apply_changeset(const char* data, size_t size, SharedGroup& target, util::Logger* logger = nullptr);
/// Returns the current value of m_history_updated.
407 bool is_history_updated() const noexcept;
/// Returns a BinaryData describing the bytes between the start of
/// m_transact_log_buffer and the current write position.
409 BinaryData get_uncommitted_changes() const noexcept;
// Overrides of inherited virtual functions.
411 std::string get_database_path() override;
412 void initialize(SharedGroup&) override;
413 void do_initiate_transact(version_type, bool) override;
414 version_type do_prepare_commit(version_type orig_version) override;
415 void do_finalize_commit() noexcept override;
416 void do_abort_transact() noexcept override;
417 void do_interrupt() noexcept override;
418 void do_clear_interrupt() noexcept override;
419 void transact_log_reserve(size_t n, char** new_begin, char** new_end) override;
420 void transact_log_append(const char* data, size_t size, char** new_begin, char** new_end) override;
// Path given to the constructor; const, so fixed for the object's lifetime.
423 const std::string m_database_file;
// Storage for the transaction log being accumulated.
424 util::Buffer<char> m_transact_log_buffer;
// Value reported by is_history_updated().
// NOTE(review): not part of the constructor's visible initializer list;
// presumably assigned in do_initiate_transact() -- confirm.
425 bool m_history_updated;
// Non-virtual helper that transact_log_reserve() forwards to.
426 void internal_transact_log_reserve(size_t, char** new_begin, char** new_end);
// Number of bytes between the start of the buffer and the write position.
428 size_t transact_log_size();
// Default constructor: initializes the _impl::TransactLogConvenientEncoder
// base with a reference to this object, converted to its
// _impl::TransactLogStream base.
434 inline Replication::Replication()
435 : _impl::TransactLogConvenientEncoder(static_cast<_impl::TransactLogStream&>(*this))
// Starts a transaction: runs the implementation hook do_initiate_transact()
// first, then calls reset_selection_caches(). The hook is not noexcept, so an
// exception thrown by it reaches the caller before the caches are reset.
439 inline void Replication::initiate_transact(version_type current_version, bool history_updated)
441 do_initiate_transact(current_version, history_updated);
442 reset_selection_caches();
// Returns whatever the do_prepare_commit() hook returns for `orig_version`.
445 inline Replication::version_type Replication::prepare_commit(version_type orig_version)
447 return do_prepare_commit(orig_version);
// Forwards to the noexcept do_finalize_commit() hook.
450 inline void Replication::finalize_commit() noexcept
452 do_finalize_commit();
// NOTE(review): the body is not visible here. The class documentation states
// that do_abort_transact() is called by abort_transact().
455 inline void Replication::abort_transact() noexcept
// NOTE(review): the body is not visible here; presumably it forwards to
// do_interrupt(), mirroring clear_interrupt() -- confirm.
460 inline void Replication::interrupt() noexcept
// Forwards to the noexcept do_clear_interrupt() hook.
465 inline void Replication::clear_interrupt() noexcept
467 do_clear_interrupt();
// Default definition of the virtual is_sync_agent().
// NOTE(review): the body is not visible here; the documentation on the
// declaration says the default result is false.
470 inline bool Replication::is_sync_agent() const noexcept
// Copies `database_file` into the const member m_database_file.
475 inline TrivialReplication::TrivialReplication(const std::string& database_file)
476 : m_database_file(database_file)
// Returns the m_history_updated flag.
480 inline bool TrivialReplication::is_history_updated() const noexcept
482 return m_history_updated;
// Returns a BinaryData built from the start of m_transact_log_buffer and the
// number of bytes between that start and write_position().
// NOTE(review): assumes write_position() points into m_transact_log_buffer at
// or after its start -- confirm.
485 inline BinaryData TrivialReplication::get_uncommitted_changes() const noexcept
487 const char* data = m_transact_log_buffer.data();
488 size_t size = write_position() - data;
489 return BinaryData(data, size);
// Returns the distance from the start of m_transact_log_buffer to
// write_position(), converted to size_t.
// NOTE(review): assumes write_position() points into m_transact_log_buffer at
// or after its start -- confirm.
492 inline size_t TrivialReplication::transact_log_size()
494 return write_position() - m_transact_log_buffer.data();
// Stream override: delegates unchanged to the non-virtual
// internal_transact_log_reserve().
497 inline void TrivialReplication::transact_log_reserve(size_t n, char** new_begin, char** new_end)
499 internal_transact_log_reserve(n, new_begin, new_end);
// Makes room in m_transact_log_buffer beyond the current write position and
// reports the resulting writable range through the two out-parameters.
//
// n          Extra capacity requested, passed on to reserve_extra().
// new_begin  Receives the address just past the bytes already written.
// new_end    Receives the address at the end of the buffer.
//
// NOTE(review): presumably reserve_extra(size, n) preserves the first `size`
// bytes and guarantees at least `n` bytes after them -- confirm against
// util::Buffer.
502 inline void TrivialReplication::internal_transact_log_reserve(size_t n, char** new_begin, char** new_end)
504 char* data = m_transact_log_buffer.data();
// Bytes written so far, measured from the start of the buffer.
505 size_t size = write_position() - data;
506 m_transact_log_buffer.reserve_extra(size, n);
507 data = m_transact_log_buffer.data(); // Re-read: the storage may have changed
508 *new_begin = data + size;
509 *new_end = data + m_transact_log_buffer.size();
514 #endif // REALM_REPLICATION_HPP