added iOS source code
[wl-app.git] / iOS / Pods / Realm / include / core / realm / sync / protocol.hpp
1 /*************************************************************************
2  *
3  * REALM CONFIDENTIAL
4  * __________________
5  *
6  *  [2011] - [2016] Realm Inc
7  *  All Rights Reserved.
8  *
9  * NOTICE:  All information contained herein is, and remains
10  * the property of Realm Incorporated and its suppliers,
11  * if any.  The intellectual and technical concepts contained
12  * herein are proprietary to Realm Incorporated
13  * and its suppliers and may be covered by U.S. and Foreign Patents,
14  * patents in process, and are protected by trade secret or copyright law.
15  * Dissemination of this information or reproduction of this material
16  * is strictly forbidden unless prior written permission is obtained
17  * from Realm Incorporated.
18  *
19  **************************************************************************/
20 #ifndef REALM_SYNC_PROTOCOL_HPP
21 #define REALM_SYNC_PROTOCOL_HPP
22
23 #include <system_error>
24
25 #include <realm/util/buffer_stream.hpp>
26 #include <realm/util/compression.hpp>
27 #include <realm/util/logger.hpp>
28 #include <realm/util/memory_stream.hpp>
29 #include <realm/util/optional.hpp>
30 #include <realm/impl/clamped_hex_dump.hpp>
31 #include <realm/sync/transform.hpp>
32 #include <realm/sync/history.hpp>
33
34 #include <realm/sync/client.hpp> // Get rid of this?
35
36
37 // NOTE: The protocol specification is in `/doc/protocol.md`
38
39
40 namespace realm {
41 namespace sync {
42
43 // Protocol versions:
44 //
45 //   1 Initial version.
46 //
47 //   2 Introduces the UNBOUND message (sent from server to client in
48 //     response to a BIND message).
49 //
50 //   3 Introduces the ERROR message (sent from server to client before the
51 //     server closes a connection). Introduces MARK message from client to
52 //     server, and MARK response message from server to client as a way for the
53 //     client to wait for download to complete.
54 //
55 //   4 User token and signature are now passed as a single string (see
56 //     /doc/protocol.md for details). Also, `application_ident` parameter
57 //     removed from IDENT message.
58 //
59 //   5 IDENT message renamed to CLIENT, and ALLOC message (client->server)
60 //     renamed to IDENT. Also, <client info> parameter added to CLIENT
61 //     message. Also, the protocol has been changed to make the client's
62 //     acquisition of a server allocated file identifier pair be part of a
63 //     session from the server's point of view. File identifier and version
64 //     parameters moved from the BIND message to a new IDENT message sent by
65 //     client when it has obtained the file identifier pair. Both the new IDENT
66 //     message and the ALLOC message sent by the server are now properly
67 //     associated with a session.
68 //
69 //   6 Server session IDs have been added to the IDENT, DOWNLOAD, and PROGRESS
70 //     messages, and the "Divergent history" error code was added as an
71 //     indication that a server version / session ID pair does not match the
72 //     server's history.
73 //
74 //   7 FIXME: Who introduced version 7? Please describe what changed.
75 //
76 //   8 Error code (`bad_authentication`) moved from 200-range to 300-range
77 //     because it is now session specific. Other error codes were renumbered.
78 //
79 //   9 New format of the DOWNLOAD message to support progress reporting on the
80 //     client
81 //
82 //  10 Error codes reordered (now categorized as either connection or session
83 //     level errors).
84 //
85 //  11 Bugfixes in Link List and ChangeLinkTargets merge rules, that
86 //     make previous versions incompatible.
87 //
88 //  12 FIXME What was 12?
89 //
90 //  13 Bugfixes in Link List and ChangeLinkTargets merge rules, that
91 //     make previous versions incompatible.
92 //
93 //  14 Further bugfixes related to primary keys and link lists. Add support for
94 //     LinkListSwap.
95 //
96 //  15 Deleting an object with a primary key deletes all objects on other
97 //     with the same primary key.
98 //
99 //  16 Downloadable bytes added to DOWNLOAD message. It is used for download progress
100 //     by the client
101 //
102 //  17 Added PING and PONG messages. They are used for RTT monitoring and dead
103 //     connection detection by both the client and the server.
104 //
105 //  18 Enhanced the session_ident to accept values of size up to at least 63 bits.
106 //
107 //  19 New instruction log format with stable object IDs and arrays of
108 //     primitives (Generalized LinkList* commands to Container* commands)
109 //     Message format is identical to version 18.
110 //
111 //  20 Added support for log compaction in DOWNLOAD message.
112 //
113 //  21 Removed "class_" prefix in instructions referencing tables.
114 //
115 //  22 Fixed a bug in the merge rule of MOVE vs SWAP.
116
/// Returns the version of the sync protocol implemented by this header, i.e.
/// the highest version number described in the "Protocol versions" list above.
constexpr int get_current_protocol_version() noexcept
{
    return 22;
}
121
/// \brief Protocol errors discovered by the server, and reported to the client
/// by way of ERROR messages.
///
/// These errors will be reported to the client-side application via the error
/// handlers of the affected sessions.
///
/// The numeric values are what travels on the wire, so they must not be
/// renumbered without a protocol version bump. Connection level errors use the
/// 100-range and session level errors use the 200-range.
///
/// ATTENTION: Please remember to update is_session_level_error() when
/// adding/removing error codes.
enum class ProtocolError {
    // Connection level and protocol errors
    connection_closed            = 100, ///< Connection closed (no error)
    other_error                  = 101, ///< Other connection level error
    unknown_message              = 102, ///< Unknown type of input message
    bad_syntax                   = 103, ///< Bad syntax in input message head
    limits_exceeded              = 104, ///< Limits exceeded in input message
    wrong_protocol_version       = 105, ///< Wrong protocol version (CLIENT)
    bad_session_ident            = 106, ///< Bad session identifier in input message
    reuse_of_session_ident       = 107, ///< Overlapping reuse of session identifier (BIND)
    bound_in_other_session       = 108, ///< Client file bound in other session (IDENT)
    bad_message_order            = 109, ///< Bad input message order
    bad_decompression            = 110, ///< Error in decompression (UPLOAD)
    bad_changeset_header_syntax  = 111, ///< Bad syntax in a changeset header (UPLOAD)
    bad_changeset_size           = 112, ///< Bad size specified in changeset header (UPLOAD)
    bad_changesets               = 113, ///< Bad changesets (UPLOAD)

    // Session level errors
    session_closed               = 200, ///< Session closed (no error)
    other_session_error          = 201, ///< Other session level error
    token_expired                = 202, ///< Access token expired
    bad_authentication           = 203, ///< Bad user authentication (BIND, REFRESH)
    illegal_realm_path           = 204, ///< Illegal Realm path (BIND)
    no_such_realm                = 205, ///< No such Realm (BIND)
    permission_denied            = 206, ///< Permission denied (BIND, REFRESH)
    bad_server_file_ident        = 207, ///< Bad server file identifier (IDENT) (obsolete!)
    bad_client_file_ident        = 208, ///< Bad client file identifier (IDENT)
    bad_server_version           = 209, ///< Bad server version (IDENT, UPLOAD)
    bad_client_version           = 210, ///< Bad client version (IDENT, UPLOAD)
    diverging_histories          = 211, ///< Diverging histories (IDENT)
    bad_changeset                = 212, ///< Bad changeset (UPLOAD)
    disabled_session             = 213, ///< Disabled session
    partial_sync_disabled        = 214, ///< Partial sync disabled (BIND)
};
164
165 inline constexpr bool is_session_level_error(ProtocolError error)
166 {
167     return int(error) >= 200 && int(error) <= 299;
168 }
169
170 /// Returns null if the specified protocol error code is not defined by
171 /// ProtocolError.
172 const char* get_protocol_error_message(int error_code) noexcept;
173
174 const std::error_category& protocol_error_category() noexcept;
175
176 std::error_code make_error_code(ProtocolError) noexcept;
177
178 } // namespace sync
179 } // namespace realm
180
181 namespace std {
182
183 template<> struct is_error_code_enum<realm::sync::ProtocolError> {
184     static const bool value = true;
185 };
186
187 } // namespace std
188
189 namespace realm {
190 namespace sync {
191 namespace protocol {
192
193 using OutputBuffer = util::ResettableExpandableBufferOutputStream;
194 using session_ident_type = uint_fast64_t;
195 using file_ident_type = uint_fast64_t;
196 using version_type = uint_fast64_t;
197 using timestamp_type  = uint_fast64_t;
198 using request_ident_type    = uint_fast64_t;
199 using ReceivedChangesets = std::vector<Transformer::RemoteChangeset>;
200
201
202 class ClientProtocol {
203 public:
204     util::Logger& logger;
205
206     enum class Error {
207         unknown_message             = 101, // Unknown type of input message
208         bad_syntax                  = 102, // Bad syntax in input message head
209         limits_exceeded             = 103, // Limits exceeded in input message
210         bad_changeset_header_syntax = 108, // Bad syntax in changeset header (DOWNLOAD)
211         bad_changeset_size          = 109, // Bad changeset size in changeset header (DOWNLOAD)
212         bad_server_version          = 111, // Bad server version in changeset header (DOWNLOAD)
213         bad_error_code              = 114, ///< Bad error code (ERROR)
214         bad_decompression           = 115, // Error in decompression (DOWNLOAD)
215     };
216
217     ClientProtocol(util::Logger& logger);
218
219
220     /// Messages sent by the client.
221
222     void make_client_message(OutputBuffer& out, const std::string& client_info);
223
224     void make_bind_message(OutputBuffer& out, session_ident_type session_ident,
225                            const std::string& server_path,
226                            const std::string& signed_user_token,
227                            bool need_file_ident_pair);
228
229     void make_refresh_message(OutputBuffer& out, session_ident_type session_ident,
230                               const std::string& signed_user_token);
231
232     void make_ident_message(OutputBuffer& out, session_ident_type session_ident,
233                             file_ident_type server_file_ident,
234                             file_ident_type client_file_ident,
235                             int_fast64_t client_file_ident_secret,
236                             SyncProgress progress);
237
238     class UploadMessageBuilder {
239     public:
240         util::Logger& logger;
241
242         UploadMessageBuilder(util::Logger& logger,
243                              OutputBuffer& body_buffer,
244                              std::vector<char>& compression_buffer,
245                              util::compression::CompressMemoryArena& compress_memory_arena);
246
247         void add_changeset(version_type client_version, version_type server_version,
248                            timestamp_type timestamp, ChunkedBinaryData changeset);
249
250         void make_upload_message(OutputBuffer& out, session_ident_type session_ident);
251
252     private:
253         size_t m_num_changesets = 0;
254         OutputBuffer& m_body_buffer;
255         std::vector<char>& m_compression_buffer;
256         util::compression::CompressMemoryArena& m_compress_memory_arena;
257     };
258
259     UploadMessageBuilder make_upload_message_builder();
260
261     void make_upload_message(OutputBuffer& out, session_ident_type session_ident,
262                              version_type client_version, version_type server_version,
263                              size_t changeset_size, timestamp_type timestamp,
264                              const std::unique_ptr<char[]>& body_buffer);
265
266     void make_unbind_message(OutputBuffer& out, session_ident_type session_ident);
267
268     void make_mark_message(OutputBuffer& out, session_ident_type session_ident,
269                            request_ident_type request_ident);
270
271     void make_ping(OutputBuffer& out, uint_fast64_t timestamp, uint_fast64_t rtt);
272
273
274     // Messages received by the client.
275
276     // parse_pong_received takes a (WebSocket) pong and parses it.
277     // The result of the parsing is handled by an object of type Connection.
278     // Typically, Connection would be the Connection class from client.cpp
279     template <typename Connection>
280     void parse_pong_received(Connection& connection, const char* data, size_t size)
281     {
282         util::MemoryInputStream in;
283         in.set_buffer(data, data + size);
284         in.unsetf(std::ios_base::skipws);
285
286         uint_fast64_t timestamp;
287
288         char newline;
289         in >> timestamp >> newline;
290         bool good_syntax = in && size_t(in.tellg()) == size && newline == '\n';
291         if (!good_syntax)
292             goto bad_syntax;
293
294         connection.receive_pong(timestamp);
295         return;
296
297     bad_syntax:
298         logger.error("Bad syntax in input message '%1'",
299                      StringData(data, size));
300         connection.handle_protocol_error(Error::bad_syntax); // Throws
301         return;
302     }
303
304     // parse_message_received takes a (WebSocket) message and parses it.
305     // The result of the parsing is handled by an object of type Connection.
306     // Typically, Connection would be the Connection class from client.cpp
307     template <typename Connection>
308     void parse_message_received(Connection& connection, const char* data, size_t size)
309     {
310         util::MemoryInputStream in;
311         in.set_buffer(data, data + size);
312         in.unsetf(std::ios_base::skipws);
313         size_t header_size = 0;
314         std::string message_type;
315         in >> message_type;
316
317         if (message_type == "download") {
318             session_ident_type session_ident;
319             SyncProgress progress;
320             int is_body_compressed;
321             size_t uncompressed_body_size, compressed_body_size;
322             char sp_1, sp_2, sp_3, sp_4, sp_5, sp_6, sp_7, sp_8, sp_9, sp_10, newline;
323
324             in >> sp_1 >> session_ident >> sp_2 >> progress.scan_server_version >> sp_3 >>
325                 progress.scan_client_version >> sp_4 >> progress.latest_server_version >>
326                 sp_5 >> progress.latest_server_session_ident >> sp_6 >>
327                 progress.latest_client_version >> sp_7 >> progress.downloadable_bytes >>
328                 sp_8 >> is_body_compressed >> sp_9 >> uncompressed_body_size >> sp_10 >>
329                 compressed_body_size >> newline; // Throws
330
331             bool good_syntax = in && sp_1 == ' ' && sp_2 == ' ' &&
332                 sp_3 == ' ' && sp_4 == ' ' && sp_5 == ' ' && sp_6 == ' ' &&
333                 sp_7 == ' ' && sp_8 == ' ' && sp_9 == ' ' && sp_10 == ' ' &&
334                 newline == '\n';
335             if (!good_syntax)
336                 goto bad_syntax;
337             header_size = size_t(in.tellg());
338             if (uncompressed_body_size > s_max_body_size)
339                 goto limits_exceeded;
340
341             size_t body_size = is_body_compressed ? compressed_body_size : uncompressed_body_size;
342             if (header_size + body_size != size)
343                 goto bad_syntax;
344
345             BinaryData body(data + header_size, body_size);
346             BinaryData uncompressed_body;
347
348             std::unique_ptr<char[]> uncompressed_body_buffer;
349             // if is_body_compressed == true, we must decompress the received body.
350             if (is_body_compressed) {
351                 uncompressed_body_buffer.reset(new char[uncompressed_body_size]);
352                 std::error_code ec = util::compression::decompress(body.data(),  compressed_body_size,
353                                                                    uncompressed_body_buffer.get(),
354                                                                    uncompressed_body_size);
355
356                 if (ec) {
357                     logger.error("compression::inflate: %1", ec.message());
358                     connection.handle_protocol_error(Error::bad_decompression);
359                     return;
360                 }
361
362                 uncompressed_body = BinaryData(uncompressed_body_buffer.get(), uncompressed_body_size);
363             }
364             else {
365                 uncompressed_body = body;
366             }
367
368             logger.debug("Download message compression: is_body_compressed = %1, "
369                          "compressed_body_size=%2, uncompressed_body_size=%3",
370                          is_body_compressed, compressed_body_size, uncompressed_body_size);
371
372             util::MemoryInputStream in;
373             in.unsetf(std::ios_base::skipws);
374             in.set_buffer(uncompressed_body.data(), uncompressed_body.data() + uncompressed_body_size);
375
376             ReceivedChangesets received_changesets;
377
378             // Loop through the body and find the changesets.
379             size_t position = 0;
380             while (position < uncompressed_body_size) {
381                 version_type server_version;
382                 version_type client_version;
383                 timestamp_type origin_timestamp;
384                 file_ident_type origin_client_file_ident;
385                 size_t original_changeset_size, changeset_size;
386                 char sp_1, sp_2, sp_3, sp_4, sp_5, sp_6;
387
388                 in >> server_version >> sp_1 >> client_version >> sp_2 >> origin_timestamp >>
389                     sp_3 >> origin_client_file_ident >> sp_4 >> original_changeset_size >>
390                     sp_5 >> changeset_size >> sp_6;
391
392                 bool good_syntax = in && sp_1 == ' ' && sp_2 == ' ' &&
393                     sp_3 == ' ' && sp_4 == ' ' && sp_5 == ' ' && sp_6 == ' ';
394
395                 if (!good_syntax) {
396                     logger.error("Bad changeset header syntax");
397                     connection.handle_protocol_error(Error::bad_changeset_header_syntax);
398                     return;
399                 }
400
401                 // Update position to the end of the change set
402                 position = size_t(in.tellg()) + changeset_size;
403
404                 if (position > uncompressed_body_size) {
405                     logger.error("Bad changeset size");
406                     connection.handle_protocol_error(Error::bad_changeset_size);
407                     return;
408                 }
409
410                 if (server_version == 0) {
411                     // The received changeset can never have version 0.
412                     logger.error("Bad server version");
413                     connection.handle_protocol_error(Error::bad_server_version);
414                     return;
415                 }
416
417                 BinaryData changeset_data(uncompressed_body.data() + size_t(in.tellg()), changeset_size);
418                 in.seekg(position);
419
420                 if (logger.would_log(util::Logger::Level::trace)) {
421                     logger.trace("Received: DOWNLOAD CHANGESET(server_version=%1, client_version=%2, "
422                                   "origin_timestamp=%3, origin_client_file_ident=%4, original_changeset_size=%5, changeset_size=%6)",
423                                   server_version, client_version, origin_timestamp,
424                                   origin_client_file_ident, original_changeset_size, changeset_size); // Throws
425                     logger.trace("Changeset: %1",
426                                  _impl::clamped_hex_dump(changeset_data)); // Throws
427                 }
428
429                 Transformer::RemoteChangeset changeset_2(server_version, client_version,
430                                                          changeset_data, origin_timestamp,
431                                                          origin_client_file_ident);
432                 changeset_2.original_changeset_size = original_changeset_size;
433                 received_changesets.push_back(changeset_2); // Throws
434             }
435
436             connection.receive_download_message(session_ident, progress, received_changesets); // Throws
437             return;
438         }
439         if (message_type == "unbound") {
440             session_ident_type session_ident;
441             char sp_1, newline;
442             in >> sp_1 >> session_ident >> newline; // Throws
443             bool good_syntax = in && size_t(in.tellg()) == size && sp_1 == ' ' &&
444                 newline == '\n';
445             if (!good_syntax)
446                 goto bad_syntax;
447             header_size = size_t(in.tellg());
448
449             connection.receive_unbound_message(session_ident); // Throws
450             return;
451         }
452         if (message_type == "error") {
453             int error_code;
454             size_t message_size;
455             bool try_again;
456             session_ident_type session_ident;
457             char sp_1, sp_2, sp_3, sp_4, newline;
458             in >> sp_1 >> error_code >> sp_2 >> message_size >> sp_3 >> try_again >> sp_4 >>
459                 session_ident >> newline; // Throws
460             bool good_syntax = in && sp_1 == ' ' && sp_2 == ' ' && sp_3 == ' ' &&
461                 sp_4 == ' ' && newline == '\n';
462             if (!good_syntax)
463                 goto bad_syntax;
464             header_size = size_t(in.tellg());
465             if (header_size + message_size != size)
466                 goto bad_syntax;
467
468             bool unknown_error = !get_protocol_error_message(error_code);
469             if (unknown_error) {
470                 logger.error("Bad error code"); // Throws
471                 connection.handle_protocol_error(Error::bad_error_code);
472                 return;
473             }
474
475             std::string message{data + header_size, message_size}; // Throws (copy)
476
477             connection.receive_error_message(error_code, message_size, try_again, session_ident, message); // Throws
478             return;
479         }
480         if (message_type == "mark") {
481             session_ident_type session_ident;
482             request_ident_type request_ident;
483             char sp_1, sp_2, newline;
484             in >> sp_1 >> session_ident >> sp_2 >> request_ident >> newline; // Throws
485             bool good_syntax = in && size_t(in.tellg()) == size && sp_1 == ' ' &&
486                 sp_2 == ' ' && newline == '\n';
487             if (!good_syntax)
488                 goto bad_syntax;
489             header_size = size_t(in.tellg());
490
491             connection.receive_mark_message(session_ident, request_ident); // Throws
492             return;
493         }
494         if (message_type == "alloc") {
495             session_ident_type session_ident;
496             file_ident_type server_file_ident, client_file_ident;
497             int_fast64_t client_file_ident_secret;
498             char sp_1, sp_2, sp_3, sp_4, newline;
499             in >> sp_1 >> session_ident >> sp_2 >> server_file_ident >> sp_3 >>
500                 client_file_ident >> sp_4 >> client_file_ident_secret >> newline; // Throws
501             bool good_syntax = in && size_t(in.tellg()) == size && sp_1 == ' ' &&
502                 sp_2 == ' ' && sp_3 == ' ' && sp_4 == ' ' && newline == '\n';
503             if (!good_syntax)
504                 goto bad_syntax;
505             header_size = size_t(in.tellg());
506
507             connection.receive_alloc_message(session_ident,server_file_ident, client_file_ident,
508                                              client_file_ident_secret); // Throws
509             return;
510         }
511
512         logger.error("Unknown input message type '%1'",
513                      StringData(data, size));
514         connection.handle_protocol_error(Error::unknown_message);
515         return;
516     bad_syntax:
517         logger.error("Bad syntax in input message '%1'",
518                      StringData(data, size));
519         connection.handle_protocol_error(Error::bad_syntax);
520         return;
521     limits_exceeded:
522         logger.error("Limits exceeded in input message '%1'",
523                      StringData(data, header_size));
524         connection.handle_protocol_error(Error::limits_exceeded);
525         return;
526     }
527
528 private:
529     static constexpr size_t s_max_body_size = std::numeric_limits<size_t>::max();
530
531     // Permanent buffer to use for building messages.
532     OutputBuffer m_output_buffer;
533
534     // Permanent buffers to use for internal purposes such as compression.
535     std::vector<char> m_buffer;
536
537     util::compression::CompressMemoryArena m_compress_memory_arena;
538 };
539
540
541 class ServerProtocol {
542 public:
543     util::Logger& logger;
544
545     enum class Error {
546         unknown_message             = 101, // Unknown type of input message
547         bad_syntax                  = 102, // Bad syntax in input message head
548         limits_exceeded             = 103, // Limits exceeded in input message
549         bad_decompression           = 104, // Error in decompression (UPLOAD)
550         bad_changeset_header_syntax = 105, // Bad syntax in changeset header (UPLOAD)
551         bad_changeset_size          = 106, // Changeset size doesn't fit in message (UPLOAD)
552     };
553
554     ServerProtocol(util::Logger& logger);
555
556     // Messages sent by the server to the client
557
558     void make_alloc_message(OutputBuffer& out, session_ident_type session_ident,
559                             file_ident_type server_file_ident,
560                             file_ident_type client_file_ident,
561                             std::int_fast64_t client_file_ident_secret);
562
563     void make_unbound_message(OutputBuffer& out, session_ident_type session_ident);
564
565
566     struct ChangesetInfo {
567         version_type server_version;
568         version_type client_version;
569         HistoryEntry entry;
570         size_t original_size;
571     };
572
573     void make_download_message(int protocol_version, OutputBuffer& out, session_ident_type session_ident,
574                                version_type scan_server_version,
575                                version_type scan_client_version,
576                                version_type latest_server_version,
577                                int_fast64_t latest_server_session_ident,
578                                version_type latest_client_version,
579                                uint_fast64_t downloadable_bytes,
580                                std::size_t num_changesets, BinaryData body);
581
582     void make_error_message(OutputBuffer& out, ProtocolError error_code,
583                             const char* message, size_t message_size,
584                             bool try_again, session_ident_type session_ident);
585
586     void make_mark_message(OutputBuffer& out, session_ident_type session_ident,
587                            request_ident_type request_ident);
588
589     void make_pong(OutputBuffer& out, uint_fast64_t timestamp);
590
591     // Messages received by the server.
592
593     // parse_ping_received takes a (WebSocket) ping and parses it.
594     // The result of the parsing is handled by an object of type Connection.
595     // Typically, Connection would be the Connection class from server.cpp
596     template <typename Connection>
597     void parse_ping_received(Connection& connection, const char* data, size_t size)
598     {
599         util::MemoryInputStream in;
600         in.set_buffer(data, data + size);
601         in.unsetf(std::ios_base::skipws);
602
603         int_fast64_t timestamp, rtt;
604
605         char sp_1, newline;
606         in >> timestamp >> sp_1 >> rtt >> newline;
607         bool good_syntax = in && size_t(in.tellg()) == size && sp_1 == ' ' &&
608             newline == '\n';
609         if (!good_syntax)
610             goto bad_syntax;
611
612         connection.receive_ping(timestamp, rtt);
613         return;
614
615     bad_syntax:
616         logger.error("Bad syntax in PING message '%1'",
617                      StringData(data, size));
618         connection.handle_protocol_error(Error::bad_syntax);
619         return;
620     }
621
622     // UploadChangeset is used to store received changesets in
623     // the UPLOAD message.
624     struct UploadChangeset {
625         version_type client_version;
626         version_type server_version;
627         timestamp_type timestamp;
628         BinaryData changeset;
629     };
630
631     // parse_message_received takes a (WebSocket) message and parses it.
632     // The result of the parsing is handled by an object of type Connection.
633     // Typically, Connection would be the Connection class from server.cpp
634     template <typename Connection>
635     void parse_message_received(Connection& connection, const char* data, size_t size)
636     {
637         util::MemoryInputStream in;
638         in.set_buffer(data, data + size);
639         in.unsetf(std::ios_base::skipws);
640         size_t header_size = 0;
641         std::string message_type;
642         in >> message_type;
643
644         if (message_type == "upload") {
645             session_ident_type session_ident;
646             int is_body_compressed;
647             size_t uncompressed_body_size, compressed_body_size;
648             char sp_1, sp_2, sp_3, sp_4, newline;
649             in >> sp_1 >> session_ident >> sp_2 >> is_body_compressed >> sp_3 >>
650                 uncompressed_body_size >> sp_4 >> compressed_body_size >>
651                 newline;
652             bool good_syntax = in && sp_1 == ' ' && sp_2 == ' ' && sp_3 == ' ' &&
653                 sp_4 == ' ' && newline == '\n';
654
655             if (!good_syntax)
656                 goto bad_syntax;
657             header_size = size_t(in.tellg());
658             if (uncompressed_body_size > s_max_body_size)
659                 goto limits_exceeded;
660
661             size_t body_size = is_body_compressed ? compressed_body_size : uncompressed_body_size;
662             if (header_size + body_size != size)
663                 goto bad_syntax;
664
665             BinaryData body(data + header_size, body_size);
666
667             BinaryData uncompressed_body;
668             std::unique_ptr<char[]> uncompressed_body_buffer;
669             // if is_body_compressed == true, we must decompress the received body.
670             if (is_body_compressed) {
671                 uncompressed_body_buffer.reset(new char[uncompressed_body_size]);
672                 std::error_code ec = util::compression::decompress(body.data(),  compressed_body_size,
673                                                                    uncompressed_body_buffer.get(),
674                                                                    uncompressed_body_size);
675
676                 if (ec) {
677                     logger.error("compression::inflate: %1", ec.message());
678                     connection.handle_protocol_error(Error::bad_decompression);
679                     return;
680                 }
681
682                 uncompressed_body = BinaryData(uncompressed_body_buffer.get(), uncompressed_body_size);
683             }
684             else {
685                 uncompressed_body = body;
686             }
687
688             logger.debug("Upload message compression: is_body_compressed = %1, "
689                          "compressed_body_size=%2, uncompressed_body_size=%3",
690                          is_body_compressed, compressed_body_size, uncompressed_body_size);
691
692             util::MemoryInputStream in;
693             in.unsetf(std::ios_base::skipws);
694             in.set_buffer(uncompressed_body.data(), uncompressed_body.data() + uncompressed_body_size);
695
696             std::vector<UploadChangeset> upload_changesets;
697
698             // Loop through the body and find the changesets.
699             size_t position = 0;
700             while (position < uncompressed_body_size) {
701                 version_type client_version;
702                 version_type server_version;
703                 timestamp_type timestamp;
704                 size_t changeset_size;
705                 char sp_1, sp_2, sp_3, sp_4;
706
707                 in >> client_version >> sp_1 >> server_version >> sp_2 >> timestamp >>
708                     sp_3 >> changeset_size >> sp_4;
709
710                 bool good_syntax = in && sp_1 == ' ' && sp_2 == ' ' &&
711                     sp_3 == ' ' && sp_4 == ' ';
712
713                 if (!good_syntax) {
714                     logger.error("Bad changeset header syntax");
715                     connection.handle_protocol_error(Error::bad_changeset_header_syntax);
716                     return;
717                 }
718
719                 // Update position to the end of the change set
720                 position = size_t(in.tellg()) + changeset_size;
721
722                 if (position > uncompressed_body_size) {
723                     logger.error("Bad changeset size");
724                     connection.handle_protocol_error(Error::bad_changeset_size);
725                     return;
726                 }
727
728                 BinaryData changeset_data(uncompressed_body.data() +
729                                           size_t(in.tellg()), changeset_size);
730                 in.seekg(position);
731
732                 if (logger.would_log(util::Logger::Level::trace)) {
733                     logger.trace(
734                         "Received: UPLOAD CHANGESET(client_version=%1, "
735                         "server_version=%2, timestamp=%3, changeset_size=%4)",
736                         client_version, server_version, timestamp,
737                         changeset_size); // Throws
738                     logger.trace("Changeset: %1",
739                                  _impl::clamped_hex_dump(changeset_data)); // Throws
740                 }
741
742                 UploadChangeset upload_changeset {
743                     client_version,
744                     server_version,
745                     timestamp,
746                     changeset_data
747                 };
748
749                 upload_changesets.push_back(upload_changeset); // Throws
750             }
751
752             connection.receive_upload_message(session_ident,upload_changesets); // Throws
753             return;
754         }
755         if (message_type == "mark") {
756             session_ident_type session_ident;
757             request_ident_type request_ident;
758             char sp_1, sp_2, newline;
759             in >> sp_1 >> session_ident >> sp_2 >> request_ident >> newline;
760             bool good_syntax = in && size_t(in.tellg()) == size &&
761                 sp_1 == ' ' && sp_2 == ' ' && newline == '\n';
762             if (!good_syntax)
763                 goto bad_syntax;
764             header_size = size;
765
766             connection.receive_mark_message(session_ident, request_ident); // Throws
767             return;
768         }
769         if (message_type == "bind") {
770             session_ident_type session_ident;
771             size_t path_size;
772             size_t signed_user_token_size;
773             bool need_file_ident_pair;
774             char sp_1, sp_2, sp_3, sp_4, newline;
775             in >> sp_1 >> session_ident >> sp_2 >> path_size >> sp_3 >>
776                 signed_user_token_size >> sp_4 >> need_file_ident_pair >>
777                 newline;
778             bool good_syntax = in && sp_1 == ' ' && sp_2 == ' ' &&
779                 sp_3 == ' ' && sp_4 == ' ' && newline == '\n';
780             if (!good_syntax)
781                 goto bad_syntax;
782             header_size = size_t(in.tellg());
783             if (path_size == 0)
784                 goto bad_syntax;
785             if (path_size > s_max_path_size)
786                 goto limits_exceeded;
787             if (signed_user_token_size > s_max_signed_user_token_size)
788                 goto limits_exceeded;
789             if (header_size + path_size + signed_user_token_size != size)
790                 goto bad_syntax;
791
792             std::string path {data + header_size, path_size}; // Throws
793             std::string signed_user_token {data + header_size + path_size,
794                 signed_user_token_size}; // Throws
795
796             connection.receive_bind_message(session_ident, std::move(path),
797                                             std::move(signed_user_token),
798                                             need_file_ident_pair); // Throws
799             return;
800         }
801         if (message_type == "refresh") {
802             session_ident_type session_ident;
803             size_t signed_user_token_size;
804             char sp_1, sp_2, newline;
805             in >> sp_1 >> session_ident >> sp_2 >> signed_user_token_size >>
806                 newline;
807             bool good_syntax = in && sp_1 == ' ' && sp_2 == ' ' && newline == '\n';
808             if (!good_syntax)
809                 goto bad_syntax;
810             header_size = size_t(in.tellg());
811             if (signed_user_token_size > s_max_signed_user_token_size)
812                 goto limits_exceeded;
813             if (header_size + signed_user_token_size != size)
814                 goto bad_syntax;
815
816             std::string signed_user_token {data + header_size, signed_user_token_size};
817
818             connection.receive_refresh_message(session_ident, std::move(signed_user_token)); // Throws
819             return;
820         }
821         if (message_type == "ident") {
822             session_ident_type session_ident;
823             file_ident_type server_file_ident, client_file_ident;
824             int_fast64_t client_file_ident_secret;
825             version_type scan_server_version, scan_client_version, latest_server_version;
826             int_fast64_t latest_server_session_ident;
827             char sp_1, sp_2, sp_3, sp_4, sp_5, sp_6, sp_7, sp_8, newline;
828             in >> sp_1 >> session_ident >> sp_2 >> server_file_ident >> sp_3 >>
829                 client_file_ident >> sp_4 >> client_file_ident_secret >> sp_5 >>
830                 scan_server_version >> sp_6 >> scan_client_version >> sp_7 >>
831                 latest_server_version >> sp_8 >> latest_server_session_ident >>
832                 newline;
833             bool good_syntax = in && size_t(in.tellg()) == size && sp_1 == ' ' &&
834                 sp_2 == ' ' && sp_3 == ' ' && sp_4 == ' ' && sp_5 == ' ' &&
835                 sp_6 == ' ' && sp_7 == ' ' && sp_8 == ' ' && newline == '\n';
836             if (!good_syntax)
837                 goto bad_syntax;
838             header_size = size;
839
840             connection.receive_ident_message(session_ident, server_file_ident, client_file_ident,
841                                              client_file_ident_secret, scan_server_version,
842                                              scan_client_version, latest_server_version,
843                                              latest_server_session_ident); // Throws
844             return;
845         }
846         if (message_type == "unbind") {
847             session_ident_type session_ident;
848             char sp_1, newline;
849             in >> sp_1 >> session_ident >> newline;
850             bool good_syntax = in && size_t(in.tellg()) == size &&
851                 sp_1 == ' ' && newline == '\n';
852             if (!good_syntax)
853                 goto bad_syntax;
854             header_size = size;
855
856             connection.receive_unbind_message(session_ident); // Throws
857             return;
858         }
859         if (message_type == "client") {
860             int_fast64_t protocol_version;
861             char sp_1, sp_2, newline;
862             size_t client_info_size;
863             in >> sp_1 >> protocol_version >> sp_2 >> client_info_size >> newline;
864             bool good_syntax = in && sp_1 == ' ' && sp_2 == ' ' && newline == '\n';
865             if (!good_syntax)
866                 goto bad_syntax;
867             header_size = size_t(in.tellg());
868             bool limits_exceeded = (client_info_size > s_max_client_info_size);
869             if (limits_exceeded)
870                 goto limits_exceeded;
871             if (header_size + client_info_size != size)
872                 goto bad_syntax;
873
874             std::string client_info {data + header_size, client_info_size}; // Throws
875
876             connection.receive_client_message(protocol_version, std::move(client_info)); // Throws
877             return;
878         }
879
880         // unknown message
881         if (size < 256)
882             logger.error("Unknown input message type '%1'", StringData(data, size)); // Throws
883         else
884             logger.error("Unknown input message type '%1'.......", StringData(data, 256)); // Throws
885
886         connection.handle_protocol_error(Error::unknown_message);
887         return;
888
889     bad_syntax:
890         logger.error("Bad syntax in input message '%1'",
891                      StringData(data, size));
892         connection.handle_protocol_error(Error::bad_syntax); // Throws
893         return;
894     limits_exceeded:
895         logger.error("Limits exceeded in input message '%1'",
896                      StringData(data, header_size));
897         connection.handle_protocol_error(Error::limits_exceeded); // Throws
898         return;
899     }
900
901     void insert_single_changeset_download_message(OutputBuffer& out, const ChangesetInfo& changeset_info);
        // ^ Declaration only: the definition is not in this part of the file.
        //   `out` is taken by non-const reference and `changeset_info` by const
        //   reference; nothing is returned.
        //   NOTE(review): judging by the name, this presumably appends one
        //   DOWNLOAD message carrying `changeset_info` to `out` -- confirm
        //   against the definition.
902
903 private:
        // Size limits, in bytes, applied by the incoming-message parser earlier
        // in this class to the variable-length parts of a message. When a
        // declared size is above its limit the parser reports
        // Error::limits_exceeded through the connection.
        //
        // NOTE(review): s_max_head_size is not referenced in the visible part
        // of this class; presumably a cap on the size of a message head --
        // confirm.
904     static constexpr size_t s_max_head_size              =  256;
        // Largest accepted signed user token; checked for both BIND and
        // REFRESH messages.
905     static constexpr size_t s_max_signed_user_token_size = 2048;
        // Largest accepted client info payload of a CLIENT message.
906     static constexpr size_t s_max_client_info_size       = 1024;
        // Largest accepted path in a BIND message. (An empty path is rejected
        // separately, as bad syntax.)
907     static constexpr size_t s_max_path_size              = 1024;
        // Bound compared against the declared uncompressed body size of an
        // UPLOAD message. Since this is the largest value a size_t can hold,
        // that comparison can never be true, i.e. the body size is effectively
        // unlimited.
        // NOTE(review): for a compressed body the parser allocates a buffer of
        // the declared uncompressed size before decompressing, so a finite
        // limit may be desirable here -- confirm with the protocol owners.
908     static constexpr size_t s_max_body_size = std::numeric_limits<size_t>::max();
909
910     util::compression::CompressMemoryArena m_compress_memory_arena;
        // ^ NOTE(review): no use of this arena is visible in this part of the
        //   file; presumably scratch memory handed to the compression routines
        //   -- confirm against the code that compresses message bodies.
911
912     // Permanent buffer to use for internal purposes such as compression.
        // Being a member, it lives as long as this object does.
913     std::vector<char> m_buffer;
914
915     // Output buffer to use for internal purposes such as creating the
916     // download body.
917     OutputBuffer m_output_buffer;
918 };
919
920 // make_authorization_header() builds, from `signed_user_token`, the value of the
921 // Authorization header used in the sync WebSocket handshake.
    // Only the declaration is given here; the definition lives elsewhere.
922 std::string make_authorization_header(const std::string& signed_user_token);
923
924 // parse_authorization_header() parses the value of the Authorization header and returns
925 // the signed_user_token it contains. None is returned in case of a syntax error.
    // NOTE(review): the result is a StringData, which presumably refers to characters
    // inside `authorization_header` rather than owning a copy, so it should not be used
    // after that string is destroyed or modified -- confirm against the definition.
926 util::Optional<StringData> parse_authorization_header(const std::string& authorization_header);
927
928 } // namespace protocol
929 } // namespace sync
930 } // namespace realm
931
932 #endif // REALM_SYNC_PROTOCOL_HPP