TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/tcp_socket.hpp>
14 : #include <boost/corosio/shutdown_type.hpp>
15 : #include <boost/corosio/wait_type.hpp>
16 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
17 : #include <boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp>
18 : #include <boost/corosio/detail/dispatch_coro.hpp>
19 : #include <boost/capy/buffers.hpp>
20 :
21 : #include <coroutine>
22 :
23 : #include <errno.h>
24 : #include <sys/socket.h>
25 : #include <sys/uio.h>
26 :
27 : namespace boost::corosio::detail {
28 :
29 : /** CRTP base for reactor-backed stream socket implementations.
30 :
31 : Inherits shared data members and cancel/close/register logic
32 : from reactor_basic_socket. Adds the stream-specific remote
33 : endpoint, shutdown, and I/O dispatch (connect, read, write, wait).
34 :
35 : @tparam Derived The concrete socket type (CRTP).
36 : @tparam Service The backend's socket service type.
37 : @tparam ConnOp The backend's connect op type.
38 : @tparam ReadOp The backend's read op type.
39 : @tparam WriteOp The backend's write op type.
40 : @tparam WaitOp The backend's wait op type.
41 : @tparam DescState The backend's descriptor_state type.
42 : @tparam ImplBase The public vtable base
43 : (tcp_socket::implementation or
44 : local_stream_socket::implementation).
45 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
46 : */
47 : template<
48 : class Derived,
49 : class Service,
50 : class ConnOp,
51 : class ReadOp,
52 : class WriteOp,
53 : class WaitOp,
54 : class DescState,
55 : class ImplBase = tcp_socket::implementation,
56 : class Endpoint = endpoint>
57 : class reactor_stream_socket
58 : : public reactor_basic_socket<
59 : Derived,
60 : ImplBase,
61 : Service,
62 : DescState,
63 : Endpoint>
64 : {
65 : using base_type = reactor_basic_socket<
66 : Derived,
67 : ImplBase,
68 : Service,
69 : DescState,
70 : Endpoint>;
71 : using self_type = reactor_stream_socket<
72 : Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp,
73 : DescState, ImplBase, Endpoint>;
74 : friend base_type;
75 : friend Derived;
76 :
77 : protected:
78 : // NOLINTNEXTLINE(bugprone-crtp-constructor-accessibility)
79 HIT 25372 : explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
80 :
81 : protected:
82 : Endpoint remote_endpoint_;
83 :
84 : public:
85 : /// Pending connect operation slot.
86 : ConnOp conn_;
87 :
88 : /// Pending read operation slot.
89 : ReadOp rd_;
90 :
91 : /// Pending write operation slot.
92 : WriteOp wr_;
93 :
94 : /// Pending wait-for-read operation slot.
95 : WaitOp wait_rd_;
96 :
97 : /// Pending wait-for-write operation slot.
98 : WaitOp wait_wr_;
99 :
100 : /// Pending wait-for-error operation slot.
101 : WaitOp wait_er_;
102 :
103 25372 : ~reactor_stream_socket() override = default;
104 :
105 : /// Return the cached remote endpoint.
106 46 : Endpoint remote_endpoint() const noexcept override
107 : {
108 46 : return remote_endpoint_;
109 : }
110 :
111 : // --- Virtual method overrides (satisfy ImplBase pure virtuals) ---
112 :
113 8429 : std::coroutine_handle<> connect(
114 : std::coroutine_handle<> h,
115 : capy::executor_ref ex,
116 : Endpoint ep,
117 : std::stop_token token,
118 : std::error_code* ec) override
119 : {
120 8429 : return do_connect(h, ex, ep, token, ec);
121 : }
122 :
123 196731 : std::coroutine_handle<> read_some(
124 : std::coroutine_handle<> h,
125 : capy::executor_ref ex,
126 : buffer_param param,
127 : std::stop_token token,
128 : std::error_code* ec,
129 : std::size_t* bytes_out) override
130 : {
131 196731 : return do_read_some(h, ex, param, token, ec, bytes_out);
132 : }
133 :
134 196438 : std::coroutine_handle<> write_some(
135 : std::coroutine_handle<> h,
136 : capy::executor_ref ex,
137 : buffer_param param,
138 : std::stop_token token,
139 : std::error_code* ec,
140 : std::size_t* bytes_out) override
141 : {
142 196438 : return do_write_some(h, ex, param, token, ec, bytes_out);
143 : }
144 :
145 8 : std::coroutine_handle<> wait(
146 : std::coroutine_handle<> h,
147 : capy::executor_ref ex,
148 : wait_type w,
149 : std::stop_token token,
150 : std::error_code* ec) override
151 : {
152 8 : return do_wait(h, ex, w, token, ec);
153 : }
154 :
155 : std::error_code
156 6 : shutdown(corosio::shutdown_type what) noexcept override
157 : {
158 6 : return do_shutdown(static_cast<int>(what));
159 : }
160 :
161 190 : void cancel() noexcept override
162 : {
163 190 : this->do_cancel();
164 190 : }
165 :
166 : // --- End virtual overrides ---
167 :
168 : /// Close the socket (non-virtual, called by the service).
169 : void close_socket() noexcept
170 : {
171 : this->do_close_socket();
172 : }
173 :
174 : /** Shut down part or all of the full-duplex connection.
175 :
176 : @param what 0 = receive, 1 = send, 2 = both.
177 : */
178 6 : std::error_code do_shutdown(int what) noexcept
179 : {
180 : int how;
181 6 : switch (what)
182 : {
183 2 : case 0: // shutdown_receive
184 2 : how = SHUT_RD;
185 2 : break;
186 2 : case 1: // shutdown_send
187 2 : how = SHUT_WR;
188 2 : break;
189 2 : case 2: // shutdown_both
190 2 : how = SHUT_RDWR;
191 2 : break;
192 MIS 0 : default:
193 0 : return make_err(EINVAL);
194 : }
195 HIT 6 : if (::shutdown(this->fd_, how) != 0)
196 MIS 0 : return make_err(errno);
197 HIT 6 : return {};
198 : }
199 :
200 : /// Cache local and remote endpoints.
201 16842 : void set_endpoints(Endpoint local, Endpoint remote) noexcept
202 : {
203 16842 : this->local_endpoint_ = std::move(local);
204 16842 : remote_endpoint_ = std::move(remote);
205 16842 : }
206 :
207 : /** Shared connect dispatch.
208 :
209 : Tries the connect syscall speculatively. On synchronous
210 : completion, returns via inline budget or posts through queue.
211 : On EINPROGRESS, registers with the reactor.
212 : */
213 : std::coroutine_handle<> do_connect(
214 : std::coroutine_handle<>,
215 : capy::executor_ref,
216 : Endpoint const&,
217 : std::stop_token const&,
218 : std::error_code*);
219 :
220 : /** Shared scatter-read dispatch.
221 :
222 : Tries readv() speculatively. On success or hard error,
223 : returns via inline budget or posts through queue.
224 : On EAGAIN, registers with the reactor.
225 : */
226 : std::coroutine_handle<> do_read_some(
227 : std::coroutine_handle<>,
228 : capy::executor_ref,
229 : buffer_param,
230 : std::stop_token const&,
231 : std::error_code*,
232 : std::size_t*);
233 :
234 : /** Shared gather-write dispatch.
235 :
236 : Tries the write via WriteOp::write_policy speculatively.
237 : On success or hard error, returns via inline budget or
238 : posts through queue. On EAGAIN, registers with the reactor.
239 : */
240 : std::coroutine_handle<> do_write_some(
241 : std::coroutine_handle<>,
242 : capy::executor_ref,
243 : buffer_param,
244 : std::stop_token const&,
245 : std::error_code*,
246 : std::size_t*);
247 :
248 : /** Shared readiness-wait dispatch.
249 :
250 : Registers a wait op for the requested direction. Does not
251 : perform any I/O syscall — completion is signalled when the
252 : reactor delivers the matching edge event.
253 : */
254 : std::coroutine_handle<> do_wait(
255 : std::coroutine_handle<>,
256 : capy::executor_ref,
257 : wait_type,
258 : std::stop_token const&,
259 : std::error_code*);
260 :
261 : /** Close the socket and cancel pending operations.
262 :
263 : Extends the base do_close_socket() to also reset
264 : the remote endpoint.
265 : */
266 76133 : void do_close_socket() noexcept
267 : {
268 76133 : base_type::do_close_socket();
269 76133 : remote_endpoint_ = Endpoint{};
270 76133 : }
271 :
272 : private:
273 : // CRTP callbacks for reactor_basic_socket cancel/close
274 :
275 : template<class Op>
276 191 : reactor_op_base** op_to_desc_slot(Op& op) noexcept
277 : {
278 191 : if (&op == static_cast<void*>(&conn_))
279 MIS 0 : return &this->desc_state_.connect_op;
280 HIT 191 : if (&op == static_cast<void*>(&rd_))
281 191 : return &this->desc_state_.read_op;
282 MIS 0 : if (&op == static_cast<void*>(&wr_))
283 0 : return &this->desc_state_.write_op;
284 0 : if (&op == static_cast<void*>(&wait_rd_))
285 0 : return &this->desc_state_.wait_read_op;
286 0 : if (&op == static_cast<void*>(&wait_wr_))
287 0 : return &this->desc_state_.wait_write_op;
288 0 : if (&op == static_cast<void*>(&wait_er_))
289 0 : return &this->desc_state_.wait_error_op;
290 0 : return nullptr;
291 : }
292 :
293 : template<class Op>
294 0 : bool* op_to_cancel_flag(Op& op) noexcept
295 : {
296 0 : if (&op == static_cast<void*>(&conn_))
297 0 : return &this->desc_state_.connect_cancel_pending;
298 0 : if (&op == static_cast<void*>(&rd_))
299 0 : return &this->desc_state_.read_cancel_pending;
300 0 : if (&op == static_cast<void*>(&wr_))
301 0 : return &this->desc_state_.write_cancel_pending;
302 0 : if (&op == static_cast<void*>(&wait_rd_))
303 0 : return &this->desc_state_.wait_read_cancel_pending;
304 0 : if (&op == static_cast<void*>(&wait_wr_))
305 0 : return &this->desc_state_.wait_write_cancel_pending;
306 0 : if (&op == static_cast<void*>(&wait_er_))
307 0 : return &this->desc_state_.wait_error_cancel_pending;
308 0 : return nullptr;
309 : }
310 :
311 : template<class Fn>
312 HIT 76325 : void for_each_op(Fn fn) noexcept
313 : {
314 76325 : fn(conn_);
315 76325 : fn(rd_);
316 76325 : fn(wr_);
317 76325 : fn(wait_rd_);
318 76325 : fn(wait_wr_);
319 76325 : fn(wait_er_);
320 76325 : }
321 :
322 : template<class Fn>
323 76325 : void for_each_desc_entry(Fn fn) noexcept
324 : {
325 76325 : fn(conn_, this->desc_state_.connect_op);
326 76325 : fn(rd_, this->desc_state_.read_op);
327 76325 : fn(wr_, this->desc_state_.write_op);
328 76325 : fn(wait_rd_, this->desc_state_.wait_read_op);
329 76325 : fn(wait_wr_, this->desc_state_.wait_write_op);
330 76325 : fn(wait_er_, this->desc_state_.wait_error_op);
331 76325 : }
332 : };
333 :
334 : template<
335 : class Derived,
336 : class Service,
337 : class ConnOp,
338 : class ReadOp,
339 : class WriteOp,
340 : class WaitOp,
341 : class DescState,
342 : class ImplBase,
343 : class Endpoint>
344 : std::coroutine_handle<>
345 8429 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
346 : do_connect(
347 : std::coroutine_handle<> h,
348 : capy::executor_ref ex,
349 : Endpoint const& ep,
350 : std::stop_token const& token,
351 : std::error_code* ec)
352 : {
353 8429 : auto& op = conn_;
354 :
355 8429 : sockaddr_storage storage{};
356 8429 : socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
357 : int result =
358 8429 : ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
359 :
360 8429 : if (result == 0)
361 : {
362 6 : sockaddr_storage local_storage{};
363 6 : socklen_t local_len = sizeof(local_storage);
364 6 : if (::getsockname(
365 : this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
366 6 : &local_len) == 0)
367 MIS 0 : this->local_endpoint_ =
368 HIT 6 : from_sockaddr_as(local_storage, local_len, Endpoint{});
369 6 : remote_endpoint_ = ep;
370 : }
371 :
372 8429 : if (result == 0 || errno != EINPROGRESS)
373 : {
374 6 : int err = (result < 0) ? errno : 0;
375 6 : if (this->svc_.scheduler().try_consume_inline_budget())
376 : {
377 MIS 0 : *ec = err ? make_err(err) : std::error_code{};
378 0 : op.cont_op.cont.h = h;
379 0 : return dispatch_coro(ex, op.cont_op.cont);
380 : }
381 HIT 6 : op.reset();
382 6 : op.h = h;
383 6 : op.ex = ex;
384 6 : op.ec_out = ec;
385 6 : op.fd = this->fd_;
386 6 : op.target_endpoint = ep;
387 6 : op.start(token, static_cast<Derived*>(this));
388 6 : op.impl_ptr = this->shared_from_this();
389 6 : op.complete(err, 0);
390 6 : this->svc_.post(&op);
391 6 : return std::noop_coroutine();
392 : }
393 :
394 : // EINPROGRESS — register with reactor
395 8423 : op.reset();
396 8423 : op.h = h;
397 8423 : op.ex = ex;
398 8423 : op.ec_out = ec;
399 8423 : op.fd = this->fd_;
400 8423 : op.target_endpoint = ep;
401 8423 : op.start(token, static_cast<Derived*>(this));
402 8423 : op.impl_ptr = this->shared_from_this();
403 :
404 8423 : this->register_op(
405 8423 : op, this->desc_state_.connect_op, this->desc_state_.write_ready,
406 8423 : this->desc_state_.connect_cancel_pending, true);
407 8423 : return std::noop_coroutine();
408 : }
409 :
410 : template<
411 : class Derived,
412 : class Service,
413 : class ConnOp,
414 : class ReadOp,
415 : class WriteOp,
416 : class WaitOp,
417 : class DescState,
418 : class ImplBase,
419 : class Endpoint>
420 : std::coroutine_handle<>
421 196731 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
422 : do_read_some(
423 : std::coroutine_handle<> h,
424 : capy::executor_ref ex,
425 : buffer_param param,
426 : std::stop_token const& token,
427 : std::error_code* ec,
428 : std::size_t* bytes_out)
429 : {
430 196731 : auto& op = rd_;
431 196731 : op.reset();
432 :
433 196731 : capy::mutable_buffer bufs[ReadOp::max_buffers];
434 196731 : op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
435 :
436 196731 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
437 : {
438 2 : op.empty_buffer_read = true;
439 2 : op.h = h;
440 2 : op.ex = ex;
441 2 : op.ec_out = ec;
442 2 : op.bytes_out = bytes_out;
443 2 : op.start(token, static_cast<Derived*>(this));
444 2 : op.impl_ptr = this->shared_from_this();
445 2 : op.complete(0, 0);
446 2 : this->svc_.post(&op);
447 2 : return std::noop_coroutine();
448 : }
449 :
450 393458 : for (int i = 0; i < op.iovec_count; ++i)
451 : {
452 196729 : op.iovecs[i].iov_base = bufs[i].data();
453 196729 : op.iovecs[i].iov_len = bufs[i].size();
454 : }
455 :
456 : // Speculative read; for the single-buffer case use recv() so the
457 : // kernel skips the readv iov_iter setup.
458 : ssize_t n;
459 196729 : if (op.iovec_count == 1)
460 : {
461 : do
462 : {
463 196729 : n = ::recv(this->fd_, bufs[0].data(), bufs[0].size(), 0);
464 : }
465 196729 : while (n < 0 && errno == EINTR);
466 : }
467 : else
468 : {
469 : do
470 : {
471 MIS 0 : n = ::readv(this->fd_, op.iovecs, op.iovec_count);
472 : }
473 0 : while (n < 0 && errno == EINTR);
474 : }
475 :
476 HIT 196729 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
477 : {
478 196341 : int err = (n < 0) ? errno : 0;
479 196341 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
480 :
481 196341 : if (this->svc_.scheduler().try_consume_inline_budget())
482 : {
483 157106 : if (err)
484 MIS 0 : *ec = make_err(err);
485 HIT 157106 : else if (n == 0)
486 10 : *ec = capy::error::eof;
487 : else
488 157096 : *ec = {};
489 157106 : *bytes_out = bytes;
490 157106 : op.cont_op.cont.h = h;
491 157106 : return dispatch_coro(ex, op.cont_op.cont);
492 : }
493 39235 : op.h = h;
494 39235 : op.ex = ex;
495 39235 : op.ec_out = ec;
496 39235 : op.bytes_out = bytes_out;
497 39235 : op.start(token, static_cast<Derived*>(this));
498 39235 : op.impl_ptr = this->shared_from_this();
499 39235 : op.complete(err, bytes);
500 39235 : this->svc_.post(&op);
501 39235 : return std::noop_coroutine();
502 : }
503 :
504 : // EAGAIN — register with reactor
505 388 : op.h = h;
506 388 : op.ex = ex;
507 388 : op.ec_out = ec;
508 388 : op.bytes_out = bytes_out;
509 388 : op.fd = this->fd_;
510 388 : op.start(token, static_cast<Derived*>(this));
511 388 : op.impl_ptr = this->shared_from_this();
512 :
513 388 : this->register_op(
514 388 : op, this->desc_state_.read_op, this->desc_state_.read_ready,
515 388 : this->desc_state_.read_cancel_pending);
516 388 : return std::noop_coroutine();
517 : }
518 :
519 : template<
520 : class Derived,
521 : class Service,
522 : class ConnOp,
523 : class ReadOp,
524 : class WriteOp,
525 : class WaitOp,
526 : class DescState,
527 : class ImplBase,
528 : class Endpoint>
529 : std::coroutine_handle<>
530 196438 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
531 : do_write_some(
532 : std::coroutine_handle<> h,
533 : capy::executor_ref ex,
534 : buffer_param param,
535 : std::stop_token const& token,
536 : std::error_code* ec,
537 : std::size_t* bytes_out)
538 : {
539 196438 : auto& op = wr_;
540 196438 : op.reset();
541 :
542 196438 : capy::mutable_buffer bufs[WriteOp::max_buffers];
543 196438 : op.iovec_count =
544 196438 : static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
545 :
546 196438 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
547 : {
548 2 : op.h = h;
549 2 : op.ex = ex;
550 2 : op.ec_out = ec;
551 2 : op.bytes_out = bytes_out;
552 2 : op.start(token, static_cast<Derived*>(this));
553 2 : op.impl_ptr = this->shared_from_this();
554 2 : op.complete(0, 0);
555 2 : this->svc_.post(&op);
556 2 : return std::noop_coroutine();
557 : }
558 :
559 392872 : for (int i = 0; i < op.iovec_count; ++i)
560 : {
561 196436 : op.iovecs[i].iov_base = bufs[i].data();
562 196436 : op.iovecs[i].iov_len = bufs[i].size();
563 : }
564 :
565 : // Speculative write; the single-buffer case dispatches to a
566 : // backend-specific fast path so the kernel skips msghdr/iov_iter
567 : // setup (and so each backend can pick the right SIGPIPE strategy).
568 : ssize_t n;
569 196436 : if (op.iovec_count == 1)
570 : {
571 392872 : n = WriteOp::write_policy::write_one(
572 196436 : this->fd_, bufs[0].data(), bufs[0].size());
573 : }
574 : else
575 : {
576 MIS 0 : n = WriteOp::write_policy::write(
577 0 : this->fd_, op.iovecs, op.iovec_count);
578 : }
579 :
580 HIT 196436 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
581 : {
582 196436 : int err = (n < 0) ? errno : 0;
583 196436 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
584 :
585 196436 : if (this->svc_.scheduler().try_consume_inline_budget())
586 : {
587 157160 : *ec = err ? make_err(err) : std::error_code{};
588 157160 : *bytes_out = bytes;
589 157160 : op.cont_op.cont.h = h;
590 157160 : return dispatch_coro(ex, op.cont_op.cont);
591 : }
592 39276 : op.h = h;
593 39276 : op.ex = ex;
594 39276 : op.ec_out = ec;
595 39276 : op.bytes_out = bytes_out;
596 39276 : op.start(token, static_cast<Derived*>(this));
597 39276 : op.impl_ptr = this->shared_from_this();
598 39276 : op.complete(err, bytes);
599 39276 : this->svc_.post(&op);
600 39276 : return std::noop_coroutine();
601 : }
602 :
603 : // EAGAIN — register with reactor
604 MIS 0 : op.h = h;
605 0 : op.ex = ex;
606 0 : op.ec_out = ec;
607 0 : op.bytes_out = bytes_out;
608 0 : op.fd = this->fd_;
609 0 : op.start(token, static_cast<Derived*>(this));
610 0 : op.impl_ptr = this->shared_from_this();
611 :
612 0 : this->register_op(
613 0 : op, this->desc_state_.write_op, this->desc_state_.write_ready,
614 0 : this->desc_state_.write_cancel_pending, true);
615 0 : return std::noop_coroutine();
616 : }
617 :
618 : template<
619 : class Derived,
620 : class Service,
621 : class ConnOp,
622 : class ReadOp,
623 : class WriteOp,
624 : class WaitOp,
625 : class DescState,
626 : class ImplBase,
627 : class Endpoint>
628 : std::coroutine_handle<>
629 HIT 8 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
630 : do_wait(
631 : std::coroutine_handle<> h,
632 : capy::executor_ref ex,
633 : wait_type w,
634 : std::stop_token const& token,
635 : std::error_code* ec)
636 : {
637 : // wait_type::write completes immediately on a connected socket,
638 : // matching asio's behavior on IOCP. Corosio's reactor backends use
639 : // edge-triggered EPOLLOUT, which would never fire on an already-
640 : // writable socket; an immediate completion is also a more useful
641 : // contract than parking until a non-writable -> writable transition.
642 8 : if (w == wait_type::write)
643 : {
644 2 : auto& op = wait_wr_;
645 2 : if (this->svc_.scheduler().try_consume_inline_budget())
646 : {
647 MIS 0 : *ec = std::error_code{};
648 0 : op.cont_op.cont.h = h;
649 0 : return dispatch_coro(ex, op.cont_op.cont);
650 : }
651 HIT 2 : op.reset();
652 2 : op.wait_event = reactor_event_write;
653 2 : op.h = h;
654 2 : op.ex = ex;
655 2 : op.ec_out = ec;
656 2 : op.fd = this->fd_;
657 2 : op.start(token, static_cast<Derived*>(this));
658 2 : op.impl_ptr = this->shared_from_this();
659 2 : op.complete(0, 0);
660 2 : this->svc_.post(&op);
661 2 : return std::noop_coroutine();
662 : }
663 :
664 : // Pick refs up-front to avoid duplicating the register_op call.
665 : WaitOp* op_ptr;
666 : reactor_op_base** desc_slot_ptr;
667 : bool* ready_flag_ptr;
668 : bool* cancel_flag_ptr;
669 : std::uint32_t event;
670 :
671 6 : bool dummy_ready = false; // placeholder for error waits (no cached edge)
672 :
673 6 : if (w == wait_type::read)
674 : {
675 6 : op_ptr = &wait_rd_;
676 6 : desc_slot_ptr = &this->desc_state_.wait_read_op;
677 6 : ready_flag_ptr = &this->desc_state_.read_ready;
678 6 : cancel_flag_ptr = &this->desc_state_.wait_read_cancel_pending;
679 6 : event = reactor_event_read;
680 : }
681 : else // wait_type::error
682 : {
683 MIS 0 : op_ptr = &wait_er_;
684 0 : desc_slot_ptr = &this->desc_state_.wait_error_op;
685 0 : ready_flag_ptr = &dummy_ready;
686 0 : cancel_flag_ptr = &this->desc_state_.wait_error_cancel_pending;
687 0 : event = reactor_event_error;
688 : }
689 :
690 HIT 6 : auto& op = *op_ptr;
691 6 : op.reset();
692 6 : op.wait_event = event;
693 6 : op.h = h;
694 6 : op.ex = ex;
695 6 : op.ec_out = ec;
696 6 : op.fd = this->fd_;
697 6 : op.start(token, static_cast<Derived*>(this));
698 6 : op.impl_ptr = this->shared_from_this();
699 :
700 6 : this->register_op(op, *desc_slot_ptr, *ready_flag_ptr, *cancel_flag_ptr,
701 : false);
702 6 : return std::noop_coroutine();
703 : }
704 :
705 : } // namespace boost::corosio::detail
706 :
707 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
|