TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BACKEND_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BACKEND_HPP
12 :
13 : /* Reactor backend: acceptor accept() implementation.
14 :
15 : Contains the accept() method body for reactor_acceptor_impl,
16 : which needs all socket/service types to be complete. Included
17 : by per-backend type files (epoll_types.hpp, etc.) after all
18 : named types are defined.
19 : */
20 :
21 : #include <boost/corosio/native/detail/reactor/reactor_service_finals.hpp>
22 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
23 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
24 : #include <boost/corosio/detail/dispatch_coro.hpp>
25 :
26 : #include <mutex>
27 :
28 : namespace boost::corosio::detail {
29 :
30 : // ============================================================
31 : // Acceptor accept() implementation
32 : // ============================================================
33 :
// Starts an accept on the listening descriptor this->fd_ on behalf of the
// awaiting coroutine h.
//
// The embedded operation this->acc_ is reset and loaded with the arguments,
// then Traits::accept_policy::do_accept() is attempted immediately. Four
// outcomes are handled:
//   1. Accept succeeded and the scheduler granted inline budget: the new
//      socket implementation is built here, *ec / *impl_out are written
//      directly, and the result of dispatch_coro(ex, ...) is returned.
//   2. Accept succeeded without inline budget: the accepted descriptor and
//      peer address are stored in the operation, which is posted to the
//      service; std::noop_coroutine() is returned.
//   3. errno is EAGAIN / EWOULDBLOCK: the operation is either retried once
//      (if desc_state_.read_ready is set) or parked in desc_state_.read_op;
//      std::noop_coroutine() is returned.
//   4. Any other errno: the operation is completed with that error and
//      posted; std::noop_coroutine() is returned.
//
// Parameters:
//   h        - coroutine handle stored in the operation (and in
//              op.cont_op.cont on the inline path).
//   ex       - executor stored in the operation and passed to dispatch_coro.
//   token    - stop token handed to op.start().
//   ec       - output error code; stored as op.ec_out and written directly
//              on the inline path.
//   impl_out - output pointer for the accepted socket implementation; may be
//              null on the inline path (it is checked before being written).
//
// NOTE(review): ec is dereferenced unconditionally on the inline path while
// impl_out is null-checked; confirm callers always pass a non-null ec.
34 : template<class Derived, class Traits, class Service,
35 : class SocketFinal, class AccImplBase, class Endpoint>
36 : std::coroutine_handle<>
37 HIT 8230 : reactor_acceptor_impl<Derived, Traits, Service, SocketFinal, AccImplBase, Endpoint>::accept(
38 : std::coroutine_handle<> h,
39 : capy::executor_ref ex,
40 : std::stop_token token,
41 : std::error_code* ec,
42 : io_object::implementation** impl_out)
43 : {
// Reset the embedded accept operation and load it with the state supplied
// by the caller before starting it with the stop token.
44 8230 : auto& op = this->acc_;
45 8230 : op.reset();
46 8230 : op.h = h;
47 8230 : op.ex = ex;
48 8230 : op.ec_out = ec;
49 8230 : op.impl_out = impl_out;
50 8230 : op.fd = this->fd_;
51 8230 : op.start(token, static_cast<Derived*>(this));
52 :
// Attempt the accept right away. peer_storage / peer_addrlen are handed to
// do_accept and consumed below as the peer address of the new connection.
53 8230 : sockaddr_storage peer_storage{};
54 8230 : socklen_t peer_addrlen = 0;
55 :
56 8230 : int accepted = Traits::accept_policy::do_accept(
57 : this->fd_, peer_storage, peer_addrlen);
58 :
59 8230 : if (accepted >= 0)
60 : {
// Clear the cached readiness flag under the descriptor mutex.
61 : {
62 6 : std::lock_guard lock(this->desc_state_.mutex);
63 6 : this->desc_state_.read_ready = false;
64 6 : }
65 :
// Inline completion: when the scheduler grants inline budget, build the
// socket here and hand the caller to dispatch_coro instead of posting op.
66 6 : if (this->svc_.scheduler().try_consume_inline_budget())
67 : {
68 MIS 0 : auto* socket_svc = this->svc_.stream_service();
69 0 : if (socket_svc)
70 : {
71 : auto& impl =
72 0 : static_cast<SocketFinal&>(*socket_svc->construct());
73 0 : impl.set_socket(accepted);
74 :
// Initialise the new descriptor state with no pending operations before
// registering it with the scheduler of the socket service.
75 0 : impl.desc_state_.fd = accepted;
76 : {
77 0 : std::lock_guard lock(impl.desc_state_.mutex);
78 0 : impl.desc_state_.read_op = nullptr;
79 0 : impl.desc_state_.write_op = nullptr;
80 0 : impl.desc_state_.connect_op = nullptr;
81 0 : }
82 0 : socket_svc->scheduler().register_descriptor(
83 : accepted, &impl.desc_state_);
84 :
// Local endpoint is copied from the acceptor; the peer endpoint is converted
// from the address filled in by do_accept.
85 0 : impl.set_endpoints(
86 : this->local_endpoint_,
87 0 : from_sockaddr_as(
88 : peer_storage, peer_addrlen, Endpoint{}));
89 :
90 0 : *ec = {};
91 0 : if (impl_out)
92 0 : *impl_out = &impl;
93 : }
94 : else
95 : {
// No stream service available: close the accepted descriptor so it is not
// leaked and report ENOENT to the caller.
96 0 : ::close(accepted);
97 0 : *ec = make_err(ENOENT);
98 0 : if (impl_out)
99 0 : *impl_out = nullptr;
100 : }
101 0 : op.cont_op.cont.h = h;
102 0 : return dispatch_coro(ex, op.cont_op.cont);
103 : }
104 :
// No inline budget: record the accepted descriptor and peer address in op,
// call op.complete(0, 0) and post op to the service.
// impl_ptr presumably keeps this implementation alive until the posted op
// has run - confirm against the op type.
105 HIT 6 : op.accepted_fd = accepted;
106 6 : op.peer_storage = peer_storage;
107 6 : op.peer_addrlen = peer_addrlen;
108 6 : op.complete(0, 0);
109 6 : op.impl_ptr = this->shared_from_this();
110 6 : this->svc_.post(&op);
111 6 : return std::noop_coroutine();
112 : }
113 :
// errno is read directly after the failed do_accept; no other call sits in
// between on this path.
114 8224 : if (errno == EAGAIN || errno == EWOULDBLOCK)
115 : {
116 8224 : op.impl_ptr = this->shared_from_this();
117 8224 : this->svc_.work_started();
118 :
// The descriptor mutex is held from here until this branch returns.
119 8224 : std::lock_guard lock(this->desc_state_.mutex);
120 8224 : bool io_done = false;
// If readiness is already flagged, consume it and retry through
// op.perform_io(). A would-block result from the retry is not treated as
// completion and op.errn is cleared again.
121 8224 : if (this->desc_state_.read_ready)
122 : {
123 MIS 0 : this->desc_state_.read_ready = false;
124 0 : op.perform_io();
125 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
126 0 : if (!io_done)
127 0 : op.errn = 0;
128 : }
129 :
// Finished or cancelled: post op, then call work_finished() to pair with the
// work_started() above (presumably the post accounts for its own work -
// confirm against the service).
130 HIT 8224 : if (io_done || op.cancelled.load(std::memory_order_acquire))
131 : {
132 MIS 0 : this->svc_.post(&op);
133 0 : this->svc_.work_finished();
134 : }
135 : else
136 : {
// Otherwise park op as the pending read operation of this descriptor.
// Presumably the reactor completes it once the descriptor becomes
// readable - confirm.
137 HIT 8224 : this->desc_state_.read_op = &op;
138 : }
139 8224 : return std::noop_coroutine();
140 8224 : }
141 :
// Any other errno: complete op with that error and post it.
142 MIS 0 : op.complete(errno, 0);
143 0 : op.impl_ptr = this->shared_from_this();
144 0 : this->svc_.post(&op);
145 0 : return std::noop_coroutine();
146 : }
147 :
148 : } // namespace boost::corosio::detail
149 :
150 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BACKEND_HPP
|