src/corosio/src/tcp_server.cpp

65.7% Lines (44/67) 86.7% Functions (13/15)
src/corosio/src/tcp_server.cpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #include <boost/corosio/tcp_server.hpp>
12 #include <boost/corosio/detail/except.hpp>
13 #include <condition_variable>
14 #include <mutex>
15 #include <utility>
16
17 namespace boost::corosio {
18
19 36 tcp_server::worker_base::worker_base() = default;
20 36 tcp_server::worker_base::~worker_base() = default;
21
22 struct tcp_server::impl
23 {
24 std::mutex join_mutex;
25 std::condition_variable join_cv;
26 capy::execution_context& ctx;
27 std::vector<tcp_acceptor> ports;
28 std::stop_source stop;
29
30 9 explicit impl(capy::execution_context& c) noexcept : ctx(c) {}
31 };
32
33 tcp_server::impl*
34 9 tcp_server::make_impl(capy::execution_context& ctx)
35 {
36 9 return new impl(ctx);
37 }
38
39 9 tcp_server::~tcp_server()
40 {
41 9 delete impl_;
42 9 }
43
44 tcp_server::tcp_server(tcp_server&& o) noexcept
45 : impl_(std::exchange(o.impl_, nullptr))
46 , ex_(o.ex_)
47 , waiters_(std::exchange(o.waiters_, nullptr))
48 , idle_head_(std::exchange(o.idle_head_, nullptr))
49 , active_head_(std::exchange(o.active_head_, nullptr))
50 , active_tail_(std::exchange(o.active_tail_, nullptr))
51 , active_accepts_(std::exchange(o.active_accepts_, 0))
52 , storage_(std::move(o.storage_))
53 , running_(std::exchange(o.running_, false))
54 {
55 }
56
57 tcp_server&
58 tcp_server::operator=(tcp_server&& o) noexcept
59 {
60 delete impl_;
61 impl_ = std::exchange(o.impl_, nullptr);
62 ex_ = o.ex_;
63 waiters_ = std::exchange(o.waiters_, nullptr);
64 idle_head_ = std::exchange(o.idle_head_, nullptr);
65 active_head_ = std::exchange(o.active_head_, nullptr);
66 active_tail_ = std::exchange(o.active_tail_, nullptr);
67 active_accepts_ = std::exchange(o.active_accepts_, 0);
68 storage_ = std::move(o.storage_);
69 running_ = std::exchange(o.running_, false);
70 return *this;
71 }
72
73 // Accept loop: wait for idle worker, accept connection, dispatch
74 capy::task<void>
75 8 tcp_server::do_accept(tcp_acceptor& acc)
76 {
77 // Analyzer can't trace value through coroutine await_transform
78 // NOLINTNEXTLINE(clang-analyzer-core.uninitialized.UndefReturn)
79 auto env = co_await capy::this_coro::environment;
80 while (!env->stop_token.stop_requested())
81 {
82 // Wait for an idle worker before blocking on accept
83 auto& w = co_await pop();
84 auto [ec] = co_await acc.accept(w.socket());
85 if (ec)
86 {
87 co_await push(w);
88 continue;
89 }
90 w.run(launcher{*this, w});
91 }
92 16 }
93
94 std::error_code
95 9 tcp_server::bind(endpoint ep)
96 {
97 try
98 {
99 9 impl_->ports.emplace_back(impl_->ctx, ep);
100 8 return {};
101 }
102 1 catch (std::system_error const& e)
103 {
104 1 return e.code();
105 1 }
106 }
107
108 void
109 10 tcp_server::start()
110 {
111 // Idempotent - only start if not already running
112 10 if (running_)
113 1 return;
114
115 // Previous session must be fully stopped before restart
116 9 if (active_accepts_ != 0)
117 1 detail::throw_logic_error(
118 "tcp_server::start: previous session not joined");
119
120 8 running_ = true;
121
122 8 impl_->stop = {}; // Fresh stop source
123 8 auto st = impl_->stop.get_token();
124
125 8 active_accepts_ = impl_->ports.size();
126
127 // Launch with completion handler that decrements counter
128 16 for (auto& t : impl_->ports)
129 16 capy::run_async(ex_, st, [this]() {
130 8 std::lock_guard lock(impl_->join_mutex);
131 8 if (--active_accepts_ == 0)
132 8 impl_->join_cv.notify_all();
133 16 })(do_accept(t));
134 8 }
135
136 void
137 10 tcp_server::stop()
138 {
139 // Idempotent - only stop if running
140 10 if (!running_)
141 2 return;
142 8 running_ = false;
143
144 // Stop accept loops
145 8 impl_->stop.request_stop();
146
147 // Launch cancellation coroutine on server executor
148 8 capy::run_async(ex_, std::stop_token{})(do_stop());
149 }
150
151 void
152 4 tcp_server::join()
153 {
154 4 std::unique_lock lock(impl_->join_mutex);
155 8 impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
156 4 }
157
158 capy::task<>
159 8 tcp_server::do_stop()
160 {
161 // Running on server executor - safe to iterate active list
162 // Just cancel, don't modify list - workers return themselves when done
163 for (auto* w = active_head_; w; w = w->next_)
164 w->stop_.request_stop();
165 co_return;
166 16 }
167
168 } // namespace boost::corosio
169