added iOS source code
[wl-app.git] / iOS / Pods / Realm / Realm / ObjectStore / src / sync / sync_session.cpp
1 ////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright 2016 Realm Inc.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 ////////////////////////////////////////////////////////////////////////////
18
19 #include "sync/sync_session.hpp"
20
21 #include "sync/impl/sync_client.hpp"
22 #include "sync/impl/sync_file.hpp"
23 #include "sync/impl/sync_metadata.hpp"
24 #include "sync/sync_manager.hpp"
25 #include "sync/sync_user.hpp"
26
27 #include <realm/sync/client.hpp>
28 #include <realm/sync/protocol.hpp>
29
30
31 using namespace realm;
32 using namespace realm::_impl;
33 using namespace realm::_impl::sync_session_states;
34
35 using SessionWaiterPointer = void(sync::Session::*)(std::function<void(std::error_code)>);
36
37 constexpr const char SyncError::c_original_file_path_key[];
38 constexpr const char SyncError::c_recovery_file_path_key[];
39
40 /// A state which a `SyncSession` can currently be within. State classes handle various actions
41 /// and state transitions.
42 ///
43 /// STATES:
44 ///
45 /// WAITING_FOR_ACCESS_TOKEN: upon entering this state, the binding is informed
46 /// that the session wants an access token. The session is now waiting for the
47 /// binding to provide the token.
48 /// From: INACTIVE
49 /// To:
50 ///    * ACTIVE: when the binding successfully refreshes the token
51 ///    * INACTIVE: if asked to log out, or if asked to close and the stop policy
52 ///                is Immediate.
53 ///
54 /// ACTIVE: the session is connected to the Realm Object Server and is actively
55 /// transferring data.
56 /// From: WAITING_FOR_ACCESS_TOKEN, DYING
57 /// To:
58 ///    * WAITING_FOR_ACCESS_TOKEN: if the session is informed (through the error
59 ///                                handler) that the token expired
60 ///    * INACTIVE: if asked to log out, or if asked to close and the stop policy
61 ///                is Immediate.
62 ///    * DYING: if asked to close and the stop policy is AfterChangesUploaded
63 ///
64 /// DYING: the session is performing clean-up work in preparation to be destroyed.
65 /// From: ACTIVE
66 /// To:
67 ///    * INACTIVE: when the clean-up work completes, if the session wasn't
68 ///                revived, or if explicitly asked to log out before the
69 ///                clean-up work begins
70 ///    * ACTIVE: if the session is revived
71 ///
72 /// INACTIVE: the user owning this session has logged out, the `sync::Session`
73 /// owned by this session is destroyed, and the session is quiescent.
74 /// Note that a session briefly enters this state before being destroyed, but
75 /// it can also enter this state and stay there if the user has been logged out.
76 /// From: initial, WAITING_FOR_ACCESS_TOKEN, ACTIVE, DYING
77 /// To:
78 ///    * WAITING_FOR_ACCESS_TOKEN: if the session is revived
79 ///
80 struct SyncSession::State {
81     virtual ~State() { }
82
83     // Move the given session into this state. All state transitions MUST be carried out through this method.
84     virtual void enter_state(std::unique_lock<std::mutex>&, SyncSession&) const { }
85
86     virtual void refresh_access_token(std::unique_lock<std::mutex>&,
87                                       SyncSession&, std::string,
88                                       const util::Optional<std::string>&) const { }
89
90     // Returns true iff the lock is still locked when the method returns.
91     virtual bool access_token_expired(std::unique_lock<std::mutex>&, SyncSession&) const { return true; }
92
93     virtual void nonsync_transact_notify(std::unique_lock<std::mutex>&, SyncSession&, sync::Session::version_type) const { }
94
95     // Perform any work needed to reactivate a session that is not already active.
96     // Returns true iff the session should ask the binding to get a token for `bind()`.
97     virtual bool revive_if_needed(std::unique_lock<std::mutex>&, SyncSession&) const { return false; }
98
99     // Perform any work needed to respond to the application regaining network connectivity.
100     virtual void handle_reconnect(std::unique_lock<std::mutex>&, SyncSession&) const { };
101
102     // The user that owns this session has been logged out, and the session should take appropriate action.
103     virtual void log_out(std::unique_lock<std::mutex>&, SyncSession&) const { }
104
105     // The session should be closed and moved to `inactive`, in accordance with its stop policy and other state.
106     virtual void close(std::unique_lock<std::mutex>&, SyncSession&) const { }
107
108     // Returns true iff the error has been fully handled and the error handler should immediately return.
109     virtual bool handle_error(std::unique_lock<std::mutex>&, SyncSession&, const SyncError&) const { return false; }
110
111     // Register a handler to wait for sync session uploads, downloads, or synchronization.
112     // PRECONDITION: the session state lock must be held at the time this method is called, until after it returns.
113     // Returns true iff the handler was registered, either immediately or placed in a queue for later registration.
114     virtual bool wait_for_completion(SyncSession&,
115                                      std::function<void(std::error_code)>,
116                                      SessionWaiterPointer) const {
117         return false;
118     }
119
120     virtual void override_server(std::unique_lock<std::mutex>&, SyncSession&, std::string, int) const { }
121
122     static const State& waiting_for_access_token;
123     static const State& active;
124     static const State& dying;
125     static const State& inactive;
126 };
127
128 struct sync_session_states::WaitingForAccessToken : public SyncSession::State {
129     void enter_state(std::unique_lock<std::mutex>&, SyncSession& session) const override
130     {
131         session.m_deferred_close = false;
132     }
133
134     void refresh_access_token(std::unique_lock<std::mutex>& lock, SyncSession& session,
135                               std::string access_token,
136                               const util::Optional<std::string>& server_url) const override
137     {
138         session.create_sync_session();
139
140         // Since the sync session was previously unbound, it's safe to do this from the
141         // calling thread.
142         if (!session.m_server_url) {
143             session.m_server_url = server_url;
144         }
145         if (session.m_session_has_been_bound) {
146             session.m_session->refresh(std::move(access_token));
147             session.m_session->cancel_reconnect_delay();
148         } else {
149             session.m_session->bind(*session.m_server_url, std::move(access_token));
150             session.m_session_has_been_bound = true;
151         }
152
153         if (session.m_server_override)
154             session.m_session->override_server(session.m_server_override->address, session.m_server_override->port);
155
156         // Register all the pending wait-for-completion blocks.
157         for (auto& package : session.m_completion_wait_packages) {
158             (*session.m_session.*package.waiter)(std::move(package.callback));
159         }
160         session.m_completion_wait_packages.clear();
161
162         // Handle any deferred commit notification.
163         if (session.m_deferred_commit_notification) {
164             session.m_session->nonsync_transact_notify(*session.m_deferred_commit_notification);
165             session.m_deferred_commit_notification = util::none;
166         }
167
168         session.advance_state(lock, active);
169         if (session.m_deferred_close) {
170             session.m_state->close(lock, session);
171         }
172     }
173
174     void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
175     {
176         session.advance_state(lock, inactive);
177     }
178
179     bool revive_if_needed(std::unique_lock<std::mutex>&, SyncSession& session) const override
180     {
181         session.m_deferred_close = false;
182         return false;
183     }
184
185     void handle_reconnect(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
186     {
187         // Ask the binding to retry getting the token for this session.
188         std::shared_ptr<SyncSession> session_ptr = session.shared_from_this();
189         lock.unlock();
190         session.m_config.bind_session_handler(session_ptr->m_realm_path, session_ptr->m_config, session_ptr);
191     }
192
193     void nonsync_transact_notify(std::unique_lock<std::mutex>&,
194                                  SyncSession& session,
195                                  sync::Session::version_type version) const override
196     {
197         // Notify at first available opportunity.
198         session.m_deferred_commit_notification = version;
199     }
200
201     void close(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
202     {
203         switch (session.m_config.stop_policy) {
204             case SyncSessionStopPolicy::Immediately:
205                 // Immediately kill the session.
206                 session.advance_state(lock, inactive);
207                 break;
208             case SyncSessionStopPolicy::LiveIndefinitely:
209             case SyncSessionStopPolicy::AfterChangesUploaded:
210                 // Defer handling closing the session until after the login response succeeds.
211                 session.m_deferred_close = true;
212                 break;
213         }
214     }
215
216     bool wait_for_completion(SyncSession& session,
217                              std::function<void(std::error_code)> callback,
218                              SessionWaiterPointer waiter) const override
219     {
220         session.m_completion_wait_packages.push_back({ waiter, std::move(callback) });
221         return true;
222     }
223
224     void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
225                          std::string address, int port) const override
226     {
227         session.m_server_override = SyncSession::ServerOverride{address, port};
228     }
229 };
230
231 struct sync_session_states::Active : public SyncSession::State {
232     void refresh_access_token(std::unique_lock<std::mutex>&, SyncSession& session,
233                               std::string access_token,
234                               const util::Optional<std::string>&) const override
235     {
236         session.m_session->refresh(std::move(access_token));
237         // Cancel the session's reconnection delay. This is important if the
238         // token is being refreshed as a response to a 202 (token expired)
239         // error, or similar non-fatal sync errors.
240         session.m_session->cancel_reconnect_delay();
241     }
242
243     bool access_token_expired(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
244     {
245         session.advance_state(lock, waiting_for_access_token);
246         std::shared_ptr<SyncSession> session_ptr = session.shared_from_this();
247         lock.unlock();
248         session.m_config.bind_session_handler(session_ptr->m_realm_path, session_ptr->m_config, session_ptr);
249         return false;
250     }
251
252     void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
253     {
254         session.advance_state(lock, inactive);
255     }
256
257     void nonsync_transact_notify(std::unique_lock<std::mutex>&, SyncSession& session,
258                                  sync::Session::version_type version) const override
259     {
260         // Fully ready sync session, notify immediately.
261         session.m_session->nonsync_transact_notify(version);
262     }
263
264     void close(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
265     {
266         switch (session.m_config.stop_policy) {
267             case SyncSessionStopPolicy::Immediately:
268                 session.advance_state(lock, inactive);
269                 break;
270             case SyncSessionStopPolicy::LiveIndefinitely:
271                 // Don't do anything; session lives forever.
272                 break;
273             case SyncSessionStopPolicy::AfterChangesUploaded:
274                 // Wait for all pending changes to upload.
275                 session.advance_state(lock, dying);
276                 break;
277         }
278     }
279
280     bool wait_for_completion(SyncSession& session,
281                              std::function<void(std::error_code)> callback,
282                              SessionWaiterPointer waiter) const override
283     {
284         REALM_ASSERT(session.m_session);
285         (*session.m_session.*waiter)(std::move(callback));
286         return true;
287     }
288
289     void handle_reconnect(std::unique_lock<std::mutex>&, SyncSession& session) const override
290     {
291         session.m_session->cancel_reconnect_delay();
292     }
293
294     void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
295                          std::string address, int port) const override
296     {
297         session.m_server_override = SyncSession::ServerOverride{address, port};
298         session.m_session->override_server(address, port);
299     }
300 };
301
302 struct sync_session_states::Dying : public SyncSession::State {
303     void enter_state(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
304     {
305         // If we have no session, we cannot possibly upload anything.
306         if (!session.m_session) {
307             session.advance_state(lock, inactive);
308             return;
309         }
310
311         size_t current_death_count = ++session.m_death_count;
312         std::weak_ptr<SyncSession> weak_session = session.shared_from_this();
313         session.m_session->async_wait_for_upload_completion([weak_session, current_death_count](std::error_code) {
314             if (auto session = weak_session.lock()) {
315                 std::unique_lock<std::mutex> lock(session->m_state_mutex);
316                 if (session->m_state == &State::dying && session->m_death_count == current_death_count) {
317                     session->advance_state(lock, inactive);
318                 }
319             }
320         });
321     }
322
323     bool handle_error(std::unique_lock<std::mutex>& lock, SyncSession& session, const SyncError& error) const override
324     {
325         if (error.is_fatal) {
326             session.advance_state(lock, inactive);
327         }
328         // If the error isn't fatal, don't change state, but don't
329         // allow it to be reported either.
330         // FIXME: What if the token expires while a session is dying?
331         // Should we allow the token to be refreshed so that changes
332         // can finish being uploaded?
333         return true;
334     }
335
336     bool revive_if_needed(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
337     {
338         // Revive.
339         session.advance_state(lock, active);
340         return false;
341     }
342
343     void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
344     {
345         session.advance_state(lock, inactive);
346     }
347
348     bool wait_for_completion(SyncSession& session,
349                              std::function<void(std::error_code)> callback,
350                              SessionWaiterPointer waiter) const override
351     {
352         REALM_ASSERT(session.m_session);
353         (*session.m_session.*waiter)(std::move(callback));
354         return true;
355     }
356
357     void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
358                          std::string address, int port) const override
359     {
360         session.m_server_override = SyncSession::ServerOverride{address, port};
361         session.m_session->override_server(address, port);
362     }
363 };
364
365 struct sync_session_states::Inactive : public SyncSession::State {
366     void enter_state(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
367     {
368         // Inform any queued-up completion handlers that they were cancelled.
369         for (auto& package : session.m_completion_wait_packages) {
370             package.callback(util::error::operation_aborted);
371         }
372         session.m_completion_wait_packages.clear();
373         session.m_session = nullptr;
374         session.unregister(lock);
375     }
376
377     bool revive_if_needed(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
378     {
379         session.advance_state(lock, waiting_for_access_token);
380         return true;
381     }
382
383     bool wait_for_completion(SyncSession& session,
384                              std::function<void(std::error_code)> callback,
385                              SessionWaiterPointer waiter) const override
386     {
387         session.m_completion_wait_packages.push_back({ waiter, std::move(callback) });
388         return true;
389     }
390
391     void override_server(std::unique_lock<std::mutex>&, SyncSession& session,
392                          std::string address, int port) const override
393     {
394         session.m_server_override = SyncSession::ServerOverride{address, port};
395     }
396 };
397
398
399 const SyncSession::State& SyncSession::State::waiting_for_access_token = WaitingForAccessToken();
400 const SyncSession::State& SyncSession::State::active = Active();
401 const SyncSession::State& SyncSession::State::dying = Dying();
402 const SyncSession::State& SyncSession::State::inactive = Inactive();
403
404 SyncSession::SyncSession(SyncClient& client, std::string realm_path, SyncConfig config)
405 : m_state(&State::inactive)
406 , m_config(std::move(config))
407 , m_realm_path(std::move(realm_path))
408 , m_client(client)
409 {
410     // Sync history validation ensures that the history within the Realm file is in a format that can be used
411     // by the version of realm-sync that we're using. Validation is enabled by default when the binding manually
412     // opens a sync session (via `SyncManager::get_session`), but is disabled when the sync session is opened
413     // as a side effect of opening a `Realm`. In that case, the sync history has already been validated by the
414     // act of opening the `Realm` so it's not necessary to repeat it here.
415     if (m_config.validate_sync_history) {
416         Realm::Config realm_config;
417         realm_config.path = m_realm_path;
418         realm_config.schema_mode = SchemaMode::Additive;
419         realm_config.force_sync_history = true;
420         realm_config.cache = false;
421
422         if (m_config.realm_encryption_key) {
423             realm_config.encryption_key.resize(64);
424             std::copy(m_config.realm_encryption_key->begin(), m_config.realm_encryption_key->end(),
425                       realm_config.encryption_key.begin());
426         }
427
428         // FIXME: Opening a Realm only to discard it is relatively expensive. It may be preferable to have
429         // realm-sync open the Realm when the `sync::Session` is created since it can continue to use it.
430         Realm::get_shared_realm(realm_config); // Throws
431    }
432 }
433
434 std::string SyncSession::get_recovery_file_path()
435 {
436     return util::reserve_unique_file_name(SyncManager::shared().recovery_directory_path(),
437                                           util::create_timestamped_template("recovered_realm"));
438 }
439
440 void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, ShouldBackup should_backup)
441 {
442     // Add a SyncFileActionMetadata marking the Realm as needing to be deleted.
443     std::string recovery_path;
444     auto original_path = path();
445     error.user_info[SyncError::c_original_file_path_key] = original_path;
446     if (should_backup == ShouldBackup::yes) {
447         recovery_path = get_recovery_file_path();
448         error.user_info[SyncError::c_recovery_file_path_key] = recovery_path;
449     }
450     using Action = SyncFileActionMetadata::Action;
451     auto action = should_backup == ShouldBackup::yes ? Action::BackUpThenDeleteRealm : Action::DeleteRealm;
452     SyncManager::shared().perform_metadata_update([this,
453                                                    action,
454                                                    original_path=std::move(original_path),
455                                                    recovery_path=std::move(recovery_path)](const auto& manager) {
456         manager.make_file_action_metadata(original_path, m_config.realm_url(), m_config.user->identity(),
457                                           action, std::move(recovery_path));
458     });
459 }
460
461 // This method should only be called from within the error handler callback registered upon the underlying `m_session`.
462 void SyncSession::handle_error(SyncError error)
463 {
464     enum class NextStateAfterError { none, inactive, error };
465     auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
466     auto error_code = error.error_code;
467
468     {
469         // See if the current state wishes to take responsibility for handling the error.
470         std::unique_lock<std::mutex> lock(m_state_mutex);
471         if (m_state->handle_error(lock, *this, error)) {
472             return;
473         }
474     }
475
476     if (error_code.category() == realm::sync::protocol_error_category()) {
477         using ProtocolError = realm::sync::ProtocolError;
478         switch (static_cast<ProtocolError>(error_code.value())) {
479             // Connection level errors
480             case ProtocolError::connection_closed:
481             case ProtocolError::other_error:
482                 // Not real errors, don't need to be reported to the binding.
483                 return;
484             case ProtocolError::unknown_message:
485             case ProtocolError::bad_syntax:
486             case ProtocolError::limits_exceeded:
487             case ProtocolError::wrong_protocol_version:
488             case ProtocolError::bad_session_ident:
489             case ProtocolError::reuse_of_session_ident:
490             case ProtocolError::bound_in_other_session:
491             case ProtocolError::bad_message_order:
492             case ProtocolError::bad_client_version:
493             case ProtocolError::illegal_realm_path:
494             case ProtocolError::no_such_realm:
495             case ProtocolError::bad_changeset:
496             case ProtocolError::bad_changeset_header_syntax:
497             case ProtocolError::bad_changeset_size:
498             case ProtocolError::bad_changesets:
499             case ProtocolError::bad_decompression:
500             case ProtocolError::partial_sync_disabled:
501                 break;
502             // Session errors
503             case ProtocolError::session_closed:
504             case ProtocolError::other_session_error:
505             case ProtocolError::disabled_session:
506                 // The binding doesn't need to be aware of these because they are strictly informational, and do not
507                 // represent actual errors.
508                 return;
509             case ProtocolError::token_expired: {
510                 std::unique_lock<std::mutex> lock(m_state_mutex);
511                 // This isn't an error from the binding's point of view. If we're connected we'll
512                 // simply ask the binding to log in again.
513                 m_state->access_token_expired(lock, *this);
514                 return;
515             }
516             case ProtocolError::bad_authentication: {
517                 std::shared_ptr<SyncUser> user_to_invalidate;
518                 next_state = NextStateAfterError::none;
519                 {
520                     std::unique_lock<std::mutex> lock(m_state_mutex);
521                     user_to_invalidate = user();
522                     cancel_pending_waits();
523                 }
524                 if (user_to_invalidate)
525                     user_to_invalidate->invalidate();
526                 break;
527             }
528             case ProtocolError::permission_denied: {
529                 next_state = NextStateAfterError::inactive;
530                 update_error_and_mark_file_for_deletion(error, ShouldBackup::no);
531                 break;
532             }
533             case ProtocolError::bad_server_file_ident:
534             case ProtocolError::bad_client_file_ident:
535             case ProtocolError::bad_server_version:
536             case ProtocolError::diverging_histories:
537                 next_state = NextStateAfterError::inactive;
538                 update_error_and_mark_file_for_deletion(error, ShouldBackup::yes);
539                 break;
540         }
541     } else if (error_code.category() == realm::sync::client_error_category()) {
542         using ClientError = realm::sync::Client::Error;
543         switch (static_cast<ClientError>(error_code.value())) {
544             case ClientError::connection_closed:
545             case ClientError::pong_timeout:
546                 // Not real errors, don't need to be reported to the binding.
547                 return;
548             case ClientError::unknown_message:
549             case ClientError::bad_syntax:
550             case ClientError::limits_exceeded:
551             case ClientError::bad_session_ident:
552             case ClientError::bad_message_order:
553             case ClientError::bad_file_ident_pair:
554             case ClientError::bad_progress:
555             case ClientError::bad_changeset_header_syntax:
556             case ClientError::bad_changeset_size:
557             case ClientError::bad_origin_file_ident:
558             case ClientError::bad_server_version:
559             case ClientError::bad_changeset:
560             case ClientError::bad_request_ident:
561             case ClientError::bad_error_code:
562             case ClientError::bad_compression:
563             case ClientError::bad_client_version:
564             case ClientError::ssl_server_cert_rejected:
565                 // Don't do anything special for these errors.
566                 // Future functionality may require special-case handling for existing
567                 // errors, or newly introduced error codes.
568                 break;
569         }
570     } else {
571         // Unrecognized error code.
572         error.is_unrecognized_by_client = true;
573     }
574     switch (next_state) {
575         case NextStateAfterError::none:
576             break;
577         case NextStateAfterError::inactive: {
578             std::unique_lock<std::mutex> lock(m_state_mutex);
579             advance_state(lock, State::inactive);
580             break;
581         }
582         case NextStateAfterError::error: {
583             std::unique_lock<std::mutex> lock(m_state_mutex);
584             cancel_pending_waits();
585             break;
586         }
587     }
588     if (m_config.error_handler) {
589         m_config.error_handler(shared_from_this(), std::move(error));
590     }
591 }
592
593 void SyncSession::cancel_pending_waits()
594 {
595     // Inform any queued-up completion handlers that they were cancelled.
596     for (auto& package : m_completion_wait_packages) {
597         package.callback(util::error::operation_aborted);
598     }
599     m_completion_wait_packages.clear();
600 }
601
602 void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable,
603                                          uint64_t uploaded, uint64_t uploadable, bool is_fresh)
604 {
605     std::vector<std::function<void()>> invocations;
606     {
607         std::lock_guard<std::mutex> lock(m_progress_notifier_mutex);
608         m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded};
609         m_latest_progress_data_is_fresh = is_fresh;
610
611         for (auto it = m_notifiers.begin(); it != m_notifiers.end();) {
612             auto& package = it->second;
613             package.update(*m_current_progress, is_fresh);
614
615             bool should_delete = false;
616             invocations.emplace_back(package.create_invocation(*m_current_progress, should_delete));
617
618             it = (should_delete ? m_notifiers.erase(it) : std::next(it));
619         }
620     }
621     // Run the notifiers only after we've released the lock.
622     for (auto& invocation : invocations) {
623         invocation();
624     }
625 }
626
627 void SyncSession::NotifierPackage::update(const Progress& current_progress, bool data_is_fresh)
628 {
629     if (is_streaming || captured_transferrable || !data_is_fresh)
630         return;
631
632     captured_transferrable = direction == NotifierType::download ? current_progress.downloadable
633                                                                  : current_progress.uploadable;
634 }
635
636 // PRECONDITION: `update()` must first be called on the same package.
637 std::function<void()> SyncSession::NotifierPackage::create_invocation(const Progress& current_progress,
638                                                                       bool& is_expired) const
639 {
640     // It's possible for a non-streaming notifier to not yet have fresh transferrable bytes data.
641     // In that case, we don't call it at all.
642     // NOTE: `update()` is always called before `create_invocation()`, and will
643     // set `captured_transferrable` on the notifier package if fresh data has
644     // been received and the package is for a non-streaming notifier.
645     if (!is_streaming && !captured_transferrable)
646         return [](){ };
647
648     bool is_download = direction == NotifierType::download;
649     uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
650     uint64_t transferrable;
651     if (is_streaming) {
652         transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
653     } else {
654         transferrable = *captured_transferrable;
655     }
656     // A notifier is expired if at least as many bytes have been transferred
657     // as were originally considered transferrable.
658     is_expired = !is_streaming && transferred >= *captured_transferrable;
659     return [=, package=*this](){
660         package.notifier(transferred, transferrable);
661     };
662 }
663
664 void SyncSession::create_sync_session()
665 {
666     if (m_session)
667         return;
668
669     sync::Session::Config session_config;
670     session_config.changeset_cooker = m_config.transformer;
671     session_config.encryption_key = m_config.realm_encryption_key;
672     session_config.verify_servers_ssl_certificate = m_config.client_validate_ssl;
673     session_config.ssl_trust_certificate_path = m_config.ssl_trust_certificate_path;
674     session_config.ssl_verify_callback = m_config.ssl_verify_callback;
675     session_config.multiplex_ident = m_multiplex_identity;
676     m_session = m_client.make_session(m_realm_path, std::move(session_config));
677
678     // The next time we get a token, call `bind()` instead of `refresh()`.
679     m_session_has_been_bound = false;
680
681     // Configure the error handler.
682     std::weak_ptr<SyncSession> weak_self = shared_from_this();
683     auto wrapped_handler = [this, weak_self](std::error_code error_code, bool is_fatal, std::string message) {
684         auto self = weak_self.lock();
685         if (!self) {
686             // An error was delivered after the session it relates to was destroyed. There's nothing useful
687             // we can do with it.
688             return;
689         }
690         handle_error(SyncError{error_code, std::move(message), is_fatal});
691     };
692     m_session->set_error_handler(std::move(wrapped_handler));
693
694     // Configure the sync transaction callback.
695     auto wrapped_callback = [this, weak_self](VersionID old_version, VersionID new_version) {
696         if (auto self = weak_self.lock()) {
697             if (m_sync_transact_callback) {
698                 m_sync_transact_callback(old_version, new_version);
699             }
700         }
701     };
702     m_session->set_sync_transact_callback(std::move(wrapped_callback));
703
704     // Set up the wrapped progress handler callback
705     auto wrapped_progress_handler = [this, weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
706                                                       uint_fast64_t uploaded, uint_fast64_t uploadable,
707                                                       bool is_fresh, uint_fast64_t /*snapshot_version*/) {
708         if (auto self = weak_self.lock()) {
709             handle_progress_update(downloaded, downloadable, uploaded, uploadable, is_fresh);
710         }
711     };
712     m_session->set_progress_handler(std::move(wrapped_progress_handler));
713 }
714
715 void SyncSession::set_sync_transact_callback(std::function<sync::Session::SyncTransactCallback> callback)
716 {
717     m_sync_transact_callback = std::move(callback);
718 }
719
720 void SyncSession::advance_state(std::unique_lock<std::mutex>& lock, const State& state)
721 {
722     REALM_ASSERT(lock.owns_lock());
723     REALM_ASSERT(&state != m_state);
724     m_state = &state;
725     m_state->enter_state(lock, *this);
726 }
727
728 void SyncSession::nonsync_transact_notify(sync::Session::version_type version)
729 {
730     std::unique_lock<std::mutex> lock(m_state_mutex);
731     m_state->nonsync_transact_notify(lock, *this, version);
732 }
733
734 void SyncSession::revive_if_needed()
735 {
736     util::Optional<std::function<SyncBindSessionHandler>&> handler;
737     {
738         std::unique_lock<std::mutex> lock(m_state_mutex);
739         if (m_state->revive_if_needed(lock, *this))
740             handler = m_config.bind_session_handler;
741     }
742     if (handler)
743         handler.value()(m_realm_path, m_config, shared_from_this());
744 }
745
746 void SyncSession::handle_reconnect()
747 {
748     std::unique_lock<std::mutex> lock(m_state_mutex);
749     m_state->handle_reconnect(lock, *this);
750 }
751
752 void SyncSession::log_out()
753 {
754     std::unique_lock<std::mutex> lock(m_state_mutex);
755     m_state->log_out(lock, *this);
756 }
757
758 void SyncSession::close()
759 {
760     std::unique_lock<std::mutex> lock(m_state_mutex);
761     m_state->close(lock, *this);
762 }
763
764 void SyncSession::unregister(std::unique_lock<std::mutex>& lock)
765 {
766     REALM_ASSERT(lock.owns_lock());
767     REALM_ASSERT(m_state == &State::inactive); // Must stop an active session before unregistering.
768
769     lock.unlock();
770     SyncManager::shared().unregister_session(m_realm_path);
771 }
772
773 bool SyncSession::wait_for_upload_completion(std::function<void(std::error_code)> callback)
774 {
775     std::unique_lock<std::mutex> lock(m_state_mutex);
776     return m_state->wait_for_completion(*this, std::move(callback), &sync::Session::async_wait_for_upload_completion);
777 }
778
779 bool SyncSession::wait_for_download_completion(std::function<void(std::error_code)> callback)
780 {
781     std::unique_lock<std::mutex> lock(m_state_mutex);
782     return m_state->wait_for_completion(*this, std::move(callback), &sync::Session::async_wait_for_download_completion);
783 }
784
785 uint64_t SyncSession::register_progress_notifier(std::function<SyncProgressNotifierCallback> notifier,
786                                                  NotifierType direction, bool is_streaming)
787 {
788     std::function<void()> invocation;
789     uint64_t token_value = 0;
790     {
791         std::lock_guard<std::mutex> lock(m_progress_notifier_mutex);
792         token_value = m_progress_notifier_token++;
793         NotifierPackage package{std::move(notifier), is_streaming, direction};
794         if (!m_current_progress) {
795             // Simply register the package, since we have no data yet.
796             m_notifiers.emplace(token_value, std::move(package));
797             return token_value;
798         }
799         package.update(*m_current_progress, m_latest_progress_data_is_fresh);
800         bool skip_registration = false;
801         invocation = package.create_invocation(*m_current_progress, skip_registration);
802         if (skip_registration) {
803             token_value = 0;
804         } else {
805             m_notifiers.emplace(token_value, std::move(package));
806         }
807     }
808     invocation();
809     return token_value;
810 }
811
812 void SyncSession::unregister_progress_notifier(uint64_t token)
813 {
814     std::lock_guard<std::mutex> lock(m_progress_notifier_mutex);
815     m_notifiers.erase(token);
816 }
817
818 void SyncSession::refresh_access_token(std::string access_token, util::Optional<std::string> server_url)
819 {
820     std::unique_lock<std::mutex> lock(m_state_mutex);
821     if (!m_server_url && !server_url) {
822         // The first time this method is called, the server URL must be provided.
823         return;
824     }
825     m_state->refresh_access_token(lock, *this, std::move(access_token), server_url);
826 }
827
828 void SyncSession::override_server(std::string address, int port)
829 {
830     std::unique_lock<std::mutex> lock(m_state_mutex);
831     m_state->override_server(lock, *this, std::move(address), port);
832 }
833
834 void SyncSession::set_multiplex_identifier(std::string multiplex_identity)
835 {
836     m_multiplex_identity = std::move(multiplex_identity);
837 }
838
839 SyncSession::PublicState SyncSession::state() const
840 {
841     std::unique_lock<std::mutex> lock(m_state_mutex);
842     if (m_state == &State::waiting_for_access_token) {
843         return PublicState::WaitingForAccessToken;
844     } else if (m_state == &State::active) {
845         return PublicState::Active;
846     } else if (m_state == &State::dying) {
847         return PublicState::Dying;
848     } else if (m_state == &State::inactive) {
849         return PublicState::Inactive;
850     }
851     REALM_UNREACHABLE();
852 }
853
854 // Represents a reference to the SyncSession from outside of the sync subsystem.
855 // We attempt to keep the SyncSession in an active state as long as it has an external reference.
856 class SyncSession::ExternalReference {
857 public:
858     ExternalReference(std::shared_ptr<SyncSession> session) : m_session(std::move(session))
859     {}
860
861     ~ExternalReference()
862     {
863         m_session->did_drop_external_reference();
864     }
865
866 private:
867     std::shared_ptr<SyncSession> m_session;
868 };
869
870 std::shared_ptr<SyncSession> SyncSession::external_reference()
871 {
872     std::unique_lock<std::mutex> lock(m_state_mutex);
873
874     if (auto external_reference = m_external_reference.lock())
875         return std::shared_ptr<SyncSession>(external_reference, this);
876
877     auto external_reference = std::make_shared<ExternalReference>(shared_from_this());
878     m_external_reference = external_reference;
879     return std::shared_ptr<SyncSession>(external_reference, this);
880 }
881
882 std::shared_ptr<SyncSession> SyncSession::existing_external_reference()
883 {
884     std::unique_lock<std::mutex> lock(m_state_mutex);
885
886     if (auto external_reference = m_external_reference.lock())
887         return std::shared_ptr<SyncSession>(external_reference, this);
888
889     return nullptr;
890 }
891
892 void SyncSession::did_drop_external_reference()
893 {
894     std::unique_lock<std::mutex> lock(m_state_mutex);
895
896     // If the session is being resurrected we should not close the session.
897     if (!m_external_reference.expired())
898         return;
899
900     m_state->close(lock, *this);
901 }