TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12 :
13 : #include <boost/corosio/detail/dispatch_coro.hpp>
14 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
15 : #include <boost/corosio/native/detail/make_err.hpp>
16 : #include <boost/corosio/io/io_object.hpp>
17 :
18 : #include <coroutine>
19 : #include <mutex>
20 : #include <utility>
21 :
22 : #include <netinet/in.h>
23 : #include <sys/socket.h>
24 : #include <unistd.h>
25 :
26 : namespace boost::corosio::detail {
27 :
28 : /** Complete a base read/write operation.
29 :
30 : Translates the recorded errno and cancellation state into
31 : an error_code, stores the byte count, then resumes the
32 : caller via symmetric transfer.
33 :
34 : @tparam Op The concrete operation type.
35 : @param op The operation to complete.
36 : */
37 : template<typename Op>
38 : void
39 HIT 78933 : complete_io_op(Op& op)
40 : {
41 78933 : op.stop_cb.reset();
42 78933 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
43 :
44 78933 : if (op.cancelled.load(std::memory_order_acquire))
45 306 : *op.ec_out = capy::error::canceled;
46 78627 : else if (op.errn != 0)
47 MIS 0 : *op.ec_out = make_err(op.errn);
48 HIT 78627 : else if (op.is_read_operation() && op.bytes_transferred == 0)
49 MIS 0 : *op.ec_out = capy::error::eof;
50 : else
51 HIT 78627 : *op.ec_out = {};
52 :
53 78933 : *op.bytes_out = op.bytes_transferred;
54 :
55 78933 : op.cont_op.cont.h = op.h;
56 78933 : capy::executor_ref saved_ex(op.ex);
57 78933 : auto prevent = std::move(op.impl_ptr);
58 78933 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
59 78933 : }
60 :
61 : /** Complete a datagram recv operation (connected mode).
62 :
63 : Like complete_io_op but does not translate zero bytes into
64 : EOF. Zero-length datagrams are valid and should be reported
65 : as success with 0 bytes transferred.
66 :
67 : @param op The operation to complete.
68 : */
69 : template<typename Op>
70 : void
71 : complete_dgram_recv_op(Op& op)
72 : {
73 : op.stop_cb.reset();
74 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
75 :
76 : if (op.cancelled.load(std::memory_order_acquire))
77 : *op.ec_out = capy::error::canceled;
78 : else if (op.errn != 0)
79 : *op.ec_out = make_err(op.errn);
80 : else
81 : *op.ec_out = {};
82 :
83 : *op.bytes_out = op.bytes_transferred;
84 :
85 : op.cont_op.cont.h = op.h;
86 : capy::executor_ref saved_ex(op.ex);
87 : auto prevent = std::move(op.impl_ptr);
88 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
89 : }
90 :
91 : /** Complete a wait operation.
92 :
93 : Wait operations report only an error_code — no bytes_transferred,
94 : no EOF translation. Used for socket and acceptor wait() awaitables;
95 : picks the impl pointer set by start() to reach the scheduler.
96 :
97 : @tparam Op The concrete wait operation type.
98 : @param op The operation to complete.
99 : */
100 : template<typename Op>
101 : void
102 14 : complete_wait_op(Op& op)
103 : {
104 14 : op.stop_cb.reset();
105 14 : if (op.socket_impl_)
106 12 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
107 : else
108 2 : op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
109 :
110 14 : if (op.cancelled.load(std::memory_order_acquire))
111 4 : *op.ec_out = capy::error::canceled;
112 10 : else if (op.errn != 0)
113 MIS 0 : *op.ec_out = make_err(op.errn);
114 : else
115 HIT 10 : *op.ec_out = {};
116 :
117 14 : op.cont_op.cont.h = op.h;
118 14 : capy::executor_ref saved_ex(op.ex);
119 14 : auto prevent = std::move(op.impl_ptr);
120 14 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
121 14 : }
122 :
123 : /** Complete a connect operation with endpoint caching.
124 :
125 : On success, queries the local endpoint via getsockname and
126 : caches both endpoints in the socket impl. Then resumes the
127 : caller via symmetric transfer.
128 :
129 : @tparam Op The concrete connect operation type.
130 : @param op The operation to complete.
131 : */
132 : template<typename Op>
133 : void
134 8439 : complete_connect_op(Op& op)
135 : {
136 8439 : op.stop_cb.reset();
137 8439 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
138 :
139 8439 : bool success =
140 8439 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
141 :
142 8439 : if (success && op.socket_impl_)
143 : {
144 : using ep_type = decltype(op.target_endpoint);
145 8423 : ep_type local_ep;
146 8423 : sockaddr_storage local_storage{};
147 8423 : socklen_t local_len = sizeof(local_storage);
148 8423 : if (::getsockname(
149 : op.fd, reinterpret_cast<sockaddr*>(&local_storage),
150 8423 : &local_len) == 0)
151 8417 : local_ep =
152 8423 : from_sockaddr_as(local_storage, local_len, ep_type{});
153 8423 : op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
154 : }
155 :
156 8439 : if (op.cancelled.load(std::memory_order_acquire))
157 2 : *op.ec_out = capy::error::canceled;
158 8437 : else if (op.errn != 0)
159 14 : *op.ec_out = make_err(op.errn);
160 : else
161 8423 : *op.ec_out = {};
162 :
163 8439 : op.cont_op.cont.h = op.h;
164 8439 : capy::executor_ref saved_ex(op.ex);
165 8439 : auto prevent = std::move(op.impl_ptr);
166 8439 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
167 8439 : }
168 :
169 : /** Construct and register a peer socket from an accepted fd.
170 :
171 : Creates a new socket impl via the acceptor's associated
172 : socket service, registers it with the scheduler, and caches
173 : the local and remote endpoints.
174 :
175 : @tparam SocketImpl The concrete socket implementation type.
176 : @tparam AcceptorImpl The concrete acceptor implementation type.
177 : @param acceptor_impl The acceptor that accepted the connection.
178 : @param accepted_fd The accepted file descriptor (set to -1 on success).
179 : @param peer_storage The peer address from accept().
180 : @param impl_out Output pointer for the new socket impl.
181 : @param ec_out Output pointer for any error.
182 : @return True on success, false on failure.
183 : */
184 : template<typename SocketImpl, typename AcceptorImpl>
185 : bool
186 8411 : setup_accepted_socket(
187 : AcceptorImpl* acceptor_impl,
188 : int& accepted_fd,
189 : sockaddr_storage const& peer_storage,
190 : socklen_t peer_addrlen,
191 : io_object::implementation** impl_out,
192 : std::error_code* ec_out)
193 : {
194 8411 : auto* socket_svc = acceptor_impl->service().stream_service();
195 8411 : if (!socket_svc)
196 : {
197 MIS 0 : *ec_out = make_err(ENOENT);
198 0 : return false;
199 : }
200 :
201 HIT 8411 : auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
202 8411 : impl.set_socket(accepted_fd);
203 :
204 8411 : impl.desc_state_.fd = accepted_fd;
205 : {
206 8411 : std::lock_guard lock(impl.desc_state_.mutex);
207 8411 : impl.desc_state_.read_op = nullptr;
208 8411 : impl.desc_state_.write_op = nullptr;
209 8411 : impl.desc_state_.connect_op = nullptr;
210 8411 : }
211 8411 : socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
212 :
213 : using ep_type = decltype(acceptor_impl->local_endpoint());
214 8411 : impl.set_endpoints(
215 : acceptor_impl->local_endpoint(),
216 8411 : from_sockaddr_as(
217 : peer_storage,
218 : peer_addrlen,
219 : ep_type{}));
220 :
221 8411 : if (impl_out)
222 8411 : *impl_out = &impl;
223 8411 : accepted_fd = -1;
224 8411 : return true;
225 : }
226 :
227 : /** Complete an accept operation.
228 :
229 : Sets up the peer socket on success, or closes the accepted
230 : fd on failure. Then resumes the caller via symmetric transfer.
231 :
232 : @tparam SocketImpl The concrete socket implementation type.
233 : @tparam Op The concrete accept operation type.
234 : @param op The operation to complete.
235 : */
236 : template<typename SocketImpl, typename Op>
237 : void
238 8423 : complete_accept_op(Op& op)
239 : {
240 8423 : op.stop_cb.reset();
241 8423 : op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
242 :
243 8423 : bool success =
244 8423 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
245 :
246 8423 : if (op.cancelled.load(std::memory_order_acquire))
247 12 : *op.ec_out = capy::error::canceled;
248 8411 : else if (op.errn != 0)
249 MIS 0 : *op.ec_out = make_err(op.errn);
250 : else
251 HIT 8411 : *op.ec_out = {};
252 :
253 8423 : if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
254 : {
255 8411 : if (!setup_accepted_socket<SocketImpl>(
256 8411 : op.acceptor_impl_, op.accepted_fd, op.peer_storage,
257 : op.peer_addrlen, op.impl_out, op.ec_out))
258 MIS 0 : success = false;
259 : }
260 :
261 HIT 8423 : if (!success || !op.acceptor_impl_)
262 : {
263 12 : if (op.accepted_fd >= 0)
264 : {
265 MIS 0 : ::close(op.accepted_fd);
266 0 : op.accepted_fd = -1;
267 : }
268 HIT 12 : if (op.impl_out)
269 12 : *op.impl_out = nullptr;
270 : }
271 :
272 8423 : op.cont_op.cont.h = op.h;
273 8423 : capy::executor_ref saved_ex(op.ex);
274 8423 : auto prevent = std::move(op.impl_ptr);
275 8423 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
276 8423 : }
277 :
278 : /** Complete a datagram operation (send_to or recv_from).
279 :
280 : For recv_from operations, writes the source endpoint from the
281 : recorded sockaddr_storage into the caller's endpoint pointer.
282 : Then resumes the caller via symmetric transfer.
283 :
284 : @tparam Op The concrete datagram operation type.
285 : @param op The operation to complete.
286 : */
287 : template<typename Op>
288 : void
289 6 : complete_datagram_op(Op& op)
290 : {
291 6 : op.stop_cb.reset();
292 6 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
293 :
294 6 : if (op.cancelled.load(std::memory_order_acquire))
295 2 : *op.ec_out = capy::error::canceled;
296 4 : else if (op.errn != 0)
297 MIS 0 : *op.ec_out = make_err(op.errn);
298 : else
299 HIT 4 : *op.ec_out = {};
300 :
301 6 : *op.bytes_out = op.bytes_transferred;
302 :
303 6 : op.cont_op.cont.h = op.h;
304 6 : capy::executor_ref saved_ex(op.ex);
305 6 : auto prevent = std::move(op.impl_ptr);
306 6 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
307 6 : }
308 :
309 : /** Complete a datagram operation with source endpoint capture.
310 :
311 : For recv_from operations, writes the source endpoint from the
312 : recorded sockaddr_storage into the caller's endpoint pointer.
313 : Then resumes the caller via symmetric transfer.
314 :
315 : @tparam Op The concrete datagram operation type.
316 : @param op The operation to complete.
317 : @param source_out Optional pointer to store source endpoint
318 : (non-null for recv_from, null for send_to).
319 : */
320 : template<typename Op, typename Endpoint>
321 : void
322 18 : complete_datagram_op(Op& op, Endpoint* source_out)
323 : {
324 18 : op.stop_cb.reset();
325 18 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
326 :
327 18 : if (op.cancelled.load(std::memory_order_acquire))
328 6 : *op.ec_out = capy::error::canceled;
329 12 : else if (op.errn != 0)
330 MIS 0 : *op.ec_out = make_err(op.errn);
331 : else
332 HIT 12 : *op.ec_out = {};
333 :
334 18 : *op.bytes_out = op.bytes_transferred;
335 :
336 28 : if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
337 10 : op.errn == 0)
338 20 : *source_out = from_sockaddr_as(
339 10 : op.source_storage,
340 : op.source_addrlen,
341 : Endpoint{});
342 :
343 18 : op.cont_op.cont.h = op.h;
344 18 : capy::executor_ref saved_ex(op.ex);
345 18 : auto prevent = std::move(op.impl_ptr);
346 18 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
347 18 : }
348 :
349 : } // namespace boost::corosio::detail
350 :
351 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
|