1 /*************************************************************************
3 * Copyright 2016 Realm Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 **************************************************************************/
19 #ifndef REALM_GROUP_SHARED_HPP
20 #define REALM_GROUP_SHARED_HPP
24 #include <realm/util/features.h>
25 #include <realm/util/thread.hpp>
26 #include <realm/util/interprocess_condvar.hpp>
27 #include <realm/util/interprocess_mutex.hpp>
28 #include <realm/group.hpp>
29 #include <realm/group_shared_options.hpp>
30 #include <realm/handover_defs.hpp>
31 #include <realm/impl/transact_log.hpp>
32 #include <realm/metrics/metrics.hpp>
33 #include <realm/replication.hpp>
34 #include <realm/version_id.hpp>
39 class SharedGroupFriend;
40 class WriteLogCollector;
43 /// Thrown by SharedGroup::open() if the lock file is already open in another
44 /// process which can't share mutexes with this process
45 struct IncompatibleLockFile : std::runtime_error {
46 IncompatibleLockFile(const std::string& msg)
47 : std::runtime_error("Incompatible lock file. " + msg)
52 /// Thrown by SharedGroup::open() if the type of history
53 /// (Replication::HistoryType) in the opened Realm file is incompatible with the
54 /// mode in which the Realm file is opened. For example, if there is a mismatch
55 /// between the history type in the file, and the history type associated with
56 /// the replication plugin passed to SharedGroup::open().
58 /// This exception will also be thrown if the history schema version is lower
59 /// than required, and no migration is possible
60 /// (Replication::is_upgradable_history_schema()).
61 struct IncompatibleHistories : util::File::AccessError {
62 IncompatibleHistories(const std::string& msg, const std::string& path)
63 : util::File::AccessError("Incompatible histories. " + msg, path)
68 /// A SharedGroup facilitates transactions.
70 /// When multiple threads or processes need to access a database
71 /// concurrently, they must do so using transactions. By design,
72 /// Realm does not allow for multiple threads (or processes) to
73 /// share a single instance of SharedGroup. Instead, each concurrently
74 /// executing thread or process must use a separate instance of
77 /// Each instance of SharedGroup manages a single transaction at a
78 /// time. That transaction can be either a read transaction, or a
79 /// write transaction.
81 /// Utility classes ReadTransaction and WriteTransaction are provided
82 /// to make it safe and easy to work with transactions in a scoped
83 /// manner (by means of the RAII idiom). However, transactions can
84 /// also be explicitly started (begin_read(), begin_write()) and
85 /// stopped (end_read(), commit(), rollback()).
87 /// If a transaction is active when the SharedGroup is destroyed, that
88 /// transaction is implicitly terminated, either by a call to
89 /// end_read() or rollback().
91 /// Two processes that want to share a database file must reside on
95 /// Desired exception behavior (not yet fully implemented)
96 /// ------------------------------------------------------
98 /// - If any data access API function throws an unexpected exception during a
99 /// read transaction, the shared group accessor is left in state "error
102 /// - If any data access API function throws an unexpected exception during a
103 /// write transaction, the shared group accessor is left in state "error
106 /// - If SharedGroup::begin_write() or SharedGroup::begin_read() throws an
107 /// unexpected exception, the shared group accessor is left in state "no
108 /// transaction in progress".
110 /// - SharedGroup::end_read() and SharedGroup::rollback() do not throw.
112 /// - If SharedGroup::commit() throws an unexpected exception, the shared group
113 /// accessor is left in state "error during write" and the transaction was
116 /// - If SharedGroup::advance_read() or SharedGroup::promote_to_write() throws
117 /// an unexpected exception, the shared group accessor is left in state
118 /// "error during read".
120 /// - If SharedGroup::commit_and_continue_as_read() or
121 /// SharedGroup::rollback_and_continue_as_read() throws an unexpected
122 /// exception, the shared group accessor is left in state "error during
125 /// It has not yet been decided exactly what an "unexpected exception" is, but
126 /// `std::bad_alloc` is surely one example. On the other hand, an expected
127 /// exception is one that is mentioned in the function specific documentation,
128 /// and is used to abort an operation due to a special, but expected condition.
133 /// - A newly created shared group accessor is in state "no transaction in
136 /// - In state "error during read", almost all Realm API functions are
137 /// illegal on the connected group of accessors. The only valid operations
138 /// are destruction of the shared group, and SharedGroup::end_read(). If
139 /// SharedGroup::end_read() is called, the new state becomes "no transaction
142 /// - In state "error during write", almost all Realm API functions are
143 /// illegal on the connected group of accessors. The only valid operations
144 /// are destruction of the shared group, and SharedGroup::rollback(). If
145 /// SharedGroup::end_write() is called, the new state becomes "no transaction
149 /// \brief Same as calling the corresponding version of open() on a instance
150 /// constructed in the unattached state. Exception safety note: if the
151 /// `upgrade_callback` throws, then the file will be closed properly and the
152 /// upgrade will be aborted.
153 explicit SharedGroup(const std::string& file, bool no_create = false,
154 const SharedGroupOptions options = SharedGroupOptions());
156 /// \brief Same as calling the corresponding version of open() on a instance
157 /// constructed in the unattached state. Exception safety note: if the
158 /// `upgrade_callback` throws, then the file will be closed properly and
159 /// the upgrade will be aborted.
160 explicit SharedGroup(Replication& repl, const SharedGroupOptions options = SharedGroupOptions());
162 struct unattached_tag {
165 /// Create a SharedGroup instance in its unattached state. It may
166 /// then be attached to a database file later by calling
167 /// open(). You may test whether this instance is currently in its
168 /// attached state by calling is_attached(). Calling any other
169 /// function (except the destructor) while in the unattached state
170 /// has undefined behavior.
171 SharedGroup(unattached_tag) noexcept;
173 ~SharedGroup() noexcept;
175 // Disable copying to prevent accessor errors. If you really want another
176 // instance, open another SharedGroup object on the same file.
177 SharedGroup(const SharedGroup&) = delete;
178 SharedGroup& operator=(const SharedGroup&) = delete;
180 /// Attach this SharedGroup instance to the specified database file.
182 /// While at least one instance of SharedGroup exists for a specific
183 /// database file, a "lock" file will be present too. The lock file will be
184 /// placed in the same directory as the database file, and its name will be
185 /// derived by appending ".lock" to the name of the database file.
187 /// When multiple SharedGroup instances refer to the same file, they must
188 /// specify the same durability level, otherwise an exception will be
191 /// \param file Filesystem path to a Realm database file.
193 /// \param no_create If the database file does not already exist, it will be
194 /// created (unless this is set to true.) When multiple threads are involved,
195 /// it is safe to let the first thread, that gets to it, create the file.
197 /// \param options See SharedGroupOptions for details of each option.
198 /// Sensible defaults are provided if this parameter is left out.
200 /// Calling open() on a SharedGroup instance that is already in the attached
201 /// state has undefined behavior.
203 /// \throw util::File::AccessError If the file could not be opened. If the
204 /// reason corresponds to one of the exception types that are derived from
205 /// util::File::AccessError, the derived exception type is thrown. Note that
206 /// InvalidDatabase is among these derived exception types.
208 /// \throw FileFormatUpgradeRequired only if \a SharedGroupOptions::allow_upgrade
209 /// is `false` and an upgrade is required.
210 void open(const std::string& file, bool no_create = false,
211 const SharedGroupOptions options = SharedGroupOptions());
213 /// Open this group in replication mode. The specified Replication instance
214 /// must remain in existence for as long as the SharedGroup.
215 void open(Replication&, const SharedGroupOptions options = SharedGroupOptions());
217 /// Close any open database, returning to the unattached state.
218 void close() noexcept;
220 /// A SharedGroup may be created in the unattached state, and then
221 /// later attached to a file with a call to open(). Calling any
222 /// function other than open(), is_attached(), and ~SharedGroup()
223 /// on an unattached instance results in undefined behavior.
224 bool is_attached() const noexcept;
226 /// Reserve disk space now to avoid allocation errors at a later
227 /// point in time, and to minimize on-disk fragmentation. In some
228 /// cases, less fragmentation translates into improved
231 /// When supported by the system, a call to this function will
232 /// make the database file at least as big as the specified size,
233 /// and cause space on the target device to be allocated (note
234 /// that on many systems on-disk allocation is done lazily by
235 /// default). If the file is already bigger than the specified
236 /// size, the size will be unchanged, and on-disk allocation will
237 /// occur only for the initial section that corresponds to the
238 /// specified size. On systems that do not support preallocation,
239 /// this function has no effect. To know whether preallocation is
240 /// supported by Realm on your platform, call
241 /// util::File::is_prealloc_supported().
243 /// It is an error to call this function on an unattached shared
244 /// group. Doing so will result in undefined behavior.
245 void reserve(size_t size_in_bytes);
247 /// Querying for changes:
250 /// "changed" means that one or more commits has been made to the database
251 /// since the SharedGroup (on which wait_for_change() is called) last
252 /// started, committed, promoted or advanced a transaction. If the
253 /// SharedGroup has not yet begun a transaction, "changed" is undefined.
255 /// No distinction is made between changes done by another process
256 /// and changes done by another thread in the same process as the caller.
258 /// Has db been changed ?
261 /// The calling thread goes to sleep until the database is changed, or
262 /// until wait_for_change_release() is called. After a call to
263 /// wait_for_change_release() further calls to wait_for_change() will return
264 /// immediately. To restore the ability to wait for a change, a call to
265 /// enable_wait_for_change() is required. Return true if the database has
266 /// changed, false if it might have.
267 bool wait_for_change();
269 /// release any thread waiting in wait_for_change() on *this* SharedGroup.
270 void wait_for_change_release();
272 /// re-enable waiting for change
273 void enable_wait_for_change();
276 using version_type = _impl::History::version_type;
277 using VersionID = realm::VersionID;
279 /// Thrown by begin_read() if the specified version does not correspond to a
280 /// bound (or tethered) snapshot.
283 /// \defgroup group_shared_transactions
286 /// begin_read() initiates a new read transaction. A read transaction is
287 /// bound to, and provides access to a particular snapshot of the underlying
288 /// Realm (in general the latest snapshot, but see \a version). It cannot be
289 /// used to modify the Realm, and in that sense, a read transaction is not a
290 /// real transaction.
292 /// begin_write() initiates a new write transaction. A write transaction
293 /// allows the application to both read and modify the underlying Realm
294 /// file. At most one write transaction can be in progress at any given time
295 /// for a particular underlying Realm file. If another write transaction is
296 /// already in progress, begin_write() will block the caller until the other
297 /// write transaction terminates. No guarantees are made about the order in
298 /// which multiple concurrent requests will be served.
300 /// It is an error to call begin_read() or begin_write() on a SharedGroup
301 /// object with an active read or write transaction.
303 /// If begin_read() or begin_write() throws, no transaction is initiated,
304 /// and the application may try to initiate a new read or write transaction
307 /// end_read() terminates the active read transaction. If no read
308 /// transaction is active, end_read() does nothing. It is an error to call
309 /// this function on a SharedGroup object with an active write
310 /// transaction. end_read() does not throw.
312 /// commit() commits all changes performed in the context of the active
313 /// write transaction, and thereby terminates that transaction. This
314 /// produces a new snapshot in the underlying Realm. commit() returns the
315 /// version associated with the new snapshot. It is an error to call
316 /// commit() when there is no active write transaction. If commit() throws,
317 /// no changes will have been committed, and the transaction will still be
318 /// active, but in a bad state. In that case, the application must either
319 /// call rollback() to terminate the bad transaction (in which case a new
320 /// transaction can be initiated), call close() which also terminates the
321 /// bad transaction, or destroy the SharedGroup object entirely. When the
322 /// transaction is in a bad state, the application is not allowed to call
323 /// any method on the Group accessor or on any of its subordinate accessors
324 /// (Table, Row, Descriptor). Note that the transaction is also left in a
325 /// bad state when a modifying operation on any subordinate accessor throws.
327 /// rollback() terminates the active write transaction and discards any
328 /// changes performed in the context of it. If no write transaction is
329 /// active, rollback() does nothing. It is an error to call this function in
330 /// a SharedGroup object with an active read transaction. rollback() does
333 /// the Group accessor and all subordinate accessors (Table, Row,
334 /// Descriptor) that are obtained in the context of a particular read or
335 /// write transaction will become detached upon termination of that
336 /// transaction, which means that they can no longer be used to access the
337 /// underlying objects.
339 /// Subordinate accessors that were detached at the end of the previous
340 /// read or write transaction will not be automatically reattached when a
341 /// new transaction is initiated. The application must reobtain new
342 /// accessors during a new transaction to regain access to the underlying
345 /// \param version If specified, this must be the version associated with a
346 /// *bound* snapshot. A snapshot is said to be bound (or tethered) if there
347 /// is at least one active read or write transaction bound to it. A read
348 /// transaction is bound to the snapshot that it provides access to. A write
349 /// transaction is bound to the latest snapshot available at the time of
350 /// initiation of the write transaction. If the specified version is not
351 /// associated with a bound snapshot, this function throws BadVersion.
353 /// \throw BadVersion Thrown by begin_read() if the specified version does
354 /// not correspond to a bound (or tethered) snapshot.
356 const Group& begin_read(VersionID version = VersionID());
357 void end_read() noexcept;
358 Group& begin_write();
359 // Return true (and take the write lock) if there is no other write
360 // in progress. In case of contention return false immediately.
361 // If the write lock is obtained, also provide the Group associated
362 // with the SharedGroup for further operations.
363 bool try_begin_write(Group*& group);
364 version_type commit();
365 void rollback() noexcept;
366 // report statistics of last commit done on THIS shared group.
367 // The free space reported is what can be expected to be freed
368 // by compact(). This may not correspond to the space which is free
369 // at the point where get_stats() is called, since that will include
370 // memory required to hold older versions of data, which still
371 // needs to be available.
372 void get_stats(size_t& free_space, size_t& used_space);
381 /// Get the current transaction type
382 TransactStage get_transact_stage() const noexcept;
384 /// Get a version id which may be used to request a different SharedGroup
385 /// to start transaction at a specific version.
386 VersionID get_version_of_current_transaction();
388 /// Report the number of distinct versions currently stored in the database.
389 /// Note: the database only cleans up versions as part of commit, so ending
390 /// a read transaction will not immediately release any versions.
391 uint_fast64_t get_number_of_versions();
393 /// Compact the database file.
394 /// - The method will throw if called inside a transaction.
395 /// - The method will throw if called in unattached state.
396 /// - The method will return false if other SharedGroups are accessing the
397 /// database in which case compaction is not done. This is not
398 /// necessarily an error.
399 /// It will return true following successful compaction.
400 /// While compaction is in progress, attempts by other
401 /// threads or processes to open the database will wait.
402 /// Be warned that resource requirements for compaction is proportional to
403 /// the amount of live data in the database.
404 /// Compaction works by writing the database contents to a temporary
405 /// database file and then replacing the database with the temporary one.
406 /// The name of the temporary file is formed by appending
407 /// ".tmp_compaction_space" to the name of the database
409 /// FIXME: This function is not yet implemented in an exception-safe manner,
410 /// therefore, if it throws, the application should not attempt to
411 /// continue. If may not even be safe to destroy the SharedGroup object.
413 /// WARNING / FIXME: compact() should NOT be exposed publicly on Windows
414 /// because it's not crash safe! It may corrupt your database if something fails
421 /// To handover a table view, query, linkview or row accessor of type T, you
422 /// must wrap it into a Handover<T> for the transfer. Wrapping and
423 /// unwrapping of a handover object is done by the methods
424 /// 'export_for_handover()' and 'import_from_handover()' declared below.
425 /// 'export_for_handover()' returns a Handover object, and
426 /// 'import_for_handover()' consumes that object, producing a new accessor
427 /// which is ready for use in the context of the importing SharedGroup.
429 /// The Handover always creates a new accessor object at the importing side.
430 /// For TableViews, there are 3 forms of handover.
432 /// - with payload move: the payload is handed over and ends up as a payload
433 /// held by the accessor at the importing side. The accessor on the
434 /// exporting side will rerun its query and generate a new payload, if
435 /// TableView::sync_if_needed() is called. If the original payload was in
436 /// sync at the exporting side, it will also be in sync at the importing
437 /// side. This is indicated to handover_export() by the argument
438 /// MutableSourcePayload::Move
440 /// - with payload copy: a copy of the payload is handed over, so both the
441 /// accessors on the exporting side *and* the accessors created at the
442 /// importing side has their own payload. This is indicated to
443 /// handover_export() by the argument ConstSourcePayload::Copy
445 /// - without payload: the payload stays with the accessor on the exporting
446 /// side. On the importing side, the new accessor is created without
447 /// payload. A call to TableView::sync_if_needed() will trigger generation
448 /// of a new payload. This form of handover is indicated to
449 /// handover_export() by the argument ConstSourcePayload::Stay.
451 /// For all other (non-TableView) accessors, handover is done with payload
452 /// copy, since the payload is trivial.
454 /// Handover *without* payload is useful when you want to ship a tableview
455 /// with its query for execution in a background thread. Handover with
456 /// *payload move* is useful when you want to transfer the result back.
458 /// Handover *without* payload or with payload copy is guaranteed *not* to
459 /// change the accessors on the exporting side.
461 /// Handover is *not* thread safe and should be carried out
462 /// by the thread that "owns" the involved accessors.
464 /// Handover is transitive:
465 /// If the object being handed over depends on other views
466 /// (table- or link- ), those objects will be handed over as well. The mode
467 /// of handover (payload copy, payload move, without payload) is applied
468 /// recursively. Note: If you are handing over a tableview dependent upon
469 /// another tableview and using MutableSourcePayload::Move,
470 /// you are on thin ice!
472 /// On the importing side, the top-level accessor being created during
473 /// import takes ownership of all other accessors (if any) being created as
474 /// part of the import.
476 /// Type used to support handover of accessors between shared groups.
477 template <typename T>
480 /// thread-safe/const export (mode is Stay or Copy)
481 /// during export, the following operations on the shared group is locked:
482 /// - advance_read(), promote_to_write(), commit_and_continue_as_read(),
483 /// rollback_and_continue_as_read(), close()
484 template <typename T>
485 std::unique_ptr<Handover<T>> export_for_handover(const T& accessor, ConstSourcePayload mode);
487 // specialization for handover of Rows
488 template <typename T>
489 std::unique_ptr<Handover<BasicRow<T>>> export_for_handover(const BasicRow<T>& accessor);
491 // destructive export (mode is Move)
492 template <typename T>
493 std::unique_ptr<Handover<T>> export_for_handover(T& accessor, MutableSourcePayload mode);
495 /// Import an accessor wrapped in a handover object. The import will fail
496 /// if the importing SharedGroup is viewing a version of the database that
497 /// is different from the exporting SharedGroup. The call to
498 /// import_from_handover is not thread-safe.
499 template <typename T>
500 std::unique_ptr<T> import_from_handover(std::unique_ptr<Handover<T>> handover);
502 // We need two cases for handling of LinkViews, because they are ref counted.
503 std::unique_ptr<Handover<LinkView>> export_linkview_for_handover(const LinkViewRef& accessor);
504 LinkViewRef import_linkview_from_handover(std::unique_ptr<Handover<LinkView>> handover);
506 // likewise for Tables.
507 std::unique_ptr<Handover<Table>> export_table_for_handover(const TableRef& accessor);
508 TableRef import_table_from_handover(std::unique_ptr<Handover<Table>> handover);
510 /// When doing handover to background tasks that may be run later, we
511 /// may want to momentarily pin the current version until the other thread
512 /// has retrieved it.
514 /// Pinning can be done in both read- and write-transactions, but with different
515 /// semantics. When pinning during a read-transaction, the version pinned is the
516 /// one accessible during the read-transaction. When pinning during a write-transaction,
517 /// the version pinned will be the last version that was succesfully committed to the
518 /// realm file at the point in time, when the write-transaction was started.
520 /// The release is not thread-safe, so it has to be done on the SharedGroup
521 /// associated with the thread calling unpin_version(), and the SharedGroup
522 /// must be attached to the realm file at the point of unpinning.
524 // Pin version for handover (not thread safe)
525 VersionID pin_version();
527 // Release pinned version (not thread safe)
528 void unpin_version(VersionID version);
531 std::shared_ptr<metrics::Metrics> get_metrics();
532 #endif // REALM_METRICS
534 // Try to grab a exclusive lock of the given realm path's lock file. If the lock
535 // can be acquired, the callback will be executed with the lock and then return true.
536 // Otherwise false will be returned directly.
537 // The lock taken precludes races with other threads or processes accessing the
538 // files through a SharedGroup.
539 // It is safe to delete/replace realm files inside the callback.
540 // WARNING: It is not safe to delete the lock file in the callback.
541 using CallbackWithLock = std::function<void(const std::string& realm_path)>;
542 static bool call_with_lock(const std::string& realm_path, CallbackWithLock callback);
544 // Return a list of files/directories core may use of the given realm file path.
545 // The first element of the pair in the returned list is the path string, the
546 // second one is to indicate the path is a directory or not.
547 // The temporary files are not returned by this function.
548 // It is safe to delete those returned files/directories in the call_with_lock's callback.
549 static std::vector<std::pair<std::string, bool>> get_core_files(const std::string& realm_path);
554 struct ReadLockInfo {
555 uint_fast64_t m_version = std::numeric_limits<version_type>::max();
556 uint_fast32_t m_reader_idx = 0;
557 ref_type m_top_ref = 0;
558 size_t m_file_size = 0;
560 class ReadLockUnlockGuard;
563 size_t m_free_space = 0;
564 size_t m_used_space = 0;
566 ReadLockInfo m_read_lock;
567 uint_fast32_t m_local_max_entry;
569 util::File::Map<SharedInfo> m_file_map; // Never remapped
570 util::File::Map<SharedInfo> m_reader_map;
571 bool m_wait_for_change_enabled;
572 std::string m_lockfile_path;
573 std::string m_lockfile_prefix;
574 std::string m_db_path;
575 std::string m_coordination_dir;
577 TransactStage m_transact_stage;
578 util::InterprocessMutex m_writemutex;
579 #ifdef REALM_ASYNC_DAEMON
580 util::InterprocessMutex m_balancemutex;
582 util::InterprocessMutex m_controlmutex;
583 #ifdef REALM_ASYNC_DAEMON
584 util::InterprocessCondVar m_room_to_write;
585 util::InterprocessCondVar m_work_to_do;
586 util::InterprocessCondVar m_daemon_becomes_ready;
588 util::InterprocessCondVar m_new_commit_available;
589 util::InterprocessCondVar m_pick_next_writer;
590 std::function<void(int, int)> m_upgrade_callback;
593 std::shared_ptr<metrics::Metrics> m_metrics;
594 #endif // REALM_METRICS
596 void do_open(const std::string& file, bool no_create, bool is_backend, const SharedGroupOptions options);
598 // Ring buffer management
599 bool ringbuf_is_empty() const noexcept;
600 size_t ringbuf_size() const noexcept;
601 size_t ringbuf_capacity() const noexcept;
602 bool ringbuf_is_first(size_t ndx) const noexcept;
603 void ringbuf_remove_first() noexcept;
604 size_t ringbuf_find(uint64_t version) const noexcept;
605 ReadCount& ringbuf_get(size_t ndx) noexcept;
606 ReadCount& ringbuf_get_first() noexcept;
607 ReadCount& ringbuf_get_last() noexcept;
608 void ringbuf_put(const ReadCount& v);
609 void ringbuf_expand();
611 /// Grab a read lock on the snapshot associated with the specified
612 /// version. If `version_id == VersionID()`, a read lock will be grabbed on
613 /// the latest available snapshot. Fails if the snapshot is no longer
616 /// As a side effect update memory mapping to ensure that the ringbuffer
617 /// entries referenced in the readlock info is accessible.
619 /// FIXME: It needs to be made more clear exactly under which conditions
620 /// this function fails. Also, why is it useful to promise anything about
621 /// detection of bad versions? Can we really promise enough to make such a
622 /// promise useful to the caller?
623 void grab_read_lock(ReadLockInfo&, VersionID);
625 // Release a specific read lock. The read lock MUST have been obtained by a
626 // call to grab_read_lock().
627 void release_read_lock(ReadLockInfo&) noexcept;
629 void do_begin_read(VersionID, bool writable);
630 void do_end_read() noexcept;
631 /// return true if write transaction can commence, false otherwise.
632 bool do_try_begin_write();
633 void do_begin_write();
634 version_type do_commit();
635 void do_end_write() noexcept;
636 void set_transact_stage(TransactStage stage) noexcept;
638 /// Returns the version of the latest snapshot.
639 version_type get_version_of_latest_snapshot();
641 /// Returns the version of the snapshot bound in the current read or write
642 /// transaction. It is an error to call this function when no transaction is
644 version_type get_version_of_bound_snapshot() const noexcept;
646 // make sure the given index is within the currently mapped area.
647 // if not, expand the mapped area. Returns true if the area is expanded.
648 bool grow_reader_mapping(uint_fast32_t index);
650 // Must be called only by someone that has a lock on the write
652 void low_level_commit(uint_fast64_t new_version);
654 void do_async_commits();
656 /// Upgrade file format and/or history schema
657 void upgrade_file_format(bool allow_file_format_upgrade, int target_file_format_version,
658 int current_hist_schema_version, int target_hist_schema_version);
661 /// See LangBindHelper.
663 void advance_read(O* observer, VersionID);
665 void promote_to_write(O* observer);
666 version_type commit_and_continue_as_read();
668 void rollback_and_continue_as_read(O* observer);
671 /// Returns true if, and only if _impl::History::update_early_from_top_ref()
672 /// was called during the execution of this function.
674 bool do_advance_read(O* observer, VersionID, _impl::History&);
676 /// If there is an associated \ref Replication object, then this function
677 /// returns `repl->get_history()` where `repl` is that Replication object,
678 /// otherwise this function returns null.
679 _impl::History* get_history();
681 int get_file_format_version() const noexcept;
683 /// finish up the process of starting a write transaction. Internal use only.
684 void finish_begin_write();
686 void close_internal(std::unique_lock<InterprocessMutex>) noexcept;
687 friend class _impl::SharedGroupFriend;
691 inline void SharedGroup::get_stats(size_t& free_space, size_t& used_space) {
692 free_space = m_free_space;
693 used_space = m_used_space;
697 class ReadTransaction {
699 ReadTransaction(SharedGroup& sg)
702 m_shared_group.begin_read(); // Throws
705 ~ReadTransaction() noexcept
707 m_shared_group.end_read();
710 bool has_table(StringData name) const noexcept
712 return get_group().has_table(name);
715 ConstTableRef get_table(size_t table_ndx) const
717 return get_group().get_table(table_ndx); // Throws
720 ConstTableRef get_table(StringData name) const
722 return get_group().get_table(name); // Throws
725 const Group& get_group() const noexcept;
727 /// Get the version of the snapshot to which this read transaction is bound.
728 SharedGroup::version_type get_version() const noexcept;
731 SharedGroup& m_shared_group;
735 class WriteTransaction {
737 WriteTransaction(SharedGroup& sg)
738 : m_shared_group(&sg)
740 m_shared_group->begin_write(); // Throws
743 ~WriteTransaction() noexcept
746 m_shared_group->rollback();
749 bool has_table(StringData name) const noexcept
751 return get_group().has_table(name);
754 TableRef get_table(size_t table_ndx) const
756 return get_group().get_table(table_ndx); // Throws
759 TableRef get_table(StringData name) const
761 return get_group().get_table(name); // Throws
764 TableRef add_table(StringData name, bool require_unique_name = true) const
766 return get_group().add_table(name, require_unique_name); // Throws
769 TableRef get_or_add_table(StringData name, bool* was_added = nullptr) const
771 return get_group().get_or_add_table(name, was_added); // Throws
774 Group& get_group() const noexcept;
776 /// Get the version of the snapshot on which this write transaction is
778 SharedGroup::version_type get_version() const noexcept;
780 SharedGroup::version_type commit()
782 REALM_ASSERT(m_shared_group);
783 SharedGroup::version_type new_version = m_shared_group->commit();
784 m_shared_group = nullptr;
788 void rollback() noexcept
790 REALM_ASSERT(m_shared_group);
791 m_shared_group->rollback();
792 m_shared_group = nullptr;
796 SharedGroup* m_shared_group;
802 struct SharedGroup::BadVersion : std::exception {
805 inline SharedGroup::SharedGroup(const std::string& file, bool no_create, const SharedGroupOptions options)
806 : m_group(Group::shared_tag())
807 , m_upgrade_callback(std::move(options.upgrade_callback))
809 open(file, no_create, options); // Throws
812 inline SharedGroup::SharedGroup(unattached_tag) noexcept
813 : m_group(Group::shared_tag())
817 inline SharedGroup::SharedGroup(Replication& repl, const SharedGroupOptions options)
818 : m_group(Group::shared_tag())
819 , m_upgrade_callback(std::move(options.upgrade_callback))
821 open(repl, options); // Throws
824 inline void SharedGroup::open(const std::string& path, bool no_create_file, const SharedGroupOptions options)
826 // Exception safety: Since open() is called from constructors, if it throws,
827 // it must leave the file closed.
829 bool is_backend = false;
830 do_open(path, no_create_file, is_backend, options); // Throws
833 inline void SharedGroup::open(Replication& repl, const SharedGroupOptions options)
835 // Exception safety: Since open() is called from constructors, if it throws,
836 // it must leave the file closed.
838 REALM_ASSERT(!is_attached());
840 repl.initialize(*this); // Throws
842 typedef _impl::GroupFriend gf;
843 gf::set_replication(m_group, &repl);
845 std::string file = repl.get_database_path();
846 bool no_create = false;
847 bool is_backend = false;
848 do_open(file, no_create, is_backend, options); // Throws
851 inline bool SharedGroup::is_attached() const noexcept
853 return m_file_map.is_attached();
856 inline SharedGroup::TransactStage SharedGroup::get_transact_stage() const noexcept
858 return m_transact_stage;
861 inline SharedGroup::version_type SharedGroup::get_version_of_bound_snapshot() const noexcept
863 return m_read_lock.m_version;
866 class SharedGroup::ReadLockUnlockGuard {
868 ReadLockUnlockGuard(SharedGroup& shared_group, ReadLockInfo& read_lock) noexcept
869 : m_shared_group(shared_group)
870 , m_read_lock(&read_lock)
873 ~ReadLockUnlockGuard() noexcept
876 m_shared_group.release_read_lock(*m_read_lock);
878 void release() noexcept
884 SharedGroup& m_shared_group;
885 ReadLockInfo* m_read_lock;
889 template <typename T>
890 struct SharedGroup::Handover {
891 std::unique_ptr<typename T::HandoverPatch> patch;
892 std::unique_ptr<T> clone;
896 template <typename T>
897 std::unique_ptr<SharedGroup::Handover<T>> SharedGroup::export_for_handover(const T& accessor, ConstSourcePayload mode)
899 if (m_transact_stage != transact_Reading)
900 throw LogicError(LogicError::wrong_transact_state);
901 std::unique_ptr<Handover<T>> result(new Handover<T>());
902 // Implementation note:
903 // often, the return value from clone will be T*, BUT it may be ptr to some
904 // base of T instead, so we must cast it to T*. This is always safe, because
905 // no matter the type, clone() will clone the actual accessor instance, and
906 // hence return an instance of the same type.
907 result->clone.reset(dynamic_cast<T*>(accessor.clone_for_handover(result->patch, mode).release()));
908 result->version = get_version_of_current_transaction();
913 template <typename T>
914 std::unique_ptr<SharedGroup::Handover<BasicRow<T>>> SharedGroup::export_for_handover(const BasicRow<T>& accessor)
916 if (m_transact_stage != transact_Reading)
917 throw LogicError(LogicError::wrong_transact_state);
918 std::unique_ptr<Handover<BasicRow<T>>> result(new Handover<BasicRow<T>>());
919 // See implementation note above.
920 result->clone.reset(dynamic_cast<BasicRow<T>*>(accessor.clone_for_handover(result->patch).release()));
921 result->version = get_version_of_current_transaction();
926 template <typename T>
927 std::unique_ptr<SharedGroup::Handover<T>> SharedGroup::export_for_handover(T& accessor, MutableSourcePayload mode)
929 if (m_transact_stage != transact_Reading)
930 throw LogicError(LogicError::wrong_transact_state);
931 std::unique_ptr<Handover<T>> result(new Handover<T>());
932 // see implementation note above.
933 result->clone.reset(dynamic_cast<T*>(accessor.clone_for_handover(result->patch, mode).release()));
934 result->version = get_version_of_current_transaction();
939 template <typename T>
940 std::unique_ptr<T> SharedGroup::import_from_handover(std::unique_ptr<SharedGroup::Handover<T>> handover)
942 if (handover->version != get_version_of_current_transaction()) {
945 std::unique_ptr<T> result = move(handover->clone);
946 result->apply_and_consume_patch(handover->patch, m_group);
951 inline void SharedGroup::advance_read(O* observer, VersionID version_id)
953 if (m_transact_stage != transact_Reading)
954 throw LogicError(LogicError::wrong_transact_state);
956 // It is an error if the new version precedes the currently bound one.
957 if (version_id.version < m_read_lock.m_version)
958 throw LogicError(LogicError::bad_version);
960 _impl::History* hist = get_history(); // Throws
962 throw LogicError(LogicError::no_history);
964 do_advance_read(observer, version_id, *hist); // Throws
968 inline void SharedGroup::promote_to_write(O* observer)
970 if (m_transact_stage != transact_Reading)
971 throw LogicError(LogicError::wrong_transact_state);
973 _impl::History* hist = get_history(); // Throws
975 throw LogicError(LogicError::no_history);
977 do_begin_write(); // Throws
979 VersionID version = VersionID(); // Latest
980 bool history_updated = do_advance_read(observer, version, *hist); // Throws
982 Replication* repl = m_group.get_replication();
983 REALM_ASSERT(repl); // Presence of `repl` follows from the presence of `hist`
984 version_type current_version = m_read_lock.m_version;
985 repl->initiate_transact(current_version, history_updated); // Throws
987 // If the group has no top array (top_ref == 0), create a new node
988 // structure for an empty group now, to be ready for modifications. See
989 // also Group::attach_shared().
990 using gf = _impl::GroupFriend;
991 gf::create_empty_group_when_missing(m_group); // Throws
998 set_transact_stage(transact_Writing);
1002 inline void SharedGroup::rollback_and_continue_as_read(O* observer)
1004 if (m_transact_stage != transact_Writing)
1005 throw LogicError(LogicError::wrong_transact_state);
1007 _impl::History* hist = get_history(); // Throws
1009 throw LogicError(LogicError::no_history);
1011 // Mark all managed space (beyond the attached file) as free.
1012 using gf = _impl::GroupFriend;
1013 gf::reset_free_space_tracking(m_group); // Throws
1015 BinaryData uncommitted_changes = hist->get_uncommitted_changes();
1017 // FIXME: We are currently creating two transaction log parsers, one here,
1018 // and one in advance_transact(). That is wasteful as the parser creation is
1020 _impl::SimpleInputStream in(uncommitted_changes.data(), uncommitted_changes.size());
1021 _impl::TransactLogParser parser; // Throws
1022 _impl::TransactReverser reverser;
1023 parser.parse(in, reverser); // Throws
1025 if (observer && uncommitted_changes.size()) {
1026 _impl::ReversedNoCopyInputStream reversed_in(reverser);
1027 parser.parse(reversed_in, *observer); // Throws
1028 observer->parse_complete(); // Throws
1031 ref_type top_ref = m_read_lock.m_top_ref;
1032 size_t file_size = m_read_lock.m_file_size;
1033 _impl::ReversedNoCopyInputStream reversed_in(reverser);
1034 gf::advance_transact(m_group, top_ref, file_size, reversed_in); // Throws
1038 Replication* repl = gf::get_replication(m_group);
1039 REALM_ASSERT(repl); // Presence of `repl` follows from the presence of `hist`
1040 repl->abort_transact();
1042 set_transact_stage(transact_Reading);
1046 inline bool SharedGroup::do_advance_read(O* observer, VersionID version_id, _impl::History& hist)
1048 ReadLockInfo new_read_lock;
1049 grab_read_lock(new_read_lock, version_id); // Throws
1050 REALM_ASSERT(new_read_lock.m_version >= m_read_lock.m_version);
1051 if (new_read_lock.m_version == m_read_lock.m_version) {
1052 release_read_lock(new_read_lock);
1053 // _impl::History::update_early_from_top_ref() was not called
1057 ReadLockUnlockGuard g(*this, new_read_lock);
1059 version_type new_version = new_read_lock.m_version;
1060 size_t new_file_size = new_read_lock.m_file_size;
1061 ref_type new_top_ref = new_read_lock.m_top_ref;
1063 // Synchronize readers view of the file
1064 SlabAlloc& alloc = m_group.m_alloc;
1065 alloc.update_reader_view(new_file_size);
1067 hist.update_early_from_top_ref(new_version, new_file_size, new_top_ref); // Throws
1071 // This has to happen in the context of the originally bound snapshot
1072 // and while the read transaction is still in a fully functional state.
1073 _impl::TransactLogParser parser;
1074 version_type old_version = m_read_lock.m_version;
1075 version_type new_version = new_read_lock.m_version;
1076 _impl::ChangesetInputStream in(hist, old_version, new_version);
1077 parser.parse(in, *observer); // Throws
1078 observer->parse_complete(); // Throws
1081 // The old read lock must be retained for as long as the change history is
1082 // accessed (until Group::advance_transact() returns). This ensures that the
1083 // oldest needed changeset remains in the history, even when the history is
1084 // implemented as a separate unversioned entity outside the Realm (i.e., the
1085 // old implementation and ShortCircuitHistory in
1086 // test_lang_Bind_helper.cpp). On the other hand, if it had been the case,
1087 // that the history was always implemented as a versioned entity, that was
1088 // part of the Realm state, then it would not have been necessary to retain
1089 // the old read lock beyond this point.
1092 version_type old_version = m_read_lock.m_version;
1093 version_type new_version = new_read_lock.m_version;
1094 ref_type new_top_ref = new_read_lock.m_top_ref;
1095 size_t new_file_size = new_read_lock.m_file_size;
1096 _impl::ChangesetInputStream in(hist, old_version, new_version);
1097 m_group.advance_transact(new_top_ref, new_file_size, in); // Throws
1101 release_read_lock(m_read_lock);
1102 m_read_lock = new_read_lock;
1104 return true; // _impl::History::update_early_from_top_ref() was called
1107 inline _impl::History* SharedGroup::get_history()
1109 using gf = _impl::GroupFriend;
1110 if (Replication* repl = gf::get_replication(m_group))
1111 return repl->get_history();
1115 inline int SharedGroup::get_file_format_version() const noexcept
1117 using gf = _impl::GroupFriend;
1118 return gf::get_file_format_version(m_group);
1122 // The purpose of this class is to give internal access to some, but
1123 // not all of the non-public parts of the SharedGroup class.
1124 class _impl::SharedGroupFriend {
1126 static Group& get_group(SharedGroup& sg) noexcept
1132 static void advance_read(SharedGroup& sg, O* obs, SharedGroup::VersionID ver)
1134 sg.advance_read(obs, ver); // Throws
1138 static void promote_to_write(SharedGroup& sg, O* obs)
1140 sg.promote_to_write(obs); // Throws
1143 static SharedGroup::version_type commit_and_continue_as_read(SharedGroup& sg)
1145 return sg.commit_and_continue_as_read(); // Throws
1149 static void rollback_and_continue_as_read(SharedGroup& sg, O* obs)
1151 sg.rollback_and_continue_as_read(obs); // Throws
1154 static void async_daemon_open(SharedGroup& sg, const std::string& file)
1156 bool no_create = true;
1157 bool is_backend = true;
1158 SharedGroupOptions options;
1159 options.durability = SharedGroupOptions::Durability::Async;
1160 options.encryption_key = nullptr;
1161 options.allow_file_format_upgrade = false;
1162 sg.do_open(file, no_create, is_backend, options); // Throws
1165 static int get_file_format_version(const SharedGroup& sg) noexcept
1167 return sg.get_file_format_version();
1170 static SharedGroup::version_type get_version_of_latest_snapshot(SharedGroup& sg)
1172 return sg.get_version_of_latest_snapshot();
1175 static SharedGroup::version_type get_version_of_bound_snapshot(const SharedGroup& sg) noexcept
1177 return sg.get_version_of_bound_snapshot();
1181 inline const Group& ReadTransaction::get_group() const noexcept
1183 using sgf = _impl::SharedGroupFriend;
1184 return sgf::get_group(m_shared_group);
1187 inline SharedGroup::version_type ReadTransaction::get_version() const noexcept
1189 using sgf = _impl::SharedGroupFriend;
1190 return sgf::get_version_of_bound_snapshot(m_shared_group);
1193 inline Group& WriteTransaction::get_group() const noexcept
1195 REALM_ASSERT(m_shared_group);
1196 using sgf = _impl::SharedGroupFriend;
1197 return sgf::get_group(*m_shared_group);
1200 inline SharedGroup::version_type WriteTransaction::get_version() const noexcept
1202 using sgf = _impl::SharedGroupFriend;
1203 return sgf::get_version_of_bound_snapshot(*m_shared_group);
1206 } // namespace realm
1208 #endif // REALM_GROUP_SHARED_HPP