TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/epoll/epoll_traits.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29 :
30 : #include <boost/corosio/detail/except.hpp>
31 :
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>
37 :
38 : #include <errno.h>
39 : #include <sys/epoll.h>
40 : #include <sys/eventfd.h>
41 : #include <sys/timerfd.h>
42 : #include <unistd.h>
43 :
44 : namespace boost::corosio::detail {
45 :
46 : /** Linux scheduler using epoll for I/O multiplexing.
47 :
48 : This scheduler implements the scheduler interface using Linux epoll
49 : for efficient I/O event notification. It uses a single reactor model
50 : where one thread runs epoll_wait while other threads
51 : wait on a condition variable for handler work. This design provides:
52 :
53 : - Handler parallelism: N posted handlers can execute on N threads
54 : - No thundering herd: condition_variable wakes exactly one thread
55 : - IOCP parity: Behavior matches Windows I/O completion port semantics
56 :
57 : When threads call run(), they first try to execute queued handlers.
58 : If the queue is empty and no reactor is running, one thread becomes
59 : the reactor and runs epoll_wait. Other threads wait on a condition
60 : variable until handlers are available.
61 :
62 : @par Thread Safety
63 : All public member functions are thread-safe.
64 : */
65 : class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler
66 : {
67 : public:
68 : /** Construct the scheduler.
69 :
70 : Creates an epoll instance, eventfd for reactor interruption,
71 : and timerfd for kernel-managed timer expiry.
72 :
73 : @param ctx Reference to the owning execution_context.
74 : @param concurrency_hint Hint for expected thread count (unused).
75 : */
76 : epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
77 :
78 : /// Destroy the scheduler.
79 : ~epoll_scheduler() override;
80 :
81 : epoll_scheduler(epoll_scheduler const&) = delete;
82 : epoll_scheduler& operator=(epoll_scheduler const&) = delete;
83 :
84 : /// Shut down the scheduler, draining pending operations.
85 : void shutdown() override;
86 :
87 : /// Apply runtime configuration, resizing the event buffer.
88 : void configure_reactor(
89 : unsigned max_events,
90 : unsigned budget_init,
91 : unsigned budget_max,
92 : unsigned unassisted) override;
93 :
94 : /** Return the epoll file descriptor.
95 :
96 : Used by socket services to register file descriptors
97 : for I/O event notification.
98 :
99 : @return The epoll file descriptor.
100 : */
101 : int epoll_fd() const noexcept
102 : {
103 : return epoll_fd_;
104 : }
105 :
106 : /** Register a descriptor for persistent monitoring.
107 :
108 : The fd is registered once and stays registered until explicitly
109 : deregistered. Events are dispatched via reactor_descriptor_state which
110 : tracks pending read/write/connect operations.
111 :
112 : @param fd The file descriptor to register.
113 : @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
114 : */
115 : void register_descriptor(int fd, reactor_descriptor_state* desc) const;
116 :
117 : /** Deregister a persistently registered descriptor.
118 :
119 : @param fd The file descriptor to deregister.
120 : */
121 : void deregister_descriptor(int fd) const;
122 :
123 : private:
124 : void
125 : run_task(lock_type& lock, context_type* ctx,
126 : long timeout_us) override;
127 : void interrupt_reactor() const override;
128 : void update_timerfd() const;
129 :
130 : int epoll_fd_;
131 : int event_fd_;
132 : int timer_fd_;
133 :
134 : // Edge-triggered eventfd state
135 : mutable std::atomic<bool> eventfd_armed_{false};
136 :
137 : // Set when the earliest timer changes; flushed before epoll_wait
138 : mutable std::atomic<bool> timerfd_stale_{false};
139 :
140 : // Event buffer sized from max_events_per_poll_ (set at construction,
141 : // resized by configure_reactor via io_context_options).
142 : std::vector<epoll_event> event_buffer_;
143 : };
144 :
145 HIT 356 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
146 356 : : epoll_fd_(-1)
147 356 : , event_fd_(-1)
148 356 : , timer_fd_(-1)
149 712 : , event_buffer_(max_events_per_poll_)
150 : {
151 356 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
152 356 : if (epoll_fd_ < 0)
153 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
154 :
155 HIT 356 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
156 356 : if (event_fd_ < 0)
157 : {
158 MIS 0 : int errn = errno;
159 0 : ::close(epoll_fd_);
160 0 : detail::throw_system_error(make_err(errn), "eventfd");
161 : }
162 :
163 HIT 356 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
164 356 : if (timer_fd_ < 0)
165 : {
166 MIS 0 : int errn = errno;
167 0 : ::close(event_fd_);
168 0 : ::close(epoll_fd_);
169 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
170 : }
171 :
172 HIT 356 : epoll_event ev{};
173 356 : ev.events = EPOLLIN | EPOLLET;
174 356 : ev.data.ptr = nullptr;
175 356 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
176 : {
177 MIS 0 : int errn = errno;
178 0 : ::close(timer_fd_);
179 0 : ::close(event_fd_);
180 0 : ::close(epoll_fd_);
181 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
182 : }
183 :
184 HIT 356 : epoll_event timer_ev{};
185 356 : timer_ev.events = EPOLLIN | EPOLLERR;
186 356 : timer_ev.data.ptr = &timer_fd_;
187 356 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
188 : {
189 MIS 0 : int errn = errno;
190 0 : ::close(timer_fd_);
191 0 : ::close(event_fd_);
192 0 : ::close(epoll_fd_);
193 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
194 : }
195 :
196 HIT 356 : timer_svc_ = &get_timer_service(ctx, *this);
197 356 : timer_svc_->set_on_earliest_changed(
198 5271 : timer_service::callback(this, [](void* p) {
199 4915 : auto* self = static_cast<epoll_scheduler*>(p);
200 4915 : self->timerfd_stale_.store(true, std::memory_order_release);
201 4915 : self->interrupt_reactor();
202 4915 : }));
203 :
204 356 : get_resolver_service(ctx, *this);
205 356 : get_signal_service(ctx, *this);
206 356 : get_stream_file_service(ctx, *this);
207 356 : get_random_access_file_service(ctx, *this);
208 :
209 356 : completed_ops_.push(&task_op_);
210 356 : }
211 :
212 712 : inline epoll_scheduler::~epoll_scheduler()
213 : {
214 356 : if (timer_fd_ >= 0)
215 356 : ::close(timer_fd_);
216 356 : if (event_fd_ >= 0)
217 356 : ::close(event_fd_);
218 356 : if (epoll_fd_ >= 0)
219 356 : ::close(epoll_fd_);
220 712 : }
221 :
222 : inline void
223 356 : epoll_scheduler::shutdown()
224 : {
225 356 : shutdown_drain();
226 :
227 356 : if (event_fd_ >= 0)
228 356 : interrupt_reactor();
229 356 : }
230 :
231 : inline void
232 MIS 0 : epoll_scheduler::configure_reactor(
233 : unsigned max_events,
234 : unsigned budget_init,
235 : unsigned budget_max,
236 : unsigned unassisted)
237 : {
238 0 : reactor_scheduler::configure_reactor(
239 : max_events, budget_init, budget_max, unassisted);
240 0 : event_buffer_.resize(max_events_per_poll_);
241 0 : }
242 :
243 : inline void
244 HIT 9538 : epoll_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
245 : {
246 9538 : epoll_event ev{};
247 9538 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
248 9538 : ev.data.ptr = desc;
249 :
250 9538 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
251 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
252 :
253 HIT 9538 : desc->registered_events = ev.events;
254 9538 : desc->fd = fd;
255 9538 : desc->scheduler_ = this;
256 9538 : desc->mutex.set_enabled(!single_threaded_);
257 9538 : desc->ready_events_.store(0, std::memory_order_relaxed);
258 :
259 9538 : conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
260 9538 : desc->impl_ref_.reset();
261 9538 : desc->read_ready = false;
262 9538 : desc->write_ready = false;
263 9538 : }
264 :
265 : inline void
266 9538 : epoll_scheduler::deregister_descriptor(int fd) const
267 : {
268 9538 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
269 9538 : }
270 :
271 : inline void
272 5572 : epoll_scheduler::interrupt_reactor() const
273 : {
274 5572 : bool expected = false;
275 5572 : if (eventfd_armed_.compare_exchange_strong(
276 : expected, true, std::memory_order_release,
277 : std::memory_order_relaxed))
278 : {
279 5349 : std::uint64_t val = 1;
280 5349 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
281 : }
282 5572 : }
283 :
284 : inline void
285 9789 : epoll_scheduler::update_timerfd() const
286 : {
287 9789 : auto nearest = timer_svc_->nearest_expiry();
288 :
289 9789 : itimerspec ts{};
290 9789 : int flags = 0;
291 :
292 9789 : if (nearest == timer_service::time_point::max())
293 : {
294 : // No timers — disarm by setting to 0 (relative)
295 : }
296 : else
297 : {
298 9735 : auto now = std::chrono::steady_clock::now();
299 9735 : if (nearest <= now)
300 : {
301 : // Use 1ns instead of 0 — zero disarms the timerfd
302 282 : ts.it_value.tv_nsec = 1;
303 : }
304 : else
305 : {
306 9453 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
307 9453 : nearest - now)
308 9453 : .count();
309 9453 : ts.it_value.tv_sec = nsec / 1000000000;
310 9453 : ts.it_value.tv_nsec = nsec % 1000000000;
311 9453 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
312 MIS 0 : ts.it_value.tv_nsec = 1;
313 : }
314 : }
315 :
316 HIT 9789 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
317 MIS 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
318 HIT 9789 : }
319 :
320 : inline void
321 42691 : epoll_scheduler::run_task(
322 : lock_type& lock, context_type* ctx, long timeout_us)
323 : {
324 : int timeout_ms;
325 42691 : if (task_interrupted_)
326 28863 : timeout_ms = 0;
327 13828 : else if (timeout_us < 0)
328 13820 : timeout_ms = -1;
329 : else
330 8 : timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
331 :
332 42691 : if (lock.owns_lock())
333 13828 : lock.unlock();
334 :
335 42691 : task_cleanup on_exit{this, &lock, ctx};
336 :
337 : // Flush deferred timerfd programming before blocking
338 42691 : if (timerfd_stale_.exchange(false, std::memory_order_acquire))
339 4893 : update_timerfd();
340 :
341 42691 : int nfds = ::epoll_wait(
342 : epoll_fd_, event_buffer_.data(),
343 42691 : static_cast<int>(event_buffer_.size()), timeout_ms);
344 :
345 42691 : if (nfds < 0 && errno != EINTR)
346 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_wait");
347 :
348 HIT 42691 : bool check_timers = false;
349 42691 : op_queue local_ops;
350 :
351 94612 : for (int i = 0; i < nfds; ++i)
352 : {
353 51921 : if (event_buffer_[i].data.ptr == nullptr)
354 : {
355 : std::uint64_t val;
356 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
357 4993 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
358 4993 : eventfd_armed_.store(false, std::memory_order_relaxed);
359 4993 : continue;
360 4993 : }
361 :
362 46928 : if (event_buffer_[i].data.ptr == &timer_fd_)
363 : {
364 : std::uint64_t expirations;
365 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
366 : [[maybe_unused]] auto r =
367 4896 : ::read(timer_fd_, &expirations, sizeof(expirations));
368 4896 : check_timers = true;
369 4896 : continue;
370 4896 : }
371 :
372 : auto* desc =
373 42032 : static_cast<reactor_descriptor_state*>(event_buffer_[i].data.ptr);
374 42032 : desc->add_ready_events(event_buffer_[i].events);
375 :
376 42032 : bool expected = false;
377 42032 : if (desc->is_enqueued_.compare_exchange_strong(
378 : expected, true, std::memory_order_release,
379 : std::memory_order_relaxed))
380 : {
381 42032 : local_ops.push(desc);
382 : }
383 : }
384 :
385 42691 : if (check_timers)
386 : {
387 4896 : timer_svc_->process_expired();
388 4896 : update_timerfd();
389 : }
390 :
391 42691 : lock.lock();
392 :
393 42691 : if (!local_ops.empty())
394 28326 : completed_ops_.splice(local_ops);
395 42691 : }
396 :
397 : } // namespace boost::corosio::detail
398 :
399 : #endif // BOOST_COROSIO_HAS_EPOLL
400 :
401 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
|