added iOS source code
[wl-app.git] / iOS / Pods / Realm / include / core / realm / util / network.hpp
1 /*************************************************************************
2  *
3  * REALM CONFIDENTIAL
4  * __________________
5  *
6  *  [2011] - [2015] Realm Inc
7  *  All Rights Reserved.
8  *
9  * NOTICE:  All information contained herein is, and remains
10  * the property of Realm Incorporated and its suppliers,
11  * if any.  The intellectual and technical concepts contained
12  * herein are proprietary to Realm Incorporated
13  * and its suppliers and may be covered by U.S. and Foreign Patents,
14  * patents in process, and are protected by trade secret or copyright law.
15  * Dissemination of this information or reproduction of this material
16  * is strictly forbidden unless prior written permission is obtained
17  * from Realm Incorporated.
18  *
19  **************************************************************************/
20 #ifndef REALM_UTIL_NETWORK_HPP
21 #define REALM_UTIL_NETWORK_HPP
22
23 #include <cstddef>
24 #include <memory>
25 #include <chrono>
26 #include <string>
27 #include <system_error>
28 #include <ostream>
29
30 #include <sys/types.h>
31
32 #ifdef _WIN32
33 #  include <winsock2.h>
34 #  include <ws2tcpip.h>
35 #  include <stdio.h>
36 #  include <Ws2def.h>
37 #  pragma comment(lib, "Ws2_32.lib")
38 #else
39 #  include <sys/socket.h>
40 #  include <arpa/inet.h>
41 #  include <netdb.h>
42 #endif
43
44 #include <realm/util/features.h>
45 #include <realm/util/assert.hpp>
46 #include <realm/util/bind_ptr.hpp>
47 #include <realm/util/buffer.hpp>
48 #include <realm/util/basic_system_errors.hpp>
49
50 // Linux epoll
51 //
52 // Require Linux kernel version >= 2.6.27 such that we have epoll_create1(),
53 // `O_CLOEXEC`, and `EPOLLRDHUP`.
54 #if defined(__linux__)
55 #  include <linux/version.h>
56 #  if !defined(REALM_HAVE_EPOLL)
57 #    if !defined(REALM_DISABLE_UTIL_NETWORK_EPOLL)
58 #      if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,27)
59 #        define REALM_HAVE_EPOLL 1
60 #      endif
61 #    endif
62 #  endif
63 #endif
64 #if !defined(REALM_HAVE_EPOLL)
65 #  define REALM_HAVE_EPOLL 0
66 #endif
67
68 // FreeBSD Kqueue.
69 //
70 // Available on Mac OS X, FreeBSD, NetBSD, OpenBSD
71 #if (defined(__MACH__) && defined(__APPLE__)) || defined(__FreeBSD__) || \
72     defined(__NetBSD__) || defined(__OpenBSD__)
73 #  if !defined(REALM_HAVE_KQUEUE)
74 #    if !defined(REALM_DISABLE_UTIL_NETWORK_KQUEUE)
75 #      define REALM_HAVE_KQUEUE 1
76 #    endif
77 #  endif
78 #endif
79 #if !defined(REALM_HAVE_KQUEUE)
80 #  define REALM_HAVE_KQUEUE 0
81 #endif
82
83
84
85 // FIXME: Unfinished business around `Address::m_ip_v6_scope_id`.
86
87
88 namespace realm {
89 namespace util {
90
91 /// \brief TCP/IP networking API.
92 ///
93 /// The design of this networking API is heavily inspired by the ASIO C++
94 /// library (http://think-async.com).
95 ///
96 ///
97 /// ### Thread safety
98 ///
99 /// A *service context* is a set of objects consisting of an instance of
100 /// Service, and all the objects that are associated with that instance (\ref
101 /// Resolver, \ref Socket`, \ref Acceptor`, \ref DeadlineTimer, \ref Trigger,
102 /// and \ref ssl::Stream).
103 ///
104 /// In general, it is unsafe for two threads to call functions on the same
105 /// object, or on different objects in the same service context. This also
106 /// applies to destructors. Notable exceptions are the fully thread-safe
107 /// functions, such as Service::post(), Service::stop(), and Service::reset().
108 ///
109 /// On the other hand, it is always safe for two threads to call functions on
110 /// objects belonging to different service contexts.
111 ///
112 /// One implication of these rules is that at most one thread must execute run()
113 /// at any given time, and if one thread is executing run(), then no other
114 /// thread is allowed to access objects in the same service context (with the
115 /// mentioned exceptions).
116 ///
117 /// Unless otherwise specified, free-standing objects, such as \ref
118 /// StreamProtocol, \ref Address, \ref Endpoint, and \ref Endpoint::List are
119 /// fully thread-safe as long as they are not mutated. If one thread is mutating
120 /// such an object, no other thread may access it. Note that these free-standing
121 /// objects are not associcated with an instance of Service, and are therefore
122 /// not part of a service context.
123 ///
124 ///
125 /// ### Comparison with ASIO
126 ///
127 /// There is a crucial difference between the two libraries in regards to the
128 /// guarantees that are provided about the cancelability of asynchronous
129 /// operations. The Realm networking library (this library) considers an
130 /// asynchronous operation to be complete precisely when the completion handler
131 /// starts to execute, and it guarantees that such an operation is cancelable up
132 /// until that point in time. In particular, if `cancel()` is called on a socket
133 /// or a deadline timer object before the completion handler starts to execute,
134 /// then that operation will be canceled, and will receive
135 /// `error::operation_aborted`. This guarantee is possible to provide (and free
136 /// of ambiguities) precisely because this library prohibits multiple threads
137 /// from executing the event loop concurrently, and because `cancel()` is
138 /// allowed to be called only from a completion handler (executed by the event
139 /// loop thread) or while no thread is executing the event loop. This guarantee
140 /// allows for safe destruction of sockets and deadline timers as long as the
141 /// completion handlers react appropriately to `error::operation_aborted`, in
142 /// particular, that they do not attempt to access the socket or deadline timer
143 /// object in such cases.
144 ///
145 /// ASIO, on the other hand, allows for an asynchronous operation to complete
146 /// and become **uncancellable** before the completion handler starts to
147 /// execute. For this reason, it is possible with ASIO to get the completion
148 /// handler of an asynchronous wait operation to start executing and receive an
149 /// error code other than "operation aborted" at a point in time where
150 /// `cancel()` has already been called on the deadline timer object, or even at
151 /// a point in timer where the deadline timer has been destroyed. This seems
152 /// like an inevitable consequence of the fact that ASIO allows for multiple
153 /// threads to execute the event loop concurrently. This generally forces ASIO
154 /// applications to invent ways of extending the lifetime of deadline timer and
155 /// socket objects until the completion handler starts executing.
156 ///
157 /// IMPORTANT: Even if ASIO is used in a way where at most one thread executes
158 /// the event loop, there is still no guarantee that an asynchronous operation
159 /// remains cancelable up until the point in time where the completion handler
160 /// starts to execute.
161 namespace network {
162
163 std::string host_name();
164
165
166 class StreamProtocol;
167 class Address;
168 class Endpoint;
169 class Service;
170 class Resolver;
171 class SocketBase;
172 class Socket;
173 class Acceptor;
174 class DeadlineTimer;
175 class Trigger;
176 class ReadAheadBuffer;
177 namespace ssl {
178 class Stream;
179 } // namespace ssl
180
181
182 /// \brief An IP protocol descriptor.
183 class StreamProtocol {
184 public:
185     static StreamProtocol ip_v4();
186     static StreamProtocol ip_v6();
187
188     bool is_ip_v4() const;
189     bool is_ip_v6() const;
190
191     int protocol() const;
192     int family() const;
193
194     StreamProtocol();
195     ~StreamProtocol() noexcept {}
196
197 private:
198     int m_family;
199     int m_socktype;
200     int m_protocol;
201
202     friend class Resolver;
203     friend class SocketBase;
204 };
205
206
207 /// \brief An IP address (IPv4 or IPv6).
208 class Address {
209 public:
210     bool is_ip_v4() const;
211     bool is_ip_v6() const;
212
213     template<class C, class T>
214     friend std::basic_ostream<C,T>& operator<<(std::basic_ostream<C,T>&, const Address&);
215
216     Address();
217     ~Address() noexcept {}
218
219 private:
220     using ip_v4_type = in_addr;
221     using ip_v6_type = in6_addr;
222     union union_type {
223         ip_v4_type m_ip_v4;
224         ip_v6_type m_ip_v6;
225     };
226     union_type m_union;
227     std::uint_least32_t m_ip_v6_scope_id = 0;
228     bool m_is_ip_v6 = false;
229
230     friend Address make_address(const char*, std::error_code&) noexcept;
231     friend class Endpoint;
232 };
233
234 Address make_address(const char* c_str);
235 Address make_address(const char* c_str, std::error_code& ec) noexcept;
236 Address make_address(const std::string&);
237 Address make_address(const std::string&, std::error_code& ec) noexcept;
238
239
240 /// \brief An IP endpoint.
241 ///
242 /// An IP endpoint is a triplet (`protocol`, `address`, `port`).
243 class Endpoint {
244 public:
245     using port_type = std::uint_fast16_t;
246     class List;
247
248     StreamProtocol protocol() const;
249     Address address() const;
250     port_type port() const;
251
252     Endpoint();
253     Endpoint(const StreamProtocol&, port_type);
254     Endpoint(const Address&, port_type);
255     ~Endpoint() noexcept {}
256
257     using data_type = sockaddr;
258     data_type* data();
259     const data_type* data() const;
260
261 private:
262     StreamProtocol m_protocol;
263
264     using sockaddr_base_type  = sockaddr;
265     using sockaddr_ip_v4_type = sockaddr_in;
266     using sockaddr_ip_v6_type = sockaddr_in6;
267     union sockaddr_union_type {
268         sockaddr_base_type  m_base;
269         sockaddr_ip_v4_type m_ip_v4;
270         sockaddr_ip_v6_type m_ip_v6;
271     };
272     sockaddr_union_type m_sockaddr_union;
273
274     friend class Service;
275     friend class Resolver;
276     friend class SocketBase;
277     friend class Socket;
278 };
279
280
281 /// \brief A list of IP endpoints.
282 class Endpoint::List {
283 public:
284     using iterator = const Endpoint*;
285
286     iterator begin() const noexcept;
287     iterator end() const noexcept;
288     std::size_t size() const noexcept;
289     bool empty() const noexcept;
290
291     List() noexcept = default;
292     List(List&&) noexcept = default;
293     ~List() noexcept = default;
294
295     List& operator=(List&&) noexcept = default;
296
297 private:
298     Buffer<Endpoint> m_endpoints;
299
300     friend class Resolver;
301 };
302
303
304 /// \brief TCP/IP networking service.
305 class Service {
306 public:
307     Service();
308     ~Service() noexcept;
309
310     /// \brief Execute the event loop.
311     ///
312     /// Execute completion handlers of completed asynchronous operations, or
313     /// wait for more completion handlers to become ready for
314     /// execution. Handlers submitted via post() are considered immeditely
315     /// ready. If there are no completion handlers ready for execution, and
316     /// there are no asynchronous operations in progress, run() returns.
317     ///
318     /// All completion handlers, including handlers submitted via post() will be
319     /// executed from run(), that is, by the thread that executes run(). If no
320     /// thread executes run(), then the completion handlers will not be
321     /// executed.
322     ///
323     /// Exceptions thrown by completion handlers will always propagate back
324     /// through run().
325     ///
326     /// Syncronous operations (e.g., Socket::connect()) execute independently of
327     /// the event loop, and do not require that any thread calls run().
328     void run();
329
330     /// @{ \brief Stop event loop execution.
331     ///
332     /// stop() puts the event loop into the stopped mode. If a thread is
333     /// currently executing run(), it will be made to return in a timely
334     /// fashion, that is, without further blocking. If a thread is currently
335     /// blocked in run(), it will be unblocked. Handlers that can be executed
336     /// immediately, may, or may not be executed before run() returns, but new
337     /// handlers submitted by these, will not be executed before run()
338     /// returns. Also, if a handler is submitted by a call to post, and that
339     /// call happens after stop() returns, then that handler is guaranteed to
340     /// not be executed before run() returns (assuming that reset() is not called
341     /// before run() returns).
342     ///
343     /// The event loop will remain in the stopped mode until reset() is
344     /// called. If reset() is called before run() returns, it may, or may not
345     /// cause run() to resume normal operation without returning.
346     ///
347     /// Both stop() and reset() are thread-safe, that is, they may be called by
348     /// any thread. Also, both of these function may be called from completion
349     /// handlers (including posted handlers).
350     void stop() noexcept;
351     void reset() noexcept;
352     /// @}
353
354     /// \brief Submit a handler to be executed by the event loop thread.
355     ///
356     /// Register the sepcified completion handler for immediate asynchronous
357     /// execution. The specified handler will be executed by an expression on
358     /// the form `handler()`. If the the handler object is movable, it will
359     /// never be copied. Otherwise, it will be copied as necessary.
360     ///
361     /// This function is thread-safe, that is, it may be called by any
362     /// thread. It may also be called from other completion handlers.
363     ///
364     /// The handler will never be called as part of the execution of post(). It
365     /// will always be called by a thread that is executing run(). If no thread
366     /// is currently executing run(), the handler will not be executed until a
367     /// thread starts executing run(). If post() is called while another thread
368     /// is executing run(), the handler may be called before post() returns. If
369     /// post() is called from another completion handler, the submitted handler
370     /// is guaranteed to not be called during the execution of post().
371     ///
372     /// Completion handlers added through post() will be executed in the order
373     /// that they are added. More precisely, if post() is called twice to add
374     /// two handlers, A and B, and the execution of post(A) ends before the
375     /// beginning of the execution of post(B), then A is guaranteed to execute
376     /// before B.
377     template<class H> void post(H handler);
378
379 private:
380     enum class Want { nothing = 0, read, write };
381
382     template<class Oper> class OperQueue;
383     class Descriptor;
384     class AsyncOper;
385     class WaitOperBase;
386     class TriggerExecOperBase;
387     class PostOperBase;
388     template<class H> class PostOper;
389     class IoOper;
390     class UnusedOper; // Allocated, but currently unused memory
391
392     template<class S> class BasicStreamOps;
393
394     struct OwnersOperDeleter {
395         void operator()(AsyncOper*) const noexcept;
396     };
397     struct LendersOperDeleter {
398         void operator()(AsyncOper*) const noexcept;
399     };
400     using OwnersOperPtr      = std::unique_ptr<AsyncOper,    OwnersOperDeleter>;
401     using LendersOperPtr     = std::unique_ptr<AsyncOper,    LendersOperDeleter>;
402     using LendersWaitOperPtr = std::unique_ptr<WaitOperBase, LendersOperDeleter>;
403     using LendersIoOperPtr   = std::unique_ptr<IoOper,       LendersOperDeleter>;
404
405     class IoReactor;
406     class Impl;
407     const std::unique_ptr<Impl> m_impl;
408
409     template<class Oper, class... Args>
410     static std::unique_ptr<Oper, LendersOperDeleter> alloc(OwnersOperPtr&, Args&&...);
411
412     template<class Oper> static void execute(std::unique_ptr<Oper, LendersOperDeleter>&);
413
414     using PostOperConstr = PostOperBase*(void* addr, std::size_t size, Impl&, void* cookie);
415     void do_post(PostOperConstr, std::size_t size, void* cookie);
416     template<class H>
417     static PostOperBase* post_oper_constr(void* addr, std::size_t size, Impl&, void* cookie);
418     static void recycle_post_oper(Impl&, PostOperBase*) noexcept;
419     static void trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
420     static void reset_trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
421
422     using clock = std::chrono::steady_clock;
423
424     friend class Resolver;
425     friend class SocketBase;
426     friend class Socket;
427     friend class Acceptor;
428     friend class DeadlineTimer;
429     friend class Trigger;
430     friend class ReadAheadBuffer;
431     friend class ssl::Stream;
432 };
433
434
435 template<class Oper> class Service::OperQueue {
436 public:
437     using LendersOperPtr = std::unique_ptr<Oper, LendersOperDeleter>;
438     bool empty() const noexcept;
439     void push_back(LendersOperPtr) noexcept;
440     template<class Oper2> void push_back(OperQueue<Oper2>&) noexcept;
441     LendersOperPtr pop_front() noexcept;
442     void clear() noexcept;
443     OperQueue() noexcept = default;
444     OperQueue(OperQueue&&) noexcept;
445     ~OperQueue() noexcept;
446 private:
447     Oper* m_back = nullptr;
448     template<class> friend class OperQueue;
449 };
450
451
452 class Service::Descriptor {
453 public:
454 #ifdef _WIN32
455     using native_handle_type = SOCKET;
456 #else
457     using native_handle_type = int;
458 #endif
459
460     Impl& service_impl;
461
462     Descriptor(Impl& service) noexcept;
463     ~Descriptor() noexcept;
464
465     /// \param in_blocking_mode Must be true if, and only if the passed file
466     /// descriptor refers to a file description in which the file status flag
467     /// O_NONBLOCK is not set.
468     ///
469     /// The passed file descriptor must have the file descriptor flag FD_CLOEXEC
470     /// set.
471     void assign(native_handle_type fd, bool in_blocking_mode) noexcept;
472     void close() noexcept;
473
474     bool is_open() const noexcept;
475
476     native_handle_type native_handle() const noexcept;
477     bool in_blocking_mode() const noexcept;
478
479     void accept(Descriptor&, StreamProtocol, Endpoint*, std::error_code&) noexcept;
480     std::size_t read_some(char* buffer, std::size_t size, std::error_code&) noexcept;
481     std::size_t write_some(const char* data, std::size_t size, std::error_code&) noexcept;
482
483     /// \tparam Oper An operation type inherited from IoOper with an initate()
484     /// function that initiates the operation and figures out whether it needs
485     /// to read from, or write to the underlying descriptor to
486     /// proceed. `initiate()` must return Want::read if the operation needs to
487     /// read, or Want::write if the operation needs to write. If the operation
488     /// completes immediately (e.g. due to a failure during initialization),
489     /// `initiate()` must return Want::nothing.
490     template<class Oper, class... Args>
491     void initiate_oper(std::unique_ptr<Oper, LendersOperDeleter>, Args&&...);
492
493     void ensure_blocking_mode();
494     void ensure_nonblocking_mode();
495
496 private:
497     native_handle_type m_fd = -1;
498     bool m_in_blocking_mode; // Not in nonblocking mode
499
500 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
501     bool m_read_ready;
502     bool m_write_ready;
503     bool m_imminent_end_of_input; // Kernel has seen the end of input
504     bool m_is_registered;
505     OperQueue<IoOper> m_suspended_read_ops, m_suspended_write_ops;
506
507     void deregister_for_async() noexcept;
508 #endif
509
510     bool assume_read_would_block() const noexcept;
511     bool assume_write_would_block() const noexcept;
512
513     void set_read_ready(bool) noexcept;
514     void set_write_ready(bool) noexcept;
515
516     void set_nonblock_flag(bool value);
517     void add_initiated_oper(LendersIoOperPtr, Want);
518
519     void do_close() noexcept;
520
521     friend class IoReactor;
522 };
523
524
525 class Resolver {
526 public:
527     class Query;
528
529     Resolver(Service&);
530     ~Resolver() noexcept;
531
532     /// Thread-safe.
533     Service& get_service() noexcept;
534
535     /// @{ \brief Resolve the specified query to one or more endpoints.
536     Endpoint::List resolve(const Query&);
537     Endpoint::List resolve(const Query&, std::error_code&);
538     /// @}
539
540     /// \brief Perform an asynchronous resolve operation.
541     ///
542     /// Initiate an asynchronous resolve operation. The completion handler will
543     /// be called when the operation completes. The operation completes when it
544     /// succeeds, or an error occurs.
545     ///
546     /// The completion handler is always executed by the event loop thread,
547     /// i.e., by a thread that is executing Service::run(). Conversely, the
548     /// completion handler is guaranteed to not be called while no thread is
549     /// executing Service::run(). The execution of the completion handler is
550     /// always deferred to the event loop, meaning that it never happens as a
551     /// synchronous side effect of the execution of async_resolve(), even when
552     /// async_resolve() is executed by the event loop thread. The completion
553     /// handler is guaranteed to be called eventually, as long as there is time
554     /// enough for the operation to complete or fail, and a thread is executing
555     /// Service::run() for long enough.
556     ///
557     /// The operation can be canceled by calling cancel(), and will be
558     /// automatically canceled if the resolver object is destroyed. If the
559     /// operation is canceled, it will fail with `error::operation_aborted`. The
560     /// operation remains cancelable up until the point in time where the
561     /// completion handler starts to execute. This means that if cancel() is
562     /// called before the completion handler starts to execute, then the
563     /// completion handler is guaranteed to have `error::operation_aborted`
564     /// passed to it. This is true regardless of whether cancel() is called
565     /// explicitly or implicitly, such as when the resolver is destroyed.
566     ///
567     /// The specified handler will be executed by an expression on the form
568     /// `handler(ec, endpoints)` where `ec` is the error code and `endpoints` is
569     /// an object of type `Endpoint::List`. If the the handler object is
570     /// movable, it will never be copied. Otherwise, it will be copied as
571     /// necessary.
572     ///
573     /// It is an error to start a new resolve operation (synchronous or
574     /// asynchronous) while an asynchronous resolve operation is in progress via
575     /// the same resolver object. An asynchronous resolve operation is
576     /// considered complete as soon as the completion handler starts to
577     /// execute. This means that a new resolve operation can be started from the
578     /// completion handler.
579     template<class H> void async_resolve(Query, H handler);
580
581     /// \brief Cancel all asynchronous operations.
582     ///
583     /// Cause all incomplete asynchronous operations, that are associated with
584     /// this resolver (at most one), to fail with `error::operation_aborted`. An
585     /// asynchronous operation is complete precisely when its completion handler
586     /// starts executing.
587     ///
588     /// Completion handlers of canceled operations will become immediately ready
589     /// to execute, but will never be executed directly as part of the execution
590     /// of cancel().
591     ///
592     /// Cancellation happens automatically when the resolver object is destroyed.
593     void cancel() noexcept;
594
595 private:
596     class ResolveOperBase;
597     template<class H> class ResolveOper;
598
599     using LendersResolveOperPtr = std::unique_ptr<ResolveOperBase, Service::LendersOperDeleter>;
600
601     Service::Impl& m_service_impl;
602
603     Service::OwnersOperPtr m_resolve_oper;
604
605     void initiate_oper(LendersResolveOperPtr);
606 };
607
608
609 class Resolver::Query {
610 public:
611     enum {
612         /// Locally bound socket endpoint (server side)
613         passive = AI_PASSIVE,
614
615         /// Ignore families without a configured non-loopback address
616         address_configured = AI_ADDRCONFIG
617     };
618
619     Query(std::string service_port, int init_flags = passive|address_configured);
620     Query(const StreamProtocol&, std::string service_port,
621           int init_flags = passive|address_configured);
622     Query(std::string host_name, std::string service_port,
623           int init_flags = address_configured);
624     Query(const StreamProtocol&, std::string host_name, std::string service_port,
625           int init_flags = address_configured);
626
627     ~Query() noexcept;
628
629     int flags() const;
630     StreamProtocol protocol() const;
631     std::string host() const;
632     std::string service() const;
633
634 private:
635     int m_flags;
636     StreamProtocol m_protocol;
637     std::string m_host;    // hostname
638     std::string m_service; // port
639
640     friend class Resolver;
641 };
642
643
644 class SocketBase {
645 public:
646     using native_handle_type = Service::Descriptor::native_handle_type;
647
648     ~SocketBase() noexcept;
649
650     /// Thread-safe.
651     Service& get_service() noexcept;
652
653     bool is_open() const noexcept;
654     native_handle_type native_handle() const noexcept;
655
656     /// @{ \brief Open the socket for use with the specified protocol.
657     ///
658     /// It is an error to call open() on a socket that is already open.
659     void open(const StreamProtocol&);
660     std::error_code open(const StreamProtocol&, std::error_code&);
661     /// @}
662
663     /// \brief Close this socket.
664     ///
665     /// If the socket is open, it will be closed. If it is already closed (or
666     /// never opened), this function does nothing (idempotency).
667     ///
668     /// A socket is automatically closed when destroyed.
669     ///
670     /// When the socket is closed, any incomplete asynchronous operation will be
671     /// canceled (as if cancel() was called).
672     void close() noexcept;
673
674     /// \brief Cancel all asynchronous operations.
675     ///
676     /// Cause all incomplete asynchronous operations, that are associated with
677     /// this socket, to fail with `error::operation_aborted`. An asynchronous
678     /// operation is complete precisely when its completion handler starts
679     /// executing.
680     ///
681     /// Completion handlers of canceled operations will become immediately ready
682     /// to execute, but will never be executed directly as part of the execution
683     /// of cancel().
684     void cancel() noexcept;
685
686     template<class O>
687     void get_option(O& opt) const;
688
689     template<class O>
690     std::error_code get_option(O& opt, std::error_code&) const;
691
692     template<class O>
693     void set_option(const O& opt);
694
695     template<class O>
696     std::error_code set_option(const O& opt, std::error_code&);
697
698     void bind(const Endpoint&);
699     std::error_code bind(const Endpoint&, std::error_code&);
700
701     Endpoint local_endpoint() const;
702     Endpoint local_endpoint(std::error_code&) const;
703
704 private:
705     enum opt_enum {
706         opt_ReuseAddr, ///< `SOL_SOCKET`, `SO_REUSEADDR`
707         opt_Linger,    ///< `SOL_SOCKET`, `SO_LINGER`
708         opt_NoDelay,   ///< `IPPROTO_TCP`, `TCP_NODELAY` (disable the Nagle algorithm)
709     };
710
711     template<class, int, class> class Option;
712
713 public:
714     using reuse_address = Option<bool, opt_ReuseAddr, int>;
715     using no_delay      = Option<bool, opt_NoDelay,   int>;
716
717     // linger struct defined by POSIX sys/socket.h.
718     struct linger_opt;
719     using linger = Option<linger_opt, opt_Linger, struct linger>;
720
721 protected:
722     Service::Descriptor m_desc;
723
724 private:
725     StreamProtocol m_protocol;
726
727 protected:
728     Service::OwnersOperPtr m_read_oper;  // Read or accept
729     Service::OwnersOperPtr m_write_oper; // Write or connect
730
731     SocketBase(Service&);
732
733     const StreamProtocol& get_protocol() const noexcept;
734     std::error_code do_assign(const StreamProtocol&, native_handle_type, std::error_code&);
735     void do_close() noexcept;
736
737     void get_option(opt_enum, void* value_data, std::size_t& value_size, std::error_code&) const;
738     void set_option(opt_enum, const void* value_data, std::size_t value_size, std::error_code&);
739     void map_option(opt_enum, int& level, int& option_name) const;
740
741     friend class Acceptor;
742 };
743
744
745 template<class T, int opt, class U> class SocketBase::Option {
746 public:
747     Option(T value = T());
748     T value() const;
749
750 private:
751     T m_value;
752
753     void get(const SocketBase&, std::error_code&);
754     void set(SocketBase&, std::error_code&) const;
755
756     friend class SocketBase;
757 };
758
759 struct SocketBase::linger_opt {
760     linger_opt(bool enable, int timeout_seconds = 0)
761     {
762         m_linger.l_onoff = enable ? 1 : 0;
763         m_linger.l_linger = timeout_seconds;
764     }
765
766     ::linger m_linger;
767
768     operator ::linger() const { return m_linger; }
769
770     bool enabled() const { return m_linger.l_onoff != 0; }
771     int  timeout() const { return m_linger.l_linger; }
772 };
773
774
775 /// Switching between synchronous and asynchronous operations is allowed, but
776 /// only in a nonoverlapping fashion. That is, a synchronous operation is not
777 /// allowed to run concurrently with an asynchronous one on the same
778 /// socket. Note that an asynchronous operation is considered to be running
779 /// until its completion handler starts executing.
780 class Socket: public SocketBase {
781 public:
782     Socket(Service&);
783
784     /// \brief Create a socket with an already-connected native socket handle.
785     ///
786     /// This constructor is shorthand for creating the socket with the
787     /// one-argument constructor, and then calling the two-argument assign()
788     /// with the specified protocol and native handle.
789     Socket(Service&, const StreamProtocol&, native_handle_type);
790
791     ~Socket() noexcept;
792
793     void connect(const Endpoint&);
794     std::error_code connect(const Endpoint&, std::error_code&);
795
796     /// @{ \brief Perform a synchronous read operation.
797     ///
798     /// read() will not return until the specified buffer is full, or an error
799     /// occurs. Reaching the end of input before the buffer is filled, is
800     /// considered an error, and will cause the operation to fail with
801     /// `network::end_of_input`.
802     ///
803     /// read_until() will not return until the specified buffer contains the
804     /// specified delimiter, or an error occurs. If the buffer is filled before
805     /// the delimiter is found, the operation fails with
806     /// `network::delim_not_found`. Otherwise, if the end of input is reached
807     /// before the delimiter is found, the operation fails with
808     /// `network::end_of_input`. If the operation succeeds, the last byte placed
809     /// in the buffer is the delimiter.
810     ///
811     /// The versions that take a ReadAheadBuffer argument will read through that
812     /// buffer. This allows for fewer larger reads on the underlying
813     /// socket. Since unconsumed data may be left in the read-ahead buffer after
814     /// a read operation returns, it is important that the same read-ahead
815     /// buffer is passed to the next read operation.
816     ///
817     /// The versions of read() and read_until() that do not take an
818     /// `std::error_code&` argument will throw std::system_error on failure.
819     ///
820     /// The versions that do take an `std::error_code&` argument will set \a ec
821     /// to `std::error_code()` on success, and to something else on failure. On
822     /// failure they will return the number of bytes placed in the specified
823     /// buffer before the error occured.
824     ///
825     /// \return The number of bytes places in the specified buffer upon return.
826     std::size_t read(char* buffer, std::size_t size);
827     std::size_t read(char* buffer, std::size_t size, std::error_code& ec);
828     std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&);
829     std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&, std::error_code& ec);
830     std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&);
831     std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&,
832                            std::error_code& ec);
833     /// @}
834
835     /// @{ \brief Perform a synchronous write operation.
836     ///
837     /// write() will not return until all the specified bytes have been written
838     /// to the socket, or an error occurs.
839     ///
840     /// The versions of write() that does not take an `std::error_code&`
841     /// argument will throw std::system_error on failure. When it succeeds, it
842     /// always returns \a size.
843     ///
844     /// The versions that does take an `std::error_code&` argument will set \a
845     /// ec to `std::error_code()` on success, and to something else on
846     /// failure. On success it returns \a size. On faulure it returns the number
847     /// of bytes written before the failure occured.
848     std::size_t write(const char* data, std::size_t size);
849     std::size_t write(const char* data, std::size_t size, std::error_code& ec);
850     /// @}
851
852     /// @{ \brief Read at least one byte from this socket.
853     ///
854     /// If \a size is zero, both versions of read_some() will return zero
855     /// without blocking. Read errors may or may not be detected in this case.
856     ///
857     /// Otherwise, if \a size is greater than zero, and at least one byte is
858     /// immediately available, that is, without blocking, then both versions
859     /// will read at least one byte (but generally as many immediately available
860     /// bytes as will fit into the specified buffer), and return without
861     /// blocking.
862     ///
863     /// Otherwise, both versions will block the calling thread until at least one
864     /// byte becomes available, or an error occurs.
865     ///
866     /// In this context, it counts as an error, if the end of input is reached
867     /// before at least one byte becomes available (see
868     /// `network::end_of_input`).
869     ///
870     /// If no error occurs, both versions will return the number of bytes placed
871     /// in the specified buffer, which is generally as many as are immediately
872     /// available at the time when the first byte becomes available, although
873     /// never more than \a size.
874     ///
875     /// If no error occurs, the three-argument version will set \a ec to
876     /// indicate success.
877     ///
878     /// If an error occurs, the two-argument version will throw
879     /// `std::system_error`, while the three-argument version will set \a ec to
880     /// indicate the error, and return zero.
881     ///
882     /// As long as \a size is greater than zero, the two argument version will
883     /// always return a value that is greater than zero, while the three
884     /// argument version will return a value greater than zero when, and only
885     /// when \a ec is set to indicate success (no error, and no end of input).
886     std::size_t read_some(char* buffer, std::size_t size);
887     std::size_t read_some(char* buffer, std::size_t size, std::error_code& ec);
888     /// @}
889
890     /// @{ \brief Write at least one byte to this socket.
891     ///
892     /// If \a size is zero, both versions of write_some() will return zero
893     /// without blocking. Write errors may or may not be detected in this case.
894     ///
895     /// Otherwise, if \a size is greater than zero, and at least one byte can be
896     /// written immediately, that is, without blocking, then both versions will
897     /// write at least one byte (but generally as many as can be written
898     /// immediately), and return without blocking.
899     ///
900     /// Otherwise, both versions will block the calling thread until at least one
901     /// byte can be written, or an error occurs.
902     ///
903     /// If no error occurs, both versions will return the number of bytes
904     /// written, which is generally as many as can be written immediately at the
905     /// time when the first byte can be written.
906     ///
907     /// If no error occurs, the three-argument version will set \a ec to
908     /// indicate success.
909     ///
910     /// If an error occurs, the two-argument version will throw
911     /// `std::system_error`, while the three-argument version will set \a ec to
912     /// indicate the error, and return zero.
913     ///
914     /// As long as \a size is greater than zero, the two argument version will
915     /// always return a value that is greater than zero, while the three
916     /// argument version will return a value greater than zero when, and only
917     /// when \a ec is set to indicate success.
918     std::size_t write_some(const char* data, std::size_t size);
919     std::size_t write_some(const char* data, std::size_t size, std::error_code&);
920     /// @}
921
922     /// \brief Perform an asynchronous connect operation.
923     ///
924     /// Initiate an asynchronous connect operation. The completion handler is
925     /// called when the operation completes. The operation completes when the
926     /// connection is established, or an error occurs.
927     ///
928     /// The completion handler is always executed by the event loop thread,
929     /// i.e., by a thread that is executing Service::run(). Conversely, the
930     /// completion handler is guaranteed to not be called while no thread is
931     /// executing Service::run(). The execution of the completion handler is
932     /// always deferred to the event loop, meaning that it never happens as a
933     /// synchronous side effect of the execution of async_connect(), even when
934     /// async_connect() is executed by the event loop thread. The completion
935     /// handler is guaranteed to be called eventually, as long as there is time
936     /// enough for the operation to complete or fail, and a thread is executing
937     /// Service::run() for long enough.
938     ///
939     /// The operation can be canceled by calling cancel(), and will be
940     /// automatically canceled if the socket is closed. If the operation is
941     /// canceled, it will fail with `error::operation_aborted`. The operation
942     /// remains cancelable up until the point in time where the completion
943     /// handler starts to execute. This means that if cancel() is called before
944     /// the completion handler starts to execute, then the completion handler is
945     /// guaranteed to have `error::operation_aborted` passed to it. This is true
946     /// regardless of whether cancel() is called explicitly or implicitly, such
947     /// as when the socket is destroyed.
948     ///
949     /// If the socket is not already open, it will be opened as part of the
950     /// connect operation as if by calling `open(ep.protocol())`. If the opening
951     /// operation succeeds, but the connect operation fails, the socket will be
952     /// left in the opened state.
953     ///
954     /// The specified handler will be executed by an expression on the form
955     /// `handler(ec)` where `ec` is the error code. If the the handler object is
956     /// movable, it will never be copied. Otherwise, it will be copied as
957     /// necessary.
958     ///
959     /// It is an error to start a new connect operation (synchronous or
960     /// asynchronous) while an asynchronous connect operation is in progress. An
961     /// asynchronous connect operation is considered complete as soon as the
962     /// completion handler starts to execute.
963     ///
964     /// \param ep The remote endpoint of the connection to be established.
965     template<class H> void async_connect(const Endpoint& ep, H handler);
966
967     /// @{ \brief Perform an asynchronous read operation.
968     ///
969     /// Initiate an asynchronous buffered read operation on the associated
970     /// socket. The completion handler will be called when the operation
971     /// completes, or an error occurs.
972     ///
973     /// async_read() will continue reading until the specified buffer is full,
974     /// or an error occurs. If the end of input is reached before the buffer is
975     /// filled, the operation fails with `network::end_of_input`.
976     ///
977     /// async_read_until() will continue reading until the specified buffer
978     /// contains the specified delimiter, or an error occurs. If the buffer is
979     /// filled before a delimiter is found, the operation fails with
980     /// `network::delim_not_found`. Otherwise, if the end of input is reached
981     /// before a delimiter is found, the operation fails with
982     /// `network::end_of_input`. Otherwise, if the operation succeeds, the last
983     /// byte placed in the buffer is the delimiter.
984     ///
985     /// The versions that take a ReadAheadBuffer argument will read through that
986     /// buffer. This allows for fewer larger reads on the underlying
987     /// socket. Since unconsumed data may be left in the read-ahead buffer after
988     /// a read operation completes, it is important that the same read-ahead
989     /// buffer is passed to the next read operation.
990     ///
991     /// The completion handler is always executed by the event loop thread,
992     /// i.e., by a thread that is executing Service::run(). Conversely, the
993     /// completion handler is guaranteed to not be called while no thread is
994     /// executing Service::run(). The execution of the completion handler is
995     /// always deferred to the event loop, meaning that it never happens as a
996     /// synchronous side effect of the execution of async_read() or
997     /// async_read_until(), even when async_read() or async_read_until() is
998     /// executed by the event loop thread. The completion handler is guaranteed
999     /// to be called eventually, as long as there is time enough for the
1000     /// operation to complete or fail, and a thread is executing Service::run()
1001     /// for long enough.
1002     ///
1003     /// The operation can be canceled by calling cancel() on the associated
1004     /// socket, and will be automatically canceled if the associated socket is
1005     /// closed. If the operation is canceled, it will fail with
1006     /// `error::operation_aborted`. The operation remains cancelable up until
1007     /// the point in time where the completion handler starts to execute. This
1008     /// means that if cancel() is called before the completion handler starts to
1009     /// execute, then the completion handler is guaranteed to have
1010     /// `error::operation_aborted` passed to it. This is true regardless of
1011     /// whether cancel() is called explicitly or implicitly, such as when the
1012     /// socket is destroyed.
1013     ///
1014     /// The specified handler will be executed by an expression on the form
1015     /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
1016     /// bytes placed in the buffer (of type `std::size_t`). `n` is guaranteed to
1017     /// be less than, or equal to \a size. If the the handler object is movable,
1018     /// it will never be copied. Otherwise, it will be copied as necessary.
1019     ///
1020     /// It is an error to start a read operation before the associated socket is
1021     /// connected.
1022     ///
1023     /// It is an error to start a new read operation (synchronous or
1024     /// asynchronous) while an asynchronous read operation is in progress. An
1025     /// asynchronous read operation is considered complete as soon as the
1026     /// completion handler starts executing. This means that a new read
1027     /// operation can be started from the completion handler of another
1028     /// asynchronous buffered read operation.
1029     template<class H> void async_read(char* buffer, std::size_t size, H handler);
1030     template<class H> void async_read(char* buffer, std::size_t size, ReadAheadBuffer&, H handler);
1031     template<class H> void async_read_until(char* buffer, std::size_t size, char delim,
1032                                             ReadAheadBuffer&, H handler);
1033     /// @}
1034
1035     /// \brief Perform an asynchronous write operation.
1036     ///
1037     /// Initiate an asynchronous write operation. The completion handler is
1038     /// called when the operation completes. The operation completes when all
1039     /// the specified bytes have been written to the socket, or an error occurs.
1040     ///
1041     /// The completion handler is always executed by the event loop thread,
1042     /// i.e., by a thread that is executing Service::run(). Conversely, the
1043     /// completion handler is guaranteed to not be called while no thread is
1044     /// executing Service::run(). The execution of the completion handler is
1045     /// always deferred to the event loop, meaning that it never happens as a
1046     /// synchronous side effect of the execution of async_write(), even when
1047     /// async_write() is executed by the event loop thread. The completion
1048     /// handler is guaranteed to be called eventually, as long as there is time
1049     /// enough for the operation to complete or fail, and a thread is executing
1050     /// Service::run() for long enough.
1051     ///
1052     /// The operation can be canceled by calling cancel(), and will be
1053     /// automatically canceled if the socket is closed. If the operation is
1054     /// canceled, it will fail with `error::operation_aborted`. The operation
1055     /// remains cancelable up until the point in time where the completion
1056     /// handler starts to execute. This means that if cancel() is called before
1057     /// the completion handler starts to execute, then the completion handler is
1058     /// guaranteed to have `error::operation_aborted` passed to it. This is true
1059     /// regardless of whether cancel() is called explicitly or implicitly, such
1060     /// as when the socket is destroyed.
1061     ///
1062     /// The specified handler will be executed by an expression on the form
1063     /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
1064     /// bytes written (of type `std::size_t`). If the the handler object is
1065     /// movable, it will never be copied. Otherwise, it will be copied as
1066     /// necessary.
1067     ///
1068     /// It is an error to start an asynchronous write operation before the
1069     /// socket is connected.
1070     ///
1071     /// It is an error to start a new write operation (synchronous or
1072     /// asynchronous) while an asynchronous write operation is in progress. An
1073     /// asynchronous write operation is considered complete as soon as the
1074     /// completion handler starts to execute. This means that a new write
1075     /// operation can be started from the completion handler of another
1076     /// asynchronous write operation.
1077     template<class H> void async_write(const char* data, std::size_t size, H handler);
1078
1079     template<class H> void async_read_some(char* buffer, std::size_t size, H handler);
1080     template<class H> void async_write_some(const char* data, std::size_t size, H handler);
1081
1082     enum shutdown_type {
1083 #ifdef _WIN32
1084         /// Shutdown the receiving side of the socket.
1085         shutdown_receive = SD_RECEIVE,
1086
1087         /// Shutdown the sending side of the socket.
1088         shutdown_send = SD_SEND,
1089
1090         /// Shutdown both sending and receiving side of the socket.
1091         shutdown_both = SD_BOTH
1092 #else
1093         shutdown_receive = SHUT_RD,
1094         shutdown_send = SHUT_WR,
1095         shutdown_both = SHUT_RDWR
1096 #endif
1097     };
1098
1099     /// @{ \brief Shut down the connected sockets sending and/or receiving
1100     /// side.
1101     ///
1102     /// It is an error to call this function when the socket is not both open
1103     /// and connected.
1104     void shutdown(shutdown_type);
1105     std::error_code shutdown(shutdown_type, std::error_code&);
1106     /// @}
1107
1108     /// @{ \brief Initialize socket with an already-connected native socket
1109     /// handle.
1110     ///
1111     /// The specified native handle must refer to a socket that is already fully
1112     /// open and connected.
1113     ///
1114     /// If the assignment operation succeeds, this socket object has taken
1115     /// ownership of the specified native handle, and the handle will be closed
1116     /// when the socket object is destroyed, (or when close() is called). If the
1117     /// operation fails, the caller still owns the specified native handle.
1118     ///
1119     /// It is an error to call connect() or async_connect() on a socket object
1120     /// that is initialized this way (unless it is first closed).
1121     ///
1122     /// It is an error to call this function on a socket object that is already
1123     /// open.
1124     void assign(const StreamProtocol&, native_handle_type);
1125     std::error_code assign(const StreamProtocol&, native_handle_type, std::error_code&);
1126     /// @}
1127
1128     /// Returns a reference to this socket, as this socket is the lowest layer
1129     /// of a stream.
1130     Socket& lowest_layer() noexcept;
1131
1132 private:
1133     using Want = Service::Want;
1134     using StreamOps = Service::BasicStreamOps<Socket>;
1135
1136     class ConnectOperBase;
1137     template<class H> class ConnectOper;
1138
1139     using LendersConnectOperPtr = std::unique_ptr<ConnectOperBase, Service::LendersOperDeleter>;
1140
1141     // `ec` untouched on success, but no immediate completion
1142     bool initiate_async_connect(const Endpoint&, std::error_code& ec);
1143     // `ec` untouched on success
1144     std::error_code finalize_async_connect(std::error_code& ec) noexcept;
1145
1146     // See Service::BasicStreamOps for details on these these 6 functions.
1147     void do_init_read_async(std::error_code&, Want&) noexcept;
1148     void do_init_write_async(std::error_code&, Want&) noexcept;
1149     std::size_t do_read_some_sync(char* buffer, std::size_t size,
1150                                   std::error_code&) noexcept;
1151     std::size_t do_write_some_sync(const char* data, std::size_t size,
1152                                    std::error_code&) noexcept;
1153     std::size_t do_read_some_async(char* buffer, std::size_t size,
1154                                    std::error_code&, Want&) noexcept;
1155     std::size_t do_write_some_async(const char* data, std::size_t size,
1156                                     std::error_code&, Want&) noexcept;
1157
1158     friend class Service::BasicStreamOps<Socket>;
1159     friend class Service::BasicStreamOps<ssl::Stream>;
1160     friend class ReadAheadBuffer;
1161     friend class ssl::Stream;
1162 };
1163
1164
1165 /// Switching between synchronous and asynchronous operations is allowed, but
1166 /// only in a nonoverlapping fashion. That is, a synchronous operation is not
1167 /// allowed to run concurrently with an asynchronous one on the same
1168 /// acceptor. Note that an asynchronous operation is considered to be running
1169 /// until its completion handler starts executing.
1170 class Acceptor: public SocketBase {
1171 public:
1172     Acceptor(Service&);
1173     ~Acceptor() noexcept;
1174
1175     static constexpr int max_connections = SOMAXCONN;
1176
1177     void listen(int backlog = max_connections);
1178     std::error_code listen(int backlog, std::error_code&);
1179
1180     void accept(Socket&);
1181     void accept(Socket&, Endpoint&);
1182     std::error_code accept(Socket&, std::error_code&);
1183     std::error_code accept(Socket&, Endpoint&, std::error_code&);
1184
1185     /// @{ \brief Perform an asynchronous accept operation.
1186     ///
1187     /// Initiate an asynchronous accept operation. The completion handler will
1188     /// be called when the operation completes. The operation completes when the
1189     /// connection is accepted, or an error occurs. If the operation succeeds,
1190     /// the specified local socket will have become connected to a remote
1191     /// socket.
1192     ///
1193     /// The completion handler is always executed by the event loop thread,
1194     /// i.e., by a thread that is executing Service::run(). Conversely, the
1195     /// completion handler is guaranteed to not be called while no thread is
1196     /// executing Service::run(). The execution of the completion handler is
1197     /// always deferred to the event loop, meaning that it never happens as a
1198     /// synchronous side effect of the execution of async_accept(), even when
1199     /// async_accept() is executed by the event loop thread. The completion
1200     /// handler is guaranteed to be called eventually, as long as there is time
1201     /// enough for the operation to complete or fail, and a thread is executing
1202     /// Service::run() for long enough.
1203     ///
1204     /// The operation can be canceled by calling cancel(), and will be
1205     /// automatically canceled if the acceptor is closed. If the operation is
1206     /// canceled, it will fail with `error::operation_aborted`. The operation
1207     /// remains cancelable up until the point in time where the completion
1208     /// handler starts to execute. This means that if cancel() is called before
1209     /// the completion handler starts to execute, then the completion handler is
1210     /// guaranteed to have `error::operation_aborted` passed to it. This is true
1211     /// regardless of whether cancel() is called explicitly or implicitly, such
1212     /// as when the acceptor is destroyed.
1213     ///
1214     /// The specified handler will be executed by an expression on the form
1215     /// `handler(ec)` where `ec` is the error code. If the the handler object is
1216     /// movable, it will never be copied. Otherwise, it will be copied as
1217     /// necessary.
1218     ///
1219     /// It is an error to start a new accept operation (synchronous or
1220     /// asynchronous) while an asynchronous accept operation is in progress. An
1221     /// asynchronous accept operation is considered complete as soon as the
1222     /// completion handler starts executing. This means that a new accept
1223     /// operation can be started from the completion handler.
1224     ///
1225     /// \param sock This is the local socket, that upon successful completion
1226     /// will have become connected to the remote socket. It must be in the
1227     /// closed state (Socket::is_open()) when async_accept() is called.
1228     ///
1229     /// \param ep Upon completion, the remote peer endpoint will have been
1230     /// assigned to this variable.
1231     template<class H> void async_accept(Socket& sock, H handler);
1232     template<class H> void async_accept(Socket& sock, Endpoint& ep, H handler);
1233     /// @}
1234
1235 private:
1236     using Want = Service::Want;
1237
1238     class AcceptOperBase;
1239     template<class H> class AcceptOper;
1240
1241     using LendersAcceptOperPtr = std::unique_ptr<AcceptOperBase, Service::LendersOperDeleter>;
1242
1243     std::error_code accept(Socket&, Endpoint*, std::error_code&);
1244     Want do_accept_async(Socket&, Endpoint*, std::error_code&) noexcept;
1245
1246     template<class H> void async_accept(Socket&, Endpoint*, H);
1247 };
1248
1249
1250 /// \brief A timer object supporting asynchronous wait operations.
1251 class DeadlineTimer {
1252 public:
1253     DeadlineTimer(Service&);
1254     ~DeadlineTimer() noexcept;
1255
1256     /// Thread-safe.
1257     Service& get_service() noexcept;
1258
1259     /// \brief Perform an asynchronous wait operation.
1260     ///
1261     /// Initiate an asynchronous wait operation. The completion handler becomes
1262     /// ready to execute when the expiration time is reached, or an error occurs
1263     /// (cancellation counts as an error here). The expiration time is the time
1264     /// of initiation plus the specified delay. The error code passed to the
1265     /// complition handler will **never** indicate success, unless the
1266     /// expiration time was reached.
1267     ///
1268     /// The completion handler is always executed by the event loop thread,
1269     /// i.e., by a thread that is executing Service::run(). Conversely, the
1270     /// completion handler is guaranteed to not be called while no thread is
1271     /// executing Service::run(). The execution of the completion handler is
1272     /// always deferred to the event loop, meaning that it never happens as a
1273     /// synchronous side effect of the execution of async_wait(), even when
1274     /// async_wait() is executed by the event loop thread. The completion
1275     /// handler is guaranteed to be called eventually, as long as there is time
1276     /// enough for the operation to complete or fail, and a thread is executing
1277     /// Service::run() for long enough.
1278     ///
1279     /// The operation can be canceled by calling cancel(), and will be
1280     /// automatically canceled if the timer is destroyed. If the operation is
1281     /// canceled, it will fail with `error::operation_aborted`. The operation
1282     /// remains cancelable up until the point in time where the completion
1283     /// handler starts to execute. This means that if cancel() is called before
1284     /// the completion handler starts to execute, then the completion handler is
1285     /// guaranteed to have `error::operation_aborted` passed to it. This is true
1286     /// regardless of whether cancel() is called explicitly or implicitly, such
1287     /// as when the timer is destroyed.
1288     ///
1289     /// The specified handler will be executed by an expression on the form
1290     /// `handler(ec)` where `ec` is the error code. If the the handler object is
1291     /// movable, it will never be copied. Otherwise, it will be copied as
1292     /// necessary.
1293     ///
1294     /// It is an error to start a new asynchronous wait operation while an
1295     /// another one is in progress. An asynchronous wait operation is in
1296     /// progress until its completion handler starts executing.
1297     template<class R, class P, class H>
1298     void async_wait(std::chrono::duration<R,P> delay, H handler);
1299
1300     /// \brief Cancel an asynchronous wait operation.
1301     ///
1302     /// If an asynchronous wait operation, that is associated with this deadline
1303     /// timer, is in progress, cause it to fail with
1304     /// `error::operation_aborted`. An asynchronous wait operation is in
1305     /// progress until its completion handler starts executing.
1306     ///
1307     /// Completion handlers of canceled operations will become immediately ready
1308     /// to execute, but will never be executed directly as part of the execution
1309     /// of cancel().
1310     ///
1311     /// Cancellation happens automatically when the timer object is destroyed.
1312     void cancel() noexcept;
1313
1314 private:
1315     template<class H> class WaitOper;
1316
1317     using clock = Service::clock;
1318
1319     Service::Impl& m_service_impl;
1320     Service::OwnersOperPtr m_wait_oper;
1321
1322     void add_oper(Service::LendersWaitOperPtr);
1323 };
1324
1325
1326 /// \brief Register a function whose invocation can be triggered repeatedly.
1327 ///
1328 /// While the function is always executed by the event loop thread, the
1329 /// triggering of its execution can be done by any thread, and the triggering
1330 /// operation is guaranteed to never throw.
1331 ///
1332 /// The function is guaranteed to not be called after the Trigger object is
1333 /// destroyed. It is safe, though, to destroy the Trigger object during the
1334 /// execution of the function.
1335 ///
1336 /// Note that even though the trigger() function is thread-safe, the Trigger
1337 /// object, as a whole, is not. In particular, construction and destruction must
1338 /// not be considered thread-safe.
1339 ///
1340 /// ### Relation to post()
1341 ///
1342 /// For a particular execution of trigger() and a particular invocation of
1343 /// Service::post(), if the execution of trigger() ends before the execution of
1344 /// Service::post() begins, then it is guaranteed that the function associated
1345 /// with the trigger gets to execute at least once after the execution of
1346 /// trigger() begins, and before the post handler gets to execute.
1347 class Trigger {
1348 public:
1349     template<class F> Trigger(Service&, F func);
1350     ~Trigger() noexcept;
1351
1352     Trigger() noexcept = default;
1353     Trigger(Trigger&&) noexcept = default;
1354     Trigger& operator=(Trigger&&) noexcept = default;
1355
1356     /// \brief Trigger another invocation of the associated function.
1357     ///
1358     /// An invocation of trigger() puts the Trigger object into the triggered
1359     /// state. It remains in the triggered state until shortly before the
1360     /// function starts to execute. While the Trigger object is in the triggered
1361     /// state, trigger() has no effect. This means that the number of executions
1362     /// of the function will generally be less that the number of times
1363     /// trigger() is invoked().
1364     ///
1365     /// A particular invocation of trigger() ensures that there will be at least
1366     /// one invocation of the associated function whose execution begins after
1367     /// the beginning of the execution of trigger(), so long as the event loop
1368     /// thread does not exit prematurely from run().
1369     ///
1370     /// If trigger() is invoked from the event loop thread, the next execution
1371     /// of the associated function will not begin until after trigger returns(),
1372     /// effectively preventing reentrancy for the associated function.
1373     ///
1374     /// If trigger() is invoked from another thread, the associated function may
1375     /// start to execute before trigger() returns.
1376     ///
1377     /// Note that the associated function can retrigger itself, i.e., if the
1378     /// associated function calls trigger(), then that will lead to another
1379     /// invocation of the associated function, but not until the first
1380     /// invocation ends (no reentrance).
1381     ///
1382     /// This function is thread-safe.
1383     void trigger() noexcept;
1384
1385 private:
1386     template<class H> class ExecOper;
1387
1388     util::bind_ptr<Service::TriggerExecOperBase> m_exec_oper;
1389 };
1390
1391
1392 class ReadAheadBuffer {
1393 public:
1394     ReadAheadBuffer();
1395
1396     /// Discard any buffered data.
1397     void clear() noexcept;
1398
1399 private:
1400     using Want = Service::Want;
1401
1402     char* m_begin = nullptr;
1403     char* m_end   = nullptr;
1404     static constexpr std::size_t s_size = 1024;
1405     const std::unique_ptr<char[]> m_buffer;
1406
1407     bool empty() const noexcept;
1408     bool read(char*& begin, char* end, int delim, std::error_code&) noexcept;
1409     template<class S> void refill_sync(S& stream, std::error_code&) noexcept;
1410     template<class S> bool refill_async(S& stream, std::error_code&, Want&) noexcept;
1411
1412     template<class> friend class Service::BasicStreamOps;
1413 };
1414
1415
1416 enum errors {
1417     /// End of input.
1418     end_of_input = 1,
1419
1420     /// Delimiter not found.
1421     delim_not_found,
1422
1423     /// Host not found (authoritative).
1424     host_not_found,
1425
1426     /// Host not found (non-authoritative).
1427     host_not_found_try_again,
1428
1429     /// The query is valid but does not have associated address data.
1430     no_data,
1431
1432     /// A non-recoverable error occurred.
1433     no_recovery,
1434
1435     /// The service is not supported for the given socket type.
1436     service_not_found,
1437
1438     /// The socket type is not supported.
1439     socket_type_not_supported,
1440
1441     /// Premature end of input (e.g., end of input before reception of SSL
1442     /// shutdown alert).
1443     premature_end_of_input
1444 };
1445
1446 std::error_code make_error_code(errors);
1447
1448 } // namespace network
1449 } // namespace util
1450 } // namespace realm
1451
1452 namespace std {
1453
1454 template<> class is_error_code_enum<realm::util::network::errors> {
1455 public:
1456     static const bool value = true;
1457 };
1458
1459 } // namespace std
1460
1461 namespace realm {
1462 namespace util {
1463 namespace network {
1464
1465
1466
1467
1468
1469 // Implementation
1470
1471 // ---------------- StreamProtocol ----------------
1472
1473 inline StreamProtocol StreamProtocol::ip_v4()
1474 {
1475     StreamProtocol prot;
1476     prot.m_family = AF_INET;
1477     return prot;
1478 }
1479
1480 inline StreamProtocol StreamProtocol::ip_v6()
1481 {
1482     StreamProtocol prot;
1483     prot.m_family = AF_INET6;
1484     return prot;
1485 }
1486
1487 inline bool StreamProtocol::is_ip_v4() const
1488 {
1489     return m_family == AF_INET;
1490 }
1491
1492 inline bool StreamProtocol::is_ip_v6() const
1493 {
1494     return m_family == AF_INET6;
1495 }
1496
1497 inline int StreamProtocol::family() const
1498 {
1499     return m_family;
1500 }
1501
1502 inline int StreamProtocol::protocol() const
1503 {
1504     return m_protocol;
1505 }
1506
1507 inline StreamProtocol::StreamProtocol():
1508     m_family{AF_UNSPEC},     // Allow both IPv4 and IPv6
1509     m_socktype{SOCK_STREAM}, // Or SOCK_DGRAM for UDP
1510     m_protocol{0}            // Any protocol
1511 {
1512 }
1513
1514 // ---------------- Address ----------------
1515
1516 inline bool Address::is_ip_v4() const
1517 {
1518     return !m_is_ip_v6;
1519 }
1520
1521 inline bool Address::is_ip_v6() const
1522 {
1523     return m_is_ip_v6;
1524 }
1525
1526 template<class C, class T>
1527 inline std::basic_ostream<C,T>& operator<<(std::basic_ostream<C,T>& out, const Address& addr)
1528 {
1529     // FIXME: Not taking `addr.m_ip_v6_scope_id` into account. What does ASIO
1530     // do?
1531     union buffer_union {
1532         char ip_v4[INET_ADDRSTRLEN];
1533         char ip_v6[INET6_ADDRSTRLEN];
1534     };
1535     char buffer[sizeof (buffer_union)];
1536     int af = addr.m_is_ip_v6 ? AF_INET6 : AF_INET;
1537 #ifdef _WIN32
1538     void* src = const_cast<void*>(reinterpret_cast<const void*>(&addr.m_union));
1539 #else
1540     const void* src = &addr.m_union;
1541 #endif
1542     const char* ret = ::inet_ntop(af, src, buffer, sizeof buffer);
1543     if (ret == 0) {
1544         std::error_code ec = make_basic_system_error_code(errno);
1545         throw std::system_error(ec);
1546     }
1547     out << ret;
1548     return out;
1549 }
1550
1551 inline Address::Address()
1552 {
1553     m_union.m_ip_v4 = ip_v4_type();
1554 }
1555
1556 inline Address make_address(const char* c_str)
1557 {
1558     std::error_code ec;
1559     Address addr = make_address(c_str, ec);
1560     if (ec)
1561         throw std::system_error(ec);
1562     return addr;
1563 }
1564
1565 inline Address make_address(const std::string& str)
1566 {
1567     std::error_code ec;
1568     Address addr = make_address(str, ec);
1569     if (ec)
1570         throw std::system_error(ec);
1571     return addr;
1572 }
1573
1574 inline Address make_address(const std::string& str, std::error_code& ec) noexcept
1575 {
1576     return make_address(str.c_str(), ec);
1577 }
1578
1579 // ---------------- Endpoint ----------------
1580
1581 inline StreamProtocol Endpoint::protocol() const
1582 {
1583     return m_protocol;
1584 }
1585
1586 inline Address Endpoint::address() const
1587 {
1588     Address addr;
1589     if (m_protocol.is_ip_v4()) {
1590         addr.m_union.m_ip_v4 = m_sockaddr_union.m_ip_v4.sin_addr;
1591     }
1592     else {
1593         addr.m_union.m_ip_v6 = m_sockaddr_union.m_ip_v6.sin6_addr;
1594         addr.m_ip_v6_scope_id = m_sockaddr_union.m_ip_v6.sin6_scope_id;
1595         addr.m_is_ip_v6 = true;
1596     }
1597     return addr;
1598 }
1599
1600 inline Endpoint::port_type Endpoint::port() const
1601 {
1602     return ntohs(m_protocol.is_ip_v4() ? m_sockaddr_union.m_ip_v4.sin_port :
1603                  m_sockaddr_union.m_ip_v6.sin6_port);
1604 }
1605
1606 inline Endpoint::data_type* Endpoint::data()
1607 {
1608     return &m_sockaddr_union.m_base;
1609 }
1610
1611 inline const Endpoint::data_type* Endpoint::data() const
1612 {
1613     return &m_sockaddr_union.m_base;
1614 }
1615
1616 inline Endpoint::Endpoint():
1617     Endpoint{StreamProtocol::ip_v4(), 0}
1618 {
1619 }
1620
1621 inline Endpoint::Endpoint(const StreamProtocol& protocol, port_type port):
1622     m_protocol{protocol}
1623 {
1624     int family = m_protocol.family();
1625     if (family == AF_INET) {
1626         m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
1627         m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
1628         m_sockaddr_union.m_ip_v4.sin_port = htons(port);
1629         m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
1630     }
1631     else if (family == AF_INET6) {
1632         m_sockaddr_union.m_ip_v6 = sockaddr_ip_v6_type(); // Clear
1633         m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
1634         m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
1635     }
1636     else {
1637         m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
1638         m_sockaddr_union.m_ip_v4.sin_family = AF_UNSPEC;
1639         m_sockaddr_union.m_ip_v4.sin_port = htons(port);
1640         m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
1641     }
1642 }
1643
1644 inline Endpoint::Endpoint(const Address& addr, port_type port)
1645 {
1646     if (addr.m_is_ip_v6) {
1647         m_protocol = StreamProtocol::ip_v6();
1648         m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
1649         m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
1650         m_sockaddr_union.m_ip_v6.sin6_flowinfo = 0;
1651         m_sockaddr_union.m_ip_v6.sin6_addr = addr.m_union.m_ip_v6;
1652         m_sockaddr_union.m_ip_v6.sin6_scope_id = addr.m_ip_v6_scope_id;
1653     }
1654     else {
1655         m_protocol = StreamProtocol::ip_v4();
1656         m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
1657         m_sockaddr_union.m_ip_v4.sin_port = htons(port);
1658         m_sockaddr_union.m_ip_v4.sin_addr = addr.m_union.m_ip_v4;
1659     }
1660 }
1661
1662 inline Endpoint::List::iterator Endpoint::List::begin() const noexcept
1663 {
1664     return m_endpoints.data();
1665 }
1666
1667 inline Endpoint::List::iterator Endpoint::List::end() const noexcept
1668 {
1669     return m_endpoints.data() + m_endpoints.size();
1670 }
1671
1672 inline std::size_t Endpoint::List::size() const noexcept
1673 {
1674     return m_endpoints.size();
1675 }
1676
1677 inline bool Endpoint::List::empty() const noexcept
1678 {
1679     return m_endpoints.size() == 0;
1680 }
1681
1682 // ---------------- Service::OperQueue ----------------
1683
1684 template<class Oper> inline bool Service::OperQueue<Oper>::empty() const noexcept
1685 {
1686     return !m_back;
1687 }
1688
1689 template<class Oper> inline void Service::OperQueue<Oper>::push_back(LendersOperPtr op) noexcept
1690 {
1691     REALM_ASSERT(!op->m_next);
1692     if (m_back) {
1693         op->m_next = m_back->m_next;
1694         m_back->m_next = op.get();
1695     }
1696     else {
1697         op->m_next = op.get();
1698     }
1699     m_back = op.release();
1700 }
1701
1702 template<class Oper> template<class Oper2>
1703 inline void Service::OperQueue<Oper>::push_back(OperQueue<Oper2>& q) noexcept
1704 {
1705     if (!q.m_back)
1706         return;
1707     if (m_back)
1708         std::swap(m_back->m_next, q.m_back->m_next);
1709     m_back = q.m_back;
1710     q.m_back = nullptr;
1711 }
1712
1713 template<class Oper> inline auto Service::OperQueue<Oper>::pop_front() noexcept -> LendersOperPtr
1714 {
1715     Oper* op = nullptr;
1716     if (m_back) {
1717         op = static_cast<Oper*>(m_back->m_next);
1718         if (op != m_back) {
1719             m_back->m_next = op->m_next;
1720         }
1721         else {
1722             m_back = nullptr;
1723         }
1724         op->m_next = nullptr;
1725     }
1726     return LendersOperPtr(op);
1727 }
1728
1729 template<class Oper> inline void Service::OperQueue<Oper>::clear() noexcept
1730 {
1731     if (m_back) {
1732         LendersOperPtr op(m_back);
1733         while (op->m_next != m_back)
1734             op.reset(static_cast<Oper*>(op->m_next));
1735         m_back = nullptr;
1736     }
1737 }
1738
1739 template<class Oper> inline Service::OperQueue<Oper>::OperQueue(OperQueue&& q) noexcept:
1740     m_back{q.m_back}
1741 {
1742     q.m_back = nullptr;
1743 }
1744
1745 template<class Oper> inline Service::OperQueue<Oper>::~OperQueue() noexcept
1746 {
1747     clear();
1748 }
1749
1750 // ---------------- Service::Descriptor ----------------
1751
1752 inline Service::Descriptor::Descriptor(Impl& s) noexcept:
1753     service_impl{s}
1754 {
1755 }
1756
1757 inline Service::Descriptor::~Descriptor() noexcept
1758 {
1759     if (is_open())
1760         close();
1761 }
1762
1763 inline void Service::Descriptor::assign(native_handle_type fd, bool in_blocking_mode) noexcept
1764 {
1765     REALM_ASSERT(!is_open());
1766     m_fd = fd;
1767     m_in_blocking_mode = in_blocking_mode;
1768 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1769     m_read_ready  = false;
1770     m_write_ready = false;
1771     m_imminent_end_of_input = false;
1772     m_is_registered = false;
1773 #endif
1774 }
1775
1776 inline void Service::Descriptor::close() noexcept
1777 {
1778     REALM_ASSERT(is_open());
1779 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1780     if (m_is_registered)
1781         deregister_for_async();
1782     m_is_registered = false;
1783 #endif
1784     do_close();
1785 }
1786
1787 inline bool Service::Descriptor::is_open() const noexcept
1788 {
1789     return (m_fd != -1);
1790 }
1791
1792 inline auto Service::Descriptor::native_handle() const noexcept -> native_handle_type
1793 {
1794     return m_fd;
1795 }
1796
1797 inline bool Service::Descriptor::in_blocking_mode() const noexcept
1798 {
1799     return m_in_blocking_mode;
1800 }
1801
1802 template<class Oper, class... Args>
1803 inline void Service::Descriptor::initiate_oper(std::unique_ptr<Oper, LendersOperDeleter> op,
1804                                                Args&&... args)
1805 {
1806     Service::Want want = op->initiate(std::forward<Args>(args)...); // Throws
1807     add_initiated_oper(std::move(op), want); // Throws
1808 }
1809
1810 inline void Service::Descriptor::ensure_blocking_mode()
1811 {
1812     // Assuming that descriptors are either used mostly in blocking mode, or
1813     // mostly in nonblocking mode.
1814     if (REALM_UNLIKELY(!m_in_blocking_mode)) {
1815         bool value = false;
1816         set_nonblock_flag(value); // Throws
1817         m_in_blocking_mode = true;
1818     }
1819 }
1820
1821 inline void Service::Descriptor::ensure_nonblocking_mode()
1822 {
1823     // Assuming that descriptors are either used mostly in blocking mode, or
1824     // mostly in nonblocking mode.
1825     if (REALM_UNLIKELY(m_in_blocking_mode)) {
1826         bool value = true;
1827         set_nonblock_flag(value); // Throws
1828         m_in_blocking_mode = false;
1829     }
1830 }
1831
1832 inline bool Service::Descriptor::assume_read_would_block() const noexcept
1833 {
1834 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1835     return !m_in_blocking_mode && !m_read_ready;
1836 #else
1837     return false;
1838 #endif
1839 }
1840
1841 inline bool Service::Descriptor::assume_write_would_block() const noexcept
1842 {
1843 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1844     return !m_in_blocking_mode && !m_write_ready;
1845 #else
1846     return false;
1847 #endif
1848 }
1849
1850 inline void Service::Descriptor::set_read_ready(bool value) noexcept
1851 {
1852 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1853     m_read_ready = value;
1854 #else
1855     // No-op
1856     static_cast<void>(value);
1857 #endif
1858 }
1859
1860 inline void Service::Descriptor::set_write_ready(bool value) noexcept
1861 {
1862 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1863     m_write_ready = value;
1864 #else
1865     // No-op
1866     static_cast<void>(value);
1867 #endif
1868 }
1869
1870 // ---------------- Service ----------------
1871
1872 class Service::AsyncOper {
1873 public:
1874     bool in_use() const noexcept;
1875     bool is_complete() const noexcept;
1876     bool is_canceled() const noexcept;
1877     void cancel() noexcept;
1878     /// Every object of type \ref AsyncOper must be destroyed either by a call
1879     /// to this function or to recycle(). This function recycles the operation
1880     /// object (commits suicide), even if it throws.
1881     virtual void recycle_and_execute() = 0;
1882     /// Every object of type \ref AsyncOper must be destroyed either by a call
1883     /// to recycle_and_execute() or to this function. This function destroys the
1884     /// object (commits suicide).
1885     virtual void recycle() noexcept = 0;
1886     /// Must be called when the owner dies, and the object is in use (not an
1887     /// instance of UnusedOper).
1888     virtual void orphan()  noexcept = 0;
1889 protected:
1890     AsyncOper(std::size_t size, bool in_use) noexcept;
1891     virtual ~AsyncOper() noexcept {}
1892     void set_is_complete(bool value) noexcept;
1893     template<class H, class... Args>
1894     void do_recycle_and_execute(bool orphaned, H& handler, Args&&...);
1895     void do_recycle(bool orphaned) noexcept;
1896 private:
1897     std::size_t m_size; // Allocated number of bytes
1898     bool m_in_use = false;
1899     // Set to true when the operation completes successfully or fails. If the
1900     // operation is canceled before this happens, it will never be set to
1901     // true. Always false when not in use
1902     bool m_complete = false;
1903     // Set to true when the operation is canceled. Always false when not in use.
1904     bool m_canceled = false;
1905     AsyncOper* m_next = nullptr; // Always null when not in use
1906     template<class H, class... Args>
1907     void do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler, Args...);
1908     friend class Service;
1909 };
1910
1911 class Service::WaitOperBase: public AsyncOper {
1912 public:
1913     WaitOperBase(std::size_t size, DeadlineTimer& timer,
1914                  clock::time_point expiration_time) noexcept:
1915         AsyncOper{size, true}, // Second argument is `in_use`
1916         m_timer{&timer},
1917         m_expiration_time{expiration_time}
1918     {
1919     }
1920     void expired() noexcept
1921     {
1922         set_is_complete(true);
1923     }
1924     void recycle() noexcept override final
1925     {
1926         bool orphaned = !m_timer;
1927         REALM_ASSERT(orphaned);
1928         // Note: do_recycle() commits suicide.
1929         do_recycle(orphaned);
1930     }
1931     void orphan() noexcept override final
1932     {
1933         m_timer = nullptr;
1934     }
1935 protected:
1936     DeadlineTimer* m_timer;
1937     clock::time_point m_expiration_time;
1938     friend class Service;
1939 };
1940
1941 class Service::TriggerExecOperBase: public AsyncOper, public AtomicRefCountBase {
1942 public:
1943     TriggerExecOperBase(Impl& service) noexcept:
1944         AsyncOper{0, false}, // First arg is `size` (unused), second arg is `in_use`
1945         m_service{&service}
1946     {
1947     }
1948     void recycle() noexcept override final
1949     {
1950         REALM_ASSERT(in_use());
1951         REALM_ASSERT(!m_service);
1952         // Note: Potential suicide when `self` goes out of scope
1953         util::bind_ptr<TriggerExecOperBase> self{this, bind_ptr_base::adopt_tag{}};
1954     }
1955     void orphan() noexcept override final
1956     {
1957         REALM_ASSERT(m_service);
1958         m_service = nullptr;
1959     }
1960     void trigger() noexcept
1961     {
1962         REALM_ASSERT(m_service);
1963         Service::trigger_exec(*m_service, *this);
1964     }
1965 protected:
1966     Impl* m_service;
1967 };
1968
1969 class Service::PostOperBase: public AsyncOper {
1970 public:
1971     PostOperBase(std::size_t size, Impl& service) noexcept:
1972         AsyncOper{size, true}, // Second argument is `in_use`
1973         m_service{service}
1974     {
1975     }
1976     void recycle() noexcept override final
1977     {
1978         // Service::recycle_post_oper() destroys this operation object
1979         Service::recycle_post_oper(m_service, this);
1980     }
1981     void orphan() noexcept override final
1982     {
1983         REALM_ASSERT(false); // Never called
1984     }
1985 protected:
1986     Impl& m_service;
1987 };
1988
1989 template<class H> class Service::PostOper: public PostOperBase {
1990 public:
1991     PostOper(std::size_t size, Impl& service, H handler):
1992         PostOperBase{size, service},
1993         m_handler{std::move(handler)}
1994     {
1995     }
1996     void recycle_and_execute() override final
1997     {
1998         // Recycle the operation object before the handler is exceuted, such
1999         // that the memory is available for a new post operation that might be
2000         // initiated during the execution of the handler.
2001         bool was_recycled = false;
2002         try {
2003             H handler = std::move(m_handler); // Throws
2004             // Service::recycle_post_oper() destroys this operation object
2005             Service::recycle_post_oper(m_service, this);
2006             was_recycled = true;
2007             handler(); // Throws
2008         }
2009         catch (...) {
2010             if (!was_recycled) {
2011                 // Service::recycle_post_oper() destroys this operation object
2012                 Service::recycle_post_oper(m_service, this);
2013             }
2014             throw;
2015         }
2016     }
2017 private:
2018     H m_handler;
2019 };
2020
2021 class Service::IoOper: public AsyncOper {
2022 public:
2023     IoOper(std::size_t size) noexcept:
2024         AsyncOper{size, true} // Second argument is `in_use`
2025     {
2026     }
2027     virtual Descriptor& descriptor() noexcept = 0;
2028     /// Advance this operation and figure out out whether it needs to read from,
2029     /// or write to the underlying descriptor to advance further. This function
2030     /// must return Want::read if the operation needs to read, or Want::write if
2031     /// the operation needs to write to advance further. If the operation
2032     /// completes (due to success or failure), this function must return
2033     /// Want::nothing.
2034     virtual Want advance() noexcept = 0;
2035 };
2036
2037 class Service::UnusedOper: public AsyncOper {
2038 public:
2039     UnusedOper(std::size_t size) noexcept:
2040         AsyncOper{size, false} // Second argument is `in_use`
2041     {
2042     }
2043     void recycle_and_execute() override final
2044     {
2045         // Must never be called
2046         REALM_ASSERT(false);
2047     }
2048     void recycle() noexcept override final
2049     {
2050         // Must never be called
2051         REALM_ASSERT(false);
2052     }
2053     void orphan() noexcept override final
2054     {
2055         // Must never be called
2056         REALM_ASSERT(false);
2057     }
2058 };
2059
2060 // `S` must be a stream class with the following member functions:
2061 //
2062 //    Socket& lowest_layer() noexcept;
2063 //
2064 //    void do_init_read_async(std::error_code& ec, Want& want) noexcept;
2065 //    void do_init_write_async(std::error_code& ec, Want& want) noexcept;
2066 //
2067 //    std::size_t do_read_some_sync(char* buffer, std::size_t size,
2068 //                                  std::error_code& ec) noexcept;
2069 //    std::size_t do_write_some_sync(const char* data, std::size_t size,
2070 //                                   std::error_code& ec) noexcept;
2071 //    std::size_t do_read_some_async(char* buffer, std::size_t size,
2072 //                                   std::error_code& ec, Want& want) noexcept;
2073 //    std::size_t do_write_some_async(const char* data, std::size_t size,
2074 //                                    std::error_code& ec, Want& want) noexcept;
2075 //
2076 // If an error occurs during any of these 6 functions, the `ec` argument must be
2077 // set accordingly. Otherwise the `ec` argument must be set to
2078 // `std::error_code()`.
2079 //
2080 // The do_init_*_async() functions must update the `want` argument to indicate
2081 // how the operation must be initiated:
2082 //
2083 //    Want::read      Wait for read readiness, then call do_*_some_async().
2084 //    Want::write     Wait for write readiness, then call do_*_some_async().
2085 //    Want::nothing   Call do_*_some_async() immediately without waiting for
2086 //                    read or write readiness.
2087 //
2088 // If end-of-input occurs while reading, do_read_some_*() must fail, set `ec` to
2089 // `network::end_of_input`, and return zero.
2090 //
2091 // If an error occurs during reading or writing, do_*_some_sync() must set `ec`
2092 // accordingly (to something other than `std::system_error()`) and return
2093 // zero. Otherwise they must set `ec` to `std::system_error()` and return the
2094 // number of bytes read or written, which **must** be at least 1. If the
2095 // underlying socket is in nonblocking mode, and no bytes could be immediately
2096 // read or written these functions must fail with
2097 // `error::resource_unavailable_try_again`.
2098 //
2099 // If an error occurs during reading or writing, do_*_some_async() must set `ec`
2100 // accordingly (to something other than `std::system_error()`), `want` to
2101 // `Want::nothing`, and return zero. Otherwise they must set `ec` to
2102 // `std::system_error()` and return the number of bytes read or written, which
2103 // must be zero if no bytes could be immediately read or written. Note, in this
2104 // case it is not an error if the underlying socket is in nonblocking mode, and
2105 // no bytes could be immediately read or written. When these functions succeed,
2106 // but return zero because no bytes could be immediately read or written, they
2107 // must set `want` to something other than `Want::nothing`.
2108 //
2109 // If no error occurs, do_*_some_async() must set `want` to indicate how the
2110 // operation should proceed if additional data needs to be read or written, or
2111 // if no bytes were transferred:
2112 //
2113 //    Want::read      Wait for read readiness, then call do_*_some_async() again.
2114 //    Want::write     Wait for write readiness, then call do_*_some_async() again.
2115 //    Want::nothing   Call do_*_some_async() again without waiting for read or
2116 //                    write readiness.
2117 //
2118 // NOTE: If, for example, do_read_some_async() sets `want` to `Want::write`, it
2119 // means that the stream needs to write data to the underlying TCP socket before
2120 // it is able to deliver any additional data to the caller. While such a
2121 // situation will never occur on a raw TCP socket, it can occur on an SSL stream
2122 // (Secure Socket Layer).
2123 //
2124 // When do_*_some_async() returns `n`, at least one of the following conditions
2125 // must be true:
2126 //
2127 //    n > 0                     Bytes were transferred.
2128 //    ec != std::error_code()   An error occured.
2129 //    want != Want::nothing     Wait for read/write readiness.
2130 //
2131 // This is of critical importance, as it is the only way we can avoid falling
2132 // into a busy loop of repeated invocations of do_*_some_async().
2133 //
2134 // NOTE: do_*_some_async() are allowed to set `want` to `Want::read` or
2135 // `Want::write`, even when they succesfully transfer a nonzero number of bytes.
2136 template<class S> class Service::BasicStreamOps {
2137 public:
2138     class StreamOper;
2139     class ReadOperBase;
2140     class WriteOperBase;
2141     class BufferedReadOperBase;
2142     template<class H> class ReadOper;
2143     template<class H> class WriteOper;
2144     template<class H> class BufferedReadOper;
2145
2146     using LendersReadOperPtr          = std::unique_ptr<ReadOperBase,         LendersOperDeleter>;
2147     using LendersWriteOperPtr         = std::unique_ptr<WriteOperBase,        LendersOperDeleter>;
2148     using LendersBufferedReadOperPtr  = std::unique_ptr<BufferedReadOperBase, LendersOperDeleter>;
2149
2150     // Synchronous read
2151     static std::size_t read(S& stream, char* buffer, std::size_t size,
2152                             std::error_code& ec)
2153     {
2154         REALM_ASSERT(!stream.lowest_layer().m_read_oper ||
2155                      !stream.lowest_layer().m_read_oper->in_use());
2156         stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2157         char* begin = buffer;
2158         char* end   = buffer + size;
2159         char* curr  = begin;
2160         for (;;) {
2161             if (curr == end) {
2162                 ec = std::error_code(); // Success
2163                 break;
2164             }
2165             char* buffer_2 = curr;
2166             std::size_t size_2 = std::size_t(end - curr);
2167             std::size_t n = stream.do_read_some_sync(buffer_2, size_2, ec);
2168             if (REALM_UNLIKELY(ec))
2169                 break;
2170             REALM_ASSERT(n > 0);
2171             REALM_ASSERT(n <= size_2);
2172             curr += n;
2173         }
2174         std::size_t n = std::size_t(curr - begin);
2175         return n;
2176     }
2177
2178     // Synchronous write
2179     static std::size_t write(S& stream, const char* data, std::size_t size,
2180                              std::error_code& ec)
2181     {
2182         REALM_ASSERT(!stream.lowest_layer().m_write_oper ||
2183                      !stream.lowest_layer().m_write_oper->in_use());
2184         stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2185         const char* begin = data;
2186         const char* end   = data + size;
2187         const char* curr  = begin;
2188         for (;;) {
2189             if (curr == end) {
2190                 ec = std::error_code(); // Success
2191                 break;
2192             }
2193             const char* data_2 = curr;
2194             std::size_t size_2 = std::size_t(end - curr);
2195             std::size_t n = stream.do_write_some_sync(data_2, size_2, ec);
2196             if (REALM_UNLIKELY(ec))
2197                 break;
2198             REALM_ASSERT(n > 0);
2199             REALM_ASSERT(n <= size_2);
2200             curr += n;
2201         }
2202         std::size_t n = std::size_t(curr - begin);
2203         return n;
2204     }
2205
2206     // Synchronous read
2207     static std::size_t buffered_read(S& stream, char* buffer, std::size_t size, int delim,
2208                                      ReadAheadBuffer& rab, std::error_code& ec)
2209     {
2210         REALM_ASSERT(!stream.lowest_layer().m_read_oper ||
2211                      !stream.lowest_layer().m_read_oper->in_use());
2212         stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2213         char* begin = buffer;
2214         char* end   = buffer + size;
2215         char* curr  = begin;
2216         for (;;) {
2217             bool complete = rab.read(curr, end, delim, ec);
2218             if (complete)
2219                 break;
2220
2221             rab.refill_sync(stream, ec);
2222             if (REALM_UNLIKELY(ec))
2223                 break;
2224         }
2225         std::size_t n = (curr - begin);
2226         return n;
2227     }
2228
2229     // Synchronous read
2230     static std::size_t read_some(S& stream, char* buffer, std::size_t size,
2231                                  std::error_code& ec)
2232     {
2233         REALM_ASSERT(!stream.lowest_layer().m_read_oper ||
2234                      !stream.lowest_layer().m_read_oper->in_use());
2235         stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2236         return stream.do_read_some_sync(buffer, size, ec);
2237     }
2238
2239     // Synchronous write
2240     static std::size_t write_some(S& stream, const char* data, std::size_t size,
2241                                   std::error_code& ec)
2242     {
2243         REALM_ASSERT(!stream.lowest_layer().m_write_oper ||
2244                      !stream.lowest_layer().m_write_oper->in_use());
2245         stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2246         return stream.do_write_some_sync(data, size, ec);
2247     }
2248
2249     template<class H>
2250     static void async_read(S& stream, char* buffer, std::size_t size, bool is_read_some, H handler)
2251     {
2252         char* begin = buffer;
2253         char* end   = buffer + size;
2254         LendersReadOperPtr op =
2255             Service::alloc<ReadOper<H>>(stream.lowest_layer().m_read_oper, stream, is_read_some,
2256                                         begin, end, std::move(handler)); // Throws
2257         stream.lowest_layer().m_desc.initiate_oper(std::move(op)); // Throws
2258     }
2259
2260     template<class H>
2261     static void async_write(S& stream, const char* data, std::size_t size, bool is_write_some,
2262                             H handler)
2263     {
2264         const char* begin = data;
2265         const char* end   = data + size;
2266         LendersWriteOperPtr op =
2267             Service::alloc<WriteOper<H>>(stream.lowest_layer().m_write_oper, stream, is_write_some,
2268                                          begin, end, std::move(handler)); // Throws
2269         stream.lowest_layer().m_desc.initiate_oper(std::move(op)); // Throws
2270     }
2271
2272     template<class H>
2273     static void async_buffered_read(S& stream, char* buffer, std::size_t size, int delim,
2274                                     ReadAheadBuffer& rab, H handler)
2275     {
2276         char* begin = buffer;
2277         char* end   = buffer + size;
2278         LendersBufferedReadOperPtr op =
2279             Service::alloc<BufferedReadOper<H>>(stream.lowest_layer().m_read_oper, stream,
2280                                                 begin, end, delim, rab,
2281                                                 std::move(handler)); // Throws
2282         stream.lowest_layer().m_desc.initiate_oper(std::move(op)); // Throws
2283     }
2284 };
2285
2286 template<class S> class Service::BasicStreamOps<S>::StreamOper: public IoOper {
2287 public:
2288     StreamOper(std::size_t size, S& stream) noexcept:
2289         IoOper{size},
2290         m_stream{&stream}
2291     {
2292     }
2293     void recycle() noexcept override final
2294     {
2295         bool orphaned = !m_stream;
2296         REALM_ASSERT(orphaned);
2297         // Note: do_recycle() commits suicide.
2298         do_recycle(orphaned);
2299     }
2300     void orphan() noexcept override final
2301     {
2302         m_stream = nullptr;
2303     }
2304     Descriptor& descriptor() noexcept override final
2305     {
2306         return m_stream->lowest_layer().m_desc;
2307     }
2308 protected:
2309     S* m_stream;
2310     std::error_code m_error_code;
2311 };
2312
2313 template<class S> class Service::BasicStreamOps<S>::ReadOperBase: public StreamOper {
2314 public:
2315     ReadOperBase(std::size_t size, S& stream, bool is_read_some, char* begin, char* end) noexcept:
2316         StreamOper{size, stream},
2317         m_is_read_some{is_read_some},
2318         m_begin{begin},
2319         m_end{end}
2320     {
2321     }
2322     Want initiate()
2323     {
2324         auto& s = *this;
2325         REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
2326         REALM_ASSERT(!s.is_complete());
2327         REALM_ASSERT(s.m_curr <= s.m_end);
2328         Want want = Want::nothing;
2329         if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
2330             s.set_is_complete(true); // Success
2331         }
2332         else {
2333             s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
2334             s.m_stream->do_init_read_async(s.m_error_code, want);
2335             if (want == Want::nothing) {
2336                 if (REALM_UNLIKELY(s.m_error_code)) {
2337                     s.set_is_complete(true); // Failure
2338                 }
2339                 else {
2340                     want = advance();
2341                 }
2342             }
2343         }
2344         return want;
2345     }
2346     Want advance() noexcept override final
2347     {
2348         auto& s = *this;
2349         REALM_ASSERT(!s.is_complete());
2350         REALM_ASSERT(!s.is_canceled());
2351         REALM_ASSERT(!s.m_error_code);
2352         REALM_ASSERT(s.m_curr < s.m_end);
2353         REALM_ASSERT(!s.m_is_read_some || s.m_curr == m_begin);
2354         for (;;) {
2355             // Read into callers buffer
2356             char* buffer = s.m_curr;
2357             std::size_t size = std::size_t(s.m_end - s.m_curr);
2358             Want want = Want::nothing;
2359             std::size_t n = s.m_stream->do_read_some_async(buffer, size, s.m_error_code, want);
2360             REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
2361             bool got_nothing = (n == 0);
2362             if (got_nothing) {
2363                 if (REALM_UNLIKELY(s.m_error_code)) {
2364                     s.set_is_complete(true); // Failure
2365                     return Want::nothing;
2366                 }
2367                 // Got nothing, but want something
2368                 return want;
2369             }
2370             REALM_ASSERT(!s.m_error_code);
2371             // Check for completion
2372             REALM_ASSERT(n <= size);
2373             s.m_curr += n;
2374             if (s.m_is_read_some || s.m_curr == s.m_end) {
2375                 s.set_is_complete(true); // Success
2376                 return Want::nothing;
2377             }
2378             if (want != Want::nothing)
2379                 return want;
2380             REALM_ASSERT(n < size);
2381         }
2382     }
2383 protected:
2384     const bool m_is_read_some;
2385     char* const m_begin;    // May be dangling after cancellation
2386     char* const m_end;      // May be dangling after cancellation
2387     char* m_curr = m_begin; // May be dangling after cancellation
2388 };
2389
2390 template<class S> class Service::BasicStreamOps<S>::WriteOperBase: public StreamOper {
2391 public:
2392     WriteOperBase(std::size_t size, S& stream, bool is_write_some,
2393                   const char* begin, const char* end) noexcept:
2394         StreamOper{size, stream},
2395         m_is_write_some{is_write_some},
2396         m_begin{begin},
2397         m_end{end}
2398     {
2399     }
2400     Want initiate()
2401     {
2402         auto& s = *this;
2403         REALM_ASSERT(this == s.m_stream->lowest_layer().m_write_oper.get());
2404         REALM_ASSERT(!s.is_complete());
2405         REALM_ASSERT(s.m_curr <= s.m_end);
2406         Want want = Want::nothing;
2407         if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
2408             s.set_is_complete(true); // Success
2409         }
2410         else {
2411             s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
2412             s.m_stream->do_init_write_async(s.m_error_code, want);
2413             if (want == Want::nothing) {
2414                 if (REALM_UNLIKELY(s.m_error_code)) {
2415                     s.set_is_complete(true); // Failure
2416                 }
2417                 else {
2418                     want = advance();
2419                 }
2420             }
2421         }
2422         return want;
2423     }
2424     Want advance() noexcept override final
2425     {
2426         auto& s = *this;
2427         REALM_ASSERT(!s.is_complete());
2428         REALM_ASSERT(!s.is_canceled());
2429         REALM_ASSERT(!s.m_error_code);
2430         REALM_ASSERT(s.m_curr < s.m_end);
2431         REALM_ASSERT(!s.m_is_write_some || s.m_curr == s.m_begin);
2432         for (;;) {
2433             // Write from callers buffer
2434             const char* data = s.m_curr;
2435             std::size_t size = std::size_t(s.m_end - s.m_curr);
2436             Want want = Want::nothing;
2437             std::size_t n = s.m_stream->do_write_some_async(data, size, s.m_error_code, want);
2438             REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
2439             bool wrote_nothing = (n == 0);
2440             if (wrote_nothing) {
2441                 if (REALM_UNLIKELY(s.m_error_code)) {
2442                     s.set_is_complete(true); // Failure
2443                     return Want::nothing;
2444                 }
2445                 // Wrote nothing, but want something written
2446                 return want;
2447             }
2448             REALM_ASSERT(!s.m_error_code);
2449             // Check for completion
2450             REALM_ASSERT(n <= size);
2451             s.m_curr += n;
2452             if (s.m_is_write_some || s.m_curr == s.m_end) {
2453                 s.set_is_complete(true); // Success
2454                 return Want::nothing;
2455             }
2456             if (want != Want::nothing)
2457                 return want;
2458             REALM_ASSERT(n < size);
2459         }
2460     }
2461 protected:
2462     const bool m_is_write_some;
2463     const char* const m_begin;    // May be dangling after cancellation
2464     const char* const m_end;      // May be dangling after cancellation
2465     const char* m_curr = m_begin; // May be dangling after cancellation
2466 };
2467
2468 template<class S> class Service::BasicStreamOps<S>::BufferedReadOperBase: public StreamOper {
2469 public:
2470     BufferedReadOperBase(std::size_t size, S& stream, char* begin, char* end, int delim,
2471                          ReadAheadBuffer& rab) noexcept:
2472         StreamOper{size, stream},
2473         m_read_ahead_buffer{rab},
2474         m_begin{begin},
2475         m_end{end},
2476         m_delim{delim}
2477     {
2478     }
2479     Want initiate()
2480     {
2481         auto& s = *this;
2482         REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
2483         REALM_ASSERT(!s.is_complete());
2484         Want want = Want::nothing;
2485         bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
2486         if (complete) {
2487             s.set_is_complete(true); // Success or failure
2488         }
2489         else {
2490             s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
2491             s.m_stream->do_init_read_async(s.m_error_code, want);
2492             if (want == Want::nothing) {
2493                 if (REALM_UNLIKELY(s.m_error_code)) {
2494                     s.set_is_complete(true); // Failure
2495                 }
2496                 else {
2497                     want = advance();
2498                 }
2499             }
2500         }
2501         return want;
2502     }
2503     Want advance() noexcept override final
2504     {
2505         auto& s = *this;
2506         REALM_ASSERT(!s.is_complete());
2507         REALM_ASSERT(!s.is_canceled());
2508         REALM_ASSERT(!s.m_error_code);
2509         REALM_ASSERT(s.m_read_ahead_buffer.empty());
2510         REALM_ASSERT(s.m_curr < s.m_end);
2511         for (;;) {
2512             // Fill read-ahead buffer from stream (is empty now)
2513             Want want = Want::nothing;
2514             bool nonempty = s.m_read_ahead_buffer.refill_async(*s.m_stream, s.m_error_code, want);
2515             REALM_ASSERT(nonempty || s.m_error_code ||
2516                          want != Want::nothing); // No busy loop, please
2517             bool got_nothing = !nonempty;
2518             if (got_nothing) {
2519                 if (REALM_UNLIKELY(s.m_error_code)) {
2520                     s.set_is_complete(true); // Failure
2521                     return Want::nothing;
2522                 }
2523                 // Got nothing, but want something
2524                 return want;
2525             }
2526             // Transfer buffered data to callers buffer
2527             bool complete =
2528                 s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
2529             if (complete) {
2530                 s.set_is_complete(true); // Success or failure (delim_not_found)
2531                 return Want::nothing;
2532             }
2533             if (want != Want::nothing)
2534                 return want;
2535         }
2536     }
2537 protected:
2538     ReadAheadBuffer& m_read_ahead_buffer; // May be dangling after cancellation
2539     char* const m_begin;                  // May be dangling after cancellation
2540     char* const m_end;                    // May be dangling after cancellation
2541     char* m_curr = m_begin;               // May be dangling after cancellation
2542     const int m_delim;
2543 };
2544
2545 template<class S> template<class H>
2546 class Service::BasicStreamOps<S>::ReadOper: public ReadOperBase {
2547 public:
2548     ReadOper(std::size_t size, S& stream, bool is_read_some, char* begin, char* end, H handler):
2549         ReadOperBase{size, stream, is_read_some, begin, end},
2550         m_handler{std::move(handler)}
2551     {
2552     }
2553     void recycle_and_execute() override final
2554     {
2555         auto& s = *this;
2556         REALM_ASSERT(s.is_complete() || s.is_canceled());
2557         REALM_ASSERT(s.is_complete() == (s.m_error_code || s.m_curr == s.m_end ||
2558                                          (s.m_is_read_some && s.m_curr != s.m_begin)));
2559         REALM_ASSERT(s.m_curr >= s.m_begin);
2560         bool orphaned = !s.m_stream;
2561         std::error_code ec = s.m_error_code;
2562         if (s.is_canceled())
2563             ec = error::operation_aborted;
2564         std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
2565         // Note: do_recycle_and_execute() commits suicide.
2566         s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
2567                                              num_bytes_transferred); // Throws
2568     }
2569 private:
2570     H m_handler;
2571 };
2572
2573 template<class S> template<class H>
2574 class Service::BasicStreamOps<S>::WriteOper: public WriteOperBase {
2575 public:
2576     WriteOper(std::size_t size, S& stream, bool is_write_some,
2577               const char* begin, const char* end, H handler):
2578         WriteOperBase{size, stream, is_write_some, begin, end},
2579         m_handler{std::move(handler)}
2580     {
2581     }
2582     void recycle_and_execute() override final
2583     {
2584         auto& s = *this;
2585         REALM_ASSERT(s.is_complete() || s.is_canceled());
2586         REALM_ASSERT(s.is_complete() == (s.m_error_code || s.m_curr == s.m_end ||
2587                                          (s.m_is_write_some && s.m_curr != s.m_begin)));
2588         REALM_ASSERT(s.m_curr >= s.m_begin);
2589         bool orphaned = !s.m_stream;
2590         std::error_code ec = s.m_error_code;
2591         if (s.is_canceled())
2592             ec = error::operation_aborted;
2593         std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
2594         // Note: do_recycle_and_execute() commits suicide.
2595         s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
2596                                              num_bytes_transferred); // Throws
2597     }
2598 private:
2599     H m_handler;
2600 };
2601
2602 template<class S> template<class H>
2603 class Service::BasicStreamOps<S>::BufferedReadOper: public BufferedReadOperBase {
2604 public:
2605     BufferedReadOper(std::size_t size, S& stream, char* begin, char* end, int delim,
2606                      ReadAheadBuffer& rab, H handler):
2607         BufferedReadOperBase{size, stream, begin, end, delim, rab},
2608         m_handler{std::move(handler)}
2609     {
2610     }
2611     void recycle_and_execute() override final
2612     {
2613         auto& s = *this;
2614         REALM_ASSERT(s.is_complete() || (s.is_canceled() && !s.m_error_code));
2615         REALM_ASSERT(s.is_canceled() || s.m_error_code ||
2616                      (s.m_delim != std::char_traits<char>::eof() ?
2617                       s.m_curr > s.m_begin && s.m_curr[-1] ==
2618                       std::char_traits<char>::to_char_type(s.m_delim) :
2619                       s.m_curr == s.m_end));
2620         REALM_ASSERT(s.m_curr >= s.m_begin);
2621         bool orphaned = !s.m_stream;
2622         std::error_code ec = s.m_error_code;
2623         if (s.is_canceled())
2624             ec = error::operation_aborted;
2625         std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
2626         // Note: do_recycle_and_execute() commits suicide.
2627         s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
2628                                              num_bytes_transferred); // Throws
2629     }
2630 private:
2631     H m_handler;
2632 };
2633
2634 template<class H> inline void Service::post(H handler)
2635 {
2636     do_post(&Service::post_oper_constr<H>, sizeof (PostOper<H>), &handler);
2637 }
2638
2639 inline void Service::OwnersOperDeleter::operator()(AsyncOper* op) const noexcept
2640 {
2641     if (op->in_use()) {
2642         op->orphan();
2643     }
2644     else {
2645         void* addr = op;
2646         op->~AsyncOper();
2647         delete[] static_cast<char*>(addr);
2648     }
2649 }
2650
2651 inline void Service::LendersOperDeleter::operator()(AsyncOper* op) const noexcept
2652 {
2653     op->recycle(); // Suicide
2654 }
2655
2656 template<class Oper, class... Args> std::unique_ptr<Oper, Service::LendersOperDeleter>
2657 Service::alloc(OwnersOperPtr& owners_ptr, Args&&... args)
2658 {
2659     void* addr = owners_ptr.get();
2660     std::size_t size;
2661     if (REALM_LIKELY(addr)) {
2662         REALM_ASSERT(!owners_ptr->in_use());
2663         size = owners_ptr->m_size;
2664         // We can use static dispatch in the destructor call here, since an
2665         // object, that is not in use, is always an instance of UnusedOper.
2666         REALM_ASSERT(dynamic_cast<UnusedOper*>(owners_ptr.get()));
2667         static_cast<UnusedOper*>(owners_ptr.get())->UnusedOper::~UnusedOper();
2668         if (REALM_UNLIKELY(size < sizeof (Oper))) {
2669             owners_ptr.release();
2670             delete[] static_cast<char*>(addr);
2671             goto no_object;
2672         }
2673     }
2674     else {
2675       no_object:
2676         addr = new char[sizeof (Oper)]; // Throws
2677         size = sizeof (Oper);
2678         owners_ptr.reset(static_cast<AsyncOper*>(addr));
2679     }
2680     std::unique_ptr<Oper, LendersOperDeleter> lenders_ptr;
2681     try {
2682         lenders_ptr.reset(new (addr) Oper(size, std::forward<Args>(args)...)); // Throws
2683     }
2684     catch (...) {
2685         new (addr) UnusedOper(size); // Does not throw
2686         throw;
2687     }
2688     return lenders_ptr;
2689 }
2690
2691 template<class Oper>
2692 inline void Service::execute(std::unique_ptr<Oper, LendersOperDeleter>& lenders_ptr)
2693 {
2694     lenders_ptr.release()->recycle_and_execute(); // Throws
2695 }
2696
2697 template<class H> inline Service::PostOperBase*
2698 Service::post_oper_constr(void* addr, std::size_t size, Impl& service, void* cookie)
2699 {
2700     H& handler = *static_cast<H*>(cookie);
2701     return new (addr) PostOper<H>(size, service, std::move(handler)); // Throws
2702 }
2703
2704 inline bool Service::AsyncOper::in_use() const noexcept
2705 {
2706     return m_in_use;
2707 }
2708
2709 inline bool Service::AsyncOper::is_complete() const noexcept
2710 {
2711     return m_complete;
2712 }
2713
2714 inline void Service::AsyncOper::cancel() noexcept
2715 {
2716     REALM_ASSERT(m_in_use);
2717     REALM_ASSERT(!m_canceled);
2718     m_canceled = true;
2719 }
2720
2721 inline Service::AsyncOper::AsyncOper(std::size_t size, bool is_in_use) noexcept:
2722     m_size{size},
2723     m_in_use{is_in_use}
2724 {
2725 }
2726
2727 inline bool Service::AsyncOper::is_canceled() const noexcept
2728 {
2729     return m_canceled;
2730 }
2731
2732 inline void Service::AsyncOper::set_is_complete(bool value) noexcept
2733 {
2734     REALM_ASSERT(!m_complete);
2735     REALM_ASSERT(!value || m_in_use);
2736     m_complete = value;
2737 }
2738
2739 template<class H, class... Args>
2740 inline void Service::AsyncOper::do_recycle_and_execute(bool orphaned, H& handler, Args&&... args)
2741 {
2742     // Recycle the operation object before the handler is exceuted, such that
2743     // the memory is available for a new post operation that might be initiated
2744     // during the execution of the handler.
2745     bool was_recycled = false;
2746     try {
2747         // We need to copy or move all arguments to be passed to the handler,
2748         // such that there is no risk of references to the recycled operation
2749         // object being passed to the handler (the passed arguments may be
2750         // references to members of the recycled operation object). The easiest
2751         // way to achive this, is by forwarding the reference arguments (passed
2752         // to this function) to a helper function whose arguments have
2753         // nonreference type (`Args...` rather than `Args&&...`).
2754         //
2755         // Note that the copying and moving of arguments may throw, and it is
2756         // important that the operation is still recycled even if that
2757         // happens. For that reason, copying and moving of arguments must not
2758         // happen until we are in a scope (this scope) that catches and deals
2759         // correctly with such exceptions.
2760         do_recycle_and_execute_helper(orphaned, was_recycled, std::move(handler),
2761                                       std::forward<Args>(args)...); // Throws
2762     }
2763     catch (...) {
2764         if (!was_recycled)
2765             do_recycle(orphaned);
2766         throw;
2767     }
2768 }
2769
2770 template<class H, class... Args>
2771 inline void Service::AsyncOper::do_recycle_and_execute_helper(bool orphaned, bool& was_recycled,
2772                                                               H handler, Args... args)
2773 {
2774     do_recycle(orphaned);
2775     was_recycled = true;
2776     handler(std::move(args)...); // Throws
2777 }
2778
2779 inline void Service::AsyncOper::do_recycle(bool orphaned) noexcept
2780 {
2781     REALM_ASSERT(in_use());
2782     void* addr = this;
2783     std::size_t size = m_size;
2784     this->~AsyncOper(); // Suicide
2785     if (orphaned) {
2786         delete[] static_cast<char*>(addr);
2787     }
2788     else {
2789         new (addr) UnusedOper(size);
2790     }
2791 }
2792
2793 // ---------------- Resolver ----------------
2794
2795 class Resolver::ResolveOperBase: public Service::AsyncOper {
2796 public:
2797     ResolveOperBase(std::size_t size, Resolver& r, Query q) noexcept:
2798         AsyncOper{size, true},
2799         m_resolver{&r},
2800         m_query{std::move(q)}
2801     {
2802     }
2803     void perform()
2804     {
2805         // FIXME: Temporary hack until we get a true asynchronous resolver
2806         m_endpoints = m_resolver->resolve(std::move(m_query), m_error_code); // Throws
2807         set_is_complete(true);
2808     }
2809     void recycle() noexcept override final
2810     {
2811         bool orphaned = !m_resolver;
2812         REALM_ASSERT(orphaned);
2813         // Note: do_recycle() commits suicide.
2814         do_recycle(orphaned);
2815     }
2816     void orphan() noexcept override final
2817     {
2818         m_resolver = nullptr;
2819     }
2820 protected:
2821     Resolver* m_resolver;
2822     Query m_query;
2823     Endpoint::List m_endpoints;
2824     std::error_code m_error_code;
2825 };
2826
2827 template<class H> class Resolver::ResolveOper: public ResolveOperBase {
2828 public:
2829     ResolveOper(std::size_t size, Resolver& r, Query q, H handler):
2830         ResolveOperBase{size, r, std::move(q)},
2831         m_handler{std::move(handler)}
2832     {
2833     }
2834     void recycle_and_execute() override final
2835     {
2836         REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
2837         REALM_ASSERT(is_canceled() || m_error_code || !m_endpoints.empty());
2838         bool orphaned = !m_resolver;
2839         std::error_code ec = m_error_code;
2840         if (is_canceled())
2841             ec = error::operation_aborted;
2842         // Note: do_recycle_and_execute() commits suicide.
2843         do_recycle_and_execute<H>(orphaned, m_handler, ec, std::move(m_endpoints)); // Throws
2844     }
2845 private:
2846     H m_handler;
2847 };
2848
2849 inline Resolver::Resolver(Service& service):
2850     m_service_impl{*service.m_impl}
2851 {
2852 }
2853
2854 inline Resolver::~Resolver() noexcept
2855 {
2856     cancel();
2857 }
2858
2859 inline Endpoint::List Resolver::resolve(const Query& q)
2860 {
2861     std::error_code ec;
2862     Endpoint::List list = resolve(q, ec);
2863     if (REALM_UNLIKELY(ec))
2864         throw std::system_error(ec);
2865     return list;
2866 }
2867
2868 template<class H> void Resolver::async_resolve(Query query, H handler)
2869 {
2870     LendersResolveOperPtr op = Service::alloc<ResolveOper<H>>(m_resolve_oper, *this,
2871                                                               std::move(query),
2872                                                               std::move(handler)); // Throws
2873     initiate_oper(std::move(op)); // Throws
2874 }
2875
2876 inline Resolver::Query::Query(std::string service_port, int init_flags):
2877     m_flags{init_flags},
2878     m_service{service_port}
2879 {
2880 }
2881
2882 inline Resolver::Query::Query(const StreamProtocol& prot, std::string service_port,
2883                               int init_flags):
2884     m_flags{init_flags},
2885     m_protocol{prot},
2886     m_service{service_port}
2887 {
2888 }
2889
2890 inline Resolver::Query::Query(std::string host_name, std::string service_port, int init_flags):
2891     m_flags{init_flags},
2892     m_host{host_name},
2893     m_service{service_port}
2894 {
2895 }
2896
2897 inline Resolver::Query::Query(const StreamProtocol& prot, std::string host_name,
2898                               std::string service_port, int init_flags):
2899     m_flags{init_flags},
2900     m_protocol{prot},
2901     m_host{host_name},
2902     m_service{service_port}
2903 {
2904 }
2905
2906 inline Resolver::Query::~Query() noexcept
2907 {
2908 }
2909
2910 inline int Resolver::Query::flags() const
2911 {
2912     return m_flags;
2913 }
2914
2915 inline StreamProtocol Resolver::Query::protocol() const
2916 {
2917     return m_protocol;
2918 }
2919
2920 inline std::string Resolver::Query::host() const
2921 {
2922     return m_host;
2923 }
2924
2925 inline std::string Resolver::Query::service() const
2926 {
2927     return m_service;
2928 }
2929
2930 // ---------------- SocketBase ----------------
2931
2932 inline SocketBase::SocketBase(Service& service):
2933     m_desc{*service.m_impl}
2934 {
2935 }
2936
2937 inline SocketBase::~SocketBase() noexcept
2938 {
2939     close();
2940 }
2941
2942 inline bool SocketBase::is_open() const noexcept
2943 {
2944     return m_desc.is_open();
2945 }
2946
2947 inline auto SocketBase::native_handle() const noexcept -> native_handle_type
2948 {
2949     return m_desc.native_handle();
2950 }
2951
2952 inline void SocketBase::open(const StreamProtocol& prot)
2953 {
2954     std::error_code ec;
2955     if (open(prot, ec))
2956         throw std::system_error(ec);
2957 }
2958
2959 inline void SocketBase::close() noexcept
2960 {
2961     if (!is_open())
2962         return;
2963     cancel();
2964     m_desc.close();
2965 }
2966
2967 template<class O>
2968 inline void SocketBase::get_option(O& opt) const
2969 {
2970     std::error_code ec;
2971     if (get_option(opt, ec))
2972         throw std::system_error(ec);
2973 }
2974
2975 template<class O>
2976 inline std::error_code SocketBase::get_option(O& opt, std::error_code& ec) const
2977 {
2978     opt.get(*this, ec);
2979     return ec;
2980 }
2981
2982 template<class O>
2983 inline void SocketBase::set_option(const O& opt)
2984 {
2985     std::error_code ec;
2986     if (set_option(opt, ec))
2987         throw std::system_error(ec);
2988 }
2989
2990 template<class O>
2991 inline std::error_code SocketBase::set_option(const O& opt, std::error_code& ec)
2992 {
2993     opt.set(*this, ec);
2994     return ec;
2995 }
2996
2997 inline void SocketBase::bind(const Endpoint& ep)
2998 {
2999     std::error_code ec;
3000     if (bind(ep, ec))
3001         throw std::system_error(ec);
3002 }
3003
3004 inline Endpoint SocketBase::local_endpoint() const
3005 {
3006     std::error_code ec;
3007     Endpoint ep = local_endpoint(ec);
3008     if (ec)
3009         throw std::system_error(ec);
3010     return ep;
3011 }
3012
3013 inline const StreamProtocol& SocketBase::get_protocol() const noexcept
3014 {
3015     return m_protocol;
3016 }
3017
3018 template<class T, int opt, class U>
3019 inline SocketBase::Option<T, opt, U>::Option(T init_value):
3020     m_value{init_value}
3021 {
3022 }
3023
3024 template<class T, int opt, class U>
3025 inline T SocketBase::Option<T, opt, U>::value() const
3026 {
3027     return m_value;
3028 }
3029
3030 template<class T, int opt, class U>
3031 inline void SocketBase::Option<T, opt, U>::get(const SocketBase& sock, std::error_code& ec)
3032 {
3033     union {
3034         U value;
3035         char strut[sizeof (U) + 1];
3036     };
3037     std::size_t value_size = sizeof strut;
3038     sock.get_option(opt_enum(opt), &value, value_size, ec);
3039     if (!ec) {
3040         REALM_ASSERT(value_size == sizeof value);
3041         m_value = T(value);
3042     }
3043 }
3044
3045 template<class T, int opt, class U>
3046 inline void SocketBase::Option<T, opt, U>::set(SocketBase& sock, std::error_code& ec) const
3047 {
3048     U value_to_set = U(m_value);
3049     sock.set_option(opt_enum(opt), &value_to_set, sizeof value_to_set, ec);
3050 }
3051
3052 // ---------------- Socket ----------------
3053
3054 class Socket::ConnectOperBase: public Service::IoOper {
3055 public:
3056     ConnectOperBase(std::size_t size, Socket& sock) noexcept:
3057         IoOper{size},
3058         m_socket{&sock}
3059     {
3060     }
3061     Want initiate(const Endpoint& ep)
3062     {
3063         REALM_ASSERT(this == m_socket->m_write_oper.get());
3064         if (m_socket->initiate_async_connect(ep, m_error_code)) { // Throws
3065             set_is_complete(true); // Failure, or immediate completion
3066             return Want::nothing;
3067         }
3068         return Want::write;
3069     }
3070     Want advance() noexcept override final
3071     {
3072         REALM_ASSERT(!is_complete());
3073         REALM_ASSERT(!is_canceled());
3074         REALM_ASSERT(!m_error_code);
3075         m_socket->finalize_async_connect(m_error_code);
3076         set_is_complete(true);
3077         return Want::nothing;
3078     }
3079     void recycle() noexcept override final
3080     {
3081         bool orphaned = !m_socket;
3082         REALM_ASSERT(orphaned);
3083         // Note: do_recycle() commits suicide.
3084         do_recycle(orphaned);
3085     }
3086     void orphan() noexcept override final
3087     {
3088         m_socket = nullptr;
3089     }
3090     Service::Descriptor& descriptor() noexcept override final
3091     {
3092         return m_socket->m_desc;
3093     }
3094 protected:
3095     Socket* m_socket;
3096     std::error_code m_error_code;
3097 };
3098
3099 template<class H> class Socket::ConnectOper: public ConnectOperBase {
3100 public:
3101     ConnectOper(std::size_t size, Socket& sock, H handler):
3102         ConnectOperBase{size, sock},
3103         m_handler{std::move(handler)}
3104     {
3105     }
3106     void recycle_and_execute() override final
3107     {
3108         REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3109         bool orphaned = !m_socket;
3110         std::error_code ec = m_error_code;
3111         if (is_canceled())
3112             ec = error::operation_aborted;
3113         // Note: do_recycle_and_execute() commits suicide.
3114         do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
3115     }
3116 private:
3117     H m_handler;
3118 };
3119
3120 inline Socket::Socket(Service& service):
3121     SocketBase{service}
3122 {
3123 }
3124
3125 inline Socket::Socket(Service& service, const StreamProtocol& prot,
3126                       native_handle_type native_socket):
3127     SocketBase{service}
3128 {
3129     assign(prot, native_socket); // Throws
3130 }
3131
3132 inline Socket::~Socket() noexcept
3133 {
3134 }
3135
3136 inline void Socket::connect(const Endpoint& ep)
3137 {
3138     std::error_code ec;
3139     if (connect(ep, ec)) // Throws
3140         throw std::system_error(ec);
3141 }
3142
3143 inline std::size_t Socket::read(char* buffer, std::size_t size)
3144 {
3145     std::error_code ec;
3146     read(buffer, size, ec); // Throws
3147     if (ec)
3148         throw std::system_error(ec);
3149     return size;
3150 }
3151
3152 inline std::size_t Socket::read(char* buffer, std::size_t size, std::error_code& ec)
3153 {
3154     return StreamOps::read(*this, buffer, size, ec); // Throws
3155 }
3156
3157 inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab)
3158 {
3159     std::error_code ec;
3160     read(buffer, size, rab, ec); // Throws
3161     if (ec)
3162         throw std::system_error(ec);
3163     return size;
3164 }
3165
3166 inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab,
3167                                 std::error_code& ec)
3168 {
3169     int delim = std::char_traits<char>::eof();
3170     return StreamOps::buffered_read(*this, buffer, size, delim, rab, ec); // Throws
3171 }
3172
3173 inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim,
3174                                       ReadAheadBuffer& rab)
3175 {
3176     std::error_code ec;
3177     std::size_t n = read_until(buffer, size, delim, rab, ec); // Throws
3178     if (ec)
3179         throw std::system_error(ec);
3180     return n;
3181 }
3182
3183 inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim,
3184                                       ReadAheadBuffer& rab, std::error_code& ec)
3185 {
3186     int delim_2 = std::char_traits<char>::to_int_type(delim);
3187     return StreamOps::buffered_read(*this, buffer, size, delim_2, rab, ec); // Throws
3188 }
3189
3190 inline std::size_t Socket::write(const char* data, std::size_t size)
3191 {
3192     std::error_code ec;
3193     write(data, size, ec); // Throws
3194     if (ec)
3195         throw std::system_error(ec);
3196     return size;
3197 }
3198
3199 inline std::size_t Socket::write(const char* data, std::size_t size, std::error_code& ec)
3200 {
3201     return StreamOps::write(*this, data, size, ec); // Throws
3202 }
3203
3204 inline std::size_t Socket::read_some(char* buffer, std::size_t size)
3205 {
3206     std::error_code ec;
3207     std::size_t n = read_some(buffer, size, ec); // Throws
3208     if (ec)
3209         throw std::system_error(ec);
3210     return n;
3211 }
3212
3213 inline std::size_t Socket::read_some(char* buffer, std::size_t size, std::error_code& ec)
3214 {
3215     return StreamOps::read_some(*this, buffer, size, ec); // Throws
3216 }
3217
3218 inline std::size_t Socket::write_some(const char* data, std::size_t size)
3219 {
3220     std::error_code ec;
3221     std::size_t n = write_some(data, size, ec); // Throws
3222     if (ec)
3223         throw std::system_error(ec);
3224     return n;
3225 }
3226
3227 inline std::size_t Socket::write_some(const char* data, std::size_t size, std::error_code& ec)
3228 {
3229     return StreamOps::write_some(*this, data, size, ec); // Throws
3230 }
3231
3232 template<class H> inline void Socket::async_connect(const Endpoint& ep, H handler)
3233 {
3234     LendersConnectOperPtr op =
3235         Service::alloc<ConnectOper<H>>(m_write_oper, *this, std::move(handler)); // Throws
3236     m_desc.initiate_oper(std::move(op), ep); // Throws
3237 }
3238
3239 template<class H> inline void Socket::async_read(char* buffer, std::size_t size, H handler)
3240 {
3241     bool is_read_some = false;
3242     StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
3243 }
3244
3245 template<class H>
3246 inline void Socket::async_read(char* buffer, std::size_t size, ReadAheadBuffer& rab, H handler)
3247 {
3248     int delim = std::char_traits<char>::eof();
3249     StreamOps::async_buffered_read(*this, buffer, size, delim, rab, std::move(handler)); // Throws
3250 }
3251
3252 template<class H>
3253 inline void Socket::async_read_until(char* buffer, std::size_t size, char delim,
3254                                      ReadAheadBuffer& rab, H handler)
3255 {
3256     int delim_2 = std::char_traits<char>::to_int_type(delim);
3257     StreamOps::async_buffered_read(*this, buffer, size, delim_2, rab, std::move(handler)); // Throws
3258 }
3259
3260 template<class H> inline void Socket::async_write(const char* data, std::size_t size, H handler)
3261 {
3262     bool is_write_some = false;
3263     StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
3264 }
3265
3266 template<class H> inline void Socket::async_read_some(char* buffer, std::size_t size, H handler)
3267 {
3268     bool is_read_some = true;
3269     StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
3270 }
3271
3272 template<class H>
3273 inline void Socket::async_write_some(const char* data, std::size_t size, H handler)
3274 {
3275     bool is_write_some = true;
3276     StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
3277 }
3278
3279 inline void Socket::shutdown(shutdown_type what)
3280 {
3281     std::error_code ec;
3282     if (shutdown(what, ec)) // Throws
3283         throw std::system_error(ec);
3284 }
3285
3286 inline void Socket::assign(const StreamProtocol& prot, native_handle_type native_socket)
3287 {
3288     std::error_code ec;
3289     if (assign(prot, native_socket, ec)) // Throws
3290         throw std::system_error(ec);
3291 }
3292
3293 inline std::error_code Socket::assign(const StreamProtocol& prot,
3294                                       native_handle_type native_socket, std::error_code& ec)
3295 {
3296     return do_assign(prot, native_socket, ec); // Throws
3297 }
3298
3299 inline Socket& Socket::lowest_layer() noexcept
3300 {
3301     return *this;
3302 }
3303
3304 inline void Socket::do_init_read_async(std::error_code&, Want& want) noexcept
3305 {
3306     want = Want::read; // Wait for read readiness before proceeding
3307 }
3308
3309 inline void Socket::do_init_write_async(std::error_code&, Want& want) noexcept
3310 {
3311     want = Want::write; // Wait for write readiness before proceeding
3312 }
3313
3314 inline std::size_t Socket::do_read_some_sync(char* buffer, std::size_t size,
3315                                              std::error_code& ec) noexcept
3316 {
3317     return m_desc.read_some(buffer, size, ec);
3318 }
3319
3320 inline std::size_t Socket::do_write_some_sync(const char* data, std::size_t size,
3321                                               std::error_code& ec) noexcept
3322 {
3323     return m_desc.write_some(data, size, ec);
3324 }
3325
3326 inline std::size_t Socket::do_read_some_async(char* buffer, std::size_t size,
3327                                               std::error_code& ec, Want& want) noexcept
3328 {
3329     std::error_code ec_2;
3330     std::size_t n = m_desc.read_some(buffer, size, ec_2);
3331     bool success = (!ec_2 || ec_2 == error::resource_unavailable_try_again);
3332     if (REALM_UNLIKELY(!success)) {
3333         ec = ec_2;
3334         want = Want::nothing; // Failure
3335         return 0;
3336     }
3337     ec = std::error_code();
3338     want = Want::read; // Success
3339     return n;
3340 }
3341
3342 inline std::size_t Socket::do_write_some_async(const char* data, std::size_t size,
3343                                                std::error_code& ec, Want& want) noexcept
3344 {
3345     std::error_code ec_2;
3346     std::size_t n = m_desc.write_some(data, size, ec_2);
3347     bool success = (!ec_2 || ec_2 == error::resource_unavailable_try_again);
3348     if (REALM_UNLIKELY(!success)) {
3349         ec = ec_2;
3350         want = Want::nothing; // Failure
3351         return 0;
3352     }
3353     ec = std::error_code();
3354     want = Want::write; // Success
3355     return n;
3356 }
3357
3358 // ---------------- Acceptor ----------------
3359
3360 class Acceptor::AcceptOperBase: public Service::IoOper {
3361 public:
3362     AcceptOperBase(std::size_t size, Acceptor& a, Socket& s, Endpoint* e):
3363         IoOper{size},
3364         m_acceptor{&a},
3365         m_socket{s},
3366         m_endpoint{e}
3367     {
3368     }
3369     Want initiate()
3370     {
3371         REALM_ASSERT(this == m_acceptor->m_read_oper.get());
3372         REALM_ASSERT(!is_complete());
3373         m_acceptor->m_desc.ensure_nonblocking_mode(); // Throws
3374         return Want::read;
3375     }
3376     Want advance() noexcept override final
3377     {
3378         REALM_ASSERT(!is_complete());
3379         REALM_ASSERT(!is_canceled());
3380         REALM_ASSERT(!m_error_code);
3381         REALM_ASSERT(!m_socket.is_open());
3382         Want want = m_acceptor->do_accept_async(m_socket, m_endpoint, m_error_code);
3383         if (want == Want::nothing)
3384             set_is_complete(true); // Success or failure
3385         return want;
3386     }
3387     void recycle() noexcept override final
3388     {
3389         bool orphaned = !m_acceptor;
3390         REALM_ASSERT(orphaned);
3391         // Note: do_recycle() commits suicide.
3392         do_recycle(orphaned);
3393     }
3394     void orphan() noexcept override final
3395     {
3396         m_acceptor = nullptr;
3397     }
3398     Service::Descriptor& descriptor() noexcept override final
3399     {
3400         return m_acceptor->m_desc;
3401     }
3402 protected:
3403     Acceptor* m_acceptor;
3404     Socket& m_socket;           // May be dangling after cancellation
3405     Endpoint* const m_endpoint; // May be dangling after cancellation
3406     std::error_code m_error_code;
3407 };
3408
3409 template<class H> class Acceptor::AcceptOper: public AcceptOperBase {
3410 public:
3411     AcceptOper(std::size_t size, Acceptor& a, Socket& s, Endpoint* e, H handler):
3412         AcceptOperBase{size, a, s, e},
3413         m_handler{std::move(handler)}
3414     {
3415     }
3416     void recycle_and_execute() override final
3417     {
3418         REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3419         REALM_ASSERT(is_canceled() || m_error_code || m_socket.is_open());
3420         bool orphaned = !m_acceptor;
3421         std::error_code ec = m_error_code;
3422         if (is_canceled())
3423             ec = error::operation_aborted;
3424         // Note: do_recycle_and_execute() commits suicide.
3425         do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
3426     }
3427 private:
3428     H m_handler;
3429 };
3430
3431 inline Acceptor::Acceptor(Service& service):
3432     SocketBase{service}
3433 {
3434 }
3435
3436 inline Acceptor::~Acceptor() noexcept
3437 {
3438 }
3439
3440 inline void Acceptor::listen(int backlog)
3441 {
3442     std::error_code ec;
3443     if (listen(backlog, ec)) // Throws
3444         throw std::system_error(ec);
3445 }
3446
3447 inline void Acceptor::accept(Socket& sock)
3448 {
3449     std::error_code ec;
3450     if (accept(sock, ec)) // Throws
3451         throw std::system_error(ec);
3452 }
3453
3454 inline void Acceptor::accept(Socket& sock, Endpoint& ep)
3455 {
3456     std::error_code ec;
3457     if (accept(sock, ep, ec)) // Throws
3458         throw std::system_error(ec);
3459 }
3460
3461 inline std::error_code Acceptor::accept(Socket& sock, std::error_code& ec)
3462 {
3463     Endpoint* ep = nullptr;
3464     return accept(sock, ep, ec); // Throws
3465 }
3466
3467 inline std::error_code Acceptor::accept(Socket& sock, Endpoint& ep, std::error_code& ec)
3468 {
3469     return accept(sock, &ep, ec); // Throws
3470 }
3471
3472 template<class H> inline void Acceptor::async_accept(Socket& sock, H handler)
3473 {
3474     Endpoint* ep = nullptr;
3475     async_accept(sock, ep, std::move(handler)); // Throws
3476 }
3477
3478 template<class H> inline void Acceptor::async_accept(Socket& sock, Endpoint& ep, H handler)
3479 {
3480     async_accept(sock, &ep, std::move(handler)); // Throws
3481 }
3482
3483 inline std::error_code Acceptor::accept(Socket& socket, Endpoint* ep, std::error_code& ec)
3484 {
3485     REALM_ASSERT(!m_read_oper || !m_read_oper->in_use());
3486     if (REALM_UNLIKELY(socket.is_open()))
3487         throw std::runtime_error("Socket is already open");
3488     m_desc.ensure_blocking_mode(); // Throws
3489     m_desc.accept(socket.m_desc, m_protocol, ep, ec);
3490     return ec;
3491 }
3492
3493 inline Acceptor::Want Acceptor::do_accept_async(Socket& socket, Endpoint* ep,
3494                                                 std::error_code& ec) noexcept
3495 {
3496     std::error_code ec_2;
3497     m_desc.accept(socket.m_desc, m_protocol, ep, ec_2);
3498     if (ec_2 == error::resource_unavailable_try_again)
3499         return Want::read;
3500     ec = ec_2;
3501     return Want::nothing;
3502 }
3503
3504 template<class H> inline void Acceptor::async_accept(Socket& sock, Endpoint* ep, H handler)
3505 {
3506     if (REALM_UNLIKELY(sock.is_open()))
3507         throw std::runtime_error("Socket is already open");
3508     LendersAcceptOperPtr op = Service::alloc<AcceptOper<H>>(m_read_oper, *this, sock, ep,
3509                                                             std::move(handler)); // Throws
3510     m_desc.initiate_oper(std::move(op)); // Throws
3511 }
3512
3513 // ---------------- DeadlineTimer ----------------
3514
3515 template<class H>
3516 class DeadlineTimer::WaitOper: public Service::WaitOperBase {
3517 public:
3518     WaitOper(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time, H handler):
3519         Service::WaitOperBase{size, timer, expiration_time},
3520         m_handler{std::move(handler)}
3521     {
3522     }
3523     void recycle_and_execute() override final
3524     {
3525         bool orphaned = !m_timer;
3526         std::error_code ec;
3527         if (is_canceled())
3528             ec = error::operation_aborted;
3529         // Note: do_recycle_and_execute() commits suicide.
3530         do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
3531     }
3532 private:
3533     H m_handler;
3534 };
3535
3536 inline DeadlineTimer::DeadlineTimer(Service& service):
3537     m_service_impl{*service.m_impl}
3538 {
3539 }
3540
3541 inline DeadlineTimer::~DeadlineTimer() noexcept
3542 {
3543     cancel();
3544 }
3545
3546 template<class R, class P, class H>
3547 inline void DeadlineTimer::async_wait(std::chrono::duration<R,P> delay, H handler)
3548 {
3549     clock::time_point now = clock::now();
3550     // FIXME: This method of detecting overflow does not work. Comparison
3551     // between distinct duration types is not overflow safe. Overflow easily
3552     // happens in the implied conversion of arguments to the common duration
3553     // type (std::common_type<>).
3554     auto max_add = clock::time_point::max() - now;
3555     if (delay > max_add)
3556         throw std::runtime_error("Expiration time overflow");
3557     clock::time_point expiration_time = now + delay;
3558     Service::LendersWaitOperPtr op =
3559         Service::alloc<WaitOper<H>>(m_wait_oper, *this, expiration_time,
3560                                     std::move(handler)); // Throws
3561     add_oper(std::move(op)); // Throws
3562 }
3563
3564 // ---------------- Trigger ----------------
3565
3566 template<class H>
3567 class Trigger::ExecOper: public Service::TriggerExecOperBase {
3568 public:
3569     ExecOper(Service::Impl& service_impl, H handler):
3570         Service::TriggerExecOperBase{service_impl},
3571         m_handler{std::move(handler)}
3572     {
3573     }
3574     void recycle_and_execute() override final
3575     {
3576         REALM_ASSERT(in_use());
3577         // Note: Potential suicide when `self` goes out of scope
3578         util::bind_ptr<TriggerExecOperBase> self{this, bind_ptr_base::adopt_tag{}};
3579         if (m_service) {
3580             Service::reset_trigger_exec(*m_service, *this);
3581             m_handler(); // Throws
3582         }
3583     }
3584 private:
3585     H m_handler;
3586 };
3587
3588 template<class H> inline Trigger::Trigger(Service& service, H handler) :
3589     m_exec_oper{new ExecOper<H>{*service.m_impl, std::move(handler)}} // Throws
3590 {
3591 }
3592
3593 inline Trigger::~Trigger() noexcept
3594 {
3595     if (m_exec_oper)
3596         m_exec_oper->orphan();
3597 }
3598
3599 inline void Trigger::trigger() noexcept
3600 {
3601     REALM_ASSERT(m_exec_oper);
3602     m_exec_oper->trigger();
3603 }
3604
3605 // ---------------- ReadAheadBuffer ----------------
3606
3607 inline ReadAheadBuffer::ReadAheadBuffer():
3608     m_buffer{new char[s_size]} // Throws
3609 {
3610 }
3611
3612 inline void ReadAheadBuffer::clear() noexcept
3613 {
3614     m_begin = nullptr;
3615     m_end   = nullptr;
3616 }
3617
3618 inline bool ReadAheadBuffer::empty() const noexcept
3619 {
3620     return (m_begin == m_end);
3621 }
3622
3623 template<class S> inline void ReadAheadBuffer::refill_sync(S& stream, std::error_code& ec) noexcept
3624 {
3625     char* buffer = m_buffer.get();
3626     std::size_t size = s_size;
3627     static_assert(noexcept(stream.do_read_some_sync(buffer, size, ec)), "");
3628     std::size_t n = stream.do_read_some_sync(buffer, size, ec);
3629     if (REALM_UNLIKELY(n == 0))
3630         return;
3631     REALM_ASSERT(!ec);
3632     REALM_ASSERT(n <= size);
3633     m_begin = m_buffer.get();
3634     m_end   = m_begin + n;
3635 }
3636
3637 template<class S>
3638 inline bool ReadAheadBuffer::refill_async(S& stream, std::error_code& ec, Want& want) noexcept
3639 {
3640     char* buffer = m_buffer.get();
3641     std::size_t size = s_size;
3642     static_assert(noexcept(stream.do_read_some_async(buffer, size, ec, want)), "");
3643     std::size_t n = stream.do_read_some_async(buffer, size, ec, want);
3644     if (n == 0)
3645         return false;
3646     REALM_ASSERT(!ec);
3647     REALM_ASSERT(n <= size);
3648     m_begin = m_buffer.get();
3649     m_end   = m_begin + n;
3650     return true;
3651 }
3652
3653 } // namespace network
3654 } // namespace util
3655 } // namespace realm
3656
3657 #endif // REALM_UTIL_NETWORK_HPP