LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 68.5 % 162 111 51
Test Date: 2026-04-17 20:21:04 Functions: 64.7 % 136 88 48

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/io/io_object.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/detail/continuation_op.hpp>
      17                 : #include <boost/capy/ex/executor_ref.hpp>
      18                 : 
      19                 : #include <atomic>
      20                 : #include <coroutine>
      21                 : #include <cstddef>
      22                 : #include <memory>
      23                 : #include <optional>
      24                 : #include <stop_token>
      25                 : #include <system_error>
      26                 : 
      27                 : #include <errno.h>
      28                 : 
      29                 : #include <netinet/in.h>
      30                 : #include <sys/socket.h>
      31                 : #include <sys/uio.h>
      32                 : 
      33                 : namespace boost::corosio::detail {
      34                 : 
      35                 : /** Base operation for reactor-based backends.
      36                 : 
      37                 :     Holds per-operation state that depends on the concrete backend
      38                 :     socket/acceptor types: coroutine handle, executor, output
      39                 :     pointers, file descriptor, stop_callback, and type-specific
      40                 :     impl pointers.
      41                 : 
      42                 :     Fields shared across all backends (errn, bytes_transferred,
      43                 :     cancelled, impl_ptr, perform_io, complete) live in
      44                 :     reactor_op_base so the scheduler and descriptor_state can
      45                 :     access them without template instantiation.
      46                 : 
      47                 :     @tparam Socket The backend socket impl type (forward-declared).
      48                 :     @tparam Acceptor The backend acceptor impl type (forward-declared).
      49                 : */
      50                 : template<class Socket, class Acceptor>
      51                 : struct reactor_op : reactor_op_base
      52                 : {
      53                 :     /// Stop-token callback that invokes cancel() on the target op.
      54                 :     struct canceller
      55                 :     {
      56                 :         reactor_op* op;
      57 HIT         199 :         void operator()() const noexcept
      58                 :         {
      59             199 :             op->cancel();
      60             199 :         }
      61                 :     };
      62                 : 
      63                 :     /// Caller's coroutine handle to resume on completion.
      64                 :     std::coroutine_handle<> h;
      65                 : 
      66                 :     /// Scheduler-ready continuation for executor dispatch/post (wraps h).
      67                 :     detail::continuation_op cont_op;
      68                 : 
      69                 :     /// Executor for dispatching the completion.
      70                 :     capy::executor_ref ex;
      71                 : 
      72                 :     /// Output pointer for the error code.
      73                 :     std::error_code* ec_out = nullptr;
      74                 : 
      75                 :     /// Output pointer for bytes transferred.
      76                 :     std::size_t* bytes_out = nullptr;
      77                 : 
      78                 :     /// File descriptor this operation targets.
      79                 :     int fd = -1;
      80                 : 
      81                 :     /// Stop-token callback registration.
      82                 :     std::optional<std::stop_callback<canceller>> stop_cb;
      83                 : 
      84                 :     /// Owning socket impl (for stop_token cancellation).
      85                 :     Socket* socket_impl_ = nullptr;
      86                 : 
      87                 :     /// Owning acceptor impl (for stop_token cancellation).
      88                 :     Acceptor* acceptor_impl_ = nullptr;
      89                 : 
      90           75092 :     reactor_op() = default;
      91                 : 
      92                 :     /// Reset operation state for reuse.
      93          476868 :     void reset() noexcept
      94                 :     {
      95          476868 :         fd                = -1;
      96          476868 :         errn              = 0;
      97          476868 :         bytes_transferred = 0;
      98          476868 :         cancelled.store(false, std::memory_order_relaxed);
      99          476868 :         impl_ptr.reset();
     100          476868 :         socket_impl_   = nullptr;
     101          476868 :         acceptor_impl_ = nullptr;
     102          476868 :     }
     103                 : 
     104                 :     /// Return true if this is a read-direction operation.
     105           46009 :     virtual bool is_read_operation() const noexcept
     106                 :     {
     107           46009 :         return false;
     108                 :     }
     109                 : 
     110                 :     /// Cancel this operation via the owning impl.
     111                 :     virtual void cancel() noexcept = 0;
     112                 : 
     113                 :     /// Destroy without invoking.
     114 MIS           0 :     void destroy() override
     115                 :     {
     116               0 :         stop_cb.reset();
     117               0 :         reactor_op_base::destroy();
     118               0 :     }
     119                 : 
     120                 :     /// Arm the stop-token callback for a socket operation.
     121 HIT      100612 :     void start(std::stop_token const& token, Socket* impl)
     122                 :     {
     123          100612 :         cancelled.store(false, std::memory_order_release);
     124          100612 :         stop_cb.reset();
     125          100612 :         socket_impl_   = impl;
     126          100612 :         acceptor_impl_ = nullptr;
     127                 : 
     128          100612 :         if (token.stop_possible())
     129             197 :             stop_cb.emplace(token, canceller{this});
     130          100612 :     }
     131                 : 
     132                 :     /// Arm the stop-token callback for an acceptor operation.
     133            8230 :     void start(std::stop_token const& token, Acceptor* impl)
     134                 :     {
     135            8230 :         cancelled.store(false, std::memory_order_release);
     136            8230 :         stop_cb.reset();
     137            8230 :         socket_impl_   = nullptr;
     138            8230 :         acceptor_impl_ = impl;
     139                 : 
     140            8230 :         if (token.stop_possible())
     141               9 :             stop_cb.emplace(token, canceller{this});
     142            8230 :     }
     143                 : };
     144                 : 
     145                 : /** Shared connect operation.
     146                 : 
     147                 :     Checks SO_ERROR for connect completion status. The operator()()
     148                 :     and cancel() are provided by the concrete backend type.
     149                 : 
     150                 :     @tparam Base The backend's base op type.
     151                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     152                 : */
     153                 : template<class Base, class Endpoint = endpoint>
     154                 : struct reactor_connect_op : Base
     155                 : {
     156                 :     /// Endpoint to connect to.
     157                 :     Endpoint target_endpoint;
     158                 : 
     159                 :     /// Reset operation state for reuse.
     160            8232 :     void reset() noexcept
     161                 :     {
     162            8232 :         Base::reset();
     163            8232 :         target_endpoint = Endpoint{};
     164            8232 :     }
     165                 : 
     166            8217 :     void perform_io() noexcept override
     167                 :     {
     168            8217 :         int err       = 0;
     169            8217 :         socklen_t len = sizeof(err);
     170            8217 :         if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     171 MIS           0 :             err = errno;
     172 HIT        8217 :         this->complete(err, 0);
     173            8217 :     }
     174                 : };
     175                 : 
     176                 : /** Shared scatter-read operation.
     177                 : 
     178                 :     Uses readv() with an EINTR retry loop.
     179                 : 
     180                 :     @tparam Base The backend's base op type.
     181                 : */
     182                 : template<class Base>
     183                 : struct reactor_read_op : Base
     184                 : {
     185                 :     /// Maximum scatter-gather buffer count.
     186                 :     static constexpr std::size_t max_buffers = 16;
     187                 : 
     188                 :     /// Scatter-gather I/O vectors.
     189                 :     iovec iovecs[max_buffers];
     190                 : 
     191                 :     /// Number of active I/O vectors.
     192                 :     int iovec_count = 0;
     193                 : 
     194                 :     /// True for zero-length reads (completed immediately).
     195                 :     bool empty_buffer_read = false;
     196                 : 
     197                 :     /// Return true unless this is a zero-length read (empty_buffer_read set).
     198           46041 :     bool is_read_operation() const noexcept override
     199                 :     {
     200           46041 :         return !empty_buffer_read;
     201                 :     }
     202                 : 
     203          230302 :     void reset() noexcept
     204                 :     {
     205          230302 :         Base::reset();
     206          230302 :         iovec_count       = 0;
     207          230302 :         empty_buffer_read = false;
     208          230302 :     }
     209                 : 
     210             326 :     void perform_io() noexcept override
     211                 :     {
     212                 :         ssize_t n;
     213                 :         do
     214                 :         {
     215             326 :             n = ::readv(this->fd, iovecs, iovec_count);
     216                 :         }
     217             326 :         while (n < 0 && errno == EINTR);
     218                 : 
     219             326 :         if (n >= 0)
     220              97 :             this->complete(0, static_cast<std::size_t>(n));
     221                 :         else
     222             229 :             this->complete(errno, 0);
     223             326 :     }
     224                 : };
     225                 : 
     226                 : /** Shared gather-write operation.
     227                 : 
     228                 :     Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
     229                 :     which returns ssize_t (bytes written or -1 with errno set).
     230                 : 
     231                 :     @tparam Base The backend's base op type.
     232                 :     @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
     233                 : */
     234                 : template<class Base, class WritePolicy>
     235                 : struct reactor_write_op : Base
     236                 : {
     237                 :     /// The write syscall policy type.
     238                 :     using write_policy = WritePolicy;
     239                 : 
     240                 :     /// Maximum scatter-gather buffer count.
     241                 :     static constexpr std::size_t max_buffers = 16;
     242                 : 
     243                 :     /// Scatter-gather I/O vectors.
     244                 :     iovec iovecs[max_buffers];
     245                 : 
     246                 :     /// Number of active I/O vectors.
     247                 :     int iovec_count = 0;
     248                 : 
     249          230008 :     void reset() noexcept
     250                 :     {
     251          230008 :         Base::reset();
     252          230008 :         iovec_count = 0;
     253          230008 :     }
     254                 : 
     255 MIS           0 :     void perform_io() noexcept override
     256                 :     {
     257               0 :         ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
     258               0 :         if (n >= 0)
     259               0 :             this->complete(0, static_cast<std::size_t>(n));
     260                 :         else
     261               0 :             this->complete(errno, 0);
     262               0 :     }
     263                 : };
     264                 : 
     265                 : /** Shared accept operation.
     266                 : 
     267                 :     Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage,
     268                 :     peer_addrlen), which returns the accepted fd or -1 with errno set.
     269                 : 
     270                 :     @tparam Base The backend's base op type.
     271                 :     @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&, socklen_t&)`.
     272                 : */
     273                 : template<class Base, class AcceptPolicy>
     274                 : struct reactor_accept_op : Base
     275                 : {
     276                 :     /// File descriptor of the accepted connection.
     277                 :     int accepted_fd = -1;
     278                 : 
     279                 :     /// Pointer to the peer socket implementation.
     280                 :     io_object::implementation* peer_impl = nullptr;
     281                 : 
     282                 :     /// Output pointer for the accepted implementation.
     283                 :     io_object::implementation** impl_out = nullptr;
     284                 : 
     285                 :     /// Peer address storage filled by accept.
     286                 :     sockaddr_storage peer_storage{};
     287                 : 
     288                 :     /// Peer address length returned by accept.
     289                 :     socklen_t peer_addrlen = 0;
     290                 : 
     291 HIT        8230 :     void reset() noexcept
     292                 :     {
     293            8230 :         Base::reset();
     294            8230 :         accepted_fd   = -1;
     295            8230 :         peer_impl     = nullptr;
     296            8230 :         impl_out      = nullptr;
     297            8230 :         peer_storage  = {};
     298            8230 :         peer_addrlen  = 0;
     299            8230 :     }
     300                 : 
     301            8212 :     void perform_io() noexcept override
     302                 :     {
     303            8212 :         int new_fd = AcceptPolicy::do_accept(
     304            8212 :             this->fd, peer_storage, peer_addrlen);
     305            8212 :         if (new_fd >= 0)
     306                 :         {
     307            8212 :             accepted_fd = new_fd;
     308            8212 :             this->complete(0, 0);
     309                 :         }
     310                 :         else
     311                 :         {
     312 MIS           0 :             this->complete(errno, 0);
     313                 :         }
     314 HIT        8212 :     }
     315                 : };
     316                 : 
     317                 : /** Shared connected send operation for datagram sockets.
     318                 : 
     319                 :     Uses sendmsg() with msg_name=nullptr (connected mode).
     320                 : 
     321                 :     @tparam Base The backend's base op type.
     322                 : */
     323                 : template<class Base>
     324                 : struct reactor_send_op : Base
     325                 : {
     326                 :     /// Maximum scatter-gather buffer count.
     327                 :     static constexpr std::size_t max_buffers = 16;
     328                 : 
     329                 :     /// Scatter-gather I/O vectors.
     330                 :     iovec iovecs[max_buffers];
     331                 : 
     332                 :     /// Number of active I/O vectors.
     333                 :     int iovec_count = 0;
     334                 : 
     335                 :     /// User-supplied message flags.
     336                 :     int msg_flags = 0;
     337                 : 
     338              14 :     void reset() noexcept
     339                 :     {
     340              14 :         Base::reset();
     341              14 :         iovec_count = 0;
     342              14 :         msg_flags   = 0;
     343              14 :     }
     344                 : 
     345 MIS           0 :     void perform_io() noexcept override
     346                 :     {
     347               0 :         msghdr msg{};
     348               0 :         msg.msg_iov    = iovecs;
     349               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     350                 : 
     351                 : #ifdef MSG_NOSIGNAL
     352               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     353                 : #else
     354                 :         int send_flags = msg_flags;
     355                 : #endif
     356                 : 
     357                 :         ssize_t n;
     358                 :         do
     359                 :         {
     360               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     361                 :         }
     362               0 :         while (n < 0 && errno == EINTR);
     363                 : 
     364               0 :         if (n >= 0)
     365               0 :             this->complete(0, static_cast<std::size_t>(n));
     366                 :         else
     367               0 :             this->complete(errno, 0);
     368               0 :     }
     369                 : };
     370                 : 
     371                 : /** Shared connected recv operation for datagram sockets.
     372                 : 
     373                 :     Uses recvmsg() with msg_name=nullptr (connected mode).
     374                 :     Unlike reactor_read_op, does not map n==0 to EOF
     375                 :     (zero-length datagrams are valid).
     376                 : 
     377                 :     @tparam Base The backend's base op type.
     378                 : */
     379                 : template<class Base>
     380                 : struct reactor_recv_op : Base
     381                 : {
     382                 :     /// Maximum scatter-gather buffer count.
     383                 :     static constexpr std::size_t max_buffers = 16;
     384                 : 
     385                 :     /// Scatter-gather I/O vectors.
     386                 :     iovec iovecs[max_buffers];
     387                 : 
     388                 :     /// Number of active I/O vectors.
     389                 :     int iovec_count = 0;
     390                 : 
     391                 :     /// User-supplied message flags.
     392                 :     int msg_flags = 0;
     393                 : 
     394                 :     /// Return true (this is a read-direction operation).
     395               0 :     bool is_read_operation() const noexcept override
     396                 :     {
     397               0 :         return true;
     398                 :     }
     399                 : 
     400 HIT          14 :     void reset() noexcept
     401                 :     {
     402              14 :         Base::reset();
     403              14 :         iovec_count = 0;
     404              14 :         msg_flags   = 0;
     405              14 :     }
     406                 : 
     407 MIS           0 :     void perform_io() noexcept override
     408                 :     {
     409               0 :         msghdr msg{};
     410               0 :         msg.msg_iov    = iovecs;
     411               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     412                 : 
     413                 :         ssize_t n;
     414                 :         do
     415                 :         {
     416               0 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     417                 :         }
     418               0 :         while (n < 0 && errno == EINTR);
     419                 : 
     420               0 :         if (n >= 0)
     421               0 :             this->complete(0, static_cast<std::size_t>(n));
     422                 :         else
     423               0 :             this->complete(errno, 0);
     424               0 :     }
     425                 : };
     426                 : 
     427                 : /** Shared send_to operation for datagram sockets.
     428                 : 
     429                 :     Uses sendmsg() with the destination endpoint in msg_name.
     430                 : 
     431                 :     @tparam Base The backend's base op type.
     432                 : */
     433                 : template<class Base>
     434                 : struct reactor_send_to_op : Base
     435                 : {
     436                 :     /// Maximum scatter-gather buffer count.
     437                 :     static constexpr std::size_t max_buffers = 16;
     438                 : 
     439                 :     /// Scatter-gather I/O vectors.
     440                 :     iovec iovecs[max_buffers];
     441                 : 
     442                 :     /// Number of active I/O vectors.
     443                 :     int iovec_count = 0;
     444                 : 
     445                 :     /// Destination address storage.
     446                 :     sockaddr_storage dest_storage{};
     447                 : 
     448                 :     /// Destination address length.
     449                 :     socklen_t dest_len = 0;
     450                 : 
     451                 :     /// User-supplied message flags.
     452                 :     int msg_flags = 0;
     453                 : 
     454 HIT          28 :     void reset() noexcept
     455                 :     {
     456              28 :         Base::reset();
     457              28 :         iovec_count  = 0;
     458              28 :         dest_storage = {};
     459              28 :         dest_len     = 0;
     460              28 :         msg_flags    = 0;
     461              28 :     }
     462                 : 
     463 MIS           0 :     void perform_io() noexcept override
     464                 :     {
     465               0 :         msghdr msg{};
     466               0 :         msg.msg_name    = &dest_storage;
     467               0 :         msg.msg_namelen = dest_len;
     468               0 :         msg.msg_iov     = iovecs;
     469               0 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     470                 : 
     471                 : #ifdef MSG_NOSIGNAL
     472               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     473                 : #else
     474                 :         int send_flags = msg_flags;
     475                 : #endif
     476                 : 
     477                 :         ssize_t n;
     478                 :         do
     479                 :         {
     480               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     481                 :         }
     482               0 :         while (n < 0 && errno == EINTR);
     483                 : 
     484               0 :         if (n >= 0)
     485               0 :             this->complete(0, static_cast<std::size_t>(n));
     486                 :         else
     487               0 :             this->complete(errno, 0);
     488               0 :     }
     489                 : };
     490                 : 
     491                 : /** Shared recv_from operation for datagram sockets.
     492                 : 
     493                 :     Uses recvmsg() with msg_name to capture the source endpoint.
     494                 : 
     495                 :     @tparam Base The backend's base op type.
     496                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     497                 : */
     498                 : template<class Base, class Endpoint = endpoint>
     499                 : struct reactor_recv_from_op : Base
     500                 : {
     501                 :     /// Maximum scatter-gather buffer count.
     502                 :     static constexpr std::size_t max_buffers = 16;
     503                 : 
     504                 :     /// Scatter-gather I/O vectors.
     505                 :     iovec iovecs[max_buffers];
     506                 : 
     507                 :     /// Number of active I/O vectors.
     508                 :     int iovec_count = 0;
     509                 : 
     510                 :     /// Source address storage filled by recvmsg.
     511                 :     sockaddr_storage source_storage{};
     512                 : 
     513                 :     /// Actual source address length returned by recvmsg.
     514                 :     socklen_t source_addrlen = 0;
     515                 : 
     516                 :     /// Output pointer for the source endpoint (set by do_recv_from).
     517                 :     Endpoint* source_out = nullptr;
     518                 : 
     519                 :     /// User-supplied message flags.
     520                 :     int msg_flags = 0;
     521                 : 
     522                 :     /// Return true (this is a read-direction operation).
     523               0 :     bool is_read_operation() const noexcept override
     524                 :     {
     525               0 :         return true;
     526                 :     }
     527                 : 
     528 HIT          40 :     void reset() noexcept
     529                 :     {
     530              40 :         Base::reset();
     531              40 :         iovec_count    = 0;
     532              40 :         source_storage = {};
     533              40 :         source_addrlen = 0;
     534              40 :         source_out     = nullptr;
     535              40 :         msg_flags      = 0;
     536              40 :     }
     537                 : 
     538               2 :     void perform_io() noexcept override
     539                 :     {
     540               2 :         msghdr msg{};
     541               2 :         msg.msg_name    = &source_storage;
     542               2 :         msg.msg_namelen = sizeof(source_storage);
     543               2 :         msg.msg_iov     = iovecs;
     544               2 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     545                 : 
     546                 :         ssize_t n;
     547                 :         do
     548                 :         {
     549               2 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     550                 :         }
     551               2 :         while (n < 0 && errno == EINTR);
     552                 : 
     553               2 :         if (n >= 0)
     554                 :         {
     555               2 :             source_addrlen = msg.msg_namelen;
     556               2 :             this->complete(0, static_cast<std::size_t>(n));
     557                 :         }
     558                 :         else
     559 MIS           0 :             this->complete(errno, 0);
     560 HIT           2 :     }
     561                 : };
     562                 : 
     563                 : } // namespace boost::corosio::detail
     564                 : 
     565                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
        

Generated by: LCOV version 2.3