TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/intrusive.hpp>
14 : #include <boost/corosio/detail/native_handle.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
17 : #include <boost/corosio/native/detail/make_err.hpp>
18 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
19 :
20 : #include <memory>
21 : #include <mutex>
22 : #include <utility>
23 :
24 : #include <errno.h>
25 : #include <netinet/in.h>
26 : #include <sys/socket.h>
27 : #include <unistd.h>
28 :
29 : namespace boost::corosio::detail {
30 :
31 : /** CRTP base for reactor-backed socket implementations.
32 :
33 : Extracts the shared data members, virtual overrides, and
34 : cancel/close/register logic that is identical across TCP
35 : (reactor_stream_socket) and UDP (reactor_datagram_socket).
36 :
37 : Derived classes provide CRTP callbacks that enumerate their
38 : specific op slots so cancel/close can iterate them generically.
39 :
40 : @tparam Derived The concrete socket type (CRTP).
41 : @tparam ImplBase The public vtable base (tcp_socket::implementation
42 : or udp_socket::implementation).
43 : @tparam Service The backend's service type.
44 : @tparam DescState The backend's descriptor_state type.
45 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
46 : */
47 : template<
48 : class Derived,
49 : class ImplBase,
50 : class Service,
51 : class DescState,
52 : class Endpoint = endpoint>
53 : class reactor_basic_socket
54 : : public ImplBase
55 : , public std::enable_shared_from_this<Derived>
56 : , public intrusive_list<Derived>::node
57 : {
58 : friend Derived;
59 :
60 : template<class, class, class, class, class, class, class, class>
61 : friend class reactor_stream_socket;
62 :
63 : template<class, class, class, class, class, class, class, class, class, class>
64 : friend class reactor_datagram_socket;
65 :
66 HIT 24897 : explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
67 :
68 : protected:
69 : Service& svc_;
70 : int fd_ = -1;
71 : Endpoint local_endpoint_;
72 :
73 : public:
74 : /// Per-descriptor state for persistent reactor registration.
75 : DescState desc_state_;
76 :
77 24897 : ~reactor_basic_socket() override = default;
78 :
79 : /// Return the underlying file descriptor.
80 75363 : native_handle_type native_handle() const noexcept override
81 : {
82 75363 : return fd_;
83 : }
84 :
85 : /// Return the cached local endpoint.
86 80 : Endpoint local_endpoint() const noexcept override
87 : {
88 80 : return local_endpoint_;
89 : }
90 :
91 : /// Return true if the socket has an open file descriptor.
92 : bool is_open() const noexcept
93 : {
94 : return fd_ >= 0;
95 : }
96 :
97 : /// Set a socket option.
98 20 : std::error_code set_option(
99 : int level,
100 : int optname,
101 : void const* data,
102 : std::size_t size) noexcept override
103 : {
104 20 : if (::setsockopt(
105 20 : fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
106 MIS 0 : return make_err(errno);
107 HIT 20 : return {};
108 : }
109 :
110 : /// Get a socket option.
111 : std::error_code
112 78 : get_option(int level, int optname, void* data, std::size_t* size)
113 : const noexcept override
114 : {
115 78 : socklen_t len = static_cast<socklen_t>(*size);
116 78 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
117 MIS 0 : return make_err(errno);
118 HIT 78 : *size = static_cast<std::size_t>(len);
119 78 : return {};
120 : }
121 :
122 : /// Assign the file descriptor.
123 8218 : void set_socket(int fd) noexcept
124 : {
125 8218 : fd_ = fd;
126 8218 : }
127 :
128 : /// Cache the local endpoint.
129 : void set_local_endpoint(Endpoint ep) noexcept
130 : {
131 : local_endpoint_ = ep;
132 : }
133 :
134 : /** Bind the socket to a local endpoint.
135 :
136 : Calls ::bind() and caches the resulting local endpoint
137 : via getsockname().
138 :
139 : @param ep The endpoint to bind to.
140 : @return Error code on failure, empty on success.
141 : */
142 76 : std::error_code do_bind(Endpoint const& ep) noexcept
143 : {
144 76 : sockaddr_storage storage{};
145 76 : socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
146 76 : if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
147 10 : return make_err(errno);
148 :
149 66 : sockaddr_storage local_storage{};
150 66 : socklen_t local_len = sizeof(local_storage);
151 66 : if (::getsockname(
152 66 : fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
153 : 0)
154 52 : local_endpoint_ =
155 66 : from_sockaddr_as(local_storage, local_len, Endpoint{});
156 :
157 66 : return {};
158 : }
159 :
160 : /// Assign the fd, initialize descriptor state, and register with the reactor.
161 8394 : void init_and_register(int fd) noexcept
162 : {
163 8394 : fd_ = fd;
164 8394 : desc_state_.fd = fd;
165 : {
166 8394 : std::lock_guard lock(desc_state_.mutex);
167 8394 : desc_state_.read_op = nullptr;
168 8394 : desc_state_.write_op = nullptr;
169 8394 : desc_state_.connect_op = nullptr;
170 8394 : }
171 8394 : svc_.scheduler().register_descriptor(fd, &desc_state_);
172 8394 : }
173 :
174 : /** Register an op with the reactor.
175 :
176 : Handles cached edge events and deferred cancellation.
177 : Called on the EAGAIN/EINPROGRESS path when speculative
178 : I/O failed.
179 : */
180 : template<class Op>
181 : void register_op(
182 : Op& op,
183 : reactor_op_base*& desc_slot,
184 : bool& ready_flag,
185 : bool& cancel_flag,
186 : bool is_write_direction = false) noexcept;
187 :
188 : /** Cancel a single pending operation.
189 :
190 : Claims the operation from its descriptor_state slot under
191 : the mutex and posts it to the scheduler as cancelled.
192 : Derived must implement:
193 : op_to_desc_slot(Op&) -> reactor_op_base**
194 : op_to_cancel_flag(Op&) -> bool*
195 : */
196 : template<class Op>
197 : void cancel_single_op(Op& op) noexcept;
198 :
199 : /** Cancel all pending operations.
200 :
201 : Invoked by the derived class's cancel() override.
202 : Derived must implement:
203 : for_each_op(auto fn)
204 : for_each_desc_entry(auto fn)
205 : */
206 : void do_cancel() noexcept;
207 :
208 : /** Close the socket and cancel pending operations.
209 :
210 : Invoked by the derived class's close_socket(). The
211 : derived class may add backend-specific cleanup after
212 : calling this method.
213 : Derived must implement:
214 : for_each_op(auto fn)
215 : for_each_desc_entry(auto fn)
216 : */
217 : void do_close_socket() noexcept;
218 :
219 : /** Release the socket without closing the fd.
220 :
221 : Like do_close_socket() but does not call ::close().
222 : Returns the fd so the caller can take ownership.
223 : */
224 : native_handle_type do_release_socket() noexcept;
225 : };
226 :
227 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
228 : template<class Op>
229 : void
230 8615 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
231 : Op& op,
232 : reactor_op_base*& desc_slot,
233 : bool& ready_flag,
234 : bool& cancel_flag,
235 : bool is_write_direction) noexcept
236 : {
237 8615 : svc_.work_started();
238 :
239 8615 : std::lock_guard lock(desc_state_.mutex);
240 8615 : bool io_done = false;
241 8615 : if (ready_flag)
242 : {
243 185 : ready_flag = false;
244 185 : op.perform_io();
245 185 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
246 185 : if (!io_done)
247 185 : op.errn = 0;
248 : }
249 :
250 8615 : if (cancel_flag)
251 : {
252 MIS 0 : cancel_flag = false;
253 0 : op.cancelled.store(true, std::memory_order_relaxed);
254 : }
255 :
256 HIT 8615 : if (io_done || op.cancelled.load(std::memory_order_acquire))
257 : {
258 MIS 0 : svc_.post(&op);
259 0 : svc_.work_finished();
260 : }
261 : else
262 : {
263 HIT 8615 : desc_slot = &op;
264 :
265 : // Select must rebuild its fd_sets when a write-direction op
266 : // is parked, so select() watches for writability. Compiled
267 : // away to nothing for epoll and kqueue.
268 : if constexpr (requires { Service::needs_write_notification; })
269 : {
270 : if constexpr (Service::needs_write_notification)
271 : {
272 3725 : if (is_write_direction)
273 3532 : svc_.scheduler().notify_reactor();
274 : }
275 : }
276 : }
277 8615 : }
278 :
279 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
280 : template<class Op>
281 : void
282 193 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
283 : Op& op) noexcept
284 : {
285 193 : auto self = this->weak_from_this().lock();
286 193 : if (!self)
287 MIS 0 : return;
288 :
289 HIT 193 : op.request_cancel();
290 :
291 193 : auto* d = static_cast<Derived*>(this);
292 193 : reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
293 :
294 193 : if (desc_op_ptr)
295 : {
296 193 : reactor_op_base* claimed = nullptr;
297 : {
298 193 : std::lock_guard lock(desc_state_.mutex);
299 193 : if (*desc_op_ptr == &op)
300 193 : claimed = std::exchange(*desc_op_ptr, nullptr);
301 : else
302 : {
303 MIS 0 : bool* cflag = d->op_to_cancel_flag(op);
304 0 : if (cflag)
305 0 : *cflag = true;
306 : }
307 HIT 193 : }
308 193 : if (claimed)
309 : {
310 193 : op.impl_ptr = self;
311 193 : svc_.post(&op);
312 193 : svc_.work_finished();
313 : }
314 : }
315 193 : }
316 :
317 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
318 : void
319 189 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
320 : do_cancel() noexcept
321 : {
322 189 : auto self = this->weak_from_this().lock();
323 189 : if (!self)
324 MIS 0 : return;
325 :
326 HIT 189 : auto* d = static_cast<Derived*>(this);
327 :
328 764 : d->for_each_op([](auto& op) { op.request_cancel(); });
329 :
330 : // Claim ops under a single lock acquisition
331 : struct claimed_entry
332 : {
333 : reactor_op_base* op = nullptr;
334 : reactor_op_base* base = nullptr;
335 : };
336 : // Max 3 ops (conn, rd, wr)
337 189 : claimed_entry claimed[3];
338 189 : int count = 0;
339 :
340 : {
341 189 : std::lock_guard lock(desc_state_.mutex);
342 1339 : d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
343 575 : if (desc_slot == &op)
344 : {
345 101 : claimed[count].op = std::exchange(desc_slot, nullptr);
346 101 : claimed[count].base = &op;
347 101 : ++count;
348 : }
349 : });
350 189 : }
351 :
352 290 : for (int i = 0; i < count; ++i)
353 : {
354 101 : claimed[i].base->impl_ptr = self;
355 101 : svc_.post(claimed[i].base);
356 101 : svc_.work_finished();
357 : }
358 189 : }
359 :
360 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
361 : void
362 74798 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
363 : do_close_socket() noexcept
364 : {
365 74798 : auto self = this->weak_from_this().lock();
366 74798 : if (self)
367 : {
368 74798 : auto* d = static_cast<Derived*>(this);
369 :
370 300120 : d->for_each_op([](auto& op) { op.request_cancel(); });
371 :
372 : struct claimed_entry
373 : {
374 : reactor_op_base* base = nullptr;
375 : };
376 74798 : claimed_entry claimed[3];
377 74798 : int count = 0;
378 :
379 : {
380 74798 : std::lock_guard lock(desc_state_.mutex);
381 74798 : d->for_each_desc_entry(
382 450644 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
383 225322 : auto* c = std::exchange(desc_slot, nullptr);
384 225322 : if (c)
385 : {
386 4 : claimed[count].base = c;
387 4 : ++count;
388 : }
389 : });
390 74798 : desc_state_.read_ready = false;
391 74798 : desc_state_.write_ready = false;
392 74798 : desc_state_.read_cancel_pending = false;
393 74798 : desc_state_.write_cancel_pending = false;
394 74798 : desc_state_.connect_cancel_pending = false;
395 :
396 74798 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
397 268 : desc_state_.impl_ref_ = self;
398 74798 : }
399 :
400 74802 : for (int i = 0; i < count; ++i)
401 : {
402 4 : claimed[i].base->impl_ptr = self;
403 4 : svc_.post(claimed[i].base);
404 4 : svc_.work_finished();
405 : }
406 : }
407 :
408 74798 : if (fd_ >= 0)
409 : {
410 16610 : if (desc_state_.registered_events != 0)
411 16610 : svc_.scheduler().deregister_descriptor(fd_);
412 16610 : ::close(fd_);
413 16610 : fd_ = -1;
414 : }
415 :
416 74798 : desc_state_.fd = -1;
417 74798 : desc_state_.registered_events = 0;
418 :
419 74798 : local_endpoint_ = Endpoint{};
420 74798 : }
421 :
422 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
423 : native_handle_type
424 2 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
425 : do_release_socket() noexcept
426 : {
427 : // Cancel pending ops (same as do_close_socket)
428 2 : auto self = this->weak_from_this().lock();
429 2 : if (self)
430 : {
431 2 : auto* d = static_cast<Derived*>(this);
432 :
433 8 : d->for_each_op([](auto& op) { op.request_cancel(); });
434 :
435 : struct claimed_entry
436 : {
437 : reactor_op_base* base = nullptr;
438 : };
439 2 : claimed_entry claimed[3];
440 2 : int count = 0;
441 :
442 : {
443 2 : std::lock_guard lock(desc_state_.mutex);
444 2 : d->for_each_desc_entry(
445 12 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
446 6 : auto* c = std::exchange(desc_slot, nullptr);
447 6 : if (c)
448 : {
449 MIS 0 : claimed[count].base = c;
450 0 : ++count;
451 : }
452 : });
453 HIT 2 : desc_state_.read_ready = false;
454 2 : desc_state_.write_ready = false;
455 2 : desc_state_.read_cancel_pending = false;
456 2 : desc_state_.write_cancel_pending = false;
457 2 : desc_state_.connect_cancel_pending = false;
458 :
459 2 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
460 MIS 0 : desc_state_.impl_ref_ = self;
461 HIT 2 : }
462 :
463 2 : for (int i = 0; i < count; ++i)
464 : {
465 MIS 0 : claimed[i].base->impl_ptr = self;
466 0 : svc_.post(claimed[i].base);
467 0 : svc_.work_finished();
468 : }
469 : }
470 :
471 HIT 2 : native_handle_type released = fd_;
472 :
473 2 : if (fd_ >= 0)
474 : {
475 2 : if (desc_state_.registered_events != 0)
476 2 : svc_.scheduler().deregister_descriptor(fd_);
477 : // Do NOT close -- caller takes ownership
478 2 : fd_ = -1;
479 : }
480 :
481 2 : desc_state_.fd = -1;
482 2 : desc_state_.registered_events = 0;
483 :
484 2 : local_endpoint_ = Endpoint{};
485 :
486 4 : return released;
487 2 : }
488 :
489 : } // namespace boost::corosio::detail
490 :
491 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
|