X-Git-Url: https://git.mdrn.pl/wl-app.git/blobdiff_plain/53b27422d140022594fc241cca91c3183be57bca..48b2fe9f7c2dc3d9aeaaa6dbfb27c7da4f3235ff:/iOS/Pods/Realm/Realm/ObjectStore/src/sync/sync_session.cpp diff --git a/iOS/Pods/Realm/Realm/ObjectStore/src/sync/sync_session.cpp b/iOS/Pods/Realm/Realm/ObjectStore/src/sync/sync_session.cpp new file mode 100644 index 0000000..dea0b41 --- /dev/null +++ b/iOS/Pods/Realm/Realm/ObjectStore/src/sync/sync_session.cpp @@ -0,0 +1,901 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2016 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "sync/sync_session.hpp" + +#include "sync/impl/sync_client.hpp" +#include "sync/impl/sync_file.hpp" +#include "sync/impl/sync_metadata.hpp" +#include "sync/sync_manager.hpp" +#include "sync/sync_user.hpp" + +#include +#include + + +using namespace realm; +using namespace realm::_impl; +using namespace realm::_impl::sync_session_states; + +using SessionWaiterPointer = void(sync::Session::*)(std::function); + +constexpr const char SyncError::c_original_file_path_key[]; +constexpr const char SyncError::c_recovery_file_path_key[]; + +/// A state which a `SyncSession` can currently be within. State classes handle various actions +/// and state transitions. +/// +/// STATES: +/// +/// WAITING_FOR_ACCESS_TOKEN: upon entering this state, the binding is informed +/// that the session wants an access token. The session is now waiting for the +/// binding to provide the token. +/// From: INACTIVE +/// To: +/// * ACTIVE: when the binding successfully refreshes the token +/// * INACTIVE: if asked to log out, or if asked to close and the stop policy +/// is Immediate. +/// +/// ACTIVE: the session is connected to the Realm Object Server and is actively +/// transferring data. +/// From: WAITING_FOR_ACCESS_TOKEN, DYING +/// To: +/// * WAITING_FOR_ACCESS_TOKEN: if the session is informed (through the error +/// handler) that the token expired +/// * INACTIVE: if asked to log out, or if asked to close and the stop policy +/// is Immediate. +/// * DYING: if asked to close and the stop policy is AfterChangesUploaded +/// +/// DYING: the session is performing clean-up work in preparation to be destroyed. +/// From: ACTIVE +/// To: +/// * INACTIVE: when the clean-up work completes, if the session wasn't +/// revived, or if explicitly asked to log out before the +/// clean-up work begins +/// * ACTIVE: if the session is revived +/// +/// INACTIVE: the user owning this session has logged out, the `sync::Session` +/// owned by this session is destroyed, and the session is quiescent. +/// Note that a session briefly enters this state before being destroyed, but +/// it can also enter this state and stay there if the user has been logged out. +/// From: initial, WAITING_FOR_ACCESS_TOKEN, ACTIVE, DYING +/// To: +/// * WAITING_FOR_ACCESS_TOKEN: if the session is revived +/// +struct SyncSession::State { + virtual ~State() { } + + // Move the given session into this state. All state transitions MUST be carried out through this method. + virtual void enter_state(std::unique_lock&, SyncSession&) const { } + + virtual void refresh_access_token(std::unique_lock&, + SyncSession&, std::string, + const util::Optional&) const { } + + // Returns true iff the lock is still locked when the method returns. + virtual bool access_token_expired(std::unique_lock&, SyncSession&) const { return true; } + + virtual void nonsync_transact_notify(std::unique_lock&, SyncSession&, sync::Session::version_type) const { } + + // Perform any work needed to reactivate a session that is not already active. + // Returns true iff the session should ask the binding to get a token for `bind()`. + virtual bool revive_if_needed(std::unique_lock&, SyncSession&) const { return false; } + + // Perform any work needed to respond to the application regaining network connectivity. + virtual void handle_reconnect(std::unique_lock&, SyncSession&) const { }; + + // The user that owns this session has been logged out, and the session should take appropriate action. + virtual void log_out(std::unique_lock&, SyncSession&) const { } + + // The session should be closed and moved to `inactive`, in accordance with its stop policy and other state. + virtual void close(std::unique_lock&, SyncSession&) const { } + + // Returns true iff the error has been fully handled and the error handler should immediately return. + virtual bool handle_error(std::unique_lock&, SyncSession&, const SyncError&) const { return false; } + + // Register a handler to wait for sync session uploads, downloads, or synchronization. + // PRECONDITION: the session state lock must be held at the time this method is called, until after it returns. + // Returns true iff the handler was registered, either immediately or placed in a queue for later registration. + virtual bool wait_for_completion(SyncSession&, + std::function, + SessionWaiterPointer) const { + return false; + } + + virtual void override_server(std::unique_lock&, SyncSession&, std::string, int) const { } + + static const State& waiting_for_access_token; + static const State& active; + static const State& dying; + static const State& inactive; +}; + +struct sync_session_states::WaitingForAccessToken : public SyncSession::State { + void enter_state(std::unique_lock&, SyncSession& session) const override + { + session.m_deferred_close = false; + } + + void refresh_access_token(std::unique_lock& lock, SyncSession& session, + std::string access_token, + const util::Optional& server_url) const override + { + session.create_sync_session(); + + // Since the sync session was previously unbound, it's safe to do this from the + // calling thread. + if (!session.m_server_url) { + session.m_server_url = server_url; + } + if (session.m_session_has_been_bound) { + session.m_session->refresh(std::move(access_token)); + session.m_session->cancel_reconnect_delay(); + } else { + session.m_session->bind(*session.m_server_url, std::move(access_token)); + session.m_session_has_been_bound = true; + } + + if (session.m_server_override) + session.m_session->override_server(session.m_server_override->address, session.m_server_override->port); + + // Register all the pending wait-for-completion blocks. + for (auto& package : session.m_completion_wait_packages) { + (*session.m_session.*package.waiter)(std::move(package.callback)); + } + session.m_completion_wait_packages.clear(); + + // Handle any deferred commit notification. + if (session.m_deferred_commit_notification) { + session.m_session->nonsync_transact_notify(*session.m_deferred_commit_notification); + session.m_deferred_commit_notification = util::none; + } + + session.advance_state(lock, active); + if (session.m_deferred_close) { + session.m_state->close(lock, session); + } + } + + void log_out(std::unique_lock& lock, SyncSession& session) const override + { + session.advance_state(lock, inactive); + } + + bool revive_if_needed(std::unique_lock&, SyncSession& session) const override + { + session.m_deferred_close = false; + return false; + } + + void handle_reconnect(std::unique_lock& lock, SyncSession& session) const override + { + // Ask the binding to retry getting the token for this session. + std::shared_ptr session_ptr = session.shared_from_this(); + lock.unlock(); + session.m_config.bind_session_handler(session_ptr->m_realm_path, session_ptr->m_config, session_ptr); + } + + void nonsync_transact_notify(std::unique_lock&, + SyncSession& session, + sync::Session::version_type version) const override + { + // Notify at first available opportunity. + session.m_deferred_commit_notification = version; + } + + void close(std::unique_lock& lock, SyncSession& session) const override + { + switch (session.m_config.stop_policy) { + case SyncSessionStopPolicy::Immediately: + // Immediately kill the session. + session.advance_state(lock, inactive); + break; + case SyncSessionStopPolicy::LiveIndefinitely: + case SyncSessionStopPolicy::AfterChangesUploaded: + // Defer handling closing the session until after the login response succeeds. + session.m_deferred_close = true; + break; + } + } + + bool wait_for_completion(SyncSession& session, + std::function callback, + SessionWaiterPointer waiter) const override + { + session.m_completion_wait_packages.push_back({ waiter, std::move(callback) }); + return true; + } + + void override_server(std::unique_lock&, SyncSession& session, + std::string address, int port) const override + { + session.m_server_override = SyncSession::ServerOverride{address, port}; + } +}; + +struct sync_session_states::Active : public SyncSession::State { + void refresh_access_token(std::unique_lock&, SyncSession& session, + std::string access_token, + const util::Optional&) const override + { + session.m_session->refresh(std::move(access_token)); + // Cancel the session's reconnection delay. This is important if the + // token is being refreshed as a response to a 202 (token expired) + // error, or similar non-fatal sync errors. + session.m_session->cancel_reconnect_delay(); + } + + bool access_token_expired(std::unique_lock& lock, SyncSession& session) const override + { + session.advance_state(lock, waiting_for_access_token); + std::shared_ptr session_ptr = session.shared_from_this(); + lock.unlock(); + session.m_config.bind_session_handler(session_ptr->m_realm_path, session_ptr->m_config, session_ptr); + return false; + } + + void log_out(std::unique_lock& lock, SyncSession& session) const override + { + session.advance_state(lock, inactive); + } + + void nonsync_transact_notify(std::unique_lock&, SyncSession& session, + sync::Session::version_type version) const override + { + // Fully ready sync session, notify immediately. + session.m_session->nonsync_transact_notify(version); + } + + void close(std::unique_lock& lock, SyncSession& session) const override + { + switch (session.m_config.stop_policy) { + case SyncSessionStopPolicy::Immediately: + session.advance_state(lock, inactive); + break; + case SyncSessionStopPolicy::LiveIndefinitely: + // Don't do anything; session lives forever. + break; + case SyncSessionStopPolicy::AfterChangesUploaded: + // Wait for all pending changes to upload. + session.advance_state(lock, dying); + break; + } + } + + bool wait_for_completion(SyncSession& session, + std::function callback, + SessionWaiterPointer waiter) const override + { + REALM_ASSERT(session.m_session); + (*session.m_session.*waiter)(std::move(callback)); + return true; + } + + void handle_reconnect(std::unique_lock&, SyncSession& session) const override + { + session.m_session->cancel_reconnect_delay(); + } + + void override_server(std::unique_lock&, SyncSession& session, + std::string address, int port) const override + { + session.m_server_override = SyncSession::ServerOverride{address, port}; + session.m_session->override_server(address, port); + } +}; + +struct sync_session_states::Dying : public SyncSession::State { + void enter_state(std::unique_lock& lock, SyncSession& session) const override + { + // If we have no session, we cannot possibly upload anything. + if (!session.m_session) { + session.advance_state(lock, inactive); + return; + } + + size_t current_death_count = ++session.m_death_count; + std::weak_ptr weak_session = session.shared_from_this(); + session.m_session->async_wait_for_upload_completion([weak_session, current_death_count](std::error_code) { + if (auto session = weak_session.lock()) { + std::unique_lock lock(session->m_state_mutex); + if (session->m_state == &State::dying && session->m_death_count == current_death_count) { + session->advance_state(lock, inactive); + } + } + }); + } + + bool handle_error(std::unique_lock& lock, SyncSession& session, const SyncError& error) const override + { + if (error.is_fatal) { + session.advance_state(lock, inactive); + } + // If the error isn't fatal, don't change state, but don't + // allow it to be reported either. + // FIXME: What if the token expires while a session is dying? + // Should we allow the token to be refreshed so that changes + // can finish being uploaded? + return true; + } + + bool revive_if_needed(std::unique_lock& lock, SyncSession& session) const override + { + // Revive. + session.advance_state(lock, active); + return false; + } + + void log_out(std::unique_lock& lock, SyncSession& session) const override + { + session.advance_state(lock, inactive); + } + + bool wait_for_completion(SyncSession& session, + std::function callback, + SessionWaiterPointer waiter) const override + { + REALM_ASSERT(session.m_session); + (*session.m_session.*waiter)(std::move(callback)); + return true; + } + + void override_server(std::unique_lock&, SyncSession& session, + std::string address, int port) const override + { + session.m_server_override = SyncSession::ServerOverride{address, port}; + session.m_session->override_server(address, port); + } +}; + +struct sync_session_states::Inactive : public SyncSession::State { + void enter_state(std::unique_lock& lock, SyncSession& session) const override + { + // Inform any queued-up completion handlers that they were cancelled. + for (auto& package : session.m_completion_wait_packages) { + package.callback(util::error::operation_aborted); + } + session.m_completion_wait_packages.clear(); + session.m_session = nullptr; + session.unregister(lock); + } + + bool revive_if_needed(std::unique_lock& lock, SyncSession& session) const override + { + session.advance_state(lock, waiting_for_access_token); + return true; + } + + bool wait_for_completion(SyncSession& session, + std::function callback, + SessionWaiterPointer waiter) const override + { + session.m_completion_wait_packages.push_back({ waiter, std::move(callback) }); + return true; + } + + void override_server(std::unique_lock&, SyncSession& session, + std::string address, int port) const override + { + session.m_server_override = SyncSession::ServerOverride{address, port}; + } +}; + + +const SyncSession::State& SyncSession::State::waiting_for_access_token = WaitingForAccessToken(); +const SyncSession::State& SyncSession::State::active = Active(); +const SyncSession::State& SyncSession::State::dying = Dying(); +const SyncSession::State& SyncSession::State::inactive = Inactive(); + +SyncSession::SyncSession(SyncClient& client, std::string realm_path, SyncConfig config) +: m_state(&State::inactive) +, m_config(std::move(config)) +, m_realm_path(std::move(realm_path)) +, m_client(client) +{ + // Sync history validation ensures that the history within the Realm file is in a format that can be used + // by the version of realm-sync that we're using. Validation is enabled by default when the binding manually + // opens a sync session (via `SyncManager::get_session`), but is disabled when the sync session is opened + // as a side effect of opening a `Realm`. In that case, the sync history has already been validated by the + // act of opening the `Realm` so it's not necessary to repeat it here. + if (m_config.validate_sync_history) { + Realm::Config realm_config; + realm_config.path = m_realm_path; + realm_config.schema_mode = SchemaMode::Additive; + realm_config.force_sync_history = true; + realm_config.cache = false; + + if (m_config.realm_encryption_key) { + realm_config.encryption_key.resize(64); + std::copy(m_config.realm_encryption_key->begin(), m_config.realm_encryption_key->end(), + realm_config.encryption_key.begin()); + } + + // FIXME: Opening a Realm only to discard it is relatively expensive. It may be preferable to have + // realm-sync open the Realm when the `sync::Session` is created since it can continue to use it. + Realm::get_shared_realm(realm_config); // Throws + } +} + +std::string SyncSession::get_recovery_file_path() +{ + return util::reserve_unique_file_name(SyncManager::shared().recovery_directory_path(), + util::create_timestamped_template("recovered_realm")); +} + +void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, ShouldBackup should_backup) +{ + // Add a SyncFileActionMetadata marking the Realm as needing to be deleted. + std::string recovery_path; + auto original_path = path(); + error.user_info[SyncError::c_original_file_path_key] = original_path; + if (should_backup == ShouldBackup::yes) { + recovery_path = get_recovery_file_path(); + error.user_info[SyncError::c_recovery_file_path_key] = recovery_path; + } + using Action = SyncFileActionMetadata::Action; + auto action = should_backup == ShouldBackup::yes ? Action::BackUpThenDeleteRealm : Action::DeleteRealm; + SyncManager::shared().perform_metadata_update([this, + action, + original_path=std::move(original_path), + recovery_path=std::move(recovery_path)](const auto& manager) { + manager.make_file_action_metadata(original_path, m_config.realm_url(), m_config.user->identity(), + action, std::move(recovery_path)); + }); +} + +// This method should only be called from within the error handler callback registered upon the underlying `m_session`. +void SyncSession::handle_error(SyncError error) +{ + enum class NextStateAfterError { none, inactive, error }; + auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none; + auto error_code = error.error_code; + + { + // See if the current state wishes to take responsibility for handling the error. + std::unique_lock lock(m_state_mutex); + if (m_state->handle_error(lock, *this, error)) { + return; + } + } + + if (error_code.category() == realm::sync::protocol_error_category()) { + using ProtocolError = realm::sync::ProtocolError; + switch (static_cast(error_code.value())) { + // Connection level errors + case ProtocolError::connection_closed: + case ProtocolError::other_error: + // Not real errors, don't need to be reported to the binding. + return; + case ProtocolError::unknown_message: + case ProtocolError::bad_syntax: + case ProtocolError::limits_exceeded: + case ProtocolError::wrong_protocol_version: + case ProtocolError::bad_session_ident: + case ProtocolError::reuse_of_session_ident: + case ProtocolError::bound_in_other_session: + case ProtocolError::bad_message_order: + case ProtocolError::bad_client_version: + case ProtocolError::illegal_realm_path: + case ProtocolError::no_such_realm: + case ProtocolError::bad_changeset: + case ProtocolError::bad_changeset_header_syntax: + case ProtocolError::bad_changeset_size: + case ProtocolError::bad_changesets: + case ProtocolError::bad_decompression: + case ProtocolError::partial_sync_disabled: + break; + // Session errors + case ProtocolError::session_closed: + case ProtocolError::other_session_error: + case ProtocolError::disabled_session: + // The binding doesn't need to be aware of these because they are strictly informational, and do not + // represent actual errors. + return; + case ProtocolError::token_expired: { + std::unique_lock lock(m_state_mutex); + // This isn't an error from the binding's point of view. If we're connected we'll + // simply ask the binding to log in again. + m_state->access_token_expired(lock, *this); + return; + } + case ProtocolError::bad_authentication: { + std::shared_ptr user_to_invalidate; + next_state = NextStateAfterError::none; + { + std::unique_lock lock(m_state_mutex); + user_to_invalidate = user(); + cancel_pending_waits(); + } + if (user_to_invalidate) + user_to_invalidate->invalidate(); + break; + } + case ProtocolError::permission_denied: { + next_state = NextStateAfterError::inactive; + update_error_and_mark_file_for_deletion(error, ShouldBackup::no); + break; + } + case ProtocolError::bad_server_file_ident: + case ProtocolError::bad_client_file_ident: + case ProtocolError::bad_server_version: + case ProtocolError::diverging_histories: + next_state = NextStateAfterError::inactive; + update_error_and_mark_file_for_deletion(error, ShouldBackup::yes); + break; + } + } else if (error_code.category() == realm::sync::client_error_category()) { + using ClientError = realm::sync::Client::Error; + switch (static_cast(error_code.value())) { + case ClientError::connection_closed: + case ClientError::pong_timeout: + // Not real errors, don't need to be reported to the binding. + return; + case ClientError::unknown_message: + case ClientError::bad_syntax: + case ClientError::limits_exceeded: + case ClientError::bad_session_ident: + case ClientError::bad_message_order: + case ClientError::bad_file_ident_pair: + case ClientError::bad_progress: + case ClientError::bad_changeset_header_syntax: + case ClientError::bad_changeset_size: + case ClientError::bad_origin_file_ident: + case ClientError::bad_server_version: + case ClientError::bad_changeset: + case ClientError::bad_request_ident: + case ClientError::bad_error_code: + case ClientError::bad_compression: + case ClientError::bad_client_version: + case ClientError::ssl_server_cert_rejected: + // Don't do anything special for these errors. + // Future functionality may require special-case handling for existing + // errors, or newly introduced error codes. + break; + } + } else { + // Unrecognized error code. + error.is_unrecognized_by_client = true; + } + switch (next_state) { + case NextStateAfterError::none: + break; + case NextStateAfterError::inactive: { + std::unique_lock lock(m_state_mutex); + advance_state(lock, State::inactive); + break; + } + case NextStateAfterError::error: { + std::unique_lock lock(m_state_mutex); + cancel_pending_waits(); + break; + } + } + if (m_config.error_handler) { + m_config.error_handler(shared_from_this(), std::move(error)); + } +} + +void SyncSession::cancel_pending_waits() +{ + // Inform any queued-up completion handlers that they were cancelled. + for (auto& package : m_completion_wait_packages) { + package.callback(util::error::operation_aborted); + } + m_completion_wait_packages.clear(); +} + +void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, + uint64_t uploaded, uint64_t uploadable, bool is_fresh) +{ + std::vector> invocations; + { + std::lock_guard lock(m_progress_notifier_mutex); + m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded}; + m_latest_progress_data_is_fresh = is_fresh; + + for (auto it = m_notifiers.begin(); it != m_notifiers.end();) { + auto& package = it->second; + package.update(*m_current_progress, is_fresh); + + bool should_delete = false; + invocations.emplace_back(package.create_invocation(*m_current_progress, should_delete)); + + it = (should_delete ? m_notifiers.erase(it) : std::next(it)); + } + } + // Run the notifiers only after we've released the lock. + for (auto& invocation : invocations) { + invocation(); + } +} + +void SyncSession::NotifierPackage::update(const Progress& current_progress, bool data_is_fresh) +{ + if (is_streaming || captured_transferrable || !data_is_fresh) + return; + + captured_transferrable = direction == NotifierType::download ? current_progress.downloadable + : current_progress.uploadable; +} + +// PRECONDITION: `update()` must first be called on the same package. +std::function SyncSession::NotifierPackage::create_invocation(const Progress& current_progress, + bool& is_expired) const +{ + // It's possible for a non-streaming notifier to not yet have fresh transferrable bytes data. + // In that case, we don't call it at all. + // NOTE: `update()` is always called before `create_invocation()`, and will + // set `captured_transferrable` on the notifier package if fresh data has + // been received and the package is for a non-streaming notifier. + if (!is_streaming && !captured_transferrable) + return [](){ }; + + bool is_download = direction == NotifierType::download; + uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded; + uint64_t transferrable; + if (is_streaming) { + transferrable = is_download ? current_progress.downloadable : current_progress.uploadable; + } else { + transferrable = *captured_transferrable; + } + // A notifier is expired if at least as many bytes have been transferred + // as were originally considered transferrable. + is_expired = !is_streaming && transferred >= *captured_transferrable; + return [=, package=*this](){ + package.notifier(transferred, transferrable); + }; +} + +void SyncSession::create_sync_session() +{ + if (m_session) + return; + + sync::Session::Config session_config; + session_config.changeset_cooker = m_config.transformer; + session_config.encryption_key = m_config.realm_encryption_key; + session_config.verify_servers_ssl_certificate = m_config.client_validate_ssl; + session_config.ssl_trust_certificate_path = m_config.ssl_trust_certificate_path; + session_config.ssl_verify_callback = m_config.ssl_verify_callback; + session_config.multiplex_ident = m_multiplex_identity; + m_session = m_client.make_session(m_realm_path, std::move(session_config)); + + // The next time we get a token, call `bind()` instead of `refresh()`. + m_session_has_been_bound = false; + + // Configure the error handler. + std::weak_ptr weak_self = shared_from_this(); + auto wrapped_handler = [this, weak_self](std::error_code error_code, bool is_fatal, std::string message) { + auto self = weak_self.lock(); + if (!self) { + // An error was delivered after the session it relates to was destroyed. There's nothing useful + // we can do with it. + return; + } + handle_error(SyncError{error_code, std::move(message), is_fatal}); + }; + m_session->set_error_handler(std::move(wrapped_handler)); + + // Configure the sync transaction callback. + auto wrapped_callback = [this, weak_self](VersionID old_version, VersionID new_version) { + if (auto self = weak_self.lock()) { + if (m_sync_transact_callback) { + m_sync_transact_callback(old_version, new_version); + } + } + }; + m_session->set_sync_transact_callback(std::move(wrapped_callback)); + + // Set up the wrapped progress handler callback + auto wrapped_progress_handler = [this, weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable, + uint_fast64_t uploaded, uint_fast64_t uploadable, + bool is_fresh, uint_fast64_t /*snapshot_version*/) { + if (auto self = weak_self.lock()) { + handle_progress_update(downloaded, downloadable, uploaded, uploadable, is_fresh); + } + }; + m_session->set_progress_handler(std::move(wrapped_progress_handler)); +} + +void SyncSession::set_sync_transact_callback(std::function callback) +{ + m_sync_transact_callback = std::move(callback); +} + +void SyncSession::advance_state(std::unique_lock& lock, const State& state) +{ + REALM_ASSERT(lock.owns_lock()); + REALM_ASSERT(&state != m_state); + m_state = &state; + m_state->enter_state(lock, *this); +} + +void SyncSession::nonsync_transact_notify(sync::Session::version_type version) +{ + std::unique_lock lock(m_state_mutex); + m_state->nonsync_transact_notify(lock, *this, version); +} + +void SyncSession::revive_if_needed() +{ + util::Optional&> handler; + { + std::unique_lock lock(m_state_mutex); + if (m_state->revive_if_needed(lock, *this)) + handler = m_config.bind_session_handler; + } + if (handler) + handler.value()(m_realm_path, m_config, shared_from_this()); +} + +void SyncSession::handle_reconnect() +{ + std::unique_lock lock(m_state_mutex); + m_state->handle_reconnect(lock, *this); +} + +void SyncSession::log_out() +{ + std::unique_lock lock(m_state_mutex); + m_state->log_out(lock, *this); +} + +void SyncSession::close() +{ + std::unique_lock lock(m_state_mutex); + m_state->close(lock, *this); +} + +void SyncSession::unregister(std::unique_lock& lock) +{ + REALM_ASSERT(lock.owns_lock()); + REALM_ASSERT(m_state == &State::inactive); // Must stop an active session before unregistering. + + lock.unlock(); + SyncManager::shared().unregister_session(m_realm_path); +} + +bool SyncSession::wait_for_upload_completion(std::function callback) +{ + std::unique_lock lock(m_state_mutex); + return m_state->wait_for_completion(*this, std::move(callback), &sync::Session::async_wait_for_upload_completion); +} + +bool SyncSession::wait_for_download_completion(std::function callback) +{ + std::unique_lock lock(m_state_mutex); + return m_state->wait_for_completion(*this, std::move(callback), &sync::Session::async_wait_for_download_completion); +} + +uint64_t SyncSession::register_progress_notifier(std::function notifier, + NotifierType direction, bool is_streaming) +{ + std::function invocation; + uint64_t token_value = 0; + { + std::lock_guard lock(m_progress_notifier_mutex); + token_value = m_progress_notifier_token++; + NotifierPackage package{std::move(notifier), is_streaming, direction}; + if (!m_current_progress) { + // Simply register the package, since we have no data yet. + m_notifiers.emplace(token_value, std::move(package)); + return token_value; + } + package.update(*m_current_progress, m_latest_progress_data_is_fresh); + bool skip_registration = false; + invocation = package.create_invocation(*m_current_progress, skip_registration); + if (skip_registration) { + token_value = 0; + } else { + m_notifiers.emplace(token_value, std::move(package)); + } + } + invocation(); + return token_value; +} + +void SyncSession::unregister_progress_notifier(uint64_t token) +{ + std::lock_guard lock(m_progress_notifier_mutex); + m_notifiers.erase(token); +} + +void SyncSession::refresh_access_token(std::string access_token, util::Optional server_url) +{ + std::unique_lock lock(m_state_mutex); + if (!m_server_url && !server_url) { + // The first time this method is called, the server URL must be provided. + return; + } + m_state->refresh_access_token(lock, *this, std::move(access_token), server_url); +} + +void SyncSession::override_server(std::string address, int port) +{ + std::unique_lock lock(m_state_mutex); + m_state->override_server(lock, *this, std::move(address), port); +} + +void SyncSession::set_multiplex_identifier(std::string multiplex_identity) +{ + m_multiplex_identity = std::move(multiplex_identity); +} + +SyncSession::PublicState SyncSession::state() const +{ + std::unique_lock lock(m_state_mutex); + if (m_state == &State::waiting_for_access_token) { + return PublicState::WaitingForAccessToken; + } else if (m_state == &State::active) { + return PublicState::Active; + } else if (m_state == &State::dying) { + return PublicState::Dying; + } else if (m_state == &State::inactive) { + return PublicState::Inactive; + } + REALM_UNREACHABLE(); +} + +// Represents a reference to the SyncSession from outside of the sync subsystem. +// We attempt to keep the SyncSession in an active state as long as it has an external reference. +class SyncSession::ExternalReference { +public: + ExternalReference(std::shared_ptr session) : m_session(std::move(session)) + {} + + ~ExternalReference() + { + m_session->did_drop_external_reference(); + } + +private: + std::shared_ptr m_session; +}; + +std::shared_ptr SyncSession::external_reference() +{ + std::unique_lock lock(m_state_mutex); + + if (auto external_reference = m_external_reference.lock()) + return std::shared_ptr(external_reference, this); + + auto external_reference = std::make_shared(shared_from_this()); + m_external_reference = external_reference; + return std::shared_ptr(external_reference, this); +} + +std::shared_ptr SyncSession::existing_external_reference() +{ + std::unique_lock lock(m_state_mutex); + + if (auto external_reference = m_external_reference.lock()) + return std::shared_ptr(external_reference, this); + + return nullptr; +} + +void SyncSession::did_drop_external_reference() +{ + std::unique_lock lock(m_state_mutex); + + // If the session is being resurrected we should not close the session. + if (!m_external_reference.expired()) + return; + + m_state->close(lock, *this); +}