1 /*************************************************************************
6 * [2011] - [2015] Realm Inc
9 * NOTICE: All information contained herein is, and remains
10 * the property of Realm Incorporated and its suppliers,
11 * if any. The intellectual and technical concepts contained
12 * herein are proprietary to Realm Incorporated
13 * and its suppliers and may be covered by U.S. and Foreign Patents,
14 * patents in process, and are protected by trade secret or copyright law.
15 * Dissemination of this information or reproduction of this material
16 * is strictly forbidden unless prior written permission is obtained
17 * from Realm Incorporated.
19 **************************************************************************/
20 #ifndef REALM_UTIL_NETWORK_HPP
21 #define REALM_UTIL_NETWORK_HPP
27 #include <system_error>
30 #include <sys/types.h>
33 # include <winsock2.h>
34 # include <ws2tcpip.h>
37 # pragma comment(lib, "Ws2_32.lib")
39 # include <sys/socket.h>
40 # include <arpa/inet.h>
44 #include <realm/util/features.h>
45 #include <realm/util/assert.hpp>
46 #include <realm/util/bind_ptr.hpp>
47 #include <realm/util/buffer.hpp>
48 #include <realm/util/basic_system_errors.hpp>
52 // Require Linux kernel version >= 2.6.27 such that we have epoll_create1(),
53 // `O_CLOEXEC`, and `EPOLLRDHUP`.
54 #if defined(__linux__)
55 # include <linux/version.h>
56 # if !defined(REALM_HAVE_EPOLL)
57 # if !defined(REALM_DISABLE_UTIL_NETWORK_EPOLL)
58 # if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,27)
59 # define REALM_HAVE_EPOLL 1
64 #if !defined(REALM_HAVE_EPOLL)
65 # define REALM_HAVE_EPOLL 0
70 // Available on Mac OS X, FreeBSD, NetBSD, OpenBSD
71 #if (defined(__MACH__) && defined(__APPLE__)) || defined(__FreeBSD__) || \
72 defined(__NetBSD__) || defined(__OpenBSD__)
73 # if !defined(REALM_HAVE_KQUEUE)
74 # if !defined(REALM_DISABLE_UTIL_NETWORK_KQUEUE)
75 # define REALM_HAVE_KQUEUE 1
79 #if !defined(REALM_HAVE_KQUEUE)
80 # define REALM_HAVE_KQUEUE 0
85 // FIXME: Unfinished business around `Address::m_ip_v6_scope_id`.
91 /// \brief TCP/IP networking API.
93 /// The design of this networking API is heavily inspired by the ASIO C++
94 /// library (http://think-async.com).
99 /// A *service context* is a set of objects consisting of an instance of
100 /// Service, and all the objects that are associated with that instance (\ref
101 /// Resolver, \ref Socket, \ref Acceptor, \ref DeadlineTimer, \ref Trigger,
102 /// and \ref ssl::Stream).
104 /// In general, it is unsafe for two threads to call functions on the same
105 /// object, or on different objects in the same service context. This also
106 /// applies to destructors. Notable exceptions are the fully thread-safe
107 /// functions, such as Service::post(), Service::stop(), and Service::reset().
109 /// On the other hand, it is always safe for two threads to call functions on
110 /// objects belonging to different service contexts.
112 /// One implication of these rules is that at most one thread may execute run()
113 /// at any given time, and if one thread is executing run(), then no other
114 /// thread is allowed to access objects in the same service context (with the
115 /// mentioned exceptions).
117 /// Unless otherwise specified, free-standing objects, such as \ref
118 /// StreamProtocol, \ref Address, \ref Endpoint, and \ref Endpoint::List are
119 /// fully thread-safe as long as they are not mutated. If one thread is mutating
120 /// such an object, no other thread may access it. Note that these free-standing
121 /// objects are not associated with an instance of Service, and are therefore
122 /// not part of a service context.
125 /// ### Comparison with ASIO
127 /// There is a crucial difference between the two libraries in regards to the
128 /// guarantees that are provided about the cancelability of asynchronous
129 /// operations. The Realm networking library (this library) considers an
130 /// asynchronous operation to be complete precisely when the completion handler
131 /// starts to execute, and it guarantees that such an operation is cancelable up
132 /// until that point in time. In particular, if `cancel()` is called on a socket
133 /// or a deadline timer object before the completion handler starts to execute,
134 /// then that operation will be canceled, and will receive
135 /// `error::operation_aborted`. This guarantee is possible to provide (and free
136 /// of ambiguities) precisely because this library prohibits multiple threads
137 /// from executing the event loop concurrently, and because `cancel()` is
138 /// allowed to be called only from a completion handler (executed by the event
139 /// loop thread) or while no thread is executing the event loop. This guarantee
140 /// allows for safe destruction of sockets and deadline timers as long as the
141 /// completion handlers react appropriately to `error::operation_aborted`, in
142 /// particular, that they do not attempt to access the socket or deadline timer
143 /// object in such cases.
145 /// ASIO, on the other hand, allows for an asynchronous operation to complete
146 /// and become **uncancellable** before the completion handler starts to
147 /// execute. For this reason, it is possible with ASIO to get the completion
148 /// handler of an asynchronous wait operation to start executing and receive an
149 /// error code other than "operation aborted" at a point in time where
150 /// `cancel()` has already been called on the deadline timer object, or even at
151 /// a point in time where the deadline timer has been destroyed. This seems
152 /// like an inevitable consequence of the fact that ASIO allows for multiple
153 /// threads to execute the event loop concurrently. This generally forces ASIO
154 /// applications to invent ways of extending the lifetime of deadline timer and
155 /// socket objects until the completion handler starts executing.
157 /// IMPORTANT: Even if ASIO is used in a way where at most one thread executes
158 /// the event loop, there is still no guarantee that an asynchronous operation
159 /// remains cancelable up until the point in time where the completion handler
160 /// starts to execute.
/// \brief Get a host name as a string.
///
/// NOTE(review): presumably this is the name of the local host, as reported by
/// the operating system -- confirm against the implementation. The function is
/// not declared `noexcept`, so callers should assume that it can throw.
163 std::string host_name();
166 class StreamProtocol;
176 class ReadAheadBuffer;
182 /// \brief An IP protocol descriptor.
183 class StreamProtocol {
185 static StreamProtocol ip_v4();
186 static StreamProtocol ip_v6();
188 bool is_ip_v4() const;
189 bool is_ip_v6() const;
191 int protocol() const;
195 ~StreamProtocol() noexcept {}
202 friend class Resolver;
203 friend class SocketBase;
207 /// \brief An IP address (IPv4 or IPv6).
210 bool is_ip_v4() const;
211 bool is_ip_v6() const;
213 template<class C, class T>
214 friend std::basic_ostream<C,T>& operator<<(std::basic_ostream<C,T>&, const Address&);
217 ~Address() noexcept {}
220 using ip_v4_type = in_addr;
221 using ip_v6_type = in6_addr;
227 std::uint_least32_t m_ip_v6_scope_id = 0;
228 bool m_is_ip_v6 = false;
230 friend Address make_address(const char*, std::error_code&) noexcept;
231 friend class Endpoint;
/// @{ \brief Create an Address from a string.
///
/// NOTE(review): the string is presumably the textual (numeric) form of an IPv4
/// or IPv6 address -- confirm against the implementation.
///
/// The overloads that take an `std::error_code&` argument are declared
/// `noexcept`, and are expected to report success or failure through \a ec. The
/// overloads that do not take an `std::error_code&` argument are not
/// `noexcept`; presumably they throw on failure (TODO confirm).
234 Address make_address(const char* c_str);
235 Address make_address(const char* c_str, std::error_code& ec) noexcept;
236 Address make_address(const std::string&);
237 Address make_address(const std::string&, std::error_code& ec) noexcept;
/// @}
240 /// \brief An IP endpoint.
242 /// An IP endpoint is a triplet (`protocol`, `address`, `port`).
245 using port_type = std::uint_fast16_t;
248 StreamProtocol protocol() const;
249 Address address() const;
250 port_type port() const;
253 Endpoint(const StreamProtocol&, port_type);
254 Endpoint(const Address&, port_type);
255 ~Endpoint() noexcept {}
257 using data_type = sockaddr;
259 const data_type* data() const;
262 StreamProtocol m_protocol;
264 using sockaddr_base_type = sockaddr;
265 using sockaddr_ip_v4_type = sockaddr_in;
266 using sockaddr_ip_v6_type = sockaddr_in6;
267 union sockaddr_union_type {
268 sockaddr_base_type m_base;
269 sockaddr_ip_v4_type m_ip_v4;
270 sockaddr_ip_v6_type m_ip_v6;
272 sockaddr_union_type m_sockaddr_union;
274 friend class Service;
275 friend class Resolver;
276 friend class SocketBase;
281 /// \brief A list of IP endpoints.
282 class Endpoint::List {
284 using iterator = const Endpoint*;
286 iterator begin() const noexcept;
287 iterator end() const noexcept;
288 std::size_t size() const noexcept;
289 bool empty() const noexcept;
291 List() noexcept = default;
292 List(List&&) noexcept = default;
293 ~List() noexcept = default;
295 List& operator=(List&&) noexcept = default;
298 Buffer<Endpoint> m_endpoints;
300 friend class Resolver;
304 /// \brief TCP/IP networking service.
310 /// \brief Execute the event loop.
312 /// Execute completion handlers of completed asynchronous operations, or
313 /// wait for more completion handlers to become ready for
314 /// execution. Handlers submitted via post() are considered immediately
315 /// ready. If there are no completion handlers ready for execution, and
316 /// there are no asynchronous operations in progress, run() returns.
318 /// All completion handlers, including handlers submitted via post() will be
319 /// executed from run(), that is, by the thread that executes run(). If no
320 /// thread executes run(), then the completion handlers will not be
323 /// Exceptions thrown by completion handlers will always propagate back
326 /// Synchronous operations (e.g., Socket::connect()) execute independently of
327 /// the event loop, and do not require that any thread calls run().
330 /// @{ \brief Stop event loop execution.
332 /// stop() puts the event loop into the stopped mode. If a thread is
333 /// currently executing run(), it will be made to return in a timely
334 /// fashion, that is, without further blocking. If a thread is currently
335 /// blocked in run(), it will be unblocked. Handlers that can be executed
336 /// immediately, may, or may not be executed before run() returns, but new
337 /// handlers submitted by these, will not be executed before run()
338 /// returns. Also, if a handler is submitted by a call to post(), and that
339 /// call happens after stop() returns, then that handler is guaranteed to
340 /// not be executed before run() returns (assuming that reset() is not called
341 /// before run() returns).
343 /// The event loop will remain in the stopped mode until reset() is
344 /// called. If reset() is called before run() returns, it may, or may not
345 /// cause run() to resume normal operation without returning.
347 /// Both stop() and reset() are thread-safe, that is, they may be called by
348 /// any thread. Also, both of these functions may be called from completion
349 /// handlers (including posted handlers).
350 void stop() noexcept;
351 void reset() noexcept;
354 /// \brief Submit a handler to be executed by the event loop thread.
356 /// Register the specified completion handler for immediate asynchronous
357 /// execution. The specified handler will be executed by an expression of
358 /// the form `handler()`. If the handler object is movable, it will
359 /// never be copied. Otherwise, it will be copied as necessary.
361 /// This function is thread-safe, that is, it may be called by any
362 /// thread. It may also be called from other completion handlers.
364 /// The handler will never be called as part of the execution of post(). It
365 /// will always be called by a thread that is executing run(). If no thread
366 /// is currently executing run(), the handler will not be executed until a
367 /// thread starts executing run(). If post() is called while another thread
368 /// is executing run(), the handler may be called before post() returns. If
369 /// post() is called from another completion handler, the submitted handler
370 /// is guaranteed to not be called during the execution of post().
372 /// Completion handlers added through post() will be executed in the order
373 /// that they are added. More precisely, if post() is called twice to add
374 /// two handlers, A and B, and the execution of post(A) ends before the
375 /// beginning of the execution of post(B), then A is guaranteed to execute
377 template<class H> void post(H handler);
380 enum class Want { nothing = 0, read, write };
382 template<class Oper> class OperQueue;
386 class TriggerExecOperBase;
388 template<class H> class PostOper;
390 class UnusedOper; // Allocated, but currently unused memory
392 template<class S> class BasicStreamOps;
394 struct OwnersOperDeleter {
395 void operator()(AsyncOper*) const noexcept;
397 struct LendersOperDeleter {
398 void operator()(AsyncOper*) const noexcept;
400 using OwnersOperPtr = std::unique_ptr<AsyncOper, OwnersOperDeleter>;
401 using LendersOperPtr = std::unique_ptr<AsyncOper, LendersOperDeleter>;
402 using LendersWaitOperPtr = std::unique_ptr<WaitOperBase, LendersOperDeleter>;
403 using LendersIoOperPtr = std::unique_ptr<IoOper, LendersOperDeleter>;
407 const std::unique_ptr<Impl> m_impl;
409 template<class Oper, class... Args>
410 static std::unique_ptr<Oper, LendersOperDeleter> alloc(OwnersOperPtr&, Args&&...);
412 template<class Oper> static void execute(std::unique_ptr<Oper, LendersOperDeleter>&);
414 using PostOperConstr = PostOperBase*(void* addr, std::size_t size, Impl&, void* cookie);
415 void do_post(PostOperConstr, std::size_t size, void* cookie);
417 static PostOperBase* post_oper_constr(void* addr, std::size_t size, Impl&, void* cookie);
418 static void recycle_post_oper(Impl&, PostOperBase*) noexcept;
419 static void trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
420 static void reset_trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
422 using clock = std::chrono::steady_clock;
424 friend class Resolver;
425 friend class SocketBase;
427 friend class Acceptor;
428 friend class DeadlineTimer;
429 friend class Trigger;
430 friend class ReadAheadBuffer;
431 friend class ssl::Stream;
435 template<class Oper> class Service::OperQueue {
437 using LendersOperPtr = std::unique_ptr<Oper, LendersOperDeleter>;
438 bool empty() const noexcept;
439 void push_back(LendersOperPtr) noexcept;
440 template<class Oper2> void push_back(OperQueue<Oper2>&) noexcept;
441 LendersOperPtr pop_front() noexcept;
442 void clear() noexcept;
443 OperQueue() noexcept = default;
444 OperQueue(OperQueue&&) noexcept;
445 ~OperQueue() noexcept;
447 Oper* m_back = nullptr;
448 template<class> friend class OperQueue;
452 class Service::Descriptor {
455 using native_handle_type = SOCKET;
457 using native_handle_type = int;
462 Descriptor(Impl& service) noexcept;
463 ~Descriptor() noexcept;
465 /// \param in_blocking_mode Must be true if, and only if the passed file
466 /// descriptor refers to a file description in which the file status flag
467 /// O_NONBLOCK is not set.
469 /// The passed file descriptor must have the file descriptor flag FD_CLOEXEC
471 void assign(native_handle_type fd, bool in_blocking_mode) noexcept;
472 void close() noexcept;
474 bool is_open() const noexcept;
476 native_handle_type native_handle() const noexcept;
477 bool in_blocking_mode() const noexcept;
479 void accept(Descriptor&, StreamProtocol, Endpoint*, std::error_code&) noexcept;
480 std::size_t read_some(char* buffer, std::size_t size, std::error_code&) noexcept;
481 std::size_t write_some(const char* data, std::size_t size, std::error_code&) noexcept;
483 /// \tparam Oper An operation type inherited from IoOper with an initiate()
484 /// function that initiates the operation and figures out whether it needs
485 /// to read from, or write to the underlying descriptor to
486 /// proceed. `initiate()` must return Want::read if the operation needs to
487 /// read, or Want::write if the operation needs to write. If the operation
488 /// completes immediately (e.g. due to a failure during initialization),
489 /// `initiate()` must return Want::nothing.
490 template<class Oper, class... Args>
491 void initiate_oper(std::unique_ptr<Oper, LendersOperDeleter>, Args&&...);
493 void ensure_blocking_mode();
494 void ensure_nonblocking_mode();
497 native_handle_type m_fd = -1;
498 bool m_in_blocking_mode; // Not in nonblocking mode
500 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
503 bool m_imminent_end_of_input; // Kernel has seen the end of input
504 bool m_is_registered;
505 OperQueue<IoOper> m_suspended_read_ops, m_suspended_write_ops;
507 void deregister_for_async() noexcept;
510 bool assume_read_would_block() const noexcept;
511 bool assume_write_would_block() const noexcept;
513 void set_read_ready(bool) noexcept;
514 void set_write_ready(bool) noexcept;
516 void set_nonblock_flag(bool value);
517 void add_initiated_oper(LendersIoOperPtr, Want);
519 void do_close() noexcept;
521 friend class IoReactor;
530 ~Resolver() noexcept;
533 Service& get_service() noexcept;
535 /// @{ \brief Resolve the specified query to one or more endpoints.
536 Endpoint::List resolve(const Query&);
537 Endpoint::List resolve(const Query&, std::error_code&);
540 /// \brief Perform an asynchronous resolve operation.
542 /// Initiate an asynchronous resolve operation. The completion handler will
543 /// be called when the operation completes. The operation completes when it
544 /// succeeds, or an error occurs.
546 /// The completion handler is always executed by the event loop thread,
547 /// i.e., by a thread that is executing Service::run(). Conversely, the
548 /// completion handler is guaranteed to not be called while no thread is
549 /// executing Service::run(). The execution of the completion handler is
550 /// always deferred to the event loop, meaning that it never happens as a
551 /// synchronous side effect of the execution of async_resolve(), even when
552 /// async_resolve() is executed by the event loop thread. The completion
553 /// handler is guaranteed to be called eventually, as long as there is time
554 /// enough for the operation to complete or fail, and a thread is executing
555 /// Service::run() for long enough.
557 /// The operation can be canceled by calling cancel(), and will be
558 /// automatically canceled if the resolver object is destroyed. If the
559 /// operation is canceled, it will fail with `error::operation_aborted`. The
560 /// operation remains cancelable up until the point in time where the
561 /// completion handler starts to execute. This means that if cancel() is
562 /// called before the completion handler starts to execute, then the
563 /// completion handler is guaranteed to have `error::operation_aborted`
564 /// passed to it. This is true regardless of whether cancel() is called
565 /// explicitly or implicitly, such as when the resolver is destroyed.
567 /// The specified handler will be executed by an expression of the form
568 /// `handler(ec, endpoints)` where `ec` is the error code and `endpoints` is
569 /// an object of type `Endpoint::List`. If the handler object is
570 /// movable, it will never be copied. Otherwise, it will be copied as
573 /// It is an error to start a new resolve operation (synchronous or
574 /// asynchronous) while an asynchronous resolve operation is in progress via
575 /// the same resolver object. An asynchronous resolve operation is
576 /// considered complete as soon as the completion handler starts to
577 /// execute. This means that a new resolve operation can be started from the
578 /// completion handler.
579 template<class H> void async_resolve(Query, H handler);
581 /// \brief Cancel all asynchronous operations.
583 /// Cause all incomplete asynchronous operations, that are associated with
584 /// this resolver (at most one), to fail with `error::operation_aborted`. An
585 /// asynchronous operation is complete precisely when its completion handler
586 /// starts executing.
588 /// Completion handlers of canceled operations will become immediately ready
589 /// to execute, but will never be executed directly as part of the execution
592 /// Cancellation happens automatically when the resolver object is destroyed.
593 void cancel() noexcept;
596 class ResolveOperBase;
597 template<class H> class ResolveOper;
599 using LendersResolveOperPtr = std::unique_ptr<ResolveOperBase, Service::LendersOperDeleter>;
601 Service::Impl& m_service_impl;
603 Service::OwnersOperPtr m_resolve_oper;
605 void initiate_oper(LendersResolveOperPtr);
609 class Resolver::Query {
612 /// Locally bound socket endpoint (server side)
613 passive = AI_PASSIVE,
615 /// Ignore families without a configured non-loopback address
616 address_configured = AI_ADDRCONFIG
619 Query(std::string service_port, int init_flags = passive|address_configured);
620 Query(const StreamProtocol&, std::string service_port,
621 int init_flags = passive|address_configured);
622 Query(std::string host_name, std::string service_port,
623 int init_flags = address_configured);
624 Query(const StreamProtocol&, std::string host_name, std::string service_port,
625 int init_flags = address_configured);
630 StreamProtocol protocol() const;
631 std::string host() const;
632 std::string service() const;
636 StreamProtocol m_protocol;
637 std::string m_host; // hostname
638 std::string m_service; // port
640 friend class Resolver;
646 using native_handle_type = Service::Descriptor::native_handle_type;
648 ~SocketBase() noexcept;
651 Service& get_service() noexcept;
653 bool is_open() const noexcept;
654 native_handle_type native_handle() const noexcept;
656 /// @{ \brief Open the socket for use with the specified protocol.
658 /// It is an error to call open() on a socket that is already open.
659 void open(const StreamProtocol&);
660 std::error_code open(const StreamProtocol&, std::error_code&);
663 /// \brief Close this socket.
665 /// If the socket is open, it will be closed. If it is already closed (or
666 /// never opened), this function does nothing (idempotency).
668 /// A socket is automatically closed when destroyed.
670 /// When the socket is closed, any incomplete asynchronous operation will be
671 /// canceled (as if cancel() was called).
672 void close() noexcept;
674 /// \brief Cancel all asynchronous operations.
676 /// Cause all incomplete asynchronous operations, that are associated with
677 /// this socket, to fail with `error::operation_aborted`. An asynchronous
678 /// operation is complete precisely when its completion handler starts
681 /// Completion handlers of canceled operations will become immediately ready
682 /// to execute, but will never be executed directly as part of the execution
684 void cancel() noexcept;
687 void get_option(O& opt) const;
690 std::error_code get_option(O& opt, std::error_code&) const;
693 void set_option(const O& opt);
696 std::error_code set_option(const O& opt, std::error_code&);
698 void bind(const Endpoint&);
699 std::error_code bind(const Endpoint&, std::error_code&);
701 Endpoint local_endpoint() const;
702 Endpoint local_endpoint(std::error_code&) const;
706 opt_ReuseAddr, ///< `SOL_SOCKET`, `SO_REUSEADDR`
707 opt_Linger, ///< `SOL_SOCKET`, `SO_LINGER`
708 opt_NoDelay, ///< `IPPROTO_TCP`, `TCP_NODELAY` (disable the Nagle algorithm)
711 template<class, int, class> class Option;
714 using reuse_address = Option<bool, opt_ReuseAddr, int>;
715 using no_delay = Option<bool, opt_NoDelay, int>;
717 // linger struct defined by POSIX sys/socket.h.
719 using linger = Option<linger_opt, opt_Linger, struct linger>;
722 Service::Descriptor m_desc;
725 StreamProtocol m_protocol;
728 Service::OwnersOperPtr m_read_oper; // Read or accept
729 Service::OwnersOperPtr m_write_oper; // Write or connect
731 SocketBase(Service&);
733 const StreamProtocol& get_protocol() const noexcept;
734 std::error_code do_assign(const StreamProtocol&, native_handle_type, std::error_code&);
735 void do_close() noexcept;
737 void get_option(opt_enum, void* value_data, std::size_t& value_size, std::error_code&) const;
738 void set_option(opt_enum, const void* value_data, std::size_t value_size, std::error_code&);
739 void map_option(opt_enum, int& level, int& option_name) const;
741 friend class Acceptor;
745 template<class T, int opt, class U> class SocketBase::Option {
747 Option(T value = T());
753 void get(const SocketBase&, std::error_code&);
754 void set(SocketBase&, std::error_code&) const;
756 friend class SocketBase;
759 struct SocketBase::linger_opt {
760 linger_opt(bool enable, int timeout_seconds = 0)
762 m_linger.l_onoff = enable ? 1 : 0;
763 m_linger.l_linger = timeout_seconds;
768 operator ::linger() const { return m_linger; }
770 bool enabled() const { return m_linger.l_onoff != 0; }
771 int timeout() const { return m_linger.l_linger; }
775 /// Switching between synchronous and asynchronous operations is allowed, but
776 /// only in a nonoverlapping fashion. That is, a synchronous operation is not
777 /// allowed to run concurrently with an asynchronous one on the same
778 /// socket. Note that an asynchronous operation is considered to be running
779 /// until its completion handler starts executing.
780 class Socket: public SocketBase {
784 /// \brief Create a socket with an already-connected native socket handle.
786 /// This constructor is shorthand for creating the socket with the
787 /// one-argument constructor, and then calling the two-argument assign()
788 /// with the specified protocol and native handle.
789 Socket(Service&, const StreamProtocol&, native_handle_type);
793 void connect(const Endpoint&);
794 std::error_code connect(const Endpoint&, std::error_code&);
796 /// @{ \brief Perform a synchronous read operation.
798 /// read() will not return until the specified buffer is full, or an error
799 /// occurs. Reaching the end of input before the buffer is filled, is
800 /// considered an error, and will cause the operation to fail with
801 /// `network::end_of_input`.
803 /// read_until() will not return until the specified buffer contains the
804 /// specified delimiter, or an error occurs. If the buffer is filled before
805 /// the delimiter is found, the operation fails with
806 /// `network::delim_not_found`. Otherwise, if the end of input is reached
807 /// before the delimiter is found, the operation fails with
808 /// `network::end_of_input`. If the operation succeeds, the last byte placed
809 /// in the buffer is the delimiter.
811 /// The versions that take a ReadAheadBuffer argument will read through that
812 /// buffer. This allows for fewer larger reads on the underlying
813 /// socket. Since unconsumed data may be left in the read-ahead buffer after
814 /// a read operation returns, it is important that the same read-ahead
815 /// buffer is passed to the next read operation.
817 /// The versions of read() and read_until() that do not take an
818 /// `std::error_code&` argument will throw std::system_error on failure.
820 /// The versions that do take an `std::error_code&` argument will set \a ec
821 /// to `std::error_code()` on success, and to something else on failure. On
822 /// failure they will return the number of bytes placed in the specified
823 /// buffer before the error occurred.
825 /// \return The number of bytes placed in the specified buffer upon return.
826 std::size_t read(char* buffer, std::size_t size);
827 std::size_t read(char* buffer, std::size_t size, std::error_code& ec);
828 std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&);
829 std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&, std::error_code& ec);
830 std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&);
831 std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&,
832 std::error_code& ec);
835 /// @{ \brief Perform a synchronous write operation.
837 /// write() will not return until all the specified bytes have been written
838 /// to the socket, or an error occurs.
840 /// The version of write() that does not take an `std::error_code&`
841 /// argument will throw std::system_error on failure. When it succeeds, it
842 /// always returns \a size.
844 /// The version that does take an `std::error_code&` argument will set \a
845 /// ec to `std::error_code()` on success, and to something else on
846 /// failure. On success it returns \a size. On failure it returns the number
847 /// of bytes written before the failure occurred.
848 std::size_t write(const char* data, std::size_t size);
849 std::size_t write(const char* data, std::size_t size, std::error_code& ec);
852 /// @{ \brief Read at least one byte from this socket.
854 /// If \a size is zero, both versions of read_some() will return zero
855 /// without blocking. Read errors may or may not be detected in this case.
857 /// Otherwise, if \a size is greater than zero, and at least one byte is
858 /// immediately available, that is, without blocking, then both versions
859 /// will read at least one byte (but generally as many immediately available
860 /// bytes as will fit into the specified buffer), and return without
863 /// Otherwise, both versions will block the calling thread until at least one
864 /// byte becomes available, or an error occurs.
866 /// In this context, it counts as an error, if the end of input is reached
867 /// before at least one byte becomes available (see
868 /// `network::end_of_input`).
870 /// If no error occurs, both versions will return the number of bytes placed
871 /// in the specified buffer, which is generally as many as are immediately
872 /// available at the time when the first byte becomes available, although
873 /// never more than \a size.
875 /// If no error occurs, the three-argument version will set \a ec to
876 /// indicate success.
878 /// If an error occurs, the two-argument version will throw
879 /// `std::system_error`, while the three-argument version will set \a ec to
880 /// indicate the error, and return zero.
882 /// As long as \a size is greater than zero, the two argument version will
883 /// always return a value that is greater than zero, while the three
884 /// argument version will return a value greater than zero when, and only
885 /// when \a ec is set to indicate success (no error, and no end of input).
886 std::size_t read_some(char* buffer, std::size_t size);
887 std::size_t read_some(char* buffer, std::size_t size, std::error_code& ec);
890 /// @{ \brief Write at least one byte to this socket.
892 /// If \a size is zero, both versions of write_some() will return zero
893 /// without blocking. Write errors may or may not be detected in this case.
895 /// Otherwise, if \a size is greater than zero, and at least one byte can be
896 /// written immediately, that is, without blocking, then both versions will
897 /// write at least one byte (but generally as many as can be written
898 /// immediately), and return without blocking.
900 /// Otherwise, both versions will block the calling thread until at least one
901 /// byte can be written, or an error occurs.
903 /// If no error occurs, both versions will return the number of bytes
904 /// written, which is generally as many as can be written immediately at the
905 /// time when the first byte can be written.
907 /// If no error occurs, the three-argument version will set \a ec to
908 /// indicate success.
910 /// If an error occurs, the two-argument version will throw
911 /// `std::system_error`, while the three-argument version will set \a ec to
912 /// indicate the error, and return zero.
914 /// As long as \a size is greater than zero, the two argument version will
915 /// always return a value that is greater than zero, while the three
916 /// argument version will return a value greater than zero when, and only
917 /// when \a ec is set to indicate success.
918 std::size_t write_some(const char* data, std::size_t size);
919 std::size_t write_some(const char* data, std::size_t size, std::error_code&);
922 /// \brief Perform an asynchronous connect operation.
924 /// Initiate an asynchronous connect operation. The completion handler is
925 /// called when the operation completes. The operation completes when the
926 /// connection is established, or an error occurs.
928 /// The completion handler is always executed by the event loop thread,
929 /// i.e., by a thread that is executing Service::run(). Conversely, the
930 /// completion handler is guaranteed to not be called while no thread is
931 /// executing Service::run(). The execution of the completion handler is
932 /// always deferred to the event loop, meaning that it never happens as a
933 /// synchronous side effect of the execution of async_connect(), even when
934 /// async_connect() is executed by the event loop thread. The completion
935 /// handler is guaranteed to be called eventually, as long as there is time
936 /// enough for the operation to complete or fail, and a thread is executing
937 /// Service::run() for long enough.
939 /// The operation can be canceled by calling cancel(), and will be
940 /// automatically canceled if the socket is closed. If the operation is
941 /// canceled, it will fail with `error::operation_aborted`. The operation
942 /// remains cancelable up until the point in time where the completion
943 /// handler starts to execute. This means that if cancel() is called before
944 /// the completion handler starts to execute, then the completion handler is
945 /// guaranteed to have `error::operation_aborted` passed to it. This is true
946 /// regardless of whether cancel() is called explicitly or implicitly, such
947 /// as when the socket is destroyed.
949 /// If the socket is not already open, it will be opened as part of the
950 /// connect operation as if by calling `open(ep.protocol())`. If the opening
951 /// operation succeeds, but the connect operation fails, the socket will be
952 /// left in the opened state.
954 /// The specified handler will be executed by an expression on the form
955 /// `handler(ec)` where `ec` is the error code. If the handler object is
956 /// movable, it will never be copied. Otherwise, it will be copied as
959 /// It is an error to start a new connect operation (synchronous or
960 /// asynchronous) while an asynchronous connect operation is in progress. An
961 /// asynchronous connect operation is considered complete as soon as the
962 /// completion handler starts to execute.
964 /// \param ep The remote endpoint of the connection to be established.
965 template<class H> void async_connect(const Endpoint& ep, H handler);
967 /// @{ \brief Perform an asynchronous read operation.
969 /// Initiate an asynchronous buffered read operation on the associated
970 /// socket. The completion handler will be called when the operation
971 /// completes, or an error occurs.
973 /// async_read() will continue reading until the specified buffer is full,
974 /// or an error occurs. If the end of input is reached before the buffer is
975 /// filled, the operation fails with `network::end_of_input`.
977 /// async_read_until() will continue reading until the specified buffer
978 /// contains the specified delimiter, or an error occurs. If the buffer is
979 /// filled before a delimiter is found, the operation fails with
980 /// `network::delim_not_found`. Otherwise, if the end of input is reached
981 /// before a delimiter is found, the operation fails with
982 /// `network::end_of_input`. Otherwise, if the operation succeeds, the last
983 /// byte placed in the buffer is the delimiter.
985 /// The versions that take a ReadAheadBuffer argument will read through that
986 /// buffer. This allows for fewer larger reads on the underlying
987 /// socket. Since unconsumed data may be left in the read-ahead buffer after
988 /// a read operation completes, it is important that the same read-ahead
989 /// buffer is passed to the next read operation.
991 /// The completion handler is always executed by the event loop thread,
992 /// i.e., by a thread that is executing Service::run(). Conversely, the
993 /// completion handler is guaranteed to not be called while no thread is
994 /// executing Service::run(). The execution of the completion handler is
995 /// always deferred to the event loop, meaning that it never happens as a
996 /// synchronous side effect of the execution of async_read() or
997 /// async_read_until(), even when async_read() or async_read_until() is
998 /// executed by the event loop thread. The completion handler is guaranteed
999 /// to be called eventually, as long as there is time enough for the
1000 /// operation to complete or fail, and a thread is executing Service::run()
1001 /// for long enough.
1003 /// The operation can be canceled by calling cancel() on the associated
1004 /// socket, and will be automatically canceled if the associated socket is
1005 /// closed. If the operation is canceled, it will fail with
1006 /// `error::operation_aborted`. The operation remains cancelable up until
1007 /// the point in time where the completion handler starts to execute. This
1008 /// means that if cancel() is called before the completion handler starts to
1009 /// execute, then the completion handler is guaranteed to have
1010 /// `error::operation_aborted` passed to it. This is true regardless of
1011 /// whether cancel() is called explicitly or implicitly, such as when the
1012 /// socket is destroyed.
1014 /// The specified handler will be executed by an expression on the form
1015 /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
1016 /// bytes placed in the buffer (of type `std::size_t`). `n` is guaranteed to
1017 /// be less than, or equal to \a size. If the handler object is movable,
1018 /// it will never be copied. Otherwise, it will be copied as necessary.
1020 /// It is an error to start a read operation before the associated socket is
1023 /// It is an error to start a new read operation (synchronous or
1024 /// asynchronous) while an asynchronous read operation is in progress. An
1025 /// asynchronous read operation is considered complete as soon as the
1026 /// completion handler starts executing. This means that a new read
1027 /// operation can be started from the completion handler of another
1028 /// asynchronous buffered read operation.
1029 template<class H> void async_read(char* buffer, std::size_t size, H handler);
1030 template<class H> void async_read(char* buffer, std::size_t size, ReadAheadBuffer&, H handler);
1031 template<class H> void async_read_until(char* buffer, std::size_t size, char delim,
1032 ReadAheadBuffer&, H handler);
1035 /// \brief Perform an asynchronous write operation.
1037 /// Initiate an asynchronous write operation. The completion handler is
1038 /// called when the operation completes. The operation completes when all
1039 /// the specified bytes have been written to the socket, or an error occurs.
1041 /// The completion handler is always executed by the event loop thread,
1042 /// i.e., by a thread that is executing Service::run(). Conversely, the
1043 /// completion handler is guaranteed to not be called while no thread is
1044 /// executing Service::run(). The execution of the completion handler is
1045 /// always deferred to the event loop, meaning that it never happens as a
1046 /// synchronous side effect of the execution of async_write(), even when
1047 /// async_write() is executed by the event loop thread. The completion
1048 /// handler is guaranteed to be called eventually, as long as there is time
1049 /// enough for the operation to complete or fail, and a thread is executing
1050 /// Service::run() for long enough.
1052 /// The operation can be canceled by calling cancel(), and will be
1053 /// automatically canceled if the socket is closed. If the operation is
1054 /// canceled, it will fail with `error::operation_aborted`. The operation
1055 /// remains cancelable up until the point in time where the completion
1056 /// handler starts to execute. This means that if cancel() is called before
1057 /// the completion handler starts to execute, then the completion handler is
1058 /// guaranteed to have `error::operation_aborted` passed to it. This is true
1059 /// regardless of whether cancel() is called explicitly or implicitly, such
1060 /// as when the socket is destroyed.
1062 /// The specified handler will be executed by an expression on the form
1063 /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
1064 /// bytes written (of type `std::size_t`). If the handler object is
1065 /// movable, it will never be copied. Otherwise, it will be copied as
1068 /// It is an error to start an asynchronous write operation before the
1069 /// socket is connected.
1071 /// It is an error to start a new write operation (synchronous or
1072 /// asynchronous) while an asynchronous write operation is in progress. An
1073 /// asynchronous write operation is considered complete as soon as the
1074 /// completion handler starts to execute. This means that a new write
1075 /// operation can be started from the completion handler of another
1076 /// asynchronous write operation.
1077 template<class H> void async_write(const char* data, std::size_t size, H handler);
1079 template<class H> void async_read_some(char* buffer, std::size_t size, H handler);
1080 template<class H> void async_write_some(const char* data, std::size_t size, H handler);
1082 enum shutdown_type {
1084 /// Shutdown the receiving side of the socket.
1085 shutdown_receive = SD_RECEIVE,
1087 /// Shutdown the sending side of the socket.
1088 shutdown_send = SD_SEND,
1090 /// Shutdown both sending and receiving side of the socket.
1091 shutdown_both = SD_BOTH
1093 shutdown_receive = SHUT_RD,
1094 shutdown_send = SHUT_WR,
1095 shutdown_both = SHUT_RDWR
1099 /// @{ \brief Shut down the connected socket's sending and/or receiving
1102 /// It is an error to call this function when the socket is not both open
1104 void shutdown(shutdown_type);
1105 std::error_code shutdown(shutdown_type, std::error_code&);
1108 /// @{ \brief Initialize socket with an already-connected native socket
1111 /// The specified native handle must refer to a socket that is already fully
1112 /// open and connected.
1114 /// If the assignment operation succeeds, this socket object has taken
1115 /// ownership of the specified native handle, and the handle will be closed
1116 /// when the socket object is destroyed, (or when close() is called). If the
1117 /// operation fails, the caller still owns the specified native handle.
1119 /// It is an error to call connect() or async_connect() on a socket object
1120 /// that is initialized this way (unless it is first closed).
1122 /// It is an error to call this function on a socket object that is already
1124 void assign(const StreamProtocol&, native_handle_type);
1125 std::error_code assign(const StreamProtocol&, native_handle_type, std::error_code&);
1128 /// Returns a reference to this socket, as this socket is the lowest layer
1130 Socket& lowest_layer() noexcept;
1133 using Want = Service::Want;
1134 using StreamOps = Service::BasicStreamOps<Socket>;
1136 class ConnectOperBase;
1137 template<class H> class ConnectOper;
1139 using LendersConnectOperPtr = std::unique_ptr<ConnectOperBase, Service::LendersOperDeleter>;
1141 // `ec` untouched on success, but no immediate completion
1142 bool initiate_async_connect(const Endpoint&, std::error_code& ec);
1143 // `ec` untouched on success
1144 std::error_code finalize_async_connect(std::error_code& ec) noexcept;
1146 // See Service::BasicStreamOps for details on these 6 functions.
1147 void do_init_read_async(std::error_code&, Want&) noexcept;
1148 void do_init_write_async(std::error_code&, Want&) noexcept;
1149 std::size_t do_read_some_sync(char* buffer, std::size_t size,
1150 std::error_code&) noexcept;
1151 std::size_t do_write_some_sync(const char* data, std::size_t size,
1152 std::error_code&) noexcept;
1153 std::size_t do_read_some_async(char* buffer, std::size_t size,
1154 std::error_code&, Want&) noexcept;
1155 std::size_t do_write_some_async(const char* data, std::size_t size,
1156 std::error_code&, Want&) noexcept;
1158 friend class Service::BasicStreamOps<Socket>;
1159 friend class Service::BasicStreamOps<ssl::Stream>;
1160 friend class ReadAheadBuffer;
1161 friend class ssl::Stream;
1165 /// Switching between synchronous and asynchronous operations is allowed, but
1166 /// only in a nonoverlapping fashion. That is, a synchronous operation is not
1167 /// allowed to run concurrently with an asynchronous one on the same
1168 /// acceptor. Note that an asynchronous operation is considered to be running
1169 /// until its completion handler starts executing.
1170 class Acceptor: public SocketBase {
1173 ~Acceptor() noexcept;
1175 static constexpr int max_connections = SOMAXCONN;
1177 void listen(int backlog = max_connections);
1178 std::error_code listen(int backlog, std::error_code&);
1180 void accept(Socket&);
1181 void accept(Socket&, Endpoint&);
1182 std::error_code accept(Socket&, std::error_code&);
1183 std::error_code accept(Socket&, Endpoint&, std::error_code&);
1185 /// @{ \brief Perform an asynchronous accept operation.
1187 /// Initiate an asynchronous accept operation. The completion handler will
1188 /// be called when the operation completes. The operation completes when the
1189 /// connection is accepted, or an error occurs. If the operation succeeds,
1190 /// the specified local socket will have become connected to a remote
1193 /// The completion handler is always executed by the event loop thread,
1194 /// i.e., by a thread that is executing Service::run(). Conversely, the
1195 /// completion handler is guaranteed to not be called while no thread is
1196 /// executing Service::run(). The execution of the completion handler is
1197 /// always deferred to the event loop, meaning that it never happens as a
1198 /// synchronous side effect of the execution of async_accept(), even when
1199 /// async_accept() is executed by the event loop thread. The completion
1200 /// handler is guaranteed to be called eventually, as long as there is time
1201 /// enough for the operation to complete or fail, and a thread is executing
1202 /// Service::run() for long enough.
1204 /// The operation can be canceled by calling cancel(), and will be
1205 /// automatically canceled if the acceptor is closed. If the operation is
1206 /// canceled, it will fail with `error::operation_aborted`. The operation
1207 /// remains cancelable up until the point in time where the completion
1208 /// handler starts to execute. This means that if cancel() is called before
1209 /// the completion handler starts to execute, then the completion handler is
1210 /// guaranteed to have `error::operation_aborted` passed to it. This is true
1211 /// regardless of whether cancel() is called explicitly or implicitly, such
1212 /// as when the acceptor is destroyed.
1214 /// The specified handler will be executed by an expression on the form
1215 /// `handler(ec)` where `ec` is the error code. If the handler object is
1216 /// movable, it will never be copied. Otherwise, it will be copied as
1219 /// It is an error to start a new accept operation (synchronous or
1220 /// asynchronous) while an asynchronous accept operation is in progress. An
1221 /// asynchronous accept operation is considered complete as soon as the
1222 /// completion handler starts executing. This means that a new accept
1223 /// operation can be started from the completion handler.
1225 /// \param sock This is the local socket, that upon successful completion
1226 /// will have become connected to the remote socket. It must be in the
1227 /// closed state (see Socket::is_open()) when async_accept() is called.
1229 /// \param ep Upon completion, the remote peer endpoint will have been
1230 /// assigned to this variable.
1231 template<class H> void async_accept(Socket& sock, H handler);
1232 template<class H> void async_accept(Socket& sock, Endpoint& ep, H handler);
1236 using Want = Service::Want;
1238 class AcceptOperBase;
1239 template<class H> class AcceptOper;
1241 using LendersAcceptOperPtr = std::unique_ptr<AcceptOperBase, Service::LendersOperDeleter>;
1243 std::error_code accept(Socket&, Endpoint*, std::error_code&);
1244 Want do_accept_async(Socket&, Endpoint*, std::error_code&) noexcept;
1246 template<class H> void async_accept(Socket&, Endpoint*, H);
1250 /// \brief A timer object supporting asynchronous wait operations.
1251 class DeadlineTimer {
1253 DeadlineTimer(Service&);
1254 ~DeadlineTimer() noexcept;
1257 Service& get_service() noexcept;
1259 /// \brief Perform an asynchronous wait operation.
1261 /// Initiate an asynchronous wait operation. The completion handler becomes
1262 /// ready to execute when the expiration time is reached, or an error occurs
1263 /// (cancellation counts as an error here). The expiration time is the time
1264 /// of initiation plus the specified delay. The error code passed to the
1265 /// completion handler will **never** indicate success, unless the
1266 /// expiration time was reached.
1268 /// The completion handler is always executed by the event loop thread,
1269 /// i.e., by a thread that is executing Service::run(). Conversely, the
1270 /// completion handler is guaranteed to not be called while no thread is
1271 /// executing Service::run(). The execution of the completion handler is
1272 /// always deferred to the event loop, meaning that it never happens as a
1273 /// synchronous side effect of the execution of async_wait(), even when
1274 /// async_wait() is executed by the event loop thread. The completion
1275 /// handler is guaranteed to be called eventually, as long as there is time
1276 /// enough for the operation to complete or fail, and a thread is executing
1277 /// Service::run() for long enough.
1279 /// The operation can be canceled by calling cancel(), and will be
1280 /// automatically canceled if the timer is destroyed. If the operation is
1281 /// canceled, it will fail with `error::operation_aborted`. The operation
1282 /// remains cancelable up until the point in time where the completion
1283 /// handler starts to execute. This means that if cancel() is called before
1284 /// the completion handler starts to execute, then the completion handler is
1285 /// guaranteed to have `error::operation_aborted` passed to it. This is true
1286 /// regardless of whether cancel() is called explicitly or implicitly, such
1287 /// as when the timer is destroyed.
1289 /// The specified handler will be executed by an expression on the form
1290 /// `handler(ec)` where `ec` is the error code. If the handler object is
1291 /// movable, it will never be copied. Otherwise, it will be copied as
1294 /// It is an error to start a new asynchronous wait operation while
1295 /// another one is in progress. An asynchronous wait operation is in
1296 /// progress until its completion handler starts executing.
1297 template<class R, class P, class H>
1298 void async_wait(std::chrono::duration<R,P> delay, H handler);
1300 /// \brief Cancel an asynchronous wait operation.
1302 /// If an asynchronous wait operation, that is associated with this deadline
1303 /// timer, is in progress, cause it to fail with
1304 /// `error::operation_aborted`. An asynchronous wait operation is in
1305 /// progress until its completion handler starts executing.
1307 /// Completion handlers of canceled operations will become immediately ready
1308 /// to execute, but will never be executed directly as part of the execution
1311 /// Cancellation happens automatically when the timer object is destroyed.
1312 void cancel() noexcept;
1315 template<class H> class WaitOper;
1317 using clock = Service::clock;
1319 Service::Impl& m_service_impl;
1320 Service::OwnersOperPtr m_wait_oper;
1322 void add_oper(Service::LendersWaitOperPtr);
1326 /// \brief Register a function whose invocation can be triggered repeatedly.
1328 /// While the function is always executed by the event loop thread, the
1329 /// triggering of its execution can be done by any thread, and the triggering
1330 /// operation is guaranteed to never throw.
1332 /// The function is guaranteed to not be called after the Trigger object is
1333 /// destroyed. It is safe, though, to destroy the Trigger object during the
1334 /// execution of the function.
1336 /// Note that even though the trigger() function is thread-safe, the Trigger
1337 /// object, as a whole, is not. In particular, construction and destruction must
1338 /// not be considered thread-safe.
1340 /// ### Relation to post()
1342 /// For a particular execution of trigger() and a particular invocation of
1343 /// Service::post(), if the execution of trigger() ends before the execution of
1344 /// Service::post() begins, then it is guaranteed that the function associated
1345 /// with the trigger gets to execute at least once after the execution of
1346 /// trigger() begins, and before the post handler gets to execute.
1349 template<class F> Trigger(Service&, F func);
1350 ~Trigger() noexcept;
1352 Trigger() noexcept = default;
1353 Trigger(Trigger&&) noexcept = default;
1354 Trigger& operator=(Trigger&&) noexcept = default;
1356 /// \brief Trigger another invocation of the associated function.
1358 /// An invocation of trigger() puts the Trigger object into the triggered
1359 /// state. It remains in the triggered state until shortly before the
1360 /// function starts to execute. While the Trigger object is in the triggered
1361 /// state, trigger() has no effect. This means that the number of executions
1362 /// of the function will generally be less than the number of times
1363 /// trigger() is invoked.
1365 /// A particular invocation of trigger() ensures that there will be at least
1366 /// one invocation of the associated function whose execution begins after
1367 /// the beginning of the execution of trigger(), so long as the event loop
1368 /// thread does not exit prematurely from run().
1370 /// If trigger() is invoked from the event loop thread, the next execution
1371 /// of the associated function will not begin until after trigger() returns,
1372 /// effectively preventing reentrancy for the associated function.
1374 /// If trigger() is invoked from another thread, the associated function may
1375 /// start to execute before trigger() returns.
1377 /// Note that the associated function can retrigger itself, i.e., if the
1378 /// associated function calls trigger(), then that will lead to another
1379 /// invocation of the associated function, but not until the first
1380 /// invocation ends (no reentrance).
1382 /// This function is thread-safe.
1383 void trigger() noexcept;
1386 template<class H> class ExecOper;
1388 util::bind_ptr<Service::TriggerExecOperBase> m_exec_oper;
1392 class ReadAheadBuffer {
1396 /// Discard any buffered data.
1397 void clear() noexcept;
1400 using Want = Service::Want;
1402 char* m_begin = nullptr;
1403 char* m_end = nullptr;
1404 static constexpr std::size_t s_size = 1024;
1405 const std::unique_ptr<char[]> m_buffer;
1407 bool empty() const noexcept;
1408 bool read(char*& begin, char* end, int delim, std::error_code&) noexcept;
1409 template<class S> void refill_sync(S& stream, std::error_code&) noexcept;
1410 template<class S> bool refill_async(S& stream, std::error_code&, Want&) noexcept;
1412 template<class> friend class Service::BasicStreamOps;
1420 /// Delimiter not found.
1423 /// Host not found (authoritative).
1426 /// Host not found (non-authoritative).
1427 host_not_found_try_again,
1429 /// The query is valid but does not have associated address data.
1432 /// A non-recoverable error occurred.
1435 /// The service is not supported for the given socket type.
1438 /// The socket type is not supported.
1439 socket_type_not_supported,
1441 /// Premature end of input (e.g., end of input before reception of SSL
1442 /// shutdown alert).
1443 premature_end_of_input
1446 std::error_code make_error_code(errors);
1448 } // namespace network
1450 } // namespace realm
1454 template<> class is_error_code_enum<realm::util::network::errors> {
1456 static const bool value = true;
1471 // ---------------- StreamProtocol ----------------
1473 inline StreamProtocol StreamProtocol::ip_v4()
1475 StreamProtocol prot;
1476 prot.m_family = AF_INET;
1480 inline StreamProtocol StreamProtocol::ip_v6()
1482 StreamProtocol prot;
1483 prot.m_family = AF_INET6;
1487 inline bool StreamProtocol::is_ip_v4() const
1489 return m_family == AF_INET;
1492 inline bool StreamProtocol::is_ip_v6() const
1494 return m_family == AF_INET6;
1497 inline int StreamProtocol::family() const
1502 inline int StreamProtocol::protocol() const
1507 inline StreamProtocol::StreamProtocol():
1508 m_family{AF_UNSPEC}, // Allow both IPv4 and IPv6
1509 m_socktype{SOCK_STREAM}, // Or SOCK_DGRAM for UDP
1510 m_protocol{0} // Any protocol
1514 // ---------------- Address ----------------
1516 inline bool Address::is_ip_v4() const
1521 inline bool Address::is_ip_v6() const
1526 template<class C, class T>
1527 inline std::basic_ostream<C,T>& operator<<(std::basic_ostream<C,T>& out, const Address& addr)
1529 // FIXME: Not taking `addr.m_ip_v6_scope_id` into account. What does ASIO
1531 union buffer_union {
1532 char ip_v4[INET_ADDRSTRLEN];
1533 char ip_v6[INET6_ADDRSTRLEN];
1535 char buffer[sizeof (buffer_union)];
1536 int af = addr.m_is_ip_v6 ? AF_INET6 : AF_INET;
1538 void* src = const_cast<void*>(reinterpret_cast<const void*>(&addr.m_union));
1540 const void* src = &addr.m_union;
1542 const char* ret = ::inet_ntop(af, src, buffer, sizeof buffer);
1544 std::error_code ec = make_basic_system_error_code(errno);
1545 throw std::system_error(ec);
1551 inline Address::Address()
1553 m_union.m_ip_v4 = ip_v4_type();
1556 inline Address make_address(const char* c_str)
1559 Address addr = make_address(c_str, ec);
1561 throw std::system_error(ec);
1565 inline Address make_address(const std::string& str)
1568 Address addr = make_address(str, ec);
1570 throw std::system_error(ec);
1574 inline Address make_address(const std::string& str, std::error_code& ec) noexcept
1576 return make_address(str.c_str(), ec);
1579 // ---------------- Endpoint ----------------
1581 inline StreamProtocol Endpoint::protocol() const
1586 inline Address Endpoint::address() const
1589 if (m_protocol.is_ip_v4()) {
1590 addr.m_union.m_ip_v4 = m_sockaddr_union.m_ip_v4.sin_addr;
1593 addr.m_union.m_ip_v6 = m_sockaddr_union.m_ip_v6.sin6_addr;
1594 addr.m_ip_v6_scope_id = m_sockaddr_union.m_ip_v6.sin6_scope_id;
1595 addr.m_is_ip_v6 = true;
1600 inline Endpoint::port_type Endpoint::port() const
1602 return ntohs(m_protocol.is_ip_v4() ? m_sockaddr_union.m_ip_v4.sin_port :
1603 m_sockaddr_union.m_ip_v6.sin6_port);
1606 inline Endpoint::data_type* Endpoint::data()
1608 return &m_sockaddr_union.m_base;
1611 inline const Endpoint::data_type* Endpoint::data() const
1613 return &m_sockaddr_union.m_base;
1616 inline Endpoint::Endpoint():
1617 Endpoint{StreamProtocol::ip_v4(), 0}
1621 inline Endpoint::Endpoint(const StreamProtocol& protocol, port_type port):
1622 m_protocol{protocol}
1624 int family = m_protocol.family();
1625 if (family == AF_INET) {
1626 m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
1627 m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
1628 m_sockaddr_union.m_ip_v4.sin_port = htons(port);
1629 m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
1631 else if (family == AF_INET6) {
1632 m_sockaddr_union.m_ip_v6 = sockaddr_ip_v6_type(); // Clear
1633 m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
1634 m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
1637 m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
1638 m_sockaddr_union.m_ip_v4.sin_family = AF_UNSPEC;
1639 m_sockaddr_union.m_ip_v4.sin_port = htons(port);
1640 m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
1644 inline Endpoint::Endpoint(const Address& addr, port_type port)
1646 if (addr.m_is_ip_v6) {
1647 m_protocol = StreamProtocol::ip_v6();
1648 m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
1649 m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
1650 m_sockaddr_union.m_ip_v6.sin6_flowinfo = 0;
1651 m_sockaddr_union.m_ip_v6.sin6_addr = addr.m_union.m_ip_v6;
1652 m_sockaddr_union.m_ip_v6.sin6_scope_id = addr.m_ip_v6_scope_id;
1655 m_protocol = StreamProtocol::ip_v4();
1656 m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
1657 m_sockaddr_union.m_ip_v4.sin_port = htons(port);
1658 m_sockaddr_union.m_ip_v4.sin_addr = addr.m_union.m_ip_v4;
1662 inline Endpoint::List::iterator Endpoint::List::begin() const noexcept
1664 return m_endpoints.data();
1667 inline Endpoint::List::iterator Endpoint::List::end() const noexcept
1669 return m_endpoints.data() + m_endpoints.size();
1672 inline std::size_t Endpoint::List::size() const noexcept
1674 return m_endpoints.size();
1677 inline bool Endpoint::List::empty() const noexcept
1679 return m_endpoints.size() == 0;
1682 // ---------------- Service::OperQueue ----------------
1684 template<class Oper> inline bool Service::OperQueue<Oper>::empty() const noexcept
1689 template<class Oper> inline void Service::OperQueue<Oper>::push_back(LendersOperPtr op) noexcept
1691 REALM_ASSERT(!op->m_next);
1693 op->m_next = m_back->m_next;
1694 m_back->m_next = op.get();
1697 op->m_next = op.get();
1699 m_back = op.release();
1702 template<class Oper> template<class Oper2>
1703 inline void Service::OperQueue<Oper>::push_back(OperQueue<Oper2>& q) noexcept
1708 std::swap(m_back->m_next, q.m_back->m_next);
1713 template<class Oper> inline auto Service::OperQueue<Oper>::pop_front() noexcept -> LendersOperPtr
1717 op = static_cast<Oper*>(m_back->m_next);
1719 m_back->m_next = op->m_next;
1724 op->m_next = nullptr;
1726 return LendersOperPtr(op);
1729 template<class Oper> inline void Service::OperQueue<Oper>::clear() noexcept
1732 LendersOperPtr op(m_back);
1733 while (op->m_next != m_back)
1734 op.reset(static_cast<Oper*>(op->m_next));
1739 template<class Oper> inline Service::OperQueue<Oper>::OperQueue(OperQueue&& q) noexcept:
1745 template<class Oper> inline Service::OperQueue<Oper>::~OperQueue() noexcept
1750 // ---------------- Service::Descriptor ----------------
1752 inline Service::Descriptor::Descriptor(Impl& s) noexcept:
1757 inline Service::Descriptor::~Descriptor() noexcept
1763 inline void Service::Descriptor::assign(native_handle_type fd, bool in_blocking_mode) noexcept
1765 REALM_ASSERT(!is_open());
1767 m_in_blocking_mode = in_blocking_mode;
1768 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1769 m_read_ready = false;
1770 m_write_ready = false;
1771 m_imminent_end_of_input = false;
1772 m_is_registered = false;
1776 inline void Service::Descriptor::close() noexcept
1778 REALM_ASSERT(is_open());
1779 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1780 if (m_is_registered)
1781 deregister_for_async();
1782 m_is_registered = false;
1787 inline bool Service::Descriptor::is_open() const noexcept
1789 return (m_fd != -1);
1792 inline auto Service::Descriptor::native_handle() const noexcept -> native_handle_type
1797 inline bool Service::Descriptor::in_blocking_mode() const noexcept
1799 return m_in_blocking_mode;
1802 template<class Oper, class... Args>
1803 inline void Service::Descriptor::initiate_oper(std::unique_ptr<Oper, LendersOperDeleter> op,
1806 Service::Want want = op->initiate(std::forward<Args>(args)...); // Throws
1807 add_initiated_oper(std::move(op), want); // Throws
1810 inline void Service::Descriptor::ensure_blocking_mode()
1812 // Assuming that descriptors are either used mostly in blocking mode, or
1813 // mostly in nonblocking mode.
1814 if (REALM_UNLIKELY(!m_in_blocking_mode)) {
1816 set_nonblock_flag(value); // Throws
1817 m_in_blocking_mode = true;
1821 inline void Service::Descriptor::ensure_nonblocking_mode()
1823 // Assuming that descriptors are either used mostly in blocking mode, or
1824 // mostly in nonblocking mode.
1825 if (REALM_UNLIKELY(m_in_blocking_mode)) {
1827 set_nonblock_flag(value); // Throws
1828 m_in_blocking_mode = false;
1832 inline bool Service::Descriptor::assume_read_would_block() const noexcept
1834 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1835 return !m_in_blocking_mode && !m_read_ready;
1841 inline bool Service::Descriptor::assume_write_would_block() const noexcept
1843 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1844 return !m_in_blocking_mode && !m_write_ready;
// Records the read-readiness flag on epoll/kqueue builds.
1850 inline void Service::Descriptor::set_read_ready(bool value) noexcept
1852 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1853 m_read_ready = value;
// The cast to void consumes `value` without any effect.
// NOTE(review): presumably the branch for builds without the flag -- confirm.
1856 static_cast<void>(value);
// Records the write-readiness flag on epoll/kqueue builds.
1860 inline void Service::Descriptor::set_write_ready(bool value) noexcept
1862 #if REALM_HAVE_EPOLL || REALM_HAVE_KQUEUE
1863 m_write_ready = value;
// The cast to void consumes `value` without any effect.
// NOTE(review): presumably the branch for builds without the flag -- confirm.
1866 static_cast<void>(value);
1870 // ---------------- Service ----------------
// Base class of the asynchronous operation objects in this file. It holds the
// state common to every operation (allocated size, in-use / complete /
// canceled flags and an `m_next` link) and declares the recycle / orphan
// protocol that the concrete operation classes implement.
1872 class Service::AsyncOper {
1874 bool in_use() const noexcept;
1875 bool is_complete() const noexcept;
1876 bool is_canceled() const noexcept;
1877 void cancel() noexcept;
1878 /// Every object of type \ref AsyncOper must be destroyed either by a call
1879 /// to this function or to recycle(). This function recycles the operation
1880 /// object (commits suicide), even if it throws.
1881 virtual void recycle_and_execute() = 0;
1882 /// Every object of type \ref AsyncOper must be destroyed either by a call
1883 /// to recycle_and_execute() or to this function. This function destroys the
1884 /// object (commits suicide).
1885 virtual void recycle() noexcept = 0;
1886 /// Must be called when the owner dies, and the object is in use (not an
1887 /// instance of UnusedOper).
1888 virtual void orphan() noexcept = 0;
// `size` is the allocated number of bytes (see m_size); `in_use` is the
// initial in-use state.
1890 AsyncOper(std::size_t size, bool in_use) noexcept;
1891 virtual ~AsyncOper() noexcept {}
1892 void set_is_complete(bool value) noexcept;
// Helpers used by derived classes to implement recycle_and_execute() and
// recycle(). `orphaned` tells them whether the owner is gone.
1893 template<class H, class... Args>
1894 void do_recycle_and_execute(bool orphaned, H& handler, Args&&...);
1895 void do_recycle(bool orphaned) noexcept;
1897 std::size_t m_size; // Allocated number of bytes
1898 bool m_in_use = false;
1899 // Set to true when the operation completes successfully or fails. If the
1900 // operation is canceled before this happens, it will never be set to
1901 // true. Always false when not in use
1902 bool m_complete = false;
1903 // Set to true when the operation is canceled. Always false when not in use.
1904 bool m_canceled = false;
1905 AsyncOper* m_next = nullptr; // Always null when not in use
// Takes the handler and its arguments by value, so that they are copies
// independent of the operation object.
1906 template<class H, class... Args>
1907 void do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler, Args...);
1908 friend class Service;
// Common base of wait operations on a DeadlineTimer. Stores a pointer to the
// timer and the expiration time. A null m_timer is what recycle() treats as
// "orphaned".
1911 class Service::WaitOperBase: public AsyncOper {
1913 WaitOperBase(std::size_t size, DeadlineTimer& timer,
1914 clock::time_point expiration_time) noexcept:
1915 AsyncOper{size, true}, // Second argument is `in_use`
1917 m_expiration_time{expiration_time}
// Marks the wait operation as complete.
1920 void expired() noexcept
1922 set_is_complete(true);
// Asserts that the operation has been orphaned (m_timer is null) before
// recycling it.
1924 void recycle() noexcept override final
1926 bool orphaned = !m_timer;
1927 REALM_ASSERT(orphaned);
1928 // Note: do_recycle() commits suicide.
1929 do_recycle(orphaned);
1931 void orphan() noexcept override final
1936 DeadlineTimer* m_timer;
1937 clock::time_point m_expiration_time;
1938 friend class Service;
// Base of the "trigger exec" operation. In addition to being an AsyncOper it
// derives from AtomicRefCountBase, and it is constructed in the not-in-use
// state.
1941 class Service::TriggerExecOperBase: public AsyncOper, public AtomicRefCountBase {
1943 TriggerExecOperBase(Impl& service) noexcept:
1944 AsyncOper{0, false}, // First arg is `size` (unused), second arg is `in_use`
// Requires the operation to be in use and already orphaned (m_service is
// null). `this` is adopted into a bind_ptr, so the reference held on behalf
// of the operation is dropped when `self` is destroyed.
1948 void recycle() noexcept override final
1950 REALM_ASSERT(in_use());
1951 REALM_ASSERT(!m_service);
1952 // Note: Potential suicide when `self` goes out of scope
1953 util::bind_ptr<TriggerExecOperBase> self{this, bind_ptr_base::adopt_tag{}};
// Detaches the operation from its service; must not already be detached.
1955 void orphan() noexcept override final
1957 REALM_ASSERT(m_service);
1958 m_service = nullptr;
// Forwards to Service::trigger_exec(); requires a service to be attached.
1960 void trigger() noexcept
1962 REALM_ASSERT(m_service);
1963 Service::trigger_exec(*m_service, *this);
// Base of the operations created by Service::post(). Constructed in the
// in-use state; recycling is delegated to Service::recycle_post_oper().
1969 class Service::PostOperBase: public AsyncOper {
1971 PostOperBase(std::size_t size, Impl& service) noexcept:
1972 AsyncOper{size, true}, // Second argument is `in_use`
1976 void recycle() noexcept override final
1978 // Service::recycle_post_oper() destroys this operation object
1979 Service::recycle_post_oper(m_service, this);
// Post operations are never orphaned, so reaching this is a programming error.
1981 void orphan() noexcept override final
1983 REALM_ASSERT(false); // Never called
// Post operation carrying a handler of type H. The handler is stored by value
// (moved in by the constructor) and invoked with no arguments by
// recycle_and_execute().
1989 template<class H> class Service::PostOper: public PostOperBase {
1991 PostOper(std::size_t size, Impl& service, H handler):
1992 PostOperBase{size, service},
1993 m_handler{std::move(handler)}
1996 void recycle_and_execute() override final
1998 // Recycle the operation object before the handler is executed, such
1999 // that the memory is available for a new post operation that might be
2000 // initiated during the execution of the handler.
2001 bool was_recycled = false;
// Move the handler out first: after recycling, the members of this object
// must no longer be touched.
2003 H handler = std::move(m_handler); // Throws
2004 // Service::recycle_post_oper() destroys this operation object
2005 Service::recycle_post_oper(m_service, this);
2006 was_recycled = true;
2007 handler(); // Throws
// Recycle here only if that did not already happen above.
2010 if (!was_recycled) {
2011 // Service::recycle_post_oper() destroys this operation object
2012 Service::recycle_post_oper(m_service, this);
// Base of the operations that act on a Descriptor. Constructed in the in-use
// state.
2021 class Service::IoOper: public AsyncOper {
2023 IoOper(std::size_t size) noexcept:
2024 AsyncOper{size, true} // Second argument is `in_use`
/// The descriptor that this operation acts on.
2027 virtual Descriptor& descriptor() noexcept = 0;
2028 /// Advance this operation and figure out whether it needs to read from,
2029 /// or write to the underlying descriptor to advance further. This function
2030 /// must return Want::read if the operation needs to read, or Want::write if
2031 /// the operation needs to write to advance further. If the operation
2032 /// completes (due to success or failure), this function must return
/// Want::nothing.
2034 virtual Want advance() noexcept = 0;
// Placeholder object representing operation memory that is not in use. It
// only remembers the allocated size; all three protocol functions assert,
// because they must never be called on it.
2037 class Service::UnusedOper: public AsyncOper {
2039 UnusedOper(std::size_t size) noexcept:
2040 AsyncOper{size, false} // Second argument is `in_use`
2043 void recycle_and_execute() override final
2045 // Must never be called
2046 REALM_ASSERT(false);
2048 void recycle() noexcept override final
2050 // Must never be called
2051 REALM_ASSERT(false);
2053 void orphan() noexcept override final
2055 // Must never be called
2056 REALM_ASSERT(false);
2060 // `S` must be a stream class with the following member functions:
2062 // Socket& lowest_layer() noexcept;
2064 // void do_init_read_async(std::error_code& ec, Want& want) noexcept;
2065 // void do_init_write_async(std::error_code& ec, Want& want) noexcept;
2067 // std::size_t do_read_some_sync(char* buffer, std::size_t size,
2068 // std::error_code& ec) noexcept;
2069 // std::size_t do_write_some_sync(const char* data, std::size_t size,
2070 // std::error_code& ec) noexcept;
2071 // std::size_t do_read_some_async(char* buffer, std::size_t size,
2072 // std::error_code& ec, Want& want) noexcept;
2073 // std::size_t do_write_some_async(const char* data, std::size_t size,
2074 // std::error_code& ec, Want& want) noexcept;
2076 // If an error occurs during any of these 6 functions, the `ec` argument must be
2077 // set accordingly. Otherwise the `ec` argument must be set to
2078 // `std::error_code()`.
2080 // The do_init_*_async() functions must update the `want` argument to indicate
2081 // how the operation must be initiated:
2083 // Want::read Wait for read readiness, then call do_*_some_async().
2084 // Want::write Wait for write readiness, then call do_*_some_async().
2085 // Want::nothing Call do_*_some_async() immediately without waiting for
2086 // read or write readiness.
2088 // If end-of-input occurs while reading, do_read_some_*() must fail, set `ec` to
2089 // `network::end_of_input`, and return zero.
2091 // If an error occurs during reading or writing, do_*_some_sync() must set `ec`
2092 // accordingly (to something other than `std::error_code()`) and return
2093 // zero. Otherwise they must set `ec` to `std::error_code()` and return the
2094 // number of bytes read or written, which **must** be at least 1. If the
2095 // underlying socket is in nonblocking mode, and no bytes could be immediately
2096 // read or written these functions must fail with
2097 // `error::resource_unavailable_try_again`.
2099 // If an error occurs during reading or writing, do_*_some_async() must set `ec`
2100 // accordingly (to something other than `std::error_code()`), `want` to
2101 // `Want::nothing`, and return zero. Otherwise they must set `ec` to
2102 // `std::error_code()` and return the number of bytes read or written, which
2103 // must be zero if no bytes could be immediately read or written. Note, in this
2104 // case it is not an error if the underlying socket is in nonblocking mode, and
2105 // no bytes could be immediately read or written. When these functions succeed,
2106 // but return zero because no bytes could be immediately read or written, they
2107 // must set `want` to something other than `Want::nothing`.
2109 // If no error occurs, do_*_some_async() must set `want` to indicate how the
2110 // operation should proceed if additional data needs to be read or written, or
2111 // if no bytes were transferred:
2113 // Want::read Wait for read readiness, then call do_*_some_async() again.
2114 // Want::write Wait for write readiness, then call do_*_some_async() again.
2115 // Want::nothing Call do_*_some_async() again without waiting for read or
2118 // NOTE: If, for example, do_read_some_async() sets `want` to `Want::write`, it
2119 // means that the stream needs to write data to the underlying TCP socket before
2120 // it is able to deliver any additional data to the caller. While such a
2121 // situation will never occur on a raw TCP socket, it can occur on an SSL stream
2122 // (Secure Socket Layer).
2124 // When do_*_some_async() returns `n`, at least one of the following conditions
2127 // n > 0 Bytes were transferred.
2128 // ec != std::error_code() An error occurred.
2129 // want != Want::nothing Wait for read/write readiness.
2131 // This is of critical importance, as it is the only way we can avoid falling
2132 // into a busy loop of repeated invocations of do_*_some_async().
2134 // NOTE: do_*_some_async() are allowed to set `want` to `Want::read` or
2135 // `Want::write`, even when they successfully transfer a nonzero number of bytes.
// Static helpers implementing the synchronous and asynchronous read / write
// entry points for a stream class `S`. They are written in terms of
// S::lowest_layer() and the S::do_*() functions.
2136 template<class S> class Service::BasicStreamOps {
2140 class WriteOperBase;
2141 class BufferedReadOperBase;
2142 template<class H> class ReadOper;
2143 template<class H> class WriteOper;
2144 template<class H> class BufferedReadOper;
// Lender-side smart pointers: their deleter recycles the operation.
2146 using LendersReadOperPtr = std::unique_ptr<ReadOperBase, LendersOperDeleter>;
2147 using LendersWriteOperPtr = std::unique_ptr<WriteOperBase, LendersOperDeleter>;
2148 using LendersBufferedReadOperPtr = std::unique_ptr<BufferedReadOperBase, LendersOperDeleter>;
// Synchronous read into [buffer, buffer+size). Asserts that no asynchronous
// read operation is in use on the socket, switches the descriptor to blocking
// mode, and reads through do_read_some_sync(). Each successful call must
// deliver between 1 and the requested number of bytes (asserted).
2151 static std::size_t read(S& stream, char* buffer, std::size_t size,
2152 std::error_code& ec)
2154 REALM_ASSERT(!stream.lowest_layer().m_read_oper ||
2155 !stream.lowest_layer().m_read_oper->in_use());
2156 stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2157 char* begin = buffer;
2158 char* end = buffer + size;
2162 ec = std::error_code(); // Success
2165 char* buffer_2 = curr;
2166 std::size_t size_2 = std::size_t(end - curr);
2167 std::size_t n = stream.do_read_some_sync(buffer_2, size_2, ec);
2168 if (REALM_UNLIKELY(ec))
2170 REALM_ASSERT(n > 0);
2171 REALM_ASSERT(n <= size_2);
// Number of bytes transferred so far.
2174 std::size_t n = std::size_t(curr - begin);
2178 // Synchronous write
// Mirrors read(): asserts that no asynchronous write operation is in use,
// switches to blocking mode, and writes through do_write_some_sync().
2179 static std::size_t write(S& stream, const char* data, std::size_t size,
2180 std::error_code& ec)
2182 REALM_ASSERT(!stream.lowest_layer().m_write_oper ||
2183 !stream.lowest_layer().m_write_oper->in_use());
2184 stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2185 const char* begin = data;
2186 const char* end = data + size;
2187 const char* curr = begin;
2190 ec = std::error_code(); // Success
2193 const char* data_2 = curr;
2194 std::size_t size_2 = std::size_t(end - curr);
2195 std::size_t n = stream.do_write_some_sync(data_2, size_2, ec);
2196 if (REALM_UNLIKELY(ec))
2198 REALM_ASSERT(n > 0);
2199 REALM_ASSERT(n <= size_2);
// Number of bytes transferred so far.
2202 std::size_t n = std::size_t(curr - begin);
// Synchronous read through the read-ahead buffer `rab`: data is taken from
// the buffer with rab.read(), and the buffer is refilled from the stream with
// rab.refill_sync().
2207 static std::size_t buffered_read(S& stream, char* buffer, std::size_t size, int delim,
2208 ReadAheadBuffer& rab, std::error_code& ec)
2210 REALM_ASSERT(!stream.lowest_layer().m_read_oper ||
2211 !stream.lowest_layer().m_read_oper->in_use());
2212 stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2213 char* begin = buffer;
2214 char* end = buffer + size;
2217 bool complete = rab.read(curr, end, delim, ec);
2221 rab.refill_sync(stream, ec);
2222 if (REALM_UNLIKELY(ec))
2225 std::size_t n = (curr - begin);
// Synchronous single read: one call to do_read_some_sync() in blocking mode.
2230 static std::size_t read_some(S& stream, char* buffer, std::size_t size,
2231 std::error_code& ec)
2233 REALM_ASSERT(!stream.lowest_layer().m_read_oper ||
2234 !stream.lowest_layer().m_read_oper->in_use());
2235 stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2236 return stream.do_read_some_sync(buffer, size, ec);
2239 // Synchronous write
// Single write: one call to do_write_some_sync() in blocking mode.
2240 static std::size_t write_some(S& stream, const char* data, std::size_t size,
2241 std::error_code& ec)
2243 REALM_ASSERT(!stream.lowest_layer().m_write_oper ||
2244 !stream.lowest_layer().m_write_oper->in_use());
2245 stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
2246 return stream.do_write_some_sync(data, size, ec);
// Asynchronous read: allocates a ReadOper<H> in the socket's m_read_oper
// slot and initiates it on the socket's descriptor.
2250 static void async_read(S& stream, char* buffer, std::size_t size, bool is_read_some, H handler)
2252 char* begin = buffer;
2253 char* end = buffer + size;
2254 LendersReadOperPtr op =
2255 Service::alloc<ReadOper<H>>(stream.lowest_layer().m_read_oper, stream, is_read_some,
2256 begin, end, std::move(handler)); // Throws
2257 stream.lowest_layer().m_desc.initiate_oper(std::move(op)); // Throws
// Asynchronous write: allocates a WriteOper<H> in the socket's m_write_oper
// slot and initiates it on the socket's descriptor.
2261 static void async_write(S& stream, const char* data, std::size_t size, bool is_write_some,
2264 const char* begin = data;
2265 const char* end = data + size;
2266 LendersWriteOperPtr op =
2267 Service::alloc<WriteOper<H>>(stream.lowest_layer().m_write_oper, stream, is_write_some,
2268 begin, end, std::move(handler)); // Throws
2269 stream.lowest_layer().m_desc.initiate_oper(std::move(op)); // Throws
// Asynchronous buffered read: allocates a BufferedReadOper<H> in the socket's
// m_read_oper slot (the same slot that async_read() uses) and initiates it.
2273 static void async_buffered_read(S& stream, char* buffer, std::size_t size, int delim,
2274 ReadAheadBuffer& rab, H handler)
2276 char* begin = buffer;
2277 char* end = buffer + size;
2278 LendersBufferedReadOperPtr op =
2279 Service::alloc<BufferedReadOper<H>>(stream.lowest_layer().m_read_oper, stream,
2280 begin, end, delim, rab,
2281 std::move(handler)); // Throws
2282 stream.lowest_layer().m_desc.initiate_oper(std::move(op)); // Throws
// Common base of the stream read / write operations. Refers to the stream it
// operates on through m_stream and stores the error code of the operation. A
// null m_stream is what recycle() treats as "orphaned".
2286 template<class S> class Service::BasicStreamOps<S>::StreamOper: public IoOper {
2288 StreamOper(std::size_t size, S& stream) noexcept:
// Asserts that the operation has been orphaned before recycling it.
2293 void recycle() noexcept override final
2295 bool orphaned = !m_stream;
2296 REALM_ASSERT(orphaned);
2297 // Note: do_recycle() commits suicide.
2298 do_recycle(orphaned);
2300 void orphan() noexcept override final
// The descriptor of the stream's lowest layer. Dereferences m_stream.
2304 Descriptor& descriptor() noexcept override final
2306 return m_stream->lowest_layer().m_desc;
2310 std::error_code m_error_code;
// Handler-independent part of an asynchronous read into [m_begin, m_end).
// m_curr marks how far the read has progressed. When m_is_read_some is set,
// the operation completes as soon as any bytes have been read.
2313 template<class S> class Service::BasicStreamOps<S>::ReadOperBase: public StreamOper {
2315 ReadOperBase(std::size_t size, S& stream, bool is_read_some, char* begin, char* end) noexcept:
2316 StreamOper{size, stream},
2317 m_is_read_some{is_read_some},
// Initiation: the operation must be the one installed in the socket's
// m_read_oper slot and must not be complete yet. An empty buffer completes
// immediately with success. Otherwise the descriptor is switched to
// nonblocking mode and the stream decides, through do_init_read_async(), what
// to wait for. An error reported at that point completes the operation.
2325 REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
2326 REALM_ASSERT(!s.is_complete());
2327 REALM_ASSERT(s.m_curr <= s.m_end);
2328 Want want = Want::nothing;
2329 if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
2330 s.set_is_complete(true); // Success
2333 s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
2334 s.m_stream->do_init_read_async(s.m_error_code, want);
2335 if (want == Want::nothing) {
2336 if (REALM_UNLIKELY(s.m_error_code)) {
2337 s.set_is_complete(true); // Failure
// One step of the read. Preconditions (asserted): not complete, not canceled,
// no error so far, buffer space remaining, and, for read-some operations,
// nothing read yet.
2346 Want advance() noexcept override final
2349 REALM_ASSERT(!s.is_complete());
2350 REALM_ASSERT(!s.is_canceled());
2351 REALM_ASSERT(!s.m_error_code);
2352 REALM_ASSERT(s.m_curr < s.m_end);
2353 REALM_ASSERT(!s.m_is_read_some || s.m_curr == m_begin);
2355 // Read into callers buffer
2356 char* buffer = s.m_curr;
2357 std::size_t size = std::size_t(s.m_end - s.m_curr);
2358 Want want = Want::nothing;
2359 std::size_t n = s.m_stream->do_read_some_async(buffer, size, s.m_error_code, want);
2360 REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
2361 bool got_nothing = (n == 0);
2363 if (REALM_UNLIKELY(s.m_error_code)) {
2364 s.set_is_complete(true); // Failure
2365 return Want::nothing;
2367 // Got nothing, but want something
2370 REALM_ASSERT(!s.m_error_code);
2371 // Check for completion
2372 REALM_ASSERT(n <= size);
2374 if (s.m_is_read_some || s.m_curr == s.m_end) {
2375 s.set_is_complete(true); // Success
2376 return Want::nothing;
2378 if (want != Want::nothing)
2380 REALM_ASSERT(n < size);
2384 const bool m_is_read_some;
2385 char* const m_begin; // May be dangling after cancellation
2386 char* const m_end; // May be dangling after cancellation
2387 char* m_curr = m_begin; // May be dangling after cancellation
// Handler-independent part of an asynchronous write of [m_begin, m_end).
// m_curr marks how far the write has progressed. When m_is_write_some is set,
// the operation completes as soon as any bytes have been written.
2390 template<class S> class Service::BasicStreamOps<S>::WriteOperBase: public StreamOper {
2392 WriteOperBase(std::size_t size, S& stream, bool is_write_some,
2393 const char* begin, const char* end) noexcept:
2394 StreamOper{size, stream},
2395 m_is_write_some{is_write_some},
// Initiation: the operation must be the one installed in the socket's
// m_write_oper slot and must not be complete yet. An empty range completes
// immediately with success. Otherwise the descriptor is switched to
// nonblocking mode and the stream decides, through do_init_write_async(),
// what to wait for. An error reported at that point completes the operation.
2403 REALM_ASSERT(this == s.m_stream->lowest_layer().m_write_oper.get());
2404 REALM_ASSERT(!s.is_complete());
2405 REALM_ASSERT(s.m_curr <= s.m_end);
2406 Want want = Want::nothing;
2407 if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
2408 s.set_is_complete(true); // Success
2411 s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
2412 s.m_stream->do_init_write_async(s.m_error_code, want);
2413 if (want == Want::nothing) {
2414 if (REALM_UNLIKELY(s.m_error_code)) {
2415 s.set_is_complete(true); // Failure
// One step of the write. Preconditions (asserted): not complete, not
// canceled, no error so far, data remaining, and, for write-some operations,
// nothing written yet.
2424 Want advance() noexcept override final
2427 REALM_ASSERT(!s.is_complete());
2428 REALM_ASSERT(!s.is_canceled());
2429 REALM_ASSERT(!s.m_error_code);
2430 REALM_ASSERT(s.m_curr < s.m_end);
2431 REALM_ASSERT(!s.m_is_write_some || s.m_curr == s.m_begin);
2433 // Write from callers buffer
2434 const char* data = s.m_curr;
2435 std::size_t size = std::size_t(s.m_end - s.m_curr);
2436 Want want = Want::nothing;
2437 std::size_t n = s.m_stream->do_write_some_async(data, size, s.m_error_code, want);
2438 REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
2439 bool wrote_nothing = (n == 0);
2440 if (wrote_nothing) {
2441 if (REALM_UNLIKELY(s.m_error_code)) {
2442 s.set_is_complete(true); // Failure
2443 return Want::nothing;
2445 // Wrote nothing, but want something written
2448 REALM_ASSERT(!s.m_error_code);
2449 // Check for completion
2450 REALM_ASSERT(n <= size);
2452 if (s.m_is_write_some || s.m_curr == s.m_end) {
2453 s.set_is_complete(true); // Success
2454 return Want::nothing;
2456 if (want != Want::nothing)
2458 REALM_ASSERT(n < size);
2462 const bool m_is_write_some;
2463 const char* const m_begin; // May be dangling after cancellation
2464 const char* const m_end; // May be dangling after cancellation
2465 const char* m_curr = m_begin; // May be dangling after cancellation
// Handler-independent part of an asynchronous read that goes through a
// ReadAheadBuffer. Data is moved from the read-ahead buffer into
// [m_begin, m_end), with m_delim passed along to the buffer's read().
2468 template<class S> class Service::BasicStreamOps<S>::BufferedReadOperBase: public StreamOper {
2470 BufferedReadOperBase(std::size_t size, S& stream, char* begin, char* end, int delim,
2471 ReadAheadBuffer& rab) noexcept:
2472 StreamOper{size, stream},
2473 m_read_ahead_buffer{rab},
// Initiation: the operation must be the one installed in the socket's
// m_read_oper slot. Data already held by the read-ahead buffer is consumed
// first, which may complete the operation right away. Otherwise the
// descriptor is switched to nonblocking mode and the stream decides, through
// do_init_read_async(), what to wait for.
2482 REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
2483 REALM_ASSERT(!s.is_complete());
2484 Want want = Want::nothing;
2485 bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
2487 s.set_is_complete(true); // Success or failure
2490 s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
2491 s.m_stream->do_init_read_async(s.m_error_code, want);
2492 if (want == Want::nothing) {
2493 if (REALM_UNLIKELY(s.m_error_code)) {
2494 s.set_is_complete(true); // Failure
// One step of the buffered read. Preconditions (asserted): not complete, not
// canceled, no error so far, the read-ahead buffer is empty, and there is
// space left in the caller's buffer.
2503 Want advance() noexcept override final
2506 REALM_ASSERT(!s.is_complete());
2507 REALM_ASSERT(!s.is_canceled());
2508 REALM_ASSERT(!s.m_error_code);
2509 REALM_ASSERT(s.m_read_ahead_buffer.empty());
2510 REALM_ASSERT(s.m_curr < s.m_end);
2512 // Fill read-ahead buffer from stream (is empty now)
2513 Want want = Want::nothing;
2514 bool nonempty = s.m_read_ahead_buffer.refill_async(*s.m_stream, s.m_error_code, want);
2515 REALM_ASSERT(nonempty || s.m_error_code ||
2516 want != Want::nothing); // No busy loop, please
2517 bool got_nothing = !nonempty;
2519 if (REALM_UNLIKELY(s.m_error_code)) {
2520 s.set_is_complete(true); // Failure
2521 return Want::nothing;
2523 // Got nothing, but want something
2526 // Transfer buffered data to callers buffer
2528 s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
2530 s.set_is_complete(true); // Success or failure (delim_not_found)
2531 return Want::nothing;
2533 if (want != Want::nothing)
2538 ReadAheadBuffer& m_read_ahead_buffer; // May be dangling after cancellation
2539 char* const m_begin; // May be dangling after cancellation
2540 char* const m_end; // May be dangling after cancellation
2541 char* m_curr = m_begin; // May be dangling after cancellation
// Asynchronous read operation carrying a completion handler of type H. The
// handler is stored by value (moved in by the constructor).
2545 template<class S> template<class H>
2546 class Service::BasicStreamOps<S>::ReadOper: public ReadOperBase {
2548 ReadOper(std::size_t size, S& stream, bool is_read_some, char* begin, char* end, H handler):
2549 ReadOperBase{size, stream, is_read_some, begin, end},
2550 m_handler{std::move(handler)}
// Recycles the operation and invokes the handler with (error code, number of
// bytes transferred). A canceled operation reports error::operation_aborted
// in place of the stored error code.
2553 void recycle_and_execute() override final
2556 REALM_ASSERT(s.is_complete() || s.is_canceled());
2557 REALM_ASSERT(s.is_complete() == (s.m_error_code || s.m_curr == s.m_end ||
2558 (s.m_is_read_some && s.m_curr != s.m_begin)));
2559 REALM_ASSERT(s.m_curr >= s.m_begin);
2560 bool orphaned = !s.m_stream;
2561 std::error_code ec = s.m_error_code;
2562 if (s.is_canceled())
2563 ec = error::operation_aborted;
2564 std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
2565 // Note: do_recycle_and_execute() commits suicide.
2566 s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
2567 num_bytes_transferred); // Throws
// Asynchronous write operation carrying a completion handler of type H. The
// handler is stored by value (moved in by the constructor).
2573 template<class S> template<class H>
2574 class Service::BasicStreamOps<S>::WriteOper: public WriteOperBase {
2576 WriteOper(std::size_t size, S& stream, bool is_write_some,
2577 const char* begin, const char* end, H handler):
2578 WriteOperBase{size, stream, is_write_some, begin, end},
2579 m_handler{std::move(handler)}
// Recycles the operation and invokes the handler with (error code, number of
// bytes transferred). A canceled operation reports error::operation_aborted
// in place of the stored error code.
2582 void recycle_and_execute() override final
2585 REALM_ASSERT(s.is_complete() || s.is_canceled());
2586 REALM_ASSERT(s.is_complete() == (s.m_error_code || s.m_curr == s.m_end ||
2587 (s.m_is_write_some && s.m_curr != s.m_begin)));
2588 REALM_ASSERT(s.m_curr >= s.m_begin);
2589 bool orphaned = !s.m_stream;
2590 std::error_code ec = s.m_error_code;
2591 if (s.is_canceled())
2592 ec = error::operation_aborted;
2593 std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
2594 // Note: do_recycle_and_execute() commits suicide.
2595 s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
2596 num_bytes_transferred); // Throws
// Asynchronous buffered read operation carrying a completion handler of type
// H. The handler is stored by value (moved in by the constructor).
2602 template<class S> template<class H>
2603 class Service::BasicStreamOps<S>::BufferedReadOper: public BufferedReadOperBase {
2605 BufferedReadOper(std::size_t size, S& stream, char* begin, char* end, int delim,
2606 ReadAheadBuffer& rab, H handler):
2607 BufferedReadOperBase{size, stream, begin, end, delim, rab},
2608 m_handler{std::move(handler)}
// Recycles the operation and invokes the handler with (error code, number of
// bytes transferred). A canceled operation reports error::operation_aborted.
// The second assertion states the success condition: with a delimiter (m_delim
// is not eof) the last byte delivered is the delimiter, otherwise the whole
// buffer was filled.
2611 void recycle_and_execute() override final
2614 REALM_ASSERT(s.is_complete() || (s.is_canceled() && !s.m_error_code));
2615 REALM_ASSERT(s.is_canceled() || s.m_error_code ||
2616 (s.m_delim != std::char_traits<char>::eof() ?
2617 s.m_curr > s.m_begin && s.m_curr[-1] ==
2618 std::char_traits<char>::to_char_type(s.m_delim) :
2619 s.m_curr == s.m_end));
2620 REALM_ASSERT(s.m_curr >= s.m_begin);
2621 bool orphaned = !s.m_stream;
2622 std::error_code ec = s.m_error_code;
2623 if (s.is_canceled())
2624 ec = error::operation_aborted;
2625 std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
2626 // Note: do_recycle_and_execute() commits suicide.
2627 s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
2628 num_bytes_transferred); // Throws
// Submits `handler` by forwarding to do_post(). The handler type is erased:
// do_post() receives the constructor function for PostOper<H>, the size of
// that operation type, and the address of the handler as an opaque cookie
// (see post_oper_constr(), which moves from it).
2634 template<class H> inline void Service::post(H handler)
2636 do_post(&Service::post_oper_constr<H>, sizeof (PostOper<H>), &handler);
// Owner-side deleter. The storage is released with delete[] on a char*, which
// matches the `new char[...]` allocation made in Service::alloc().
2639 inline void Service::OwnersOperDeleter::operator()(AsyncOper* op) const noexcept
2647 delete[] static_cast<char*>(addr);
// Lender-side deleter: does not free memory itself, but asks the operation to
// recycle itself.
2651 inline void Service::LendersOperDeleter::operator()(AsyncOper* op) const noexcept
2653 op->recycle(); // Suicide
// Constructs an operation of type Oper in memory that stays owned by
// `owners_ptr`, and returns a lender pointer to it. Existing memory is reused
// when present: it must hold an object that is not in use (asserted to be an
// UnusedOper), which is destroyed first. Memory that is too small for Oper is
// released, and new char-array storage of sizeof (Oper) is allocated.
2656 template<class Oper, class... Args> std::unique_ptr<Oper, Service::LendersOperDeleter>
2657 Service::alloc(OwnersOperPtr& owners_ptr, Args&&... args)
2659 void* addr = owners_ptr.get();
2661 if (REALM_LIKELY(addr)) {
2662 REALM_ASSERT(!owners_ptr->in_use());
2663 size = owners_ptr->m_size;
2664 // We can use static dispatch in the destructor call here, since an
2665 // object, that is not in use, is always an instance of UnusedOper.
2666 REALM_ASSERT(dynamic_cast<UnusedOper*>(owners_ptr.get()));
2667 static_cast<UnusedOper*>(owners_ptr.get())->UnusedOper::~UnusedOper();
2668 if (REALM_UNLIKELY(size < sizeof (Oper))) {
// Give up ownership before freeing, so the owner's deleter does not run on
// this storage as well.
2669 owners_ptr.release();
2670 delete[] static_cast<char*>(addr);
2676 addr = new char[sizeof (Oper)]; // Throws
2677 size = sizeof (Oper);
2678 owners_ptr.reset(static_cast<AsyncOper*>(addr));
2680 std::unique_ptr<Oper, LendersOperDeleter> lenders_ptr;
2682 lenders_ptr.reset(new (addr) Oper(size, std::forward<Args>(args)...)); // Throws
// NOTE(review): presumably the failure path, restoring a valid UnusedOper in
// the storage when the Oper constructor throws -- confirm.
2685 new (addr) UnusedOper(size); // Does not throw
// Takes the operation out of the lender pointer (so that its deleter will not
// run) and has the operation recycle itself and execute its handler.
2691 template<class Oper>
2692 inline void Service::execute(std::unique_ptr<Oper, LendersOperDeleter>& lenders_ptr)
2694 lenders_ptr.release()->recycle_and_execute(); // Throws
// Type-erased constructor function for PostOper<H>: `cookie` must point to an
// object of type H, which is moved into a PostOper<H> constructed in place at
// `addr`.
2697 template<class H> inline Service::PostOperBase*
2698 Service::post_oper_constr(void* addr, std::size_t size, Impl& service, void* cookie)
2700 H& handler = *static_cast<H*>(cookie);
2701 return new (addr) PostOper<H>(size, service, std::move(handler)); // Throws
// Reports whether this operation object is in use.
// NOTE(review): presumably returns m_in_use -- confirm.
2704 inline bool Service::AsyncOper::in_use() const noexcept
// Reports whether the operation has completed (successfully or by failure).
// NOTE(review): presumably returns m_complete -- confirm.
2709 inline bool Service::AsyncOper::is_complete() const noexcept
// Cancels the operation. Requires that it is in use and has not been canceled
// before (both asserted).
2714 inline void Service::AsyncOper::cancel() noexcept
2716 REALM_ASSERT(m_in_use);
2717 REALM_ASSERT(!m_canceled);
// Constructor taking the allocated size in bytes and the initial in-use state.
2721 inline Service::AsyncOper::AsyncOper(std::size_t size, bool is_in_use) noexcept:
// Reports whether the operation has been canceled.
// NOTE(review): presumably returns m_canceled -- confirm.
2727 inline bool Service::AsyncOper::is_canceled() const noexcept
// Updates the completion state. The operation must not already be complete,
// and it can only be marked complete while it is in use (both asserted).
2732 inline void Service::AsyncOper::set_is_complete(bool value) noexcept
2734 REALM_ASSERT(!m_complete);
2735 REALM_ASSERT(!value || m_in_use);
// Recycles this operation object and then invokes `handler` with `args`. The
// work is done by do_recycle_and_execute_helper(), which receives the handler
// and the arguments by value. `was_recycled` records whether the helper got
// as far as recycling the object.
2739 template<class H, class... Args>
2740 inline void Service::AsyncOper::do_recycle_and_execute(bool orphaned, H& handler, Args&&... args)
2742 // Recycle the operation object before the handler is executed, such that
2743 // the memory is available for a new post operation that might be initiated
2744 // during the execution of the handler.
2745 bool was_recycled = false;
2747 // We need to copy or move all arguments to be passed to the handler,
2748 // such that there is no risk of references to the recycled operation
2749 // object being passed to the handler (the passed arguments may be
2750 // references to members of the recycled operation object). The easiest
2751 // way to achieve this, is by forwarding the reference arguments (passed
2752 // to this function) to a helper function whose arguments have
2753 // nonreference type (`Args...` rather than `Args&&...`).
2755 // Note that the copying and moving of arguments may throw, and it is
2756 // important that the operation is still recycled even if that
2757 // happens. For that reason, copying and moving of arguments must not
2758 // happen until we are in a scope (this scope) that catches and deals
2759 // correctly with such exceptions.
2760 do_recycle_and_execute_helper(orphaned, was_recycled, std::move(handler),
2761 std::forward<Args>(args)...); // Throws
// NOTE(review): presumably the exception path, recycling only when the helper
// had not already done so -- confirm.
2765 do_recycle(orphaned);
// Recycles the operation first, reports that through `was_recycled`, and only
// then invokes the handler. The handler and its arguments are by-value
// parameters, so they do not refer to the recycled object.
2770 template<class H, class... Args>
2771 inline void Service::AsyncOper::do_recycle_and_execute_helper(bool orphaned, bool& was_recycled,
2772 H handler, Args... args)
2774 do_recycle(orphaned);
2775 was_recycled = true;
2776 handler(std::move(args)...); // Throws
// Destroys this operation object, which must be in use (asserted). The
// allocated size is read before the destructor runs, because members must not
// be accessed afterwards.
// NOTE(review): presumably the storage is freed when `orphaned` is true, and
// is otherwise turned back into an UnusedOper of the same size -- confirm.
2779 inline void Service::AsyncOper::do_recycle(bool orphaned) noexcept
2781 REALM_ASSERT(in_use());
2783 std::size_t size = m_size;
2784 this->~AsyncOper(); // Suicide
2786 delete[] static_cast<char*>(addr);
2789 new (addr) UnusedOper(size);
2793 // ---------------- Resolver ----------------
// Handler-independent part of an asynchronous resolve operation. Stores the
// query, the resulting endpoint list and the error code. A null m_resolver is
// what recycle() treats as "orphaned".
2795 class Resolver::ResolveOperBase: public Service::AsyncOper {
2797 ResolveOperBase(std::size_t size, Resolver& r, Query q) noexcept:
2798 AsyncOper{size, true},
2800 m_query{std::move(q)}
// The lookup is performed by calling the resolver's resolve() directly, and
// the operation is then marked complete.
2805 // FIXME: Temporary hack until we get a true asynchronous resolver
2806 m_endpoints = m_resolver->resolve(std::move(m_query), m_error_code); // Throws
2807 set_is_complete(true);
// Asserts that the operation has been orphaned before recycling it.
2809 void recycle() noexcept override final
2811 bool orphaned = !m_resolver;
2812 REALM_ASSERT(orphaned);
2813 // Note: do_recycle() commits suicide.
2814 do_recycle(orphaned);
// Detaches the operation from its resolver.
2816 void orphan() noexcept override final
2818 m_resolver = nullptr;
2821 Resolver* m_resolver;
2823 Endpoint::List m_endpoints;
2824 std::error_code m_error_code;
// Resolve operation carrying a completion handler of type H. The handler is
// stored by value (moved in by the constructor).
2827 template<class H> class Resolver::ResolveOper: public ResolveOperBase {
2829 ResolveOper(std::size_t size, Resolver& r, Query q, H handler):
2830 ResolveOperBase{size, r, std::move(q)},
2831 m_handler{std::move(handler)}
// Recycles the operation and invokes the handler with (error code, endpoint
// list). The endpoint list is moved out of the operation. Unless the
// operation was canceled or failed, the list must be nonempty (asserted).
2834 void recycle_and_execute() override final
2836 REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
2837 REALM_ASSERT(is_canceled() || m_error_code || !m_endpoints.empty());
2838 bool orphaned = !m_resolver;
2839 std::error_code ec = m_error_code;
2841 ec = error::operation_aborted;
2842 // Note: do_recycle_and_execute() commits suicide.
2843 do_recycle_and_execute<H>(orphaned, m_handler, ec, std::move(m_endpoints)); // Throws
// Binds the resolver to the implementation object of `service`.
2849 inline Resolver::Resolver(Service& service):
2850 m_service_impl{*service.m_impl}
// Non-throwing destructor.
2854 inline Resolver::~Resolver() noexcept
// Throwing variant of resolve(): forwards to the overload taking an error
// code and throws std::system_error when that overload reports an error.
2859 inline Endpoint::List Resolver::resolve(const Query& q)
2862 Endpoint::List list = resolve(q, ec);
2863 if (REALM_UNLIKELY(ec))
2864 throw std::system_error(ec);
// Starts an asynchronous resolve: allocates a ResolveOper<H> in the
// resolver's m_resolve_oper slot and initiates it. Both steps may throw.
2868 template<class H> void Resolver::async_resolve(Query query, H handler)
2870 LendersResolveOperPtr op = Service::alloc<ResolveOper<H>>(m_resolve_oper, *this,
2872 std::move(handler)); // Throws
2873 initiate_oper(std::move(op)); // Throws
// Query specified by service/port only. Stores the flags and a copy of
// `service_port`.
2876 inline Resolver::Query::Query(std::string service_port, int init_flags):
2877 m_flags{init_flags},
2878 m_service{service_port}
// Query specified by protocol and service/port. Stores the flags and a copy
// of `service_port`.
2882 inline Resolver::Query::Query(const StreamProtocol& prot, std::string service_port,
2884 m_flags{init_flags},
2886 m_service{service_port}
// Query specified by host name and service/port. Stores the flags and a copy
// of `service_port`.
2890 inline Resolver::Query::Query(std::string host_name, std::string service_port, int init_flags):
2891 m_flags{init_flags},
2893 m_service{service_port}
// Query specified by protocol, host name and service/port. Stores the flags
// and a copy of `service_port`.
2897 inline Resolver::Query::Query(const StreamProtocol& prot, std::string host_name,
2898 std::string service_port, int init_flags):
2899 m_flags{init_flags},
2902 m_service{service_port}
// Non-throwing destructor.
2906 inline Resolver::Query::~Query() noexcept
// Accessor for the query flags.
// NOTE(review): presumably returns m_flags -- confirm.
2910 inline int Resolver::Query::flags() const
// Accessor for the query protocol, returned by value.
2915 inline StreamProtocol Resolver::Query::protocol() const
// Accessor for the host name, returned as a copy.
2920 inline std::string Resolver::Query::host() const
// Accessor for the service/port string, returned as a copy.
// NOTE(review): presumably returns m_service -- confirm.
2925 inline std::string Resolver::Query::service() const
2930 // ---------------- SocketBase ----------------
// Constructs the socket's descriptor from the implementation object of
// `service`.
2932 inline SocketBase::SocketBase(Service& service):
2933 m_desc{*service.m_impl}
// Non-throwing destructor.
2937 inline SocketBase::~SocketBase() noexcept
// Reports whether the underlying descriptor is open.
2942 inline bool SocketBase::is_open() const noexcept
2944 return m_desc.is_open();
// Returns the native handle of the underlying descriptor.
2947 inline auto SocketBase::native_handle() const noexcept -> native_handle_type
2949 return m_desc.native_handle();
// Throwing variant of open(): failure is reported by throwing
// std::system_error carrying the error code.
2952 inline void SocketBase::open(const StreamProtocol& prot)
2956 throw std::system_error(ec);
// Closes the socket; declared noexcept.
2959 inline void SocketBase::close() noexcept
// Throwing variant of get_option(): forwards to the overload taking an error
// code and throws std::system_error when that overload returns an error.
2968 inline void SocketBase::get_option(O& opt) const
2971 if (get_option(opt, ec))
2972 throw std::system_error(ec);
// Non-throwing variant of get_option(): takes `ec` by reference and also
// returns a std::error_code.
2976 inline std::error_code SocketBase::get_option(O& opt, std::error_code& ec) const
// Throwing variant of set_option(): forwards to the overload taking an error
// code and throws std::system_error when that overload returns an error.
2983 inline void SocketBase::set_option(const O& opt)
2986 if (set_option(opt, ec))
2987 throw std::system_error(ec);
// Non-throwing variant of set_option(): the outcome is reported through `ec`
// and as the returned error code.
// NOTE(review): body not visible in this listing - presumably delegates to
// opt.set(); confirm.
2991 inline std::error_code SocketBase::set_option(const O& opt, std::error_code& ec)
2997 inline void SocketBase::bind(const Endpoint& ep)
3001 throw std::system_error(ec);
3004 inline Endpoint SocketBase::local_endpoint() const
3007 Endpoint ep = local_endpoint(ec);
3009 throw std::system_error(ec);
// Accessor returning a const reference to a StreamProtocol (noexcept).
// NOTE(review): body not visible in this listing - presumably returns the
// protocol stored in this socket; confirm.
3013 inline const StreamProtocol& SocketBase::get_protocol() const noexcept
// Construct an option object from an initial value of type T.
// NOTE(review): the member initializer and body are not visible in this
// listing - presumably init_value is stored in m_value; confirm.
3018 template<class T, int opt, class U>
3019 inline SocketBase::Option<T, opt, U>::Option(T init_value):
// Return the option's value as a T.
// NOTE(review): body not visible in this listing - presumably returns
// m_value; confirm.
3024 template<class T, int opt, class U>
3025 inline T SocketBase::Option<T, opt, U>::value() const
// Read the option identified by `opt` from `sock`; errors are reported
// through `ec`.
// NOTE(review): several short lines of this function (the declaration of
// `value`, the guard around the assertion, and the store of the result) are
// not visible in this listing.
3030 template<class T, int opt, class U>
3031 inline void SocketBase::Option<T, opt, U>::get(const SocketBase& sock, std::error_code& ec)
// `strut` is one byte larger than U, and its size is what is passed as the
// available buffer size below, so that a result larger than U can be detected
// by the assertion instead of going unnoticed.
3035 char strut[sizeof (U) + 1];
3037 std::size_t value_size = sizeof strut;
3038 sock.get_option(opt_enum(opt), &value, value_size, ec);
// The reported size must match the size of `value` exactly.
3040 REALM_ASSERT(value_size == sizeof value);
3045 template<class T, int opt, class U>
3046 inline void SocketBase::Option<T, opt, U>::set(SocketBase& sock, std::error_code& ec) const
3048 U value_to_set = U(m_value);
3049 sock.set_option(opt_enum(opt), &value_to_set, sizeof value_to_set, ec);
3052 // ---------------- Socket ----------------
// Handler-independent part of an asynchronous connect operation on a Socket.
// NOTE(review): several short lines of this class (access specifiers, braces,
// constructor initializers, the declaration of m_socket) are not visible in
// this listing.
3054 class Socket::ConnectOperBase: public Service::IoOper {
3056 ConnectOperBase(std::size_t size, Socket& sock) noexcept:
// Begin connecting to `ep`. This operation must be the one currently held in
// the socket's m_write_oper slot (asserted).
3061 Want initiate(const Endpoint& ep)
3063 REALM_ASSERT(this == m_socket->m_write_oper.get());
// A true result from initiate_async_connect() means there is nothing to wait
// for: the operation is marked complete right away, with the outcome recorded
// in m_error_code.
3064 if (m_socket->initiate_async_connect(ep, m_error_code)) { // Throws
3065 set_is_complete(true); // Failure, or immediate completion
3066 return Want::nothing;
// Finish the connect. Must only be called on an operation that is neither
// complete nor canceled and has no error recorded (asserted). The operation is
// always complete afterwards; the outcome is recorded in m_error_code.
3070 Want advance() noexcept override final
3072 REALM_ASSERT(!is_complete());
3073 REALM_ASSERT(!is_canceled());
3074 REALM_ASSERT(!m_error_code);
3075 m_socket->finalize_async_connect(m_error_code);
3076 set_is_complete(true);
3077 return Want::nothing;
// Recycle without executing a handler. Requires that the operation has been
// orphaned, i.e. that m_socket is null (asserted).
3079 void recycle() noexcept override final
3081 bool orphaned = !m_socket;
3082 REALM_ASSERT(orphaned);
3083 // Note: do_recycle() commits suicide.
3084 do_recycle(orphaned);
// NOTE(review): body not visible in this listing - presumably clears
// m_socket, since a null m_socket is what recycle() treats as "orphaned";
// confirm.
3086 void orphan() noexcept override final
// The descriptor of the socket this operation belongs to.
3090 Service::Descriptor& descriptor() noexcept override final
3092 return m_socket->m_desc;
// Outcome of the connect operation, as written by initiate() and advance().
3096 std::error_code m_error_code;
// Asynchronous connect operation carrying a completion handler of type H.
// NOTE(review): some short lines of this class (access specifiers, braces, the
// declaration of m_handler) are not visible in this listing.
3099 template<class H> class Socket::ConnectOper: public ConnectOperBase {
// The handler is moved into the operation.
3101 ConnectOper(std::size_t size, Socket& sock, H handler):
3102 ConnectOperBase{size, sock},
3103 m_handler{std::move(handler)}
// Recycle the operation and invoke the handler with the resulting error code.
// The operation must be complete, or canceled with no error recorded
// (asserted).
3106 void recycle_and_execute() override final
3108 REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3109 bool orphaned = !m_socket;
3110 std::error_code ec = m_error_code;
// NOTE(review): the condition guarding the next assignment is not visible in
// this listing - presumably it applies only to canceled operations; confirm.
3112 ec = error::operation_aborted;
3113 // Note: do_recycle_and_execute() commits suicide.
3114 do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
// Construct a socket associated with the specified service.
// NOTE(review): initializer list and body are not visible in this listing.
3120 inline Socket::Socket(Service& service):
// Construct a socket associated with the specified service and make it adopt
// an existing native socket via assign(), which may throw.
// NOTE(review): the initializer list is not visible in this listing.
3125 inline Socket::Socket(Service& service, const StreamProtocol& prot,
3126 native_handle_type native_socket):
3129 assign(prot, native_socket); // Throws
// Destructor (noexcept).
// NOTE(review): body not visible in this listing.
3132 inline Socket::~Socket() noexcept
3136 inline void Socket::connect(const Endpoint& ep)
3139 if (connect(ep, ec)) // Throws
3140 throw std::system_error(ec);
3143 inline std::size_t Socket::read(char* buffer, std::size_t size)
3146 read(buffer, size, ec); // Throws
3148 throw std::system_error(ec);
3152 inline std::size_t Socket::read(char* buffer, std::size_t size, std::error_code& ec)
3154 return StreamOps::read(*this, buffer, size, ec); // Throws
3157 inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab)
3160 read(buffer, size, rab, ec); // Throws
3162 throw std::system_error(ec);
3166 inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab,
3167 std::error_code& ec)
3169 int delim = std::char_traits<char>::eof();
3170 return StreamOps::buffered_read(*this, buffer, size, delim, rab, ec); // Throws
3173 inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim,
3174 ReadAheadBuffer& rab)
3177 std::size_t n = read_until(buffer, size, delim, rab, ec); // Throws
3179 throw std::system_error(ec);
3183 inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim,
3184 ReadAheadBuffer& rab, std::error_code& ec)
3186 int delim_2 = std::char_traits<char>::to_int_type(delim);
3187 return StreamOps::buffered_read(*this, buffer, size, delim_2, rab, ec); // Throws
3190 inline std::size_t Socket::write(const char* data, std::size_t size)
3193 write(data, size, ec); // Throws
3195 throw std::system_error(ec);
3199 inline std::size_t Socket::write(const char* data, std::size_t size, std::error_code& ec)
3201 return StreamOps::write(*this, data, size, ec); // Throws
3204 inline std::size_t Socket::read_some(char* buffer, std::size_t size)
3207 std::size_t n = read_some(buffer, size, ec); // Throws
3209 throw std::system_error(ec);
3213 inline std::size_t Socket::read_some(char* buffer, std::size_t size, std::error_code& ec)
3215 return StreamOps::read_some(*this, buffer, size, ec); // Throws
3218 inline std::size_t Socket::write_some(const char* data, std::size_t size)
3221 std::size_t n = write_some(data, size, ec); // Throws
3223 throw std::system_error(ec);
3227 inline std::size_t Socket::write_some(const char* data, std::size_t size, std::error_code& ec)
3229 return StreamOps::write_some(*this, data, size, ec); // Throws
3232 template<class H> inline void Socket::async_connect(const Endpoint& ep, H handler)
3234 LendersConnectOperPtr op =
3235 Service::alloc<ConnectOper<H>>(m_write_oper, *this, std::move(handler)); // Throws
3236 m_desc.initiate_oper(std::move(op), ep); // Throws
3239 template<class H> inline void Socket::async_read(char* buffer, std::size_t size, H handler)
3241 bool is_read_some = false;
3242 StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
3246 inline void Socket::async_read(char* buffer, std::size_t size, ReadAheadBuffer& rab, H handler)
3248 int delim = std::char_traits<char>::eof();
3249 StreamOps::async_buffered_read(*this, buffer, size, delim, rab, std::move(handler)); // Throws
3253 inline void Socket::async_read_until(char* buffer, std::size_t size, char delim,
3254 ReadAheadBuffer& rab, H handler)
3256 int delim_2 = std::char_traits<char>::to_int_type(delim);
3257 StreamOps::async_buffered_read(*this, buffer, size, delim_2, rab, std::move(handler)); // Throws
3260 template<class H> inline void Socket::async_write(const char* data, std::size_t size, H handler)
3262 bool is_write_some = false;
3263 StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
3266 template<class H> inline void Socket::async_read_some(char* buffer, std::size_t size, H handler)
3268 bool is_read_some = true;
3269 StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
3273 inline void Socket::async_write_some(const char* data, std::size_t size, H handler)
3275 bool is_write_some = true;
3276 StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
3279 inline void Socket::shutdown(shutdown_type what)
3282 if (shutdown(what, ec)) // Throws
3283 throw std::system_error(ec);
3286 inline void Socket::assign(const StreamProtocol& prot, native_handle_type native_socket)
3289 if (assign(prot, native_socket, ec)) // Throws
3290 throw std::system_error(ec);
3293 inline std::error_code Socket::assign(const StreamProtocol& prot,
3294 native_handle_type native_socket, std::error_code& ec)
3296 return do_assign(prot, native_socket, ec); // Throws
// Return a reference to a Socket (noexcept).
// NOTE(review): body not visible in this listing - presumably returns *this;
// confirm.
3299 inline Socket& Socket::lowest_layer() noexcept
3304 inline void Socket::do_init_read_async(std::error_code&, Want& want) noexcept
3306 want = Want::read; // Wait for read readiness before proceeding
3309 inline void Socket::do_init_write_async(std::error_code&, Want& want) noexcept
3311 want = Want::write; // Wait for write readiness before proceeding
3314 inline std::size_t Socket::do_read_some_sync(char* buffer, std::size_t size,
3315 std::error_code& ec) noexcept
3317 return m_desc.read_some(buffer, size, ec);
3320 inline std::size_t Socket::do_write_some_sync(const char* data, std::size_t size,
3321 std::error_code& ec) noexcept
3323 return m_desc.write_some(data, size, ec);
// Attempt one read on the descriptor on behalf of an asynchronous operation.
// `want` tells the caller how to proceed: Want::nothing after a failure,
// Want::read otherwise.
// NOTE(review): a few short lines (the store into `ec` on failure and the
// return statements) are not visible in this listing.
3326 inline std::size_t Socket::do_read_some_async(char* buffer, std::size_t size,
3327 std::error_code& ec, Want& want) noexcept
// The read reports into a local error code, so that the "try again" condition
// can be filtered out before anything is reported through `ec`.
3329 std::error_code ec_2;
3330 std::size_t n = m_desc.read_some(buffer, size, ec_2);
// resource_unavailable_try_again is not considered a failure.
3331 bool success = (!ec_2 || ec_2 == error::resource_unavailable_try_again);
3332 if (REALM_UNLIKELY(!success)) {
3334 want = Want::nothing; // Failure
// On success `ec` is reset to "no error".
3337 ec = std::error_code();
3338 want = Want::read; // Success
// Attempt one write on the descriptor on behalf of an asynchronous operation.
// `want` tells the caller how to proceed: Want::nothing after a failure,
// Want::write otherwise.
// NOTE(review): a few short lines (the store into `ec` on failure and the
// return statements) are not visible in this listing.
3342 inline std::size_t Socket::do_write_some_async(const char* data, std::size_t size,
3343 std::error_code& ec, Want& want) noexcept
// The write reports into a local error code, so that the "try again" condition
// can be filtered out before anything is reported through `ec`.
3345 std::error_code ec_2;
3346 std::size_t n = m_desc.write_some(data, size, ec_2);
// resource_unavailable_try_again is not considered a failure.
3347 bool success = (!ec_2 || ec_2 == error::resource_unavailable_try_again);
3348 if (REALM_UNLIKELY(!success)) {
3350 want = Want::nothing; // Failure
// On success `ec` is reset to "no error".
3353 ec = std::error_code();
3354 want = Want::write; // Success
3358 // ---------------- Acceptor ----------------
// Handler-independent part of an asynchronous accept operation on an Acceptor.
// NOTE(review): several short lines of this class (access specifiers, braces,
// constructor initializers, some signatures and return statements) are not
// visible in this listing.
3360 class Acceptor::AcceptOperBase: public Service::IoOper {
3362 AcceptOperBase(std::size_t size, Acceptor& a, Socket& s, Endpoint* e):
// NOTE(review): the signature of the member function containing the next
// three statements is not visible in this listing. It requires that this
// operation is the one held in the acceptor's m_read_oper slot and that it is
// not complete yet (asserted), and it switches the acceptor's descriptor to
// nonblocking mode.
3371 REALM_ASSERT(this == m_acceptor->m_read_oper.get());
3372 REALM_ASSERT(!is_complete());
3373 m_acceptor->m_desc.ensure_nonblocking_mode(); // Throws
// Try to accept a connection. Must only be called on an operation that is
// neither complete nor canceled and has no error recorded, and while the
// target socket is not open (asserted). The operation is marked complete when
// do_accept_async() indicates that there is nothing further to wait for.
3376 Want advance() noexcept override final
3378 REALM_ASSERT(!is_complete());
3379 REALM_ASSERT(!is_canceled());
3380 REALM_ASSERT(!m_error_code);
3381 REALM_ASSERT(!m_socket.is_open());
3382 Want want = m_acceptor->do_accept_async(m_socket, m_endpoint, m_error_code);
3383 if (want == Want::nothing)
3384 set_is_complete(true); // Success or failure
// Recycle without executing a handler. Requires that the operation has been
// orphaned, i.e. that m_acceptor is null (asserted).
3387 void recycle() noexcept override final
3389 bool orphaned = !m_acceptor;
3390 REALM_ASSERT(orphaned);
3391 // Note: do_recycle() commits suicide.
3392 do_recycle(orphaned);
// Detach the operation from its acceptor.
3394 void orphan() noexcept override final
3396 m_acceptor = nullptr;
// The descriptor of the acceptor this operation belongs to.
3398 Service::Descriptor& descriptor() noexcept override final
3400 return m_acceptor->m_desc;
// Null once the operation has been orphaned.
3403 Acceptor* m_acceptor;
3404 Socket& m_socket; // May be dangling after cancellation
3405 Endpoint* const m_endpoint; // May be dangling after cancellation
// Outcome of the accept operation, as written by do_accept_async().
3406 std::error_code m_error_code;
// Asynchronous accept operation carrying a completion handler of type H.
// NOTE(review): some short lines of this class (access specifiers, braces, the
// declaration of m_handler) are not visible in this listing.
3409 template<class H> class Acceptor::AcceptOper: public AcceptOperBase {
// The handler is moved into the operation.
3411 AcceptOper(std::size_t size, Acceptor& a, Socket& s, Endpoint* e, H handler):
3412 AcceptOperBase{size, a, s, e},
3413 m_handler{std::move(handler)}
// Recycle the operation and invoke the handler with the resulting error code.
// The operation must be complete, or canceled with no error recorded; and
// unless it was canceled or failed, the target socket must be open (asserted).
3416 void recycle_and_execute() override final
3418 REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3419 REALM_ASSERT(is_canceled() || m_error_code || m_socket.is_open());
3420 bool orphaned = !m_acceptor;
3421 std::error_code ec = m_error_code;
// NOTE(review): the condition guarding the next assignment is not visible in
// this listing - presumably it applies only to canceled operations; confirm.
3423 ec = error::operation_aborted;
3424 // Note: do_recycle_and_execute() commits suicide.
3425 do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
// Construct an acceptor associated with the specified service.
// NOTE(review): initializer list and body are not visible in this listing.
3431 inline Acceptor::Acceptor(Service& service):
// Destructor (noexcept).
// NOTE(review): body not visible in this listing.
3436 inline Acceptor::~Acceptor() noexcept
3440 inline void Acceptor::listen(int backlog)
3443 if (listen(backlog, ec)) // Throws
3444 throw std::system_error(ec);
3447 inline void Acceptor::accept(Socket& sock)
3450 if (accept(sock, ec)) // Throws
3451 throw std::system_error(ec);
3454 inline void Acceptor::accept(Socket& sock, Endpoint& ep)
3457 if (accept(sock, ep, ec)) // Throws
3458 throw std::system_error(ec);
3461 inline std::error_code Acceptor::accept(Socket& sock, std::error_code& ec)
3463 Endpoint* ep = nullptr;
3464 return accept(sock, ep, ec); // Throws
3467 inline std::error_code Acceptor::accept(Socket& sock, Endpoint& ep, std::error_code& ec)
3469 return accept(sock, &ep, ec); // Throws
3472 template<class H> inline void Acceptor::async_accept(Socket& sock, H handler)
3474 Endpoint* ep = nullptr;
3475 async_accept(sock, ep, std::move(handler)); // Throws
3478 template<class H> inline void Acceptor::async_accept(Socket& sock, Endpoint& ep, H handler)
3480 async_accept(sock, &ep, std::move(handler)); // Throws
3483 inline std::error_code Acceptor::accept(Socket& socket, Endpoint* ep, std::error_code& ec)
3485 REALM_ASSERT(!m_read_oper || !m_read_oper->in_use());
3486 if (REALM_UNLIKELY(socket.is_open()))
3487 throw std::runtime_error("Socket is already open");
3488 m_desc.ensure_blocking_mode(); // Throws
3489 m_desc.accept(socket.m_desc, m_protocol, ep, ec);
// Attempt one accept on the descriptor on behalf of an asynchronous operation.
// NOTE(review): two short statements of this function are not visible in this
// listing - the statement executed when the accept reports "try again", and,
// presumably, the store of ec_2 into `ec` before the final return; confirm.
3493 inline Acceptor::Want Acceptor::do_accept_async(Socket& socket, Endpoint* ep,
3494 std::error_code& ec) noexcept
// The accept reports into a local error code, so that the "try again"
// condition can be recognized before anything is reported through `ec`.
3496 std::error_code ec_2;
3497 m_desc.accept(socket.m_desc, m_protocol, ep, ec_2);
3498 if (ec_2 == error::resource_unavailable_try_again)
3501 return Want::nothing;
3504 template<class H> inline void Acceptor::async_accept(Socket& sock, Endpoint* ep, H handler)
3506 if (REALM_UNLIKELY(sock.is_open()))
3507 throw std::runtime_error("Socket is already open");
3508 LendersAcceptOperPtr op = Service::alloc<AcceptOper<H>>(m_read_oper, *this, sock, ep,
3509 std::move(handler)); // Throws
3510 m_desc.initiate_oper(std::move(op)); // Throws
3513 // ---------------- DeadlineTimer ----------------
// Wait operation of a DeadlineTimer carrying a completion handler of type H.
// NOTE(review): several short lines of this class (the template head, access
// specifiers, braces, the declaration of `ec`, and the condition guarding the
// assignment of operation_aborted) are not visible in this listing.
3516 class DeadlineTimer::WaitOper: public Service::WaitOperBase {
// The handler is moved into the operation; the remaining arguments are passed
// on to the base class.
3518 WaitOper(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time, H handler):
3519 Service::WaitOperBase{size, timer, expiration_time},
3520 m_handler{std::move(handler)}
// Recycle the operation and invoke the handler with the resulting error code.
3523 void recycle_and_execute() override final
// A null m_timer means that the operation has been orphaned.
3525 bool orphaned = !m_timer;
3528 ec = error::operation_aborted;
3529 // Note: do_recycle_and_execute() commits suicide.
3530 do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
3536 inline DeadlineTimer::DeadlineTimer(Service& service):
3537 m_service_impl{*service.m_impl}
// Destructor (noexcept).
// NOTE(review): body not visible in this listing.
3541 inline DeadlineTimer::~DeadlineTimer() noexcept
3546 template<class R, class P, class H>
3547 inline void DeadlineTimer::async_wait(std::chrono::duration<R,P> delay, H handler)
3549 clock::time_point now = clock::now();
3550 // FIXME: This method of detecting overflow does not work. Comparison
3551 // between distinct duration types is not overflow safe. Overflow easily
3552 // happens in the implied conversion of arguments to the common duration
3553 // type (std::common_type<>).
3554 auto max_add = clock::time_point::max() - now;
3555 if (delay > max_add)
3556 throw std::runtime_error("Expiration time overflow");
3557 clock::time_point expiration_time = now + delay;
3558 Service::LendersWaitOperPtr op =
3559 Service::alloc<WaitOper<H>>(m_wait_oper, *this, expiration_time,
3560 std::move(handler)); // Throws
3561 add_oper(std::move(op)); // Throws
3564 // ---------------- Trigger ----------------
// Operation object of a Trigger carrying a handler of type H.
// NOTE(review): several short lines of this class (the template head, access
// specifiers, braces, the declaration of m_handler, and the condition guarding
// the last two statements of recycle_and_execute()) are not visible in this
// listing.
3567 class Trigger::ExecOper: public Service::TriggerExecOperBase {
// The handler is moved into the operation.
3569 ExecOper(Service::Impl& service_impl, H handler):
3570 Service::TriggerExecOperBase{service_impl},
3571 m_handler{std::move(handler)}
// Must only be called while the operation is in use (asserted).
3574 void recycle_and_execute() override final
3576 REALM_ASSERT(in_use());
3577 // Note: Potential suicide when `self` goes out of scope
// `self` adopts an existing reference to this object (adopt_tag), so that
// reference is released when `self` is destroyed.
3578 util::bind_ptr<TriggerExecOperBase> self{this, bind_ptr_base::adopt_tag{}};
// The trigger is reset before the handler is invoked.
3580 Service::reset_trigger_exec(*m_service, *this);
3581 m_handler(); // Throws
3588 template<class H> inline Trigger::Trigger(Service& service, H handler) :
3589 m_exec_oper{new ExecOper<H>{*service.m_impl, std::move(handler)}} // Throws
// Destructor (noexcept): orphans the execution operation, detaching it from
// this trigger.
// NOTE(review): one short line preceding the call is not visible in this
// listing - presumably a null check on m_exec_oper; confirm.
3593 inline Trigger::~Trigger() noexcept
3596 m_exec_oper->orphan();
3599 inline void Trigger::trigger() noexcept
3601 REALM_ASSERT(m_exec_oper);
3602 m_exec_oper->trigger();
3605 // ---------------- ReadAheadBuffer ----------------
3607 inline ReadAheadBuffer::ReadAheadBuffer():
3608 m_buffer{new char[s_size]} // Throws
// Discard buffered data (noexcept).
// NOTE(review): body not visible in this listing - presumably resets m_begin
// and m_end so that empty() becomes true; confirm.
3612 inline void ReadAheadBuffer::clear() noexcept
3618 inline bool ReadAheadBuffer::empty() const noexcept
3620 return (m_begin == m_end);
// Refill the buffer with a single synchronous partial read from `stream`.
// NOTE(review): the statements following the `n == 0` check are only partly
// visible in this listing.
3623 template<class S> inline void ReadAheadBuffer::refill_sync(S& stream, std::error_code& ec) noexcept
3625 char* buffer = m_buffer.get();
3626 std::size_t size = s_size;
// This function is noexcept, so the stream's read function is required, at
// compile time, to be noexcept too.
3627 static_assert(noexcept(stream.do_read_some_sync(buffer, size, ec)), "");
3628 std::size_t n = stream.do_read_some_sync(buffer, size, ec);
3629 if (REALM_UNLIKELY(n == 0))
3632 REALM_ASSERT(n <= size);
// The buffered data now spans the first `n` bytes of the storage.
3633 m_begin = m_buffer.get();
3634 m_end = m_begin + n;
// Refill the buffer with a single partial read from `stream` on behalf of an
// asynchronous operation; `want` is set by the stream's read function.
// NOTE(review): the template head, the handling of a zero-byte read, and the
// return statements are not visible in this listing.
3638 inline bool ReadAheadBuffer::refill_async(S& stream, std::error_code& ec, Want& want) noexcept
3640 char* buffer = m_buffer.get();
3641 std::size_t size = s_size;
// This function is noexcept, so the stream's read function is required, at
// compile time, to be noexcept too.
3642 static_assert(noexcept(stream.do_read_some_async(buffer, size, ec, want)), "");
3643 std::size_t n = stream.do_read_some_async(buffer, size, ec, want);
3647 REALM_ASSERT(n <= size);
// The buffered data now spans the first `n` bytes of the storage.
3648 m_begin = m_buffer.get();
3649 m_end = m_begin + n;
3653 } // namespace network
3655 } // namespace realm
3657 #endif // REALM_UTIL_NETWORK_HPP