LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_acceptor.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 65.5 % 194 127 67
Test Date: 2026-05-19 19:42:26 Functions: 70.5 % 88 62 26

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
      12                 : 
      13                 : #include <boost/corosio/tcp_acceptor.hpp>
      14                 : #include <boost/corosio/wait_type.hpp>
      15                 : #include <boost/corosio/detail/intrusive.hpp>
      16                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      17                 : #include <boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp>
      18                 : #include <boost/corosio/native/detail/make_err.hpp>
      19                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      20                 : 
      21                 : #include <memory>
      22                 : #include <mutex>
      23                 : #include <utility>
      24                 : 
      25                 : #include <errno.h>
      26                 : #include <netinet/in.h>
      27                 : #include <sys/socket.h>
      28                 : #include <unistd.h>
      29                 : 
      30                 : namespace boost::corosio::detail {
      31                 : 
      32                 : /** CRTP base for reactor-backed acceptor implementations.
      33                 : 
      34                 :     Provides shared data members, trivial virtual overrides, and
      35                 :     non-virtual helper methods for cancellation and close. Concrete
      36                 :     backends inherit and add `cancel()`, `close_socket()`, and
      37                 :     `accept()` overrides that delegate to the `do_*` helpers.
      38                 : 
      39                 :     @tparam Derived   The concrete acceptor type (CRTP).
      40                 :     @tparam Service   The backend's acceptor service type.
      41                 :     @tparam Op        The backend's base op type.
      42                 :     @tparam AcceptOp  The backend's accept op type.
      43                 :     @tparam WaitOp    The backend's wait op type.
      44                 :     @tparam DescState The backend's descriptor_state type.
      45                 :     @tparam ImplBase  The public vtable base
      46                 :                       (tcp_acceptor::implementation or
      47                 :                        local_stream_acceptor::implementation).
      48                 :     @tparam Endpoint  The endpoint type (endpoint or local_endpoint).
      49                 : */
      50                 : template<
      51                 :     class Derived,
      52                 :     class Service,
      53                 :     class Op,
      54                 :     class AcceptOp,
      55                 :     class WaitOp,
      56                 :     class DescState,
      57                 :     class ImplBase = tcp_acceptor::implementation,
      58                 :     class Endpoint = endpoint>
      59                 : class reactor_acceptor
      60                 :     : public ImplBase
      61                 :     , public std::enable_shared_from_this<Derived>
      62                 :     , public intrusive_list<Derived>::node
      63                 : {
      64                 :     friend Derived;
      65                 : 
      66                 : protected:
      67                 :     // NOLINTNEXTLINE(bugprone-crtp-constructor-accessibility)
      68 HIT         195 :     explicit reactor_acceptor(Service& svc) noexcept : svc_(svc) {}
      69                 : 
      70                 : protected:
      71                 :     Service& svc_;
      72                 :     int fd_ = -1;
      73                 :     Endpoint local_endpoint_;
      74                 : 
      75                 : public:
      76                 :     /// Pending accept operation slot.
      77                 :     AcceptOp acc_;
      78                 : 
      79                 :     /// Pending wait-for-read operation slot.
      80                 :     WaitOp wait_rd_;
      81                 : 
      82                 :     /// Pending wait-for-write operation slot.
      83                 :     WaitOp wait_wr_;
      84                 : 
      85                 :     /// Pending wait-for-error operation slot.
      86                 :     WaitOp wait_er_;
      87                 : 
      88                 :     /// Per-descriptor state for persistent reactor registration.
      89                 :     DescState desc_state_;
      90                 : 
      91             195 :     ~reactor_acceptor() override = default;
      92                 : 
      93                 :     /// Return the underlying file descriptor.
      94                 :     int native_handle() const noexcept
      95                 :     {
      96                 :         return fd_;
      97                 :     }
      98                 : 
      99                 :     /// Return the cached local endpoint.
     100            8573 :     Endpoint local_endpoint() const noexcept override
     101                 :     {
     102            8573 :         return local_endpoint_;
     103                 :     }
     104                 : 
     105                 :     /// Return true if the acceptor has an open file descriptor.
     106            9687 :     bool is_open() const noexcept override
     107                 :     {
     108            9687 :         return fd_ >= 0;
     109                 :     }
     110                 : 
     111                 :     /// Set a socket option.
     112             171 :     std::error_code set_option(
     113                 :         int level,
     114                 :         int optname,
     115                 :         void const* data,
     116                 :         std::size_t size) noexcept override
     117                 :     {
     118             171 :         if (::setsockopt(
     119             171 :                 fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
     120 MIS           0 :             return make_err(errno);
     121 HIT         171 :         return {};
     122                 :     }
     123                 : 
     124                 :     /// Get a socket option.
     125                 :     std::error_code
     126 MIS           0 :     get_option(int level, int optname, void* data, std::size_t* size)
     127                 :         const noexcept override
     128                 :     {
     129               0 :         socklen_t len = static_cast<socklen_t>(*size);
     130               0 :         if (::getsockopt(fd_, level, optname, data, &len) != 0)
     131               0 :             return make_err(errno);
     132               0 :         *size = static_cast<std::size_t>(len);
     133               0 :         return {};
     134                 :     }
     135                 : 
     136                 :     /// Cache the local endpoint.
     137 HIT         180 :     void set_local_endpoint(Endpoint ep) noexcept
     138                 :     {
     139             180 :         local_endpoint_ = std::move(ep);
     140             180 :     }
     141                 : 
     142                 :     /// Assign the fd and initialize descriptor state for the acceptor.
     143             190 :     void init_acceptor_fd(int fd) noexcept
     144                 :     {
     145             190 :         fd_ = fd;
     146             190 :         desc_state_.fd = fd;
     147                 :         {
     148             190 :             std::lock_guard lock(desc_state_.mutex);
     149             190 :             desc_state_.read_op       = nullptr;
     150             190 :             desc_state_.wait_read_op  = nullptr;
     151             190 :             desc_state_.wait_write_op = nullptr;
     152             190 :             desc_state_.wait_error_op = nullptr;
     153             190 :         }
     154             190 :     }
     155                 : 
     156                 :     /// Return a reference to the owning service.
     157            8411 :     Service& service() noexcept
     158                 :     {
     159            8411 :         return svc_;
     160                 :     }
     161                 : 
     162               6 :     void cancel() noexcept override { do_cancel(); }
     163                 : 
     164                 :     /// Close the acceptor (non-virtual, called by the service).
     165             770 :     void close_socket() noexcept { do_close_socket(); }
     166                 : 
     167               2 :     std::coroutine_handle<> wait(
     168                 :         std::coroutine_handle<> h,
     169                 :         capy::executor_ref ex,
     170                 :         wait_type w,
     171                 :         std::stop_token token,
     172                 :         std::error_code* ec) override
     173                 :     {
     174               2 :         return do_wait(h, ex, w, token, ec);
     175                 :     }
     176                 : 
     177                 :     /** Wait for readiness on the listen socket.
     178                 : 
     179                 :         Registers a wait op on the matching event slot. For
     180                 :         `wait_type::read`, completion signals that an incoming
     181                 :         connection is pending and a subsequent accept will
     182                 :         succeed without blocking.
     183                 :     */
     184                 :     std::coroutine_handle<> do_wait(
     185                 :         std::coroutine_handle<>,
     186                 :         capy::executor_ref,
     187                 :         wait_type,
     188                 :         std::stop_token const&,
     189                 :         std::error_code*);
     190                 : 
     191                 :     /** Cancel a single pending operation.
     192                 : 
     193                 :         Claims the operation from the read_op descriptor slot
     194                 :         under the mutex and posts it to the scheduler as cancelled.
     195                 : 
     196                 :         @param op The operation to cancel.
     197                 :     */
     198                 :     void cancel_single_op(Op& op) noexcept;
     199                 : 
     200                 :     /** Cancel the pending accept operation. */
     201                 :     void do_cancel() noexcept;
     202                 : 
     203                 :     /** Close the acceptor and cancel pending operations.
     204                 : 
     205                 :         Invoked by the derived class's close_socket(). The
     206                 :         derived class may add backend-specific cleanup after
     207                 :         calling this method.
     208                 :     */
     209                 :     void do_close_socket() noexcept;
     210                 : 
     211                 :     /** Release the acceptor without closing the fd. */
     212                 :     native_handle_type do_release_socket() noexcept;
     213                 : 
     214                 :     /** Bind the acceptor socket to an endpoint.
     215                 : 
     216                 :         Caches the resolved local endpoint (including ephemeral
     217                 :         port) after a successful bind.
     218                 : 
     219                 :         @param ep The endpoint to bind to.
     220                 :         @return The error code from bind(), or success.
     221                 :     */
     222                 :     std::error_code do_bind(Endpoint const& ep);
     223                 : 
     224                 :     /** Start listening on the acceptor socket.
     225                 : 
     226                 :         Registers the file descriptor with the reactor after
     227                 :         a successful listen() call.
     228                 : 
     229                 :         @param backlog The listen backlog.
     230                 :         @return The error code from listen(), or success.
     231                 :     */
     232                 :     std::error_code do_listen(int backlog);
     233                 : };
     234                 : 
     235                 : template<
     236                 :     class Derived,
     237                 :     class Service,
     238                 :     class Op,
     239                 :     class AcceptOp,
     240                 :     class WaitOp,
     241                 :     class DescState,
     242                 :     class ImplBase,
     243                 :     class Endpoint>
     244                 : void
     245              30 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
     246                 :     cancel_single_op(Op& op) noexcept
     247                 : {
     248              30 :     auto self = this->weak_from_this().lock();
     249              30 :     if (!self)
     250 MIS           0 :         return;
     251                 : 
     252 HIT          30 :     op.request_cancel();
     253                 : 
     254              30 :     reactor_op_base* claimed = nullptr;
     255                 :     {
     256              30 :         std::lock_guard lock(desc_state_.mutex);
     257             270 :         auto try_claim = [&](reactor_op_base*& slot) {
     258             120 :             if (!claimed && slot == &op)
     259               8 :                 claimed = std::exchange(slot, nullptr);
     260                 :         };
     261              30 :         try_claim(desc_state_.read_op);
     262              30 :         try_claim(desc_state_.wait_read_op);
     263              30 :         try_claim(desc_state_.wait_write_op);
     264              30 :         try_claim(desc_state_.wait_error_op);
     265              30 :     }
     266              30 :     if (claimed)
     267                 :     {
     268               8 :         op.impl_ptr = self;
     269               8 :         svc_.post(&op);
     270               8 :         svc_.work_finished();
     271                 :     }
     272              30 : }
     273                 : 
     274                 : template<
     275                 :     class Derived,
     276                 :     class Service,
     277                 :     class Op,
     278                 :     class AcceptOp,
     279                 :     class WaitOp,
     280                 :     class DescState,
     281                 :     class ImplBase,
     282                 :     class Endpoint>
     283                 : void
     284               6 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
     285                 :     do_cancel() noexcept
     286                 : {
     287               6 :     cancel_single_op(acc_);
     288               6 :     cancel_single_op(wait_rd_);
     289               6 :     cancel_single_op(wait_wr_);
     290               6 :     cancel_single_op(wait_er_);
     291               6 : }
     292                 : 
     293                 : template<
     294                 :     class Derived,
     295                 :     class Service,
     296                 :     class Op,
     297                 :     class AcceptOp,
     298                 :     class WaitOp,
     299                 :     class DescState,
     300                 :     class ImplBase,
     301                 :     class Endpoint>
     302                 : void
     303             770 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
     304                 :     do_close_socket() noexcept
     305                 : {
     306             770 :     auto self = this->weak_from_this().lock();
     307             770 :     if (self)
     308                 :     {
     309             770 :         acc_.request_cancel();
     310             770 :         wait_rd_.request_cancel();
     311             770 :         wait_wr_.request_cancel();
     312             770 :         wait_er_.request_cancel();
     313                 : 
     314             770 :         reactor_op_base* claimed_acc = nullptr;
     315             770 :         reactor_op_base* claimed_wr  = nullptr;
     316             770 :         reactor_op_base* claimed_ww  = nullptr;
     317             770 :         reactor_op_base* claimed_we  = nullptr;
     318                 :         {
     319             770 :             std::lock_guard lock(desc_state_.mutex);
     320             770 :             claimed_acc = std::exchange(desc_state_.read_op, nullptr);
     321             770 :             claimed_wr  = std::exchange(desc_state_.wait_read_op, nullptr);
     322             770 :             claimed_ww  = std::exchange(desc_state_.wait_write_op, nullptr);
     323             770 :             claimed_we  = std::exchange(desc_state_.wait_error_op, nullptr);
     324             770 :             desc_state_.read_ready  = false;
     325             770 :             desc_state_.write_ready = false;
     326                 : 
     327             770 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     328 MIS           0 :                 desc_state_.impl_ref_ = self;
     329 HIT         770 :         }
     330                 : 
     331            6930 :         auto repost = [&](reactor_op_base* claimed, reactor_op_base& op) {
     332            3080 :             if (claimed)
     333                 :             {
     334               4 :                 op.impl_ptr = self;
     335               4 :                 svc_.post(&op);
     336               4 :                 svc_.work_finished();
     337                 :             }
     338                 :         };
     339             770 :         repost(claimed_acc, acc_);
     340             770 :         repost(claimed_wr, wait_rd_);
     341             770 :         repost(claimed_ww, wait_wr_);
     342             770 :         repost(claimed_we, wait_er_);
     343                 :     }
     344                 : 
     345             770 :     if (fd_ >= 0)
     346                 :     {
     347             190 :         if (desc_state_.registered_events != 0)
     348             160 :             svc_.scheduler().deregister_descriptor(fd_);
     349             190 :         ::close(fd_);
     350             190 :         fd_ = -1;
     351                 :     }
     352                 : 
     353             770 :     desc_state_.fd                = -1;
     354             770 :     desc_state_.registered_events = 0;
     355                 : 
     356             770 :     local_endpoint_ = Endpoint{};
     357             770 : }
     358                 : 
     359                 : template<
     360                 :     class Derived,
     361                 :     class Service,
     362                 :     class Op,
     363                 :     class AcceptOp,
     364                 :     class WaitOp,
     365                 :     class DescState,
     366                 :     class ImplBase,
     367                 :     class Endpoint>
     368                 : native_handle_type
     369 MIS           0 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
     370                 :     do_release_socket() noexcept
     371                 : {
     372               0 :     auto self = this->weak_from_this().lock();
     373               0 :     if (self)
     374                 :     {
     375               0 :         acc_.request_cancel();
     376               0 :         wait_rd_.request_cancel();
     377               0 :         wait_wr_.request_cancel();
     378               0 :         wait_er_.request_cancel();
     379                 : 
     380               0 :         reactor_op_base* claimed_acc = nullptr;
     381               0 :         reactor_op_base* claimed_wr  = nullptr;
     382               0 :         reactor_op_base* claimed_ww  = nullptr;
     383               0 :         reactor_op_base* claimed_we  = nullptr;
     384                 :         {
     385               0 :             std::lock_guard lock(desc_state_.mutex);
     386               0 :             claimed_acc = std::exchange(desc_state_.read_op, nullptr);
     387               0 :             claimed_wr  = std::exchange(desc_state_.wait_read_op, nullptr);
     388               0 :             claimed_ww  = std::exchange(desc_state_.wait_write_op, nullptr);
     389               0 :             claimed_we  = std::exchange(desc_state_.wait_error_op, nullptr);
     390               0 :             desc_state_.read_ready  = false;
     391               0 :             desc_state_.write_ready = false;
     392                 : 
     393               0 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     394               0 :                 desc_state_.impl_ref_ = self;
     395               0 :         }
     396                 : 
     397               0 :         auto repost = [&](reactor_op_base* claimed, reactor_op_base& op) {
     398               0 :             if (claimed)
     399                 :             {
     400               0 :                 op.impl_ptr = self;
     401               0 :                 svc_.post(&op);
     402               0 :                 svc_.work_finished();
     403                 :             }
     404                 :         };
     405               0 :         repost(claimed_acc, acc_);
     406               0 :         repost(claimed_wr, wait_rd_);
     407               0 :         repost(claimed_ww, wait_wr_);
     408               0 :         repost(claimed_we, wait_er_);
     409                 :     }
     410                 : 
     411               0 :     native_handle_type released = fd_;
     412                 : 
     413               0 :     if (fd_ >= 0)
     414                 :     {
     415               0 :         if (desc_state_.registered_events != 0)
     416               0 :             svc_.scheduler().deregister_descriptor(fd_);
     417               0 :         fd_ = -1;
     418                 :     }
     419                 : 
     420               0 :     desc_state_.fd                = -1;
     421               0 :     desc_state_.registered_events = 0;
     422                 : 
     423               0 :     local_endpoint_ = Endpoint{};
     424                 : 
     425               0 :     return released;
     426               0 : }
     427                 : 
     428                 : template<
     429                 :     class Derived,
     430                 :     class Service,
     431                 :     class Op,
     432                 :     class AcceptOp,
     433                 :     class WaitOp,
     434                 :     class DescState,
     435                 :     class ImplBase,
     436                 :     class Endpoint>
     437                 : std::error_code
     438 HIT         188 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
     439                 :     do_bind(Endpoint const& ep)
     440                 : {
     441             188 :     sockaddr_storage storage{};
     442             188 :     socklen_t addrlen = to_sockaddr(ep, storage);
     443             188 :     if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
     444               8 :         return make_err(errno);
     445                 : 
     446                 :     // Cache local endpoint (resolves ephemeral port / path)
     447             180 :     sockaddr_storage local{};
     448             180 :     socklen_t local_len = sizeof(local);
     449             180 :     if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local), &local_len) ==
     450                 :         0)
     451             180 :         set_local_endpoint(from_sockaddr_as(local, local_len, Endpoint{}));
     452                 : 
     453             180 :     return {};
     454                 : }
     455                 : 
     456                 : template<
     457                 :     class Derived,
     458                 :     class Service,
     459                 :     class Op,
     460                 :     class AcceptOp,
     461                 :     class WaitOp,
     462                 :     class DescState,
     463                 :     class ImplBase,
     464                 :     class Endpoint>
     465                 : std::error_code
     466             160 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
     467                 :     do_listen(int backlog)
     468                 : {
     469             160 :     if (::listen(fd_, backlog) < 0)
     470 MIS           0 :         return make_err(errno);
     471                 : 
     472 HIT         160 :     svc_.scheduler().register_descriptor(fd_, &desc_state_);
     473             160 :     return {};
     474                 : }
     475                 : 
     476                 : template<
     477                 :     class Derived,
     478                 :     class Service,
     479                 :     class Op,
     480                 :     class AcceptOp,
     481                 :     class WaitOp,
     482                 :     class DescState,
     483                 :     class ImplBase,
     484                 :     class Endpoint>
     485                 : std::coroutine_handle<>
     486               2 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
     487                 :     do_wait(
     488                 :         std::coroutine_handle<> h,
     489                 :         capy::executor_ref ex,
     490                 :         wait_type w,
     491                 :         std::stop_token const& token,
     492                 :         std::error_code* ec)
     493                 : {
     494                 :     // wait_type::write completes immediately (see reactor_stream_socket::do_wait).
     495               2 :     if (w == wait_type::write)
     496                 :     {
     497 MIS           0 :         auto& op = wait_wr_;
     498               0 :         op.reset();
     499               0 :         op.wait_event = reactor_event_write;
     500               0 :         op.h          = h;
     501               0 :         op.ex         = ex;
     502               0 :         op.ec_out     = ec;
     503               0 :         op.fd         = this->fd_;
     504               0 :         op.start(token, static_cast<Derived*>(this));
     505               0 :         op.impl_ptr   = this->shared_from_this();
     506               0 :         op.complete(0, 0);
     507               0 :         svc_.post(&op);
     508               0 :         return std::noop_coroutine();
     509                 :     }
     510                 : 
     511                 :     WaitOp* op_ptr;
     512                 :     reactor_op_base** desc_slot_ptr;
     513                 :     std::uint32_t event;
     514                 : 
     515 HIT           2 :     if (w == wait_type::read)
     516                 :     {
     517               2 :         op_ptr        = &wait_rd_;
     518               2 :         desc_slot_ptr = &desc_state_.wait_read_op;
     519               2 :         event         = reactor_event_read;
     520                 :     }
     521                 :     else // wait_type::error
     522                 :     {
     523 MIS           0 :         op_ptr        = &wait_er_;
     524               0 :         desc_slot_ptr = &desc_state_.wait_error_op;
     525               0 :         event         = reactor_event_error;
     526                 :     }
     527                 : 
     528 HIT           2 :     auto& op      = *op_ptr;
     529               2 :     op.reset();
     530               2 :     op.wait_event = event;
     531               2 :     op.h          = h;
     532               2 :     op.ex         = ex;
     533               2 :     op.ec_out     = ec;
     534               2 :     op.fd         = this->fd_;
     535               2 :     op.start(token, static_cast<Derived*>(this));
     536               2 :     op.impl_ptr   = this->shared_from_this();
     537                 : 
     538               2 :     svc_.work_started();
     539                 : 
     540               2 :     std::lock_guard lock(desc_state_.mutex);
     541               2 :     if (op.cancelled.load(std::memory_order_acquire))
     542                 :     {
     543 MIS           0 :         svc_.post(&op);
     544               0 :         svc_.work_finished();
     545                 :     }
     546                 :     else
     547                 :     {
     548 HIT           2 :         *desc_slot_ptr = &op;
     549                 :     }
     550               2 :     return std::noop_coroutine();
     551               2 : }
     552                 : 
     553                 : } // namespace boost::corosio::detail
     554                 : 
     555                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
        

Generated by: LCOV version 2.3