TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
12 :
13 : #include <boost/corosio/tcp_acceptor.hpp>
14 : #include <boost/corosio/wait_type.hpp>
15 : #include <boost/corosio/detail/intrusive.hpp>
16 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
17 : #include <boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp>
18 : #include <boost/corosio/native/detail/make_err.hpp>
19 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
20 :
21 : #include <memory>
22 : #include <mutex>
23 : #include <utility>
24 :
25 : #include <errno.h>
26 : #include <netinet/in.h>
27 : #include <sys/socket.h>
28 : #include <unistd.h>
29 :
30 : namespace boost::corosio::detail {
31 :
32 : /** CRTP base for reactor-backed acceptor implementations.
33 :
34 : Provides shared data members, trivial virtual overrides, and
35 : non-virtual helper methods for cancellation and close. Concrete
36 : backends inherit and add `cancel()`, `close_socket()`, and
37 : `accept()` overrides that delegate to the `do_*` helpers.
38 :
39 : @tparam Derived The concrete acceptor type (CRTP).
40 : @tparam Service The backend's acceptor service type.
41 : @tparam Op The backend's base op type.
42 : @tparam AcceptOp The backend's accept op type.
43 : @tparam WaitOp The backend's wait op type.
44 : @tparam DescState The backend's descriptor_state type.
45 : @tparam ImplBase The public vtable base
46 : (tcp_acceptor::implementation or
47 : local_stream_acceptor::implementation).
48 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
49 : */
50 : template<
51 : class Derived,
52 : class Service,
53 : class Op,
54 : class AcceptOp,
55 : class WaitOp,
56 : class DescState,
57 : class ImplBase = tcp_acceptor::implementation,
58 : class Endpoint = endpoint>
59 : class reactor_acceptor
60 : : public ImplBase
61 : , public std::enable_shared_from_this<Derived>
62 : , public intrusive_list<Derived>::node
63 : {
64 : friend Derived;
65 :
66 : protected:
67 : // NOLINTNEXTLINE(bugprone-crtp-constructor-accessibility)
68 HIT 195 : explicit reactor_acceptor(Service& svc) noexcept : svc_(svc) {}
69 :
70 : protected:
71 : Service& svc_;
72 : int fd_ = -1;
73 : Endpoint local_endpoint_;
74 :
75 : public:
76 : /// Pending accept operation slot.
77 : AcceptOp acc_;
78 :
79 : /// Pending wait-for-read operation slot.
80 : WaitOp wait_rd_;
81 :
82 : /// Pending wait-for-write operation slot.
83 : WaitOp wait_wr_;
84 :
85 : /// Pending wait-for-error operation slot.
86 : WaitOp wait_er_;
87 :
88 : /// Per-descriptor state for persistent reactor registration.
89 : DescState desc_state_;
90 :
91 195 : ~reactor_acceptor() override = default;
92 :
93 : /// Return the underlying file descriptor.
94 : int native_handle() const noexcept
95 : {
96 : return fd_;
97 : }
98 :
99 : /// Return the cached local endpoint.
100 8573 : Endpoint local_endpoint() const noexcept override
101 : {
102 8573 : return local_endpoint_;
103 : }
104 :
105 : /// Return true if the acceptor has an open file descriptor.
106 9687 : bool is_open() const noexcept override
107 : {
108 9687 : return fd_ >= 0;
109 : }
110 :
111 : /// Set a socket option.
112 171 : std::error_code set_option(
113 : int level,
114 : int optname,
115 : void const* data,
116 : std::size_t size) noexcept override
117 : {
118 171 : if (::setsockopt(
119 171 : fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
120 MIS 0 : return make_err(errno);
121 HIT 171 : return {};
122 : }
123 :
124 : /// Get a socket option.
125 : std::error_code
126 MIS 0 : get_option(int level, int optname, void* data, std::size_t* size)
127 : const noexcept override
128 : {
129 0 : socklen_t len = static_cast<socklen_t>(*size);
130 0 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
131 0 : return make_err(errno);
132 0 : *size = static_cast<std::size_t>(len);
133 0 : return {};
134 : }
135 :
136 : /// Cache the local endpoint.
137 HIT 180 : void set_local_endpoint(Endpoint ep) noexcept
138 : {
139 180 : local_endpoint_ = std::move(ep);
140 180 : }
141 :
142 : /// Assign the fd and initialize descriptor state for the acceptor.
143 190 : void init_acceptor_fd(int fd) noexcept
144 : {
145 190 : fd_ = fd;
146 190 : desc_state_.fd = fd;
147 : {
148 190 : std::lock_guard lock(desc_state_.mutex);
149 190 : desc_state_.read_op = nullptr;
150 190 : desc_state_.wait_read_op = nullptr;
151 190 : desc_state_.wait_write_op = nullptr;
152 190 : desc_state_.wait_error_op = nullptr;
153 190 : }
154 190 : }
155 :
156 : /// Return a reference to the owning service.
157 8411 : Service& service() noexcept
158 : {
159 8411 : return svc_;
160 : }
161 :
162 6 : void cancel() noexcept override { do_cancel(); }
163 :
164 : /// Close the acceptor (non-virtual, called by the service).
165 770 : void close_socket() noexcept { do_close_socket(); }
166 :
167 2 : std::coroutine_handle<> wait(
168 : std::coroutine_handle<> h,
169 : capy::executor_ref ex,
170 : wait_type w,
171 : std::stop_token token,
172 : std::error_code* ec) override
173 : {
174 2 : return do_wait(h, ex, w, token, ec);
175 : }
176 :
177 : /** Wait for readiness on the listen socket.
178 :
179 : Registers a wait op on the matching event slot. For
180 : `wait_type::read`, completion signals that an incoming
181 : connection is pending and a subsequent accept will
182 : succeed without blocking.
183 : */
184 : std::coroutine_handle<> do_wait(
185 : std::coroutine_handle<>,
186 : capy::executor_ref,
187 : wait_type,
188 : std::stop_token const&,
189 : std::error_code*);
190 :
191 : /** Cancel a single pending operation.
192 :
193 : Claims the operation from the read_op descriptor slot
194 : under the mutex and posts it to the scheduler as cancelled.
195 :
196 : @param op The operation to cancel.
197 : */
198 : void cancel_single_op(Op& op) noexcept;
199 :
200 : /** Cancel the pending accept operation. */
201 : void do_cancel() noexcept;
202 :
203 : /** Close the acceptor and cancel pending operations.
204 :
205 : Invoked by the derived class's close_socket(). The
206 : derived class may add backend-specific cleanup after
207 : calling this method.
208 : */
209 : void do_close_socket() noexcept;
210 :
211 : /** Release the acceptor without closing the fd. */
212 : native_handle_type do_release_socket() noexcept;
213 :
214 : /** Bind the acceptor socket to an endpoint.
215 :
216 : Caches the resolved local endpoint (including ephemeral
217 : port) after a successful bind.
218 :
219 : @param ep The endpoint to bind to.
220 : @return The error code from bind(), or success.
221 : */
222 : std::error_code do_bind(Endpoint const& ep);
223 :
224 : /** Start listening on the acceptor socket.
225 :
226 : Registers the file descriptor with the reactor after
227 : a successful listen() call.
228 :
229 : @param backlog The listen backlog.
230 : @return The error code from listen(), or success.
231 : */
232 : std::error_code do_listen(int backlog);
233 : };
234 :
235 : template<
236 : class Derived,
237 : class Service,
238 : class Op,
239 : class AcceptOp,
240 : class WaitOp,
241 : class DescState,
242 : class ImplBase,
243 : class Endpoint>
244 : void
245 30 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
246 : cancel_single_op(Op& op) noexcept
247 : {
248 30 : auto self = this->weak_from_this().lock();
249 30 : if (!self)
250 MIS 0 : return;
251 :
252 HIT 30 : op.request_cancel();
253 :
254 30 : reactor_op_base* claimed = nullptr;
255 : {
256 30 : std::lock_guard lock(desc_state_.mutex);
257 270 : auto try_claim = [&](reactor_op_base*& slot) {
258 120 : if (!claimed && slot == &op)
259 8 : claimed = std::exchange(slot, nullptr);
260 : };
261 30 : try_claim(desc_state_.read_op);
262 30 : try_claim(desc_state_.wait_read_op);
263 30 : try_claim(desc_state_.wait_write_op);
264 30 : try_claim(desc_state_.wait_error_op);
265 30 : }
266 30 : if (claimed)
267 : {
268 8 : op.impl_ptr = self;
269 8 : svc_.post(&op);
270 8 : svc_.work_finished();
271 : }
272 30 : }
273 :
274 : template<
275 : class Derived,
276 : class Service,
277 : class Op,
278 : class AcceptOp,
279 : class WaitOp,
280 : class DescState,
281 : class ImplBase,
282 : class Endpoint>
283 : void
284 6 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
285 : do_cancel() noexcept
286 : {
287 6 : cancel_single_op(acc_);
288 6 : cancel_single_op(wait_rd_);
289 6 : cancel_single_op(wait_wr_);
290 6 : cancel_single_op(wait_er_);
291 6 : }
292 :
293 : template<
294 : class Derived,
295 : class Service,
296 : class Op,
297 : class AcceptOp,
298 : class WaitOp,
299 : class DescState,
300 : class ImplBase,
301 : class Endpoint>
302 : void
303 770 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
304 : do_close_socket() noexcept
305 : {
306 770 : auto self = this->weak_from_this().lock();
307 770 : if (self)
308 : {
309 770 : acc_.request_cancel();
310 770 : wait_rd_.request_cancel();
311 770 : wait_wr_.request_cancel();
312 770 : wait_er_.request_cancel();
313 :
314 770 : reactor_op_base* claimed_acc = nullptr;
315 770 : reactor_op_base* claimed_wr = nullptr;
316 770 : reactor_op_base* claimed_ww = nullptr;
317 770 : reactor_op_base* claimed_we = nullptr;
318 : {
319 770 : std::lock_guard lock(desc_state_.mutex);
320 770 : claimed_acc = std::exchange(desc_state_.read_op, nullptr);
321 770 : claimed_wr = std::exchange(desc_state_.wait_read_op, nullptr);
322 770 : claimed_ww = std::exchange(desc_state_.wait_write_op, nullptr);
323 770 : claimed_we = std::exchange(desc_state_.wait_error_op, nullptr);
324 770 : desc_state_.read_ready = false;
325 770 : desc_state_.write_ready = false;
326 :
327 770 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
328 MIS 0 : desc_state_.impl_ref_ = self;
329 HIT 770 : }
330 :
331 6930 : auto repost = [&](reactor_op_base* claimed, reactor_op_base& op) {
332 3080 : if (claimed)
333 : {
334 4 : op.impl_ptr = self;
335 4 : svc_.post(&op);
336 4 : svc_.work_finished();
337 : }
338 : };
339 770 : repost(claimed_acc, acc_);
340 770 : repost(claimed_wr, wait_rd_);
341 770 : repost(claimed_ww, wait_wr_);
342 770 : repost(claimed_we, wait_er_);
343 : }
344 :
345 770 : if (fd_ >= 0)
346 : {
347 190 : if (desc_state_.registered_events != 0)
348 160 : svc_.scheduler().deregister_descriptor(fd_);
349 190 : ::close(fd_);
350 190 : fd_ = -1;
351 : }
352 :
353 770 : desc_state_.fd = -1;
354 770 : desc_state_.registered_events = 0;
355 :
356 770 : local_endpoint_ = Endpoint{};
357 770 : }
358 :
359 : template<
360 : class Derived,
361 : class Service,
362 : class Op,
363 : class AcceptOp,
364 : class WaitOp,
365 : class DescState,
366 : class ImplBase,
367 : class Endpoint>
368 : native_handle_type
369 MIS 0 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
370 : do_release_socket() noexcept
371 : {
372 0 : auto self = this->weak_from_this().lock();
373 0 : if (self)
374 : {
375 0 : acc_.request_cancel();
376 0 : wait_rd_.request_cancel();
377 0 : wait_wr_.request_cancel();
378 0 : wait_er_.request_cancel();
379 :
380 0 : reactor_op_base* claimed_acc = nullptr;
381 0 : reactor_op_base* claimed_wr = nullptr;
382 0 : reactor_op_base* claimed_ww = nullptr;
383 0 : reactor_op_base* claimed_we = nullptr;
384 : {
385 0 : std::lock_guard lock(desc_state_.mutex);
386 0 : claimed_acc = std::exchange(desc_state_.read_op, nullptr);
387 0 : claimed_wr = std::exchange(desc_state_.wait_read_op, nullptr);
388 0 : claimed_ww = std::exchange(desc_state_.wait_write_op, nullptr);
389 0 : claimed_we = std::exchange(desc_state_.wait_error_op, nullptr);
390 0 : desc_state_.read_ready = false;
391 0 : desc_state_.write_ready = false;
392 :
393 0 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
394 0 : desc_state_.impl_ref_ = self;
395 0 : }
396 :
397 0 : auto repost = [&](reactor_op_base* claimed, reactor_op_base& op) {
398 0 : if (claimed)
399 : {
400 0 : op.impl_ptr = self;
401 0 : svc_.post(&op);
402 0 : svc_.work_finished();
403 : }
404 : };
405 0 : repost(claimed_acc, acc_);
406 0 : repost(claimed_wr, wait_rd_);
407 0 : repost(claimed_ww, wait_wr_);
408 0 : repost(claimed_we, wait_er_);
409 : }
410 :
411 0 : native_handle_type released = fd_;
412 :
413 0 : if (fd_ >= 0)
414 : {
415 0 : if (desc_state_.registered_events != 0)
416 0 : svc_.scheduler().deregister_descriptor(fd_);
417 0 : fd_ = -1;
418 : }
419 :
420 0 : desc_state_.fd = -1;
421 0 : desc_state_.registered_events = 0;
422 :
423 0 : local_endpoint_ = Endpoint{};
424 :
425 0 : return released;
426 0 : }
427 :
428 : template<
429 : class Derived,
430 : class Service,
431 : class Op,
432 : class AcceptOp,
433 : class WaitOp,
434 : class DescState,
435 : class ImplBase,
436 : class Endpoint>
437 : std::error_code
438 HIT 188 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
439 : do_bind(Endpoint const& ep)
440 : {
441 188 : sockaddr_storage storage{};
442 188 : socklen_t addrlen = to_sockaddr(ep, storage);
443 188 : if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
444 8 : return make_err(errno);
445 :
446 : // Cache local endpoint (resolves ephemeral port / path)
447 180 : sockaddr_storage local{};
448 180 : socklen_t local_len = sizeof(local);
449 180 : if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local), &local_len) ==
450 : 0)
451 180 : set_local_endpoint(from_sockaddr_as(local, local_len, Endpoint{}));
452 :
453 180 : return {};
454 : }
455 :
456 : template<
457 : class Derived,
458 : class Service,
459 : class Op,
460 : class AcceptOp,
461 : class WaitOp,
462 : class DescState,
463 : class ImplBase,
464 : class Endpoint>
465 : std::error_code
466 160 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
467 : do_listen(int backlog)
468 : {
469 160 : if (::listen(fd_, backlog) < 0)
470 MIS 0 : return make_err(errno);
471 :
472 HIT 160 : svc_.scheduler().register_descriptor(fd_, &desc_state_);
473 160 : return {};
474 : }
475 :
476 : template<
477 : class Derived,
478 : class Service,
479 : class Op,
480 : class AcceptOp,
481 : class WaitOp,
482 : class DescState,
483 : class ImplBase,
484 : class Endpoint>
485 : std::coroutine_handle<>
486 2 : reactor_acceptor<Derived, Service, Op, AcceptOp, WaitOp, DescState, ImplBase, Endpoint>::
487 : do_wait(
488 : std::coroutine_handle<> h,
489 : capy::executor_ref ex,
490 : wait_type w,
491 : std::stop_token const& token,
492 : std::error_code* ec)
493 : {
494 : // wait_type::write completes immediately (see reactor_stream_socket::do_wait).
495 2 : if (w == wait_type::write)
496 : {
497 MIS 0 : auto& op = wait_wr_;
498 0 : op.reset();
499 0 : op.wait_event = reactor_event_write;
500 0 : op.h = h;
501 0 : op.ex = ex;
502 0 : op.ec_out = ec;
503 0 : op.fd = this->fd_;
504 0 : op.start(token, static_cast<Derived*>(this));
505 0 : op.impl_ptr = this->shared_from_this();
506 0 : op.complete(0, 0);
507 0 : svc_.post(&op);
508 0 : return std::noop_coroutine();
509 : }
510 :
511 : WaitOp* op_ptr;
512 : reactor_op_base** desc_slot_ptr;
513 : std::uint32_t event;
514 :
515 HIT 2 : if (w == wait_type::read)
516 : {
517 2 : op_ptr = &wait_rd_;
518 2 : desc_slot_ptr = &desc_state_.wait_read_op;
519 2 : event = reactor_event_read;
520 : }
521 : else // wait_type::error
522 : {
523 MIS 0 : op_ptr = &wait_er_;
524 0 : desc_slot_ptr = &desc_state_.wait_error_op;
525 0 : event = reactor_event_error;
526 : }
527 :
528 HIT 2 : auto& op = *op_ptr;
529 2 : op.reset();
530 2 : op.wait_event = event;
531 2 : op.h = h;
532 2 : op.ex = ex;
533 2 : op.ec_out = ec;
534 2 : op.fd = this->fd_;
535 2 : op.start(token, static_cast<Derived*>(this));
536 2 : op.impl_ptr = this->shared_from_this();
537 :
538 2 : svc_.work_started();
539 :
540 2 : std::lock_guard lock(desc_state_.mutex);
541 2 : if (op.cancelled.load(std::memory_order_acquire))
542 : {
543 MIS 0 : svc_.post(&op);
544 0 : svc_.work_finished();
545 : }
546 : else
547 : {
548 HIT 2 : *desc_slot_ptr = &op;
549 : }
550 2 : return std::noop_coroutine();
551 2 : }
552 :
553 : } // namespace boost::corosio::detail
554 :
555 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP
|