1 ////////////////////////////////////////////////////////////////////////////
3 // Copyright 2016 Realm Inc.
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
17 ////////////////////////////////////////////////////////////////////////////
19 #include "sync/sync_session.hpp"
21 #include "sync/impl/sync_client.hpp"
22 #include "sync/impl/sync_file.hpp"
23 #include "sync/impl/sync_metadata.hpp"
24 #include "sync/sync_manager.hpp"
25 #include "sync/sync_user.hpp"
27 #include <realm/sync/client.hpp>
28 #include <realm/sync/protocol.hpp>
31 using namespace realm;
32 using namespace realm::_impl;
33 using namespace realm::_impl::sync_session_states;
35 using SessionWaiterPointer = void(sync::Session::*)(std::function<void(std::error_code)>);
37 constexpr const char SyncError::c_original_file_path_key[];
38 constexpr const char SyncError::c_recovery_file_path_key[];
40 /// A state which a `SyncSession` can currently be within. State classes handle various actions
41 /// and state transitions.
45 /// WAITING_FOR_ACCESS_TOKEN: upon entering this state, the binding is informed
46 /// that the session wants an access token. The session is now waiting for the
47 /// binding to provide the token.
50 /// * ACTIVE: when the binding successfully refreshes the token
51 /// * INACTIVE: if asked to log out, or if asked to close and the stop policy
54 /// ACTIVE: the session is connected to the Realm Object Server and is actively
55 /// transferring data.
56 /// From: WAITING_FOR_ACCESS_TOKEN, DYING
58 /// * WAITING_FOR_ACCESS_TOKEN: if the session is informed (through the error
59 /// handler) that the token expired
60 /// * INACTIVE: if asked to log out, or if asked to close and the stop policy
62 /// * DYING: if asked to close and the stop policy is AfterChangesUploaded
64 /// DYING: the session is performing clean-up work in preparation to be destroyed.
67 /// * INACTIVE: when the clean-up work completes, if the session wasn't
68 /// revived, or if explicitly asked to log out before the
69 /// clean-up work begins
70 /// * ACTIVE: if the session is revived
72 /// INACTIVE: the user owning this session has logged out, the `sync::Session`
73 /// owned by this session is destroyed, and the session is quiescent.
74 /// Note that a session briefly enters this state before being destroyed, but
75 /// it can also enter this state and stay there if the user has been logged out.
76 /// From: initial, WAITING_FOR_ACCESS_TOKEN, ACTIVE, DYING
78 /// * WAITING_FOR_ACCESS_TOKEN: if the session is revived
80 struct SyncSession::State {
83 // Move the given session into this state. All state transitions MUST be carried out through this method.
84 virtual void enter_state(std::unique_lock<std::mutex>&, SyncSession&) const { }
86 virtual void refresh_access_token(std::unique_lock<std::mutex>&,
87 SyncSession&, std::string,
88 const util::Optional<std::string>&) const { }
90 // Returns true iff the lock is still locked when the method returns.
91 virtual bool access_token_expired(std::unique_lock<std::mutex>&, SyncSession&) const { return true; }
93 virtual void nonsync_transact_notify(std::unique_lock<std::mutex>&, SyncSession&, sync::Session::version_type) const { }
95 // Perform any work needed to reactivate a session that is not already active.
96 // Returns true iff the session should ask the binding to get a token for `bind()`.
97 virtual bool revive_if_needed(std::unique_lock<std::mutex>&, SyncSession&) const { return false; }
99 // Perform any work needed to respond to the application regaining network connectivity.
100 virtual void handle_reconnect(std::unique_lock<std::mutex>&, SyncSession&) const { };
102 // The user that owns this session has been logged out, and the session should take appropriate action.
103 virtual void log_out(std::unique_lock<std::mutex>&, SyncSession&) const { }
105 // The session should be closed and moved to `inactive`, in accordance with its stop policy and other state.
106 virtual void close(std::unique_lock<std::mutex>&, SyncSession&) const { }
108 // Returns true iff the error has been fully handled and the error handler should immediately return.
109 virtual bool handle_error(std::unique_lock<std::mutex>&, SyncSession&, const SyncError&) const { return false; }
111 // Register a handler to wait for sync session uploads, downloads, or synchronization.
112 // PRECONDITION: the session state lock must be held at the time this method is called, until after it returns.
113 // Returns true iff the handler was registered, either immediately or placed in a queue for later registration.
114 virtual bool wait_for_completion(SyncSession&,
115 std::function<void(std::error_code)>,
116 SessionWaiterPointer) const {
120 virtual void override_server(std::unique_lock<std::mutex>&, SyncSession&, std::string, int) const { }
122 static const State& waiting_for_access_token;
123 static const State& active;
124 static const State& dying;
125 static const State& inactive;
128 struct sync_session_states::WaitingForAccessToken : public SyncSession::State {
129 void enter_state(std::unique_lock<std::mutex>&, SyncSession& session) const override
131 session.m_deferred_close = false;
134 void refresh_access_token(std::unique_lock<std::mutex>& lock, SyncSession& session,
135 std::string access_token,
136 const util::Optional<std::string>& server_url) const override
138 session.create_sync_session();
140 // Since the sync session was previously unbound, it's safe to do this from the
142 if (!session.m_server_url) {
143 session.m_server_url = server_url;
145 if (session.m_session_has_been_bound) {
146 session.m_session->refresh(std::move(access_token));
147 session.m_session->cancel_reconnect_delay();
149 session.m_session->bind(*session.m_server_url, std::move(access_token));
150 session.m_session_has_been_bound = true;
153 if (session.m_server_override)
154 session.m_session->override_server(session.m_server_override->address, session.m_server_override->port);
156 // Register all the pending wait-for-completion blocks.
157 for (auto& package : session.m_completion_wait_packages) {
158 (*session.m_session.*package.waiter)(std::move(package.callback));
160 session.m_completion_wait_packages.clear();
162 // Handle any deferred commit notification.
163 if (session.m_deferred_commit_notification) {
164 session.m_session->nonsync_transact_notify(*session.m_deferred_commit_notification);
165 session.m_deferred_commit_notification = util::none;
168 session.advance_state(lock, active);
169 if (session.m_deferred_close) {
170 session.m_state->close(lock, session);
174 void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
176 session.advance_state(lock, inactive);
179 bool revive_if_needed(std::unique_lock<std::mutex>&, SyncSession& session) const override
181 session.m_deferred_close = false;
185 void handle_reconnect(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
187 // Ask the binding to retry getting the token for this session.
188 std::shared_ptr<SyncSession> session_ptr = session.shared_from_this();
190 session.m_config.bind_session_handler(session_ptr->m_realm_path, session_ptr->m_config, session_ptr);
193 void nonsync_transact_notify(std::unique_lock<std::mutex>&,
194 SyncSession& session,
195 sync::Session::version_type version) const override
197 // Notify at first available opportunity.
198 session.m_deferred_commit_notification = version;
201 void close(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
203 switch (session.m_config.stop_policy) {
204 case SyncSessionStopPolicy::Immediately:
205 // Immediately kill the session.
206 session.advance_state(lock, inactive);
208 case SyncSessionStopPolicy::LiveIndefinitely:
209 case SyncSessionStopPolicy::AfterChangesUploaded:
210 // Defer handling closing the session until after the login response succeeds.
211 session.m_deferred_close = true;
216 bool wait_for_completion(SyncSession& session,
217 std::function<void(std::error_code)> callback,
218 SessionWaiterPointer waiter) const override
220 session.m_completion_wait_packages.push_back({ waiter, std::move(callback) });
224 void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
225 std::string address, int port) const override
227 session.m_server_override = SyncSession::ServerOverride{address, port};
231 struct sync_session_states::Active : public SyncSession::State {
232 void refresh_access_token(std::unique_lock<std::mutex>&, SyncSession& session,
233 std::string access_token,
234 const util::Optional<std::string>&) const override
236 session.m_session->refresh(std::move(access_token));
237 // Cancel the session's reconnection delay. This is important if the
238 // token is being refreshed as a response to a 202 (token expired)
239 // error, or similar non-fatal sync errors.
240 session.m_session->cancel_reconnect_delay();
243 bool access_token_expired(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
245 session.advance_state(lock, waiting_for_access_token);
246 std::shared_ptr<SyncSession> session_ptr = session.shared_from_this();
248 session.m_config.bind_session_handler(session_ptr->m_realm_path, session_ptr->m_config, session_ptr);
252 void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
254 session.advance_state(lock, inactive);
257 void nonsync_transact_notify(std::unique_lock<std::mutex>&, SyncSession& session,
258 sync::Session::version_type version) const override
260 // Fully ready sync session, notify immediately.
261 session.m_session->nonsync_transact_notify(version);
264 void close(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
266 switch (session.m_config.stop_policy) {
267 case SyncSessionStopPolicy::Immediately:
268 session.advance_state(lock, inactive);
270 case SyncSessionStopPolicy::LiveIndefinitely:
271 // Don't do anything; session lives forever.
273 case SyncSessionStopPolicy::AfterChangesUploaded:
274 // Wait for all pending changes to upload.
275 session.advance_state(lock, dying);
280 bool wait_for_completion(SyncSession& session,
281 std::function<void(std::error_code)> callback,
282 SessionWaiterPointer waiter) const override
284 REALM_ASSERT(session.m_session);
285 (*session.m_session.*waiter)(std::move(callback));
289 void handle_reconnect(std::unique_lock<std::mutex>&, SyncSession& session) const override
291 session.m_session->cancel_reconnect_delay();
294 void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
295 std::string address, int port) const override
297 session.m_server_override = SyncSession::ServerOverride{address, port};
298 session.m_session->override_server(address, port);
302 struct sync_session_states::Dying : public SyncSession::State {
303 void enter_state(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
305 // If we have no session, we cannot possibly upload anything.
306 if (!session.m_session) {
307 session.advance_state(lock, inactive);
311 size_t current_death_count = ++session.m_death_count;
312 std::weak_ptr<SyncSession> weak_session = session.shared_from_this();
313 session.m_session->async_wait_for_upload_completion([weak_session, current_death_count](std::error_code) {
314 if (auto session = weak_session.lock()) {
315 std::unique_lock<std::mutex> lock(session->m_state_mutex);
316 if (session->m_state == &State::dying && session->m_death_count == current_death_count) {
317 session->advance_state(lock, inactive);
323 bool handle_error(std::unique_lock<std::mutex>& lock, SyncSession& session, const SyncError& error) const override
325 if (error.is_fatal) {
326 session.advance_state(lock, inactive);
328 // If the error isn't fatal, don't change state, but don't
329 // allow it to be reported either.
330 // FIXME: What if the token expires while a session is dying?
331 // Should we allow the token to be refreshed so that changes
332 // can finish being uploaded?
336 bool revive_if_needed(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
339 session.advance_state(lock, active);
343 void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
345 session.advance_state(lock, inactive);
348 bool wait_for_completion(SyncSession& session,
349 std::function<void(std::error_code)> callback,
350 SessionWaiterPointer waiter) const override
352 REALM_ASSERT(session.m_session);
353 (*session.m_session.*waiter)(std::move(callback));
357 void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
358 std::string address, int port) const override
360 session.m_server_override = SyncSession::ServerOverride{address, port};
361 session.m_session->override_server(address, port);
365 struct sync_session_states::Inactive : public SyncSession::State {
366 void enter_state(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
368 // Inform any queued-up completion handlers that they were cancelled.
369 for (auto& package : session.m_completion_wait_packages) {
370 package.callback(util::error::operation_aborted);
372 session.m_completion_wait_packages.clear();
373 session.m_session = nullptr;
374 session.unregister(lock);
377 bool revive_if_needed(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
379 session.advance_state(lock, waiting_for_access_token);
383 bool wait_for_completion(SyncSession& session,
384 std::function<void(std::error_code)> callback,
385 SessionWaiterPointer waiter) const override
387 session.m_completion_wait_packages.push_back({ waiter, std::move(callback) });
391 void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
392 std::string address, int port) const override
394 session.m_server_override = SyncSession::ServerOverride{address, port};
399 const SyncSession::State& SyncSession::State::waiting_for_access_token = WaitingForAccessToken();
400 const SyncSession::State& SyncSession::State::active = Active();
401 const SyncSession::State& SyncSession::State::dying = Dying();
402 const SyncSession::State& SyncSession::State::inactive = Inactive();
404 SyncSession::SyncSession(SyncClient& client, std::string realm_path, SyncConfig config)
405 : m_state(&State::inactive)
406 , m_config(std::move(config))
407 , m_realm_path(std::move(realm_path))
410 // Sync history validation ensures that the history within the Realm file is in a format that can be used
411 // by the version of realm-sync that we're using. Validation is enabled by default when the binding manually
412 // opens a sync session (via `SyncManager::get_session`), but is disabled when the sync session is opened
413 // as a side effect of opening a `Realm`. In that case, the sync history has already been validated by the
414 // act of opening the `Realm` so it's not necessary to repeat it here.
415 if (m_config.validate_sync_history) {
416 Realm::Config realm_config;
417 realm_config.path = m_realm_path;
418 realm_config.schema_mode = SchemaMode::Additive;
419 realm_config.force_sync_history = true;
420 realm_config.cache = false;
422 if (m_config.realm_encryption_key) {
423 realm_config.encryption_key.resize(64);
424 std::copy(m_config.realm_encryption_key->begin(), m_config.realm_encryption_key->end(),
425 realm_config.encryption_key.begin());
428 // FIXME: Opening a Realm only to discard it is relatively expensive. It may be preferable to have
429 // realm-sync open the Realm when the `sync::Session` is created since it can continue to use it.
430 Realm::get_shared_realm(realm_config); // Throws
434 std::string SyncSession::get_recovery_file_path()
436 return util::reserve_unique_file_name(SyncManager::shared().recovery_directory_path(),
437 util::create_timestamped_template("recovered_realm"));
440 void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, ShouldBackup should_backup)
442 // Add a SyncFileActionMetadata marking the Realm as needing to be deleted.
443 std::string recovery_path;
444 auto original_path = path();
445 error.user_info[SyncError::c_original_file_path_key] = original_path;
446 if (should_backup == ShouldBackup::yes) {
447 recovery_path = get_recovery_file_path();
448 error.user_info[SyncError::c_recovery_file_path_key] = recovery_path;
450 using Action = SyncFileActionMetadata::Action;
451 auto action = should_backup == ShouldBackup::yes ? Action::BackUpThenDeleteRealm : Action::DeleteRealm;
452 SyncManager::shared().perform_metadata_update([this,
454 original_path=std::move(original_path),
455 recovery_path=std::move(recovery_path)](const auto& manager) {
456 manager.make_file_action_metadata(original_path, m_config.realm_url(), m_config.user->identity(),
457 action, std::move(recovery_path));
461 // This method should only be called from within the error handler callback registered upon the underlying `m_session`.
462 void SyncSession::handle_error(SyncError error)
464 enum class NextStateAfterError { none, inactive, error };
465 auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
466 auto error_code = error.error_code;
469 // See if the current state wishes to take responsibility for handling the error.
470 std::unique_lock<std::mutex> lock(m_state_mutex);
471 if (m_state->handle_error(lock, *this, error)) {
476 if (error_code.category() == realm::sync::protocol_error_category()) {
477 using ProtocolError = realm::sync::ProtocolError;
478 switch (static_cast<ProtocolError>(error_code.value())) {
479 // Connection level errors
480 case ProtocolError::connection_closed:
481 case ProtocolError::other_error:
482 // Not real errors, don't need to be reported to the binding.
484 case ProtocolError::unknown_message:
485 case ProtocolError::bad_syntax:
486 case ProtocolError::limits_exceeded:
487 case ProtocolError::wrong_protocol_version:
488 case ProtocolError::bad_session_ident:
489 case ProtocolError::reuse_of_session_ident:
490 case ProtocolError::bound_in_other_session:
491 case ProtocolError::bad_message_order:
492 case ProtocolError::bad_client_version:
493 case ProtocolError::illegal_realm_path:
494 case ProtocolError::no_such_realm:
495 case ProtocolError::bad_changeset:
496 case ProtocolError::bad_changeset_header_syntax:
497 case ProtocolError::bad_changeset_size:
498 case ProtocolError::bad_changesets:
499 case ProtocolError::bad_decompression:
500 case ProtocolError::partial_sync_disabled:
503 case ProtocolError::session_closed:
504 case ProtocolError::other_session_error:
505 case ProtocolError::disabled_session:
506 // The binding doesn't need to be aware of these because they are strictly informational, and do not
507 // represent actual errors.
509 case ProtocolError::token_expired: {
510 std::unique_lock<std::mutex> lock(m_state_mutex);
511 // This isn't an error from the binding's point of view. If we're connected we'll
512 // simply ask the binding to log in again.
513 m_state->access_token_expired(lock, *this);
516 case ProtocolError::bad_authentication: {
517 std::shared_ptr<SyncUser> user_to_invalidate;
518 next_state = NextStateAfterError::none;
520 std::unique_lock<std::mutex> lock(m_state_mutex);
521 user_to_invalidate = user();
522 cancel_pending_waits();
524 if (user_to_invalidate)
525 user_to_invalidate->invalidate();
528 case ProtocolError::permission_denied: {
529 next_state = NextStateAfterError::inactive;
530 update_error_and_mark_file_for_deletion(error, ShouldBackup::no);
533 case ProtocolError::bad_server_file_ident:
534 case ProtocolError::bad_client_file_ident:
535 case ProtocolError::bad_server_version:
536 case ProtocolError::diverging_histories:
537 next_state = NextStateAfterError::inactive;
538 update_error_and_mark_file_for_deletion(error, ShouldBackup::yes);
541 } else if (error_code.category() == realm::sync::client_error_category()) {
542 using ClientError = realm::sync::Client::Error;
543 switch (static_cast<ClientError>(error_code.value())) {
544 case ClientError::connection_closed:
545 case ClientError::pong_timeout:
546 // Not real errors, don't need to be reported to the binding.
548 case ClientError::unknown_message:
549 case ClientError::bad_syntax:
550 case ClientError::limits_exceeded:
551 case ClientError::bad_session_ident:
552 case ClientError::bad_message_order:
553 case ClientError::bad_file_ident_pair:
554 case ClientError::bad_progress:
555 case ClientError::bad_changeset_header_syntax:
556 case ClientError::bad_changeset_size:
557 case ClientError::bad_origin_file_ident:
558 case ClientError::bad_server_version:
559 case ClientError::bad_changeset:
560 case ClientError::bad_request_ident:
561 case ClientError::bad_error_code:
562 case ClientError::bad_compression:
563 case ClientError::bad_client_version:
564 case ClientError::ssl_server_cert_rejected:
565 // Don't do anything special for these errors.
566 // Future functionality may require special-case handling for existing
567 // errors, or newly introduced error codes.
571 // Unrecognized error code.
572 error.is_unrecognized_by_client = true;
574 switch (next_state) {
575 case NextStateAfterError::none:
577 case NextStateAfterError::inactive: {
578 std::unique_lock<std::mutex> lock(m_state_mutex);
579 advance_state(lock, State::inactive);
582 case NextStateAfterError::error: {
583 std::unique_lock<std::mutex> lock(m_state_mutex);
584 cancel_pending_waits();
588 if (m_config.error_handler) {
589 m_config.error_handler(shared_from_this(), std::move(error));
593 void SyncSession::cancel_pending_waits()
595 // Inform any queued-up completion handlers that they were cancelled.
596 for (auto& package : m_completion_wait_packages) {
597 package.callback(util::error::operation_aborted);
599 m_completion_wait_packages.clear();
602 void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable,
603 uint64_t uploaded, uint64_t uploadable, bool is_fresh)
605 std::vector<std::function<void()>> invocations;
607 std::lock_guard<std::mutex> lock(m_progress_notifier_mutex);
608 m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded};
609 m_latest_progress_data_is_fresh = is_fresh;
611 for (auto it = m_notifiers.begin(); it != m_notifiers.end();) {
612 auto& package = it->second;
613 package.update(*m_current_progress, is_fresh);
615 bool should_delete = false;
616 invocations.emplace_back(package.create_invocation(*m_current_progress, should_delete));
618 it = (should_delete ? m_notifiers.erase(it) : std::next(it));
621 // Run the notifiers only after we've released the lock.
622 for (auto& invocation : invocations) {
627 void SyncSession::NotifierPackage::update(const Progress& current_progress, bool data_is_fresh)
629 if (is_streaming || captured_transferrable || !data_is_fresh)
632 captured_transferrable = direction == NotifierType::download ? current_progress.downloadable
633 : current_progress.uploadable;
636 // PRECONDITION: `update()` must first be called on the same package.
637 std::function<void()> SyncSession::NotifierPackage::create_invocation(const Progress& current_progress,
638 bool& is_expired) const
640 // It's possible for a non-streaming notifier to not yet have fresh transferrable bytes data.
641 // In that case, we don't call it at all.
642 // NOTE: `update()` is always called before `create_invocation()`, and will
643 // set `captured_transferrable` on the notifier package if fresh data has
644 // been received and the package is for a non-streaming notifier.
645 if (!is_streaming && !captured_transferrable)
648 bool is_download = direction == NotifierType::download;
649 uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
650 uint64_t transferrable;
652 transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
654 transferrable = *captured_transferrable;
656 // A notifier is expired if at least as many bytes have been transferred
657 // as were originally considered transferrable.
658 is_expired = !is_streaming && transferred >= *captured_transferrable;
659 return [=, package=*this](){
660 package.notifier(transferred, transferrable);
664 void SyncSession::create_sync_session()
669 sync::Session::Config session_config;
670 session_config.changeset_cooker = m_config.transformer;
671 session_config.encryption_key = m_config.realm_encryption_key;
672 session_config.verify_servers_ssl_certificate = m_config.client_validate_ssl;
673 session_config.ssl_trust_certificate_path = m_config.ssl_trust_certificate_path;
674 session_config.ssl_verify_callback = m_config.ssl_verify_callback;
675 session_config.multiplex_ident = m_multiplex_identity;
676 m_session = m_client.make_session(m_realm_path, std::move(session_config));
678 // The next time we get a token, call `bind()` instead of `refresh()`.
679 m_session_has_been_bound = false;
681 // Configure the error handler.
682 std::weak_ptr<SyncSession> weak_self = shared_from_this();
683 auto wrapped_handler = [this, weak_self](std::error_code error_code, bool is_fatal, std::string message) {
684 auto self = weak_self.lock();
686 // An error was delivered after the session it relates to was destroyed. There's nothing useful
687 // we can do with it.
690 handle_error(SyncError{error_code, std::move(message), is_fatal});
692 m_session->set_error_handler(std::move(wrapped_handler));
694 // Configure the sync transaction callback.
695 auto wrapped_callback = [this, weak_self](VersionID old_version, VersionID new_version) {
696 if (auto self = weak_self.lock()) {
697 if (m_sync_transact_callback) {
698 m_sync_transact_callback(old_version, new_version);
702 m_session->set_sync_transact_callback(std::move(wrapped_callback));
704 // Set up the wrapped progress handler callback
705 auto wrapped_progress_handler = [this, weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
706 uint_fast64_t uploaded, uint_fast64_t uploadable,
707 bool is_fresh, uint_fast64_t /*snapshot_version*/) {
708 if (auto self = weak_self.lock()) {
709 handle_progress_update(downloaded, downloadable, uploaded, uploadable, is_fresh);
712 m_session->set_progress_handler(std::move(wrapped_progress_handler));
715 void SyncSession::set_sync_transact_callback(std::function<sync::Session::SyncTransactCallback> callback)
717 m_sync_transact_callback = std::move(callback);
720 void SyncSession::advance_state(std::unique_lock<std::mutex>& lock, const State& state)
722 REALM_ASSERT(lock.owns_lock());
723 REALM_ASSERT(&state != m_state);
725 m_state->enter_state(lock, *this);
728 void SyncSession::nonsync_transact_notify(sync::Session::version_type version)
730 std::unique_lock<std::mutex> lock(m_state_mutex);
731 m_state->nonsync_transact_notify(lock, *this, version);
734 void SyncSession::revive_if_needed()
736 util::Optional<std::function<SyncBindSessionHandler>&> handler;
738 std::unique_lock<std::mutex> lock(m_state_mutex);
739 if (m_state->revive_if_needed(lock, *this))
740 handler = m_config.bind_session_handler;
743 handler.value()(m_realm_path, m_config, shared_from_this());
746 void SyncSession::handle_reconnect()
748 std::unique_lock<std::mutex> lock(m_state_mutex);
749 m_state->handle_reconnect(lock, *this);
752 void SyncSession::log_out()
754 std::unique_lock<std::mutex> lock(m_state_mutex);
755 m_state->log_out(lock, *this);
758 void SyncSession::close()
760 std::unique_lock<std::mutex> lock(m_state_mutex);
761 m_state->close(lock, *this);
764 void SyncSession::unregister(std::unique_lock<std::mutex>& lock)
766 REALM_ASSERT(lock.owns_lock());
767 REALM_ASSERT(m_state == &State::inactive); // Must stop an active session before unregistering.
770 SyncManager::shared().unregister_session(m_realm_path);
773 bool SyncSession::wait_for_upload_completion(std::function<void(std::error_code)> callback)
775 std::unique_lock<std::mutex> lock(m_state_mutex);
776 return m_state->wait_for_completion(*this, std::move(callback), &sync::Session::async_wait_for_upload_completion);
779 bool SyncSession::wait_for_download_completion(std::function<void(std::error_code)> callback)
781 std::unique_lock<std::mutex> lock(m_state_mutex);
782 return m_state->wait_for_completion(*this, std::move(callback), &sync::Session::async_wait_for_download_completion);
785 uint64_t SyncSession::register_progress_notifier(std::function<SyncProgressNotifierCallback> notifier,
786 NotifierType direction, bool is_streaming)
788 std::function<void()> invocation;
789 uint64_t token_value = 0;
791 std::lock_guard<std::mutex> lock(m_progress_notifier_mutex);
792 token_value = m_progress_notifier_token++;
793 NotifierPackage package{std::move(notifier), is_streaming, direction};
794 if (!m_current_progress) {
795 // Simply register the package, since we have no data yet.
796 m_notifiers.emplace(token_value, std::move(package));
799 package.update(*m_current_progress, m_latest_progress_data_is_fresh);
800 bool skip_registration = false;
801 invocation = package.create_invocation(*m_current_progress, skip_registration);
802 if (skip_registration) {
805 m_notifiers.emplace(token_value, std::move(package));
812 void SyncSession::unregister_progress_notifier(uint64_t token)
814 std::lock_guard<std::mutex> lock(m_progress_notifier_mutex);
815 m_notifiers.erase(token);
818 void SyncSession::refresh_access_token(std::string access_token, util::Optional<std::string> server_url)
820 std::unique_lock<std::mutex> lock(m_state_mutex);
821 if (!m_server_url && !server_url) {
822 // The first time this method is called, the server URL must be provided.
825 m_state->refresh_access_token(lock, *this, std::move(access_token), server_url);
828 void SyncSession::override_server(std::string address, int port)
830 std::unique_lock<std::mutex> lock(m_state_mutex);
831 m_state->override_server(lock, *this, std::move(address), port);
834 void SyncSession::set_multiplex_identifier(std::string multiplex_identity)
836 m_multiplex_identity = std::move(multiplex_identity);
839 SyncSession::PublicState SyncSession::state() const
841 std::unique_lock<std::mutex> lock(m_state_mutex);
842 if (m_state == &State::waiting_for_access_token) {
843 return PublicState::WaitingForAccessToken;
844 } else if (m_state == &State::active) {
845 return PublicState::Active;
846 } else if (m_state == &State::dying) {
847 return PublicState::Dying;
848 } else if (m_state == &State::inactive) {
849 return PublicState::Inactive;
854 // Represents a reference to the SyncSession from outside of the sync subsystem.
855 // We attempt to keep the SyncSession in an active state as long as it has an external reference.
856 class SyncSession::ExternalReference {
858 ExternalReference(std::shared_ptr<SyncSession> session) : m_session(std::move(session))
863 m_session->did_drop_external_reference();
867 std::shared_ptr<SyncSession> m_session;
870 std::shared_ptr<SyncSession> SyncSession::external_reference()
872 std::unique_lock<std::mutex> lock(m_state_mutex);
874 if (auto external_reference = m_external_reference.lock())
875 return std::shared_ptr<SyncSession>(external_reference, this);
877 auto external_reference = std::make_shared<ExternalReference>(shared_from_this());
878 m_external_reference = external_reference;
879 return std::shared_ptr<SyncSession>(external_reference, this);
882 std::shared_ptr<SyncSession> SyncSession::existing_external_reference()
884 std::unique_lock<std::mutex> lock(m_state_mutex);
886 if (auto external_reference = m_external_reference.lock())
887 return std::shared_ptr<SyncSession>(external_reference, this);
892 void SyncSession::did_drop_external_reference()
894 std::unique_lock<std::mutex> lock(m_state_mutex);
896 // If the session is being resurrected we should not close the session.
897 if (!m_external_reference.expired())
900 m_state->close(lock, *this);