TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/io/io_object.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 :
19 : #include <atomic>
20 : #include <coroutine>
21 : #include <cstddef>
22 : #include <memory>
23 : #include <optional>
24 : #include <stop_token>
25 : #include <system_error>
26 :
27 : #include <errno.h>
28 :
29 : #include <netinet/in.h>
30 : #include <sys/socket.h>
31 : #include <sys/uio.h>
32 :
33 : namespace boost::corosio::detail {
34 :
35 : /** Base operation for reactor-based backends.
36 :
37 : Holds per-operation state that depends on the concrete backend
38 : socket/acceptor types: coroutine handle, executor, output
39 : pointers, file descriptor, stop_callback, and type-specific
40 : impl pointers.
41 :
42 : Fields shared across all backends (errn, bytes_transferred,
43 : cancelled, impl_ptr, perform_io, complete) live in
44 : reactor_op_base so the scheduler and descriptor_state can
45 : access them without template instantiation.
46 :
47 : @tparam Socket The backend socket impl type (forward-declared).
48 : @tparam Acceptor The backend acceptor impl type (forward-declared).
49 : */
50 : template<class Socket, class Acceptor>
51 : struct reactor_op : reactor_op_base
52 : {
53 : /// Stop-token callback that invokes cancel() on the target op.
54 : struct canceller
55 : {
56 : reactor_op* op;
57 HIT 199 : void operator()() const noexcept
58 : {
59 199 : op->cancel();
60 199 : }
61 : };
62 :
63 : /// Caller's coroutine handle to resume on completion.
64 : std::coroutine_handle<> h;
65 :
66 : /// Scheduler-ready continuation for executor dispatch/post (wraps h).
67 : detail::continuation_op cont_op;
68 :
69 : /// Executor for dispatching the completion.
70 : capy::executor_ref ex;
71 :
72 : /// Output pointer for the error code.
73 : std::error_code* ec_out = nullptr;
74 :
75 : /// Output pointer for bytes transferred.
76 : std::size_t* bytes_out = nullptr;
77 :
78 : /// File descriptor this operation targets.
79 : int fd = -1;
80 :
81 : /// Stop-token callback registration.
82 : std::optional<std::stop_callback<canceller>> stop_cb;
83 :
84 : /// Owning socket impl (for stop_token cancellation).
85 : Socket* socket_impl_ = nullptr;
86 :
87 : /// Owning acceptor impl (for stop_token cancellation).
88 : Acceptor* acceptor_impl_ = nullptr;
89 :
90 75092 : reactor_op() = default;
91 :
92 : /// Reset operation state for reuse.
93 476868 : void reset() noexcept
94 : {
95 476868 : fd = -1;
96 476868 : errn = 0;
97 476868 : bytes_transferred = 0;
98 476868 : cancelled.store(false, std::memory_order_relaxed);
99 476868 : impl_ptr.reset();
100 476868 : socket_impl_ = nullptr;
101 476868 : acceptor_impl_ = nullptr;
102 476868 : }
103 :
104 : /// Return true if this is a read-direction operation.
105 46009 : virtual bool is_read_operation() const noexcept
106 : {
107 46009 : return false;
108 : }
109 :
110 : /// Cancel this operation via the owning impl.
111 : virtual void cancel() noexcept = 0;
112 :
113 : /// Destroy without invoking.
114 MIS 0 : void destroy() override
115 : {
116 0 : stop_cb.reset();
117 0 : reactor_op_base::destroy();
118 0 : }
119 :
120 : /// Arm the stop-token callback for a socket operation.
121 HIT 100612 : void start(std::stop_token const& token, Socket* impl)
122 : {
123 100612 : cancelled.store(false, std::memory_order_release);
124 100612 : stop_cb.reset();
125 100612 : socket_impl_ = impl;
126 100612 : acceptor_impl_ = nullptr;
127 :
128 100612 : if (token.stop_possible())
129 197 : stop_cb.emplace(token, canceller{this});
130 100612 : }
131 :
132 : /// Arm the stop-token callback for an acceptor operation.
133 8230 : void start(std::stop_token const& token, Acceptor* impl)
134 : {
135 8230 : cancelled.store(false, std::memory_order_release);
136 8230 : stop_cb.reset();
137 8230 : socket_impl_ = nullptr;
138 8230 : acceptor_impl_ = impl;
139 :
140 8230 : if (token.stop_possible())
141 9 : stop_cb.emplace(token, canceller{this});
142 8230 : }
143 : };
144 :
145 : /** Shared connect operation.
146 :
147 : Checks SO_ERROR for connect completion status. The operator()()
148 : and cancel() are provided by the concrete backend type.
149 :
150 : @tparam Base The backend's base op type.
151 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
152 : */
153 : template<class Base, class Endpoint = endpoint>
154 : struct reactor_connect_op : Base
155 : {
156 : /// Endpoint to connect to.
157 : Endpoint target_endpoint;
158 :
159 : /// Reset operation state for reuse.
160 8232 : void reset() noexcept
161 : {
162 8232 : Base::reset();
163 8232 : target_endpoint = Endpoint{};
164 8232 : }
165 :
166 8217 : void perform_io() noexcept override
167 : {
168 8217 : int err = 0;
169 8217 : socklen_t len = sizeof(err);
170 8217 : if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
171 MIS 0 : err = errno;
172 HIT 8217 : this->complete(err, 0);
173 8217 : }
174 : };
175 :
/** Scatter-read operation shared by the reactor backends.

    perform_io() issues readv() and retries while the call is
    interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_read_op : Base
{
    /// Capacity of the scatter-gather array.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors handed to readv().
    iovec iovecs[max_buffers];

    /// How many entries of `iovecs` are in use.
    int iovec_count = 0;

    /// Set for zero-length reads (completed immediately).
    bool empty_buffer_read = false;

    /// Read-direction unless this is an empty-buffer read.
    bool is_read_operation() const noexcept override
    {
        return !empty_buffer_read;
    }

    /// Return the operation to its idle state so it can be reused.
    void reset() noexcept
    {
        Base::reset();
        iovec_count       = 0;
        empty_buffer_read = false;
    }

    /** Attempt the read and record the outcome.

        Completes with (0, bytes) on success or (errno, 0) on failure.
    */
    void perform_io() noexcept override
    {
        for (;;)
        {
            ssize_t const got = ::readv(this->fd, iovecs, iovec_count);
            if (got >= 0)
            {
                this->complete(0, static_cast<std::size_t>(got));
                return;
            }
            // An interrupted call is simply retried.
            if (errno != EINTR)
            {
                this->complete(errno, 0);
                return;
            }
        }
    }
};
225 :
/** Gather-write operation shared by the reactor backends.

    The syscall itself is performed by
    WritePolicy::write(fd, iovecs, count), whose ssize_t result is the
    number of bytes written, or negative with errno set.

    @tparam Base        The backend's base op type.
    @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
*/
template<class Base, class WritePolicy>
struct reactor_write_op : Base
{
    /// The policy type that performs the write syscall.
    using write_policy = WritePolicy;

