include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

81.0% Lines (336/415) 93.3% Functions (28/30)
include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23
24 #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/except.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <coroutine>
31 #include <mutex>
32 #include <unordered_map>
33 #include <utility>
34
35 #include <errno.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/epoll.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 /*
43 epoll Socket Implementation
44 ===========================
45
46 Each I/O operation follows the same pattern:
47 1. Try the syscall immediately (non-blocking socket)
48 2. If it succeeds or fails with a real error, post to completion queue
49 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50
51 This "try first" approach avoids unnecessary epoll round-trips for
52 operations that can complete immediately (common for small reads/writes
53 on fast local connections).
54
55 Persistent Edge-Triggered Registration
56 --------------------------------------
57 Each fd is registered with epoll once, in open_socket() (edge-triggered),
58 and deregistered in close_socket(). An operation that hits EAGAIN is
59 parked in its descriptor_state slot by register_op() rather than issuing
60 its own epoll_ctl call. Readiness cached in read_ready/write_ready while
61 no op was parked is consumed by register_op() before parking.
62
63 Cancellation
64 ------------
65 See op.hpp for the completion/cancellation race handling via the
66 `registered` atomic. cancel() must complete pending operations (post
67 them with cancelled flag) so coroutines waiting on them can resume.
68 close_socket() calls cancel() first to ensure this.
69
70 Impl Lifetime with shared_ptr
71 -----------------------------
72 Socket impls use enable_shared_from_this. The service owns impls via
73 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 removal. When a user calls close(), we call cancel() which posts pending
75 ops to the scheduler.
76
77 CRITICAL: The posted ops must keep the impl alive until they complete.
78 Otherwise the scheduler would process a freed op (use-after-free). The
79 cancel() method captures shared_from_this() into op.impl_ptr before
80 posting. When the op completes, impl_ptr is cleared, allowing the impl
81 to be destroyed if no other references exist.
82
83 Service Ownership
84 -----------------
85 epoll_socket_service owns all socket impls. destroy() removes the
86 shared_ptr from the map, but the impl may survive if ops still hold
87 impl_ptr refs. shutdown() closes all sockets but keeps the map until the
88 service is destroyed; in-flight ops complete and release their refs.
89 */
90
91 namespace boost::corosio::detail {
92
93 /** State for epoll socket service. */
94 class epoll_socket_state
95 {
96 public:
97 239 explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98 {
99 239 }
100
101 epoll_scheduler& sched_;
102 std::mutex mutex_;
103 intrusive_list<epoll_socket> socket_list_;
104 std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105 socket_ptrs_;
106 };
107
108 /** epoll socket service implementation.
109
110 Inherits from socket_service to enable runtime polymorphism.
111 Uses key_type = socket_service for service lookup.
112 */
113 class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114 {
115 public:
116 explicit epoll_socket_service(capy::execution_context& ctx);
117 ~epoll_socket_service() override;
118
119 epoll_socket_service(epoll_socket_service const&) = delete;
120 epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121
122 void shutdown() override;
123
124 io_object::implementation* construct() override;
125 void destroy(io_object::implementation*) override;
126 void close(io_object::handle&) override;
127 std::error_code open_socket(
128 tcp_socket::implementation& impl,
129 int family,
130 int type,
131 int protocol) override;
132
133 312752 epoll_scheduler& scheduler() const noexcept
134 {
135 312752 return state_->sched_;
136 }
137 void post(epoll_op* op);
138 void work_started() noexcept;
139 void work_finished() noexcept;
140
141 private:
142 std::unique_ptr<epoll_socket_state> state_;
143 };
144
145 //--------------------------------------------------------------------------
146 //
147 // Implementation
148 //
149 //--------------------------------------------------------------------------
150
151 // Register an op with the reactor, handling cached edge events.
152 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
153 inline void
154 4917 epoll_socket::register_op(
155 epoll_op& op,
156 epoll_op*& desc_slot,
157 bool& ready_flag,
158 bool& cancel_flag) noexcept
159 {
160 4917 svc_.work_started();
161
162 4917 std::lock_guard lock(desc_state_.mutex);
163 4917 bool io_done = false;
164 4917 if (ready_flag)
165 {
166 142 ready_flag = false;
167 142 op.perform_io();
168 142 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
169 142 if (!io_done)
170 142 op.errn = 0;
171 }
172
173 4917 if (cancel_flag)
174 {
175 95 cancel_flag = false;
176 95 op.cancelled.store(true, std::memory_order_relaxed);
177 }
178
179 4917 if (io_done || op.cancelled.load(std::memory_order_acquire))
180 {
181 95 svc_.post(&op);
182 95 svc_.work_finished();
183 }
184 else
185 {
186 4822 desc_slot = &op;
187 }
188 4917 }
189
190 inline void
191 104 epoll_op::canceller::operator()() const noexcept
192 {
193 104 op->cancel();
194 104 }
195
196 inline void
197 epoll_connect_op::cancel() noexcept
198 {
199 if (socket_impl_)
200 socket_impl_->cancel_single_op(*this);
201 else
202 request_cancel();
203 }
204
205 inline void
206 98 epoll_read_op::cancel() noexcept
207 {
208 98 if (socket_impl_)
209 98 socket_impl_->cancel_single_op(*this);
210 else
211 request_cancel();
212 98 }
213
214 inline void
215 epoll_write_op::cancel() noexcept
216 {
217 if (socket_impl_)
218 socket_impl_->cancel_single_op(*this);
219 else
220 request_cancel();
221 }
222
223 inline void
224 48299 epoll_op::operator()()
225 {
226 48299 stop_cb.reset();
227
228 48299 socket_impl_->svc_.scheduler().reset_inline_budget();
229
230 48299 if (cancelled.load(std::memory_order_acquire))
231 205 *ec_out = capy::error::canceled;
232 48094 else if (errn != 0)
233 *ec_out = make_err(errn);
234 48094 else if (is_read_operation() && bytes_transferred == 0)
235 *ec_out = capy::error::eof;
236 else
237 48094 *ec_out = {};
238
239 48299 *bytes_out = bytes_transferred;
240
241 // Move to stack before resuming coroutine. The coroutine might close
242 // the socket, releasing the last wrapper ref. If impl_ptr were the
243 // last ref and we destroyed it while still in operator(), we'd have
244 // use-after-free. Moving to local ensures destruction happens at
245 // function exit, after all member accesses are complete.
246 48299 capy::executor_ref saved_ex(ex);
247 48299 std::coroutine_handle<> saved_h(h);
248 48299 auto prevent_premature_destruction = std::move(impl_ptr);
249 48299 dispatch_coro(saved_ex, saved_h).resume();
250 48299 }
251
252 inline void
253 4716 epoll_connect_op::operator()()
254 {
255 4716 stop_cb.reset();
256
257 4716 socket_impl_->svc_.scheduler().reset_inline_budget();
258
259 4716 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
260
261 // Cache endpoints on successful connect
262 4716 if (success && socket_impl_)
263 {
264 4713 endpoint local_ep;
265 4713 sockaddr_storage local_storage{};
266 4713 socklen_t local_len = sizeof(local_storage);
267 4713 if (::getsockname(
268 4713 fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
269 0)
270 4713 local_ep = from_sockaddr(local_storage);
271 4713 static_cast<epoll_socket*>(socket_impl_)
272 4713 ->set_endpoints(local_ep, target_endpoint);
273 }
274
275 4716 if (cancelled.load(std::memory_order_acquire))
276 *ec_out = capy::error::canceled;
277 4716 else if (errn != 0)
278 3 *ec_out = make_err(errn);
279 else
280 4713 *ec_out = {};
281
282 // Move to stack before resuming. See epoll_op::operator()() for rationale.
283 4716 capy::executor_ref saved_ex(ex);
284 4716 std::coroutine_handle<> saved_h(h);
285 4716 auto prevent_premature_destruction = std::move(impl_ptr);
286 4716 dispatch_coro(saved_ex, saved_h).resume();
287 4716 }
288
289 14203 inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
290 14203 : svc_(svc)
291 {
292 14203 }
293
294 14203 inline epoll_socket::~epoll_socket() = default;
295
296 inline std::coroutine_handle<>
297 4716 epoll_socket::connect(
298 std::coroutine_handle<> h,
299 capy::executor_ref ex,
300 endpoint ep,
301 std::stop_token token,
302 std::error_code* ec)
303 {
304 4716 auto& op = conn_;
305
306 4716 sockaddr_storage storage{};
307 socklen_t addrlen =
308 4716 detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
309 4716 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
310
311 4716 if (result == 0)
312 {
313 sockaddr_storage local_storage{};
314 socklen_t local_len = sizeof(local_storage);
315 if (::getsockname(
316 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
317 0)
318 local_endpoint_ = detail::from_sockaddr(local_storage);
319 remote_endpoint_ = ep;
320 }
321
322 4716 if (result == 0 || errno != EINPROGRESS)
323 {
324 int err = (result < 0) ? errno : 0;
325 if (svc_.scheduler().try_consume_inline_budget())
326 {
327 *ec = err ? make_err(err) : std::error_code{};
328 return dispatch_coro(ex, h);
329 }
330 op.reset();
331 op.h = h;
332 op.ex = ex;
333 op.ec_out = ec;
334 op.fd = fd_;
335 op.target_endpoint = ep;
336 op.start(token, this);
337 op.impl_ptr = shared_from_this();
338 op.complete(err, 0);
339 svc_.post(&op);
340 return std::noop_coroutine();
341 }
342
343 // EINPROGRESS — register with reactor
344 4716 op.reset();
345 4716 op.h = h;
346 4716 op.ex = ex;
347 4716 op.ec_out = ec;
348 4716 op.fd = fd_;
349 4716 op.target_endpoint = ep;
350 4716 op.start(token, this);
351 4716 op.impl_ptr = shared_from_this();
352
353 4716 register_op(
354 4716 op, desc_state_.connect_op, desc_state_.write_ready,
355 4716 desc_state_.connect_cancel_pending);
356 4716 return std::noop_coroutine();
357 }
358
359 inline std::coroutine_handle<>
360 120626 epoll_socket::read_some(
361 std::coroutine_handle<> h,
362 capy::executor_ref ex,
363 buffer_param param,
364 std::stop_token token,
365 std::error_code* ec,
366 std::size_t* bytes_out)
367 {
368 120626 auto& op = rd_;
369 120626 op.reset();
370
371 120626 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
372 120626 op.iovec_count =
373 120626 static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
374
375 120626 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
376 {
377 1 op.empty_buffer_read = true;
378 1 op.h = h;
379 1 op.ex = ex;
380 1 op.ec_out = ec;
381 1 op.bytes_out = bytes_out;
382 1 op.start(token, this);
383 1 op.impl_ptr = shared_from_this();
384 1 op.complete(0, 0);
385 1 svc_.post(&op);
386 1 return std::noop_coroutine();
387 }
388
389 241250 for (int i = 0; i < op.iovec_count; ++i)
390 {
391 120625 op.iovecs[i].iov_base = bufs[i].data();
392 120625 op.iovecs[i].iov_len = bufs[i].size();
393 }
394
395 // Speculative read
396 ssize_t n;
397 do
398 {
399 120625 n = ::readv(fd_, op.iovecs, op.iovec_count);
400 }
401 120625 while (n < 0 && errno == EINTR);
402
403 120625 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
404 {
405 120424 int err = (n < 0) ? errno : 0;
406 120424 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
407
408 120424 if (svc_.scheduler().try_consume_inline_budget())
409 {
410 96385 if (err)
411 *ec = make_err(err);
412 96385 else if (n == 0)
413 5 *ec = capy::error::eof;
414 else
415 96380 *ec = {};
416 96385 *bytes_out = bytes;
417 96385 return dispatch_coro(ex, h);
418 }
419 24039 op.h = h;
420 24039 op.ex = ex;
421 24039 op.ec_out = ec;
422 24039 op.bytes_out = bytes_out;
423 24039 op.start(token, this);
424 24039 op.impl_ptr = shared_from_this();
425 24039 op.complete(err, bytes);
426 24039 svc_.post(&op);
427 24039 return std::noop_coroutine();
428 }
429
430 // EAGAIN — register with reactor
431 201 op.h = h;
432 201 op.ex = ex;
433 201 op.ec_out = ec;
434 201 op.bytes_out = bytes_out;
435 201 op.fd = fd_;
436 201 op.start(token, this);
437 201 op.impl_ptr = shared_from_this();
438
439 201 register_op(
440 201 op, desc_state_.read_op, desc_state_.read_ready,
441 201 desc_state_.read_cancel_pending);
442 201 return std::noop_coroutine();
443 }
444
445 inline std::coroutine_handle<>
446 120426 epoll_socket::write_some(
447 std::coroutine_handle<> h,
448 capy::executor_ref ex,
449 buffer_param param,
450 std::stop_token token,
451 std::error_code* ec,
452 std::size_t* bytes_out)
453 {
454 120426 auto& op = wr_;
455 120426 op.reset();
456
457 120426 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
458 120426 op.iovec_count =
459 120426 static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
460
461 120426 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
462 {
463 1 op.h = h;
464 1 op.ex = ex;
465 1 op.ec_out = ec;
466 1 op.bytes_out = bytes_out;
467 1 op.start(token, this);
468 1 op.impl_ptr = shared_from_this();
469 1 op.complete(0, 0);
470 1 svc_.post(&op);
471 1 return std::noop_coroutine();
472 }
473
474 240850 for (int i = 0; i < op.iovec_count; ++i)
475 {
476 120425 op.iovecs[i].iov_base = bufs[i].data();
477 120425 op.iovecs[i].iov_len = bufs[i].size();
478 }
479
480 // Speculative write
481 120425 msghdr msg{};
482 120425 msg.msg_iov = op.iovecs;
483 120425 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
484
485 ssize_t n;
486 do
487 {
488 120425 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
489 }
490 120425 while (n < 0 && errno == EINTR);
491
492 120425 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
493 {
494 120425 int err = (n < 0) ? errno : 0;
495 120425 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
496
497 120425 if (svc_.scheduler().try_consume_inline_budget())
498 {
499 96368 *ec = err ? make_err(err) : std::error_code{};
500 96368 *bytes_out = bytes;
501 96368 return dispatch_coro(ex, h);
502 }
503 24057 op.h = h;
504 24057 op.ex = ex;
505 24057 op.ec_out = ec;
506 24057 op.bytes_out = bytes_out;
507 24057 op.start(token, this);
508 24057 op.impl_ptr = shared_from_this();
509 24057 op.complete(err, bytes);
510 24057 svc_.post(&op);
511 24057 return std::noop_coroutine();
512 }
513
514 // EAGAIN — register with reactor
515 op.h = h;
516 op.ex = ex;
517 op.ec_out = ec;
518 op.bytes_out = bytes_out;
519 op.fd = fd_;
520 op.start(token, this);
521 op.impl_ptr = shared_from_this();
522
523 register_op(
524 op, desc_state_.write_op, desc_state_.write_ready,
525 desc_state_.write_cancel_pending);
526 return std::noop_coroutine();
527 }
528
529 inline std::error_code
530 3 epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
531 {
532 int how;
533 3 switch (what)
534 {
535 1 case tcp_socket::shutdown_receive:
536 1 how = SHUT_RD;
537 1 break;
538 1 case tcp_socket::shutdown_send:
539 1 how = SHUT_WR;
540 1 break;
541 1 case tcp_socket::shutdown_both:
542 1 how = SHUT_RDWR;
543 1 break;
544 default:
545 return make_err(EINVAL);
546 }
547 3 if (::shutdown(fd_, how) != 0)
548 return make_err(errno);
549 3 return {};
550 }
551
552 inline std::error_code
553 32 epoll_socket::set_option(
554 int level, int optname, void const* data, std::size_t size) noexcept
555 {
556 32 if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
557 0)
558 return make_err(errno);
559 32 return {};
560 }
561
562 inline std::error_code
563 31 epoll_socket::get_option(
564 int level, int optname, void* data, std::size_t* size) const noexcept
565 {
566 31 socklen_t len = static_cast<socklen_t>(*size);
567 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
568 return make_err(errno);
569 31 *size = static_cast<std::size_t>(len);
570 31 return {};
571 }
572
573 inline void
574 187 epoll_socket::cancel() noexcept
575 {
576 187 auto self = weak_from_this().lock();
577 187 if (!self)
578 return;
579
580 187 conn_.request_cancel();
581 187 rd_.request_cancel();
582 187 wr_.request_cancel();
583
584 187 epoll_op* conn_claimed = nullptr;
585 187 epoll_op* rd_claimed = nullptr;
586 187 epoll_op* wr_claimed = nullptr;
587 {
588 187 std::lock_guard lock(desc_state_.mutex);
589 187 if (desc_state_.connect_op == &conn_)
590 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
591 else
592 187 desc_state_.connect_cancel_pending = true;
593 187 if (desc_state_.read_op == &rd_)
594 3 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
595 else
596 184 desc_state_.read_cancel_pending = true;
597 187 if (desc_state_.write_op == &wr_)
598 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
599 else
600 187 desc_state_.write_cancel_pending = true;
601 187 }
602
603 187 if (conn_claimed)
604 {
605 conn_.impl_ptr = self;
606 svc_.post(&conn_);
607 svc_.work_finished();
608 }
609 187 if (rd_claimed)
610 {
611 3 rd_.impl_ptr = self;
612 3 svc_.post(&rd_);
613 3 svc_.work_finished();
614 }
615 187 if (wr_claimed)
616 {
617 wr_.impl_ptr = self;
618 svc_.post(&wr_);
619 svc_.work_finished();
620 }
621 187 }
622
623 inline void
624 98 epoll_socket::cancel_single_op(epoll_op& op) noexcept
625 {
626 98 auto self = weak_from_this().lock();
627 98 if (!self)
628 return;
629
630 98 op.request_cancel();
631
632 98 epoll_op** desc_op_ptr = nullptr;
633 98 if (&op == &conn_)
634 desc_op_ptr = &desc_state_.connect_op;
635 98 else if (&op == &rd_)
636 98 desc_op_ptr = &desc_state_.read_op;
637 else if (&op == &wr_)
638 desc_op_ptr = &desc_state_.write_op;
639
640 98 if (desc_op_ptr)
641 {
642 98 epoll_op* claimed = nullptr;
643 {
644 98 std::lock_guard lock(desc_state_.mutex);
645 98 if (*desc_op_ptr == &op)
646 98 claimed = std::exchange(*desc_op_ptr, nullptr);
647 else if (&op == &conn_)
648 desc_state_.connect_cancel_pending = true;
649 else if (&op == &rd_)
650 desc_state_.read_cancel_pending = true;
651 else if (&op == &wr_)
652 desc_state_.write_cancel_pending = true;
653 98 }
654 98 if (claimed)
655 {
656 98 op.impl_ptr = self;
657 98 svc_.post(&op);
658 98 svc_.work_finished();
659 }
660 }
661 98 }
662
663 inline void
664 42581 epoll_socket::close_socket() noexcept
665 {
666 42581 auto self = weak_from_this().lock();
667 42581 if (self)
668 {
669 42581 conn_.request_cancel();
670 42581 rd_.request_cancel();
671 42581 wr_.request_cancel();
672
673 42581 epoll_op* conn_claimed = nullptr;
674 42581 epoll_op* rd_claimed = nullptr;
675 42581 epoll_op* wr_claimed = nullptr;
676 {
677 42581 std::lock_guard lock(desc_state_.mutex);
678 42581 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
679 42581 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
680 42581 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
681 42581 desc_state_.read_ready = false;
682 42581 desc_state_.write_ready = false;
683 42581 desc_state_.read_cancel_pending = false;
684 42581 desc_state_.write_cancel_pending = false;
685 42581 desc_state_.connect_cancel_pending = false;
686 42581 }
687
688 42581 if (conn_claimed)
689 {
690 conn_.impl_ptr = self;
691 svc_.post(&conn_);
692 svc_.work_finished();
693 }
694 42581 if (rd_claimed)
695 {
696 1 rd_.impl_ptr = self;
697 1 svc_.post(&rd_);
698 1 svc_.work_finished();
699 }
700 42581 if (wr_claimed)
701 {
702 wr_.impl_ptr = self;
703 svc_.post(&wr_);
704 svc_.work_finished();
705 }
706
707 42581 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
708 97 desc_state_.impl_ref_ = self;
709 }
710
711 42581 if (fd_ >= 0)
712 {
713 9444 if (desc_state_.registered_events != 0)
714 9444 svc_.scheduler().deregister_descriptor(fd_);
715 9444 ::close(fd_);
716 9444 fd_ = -1;
717 }
718
719 42581 desc_state_.fd = -1;
720 42581 desc_state_.registered_events = 0;
721
722 42581 local_endpoint_ = endpoint{};
723 42581 remote_endpoint_ = endpoint{};
724 42581 }
725
726 239 inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
727 239 : state_(
728 std::make_unique<epoll_socket_state>(
729 239 ctx.use_service<epoll_scheduler>()))
730 {
731 239 }
732
733 478 inline epoll_socket_service::~epoll_socket_service() {}
734
735 inline void
736 239 epoll_socket_service::shutdown()
737 {
738 239 std::lock_guard lock(state_->mutex_);
739
740 239 while (auto* impl = state_->socket_list_.pop_front())
741 impl->close_socket();
742
743 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
744 // drains completed_ops_, calling destroy() on each queued op. If we
745 // released our shared_ptrs now, an epoll_op::destroy() could free the
746 // last ref to an impl whose embedded descriptor_state is still linked
747 // in the queue — use-after-free on the next pop(). Letting ~state_
748 // release the ptrs (during service destruction, after scheduler
749 // shutdown) keeps every impl alive until all ops have been drained.
750 239 }
751
752 inline io_object::implementation*
753 14203 epoll_socket_service::construct()
754 {
755 14203 auto impl = std::make_shared<epoll_socket>(*this);
756 14203 auto* raw = impl.get();
757
758 {
759 14203 std::lock_guard lock(state_->mutex_);
760 14203 state_->socket_list_.push_back(raw);
761 14203 state_->socket_ptrs_.emplace(raw, std::move(impl));
762 14203 }
763
764 14203 return raw;
765 14203 }
766
767 inline void
768 14203 epoll_socket_service::destroy(io_object::implementation* impl)
769 {
770 14203 auto* epoll_impl = static_cast<epoll_socket*>(impl);
771 14203 epoll_impl->close_socket();
772 14203 std::lock_guard lock(state_->mutex_);
773 14203 state_->socket_list_.remove(epoll_impl);
774 14203 state_->socket_ptrs_.erase(epoll_impl);
775 14203 }
776
777 inline std::error_code
778 4731 epoll_socket_service::open_socket(
779 tcp_socket::implementation& impl, int family, int type, int protocol)
780 {
781 4731 auto* epoll_impl = static_cast<epoll_socket*>(&impl);
782 4731 epoll_impl->close_socket();
783
784 4731 int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
785 4731 if (fd < 0)
786 return make_err(errno);
787
788 4731 if (family == AF_INET6)
789 {
790 5 int one = 1;
791 5 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
792 }
793
794 4731 epoll_impl->fd_ = fd;
795
796 // Register fd with epoll (edge-triggered mode)
797 4731 epoll_impl->desc_state_.fd = fd;
798 {
799 4731 std::lock_guard lock(epoll_impl->desc_state_.mutex);
800 4731 epoll_impl->desc_state_.read_op = nullptr;
801 4731 epoll_impl->desc_state_.write_op = nullptr;
802 4731 epoll_impl->desc_state_.connect_op = nullptr;
803 4731 }
804 4731 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
805
806 4731 return {};
807 }
808
809 inline void
810 23647 epoll_socket_service::close(io_object::handle& h)
811 {
812 23647 static_cast<epoll_socket*>(h.get())->close_socket();
813 23647 }
814
815 inline void
816 48295 epoll_socket_service::post(epoll_op* op)
817 {
818 48295 state_->sched_.post(op);
819 48295 }
820
821 inline void
822 4917 epoll_socket_service::work_started() noexcept
823 {
824 4917 state_->sched_.work_started();
825 4917 }
826
827 inline void
828 197 epoll_socket_service::work_finished() noexcept
829 {
830 197 state_->sched_.work_finished();
831 197 }
832
833 } // namespace boost::corosio::detail
834
835 #endif // BOOST_COROSIO_HAS_EPOLL
836
837 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
838