TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
11 : #define BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/except.hpp>
15 : #include <boost/corosio/detail/op_base.hpp>
16 : #include <boost/corosio/wait_type.hpp>
17 : #include <boost/corosio/io/io_object.hpp>
18 : #include <boost/capy/io_result.hpp>
19 : #include <boost/corosio/local_endpoint.hpp>
20 : #include <boost/corosio/local_stream.hpp>
21 : #include <boost/corosio/local_stream_socket.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <boost/capy/ex/execution_context.hpp>
24 : #include <boost/capy/ex/io_env.hpp>
25 : #include <boost/capy/concept/executor.hpp>
26 :
27 : #include <system_error>
28 :
29 : #include <cassert>
30 : #include <concepts>
31 : #include <coroutine>
32 : #include <cstddef>
33 : #include <stop_token>
34 : #include <type_traits>
35 :
36 : namespace boost::corosio {
37 :
38 : /** Options for @ref local_stream_acceptor::bind().
39 :
40 : Controls filesystem cleanup behavior before binding
41 : to a Unix domain socket path.
42 : */
43 : enum class bind_option
44 : {
45 : none,
46 : /// Unlink the socket path before binding (ignored for abstract paths).
47 : unlink_existing
48 : };
49 :
50 : /** An asynchronous Unix domain stream acceptor for coroutine I/O.
51 :
52 : This class provides asynchronous Unix domain stream accept
53 : operations that return awaitable types. The acceptor binds
54 : to a local endpoint (filesystem path or abstract name) and
55 : listens for incoming connections.
56 :
57 : The library does NOT automatically unlink the socket path
58 : on close. Callers are responsible for removing the socket
59 : file before bind (via @ref bind_option::unlink_existing) or
60 : after close.
61 :
62 : @par Thread Safety
63 : Distinct objects: Safe.@n
64 : Shared objects: Unsafe. An acceptor must not have concurrent
65 : accept operations.
66 :
67 : @par Example
68 : @code
69 : io_context ioc;
70 : local_stream_acceptor acc(ioc);
71 : acc.open();
72 : acc.bind(local_endpoint("/tmp/my.sock"),
73 : bind_option::unlink_existing);
74 : acc.listen();
75 : auto [ec, peer] = co_await acc.accept();
76 : @endcode
77 : */
78 : class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
79 : {
80 : struct wait_awaitable
81 : : detail::void_op_base<wait_awaitable>
82 : {
83 : local_stream_acceptor& acc_;
84 : wait_type w_;
85 :
86 : wait_awaitable(local_stream_acceptor& acc, wait_type w) noexcept
87 : : acc_(acc), w_(w) {}
88 :
89 : std::coroutine_handle<> dispatch(
90 : std::coroutine_handle<> h, capy::executor_ref ex) const
91 : {
92 : return acc_.get().wait(h, ex, w_, token_, &ec_);
93 : }
94 : };
95 :
96 : struct move_accept_awaitable
97 : {
98 : local_stream_acceptor& acc_;
99 : std::stop_token token_;
100 : mutable std::error_code ec_;
101 : mutable io_object::implementation* peer_impl_ = nullptr;
102 :
103 HIT 2 : explicit move_accept_awaitable(
104 : local_stream_acceptor& acc) noexcept
105 2 : : acc_(acc)
106 : {
107 2 : }
108 :
109 2 : bool await_ready() const noexcept
110 : {
111 2 : return token_.stop_requested();
112 : }
113 :
114 2 : capy::io_result<local_stream_socket> await_resume() const noexcept
115 : {
116 2 : if (token_.stop_requested())
117 MIS 0 : return {make_error_code(std::errc::operation_canceled),
118 0 : local_stream_socket()};
119 :
120 HIT 2 : if (ec_ || !peer_impl_)
121 MIS 0 : return {ec_, local_stream_socket()};
122 :
123 HIT 2 : local_stream_socket peer(acc_.ctx_);
124 2 : reset_peer_impl(peer, peer_impl_);
125 2 : return {ec_, std::move(peer)};
126 2 : }
127 :
128 2 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
129 : -> std::coroutine_handle<>
130 : {
131 2 : token_ = env->stop_token;
132 6 : return acc_.get().accept(
133 6 : h, env->executor, token_, &ec_, &peer_impl_);
134 : }
135 : };
136 :
137 : struct accept_awaitable
138 : {
139 : local_stream_acceptor& acc_;
140 : local_stream_socket& peer_;
141 : std::stop_token token_;
142 : mutable std::error_code ec_;
143 : mutable io_object::implementation* peer_impl_ = nullptr;
144 :
145 4 : accept_awaitable(
146 : local_stream_acceptor& acc, local_stream_socket& peer) noexcept
147 4 : : acc_(acc)
148 4 : , peer_(peer)
149 : {
150 4 : }
151 :
152 4 : bool await_ready() const noexcept
153 : {
154 4 : return token_.stop_requested();
155 : }
156 :
157 4 : capy::io_result<> await_resume() const noexcept
158 : {
159 4 : if (token_.stop_requested())
160 MIS 0 : return {make_error_code(std::errc::operation_canceled)};
161 :
162 HIT 4 : if (!ec_ && peer_impl_)
163 4 : peer_.h_.reset(peer_impl_);
164 4 : return {ec_};
165 : }
166 :
167 4 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
168 : -> std::coroutine_handle<>
169 : {
170 4 : token_ = env->stop_token;
171 12 : return acc_.get().accept(
172 12 : h, env->executor, token_, &ec_, &peer_impl_);
173 : }
174 : };
175 :
176 : public:
177 : /** Destructor.
178 :
179 : Closes the acceptor if open, cancelling any pending operations.
180 : */
181 : ~local_stream_acceptor() override;
182 :
183 : /** Construct an acceptor from an execution context.
184 :
185 : @param ctx The execution context that will own this acceptor.
186 : */
187 : explicit local_stream_acceptor(capy::execution_context& ctx);
188 :
189 : /** Construct an acceptor from an executor.
190 :
191 : The acceptor is associated with the executor's context.
192 :
193 : @param ex The executor whose context will own the acceptor.
194 :
195 : @tparam Ex A type satisfying @ref capy::Executor. Must not
196 : be `local_stream_acceptor` itself (disables implicit
197 : conversion from move).
198 : */
199 : template<class Ex>
200 : requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_acceptor>) &&
201 : capy::Executor<Ex>
202 : explicit local_stream_acceptor(Ex const& ex) : local_stream_acceptor(ex.context())
203 : {
204 : }
205 :
206 : /** Move constructor.
207 :
208 : Transfers ownership of the acceptor resources.
209 :
210 : @param other The acceptor to move from.
211 :
212 : @pre No awaitables returned by @p other's methods exist.
213 : @pre The execution context associated with @p other must
214 : outlive this acceptor.
215 : */
216 : local_stream_acceptor(local_stream_acceptor&& other) noexcept
217 : : local_stream_acceptor(other.ctx_, std::move(other))
218 : {
219 : }
220 :
221 : /** Move assignment operator.
222 :
223 : Closes any existing acceptor and transfers ownership.
224 : Both acceptors must share the same execution context.
225 :
226 : @param other The acceptor to move from.
227 :
228 : @return Reference to this acceptor.
229 :
230 : @pre `&ctx_ == &other.ctx_` (same execution context).
231 : @pre No awaitables returned by either `*this` or @p other's
232 : methods exist.
233 : */
234 : local_stream_acceptor& operator=(local_stream_acceptor&& other) noexcept
235 : {
236 : assert(&ctx_ == &other.ctx_ &&
237 : "move-assign requires the same execution_context");
238 : if (this != &other)
239 : {
240 : close();
241 : io_object::operator=(std::move(other));
242 : }
243 : return *this;
244 : }
245 :
246 : local_stream_acceptor(local_stream_acceptor const&) = delete;
247 : local_stream_acceptor& operator=(local_stream_acceptor const&) = delete;
248 :
249 : /** Create the acceptor socket.
250 :
251 : @param proto The protocol. Defaults to local_stream{}.
252 :
253 : @throws std::system_error on failure.
254 : */
255 : void open(local_stream proto = {});
256 :
257 : /** Bind to a local endpoint.
258 :
259 : @param ep The local endpoint (path) to bind to.
260 : @param opt Bind options. Pass bind_option::unlink_existing
261 : to unlink the socket path before binding (ignored for
262 : abstract sockets and empty endpoints).
263 :
264 : @return An error code on failure, empty on success.
265 :
266 : @throws std::logic_error if the acceptor is not open.
267 : */
268 : [[nodiscard]] std::error_code
269 : bind(corosio::local_endpoint ep,
270 : bind_option opt = bind_option::none);
271 :
272 : /** Start listening for incoming connections.
273 :
274 : @param backlog The maximum pending connection queue length.
275 :
276 : @return An error code on failure, empty on success.
277 :
278 : @throws std::logic_error if the acceptor is not open.
279 : */
280 : [[nodiscard]] std::error_code listen(int backlog = 128);
281 :
282 : /** Close the acceptor.
283 :
284 : Cancels any pending accept operations and releases the
285 : underlying socket. Has no effect if the acceptor is not
286 : open.
287 :
288 : @post is_open() == false
289 : */
290 : void close();
291 :
292 : /// Check if the acceptor has an open socket handle.
293 54 : bool is_open() const noexcept
294 : {
295 54 : return h_ && get().is_open();
296 : }
297 :
298 : /** Initiate an asynchronous accept into an existing socket.
299 :
300 : Completes when a new connection is available. On success
301 : @p peer is reset to the accepted connection. Only one
302 : accept may be in flight at a time.
303 :
304 : @param peer The socket to receive the accepted connection.
305 :
306 : @par Cancellation
307 : Supports cancellation via stop_token or cancel().
308 : On cancellation, yields `capy::cond::canceled` and
309 : @p peer is not modified.
310 :
311 : @return An awaitable that completes with io_result<>.
312 :
313 : @throws std::logic_error if the acceptor is not open.
314 : */
315 4 : auto accept(local_stream_socket& peer)
316 : {
317 4 : if (!is_open())
318 MIS 0 : detail::throw_logic_error("accept: acceptor not listening");
319 HIT 4 : return accept_awaitable(*this, peer);
320 : }
321 :
322 : /** Wait for an incoming connection or readiness condition.
323 :
324 : Suspends until the listen socket is ready in the
325 : requested direction. For `wait_type::read`, completion
326 : signals that a subsequent @ref accept will succeed
327 : without blocking. No connection is consumed.
328 :
329 : @param w The wait direction.
330 :
331 : @return An awaitable that completes with `io_result<>`.
332 :
333 : @par Preconditions
334 : The acceptor must be listening.
335 : */
336 : [[nodiscard]] auto wait(wait_type w)
337 : {
338 : if (!is_open())
339 : detail::throw_logic_error("wait: acceptor not listening");
340 : return wait_awaitable(*this, w);
341 : }
342 :
343 : /** Initiate an asynchronous accept, returning the socket.
344 :
345 : Completes when a new connection is available. Only one
346 : accept may be in flight at a time.
347 :
348 : @par Cancellation
349 : Supports cancellation via stop_token or cancel().
350 : On cancellation, yields `capy::cond::canceled` with
351 : a default-constructed socket.
352 :
353 : @return An awaitable that completes with
354 : io_result<local_stream_socket>.
355 :
356 : @throws std::logic_error if the acceptor is not open.
357 : */
358 2 : auto accept()
359 : {
360 2 : if (!is_open())
361 MIS 0 : detail::throw_logic_error("accept: acceptor not listening");
362 HIT 2 : return move_accept_awaitable(*this);
363 : }
364 :
365 : /** Cancel pending asynchronous accept operations.
366 :
367 : Outstanding accept operations complete with
368 : @c capy::cond::canceled. Safe to call when no
369 : operations are pending (no-op).
370 : */
371 : void cancel();
372 :
373 : /** Release ownership of the native socket handle.
374 :
375 : Deregisters the acceptor from the reactor and cancels
376 : pending operations without closing the descriptor. The
377 : caller takes ownership of the returned handle.
378 :
379 : @return The native handle.
380 :
381 : @throws std::logic_error if the acceptor is not open.
382 :
383 : @post is_open() == false
384 : */
385 : native_handle_type release();
386 :
387 : /** Return the local endpoint the acceptor is bound to.
388 :
389 : Returns a default-constructed (empty) endpoint if the
390 : acceptor is not open or not yet bound. Safe to call in
391 : any state.
392 : */
393 : corosio::local_endpoint local_endpoint() const noexcept;
394 :
395 : /** Set a socket option on the acceptor.
396 :
397 : Applies a type-safe socket option to the underlying socket.
398 : The option type encodes the protocol level and option name.
399 :
400 : @param opt The option to set.
401 :
402 : @tparam Option A socket option type providing static
403 : `level()` and `name()` members, and `data()` / `size()`
404 : accessors.
405 :
406 : @throws std::logic_error if the acceptor is not open.
407 : @throws std::system_error on failure.
408 : */
409 : template<class Option>
410 : void set_option(Option const& opt)
411 : {
412 : if (!is_open())
413 : detail::throw_logic_error("set_option: acceptor not open");
414 : std::error_code ec = get().set_option(
415 : Option::level(), Option::name(), opt.data(), opt.size());
416 : if (ec)
417 : detail::throw_system_error(ec, "local_stream_acceptor::set_option");
418 : }
419 :
420 : /** Get a socket option from the acceptor.
421 :
422 : Retrieves the current value of a type-safe socket option.
423 :
424 : @return The current option value.
425 :
426 : @tparam Option A socket option type providing static
427 : `level()` and `name()` members, and `data()` / `size()`
428 : / `resize()` members.
429 :
430 : @throws std::logic_error if the acceptor is not open.
431 : @throws std::system_error on failure.
432 : */
433 : template<class Option>
434 : Option get_option() const
435 : {
436 : if (!is_open())
437 : detail::throw_logic_error("get_option: acceptor not open");
438 : Option opt{};
439 : std::size_t sz = opt.size();
440 : std::error_code ec =
441 : get().get_option(Option::level(), Option::name(), opt.data(), &sz);
442 : if (ec)
443 : detail::throw_system_error(ec, "local_stream_acceptor::get_option");
444 : opt.resize(sz);
445 : return opt;
446 : }
447 :
448 : /** Backend hooks for local stream acceptor operations.
449 :
450 : Platform backends derive from this to implement
451 : accept, option, and lifecycle management.
452 : */
453 : struct implementation : io_object::implementation
454 : {
455 : /** Initiate an asynchronous accept.
456 :
457 : On completion the backend sets @p *ec and, on
458 : success, stores a pointer to the new socket
459 : implementation in @p *impl_out.
460 :
461 : @param h Coroutine handle to resume.
462 : @param ex Executor for dispatching the completion.
463 : @param token Stop token for cancellation.
464 : @param ec Output error code.
465 : @param impl_out Output pointer for the accepted socket.
466 : @return Coroutine handle to resume immediately.
467 : */
468 : virtual std::coroutine_handle<> accept(
469 : std::coroutine_handle<>,
470 : capy::executor_ref,
471 : std::stop_token,
472 : std::error_code*,
473 : io_object::implementation**) = 0;
474 :
475 : /** Initiate an asynchronous wait for acceptor readiness.
476 :
477 : Completes when the listen socket becomes ready for
478 : the specified direction. No connection is consumed.
479 : */
480 : virtual std::coroutine_handle<> wait(
481 : std::coroutine_handle<> h,
482 : capy::executor_ref ex,
483 : wait_type w,
484 : std::stop_token token,
485 : std::error_code* ec) = 0;
486 :
487 : /// Return the cached local endpoint.
488 : virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
489 :
490 : /// Return whether the underlying socket is open.
491 : virtual bool is_open() const noexcept = 0;
492 :
493 : /// Release and return the native handle without closing.
494 : virtual native_handle_type release_socket() noexcept = 0;
495 :
496 : /// Cancel pending accept operations.
497 : virtual void cancel() noexcept = 0;
498 :
499 : /// Set a raw socket option.
500 : virtual std::error_code set_option(
501 : int level,
502 : int optname,
503 : void const* data,
504 : std::size_t size) noexcept = 0;
505 :
506 : /// Get a raw socket option.
507 : virtual std::error_code
508 : get_option(int level, int optname, void* data, std::size_t* size)
509 : const noexcept = 0;
510 : };
511 :
512 : protected:
513 : local_stream_acceptor(handle h, capy::execution_context& ctx) noexcept
514 : : io_object(std::move(h))
515 : , ctx_(ctx)
516 : {
517 : }
518 :
519 : local_stream_acceptor(
520 : capy::execution_context& ctx, local_stream_acceptor&& other) noexcept
521 : : io_object(std::move(other))
522 : , ctx_(ctx)
523 : {
524 : }
525 :
526 2 : static void reset_peer_impl(
527 : local_stream_socket& peer, io_object::implementation* impl) noexcept
528 : {
529 2 : if (impl)
530 2 : peer.h_.reset(impl);
531 2 : }
532 :
533 : private:
534 : capy::execution_context& ctx_;
535 :
536 60 : inline implementation& get() const noexcept
537 : {
538 60 : return *static_cast<implementation*>(h_.get());
539 : }
540 : };
541 :
542 : } // namespace boost::corosio
543 :
544 : #endif // BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
|