TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
12 :
13 : #include <boost/corosio/tcp_acceptor.hpp>
14 : #include <boost/corosio/detail/intrusive.hpp>
15 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
16 : #include <boost/corosio/native/detail/make_err.hpp>
17 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
18 :
19 : #include <memory>
20 : #include <mutex>
21 : #include <utility>
22 :
23 : #include <errno.h>
24 : #include <netinet/in.h>
25 : #include <sys/socket.h>
26 : #include <unistd.h>
27 :
28 : namespace boost::corosio::detail {
29 :
30 : /** CRTP base for reactor-backed acceptor implementations.
31 :
32 : Provides shared data members, trivial virtual overrides, and
33 : non-virtual helper methods for cancellation and close. Concrete
34 : backends inherit and add `cancel()`, `close_socket()`, and
35 : `accept()` overrides that delegate to the `do_*` helpers.
36 :
37 : @tparam Derived The concrete acceptor type (CRTP).
38 : @tparam Service The backend's acceptor service type.
39 : @tparam Op The backend's base op type.
40 : @tparam AcceptOp The backend's accept op type.
41 : @tparam DescState The backend's descriptor_state type.
42 : @tparam ImplBase The public vtable base
43 : (tcp_acceptor::implementation or
44 : local_stream_acceptor::implementation).
45 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
46 : */
47 : template<
48 : class Derived,
49 : class Service,
50 : class Op,
51 : class AcceptOp,
52 : class DescState,
53 : class ImplBase = tcp_acceptor::implementation,
54 : class Endpoint = endpoint>
55 : class reactor_acceptor
56 : : public ImplBase
57 : , public std::enable_shared_from_this<Derived>
58 : , public intrusive_list<Derived>::node
59 : {
60 : friend Derived;
61 :
62 : protected:
63 : // NOLINTNEXTLINE(bugprone-crtp-constructor-accessibility)
64 HIT 161 : explicit reactor_acceptor(Service& svc) noexcept : svc_(svc) {}
65 :
66 : protected:
67 : Service& svc_;
68 : int fd_ = -1;
69 : Endpoint local_endpoint_;
70 :
71 : public:
72 : /// Pending accept operation slot.
73 : AcceptOp acc_;
74 :
75 : /// Per-descriptor state for persistent reactor registration.
76 : DescState desc_state_;
77 :
78 161 : ~reactor_acceptor() override = default;
79 :
80 : /// Return the underlying file descriptor.
81 : int native_handle() const noexcept
82 : {
83 : return fd_;
84 : }
85 :
86 : /// Return the cached local endpoint.
87 8348 : Endpoint local_endpoint() const noexcept override
88 : {
89 8348 : return local_endpoint_;
90 : }
91 :
92 : /// Return true if the acceptor has an open file descriptor.
93 9282 : bool is_open() const noexcept override
94 : {
95 9282 : return fd_ >= 0;
96 : }
97 :
98 : /// Set a socket option.
99 139 : std::error_code set_option(
100 : int level,
101 : int optname,
102 : void const* data,
103 : std::size_t size) noexcept override
104 : {
105 139 : if (::setsockopt(
106 139 : fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
107 MIS 0 : return make_err(errno);
108 HIT 139 : return {};
109 : }
110 :
111 : /// Get a socket option.
112 : std::error_code
113 MIS 0 : get_option(int level, int optname, void* data, std::size_t* size)
114 : const noexcept override
115 : {
116 0 : socklen_t len = static_cast<socklen_t>(*size);
117 0 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
118 0 : return make_err(errno);
119 0 : *size = static_cast<std::size_t>(len);
120 0 : return {};
121 : }
122 :
123 : /// Cache the local endpoint.
124 HIT 146 : void set_local_endpoint(Endpoint ep) noexcept
125 : {
126 146 : local_endpoint_ = std::move(ep);
127 146 : }
128 :
129 : /// Assign the fd and initialize descriptor state for the acceptor.
130 156 : void init_acceptor_fd(int fd) noexcept
131 : {
132 156 : fd_ = fd;
133 156 : desc_state_.fd = fd;
134 : {
135 156 : std::lock_guard lock(desc_state_.mutex);
136 156 : desc_state_.read_op = nullptr;
137 156 : }
138 156 : }
139 :
140 : /// Return a reference to the owning service.
141 8218 : Service& service() noexcept
142 : {
143 8218 : return svc_;
144 : }
145 :
146 4 : void cancel() noexcept override { do_cancel(); }
147 :
148 : /// Close the acceptor (non-virtual, called by the service).
149 634 : void close_socket() noexcept { do_close_socket(); }
150 :
151 : /** Cancel a single pending operation.
152 :
153 : Claims the operation from the read_op descriptor slot
154 : under the mutex and posts it to the scheduler as cancelled.
155 :
156 : @param op The operation to cancel.
157 : */
158 : void cancel_single_op(Op& op) noexcept;
159 :
160 : /** Cancel the pending accept operation. */
161 : void do_cancel() noexcept;
162 :
163 : /** Close the acceptor and cancel pending operations.
164 :
165 : Invoked by the derived class's close_socket(). The
166 : derived class may add backend-specific cleanup after
167 : calling this method.
168 : */
169 : void do_close_socket() noexcept;
170 :
171 : /** Release the acceptor without closing the fd. */
172 : native_handle_type do_release_socket() noexcept;
173 :
174 : /** Bind the acceptor socket to an endpoint.
175 :
176 : Caches the resolved local endpoint (including ephemeral
177 : port) after a successful bind.
178 :
179 : @param ep The endpoint to bind to.
180 : @return The error code from bind(), or success.
181 : */
182 : std::error_code do_bind(Endpoint const& ep);
183 :
184 : /** Start listening on the acceptor socket.
185 :
186 : Registers the file descriptor with the reactor after
187 : a successful listen() call.
188 :
189 : @param backlog The listen backlog.
190 : @return The error code from listen(), or success.
191 : */
192 : std::error_code do_listen(int backlog);
193 : };
194 :
195 : template<
196 : class Derived,
197 : class Service,
198 : class Op,
199 : class AcceptOp,
200 : class DescState,
201 : class ImplBase,
202 : class Endpoint>
203 : void
204 10 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
205 : cancel_single_op(Op& op) noexcept
206 : {
207 10 : auto self = this->weak_from_this().lock();
208 10 : if (!self)
209 MIS 0 : return;
210 :
211 HIT 10 : op.request_cancel();
212 :
213 10 : reactor_op_base* claimed = nullptr;
214 : {
215 10 : std::lock_guard lock(desc_state_.mutex);
216 10 : if (desc_state_.read_op == &op)
217 8 : claimed = std::exchange(desc_state_.read_op, nullptr);
218 10 : }
219 10 : if (claimed)
220 : {
221 8 : op.impl_ptr = self;
222 8 : svc_.post(&op);
223 8 : svc_.work_finished();
224 : }
225 10 : }
226 :
227 : template<
228 : class Derived,
229 : class Service,
230 : class Op,
231 : class AcceptOp,
232 : class DescState,
233 : class ImplBase,
234 : class Endpoint>
235 : void
236 4 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
237 : do_cancel() noexcept
238 : {
239 4 : cancel_single_op(acc_);
240 4 : }
241 :
242 : template<
243 : class Derived,
244 : class Service,
245 : class Op,
246 : class AcceptOp,
247 : class DescState,
248 : class ImplBase,
249 : class Endpoint>
250 : void
251 634 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
252 : do_close_socket() noexcept
253 : {
254 634 : auto self = this->weak_from_this().lock();
255 634 : if (self)
256 : {
257 634 : acc_.request_cancel();
258 :
259 634 : reactor_op_base* claimed = nullptr;
260 : {
261 634 : std::lock_guard lock(desc_state_.mutex);
262 634 : claimed = std::exchange(desc_state_.read_op, nullptr);
263 634 : desc_state_.read_ready = false;
264 634 : desc_state_.write_ready = false;
265 :
266 634 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
267 MIS 0 : desc_state_.impl_ref_ = self;
268 HIT 634 : }
269 :
270 634 : if (claimed)
271 : {
272 4 : acc_.impl_ptr = self;
273 4 : svc_.post(&acc_);
274 4 : svc_.work_finished();
275 : }
276 : }
277 :
278 634 : if (fd_ >= 0)
279 : {
280 156 : if (desc_state_.registered_events != 0)
281 138 : svc_.scheduler().deregister_descriptor(fd_);
282 156 : ::close(fd_);
283 156 : fd_ = -1;
284 : }
285 :
286 634 : desc_state_.fd = -1;
287 634 : desc_state_.registered_events = 0;
288 :
289 634 : local_endpoint_ = Endpoint{};
290 634 : }
291 :
292 : template<
293 : class Derived,
294 : class Service,
295 : class Op,
296 : class AcceptOp,
297 : class DescState,
298 : class ImplBase,
299 : class Endpoint>
300 : native_handle_type
301 MIS 0 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
302 : do_release_socket() noexcept
303 : {
304 0 : auto self = this->weak_from_this().lock();
305 0 : if (self)
306 : {
307 0 : acc_.request_cancel();
308 :
309 0 : reactor_op_base* claimed = nullptr;
310 : {
311 0 : std::lock_guard lock(desc_state_.mutex);
312 0 : claimed = std::exchange(desc_state_.read_op, nullptr);
313 0 : desc_state_.read_ready = false;
314 0 : desc_state_.write_ready = false;
315 :
316 0 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
317 0 : desc_state_.impl_ref_ = self;
318 0 : }
319 :
320 0 : if (claimed)
321 : {
322 0 : acc_.impl_ptr = self;
323 0 : svc_.post(&acc_);
324 0 : svc_.work_finished();
325 : }
326 : }
327 :
328 0 : native_handle_type released = fd_;
329 :
330 0 : if (fd_ >= 0)
331 : {
332 0 : if (desc_state_.registered_events != 0)
333 0 : svc_.scheduler().deregister_descriptor(fd_);
334 0 : fd_ = -1;
335 : }
336 :
337 0 : desc_state_.fd = -1;
338 0 : desc_state_.registered_events = 0;
339 :
340 0 : local_endpoint_ = Endpoint{};
341 :
342 0 : return released;
343 0 : }
344 :
345 : template<
346 : class Derived,
347 : class Service,
348 : class Op,
349 : class AcceptOp,
350 : class DescState,
351 : class ImplBase,
352 : class Endpoint>
353 : std::error_code
354 HIT 154 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
355 : do_bind(Endpoint const& ep)
356 : {
357 154 : sockaddr_storage storage{};
358 154 : socklen_t addrlen = to_sockaddr(ep, storage);
359 154 : if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
360 8 : return make_err(errno);
361 :
362 : // Cache local endpoint (resolves ephemeral port / path)
363 146 : sockaddr_storage local{};
364 146 : socklen_t local_len = sizeof(local);
365 146 : if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local), &local_len) ==
366 : 0)
367 146 : set_local_endpoint(from_sockaddr_as(local, local_len, Endpoint{}));
368 :
369 146 : return {};
370 : }
371 :
372 : template<
373 : class Derived,
374 : class Service,
375 : class Op,
376 : class AcceptOp,
377 : class DescState,
378 : class ImplBase,
379 : class Endpoint>
380 : std::error_code
381 138 : reactor_acceptor<Derived, Service, Op, AcceptOp, DescState, ImplBase, Endpoint>::
382 : do_listen(int backlog)
383 : {
384 138 : if (::listen(fd_, backlog) < 0)
385 MIS 0 : return make_err(errno);
386 :
387 HIT 138 : svc_.scheduler().register_descriptor(fd_, &desc_state_);
388 138 : return {};
389 : }
390 :
391 : } // namespace boost::corosio::detail
392 :
393 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
|