Coverage report for: include/boost/corosio/tcp_server.hpp

Line coverage: 78.7% (107 of 136 lines) — Function coverage: 91.2% (31 of 34 functions)

Columns: Line | TLA | Hits | Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11 #define BOOST_COROSIO_TCP_SERVER_HPP
12
#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/tcp_acceptor.hpp>
#include <boost/corosio/tcp_socket.hpp>
#include <boost/corosio/io_context.hpp>
#include <boost/corosio/endpoint.hpp>
#include <boost/capy/task.hpp>
#include <boost/capy/concept/execution_context.hpp>
#include <boost/capy/concept/io_awaitable.hpp>
#include <boost/capy/concept/executor.hpp>
#include <boost/capy/ex/any_executor.hpp>
#include <boost/capy/ex/frame_allocator.hpp>
#include <boost/capy/ex/io_env.hpp>
#include <boost/capy/ex/run_async.hpp>

#include <coroutine>
#include <cstddef>
#include <memory>
#include <ranges>
#include <stop_token>
#include <system_error>
#include <utility>
#include <vector>
32
33 namespace boost::corosio {
34
35 #ifdef _MSC_VER
36 #pragma warning(push)
37 #pragma warning(disable : 4251) // class needs to have dll-interface
38 #endif
39
40 /** TCP server with pooled workers.
41
42 This class manages a pool of reusable worker objects that handle
43 incoming connections. When a connection arrives, an idle worker
44 is dispatched to handle it. After the connection completes, the
45 worker returns to the pool for reuse, avoiding allocation overhead
46 per connection.
47
48 Workers are set via @ref set_workers as a forward range of
49 pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50 takes ownership of the container via type erasure.
51
52 @par Thread Safety
53 Distinct objects: Safe.
54 Shared objects: Unsafe.
55
56 @par Lifecycle
57 The server operates in three states:
58
59 - **Stopped**: Initial state, or after @ref join completes.
60 - **Running**: After @ref start, actively accepting connections.
61 - **Stopping**: After @ref stop, draining active work.
62
63 State transitions:
64 @code
65 [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66 @endcode
67
68 @par Running the Server
69 @code
70 io_context ioc;
71 tcp_server srv(ioc, ioc.get_executor());
72 srv.set_workers(make_workers(ioc, 100));
73 srv.bind(endpoint{address_v4::any(), 8080});
74 srv.start();
75 ioc.run(); // Blocks until all work completes
76 @endcode
77
78 @par Graceful Shutdown
79 To shut down gracefully, call @ref stop then drain the io_context:
80 @code
81 // From a signal handler or timer callback:
82 srv.stop();
83
84 // ioc.run() returns after pending work drains.
85 // Then from the thread that called ioc.run():
86 srv.join(); // Wait for accept loops to finish
87 @endcode
88
89 @par Restart After Stop
90 The server can be restarted after a complete shutdown cycle.
91 You must drain the io_context and call @ref join before restarting:
92 @code
93 srv.start();
94 ioc.run_for( 10s ); // Run for a while
95 srv.stop(); // Signal shutdown
96 ioc.run(); // REQUIRED: drain pending completions
97 srv.join(); // REQUIRED: wait for accept loops
98
99 // Now safe to restart
100 srv.start();
101 ioc.run();
102 @endcode
103
104 @par WARNING: What NOT to Do
105 - Do NOT call @ref join from inside a worker coroutine (deadlock).
106 - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107 - Do NOT call @ref start without completing @ref join after @ref stop.
108 - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109
110 @par Example
111 @code
112 class my_worker : public tcp_server::worker_base
113 {
114 corosio::tcp_socket sock_;
115 capy::any_executor ex_;
116 public:
117 my_worker(io_context& ctx)
118 : sock_(ctx)
119 , ex_(ctx.get_executor())
120 {
121 }
122
123 corosio::tcp_socket& socket() override { return sock_; }
124
125 void run(launcher launch) override
126 {
127 launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128 {
129 // handle connection using sock
130 co_return;
131 }(&sock_));
132 }
133 };
134
135 auto make_workers(io_context& ctx, int n)
136 {
137 std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138 v.reserve(n);
139 for(int i = 0; i < n; ++i)
140 v.push_back(std::make_unique<my_worker>(ctx));
141 return v;
142 }
143
144 io_context ioc;
145 tcp_server srv(ioc, ioc.get_executor());
146 srv.set_workers(make_workers(ioc, 100));
147 @endcode
148
149 @see worker_base, set_workers, launcher
150 */
151 class BOOST_COROSIO_DECL tcp_server
152 {
153 public:
154 class worker_base; ///< Abstract base for connection handlers.
155 class launcher; ///< Move-only handle to launch worker coroutines.
156
157 private:
158 struct waiter
159 {
160 waiter* next;
161 std::coroutine_handle<> h;
162 worker_base* w;
163 };
164
165 struct impl;
166
167 static impl* make_impl(capy::execution_context& ctx);
168
169 impl* impl_;
170 capy::any_executor ex_;
171 waiter* waiters_ = nullptr;
172 worker_base* idle_head_ = nullptr; // Forward list: available workers
173 worker_base* active_head_ =
174 nullptr; // Doubly linked: workers handling connections
175 worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
176 std::size_t active_accepts_ = 0; // Number of active do_accept coroutines
177 std::shared_ptr<void> storage_; // Owns the worker container (type-erased)
178 bool running_ = false;
179
180 // Idle list (forward/singly linked) - push front, pop front
181 45 void idle_push(worker_base* w) noexcept
182 {
183 45 w->next_ = idle_head_;
184 45 idle_head_ = w;
185 45 }
186
187 9 worker_base* idle_pop() noexcept
188 {
189 9 auto* w = idle_head_;
190 9 if (w)
191 9 idle_head_ = w->next_;
192 9 return w;
193 }
194
195 9 bool idle_empty() const noexcept
196 {
197 9 return idle_head_ == nullptr;
198 }
199
200 // Active list (doubly linked) - push back, remove anywhere
201 3 void active_push(worker_base* w) noexcept
202 {
203 3 w->next_ = nullptr;
204 3 w->prev_ = active_tail_;
205 3 if (active_tail_)
206 active_tail_->next_ = w;
207 else
208 3 active_head_ = w;
209 3 active_tail_ = w;
210 3 }
211
212 9 void active_remove(worker_base* w) noexcept
213 {
214 // Skip if not in active list (e.g., after failed accept)
215 9 if (w != active_head_ && w->prev_ == nullptr)
216 6 return;
217 3 if (w->prev_)
218 w->prev_->next_ = w->next_;
219 else
220 3 active_head_ = w->next_;
221 3 if (w->next_)
222 w->next_->prev_ = w->prev_;
223 else
224 3 active_tail_ = w->prev_;
225 3 w->prev_ = nullptr; // Mark as not in active list
226 }
227
228 template<capy::Executor Ex>
229 struct launch_wrapper
230 {
231 struct promise_type
232 {
233 Ex ex; // Executor stored directly in frame (outlives child tasks)
234 capy::io_env env_;
235
236 // For regular coroutines: first arg is executor, second is stop token
237 template<class E, class S, class... Args>
238 requires capy::Executor<std::decay_t<E>>
239 promise_type(E e, S s, Args&&...)
240 : ex(std::move(e))
241 , env_{
242 capy::executor_ref(ex), std::move(s),
243 capy::get_current_frame_allocator()}
244 {
245 }
246
247 // For lambda coroutines: first arg is closure, second is executor, third is stop token
248 template<class Closure, class E, class S, class... Args>
249 requires(!capy::Executor<std::decay_t<Closure>> &&
250 capy::Executor<std::decay_t<E>>)
251 3 promise_type(Closure&&, E e, S s, Args&&...)
252 3 : ex(std::move(e))
253 3 , env_{
254 3 capy::executor_ref(ex), std::move(s),
255 3 capy::get_current_frame_allocator()}
256 {
257 3 }
258
259 3 launch_wrapper get_return_object() noexcept
260 {
261 return {
262 3 std::coroutine_handle<promise_type>::from_promise(*this)};
263 }
264 3 std::suspend_always initial_suspend() noexcept
265 {
266 3 return {};
267 }
268 3 std::suspend_never final_suspend() noexcept
269 {
270 3 return {};
271 }
272 3 void return_void() noexcept {}
273 void unhandled_exception()
274 {
275 std::terminate();
276 }
277
278 // Inject io_env for IoAwaitable
279 template<capy::IoAwaitable Awaitable>
280 6 auto await_transform(Awaitable&& a)
281 {
282 using AwaitableT = std::decay_t<Awaitable>;
283 struct adapter
284 {
285 AwaitableT aw;
286 capy::io_env const* env;
287
288 bool await_ready()
289 {
290 return aw.await_ready();
291 }
292 decltype(auto) await_resume()
293 {
294 return aw.await_resume();
295 }
296
297 auto await_suspend(std::coroutine_handle<promise_type> h)
298 {
299 return aw.await_suspend(h, env);
300 }
301 };
302 9 return adapter{std::forward<Awaitable>(a), &env_};
303 3 }
304 };
305
306 std::coroutine_handle<promise_type> h;
307
308 3 launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
309 3 : h(handle)
310 {
311 3 }
312
313 3 ~launch_wrapper()
314 {
315 3 if (h)
316 h.destroy();
317 3 }
318
319 launch_wrapper(launch_wrapper&& o) noexcept
320 : h(std::exchange(o.h, nullptr))
321 {
322 }
323
324 launch_wrapper(launch_wrapper const&) = delete;
325 launch_wrapper& operator=(launch_wrapper const&) = delete;
326 launch_wrapper& operator=(launch_wrapper&&) = delete;
327 };
328
329 // Named functor to avoid incomplete lambda type in coroutine promise
330 template<class Executor>
331 struct launch_coro
332 {
333 3 launch_wrapper<Executor> operator()(
334 Executor,
335 std::stop_token,
336 tcp_server* self,
337 capy::task<void> t,
338 worker_base* wp)
339 {
340 // Executor and stop token stored in promise via constructor
341 co_await std::move(t);
342 co_await self->push(*wp); // worker goes back to idle list
343 6 }
344 };
345
346 class push_awaitable
347 {
348 tcp_server& self_;
349 worker_base& w_;
350
351 public:
352 9 push_awaitable(tcp_server& self, worker_base& w) noexcept
353 9 : self_(self)
354 9 , w_(w)
355 {
356 9 }
357
358 9 bool await_ready() const noexcept
359 {
360 9 return false;
361 }
362
363 std::coroutine_handle<>
364 9 await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
365 {
366 // Symmetric transfer to server's executor
367 9 return self_.ex_.dispatch(h);
368 }
369
370 9 void await_resume() noexcept
371 {
372 // Running on server executor - safe to modify lists
373 // Remove from active (if present), then wake waiter or add to idle
374 9 self_.active_remove(&w_);
375 9 if (self_.waiters_)
376 {
377 auto* wait = self_.waiters_;
378 self_.waiters_ = wait->next;
379 wait->w = &w_;
380 self_.ex_.post(wait->h);
381 }
382 else
383 {
384 9 self_.idle_push(&w_);
385 }
386 9 }
387 };
388
389 class pop_awaitable
390 {
391 tcp_server& self_;
392 waiter wait_;
393
394 public:
395 9 pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {}
396
397 9 bool await_ready() const noexcept
398 {
399 9 return !self_.idle_empty();
400 }
401
402 bool
403 await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
404 {
405 // Running on server executor (do_accept runs there)
406 wait_.h = h;
407 wait_.w = nullptr;
408 wait_.next = self_.waiters_;
409 self_.waiters_ = &wait_;
410 return true;
411 }
412
413 9 worker_base& await_resume() noexcept
414 {
415 // Running on server executor
416 9 if (wait_.w)
417 return *wait_.w; // Woken by push_awaitable
418 9 return *self_.idle_pop();
419 }
420 };
421
422 9 push_awaitable push(worker_base& w)
423 {
424 9 return push_awaitable{*this, w};
425 }
426
427 // Synchronous version for destructor/guard paths
428 // Must be called from server executor context
429 void push_sync(worker_base& w) noexcept
430 {
431 active_remove(&w);
432 if (waiters_)
433 {
434 auto* wait = waiters_;
435 waiters_ = wait->next;
436 wait->w = &w;
437 ex_.post(wait->h);
438 }
439 else
440 {
441 idle_push(&w);
442 }
443 }
444
445 9 pop_awaitable pop()
446 {
447 9 return pop_awaitable{*this};
448 }
449
450 capy::task<void> do_accept(tcp_acceptor& acc);
451
452 public:
453 /** Abstract base class for connection handlers.
454
455 Derive from this class to implement custom connection handling.
456 Each worker owns a socket and is reused across multiple
457 connections to avoid per-connection allocation.
458
459 @see tcp_server, launcher
460 */
461 class BOOST_COROSIO_DECL worker_base
462 {
463 // Ordered largest to smallest for optimal packing
464 std::stop_source stop_; // ~16 bytes
465 worker_base* next_ = nullptr; // 8 bytes - used by idle and active lists
466 worker_base* prev_ = nullptr; // 8 bytes - used only by active list
467
468 friend class tcp_server;
469
470 public:
471 /// Construct a worker.
472 worker_base();
473
474 /// Destroy the worker.
475 virtual ~worker_base();
476
477 /** Handle an accepted connection.
478
479 Called when this worker is dispatched to handle a new
480 connection. The implementation must invoke the launcher
481 exactly once to start the handling coroutine.
482
483 @param launch Handle to launch the connection coroutine.
484 */
485 virtual void run(launcher launch) = 0;
486
487 /// Return the socket used for connections.
488 virtual corosio::tcp_socket& socket() = 0;
489 };
490
491 /** Move-only handle to launch a worker coroutine.
492
493 Passed to @ref worker_base::run to start the connection-handling
494 coroutine. The launcher ensures the worker returns to the idle
495 pool when the coroutine completes or if launching fails.
496
497 The launcher must be invoked exactly once via `operator()`.
498 If destroyed without invoking, the worker is returned to the
499 idle pool automatically.
500
501 @see worker_base::run
502 */
503 class BOOST_COROSIO_DECL launcher
504 {
505 tcp_server* srv_;
506 worker_base* w_;
507
508 friend class tcp_server;
509
510 3 launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w)
511 {
512 3 }
513
514 public:
515 /// Return the worker to the pool if not launched.
516 3 ~launcher()
517 {
518 3 if (w_)
519 srv_->push_sync(*w_);
520 3 }
521
522 launcher(launcher&& o) noexcept
523 : srv_(o.srv_)
524 , w_(std::exchange(o.w_, nullptr))
525 {
526 }
527 launcher(launcher const&) = delete;
528 launcher& operator=(launcher const&) = delete;
529 launcher& operator=(launcher&&) = delete;
530
531 /** Launch the connection-handling coroutine.
532
533 Starts the given coroutine on the specified executor. When
534 the coroutine completes, the worker is automatically returned
535 to the idle pool.
536
537 @param ex The executor to run the coroutine on.
538 @param task The coroutine to execute.
539
540 @throws std::logic_error If this launcher was already invoked.
541 */
542 template<class Executor>
543 3 void operator()(Executor const& ex, capy::task<void> task)
544 {
545 3 if (!w_)
546 detail::throw_logic_error(); // launcher already invoked
547
548 3 auto* w = std::exchange(w_, nullptr);
549
550 // Worker is being dispatched - add to active list
551 3 srv_->active_push(w);
552
553 // Return worker to pool if coroutine setup throws
554 struct guard_t
555 {
556 tcp_server* srv;
557 worker_base* w;
558 3 ~guard_t()
559 {
560 3 if (w)
561 srv->push_sync(*w);
562 3 }
563 3 } guard{srv_, w};
564
565 // Reset worker's stop source for this connection
566 3 w->stop_ = {};
567 3 auto st = w->stop_.get_token();
568
569 3 auto wrapper =
570 3 launch_coro<Executor>{}(ex, st, srv_, std::move(task), w);
571
572 // Executor and stop token stored in promise via constructor
573 3 ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
574 3 guard.w = nullptr; // Success - dismiss guard
575 3 }
576 };
577
578 /** Construct a TCP server.
579
580 @tparam Ctx Execution context type satisfying ExecutionContext.
581 @tparam Ex Executor type satisfying Executor.
582
583 @param ctx The execution context for socket operations.
584 @param ex The executor for dispatching coroutines.
585
586 @par Example
587 @code
588 tcp_server srv(ctx, ctx.get_executor());
589 srv.set_workers(make_workers(ctx, 100));
590 srv.bind(endpoint{...});
591 srv.start();
592 @endcode
593 */
594 template<capy::ExecutionContext Ctx, capy::Executor Ex>
595 9 tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx))
596 9 , ex_(std::move(ex))
597 {
598 9 }
599
600 public:
601 ~tcp_server();
602 tcp_server(tcp_server const&) = delete;
603 tcp_server& operator=(tcp_server const&) = delete;
604 tcp_server(tcp_server&& o) noexcept;
605 tcp_server& operator=(tcp_server&& o) noexcept;
606
607 /** Bind to a local endpoint.
608
609 Creates an acceptor listening on the specified endpoint.
610 Multiple endpoints can be bound by calling this method
611 multiple times before @ref start.
612
613 @param ep The local endpoint to bind to.
614
615 @return The error code if binding fails.
616 */
617 std::error_code bind(endpoint ep);
618
619 /** Set the worker pool.
620
621 Replaces any existing workers with the given range. Any
622 previous workers are released and the idle/active lists
623 are cleared before populating with new workers.
624
625 @tparam Range Forward range of pointer-like objects to worker_base.
626
627 @param workers Range of workers to manage. Each element must
628 support `std::to_address()` yielding `worker_base*`.
629
630 @par Example
631 @code
632 std::vector<std::unique_ptr<my_worker>> workers;
633 for(int i = 0; i < 100; ++i)
634 workers.push_back(std::make_unique<my_worker>(ctx));
635 srv.set_workers(std::move(workers));
636 @endcode
637 */
638 template<std::ranges::forward_range Range>
639 requires std::convertible_to<
640 decltype(std::to_address(
641 std::declval<std::ranges::range_value_t<Range>&>())),
642 worker_base*>
643 9 void set_workers(Range&& workers)
644 {
645 // Clear existing state
646 9 storage_.reset();
647 9 idle_head_ = nullptr;
648 9 active_head_ = nullptr;
649 9 active_tail_ = nullptr;
650
651 // Take ownership and populate idle list
652 using StorageType = std::decay_t<Range>;
653 9 auto* p = new StorageType(std::forward<Range>(workers));
654 9 storage_ = std::shared_ptr<void>(
655 9 p, [](void* ptr) { delete static_cast<StorageType*>(ptr); });
656 45 for (auto&& elem : *static_cast<StorageType*>(p))
657 36 idle_push(std::to_address(elem));
658 9 }
659
660 /** Start accepting connections.
661
662 Launches accept loops for all bound endpoints. Incoming
663 connections are dispatched to idle workers from the pool.
664
665 Calling `start()` on an already-running server has no effect.
666
667 @par Preconditions
668 - At least one endpoint bound via @ref bind.
669 - Workers provided to the constructor.
670 - If restarting, @ref join must have completed first.
671
672 @par Effects
673 Creates one accept coroutine per bound endpoint. Each coroutine
674 runs on the server's executor, waiting for connections and
675 dispatching them to idle workers.
676
677 @par Restart Sequence
678 To restart after stopping, complete the full shutdown cycle:
679 @code
680 srv.start();
681 ioc.run_for( 1s );
682 srv.stop(); // 1. Signal shutdown
683 ioc.run(); // 2. Drain remaining completions
684 srv.join(); // 3. Wait for accept loops
685
686 // Now safe to restart
687 srv.start();
688 ioc.run();
689 @endcode
690
691 @par Thread Safety
692 Not thread safe.
693
694 @throws std::logic_error If a previous session has not been
695 joined (accept loops still active).
696 */
697 void start();
698
699 /** Stop accepting connections.
700
701 Signals all listening ports to stop accepting new connections
702 and requests cancellation of active workers via their stop tokens.
703
704 This function returns immediately; it does not wait for workers
705 to finish. Pending I/O operations complete asynchronously.
706
707 Calling `stop()` on a non-running server has no effect.
708
709 @par Effects
710 - Closes all acceptors (pending accepts complete with error).
711 - Requests stop on each active worker's stop token.
712 - Workers observing their stop token should exit promptly.
713
714 @par Postconditions
715 No new connections will be accepted. Active workers continue
716 until they observe their stop token or complete naturally.
717
718 @par What Happens Next
719 After calling `stop()`:
720 1. Let `ioc.run()` return (drains pending completions).
721 2. Call @ref join to wait for accept loops to finish.
722 3. Only then is it safe to restart or destroy the server.
723
724 @par Thread Safety
725 Not thread safe.
726
727 @see join, start
728 */
729 void stop();
730
731 /** Block until all accept loops complete.
732
733 Blocks the calling thread until all accept coroutines launched
734 by @ref start have finished executing. This synchronizes the
735 shutdown sequence, ensuring the server is fully stopped before
736 restarting or destroying it.
737
738 @par Preconditions
739 @ref stop has been called and `ioc.run()` has returned.
740
741 @par Postconditions
742 All accept loops have completed. The server is in the stopped
743 state and may be restarted via @ref start.
744
745 @par Example (Correct Usage)
746 @code
747 // main thread
748 srv.start();
749 ioc.run(); // Blocks until work completes
750 srv.join(); // Safe: called after ioc.run() returns
751 @endcode
752
753 @par WARNING: Deadlock Scenarios
754 Calling `join()` from the wrong context causes deadlock:
755
756 @code
757 // WRONG: calling join() from inside a worker coroutine
758 void run( launcher launch ) override
759 {
760 launch( ex, [this]() -> capy::task<>
761 {
762 srv_.join(); // DEADLOCK: blocks the executor
763 co_return;
764 }());
765 }
766
767 // WRONG: calling join() while ioc.run() is still active
768 std::thread t( [&]{ ioc.run(); } );
769 srv.stop();
770 srv.join(); // DEADLOCK: ioc.run() still running in thread t
771 @endcode
772
773 @par Thread Safety
774 May be called from any thread, but will deadlock if called
775 from within the io_context event loop or from a worker coroutine.
776
777 @see stop, start
778 */
779 void join();
780
781 private:
782 capy::task<> do_stop();
783 };
784
785 #ifdef _MSC_VER
786 #pragma warning(pop)
787 #endif
788
789 } // namespace boost::corosio
790
791 #endif
792