LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 67.3 % 171 115 56
Test Date: 2026-05-19 19:42:26 Functions: 58.8 % 160 94 66

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/io/io_object.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/detail/continuation_op.hpp>
      17                 : #include <boost/capy/ex/executor_ref.hpp>
      18                 : 
      19                 : #include <atomic>
      20                 : #include <coroutine>
      21                 : #include <cstddef>
      22                 : #include <memory>
      23                 : #include <optional>
      24                 : #include <stop_token>
      25                 : #include <system_error>
      26                 : 
      27                 : #include <errno.h>
      28                 : 
      29                 : #include <netinet/in.h>
      30                 : #include <sys/socket.h>
      31                 : #include <sys/uio.h>
      32                 : 
      33                 : namespace boost::corosio::detail {
      34                 : 
      35                 : /** Base operation for reactor-based backends.
      36                 : 
      37                 :     Holds per-operation state that depends on the concrete backend
      38                 :     socket/acceptor types: coroutine handle, executor, output
      39                 :     pointers, file descriptor, stop_callback, and type-specific
      40                 :     impl pointers.
      41                 : 
      42                 :     Fields shared across all backends (errn, bytes_transferred,
      43                 :     cancelled, impl_ptr, perform_io, complete) live in
      44                 :     reactor_op_base so the scheduler and descriptor_state can
      45                 :     access them without template instantiation.
      46                 : 
      47                 :     @tparam Socket The backend socket impl type (forward-declared).
      48                 :     @tparam Acceptor The backend acceptor impl type (forward-declared).
      49                 : */
      50                 : template<class Socket, class Acceptor>
      51                 : struct reactor_op : reactor_op_base
      52                 : {
      53                 :     /// Stop-token callback that invokes cancel() on the target op.
      54                 :     struct canceller
      55                 :     {
      56                 :         reactor_op* op;
      57 HIT         199 :         void operator()() const noexcept
      58                 :         {
      59             199 :             op->cancel();
      60             199 :         }
      61                 :     };
      62                 : 
      63                 :     /// Caller's coroutine handle to resume on completion.
      64                 :     std::coroutine_handle<> h;
      65                 : 
      66                 :     /// Scheduler-ready continuation for executor dispatch/post (wraps h).
      67                 :     detail::continuation_op cont_op;
      68                 : 
      69                 :     /// Executor for dispatching the completion.
      70                 :     capy::executor_ref ex;
      71                 : 
      72                 :     /// Output pointer for the error code.
      73                 :     std::error_code* ec_out = nullptr;
      74                 : 
      75                 :     /// Output pointer for bytes transferred.
      76                 :     std::size_t* bytes_out = nullptr;
      77                 : 
      78                 :     /// File descriptor this operation targets.
      79                 :     int fd = -1;
      80                 : 
      81                 :     /// Stop-token callback registration.
      82                 :     std::optional<std::stop_callback<canceller>> stop_cb;
      83                 : 
      84                 :     /// Owning socket impl (for stop_token cancellation).
      85                 :     Socket* socket_impl_ = nullptr;
      86                 : 
      87                 :     /// Owning acceptor impl (for stop_token cancellation).
      88                 :     Acceptor* acceptor_impl_ = nullptr;
      89                 : 
      90          154020 :     reactor_op() = default;
      91                 : 
      92                 :     /// Reset operation state for reuse.
      93          410145 :     void reset() noexcept
      94                 :     {
      95          410145 :         fd                = -1;
      96          410145 :         errn              = 0;
      97          410145 :         bytes_transferred = 0;
      98          410145 :         cancelled.store(false, std::memory_order_relaxed);
      99          410145 :         impl_ptr.reset();
     100          410145 :         socket_impl_   = nullptr;
     101          410145 :         acceptor_impl_ = nullptr;
     102          410145 :     }
     103                 : 
     104                 :     /// Return true if this is a read-direction operation.
     105           39300 :     virtual bool is_read_operation() const noexcept
     106                 :     {
     107           39300 :         return false;
     108                 :     }
     109                 : 
     110                 :     /// Cancel this operation via the owning impl.
     111                 :     virtual void cancel() noexcept = 0;
     112                 : 
     113                 :     /// Destroy without invoking.
     114 MIS           0 :     void destroy() override
     115                 :     {
     116               0 :         stop_cb.reset();
     117               0 :         reactor_op_base::destroy();
     118               0 :     }
     119                 : 
     120                 :     /// Arm the stop-token callback for a socket operation.
     121 HIT       87408 :     void start(std::stop_token const& token, Socket* impl)
     122                 :     {
     123           87408 :         cancelled.store(false, std::memory_order_release);
     124           87408 :         stop_cb.reset();
     125           87408 :         socket_impl_   = impl;
     126           87408 :         acceptor_impl_ = nullptr;
     127                 : 
     128           87408 :         if (token.stop_possible())
     129             197 :             stop_cb.emplace(token, canceller{this});
     130           87408 :     }
     131                 : 
     132                 :     /// Arm the stop-token callback for an acceptor operation.
     133            8427 :     void start(std::stop_token const& token, Acceptor* impl)
     134                 :     {
     135            8427 :         cancelled.store(false, std::memory_order_release);
     136            8427 :         stop_cb.reset();
     137            8427 :         socket_impl_   = nullptr;
     138            8427 :         acceptor_impl_ = impl;
     139                 : 
     140            8427 :         if (token.stop_possible())
     141               9 :             stop_cb.emplace(token, canceller{this});
     142            8427 :     }
     143                 : };
     144                 : 
     145                 : /** Shared connect operation.
     146                 : 
     147                 :     Checks SO_ERROR for connect completion status. The operator()()
     148                 :     and cancel() are provided by the concrete backend type.
     149                 : 
     150                 :     @tparam Base The backend's base op type.
     151                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     152                 : */
     153                 : template<class Base, class Endpoint = endpoint>
     154                 : struct reactor_connect_op : Base
     155                 : {
     156                 :     /// Endpoint to connect to.
     157                 :     Endpoint target_endpoint;
     158                 : 
     159                 :     /// Reset operation state for reuse.
     160            8439 :     void reset() noexcept
     161                 :     {
     162            8439 :         Base::reset();
     163            8439 :         target_endpoint = Endpoint{};
     164            8439 :     }
     165                 : 
     166            8415 :     void perform_io() noexcept override
     167                 :     {
     168            8415 :         int err       = 0;
     169            8415 :         socklen_t len = sizeof(err);
     170            8415 :         if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     171 MIS           0 :             err = errno;
     172 HIT        8415 :         this->complete(err, 0);
     173            8415 :     }
     174                 : };
     175                 : 
     176                 : /** Readiness-only wait operation.
     177                 : 
     178                 :     Does not perform any I/O syscall. Completion is signalled by
     179                 :     the reactor delivering the requested edge event; reactor_descriptor_state
     180                 :     calls complete() directly and never invokes perform_io().
     181                 : 
     182                 :     @tparam Base The backend's base op type.
     183                 : */
     184                 : template<class Base>
     185                 : struct reactor_wait_op : Base
     186                 : {
     187                 :     /* Mirror of reactor_event_read from reactor_descriptor_state.hpp.
     188                 :        Including that header from here would create an include cycle
     189                 :        (descriptor_state -> reactor_op_base; reactor_op -> reactor_op_base),
     190                 :        so we carry the value locally. Both must stay in sync. */
     191                 :     static constexpr std::uint32_t read_event = 0x001;
     192                 : 
     193                 :     /// Which event bit this wait targets (reactor_event_read/write/error).
     194                 :     std::uint32_t wait_event = 0;
     195                 : 
     196              14 :     void reset() noexcept
     197                 :     {
     198              14 :         Base::reset();
     199              14 :         wait_event = 0;
     200              14 :     }
     201                 : 
     202 MIS           0 :     bool is_read_operation() const noexcept override
     203                 :     {
     204               0 :         return wait_event == read_event;
     205                 :     }
     206                 : 
     207                 :     /* perform_io() should never be called for a wait op — readiness
     208                 :        IS the completion. Overridden here to satisfy the virtual and
     209                 :        produce a safe result if called defensively. */
     210               0 :     void perform_io() noexcept override
     211                 :     {
     212               0 :         this->complete(0, 0);
     213               0 :     }
     214                 : };
     215                 : 
     216                 : /** Shared scatter-read operation.
     217                 : 
     218                 :     Uses readv() with an EINTR retry loop.
     219                 : 
     220                 :     @tparam Base The backend's base op type.
     221                 : */
     222                 : template<class Base>
     223                 : struct reactor_read_op : Base
     224                 : {
     225                 :     /// Maximum scatter-gather buffer count.
     226                 :     static constexpr std::size_t max_buffers = 16;
     227                 : 
     228                 :     /// Scatter-gather I/O vectors.
     229                 :     iovec iovecs[max_buffers];
     230                 : 
     231                 :     /// Number of active I/O vectors.
     232                 :     int iovec_count = 0;
     233                 : 
     234                 :     /// True for zero-length reads (completed immediately).
     235                 :     bool empty_buffer_read = false;
     236                 : 
     237                 :     /// Return true (this is a read-direction operation).
     238 HIT       39327 :     bool is_read_operation() const noexcept override
     239                 :     {
     240           39327 :         return !empty_buffer_read;
     241                 :     }
     242                 : 
     243          196731 :     void reset() noexcept
     244                 :     {
     245          196731 :         Base::reset();
     246          196731 :         iovec_count       = 0;
     247          196731 :         empty_buffer_read = false;
     248          196731 :     }
     249                 : 
     250             327 :     void perform_io() noexcept override
     251                 :     {
     252                 :         ssize_t n;
     253                 :         do
     254                 :         {
     255             327 :             n = ::readv(this->fd, iovecs, iovec_count);
     256                 :         }
     257             327 :         while (n < 0 && errno == EINTR);
     258                 : 
     259             327 :         if (n >= 0)
     260              98 :             this->complete(0, static_cast<std::size_t>(n));
     261                 :         else
     262             229 :             this->complete(errno, 0);
     263             327 :     }
     264                 : };
     265                 : 
     266                 : /** Shared gather-write operation.
     267                 : 
     268                 :     Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
     269                 :     which returns ssize_t (bytes written or -1 with errno set).
     270                 : 
     271                 :     @tparam Base The backend's base op type.
     272                 :     @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
     273                 : */
     274                 : template<class Base, class WritePolicy>
     275                 : struct reactor_write_op : Base
     276                 : {
     277                 :     /// The write syscall policy type.
     278                 :     using write_policy = WritePolicy;
     279                 : 
     280                 :     /// Maximum scatter-gather buffer count.
     281                 :     static constexpr std::size_t max_buffers = 16;
     282                 : 
     283                 :     /// Scatter-gather I/O vectors.
     284                 :     iovec iovecs[max_buffers];
     285                 : 
     286                 :     /// Number of active I/O vectors.
     287                 :     int iovec_count = 0;
     288                 : 
     289          196438 :     void reset() noexcept
     290                 :     {
     291          196438 :         Base::reset();
     292          196438 :         iovec_count = 0;
     293          196438 :     }
     294                 : 
     295 MIS           0 :     void perform_io() noexcept override
     296                 :     {
     297               0 :         ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
     298               0 :         if (n >= 0)
     299               0 :             this->complete(0, static_cast<std::size_t>(n));
     300                 :         else
     301               0 :             this->complete(errno, 0);
     302               0 :     }
     303                 : };
     304                 : 
     305                 : /** Shared accept operation.
     306                 : 
     307                 :     Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
     308                 :     which returns the accepted fd or -1 with errno set.
     309                 : 
     310                 :     @tparam Base The backend's base op type.
     311                 :     @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
     312                 : */
     313                 : template<class Base, class AcceptPolicy>
     314                 : struct reactor_accept_op : Base
     315                 : {
     316                 :     /// File descriptor of the accepted connection.
     317                 :     int accepted_fd = -1;
     318                 : 
     319                 :     /// Pointer to the peer socket implementation.
     320                 :     io_object::implementation* peer_impl = nullptr;
     321                 : 
     322                 :     /// Output pointer for the accepted implementation.
     323                 :     io_object::implementation** impl_out = nullptr;
     324                 : 
     325                 :     /// Peer address storage filled by accept.
     326                 :     sockaddr_storage peer_storage{};
     327                 : 
     328                 :     /// Peer address length returned by accept.
     329                 :     socklen_t peer_addrlen = 0;
     330                 : 
     331 HIT        8425 :     void reset() noexcept
     332                 :     {
     333            8425 :         Base::reset();
     334            8425 :         accepted_fd   = -1;
     335            8425 :         peer_impl     = nullptr;
     336            8425 :         impl_out      = nullptr;
     337            8425 :         peer_storage  = {};
     338            8425 :         peer_addrlen  = 0;
     339            8425 :     }
     340                 : 
     341            8405 :     void perform_io() noexcept override
     342                 :     {
     343            8405 :         int new_fd = AcceptPolicy::do_accept(
     344            8405 :             this->fd, peer_storage, peer_addrlen);
     345            8405 :         if (new_fd >= 0)
     346                 :         {
     347            8405 :             accepted_fd = new_fd;
     348            8405 :             this->complete(0, 0);
     349                 :         }
     350                 :         else
     351                 :         {
     352 MIS           0 :             this->complete(errno, 0);
     353                 :         }
     354 HIT        8405 :     }
     355                 : };
     356                 : 
     357                 : /** Shared connected send operation for datagram sockets.
     358                 : 
     359                 :     Uses sendmsg() with msg_name=nullptr (connected mode).
     360                 : 
     361                 :     @tparam Base The backend's base op type.
     362                 : */
     363                 : template<class Base>
     364                 : struct reactor_send_op : Base
     365                 : {
     366                 :     /// Maximum scatter-gather buffer count.
     367                 :     static constexpr std::size_t max_buffers = 16;
     368                 : 
     369                 :     /// Scatter-gather I/O vectors.
     370                 :     iovec iovecs[max_buffers];
     371                 : 
     372                 :     /// Number of active I/O vectors.
     373                 :     int iovec_count = 0;
     374                 : 
     375                 :     /// User-supplied message flags.
     376                 :     int msg_flags = 0;
     377                 : 
     378              14 :     void reset() noexcept
     379                 :     {
     380              14 :         Base::reset();
     381              14 :         iovec_count = 0;
     382              14 :         msg_flags   = 0;
     383              14 :     }
     384                 : 
     385 MIS           0 :     void perform_io() noexcept override
     386                 :     {
     387               0 :         msghdr msg{};
     388               0 :         msg.msg_iov    = iovecs;
     389               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     390                 : 
     391                 : #ifdef MSG_NOSIGNAL
     392               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     393                 : #else
     394                 :         int send_flags = msg_flags;
     395                 : #endif
     396                 : 
     397                 :         ssize_t n;
     398                 :         do
     399                 :         {
     400               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     401                 :         }
     402               0 :         while (n < 0 && errno == EINTR);
     403                 : 
     404               0 :         if (n >= 0)
     405               0 :             this->complete(0, static_cast<std::size_t>(n));
     406                 :         else
     407               0 :             this->complete(errno, 0);
     408               0 :     }
     409                 : };
     410                 : 
     411                 : /** Shared connected recv operation for datagram sockets.
     412                 : 
     413                 :     Uses recvmsg() with msg_name=nullptr (connected mode).
     414                 :     Unlike reactor_read_op, does not map n==0 to EOF
     415                 :     (zero-length datagrams are valid).
     416                 : 
     417                 :     @tparam Base The backend's base op type.
     418                 : */
     419                 : template<class Base>
     420                 : struct reactor_recv_op : Base
     421                 : {
     422                 :     /// Maximum scatter-gather buffer count.
     423                 :     static constexpr std::size_t max_buffers = 16;
     424                 : 
     425                 :     /// Scatter-gather I/O vectors.
     426                 :     iovec iovecs[max_buffers];
     427                 : 
     428                 :     /// Number of active I/O vectors.
     429                 :     int iovec_count = 0;
     430                 : 
     431                 :     /// User-supplied message flags.
     432                 :     int msg_flags = 0;
     433                 : 
     434                 :     /// Return true (this is a read-direction operation).
     435               0 :     bool is_read_operation() const noexcept override
     436                 :     {
     437               0 :         return true;
     438                 :     }
     439                 : 
     440 HIT          14 :     void reset() noexcept
     441                 :     {
     442              14 :         Base::reset();
     443              14 :         iovec_count = 0;
     444              14 :         msg_flags   = 0;
     445              14 :     }
     446                 : 
     447 MIS           0 :     void perform_io() noexcept override
     448                 :     {
     449               0 :         msghdr msg{};
     450               0 :         msg.msg_iov    = iovecs;
     451               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     452                 : 
     453                 :         ssize_t n;
     454                 :         do
     455                 :         {
     456               0 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     457                 :         }
     458               0 :         while (n < 0 && errno == EINTR);
     459                 : 
     460               0 :         if (n >= 0)
     461               0 :             this->complete(0, static_cast<std::size_t>(n));
     462                 :         else
     463               0 :             this->complete(errno, 0);
     464               0 :     }
     465                 : };
     466                 : 
     467                 : /** Shared send_to operation for datagram sockets.
     468                 : 
     469                 :     Uses sendmsg() with the destination endpoint in msg_name.
     470                 : 
     471                 :     @tparam Base The backend's base op type.
     472                 : */
     473                 : template<class Base>
     474                 : struct reactor_send_to_op : Base
     475                 : {
     476                 :     /// Maximum scatter-gather buffer count.
     477                 :     static constexpr std::size_t max_buffers = 16;
     478                 : 
     479                 :     /// Scatter-gather I/O vectors.
     480                 :     iovec iovecs[max_buffers];
     481                 : 
     482                 :     /// Number of active I/O vectors.
     483                 :     int iovec_count = 0;
     484                 : 
     485                 :     /// Destination address storage.
     486                 :     sockaddr_storage dest_storage{};
     487                 : 
     488                 :     /// Destination address length.
     489                 :     socklen_t dest_len = 0;
     490                 : 
     491                 :     /// User-supplied message flags.
     492                 :     int msg_flags = 0;
     493                 : 
     494 HIT          30 :     void reset() noexcept
     495                 :     {
     496              30 :         Base::reset();
     497              30 :         iovec_count  = 0;
     498              30 :         dest_storage = {};
     499              30 :         dest_len     = 0;
     500              30 :         msg_flags    = 0;
     501              30 :     }
     502                 : 
     503 MIS           0 :     void perform_io() noexcept override
     504                 :     {
     505               0 :         msghdr msg{};
     506               0 :         msg.msg_name    = &dest_storage;
     507               0 :         msg.msg_namelen = dest_len;
     508               0 :         msg.msg_iov     = iovecs;
     509               0 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     510                 : 
     511                 : #ifdef MSG_NOSIGNAL
     512               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     513                 : #else
     514                 :         int send_flags = msg_flags;
     515                 : #endif
     516                 : 
     517                 :         ssize_t n;
     518                 :         do
     519                 :         {
     520               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     521                 :         }
     522               0 :         while (n < 0 && errno == EINTR);
     523                 : 
     524               0 :         if (n >= 0)
     525               0 :             this->complete(0, static_cast<std::size_t>(n));
     526                 :         else
     527               0 :             this->complete(errno, 0);
     528               0 :     }
     529                 : };
     530                 : 
     531                 : /** Shared recv_from operation for datagram sockets.
     532                 : 
     533                 :     Uses recvmsg() with msg_name to capture the source endpoint.
     534                 : 
     535                 :     @tparam Base The backend's base op type.
     536                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     537                 : */
     538                 : template<class Base, class Endpoint = endpoint>
     539                 : struct reactor_recv_from_op : Base
     540                 : {
     541                 :     /// Maximum scatter-gather buffer count.
     542                 :     static constexpr std::size_t max_buffers = 16;
     543                 : 
     544                 :     /// Scatter-gather I/O vectors.
     545                 :     iovec iovecs[max_buffers];
     546                 : 
     547                 :     /// Number of active I/O vectors.
     548                 :     int iovec_count = 0;
     549                 : 
     550                 :     /// Source address storage filled by recvmsg.
     551                 :     sockaddr_storage source_storage{};
     552                 : 
     553                 :     /// Actual source address length returned by recvmsg.
     554                 :     socklen_t source_addrlen = 0;
     555                 : 
     556                 :     /// Output pointer for the source endpoint (set by do_recv_from).
     557                 :     Endpoint* source_out = nullptr;
     558                 : 
     559                 :     /// User-supplied message flags.
     560                 :     int msg_flags = 0;
     561                 : 
     562                 :     /// Return true (this is a read-direction operation).
     563               0 :     bool is_read_operation() const noexcept override
     564                 :     {
     565               0 :         return true;
     566                 :     }
     567                 : 
     568 HIT          40 :     void reset() noexcept
     569                 :     {
     570              40 :         Base::reset();
     571              40 :         iovec_count    = 0;
     572              40 :         source_storage = {};
     573              40 :         source_addrlen = 0;
     574              40 :         source_out     = nullptr;
     575              40 :         msg_flags      = 0;
     576              40 :     }
     577                 : 
     578               2 :     void perform_io() noexcept override
     579                 :     {
     580               2 :         msghdr msg{};
     581               2 :         msg.msg_name    = &source_storage;
     582               2 :         msg.msg_namelen = sizeof(source_storage);
     583               2 :         msg.msg_iov     = iovecs;
     584               2 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     585                 : 
     586                 :         ssize_t n;
     587                 :         do
     588                 :         {
     589               2 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     590                 :         }
     591               2 :         while (n < 0 && errno == EINTR);
     592                 : 
     593               2 :         if (n >= 0)
     594                 :         {
     595               2 :             source_addrlen = msg.msg_namelen;
     596               2 :             this->complete(0, static_cast<std::size_t>(n));
     597                 :         }
     598                 :         else
     599 MIS           0 :             this->complete(errno, 0);
     600 HIT           2 :     }
     601                 : };
     602                 : 
     603                 : } // namespace boost::corosio::detail
     604                 : 
     605                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
        

Generated by: LCOV version 2.3