--- /dev/null
+////////////////////////////////////////////////////////////////////////////
+//
+// Copyright 2016 Realm Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////
+
+#include "sync/sync_session.hpp"
+
+#include "sync/impl/sync_client.hpp"
+#include "sync/impl/sync_file.hpp"
+#include "sync/impl/sync_metadata.hpp"
+#include "sync/sync_manager.hpp"
+#include "sync/sync_user.hpp"
+
+#include <realm/sync/client.hpp>
+#include <realm/sync/protocol.hpp>
+
+
+using namespace realm;
+using namespace realm::_impl;
+using namespace realm::_impl::sync_session_states;
+
+using SessionWaiterPointer = void(sync::Session::*)(std::function<void(std::error_code)>);
+
+constexpr const char SyncError::c_original_file_path_key[];
+constexpr const char SyncError::c_recovery_file_path_key[];
+
+/// A state which a `SyncSession` can currently be within. State classes handle various actions
+/// and state transitions.
+///
+/// STATES:
+///
+/// WAITING_FOR_ACCESS_TOKEN: upon entering this state, the binding is informed
+/// that the session wants an access token. The session is now waiting for the
+/// binding to provide the token.
+/// From: INACTIVE
+/// To:
+/// * ACTIVE: when the binding successfully refreshes the token
+/// * INACTIVE: if asked to log out, or if asked to close and the stop policy
+/// is Immediate.
+///
+/// ACTIVE: the session is connected to the Realm Object Server and is actively
+/// transferring data.
+/// From: WAITING_FOR_ACCESS_TOKEN, DYING
+/// To:
+/// * WAITING_FOR_ACCESS_TOKEN: if the session is informed (through the error
+/// handler) that the token expired
+/// * INACTIVE: if asked to log out, or if asked to close and the stop policy
+/// is Immediate.
+/// * DYING: if asked to close and the stop policy is AfterChangesUploaded
+///
+/// DYING: the session is performing clean-up work in preparation to be destroyed.
+/// From: ACTIVE
+/// To:
+/// * INACTIVE: when the clean-up work completes, if the session wasn't
+/// revived, or if explicitly asked to log out before the
+/// clean-up work begins
+/// * ACTIVE: if the session is revived
+///
+/// INACTIVE: the user owning this session has logged out, the `sync::Session`
+/// owned by this session is destroyed, and the session is quiescent.
+/// Note that a session briefly enters this state before being destroyed, but
+/// it can also enter this state and stay there if the user has been logged out.
+/// From: initial, WAITING_FOR_ACCESS_TOKEN, ACTIVE, DYING
+/// To:
+/// * WAITING_FOR_ACCESS_TOKEN: if the session is revived
+///
+struct SyncSession::State {
+ virtual ~State() { }
+
+ // Move the given session into this state. All state transitions MUST be carried out through this method.
+ virtual void enter_state(std::unique_lock<std::mutex>&, SyncSession&) const { }
+
+ virtual void refresh_access_token(std::unique_lock<std::mutex>&,
+ SyncSession&, std::string,
+ const util::Optional<std::string>&) const { }
+
+ // Returns true iff the lock is still locked when the method returns.
+ virtual bool access_token_expired(std::unique_lock<std::mutex>&, SyncSession&) const { return true; }
+
+ virtual void nonsync_transact_notify(std::unique_lock<std::mutex>&, SyncSession&, sync::Session::version_type) const { }
+
+ // Perform any work needed to reactivate a session that is not already active.
+ // Returns true iff the session should ask the binding to get a token for `bind()`.
+ virtual bool revive_if_needed(std::unique_lock<std::mutex>&, SyncSession&) const { return false; }
+
+ // Perform any work needed to respond to the application regaining network connectivity.
+ virtual void handle_reconnect(std::unique_lock<std::mutex>&, SyncSession&) const { };
+
+ // The user that owns this session has been logged out, and the session should take appropriate action.
+ virtual void log_out(std::unique_lock<std::mutex>&, SyncSession&) const { }
+
+ // The session should be closed and moved to `inactive`, in accordance with its stop policy and other state.
+ virtual void close(std::unique_lock<std::mutex>&, SyncSession&) const { }
+
+ // Returns true iff the error has been fully handled and the error handler should immediately return.
+ virtual bool handle_error(std::unique_lock<std::mutex>&, SyncSession&, const SyncError&) const { return false; }
+
+ // Register a handler to wait for sync session uploads, downloads, or synchronization.
+ // PRECONDITION: the session state lock must be held at the time this method is called, until after it returns.
+ // Returns true iff the handler was registered, either immediately or placed in a queue for later registration.
+ virtual bool wait_for_completion(SyncSession&,
+ std::function<void(std::error_code)>,
+ SessionWaiterPointer) const {
+ return false;
+ }
+
+ virtual void override_server(std::unique_lock<std::mutex>&, SyncSession&, std::string, int) const { }
+
+ static const State& waiting_for_access_token;
+ static const State& active;
+ static const State& dying;
+ static const State& inactive;
+};
+
+struct sync_session_states::WaitingForAccessToken : public SyncSession::State {
+ void enter_state(std::unique_lock<std::mutex>&, SyncSession& session) const override
+ {
+ session.m_deferred_close = false;
+ }
+
+ void refresh_access_token(std::unique_lock<std::mutex>& lock, SyncSession& session,
+ std::string access_token,
+ const util::Optional<std::string>& server_url) const override
+ {
+ session.create_sync_session();
+
+ // Since the sync session was previously unbound, it's safe to do this from the
+ // calling thread.
+ if (!session.m_server_url) {
+ session.m_server_url = server_url;
+ }
+ if (session.m_session_has_been_bound) {
+ session.m_session->refresh(std::move(access_token));
+ session.m_session->cancel_reconnect_delay();
+ } else {
+ session.m_session->bind(*session.m_server_url, std::move(access_token));
+ session.m_session_has_been_bound = true;
+ }
+
+ if (session.m_server_override)
+ session.m_session->override_server(session.m_server_override->address, session.m_server_override->port);
+
+ // Register all the pending wait-for-completion blocks.
+ for (auto& package : session.m_completion_wait_packages) {
+ (*session.m_session.*package.waiter)(std::move(package.callback));
+ }
+ session.m_completion_wait_packages.clear();
+
+ // Handle any deferred commit notification.
+ if (session.m_deferred_commit_notification) {
+ session.m_session->nonsync_transact_notify(*session.m_deferred_commit_notification);
+ session.m_deferred_commit_notification = util::none;
+ }
+
+ session.advance_state(lock, active);
+ if (session.m_deferred_close) {
+ session.m_state->close(lock, session);
+ }
+ }
+
+ void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ session.advance_state(lock, inactive);
+ }
+
+ bool revive_if_needed(std::unique_lock<std::mutex>&, SyncSession& session) const override
+ {
+ session.m_deferred_close = false;
+ return false;
+ }
+
+ void handle_reconnect(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ // Ask the binding to retry getting the token for this session.
+ std::shared_ptr<SyncSession> session_ptr = session.shared_from_this();
+ lock.unlock();
+ session.m_config.bind_session_handler(session_ptr->m_realm_path, session_ptr->m_config, session_ptr);
+ }
+
+ void nonsync_transact_notify(std::unique_lock<std::mutex>&,
+ SyncSession& session,
+ sync::Session::version_type version) const override
+ {
+ // Notify at first available opportunity.
+ session.m_deferred_commit_notification = version;
+ }
+
+ void close(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ switch (session.m_config.stop_policy) {
+ case SyncSessionStopPolicy::Immediately:
+ // Immediately kill the session.
+ session.advance_state(lock, inactive);
+ break;
+ case SyncSessionStopPolicy::LiveIndefinitely:
+ case SyncSessionStopPolicy::AfterChangesUploaded:
+ // Defer handling closing the session until after the login response succeeds.
+ session.m_deferred_close = true;
+ break;
+ }
+ }
+
+ bool wait_for_completion(SyncSession& session,
+ std::function<void(std::error_code)> callback,
+ SessionWaiterPointer waiter) const override
+ {
+ session.m_completion_wait_packages.push_back({ waiter, std::move(callback) });
+ return true;
+ }
+
+ void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
+ std::string address, int port) const override
+ {
+ session.m_server_override = SyncSession::ServerOverride{address, port};
+ }
+};
+
+struct sync_session_states::Active : public SyncSession::State {
+ void refresh_access_token(std::unique_lock<std::mutex>&, SyncSession& session,
+ std::string access_token,
+ const util::Optional<std::string>&) const override
+ {
+ session.m_session->refresh(std::move(access_token));
+ // Cancel the session's reconnection delay. This is important if the
+ // token is being refreshed as a response to a 202 (token expired)
+ // error, or similar non-fatal sync errors.
+ session.m_session->cancel_reconnect_delay();
+ }
+
+ bool access_token_expired(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ session.advance_state(lock, waiting_for_access_token);
+ std::shared_ptr<SyncSession> session_ptr = session.shared_from_this();
+ lock.unlock();
+ session.m_config.bind_session_handler(session_ptr->m_realm_path, session_ptr->m_config, session_ptr);
+ return false;
+ }
+
+ void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ session.advance_state(lock, inactive);
+ }
+
+ void nonsync_transact_notify(std::unique_lock<std::mutex>&, SyncSession& session,
+ sync::Session::version_type version) const override
+ {
+ // Fully ready sync session, notify immediately.
+ session.m_session->nonsync_transact_notify(version);
+ }
+
+ void close(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ switch (session.m_config.stop_policy) {
+ case SyncSessionStopPolicy::Immediately:
+ session.advance_state(lock, inactive);
+ break;
+ case SyncSessionStopPolicy::LiveIndefinitely:
+ // Don't do anything; session lives forever.
+ break;
+ case SyncSessionStopPolicy::AfterChangesUploaded:
+ // Wait for all pending changes to upload.
+ session.advance_state(lock, dying);
+ break;
+ }
+ }
+
+ bool wait_for_completion(SyncSession& session,
+ std::function<void(std::error_code)> callback,
+ SessionWaiterPointer waiter) const override
+ {
+ REALM_ASSERT(session.m_session);
+ (*session.m_session.*waiter)(std::move(callback));
+ return true;
+ }
+
+ void handle_reconnect(std::unique_lock<std::mutex>&, SyncSession& session) const override
+ {
+ session.m_session->cancel_reconnect_delay();
+ }
+
+ void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
+ std::string address, int port) const override
+ {
+ session.m_server_override = SyncSession::ServerOverride{address, port};
+ session.m_session->override_server(address, port);
+ }
+};
+
+struct sync_session_states::Dying : public SyncSession::State {
+ void enter_state(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ // If we have no session, we cannot possibly upload anything.
+ if (!session.m_session) {
+ session.advance_state(lock, inactive);
+ return;
+ }
+
+ size_t current_death_count = ++session.m_death_count;
+ std::weak_ptr<SyncSession> weak_session = session.shared_from_this();
+ session.m_session->async_wait_for_upload_completion([weak_session, current_death_count](std::error_code) {
+ if (auto session = weak_session.lock()) {
+ std::unique_lock<std::mutex> lock(session->m_state_mutex);
+ if (session->m_state == &State::dying && session->m_death_count == current_death_count) {
+ session->advance_state(lock, inactive);
+ }
+ }
+ });
+ }
+
+ bool handle_error(std::unique_lock<std::mutex>& lock, SyncSession& session, const SyncError& error) const override
+ {
+ if (error.is_fatal) {
+ session.advance_state(lock, inactive);
+ }
+ // If the error isn't fatal, don't change state, but don't
+ // allow it to be reported either.
+ // FIXME: What if the token expires while a session is dying?
+ // Should we allow the token to be refreshed so that changes
+ // can finish being uploaded?
+ return true;
+ }
+
+ bool revive_if_needed(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ // Revive.
+ session.advance_state(lock, active);
+ return false;
+ }
+
+ void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ session.advance_state(lock, inactive);
+ }
+
+ bool wait_for_completion(SyncSession& session,
+ std::function<void(std::error_code)> callback,
+ SessionWaiterPointer waiter) const override
+ {
+ REALM_ASSERT(session.m_session);
+ (*session.m_session.*waiter)(std::move(callback));
+ return true;
+ }
+
+ void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
+ std::string address, int port) const override
+ {
+ session.m_server_override = SyncSession::ServerOverride{address, port};
+ session.m_session->override_server(address, port);
+ }
+};
+
+struct sync_session_states::Inactive : public SyncSession::State {
+ void enter_state(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ // Inform any queued-up completion handlers that they were cancelled.
+ for (auto& package : session.m_completion_wait_packages) {
+ package.callback(util::error::operation_aborted);
+ }
+ session.m_completion_wait_packages.clear();
+ session.m_session = nullptr;
+ session.unregister(lock);
+ }
+
+ bool revive_if_needed(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
+ {
+ session.advance_state(lock, waiting_for_access_token);
+ return true;
+ }
+
+ bool wait_for_completion(SyncSession& session,
+ std::function<void(std::error_code)> callback,
+ SessionWaiterPointer waiter) const override
+ {
+ session.m_completion_wait_packages.push_back({ waiter, std::move(callback) });
+ return true;
+ }
+
+ void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
+ std::string address, int port) const override
+ {
+ session.m_server_override = SyncSession::ServerOverride{address, port};
+ }
+};
+
+
+const SyncSession::State& SyncSession::State::waiting_for_access_token = WaitingForAccessToken();
+const SyncSession::State& SyncSession::State::active = Active();
+const SyncSession::State& SyncSession::State::dying = Dying();
+const SyncSession::State& SyncSession::State::inactive = Inactive();
+
+SyncSession::SyncSession(SyncClient& client, std::string realm_path, SyncConfig config)
+: m_state(&State::inactive)
+, m_config(std::move(config))
+, m_realm_path(std::move(realm_path))
+, m_client(client)
+{
+ // Sync history validation ensures that the history within the Realm file is in a format that can be used
+ // by the version of realm-sync that we're using. Validation is enabled by default when the binding manually
+ // opens a sync session (via `SyncManager::get_session`), but is disabled when the sync session is opened
+ // as a side effect of opening a `Realm`. In that case, the sync history has already been validated by the
+ // act of opening the `Realm` so it's not necessary to repeat it here.
+ if (m_config.validate_sync_history) {
+ Realm::Config realm_config;
+ realm_config.path = m_realm_path;
+ realm_config.schema_mode = SchemaMode::Additive;
+ realm_config.force_sync_history = true;
+ realm_config.cache = false;
+
+ if (m_config.realm_encryption_key) {
+ realm_config.encryption_key.resize(64);
+ std::copy(m_config.realm_encryption_key->begin(), m_config.realm_encryption_key->end(),
+ realm_config.encryption_key.begin());
+ }
+
+ // FIXME: Opening a Realm only to discard it is relatively expensive. It may be preferable to have
+ // realm-sync open the Realm when the `sync::Session` is created since it can continue to use it.
+ Realm::get_shared_realm(realm_config); // Throws
+ }
+}
+
+std::string SyncSession::get_recovery_file_path()
+{
+ return util::reserve_unique_file_name(SyncManager::shared().recovery_directory_path(),
+ util::create_timestamped_template("recovered_realm"));
+}
+
+void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, ShouldBackup should_backup)
+{
+ // Add a SyncFileActionMetadata marking the Realm as needing to be deleted.
+ std::string recovery_path;
+ auto original_path = path();
+ error.user_info[SyncError::c_original_file_path_key] = original_path;
+ if (should_backup == ShouldBackup::yes) {
+ recovery_path = get_recovery_file_path();
+ error.user_info[SyncError::c_recovery_file_path_key] = recovery_path;
+ }
+ using Action = SyncFileActionMetadata::Action;
+ auto action = should_backup == ShouldBackup::yes ? Action::BackUpThenDeleteRealm : Action::DeleteRealm;
+ SyncManager::shared().perform_metadata_update([this,
+ action,
+ original_path=std::move(original_path),
+ recovery_path=std::move(recovery_path)](const auto& manager) {
+ manager.make_file_action_metadata(original_path, m_config.realm_url(), m_config.user->identity(),
+ action, std::move(recovery_path));
+ });
+}
+
+// This method should only be called from within the error handler callback registered upon the underlying `m_session`.
+void SyncSession::handle_error(SyncError error)
+{
+ enum class NextStateAfterError { none, inactive, error };
+ auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
+ auto error_code = error.error_code;
+
+ {
+ // See if the current state wishes to take responsibility for handling the error.
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ if (m_state->handle_error(lock, *this, error)) {
+ return;
+ }
+ }
+
+ if (error_code.category() == realm::sync::protocol_error_category()) {
+ using ProtocolError = realm::sync::ProtocolError;
+ switch (static_cast<ProtocolError>(error_code.value())) {
+ // Connection level errors
+ case ProtocolError::connection_closed:
+ case ProtocolError::other_error:
+ // Not real errors, don't need to be reported to the binding.
+ return;
+ case ProtocolError::unknown_message:
+ case ProtocolError::bad_syntax:
+ case ProtocolError::limits_exceeded:
+ case ProtocolError::wrong_protocol_version:
+ case ProtocolError::bad_session_ident:
+ case ProtocolError::reuse_of_session_ident:
+ case ProtocolError::bound_in_other_session:
+ case ProtocolError::bad_message_order:
+ case ProtocolError::bad_client_version:
+ case ProtocolError::illegal_realm_path:
+ case ProtocolError::no_such_realm:
+ case ProtocolError::bad_changeset:
+ case ProtocolError::bad_changeset_header_syntax:
+ case ProtocolError::bad_changeset_size:
+ case ProtocolError::bad_changesets:
+ case ProtocolError::bad_decompression:
+ case ProtocolError::partial_sync_disabled:
+ break;
+ // Session errors
+ case ProtocolError::session_closed:
+ case ProtocolError::other_session_error:
+ case ProtocolError::disabled_session:
+ // The binding doesn't need to be aware of these because they are strictly informational, and do not
+ // represent actual errors.
+ return;
+ case ProtocolError::token_expired: {
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ // This isn't an error from the binding's point of view. If we're connected we'll
+ // simply ask the binding to log in again.
+ m_state->access_token_expired(lock, *this);
+ return;
+ }
+ case ProtocolError::bad_authentication: {
+ std::shared_ptr<SyncUser> user_to_invalidate;
+ next_state = NextStateAfterError::none;
+ {
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ user_to_invalidate = user();
+ cancel_pending_waits();
+ }
+ if (user_to_invalidate)
+ user_to_invalidate->invalidate();
+ break;
+ }
+ case ProtocolError::permission_denied: {
+ next_state = NextStateAfterError::inactive;
+ update_error_and_mark_file_for_deletion(error, ShouldBackup::no);
+ break;
+ }
+ case ProtocolError::bad_server_file_ident:
+ case ProtocolError::bad_client_file_ident:
+ case ProtocolError::bad_server_version:
+ case ProtocolError::diverging_histories:
+ next_state = NextStateAfterError::inactive;
+ update_error_and_mark_file_for_deletion(error, ShouldBackup::yes);
+ break;
+ }
+ } else if (error_code.category() == realm::sync::client_error_category()) {
+ using ClientError = realm::sync::Client::Error;
+ switch (static_cast<ClientError>(error_code.value())) {
+ case ClientError::connection_closed:
+ case ClientError::pong_timeout:
+ // Not real errors, don't need to be reported to the binding.
+ return;
+ case ClientError::unknown_message:
+ case ClientError::bad_syntax:
+ case ClientError::limits_exceeded:
+ case ClientError::bad_session_ident:
+ case ClientError::bad_message_order:
+ case ClientError::bad_file_ident_pair:
+ case ClientError::bad_progress:
+ case ClientError::bad_changeset_header_syntax:
+ case ClientError::bad_changeset_size:
+ case ClientError::bad_origin_file_ident:
+ case ClientError::bad_server_version:
+ case ClientError::bad_changeset:
+ case ClientError::bad_request_ident:
+ case ClientError::bad_error_code:
+ case ClientError::bad_compression:
+ case ClientError::bad_client_version:
+ case ClientError::ssl_server_cert_rejected:
+ // Don't do anything special for these errors.
+ // Future functionality may require special-case handling for existing
+ // errors, or newly introduced error codes.
+ break;
+ }
+ } else {
+ // Unrecognized error code.
+ error.is_unrecognized_by_client = true;
+ }
+ switch (next_state) {
+ case NextStateAfterError::none:
+ break;
+ case NextStateAfterError::inactive: {
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ advance_state(lock, State::inactive);
+ break;
+ }
+ case NextStateAfterError::error: {
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ cancel_pending_waits();
+ break;
+ }
+ }
+ if (m_config.error_handler) {
+ m_config.error_handler(shared_from_this(), std::move(error));
+ }
+}
+
+void SyncSession::cancel_pending_waits()
+{
+ // Inform any queued-up completion handlers that they were cancelled.
+ for (auto& package : m_completion_wait_packages) {
+ package.callback(util::error::operation_aborted);
+ }
+ m_completion_wait_packages.clear();
+}
+
+void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable,
+ uint64_t uploaded, uint64_t uploadable, bool is_fresh)
+{
+ std::vector<std::function<void()>> invocations;
+ {
+ std::lock_guard<std::mutex> lock(m_progress_notifier_mutex);
+ m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded};
+ m_latest_progress_data_is_fresh = is_fresh;
+
+ for (auto it = m_notifiers.begin(); it != m_notifiers.end();) {
+ auto& package = it->second;
+ package.update(*m_current_progress, is_fresh);
+
+ bool should_delete = false;
+ invocations.emplace_back(package.create_invocation(*m_current_progress, should_delete));
+
+ it = (should_delete ? m_notifiers.erase(it) : std::next(it));
+ }
+ }
+ // Run the notifiers only after we've released the lock.
+ for (auto& invocation : invocations) {
+ invocation();
+ }
+}
+
+void SyncSession::NotifierPackage::update(const Progress& current_progress, bool data_is_fresh)
+{
+ if (is_streaming || captured_transferrable || !data_is_fresh)
+ return;
+
+ captured_transferrable = direction == NotifierType::download ? current_progress.downloadable
+ : current_progress.uploadable;
+}
+
+// PRECONDITION: `update()` must first be called on the same package.
+std::function<void()> SyncSession::NotifierPackage::create_invocation(const Progress& current_progress,
+ bool& is_expired) const
+{
+ // It's possible for a non-streaming notifier to not yet have fresh transferrable bytes data.
+ // In that case, we don't call it at all.
+ // NOTE: `update()` is always called before `create_invocation()`, and will
+ // set `captured_transferrable` on the notifier package if fresh data has
+ // been received and the package is for a non-streaming notifier.
+ if (!is_streaming && !captured_transferrable)
+ return [](){ };
+
+ bool is_download = direction == NotifierType::download;
+ uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
+ uint64_t transferrable;
+ if (is_streaming) {
+ transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
+ } else {
+ transferrable = *captured_transferrable;
+ }
+ // A notifier is expired if at least as many bytes have been transferred
+ // as were originally considered transferrable.
+ is_expired = !is_streaming && transferred >= *captured_transferrable;
+ return [=, package=*this](){
+ package.notifier(transferred, transferrable);
+ };
+}
+
+void SyncSession::create_sync_session()
+{
+ if (m_session)
+ return;
+
+ sync::Session::Config session_config;
+ session_config.changeset_cooker = m_config.transformer;
+ session_config.encryption_key = m_config.realm_encryption_key;
+ session_config.verify_servers_ssl_certificate = m_config.client_validate_ssl;
+ session_config.ssl_trust_certificate_path = m_config.ssl_trust_certificate_path;
+ session_config.ssl_verify_callback = m_config.ssl_verify_callback;
+ session_config.multiplex_ident = m_multiplex_identity;
+ m_session = m_client.make_session(m_realm_path, std::move(session_config));
+
+ // The next time we get a token, call `bind()` instead of `refresh()`.
+ m_session_has_been_bound = false;
+
+ // Configure the error handler.
+ std::weak_ptr<SyncSession> weak_self = shared_from_this();
+ auto wrapped_handler = [this, weak_self](std::error_code error_code, bool is_fatal, std::string message) {
+ auto self = weak_self.lock();
+ if (!self) {
+ // An error was delivered after the session it relates to was destroyed. There's nothing useful
+ // we can do with it.
+ return;
+ }
+ handle_error(SyncError{error_code, std::move(message), is_fatal});
+ };
+ m_session->set_error_handler(std::move(wrapped_handler));
+
+ // Configure the sync transaction callback.
+ auto wrapped_callback = [this, weak_self](VersionID old_version, VersionID new_version) {
+ if (auto self = weak_self.lock()) {
+ if (m_sync_transact_callback) {
+ m_sync_transact_callback(old_version, new_version);
+ }
+ }
+ };
+ m_session->set_sync_transact_callback(std::move(wrapped_callback));
+
+ // Set up the wrapped progress handler callback
+ auto wrapped_progress_handler = [this, weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
+ uint_fast64_t uploaded, uint_fast64_t uploadable,
+ bool is_fresh, uint_fast64_t /*snapshot_version*/) {
+ if (auto self = weak_self.lock()) {
+ handle_progress_update(downloaded, downloadable, uploaded, uploadable, is_fresh);
+ }
+ };
+ m_session->set_progress_handler(std::move(wrapped_progress_handler));
+}
+
+void SyncSession::set_sync_transact_callback(std::function<sync::Session::SyncTransactCallback> callback)
+{
+ m_sync_transact_callback = std::move(callback);
+}
+
+void SyncSession::advance_state(std::unique_lock<std::mutex>& lock, const State& state)
+{
+ REALM_ASSERT(lock.owns_lock());
+ REALM_ASSERT(&state != m_state);
+ m_state = &state;
+ m_state->enter_state(lock, *this);
+}
+
+void SyncSession::nonsync_transact_notify(sync::Session::version_type version)
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ m_state->nonsync_transact_notify(lock, *this, version);
+}
+
+void SyncSession::revive_if_needed()
+{
+ util::Optional<std::function<SyncBindSessionHandler>&> handler;
+ {
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ if (m_state->revive_if_needed(lock, *this))
+ handler = m_config.bind_session_handler;
+ }
+ if (handler)
+ handler.value()(m_realm_path, m_config, shared_from_this());
+}
+
+void SyncSession::handle_reconnect()
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ m_state->handle_reconnect(lock, *this);
+}
+
+void SyncSession::log_out()
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ m_state->log_out(lock, *this);
+}
+
+void SyncSession::close()
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ m_state->close(lock, *this);
+}
+
+void SyncSession::unregister(std::unique_lock<std::mutex>& lock)
+{
+ REALM_ASSERT(lock.owns_lock());
+ REALM_ASSERT(m_state == &State::inactive); // Must stop an active session before unregistering.
+
+ lock.unlock();
+ SyncManager::shared().unregister_session(m_realm_path);
+}
+
+bool SyncSession::wait_for_upload_completion(std::function<void(std::error_code)> callback)
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ return m_state->wait_for_completion(*this, std::move(callback), &sync::Session::async_wait_for_upload_completion);
+}
+
+bool SyncSession::wait_for_download_completion(std::function<void(std::error_code)> callback)
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ return m_state->wait_for_completion(*this, std::move(callback), &sync::Session::async_wait_for_download_completion);
+}
+
+uint64_t SyncSession::register_progress_notifier(std::function<SyncProgressNotifierCallback> notifier,
+ NotifierType direction, bool is_streaming)
+{
+ std::function<void()> invocation;
+ uint64_t token_value = 0;
+ {
+ std::lock_guard<std::mutex> lock(m_progress_notifier_mutex);
+ token_value = m_progress_notifier_token++;
+ NotifierPackage package{std::move(notifier), is_streaming, direction};
+ if (!m_current_progress) {
+ // Simply register the package, since we have no data yet.
+ m_notifiers.emplace(token_value, std::move(package));
+ return token_value;
+ }
+ package.update(*m_current_progress, m_latest_progress_data_is_fresh);
+ bool skip_registration = false;
+ invocation = package.create_invocation(*m_current_progress, skip_registration);
+ if (skip_registration) {
+ token_value = 0;
+ } else {
+ m_notifiers.emplace(token_value, std::move(package));
+ }
+ }
+ invocation();
+ return token_value;
+}
+
+void SyncSession::unregister_progress_notifier(uint64_t token)
+{
+ std::lock_guard<std::mutex> lock(m_progress_notifier_mutex);
+ m_notifiers.erase(token);
+}
+
+void SyncSession::refresh_access_token(std::string access_token, util::Optional<std::string> server_url)
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ if (!m_server_url && !server_url) {
+ // The first time this method is called, the server URL must be provided.
+ return;
+ }
+ m_state->refresh_access_token(lock, *this, std::move(access_token), server_url);
+}
+
+void SyncSession::override_server(std::string address, int port)
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ m_state->override_server(lock, *this, std::move(address), port);
+}
+
+void SyncSession::set_multiplex_identifier(std::string multiplex_identity)
+{
+ m_multiplex_identity = std::move(multiplex_identity);
+}
+
+SyncSession::PublicState SyncSession::state() const
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+ if (m_state == &State::waiting_for_access_token) {
+ return PublicState::WaitingForAccessToken;
+ } else if (m_state == &State::active) {
+ return PublicState::Active;
+ } else if (m_state == &State::dying) {
+ return PublicState::Dying;
+ } else if (m_state == &State::inactive) {
+ return PublicState::Inactive;
+ }
+ REALM_UNREACHABLE();
+}
+
+// Represents a reference to the SyncSession from outside of the sync subsystem.
+// We attempt to keep the SyncSession in an active state as long as it has an external reference.
+class SyncSession::ExternalReference {
+public:
+ ExternalReference(std::shared_ptr<SyncSession> session) : m_session(std::move(session))
+ {}
+
+ ~ExternalReference()
+ {
+ m_session->did_drop_external_reference();
+ }
+
+private:
+ std::shared_ptr<SyncSession> m_session;
+};
+
+std::shared_ptr<SyncSession> SyncSession::external_reference()
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+
+ if (auto external_reference = m_external_reference.lock())
+ return std::shared_ptr<SyncSession>(external_reference, this);
+
+ auto external_reference = std::make_shared<ExternalReference>(shared_from_this());
+ m_external_reference = external_reference;
+ return std::shared_ptr<SyncSession>(external_reference, this);
+}
+
+std::shared_ptr<SyncSession> SyncSession::existing_external_reference()
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+
+ if (auto external_reference = m_external_reference.lock())
+ return std::shared_ptr<SyncSession>(external_reference, this);
+
+ return nullptr;
+}
+
+void SyncSession::did_drop_external_reference()
+{
+ std::unique_lock<std::mutex> lock(m_state_mutex);
+
+ // If the session is being resurrected we should not close the session.
+ if (!m_external_reference.expired())
+ return;
+
+ m_state->close(lock, *this);
+}