    /// Capacity of the scatter-gather array.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors handed to the policy.
    iovec iovecs[max_buffers];

    /// How many entries of `iovecs` are in use.
    int iovec_count = 0;

    /// Return the operation to its idle state so it can be reused.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /** Attempt the write through the policy and record the outcome.

        Completes with (0, bytes) on success or (errno, 0) on failure.
        No retry is performed here; the policy is called exactly once.
    */
    void perform_io() noexcept override
    {
        ssize_t const written =
            WritePolicy::write(this->fd, iovecs, iovec_count);
        if (written < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(written));
    }
};
264 :
265 : /** Shared accept operation.
266 :
267 : Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
268 : which returns the accepted fd or -1 with errno set.
269 :
270 : @tparam Base The backend's base op type.
271 : @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
272 : */
273 : template<class Base, class AcceptPolicy>
274 : struct reactor_accept_op : Base
275 : {
276 : /// File descriptor of the accepted connection.
277 : int accepted_fd = -1;
278 :
279 : /// Pointer to the peer socket implementation.
280 : io_object::implementation* peer_impl = nullptr;
281 :
282 : /// Output pointer for the accepted implementation.
283 : io_object::implementation** impl_out = nullptr;
284 :
285 : /// Peer address storage filled by accept.
286 : sockaddr_storage peer_storage{};
287 :
288 : /// Peer address length returned by accept.
289 : socklen_t peer_addrlen = 0;
290 :
291 HIT 8230 : void reset() noexcept
292 : {
293 8230 : Base::reset();
294 8230 : accepted_fd = -1;
295 8230 : peer_impl = nullptr;
296 8230 : impl_out = nullptr;
297 8230 : peer_storage = {};
298 8230 : peer_addrlen = 0;
299 8230 : }
300 :
301 8212 : void perform_io() noexcept override
302 : {
303 8212 : int new_fd = AcceptPolicy::do_accept(
304 8212 : this->fd, peer_storage, peer_addrlen);
305 8212 : if (new_fd >= 0)
306 : {
307 8212 : accepted_fd = new_fd;
308 8212 : this->complete(0, 0);
309 : }
310 : else
311 : {
312 MIS 0 : this->complete(errno, 0);
313 : }
314 HIT 8212 : }
315 : };
316 :
/** Connected-mode send operation for datagram sockets.

    perform_io() calls sendmsg() without a destination address
    (msg_name stays null) and retries while the call is interrupted
    (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_op : Base
{
    /// Capacity of the scatter-gather array.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors handed to sendmsg().
    iovec iovecs[max_buffers];

    /// How many entries of `iovecs` are in use.
    int iovec_count = 0;

    /// Message flags supplied by the caller.
    int msg_flags = 0;

    /// Return the operation to its idle state so it can be reused.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        msg_flags   = 0;
    }

    /** Attempt the send and record the outcome.

        MSG_NOSIGNAL is added to the caller's flags where the platform
        defines it. Completes with (0, bytes) on success or (errno, 0)
        on failure.
    */
    void perform_io() noexcept override
    {
        msghdr hdr{};
        hdr.msg_iov    = iovecs;
        hdr.msg_iovlen = static_cast<std::size_t>(iovec_count);

        int flags = msg_flags;
#ifdef MSG_NOSIGNAL
        flags |= MSG_NOSIGNAL;
#endif

        for (;;)
        {
            ssize_t const sent = ::sendmsg(this->fd, &hdr, flags);
            if (sent >= 0)
            {
                this->complete(0, static_cast<std::size_t>(sent));
                return;
            }
            // An interrupted call is simply retried.
            if (errno != EINTR)
            {
                this->complete(errno, 0);
                return;
            }
        }
    }
};
370 :
/** Connected-mode receive operation for datagram sockets.

    perform_io() calls recvmsg() without requesting the source address
    (msg_name stays null) and retries while the call is interrupted
    (EINTR). A result of zero bytes is reported as an ordinary
    success, since zero-length datagrams are valid.

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_recv_op : Base
{
    /// Capacity of the scatter-gather array.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors handed to recvmsg().
    iovec iovecs[max_buffers];

    /// How many entries of `iovecs` are in use.
    int iovec_count = 0;

    /// Message flags supplied by the caller.
    int msg_flags = 0;

    /// Always a read-direction operation.
    bool is_read_operation() const noexcept override
    {
        return true;
    }

    /// Return the operation to its idle state so it can be reused.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        msg_flags   = 0;
    }

    /** Attempt the receive and record the outcome.

        Completes with (0, bytes) on success or (errno, 0) on failure.
    */
    void perform_io() noexcept override
    {
        msghdr hdr{};
        hdr.msg_iov    = iovecs;
        hdr.msg_iovlen = static_cast<std::size_t>(iovec_count);

        for (;;)
        {
            ssize_t const got = ::recvmsg(this->fd, &hdr, msg_flags);
            if (got >= 0)
            {
                this->complete(0, static_cast<std::size_t>(got));
                return;
            }
            // An interrupted call is simply retried.
            if (errno != EINTR)
            {
                this->complete(errno, 0);
                return;
            }
        }
    }
};
426 :
/** send_to operation for datagram sockets.

    perform_io() calls sendmsg() with the destination address placed
    in msg_name and retries while the call is interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_to_op : Base
{
    /// Capacity of the scatter-gather array.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors handed to sendmsg().
    iovec iovecs[max_buffers];

    /// How many entries of `iovecs` are in use.
    int iovec_count = 0;

    /// Storage holding the destination address.
    sockaddr_storage dest_storage{};

    /// Length of the address held in `dest_storage`.
    socklen_t dest_len = 0;

    /// Message flags supplied by the caller.
    int msg_flags = 0;

    /// Return the operation to its idle state so it can be reused.
    void reset() noexcept
    {
        Base::reset();
        iovec_count  = 0;
        dest_storage = {};
        dest_len     = 0;
        msg_flags    = 0;
    }

    /** Attempt the send and record the outcome.

        MSG_NOSIGNAL is added to the caller's flags where the platform
        defines it. Completes with (0, bytes) on success or (errno, 0)
        on failure.
    */
    void perform_io() noexcept override
    {
        msghdr hdr{};
        hdr.msg_name    = &dest_storage;
        hdr.msg_namelen = dest_len;
        hdr.msg_iov     = iovecs;
        hdr.msg_iovlen  = static_cast<std::size_t>(iovec_count);

        int flags = msg_flags;
#ifdef MSG_NOSIGNAL
        flags |= MSG_NOSIGNAL;
#endif

        for (;;)
        {
            ssize_t const sent = ::sendmsg(this->fd, &hdr, flags);
            if (sent >= 0)
            {
                this->complete(0, static_cast<std::size_t>(sent));
                return;
            }
            // An interrupted call is simply retried.
            if (errno != EINTR)
            {
                this->complete(errno, 0);
                return;
            }
        }
    }
};
490 :
491 : /** Shared recv_from operation for datagram sockets.
492 :
493 : Uses recvmsg() with msg_name to capture the source endpoint.
494 :
495 : @tparam Base The backend's base op type.
496 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
497 : */
498 : template<class Base, class Endpoint = endpoint>
499 : struct reactor_recv_from_op : Base
500 : {
501 : /// Maximum scatter-gather buffer count.
502 : static constexpr std::size_t max_buffers = 16;
503 :
504 : /// Scatter-gather I/O vectors.
505 : iovec iovecs[max_buffers];
506 :
507 : /// Number of active I/O vectors.
508 : int iovec_count = 0;
509 :
510 : /// Source address storage filled by recvmsg.
511 : sockaddr_storage source_storage{};
512 :
513 : /// Actual source address length returned by recvmsg.
514 : socklen_t source_addrlen = 0;
515 :
516 : /// Output pointer for the source endpoint (set by do_recv_from).
517 : Endpoint* source_out = nullptr;
518 :
519 : /// User-supplied message flags.
520 : int msg_flags = 0;
521 :
522 : /// Return true (this is a read-direction operation).
523 0 : bool is_read_operation() const noexcept override
524 : {
525 0 : return true;
526 : }
527 :
528 HIT 40 : void reset() noexcept
529 : {
530 40 : Base::reset();
531 40 : iovec_count = 0;
532 40 : source_storage = {};
533 40 : source_addrlen = 0;
534 40 : source_out = nullptr;
535 40 : msg_flags = 0;
536 40 : }
537 :
538 2 : void perform_io() noexcept override
539 : {
540 2 : msghdr msg{};
541 2 : msg.msg_name = &source_storage;
542 2 : msg.msg_namelen = sizeof(source_storage);
543 2 : msg.msg_iov = iovecs;
544 2 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
545 :
546 : ssize_t n;
547 : do
548 : {
549 2 : n = ::recvmsg(this->fd, &msg, msg_flags);
550 : }
551 2 : while (n < 0 && errno == EINTR);
552 :
553 2 : if (n >= 0)
554 : {
555 2 : source_addrlen = msg.msg_namelen;
556 2 : this->complete(0, static_cast<std::size_t>(n));
557 : }
558 : else
559 MIS 0 : this->complete(errno, 0);
560 HIT 2 : }
561 : };
562 :
563 : } // namespace boost::corosio::detail
564 :
565 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
|