include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

70.9% Lines (241/340) 78.3% List of functions (36/46)
reactor_scheduler.hpp
f(x) Functions (46)
Function Calls Lines Blocks
boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler const*) :78 842054x 100.0% 86.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&) :90 0 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :107 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::inline_budget_initial() const :252 487x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::is_single_threaded() const :258 64x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_single_threaded(bool) :269 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::reactor_scheduler() :280 585x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::operator()() :325 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::task_op::destroy() :326 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler const*) :372 487x 100.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :380 487x 66.7% 80.0% boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler const*, boost::corosio::detail::reactor_scheduler_context*) :392 487x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :406 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::reset_inline_budget() const :434 108842x 47.1% 42.0% boost::corosio::detail::reactor_scheduler::try_consume_inline_budget() const :461 460025x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const :475 2155x 100.0% 84.0% 
boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :481 2155x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :482 4310x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :484 2146x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :493 9x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(boost::corosio::detail::scheduler_op*) const :518 110136x 100.0% 87.0% boost::corosio::detail::reactor_scheduler::running_in_this_thread() const :535 1326x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::stop() :541 436x 100.0% 82.0% boost::corosio::detail::reactor_scheduler::stopped() const :553 62x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::restart() :559 111x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::run() :565 412x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::run_one() :590 2x 75.0% 64.0% boost::corosio::detail::reactor_scheduler::wait_one(long) :604 102x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::poll() :618 6x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::poll_one() :643 4x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::work_started() :657 27044x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::work_finished() :663 38007x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::compensating_work_started() const :670 159570x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :678 0 0.0% 0.0% 
boost::corosio::detail::reactor_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :691 16529x 30.0% 35.0% boost::corosio::detail::reactor_scheduler::shutdown_drain() :708 585x 100.0% 88.0% boost::corosio::detail::reactor_scheduler::signal_all(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :725 985x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::maybe_unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :732 2321x 57.1% 50.0% boost::corosio::detail::reactor_scheduler::unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :746 343431x 85.7% 80.0% boost::corosio::detail::reactor_scheduler::clear_signal() const :758 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wait_for_signal(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :764 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wait_for_signal_for(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long) const :776 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wake_one_thread_and_unlock(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :788 2321x 87.5% 92.0% boost::corosio::detail::reactor_scheduler::work_cleanup::~work_cleanup() :806 288373x 92.3% 92.0% boost::corosio::detail::reactor_scheduler::task_cleanup::~task_cleanup() :830 196682x 83.3% 86.0% boost::corosio::detail::reactor_scheduler::do_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long, boost::corosio::detail::reactor_scheduler_context*) :851 288805x 68.9% 55.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/detail/scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <coroutine>
23 #include <cstddef>
24 #include <cstdint>
25 #include <limits>
26 #include <memory>
27 #include <stdexcept>
28
29 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31
32 namespace boost::corosio::detail {
33
34 // Forward declarations
35 class reactor_scheduler;
36 class timer_service;
37
38 /** Per-thread state for a reactor scheduler.
39
40 Each thread running a scheduler's event loop has one of these
41 on a thread-local stack. It holds a private work queue and
42 inline completion budget for speculative I/O fast paths.
43 */
struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
{
    /// Scheduler this context belongs to.
    reactor_scheduler const* key;

    /// Next context frame on this thread's stack.
    reactor_scheduler_context* next;

    /// Private work queue for reduced contention.
    op_queue private_queue;

    /// Unflushed work count for the private queue.
    std::int64_t private_outstanding_work;

    /// Remaining inline completions allowed this cycle.
    int inline_budget;

    /// Maximum inline budget (adaptive; initial value comes from
    /// the scheduler's inline_budget_initial()).
    int inline_budget_max;

    /// True if no other thread absorbed queued work last cycle.
    bool unassisted;

    /// Construct a context frame for scheduler @a k linked to @a n.
    reactor_scheduler_context(
        reactor_scheduler const* k,
        reactor_scheduler_context* n);
};
72
73 /// Thread-local context stack for reactor schedulers.
74 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75
76 /// Find the context frame for a scheduler on this thread.
77 inline reactor_scheduler_context*
78 842054x reactor_find_context(reactor_scheduler const* self) noexcept
79 {
80 842054x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 {
82 839006x if (c->key == self)
83 839006x return c;
84 }
85 3048x return nullptr;
86 }
87
88 /// Flush private work count to global counter.
89 inline void
90 reactor_flush_private_work(
91 reactor_scheduler_context* ctx,
92 std::atomic<std::int64_t>& outstanding_work) noexcept
93 {
94 if (ctx && ctx->private_outstanding_work > 0)
95 {
96 outstanding_work.fetch_add(
97 ctx->private_outstanding_work, std::memory_order_relaxed);
98 ctx->private_outstanding_work = 0;
99 }
100 }
101
102 /** Drain private queue to global queue, flushing work count first.
103
104 @return True if any ops were drained.
105 */
106 inline bool
107 reactor_drain_private_queue(
108 reactor_scheduler_context* ctx,
109 std::atomic<std::int64_t>& outstanding_work,
110 op_queue& completed_ops) noexcept
111 {
112 if (!ctx || ctx->private_queue.empty())
113 return false;
114
115 reactor_flush_private_work(ctx, outstanding_work);
116 completed_ops.splice(ctx->private_queue);
117 return true;
118 }
119
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx, timeout_us)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;
    using mutex_type = conditionally_enabled_mutex;
    using lock_type = mutex_type::scoped_lock;
    using event_type = conditionally_enabled_event;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

    /** Apply runtime configuration to the scheduler.

        Called by `io_context` after construction. Values that do
        not apply to this backend are silently ignored.

        @param max_events Event buffer size for epoll/kqueue.
        @param budget_init Starting inline completion budget.
        @param budget_max Hard ceiling on adaptive budget ramp-up.
        @param unassisted Budget when single-threaded.

        @throws std::out_of_range if @a max_events or
        @a budget_max is zero or exceeds INT_MAX.
    */
    virtual void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted);

    /// Return the configured initial inline budget.
    unsigned inline_budget_initial() const noexcept
    {
        return inline_budget_initial_;
    }

    /// Return true if single-threaded (lockless) mode is active.
    bool is_single_threaded() const noexcept
    {
        return single_threaded_;
    }

    /** Enable or disable single-threaded (lockless) mode.

        When enabled, all scheduler mutex and condition variable
        operations become no-ops. Cross-thread post() is
        undefined behavior.
    */
    void configure_single_threaded(bool v) noexcept
    {
        single_threaded_ = v;
        mutex_.set_enabled(!v);
        cond_.set_enabled(!v);
    }

protected:
    timer_service* timer_svc_ = nullptr;
    bool single_threaded_ = false;

    reactor_scheduler() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard run around `run_task`; its destructor flushes the
    /// context's private work count and queue to the global state
    /// (see ~task_cleanup for the exact behavior).
    struct task_cleanup
    {
        reactor_scheduler const* sched;
        lock_type* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    mutable mutex_type mutex_{true};
    mutable event_type cond_{true};
    mutable op_queue completed_ops_;
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    std::atomic<bool> stopped_{false};
    mutable std::atomic<bool> task_running_{false};
    mutable bool task_interrupted_ = false;

    // Runtime-configurable reactor tuning parameters.
    // Defaults match the library's built-in values.
    unsigned max_events_per_poll_ = 128;
    unsigned inline_budget_initial_ = 2;
    unsigned inline_budget_max_ = 16;
    unsigned unassisted_budget_ = 4;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_` (bits above bit 0
    /// therefore encode the waiter count).
    static constexpr std::size_t waiter_increment = 2;
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    /// Executing or destroying it is deliberately a no-op.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard around handler invocation in do_one; its
    /// destructor settles the work count for the completed op.
    struct work_cleanup
    {
        reactor_scheduler* sched;
        lock_type* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        lock_type& lock, long timeout_us, context_type* ctx);

    void signal_all(lock_type& lock) const;
    bool maybe_unlock_and_signal_one(lock_type& lock) const;
    bool unlock_and_signal_one(lock_type& lock) const;
    void clear_signal() const;
    void wait_for_signal(lock_type& lock) const;
    void wait_for_signal_for(
        lock_type& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(lock_type& lock) const;
};
359
360 /** RAII guard that pushes/pops a scheduler context frame.
361
362 On construction, pushes a new context frame onto the
363 thread-local stack. On destruction, drains any remaining
364 private queue items to the global queue and pops the frame.
365 */
366 struct reactor_thread_context_guard
367 {
368 /// The context frame managed by this guard.
369 reactor_scheduler_context frame_;
370
371 /// Construct the guard, pushing a frame for @a sched.
372 487x explicit reactor_thread_context_guard(
373 reactor_scheduler const* sched) noexcept
374 487x : frame_(sched, reactor_context_stack.get())
375 {
376 487x reactor_context_stack.set(&frame_);
377 487x }
378
379 /// Destroy the guard, draining private work and popping the frame.
380 487x ~reactor_thread_context_guard() noexcept
381 {
382 487x if (!frame_.private_queue.empty())
383 frame_.key->drain_thread_queue(
384 frame_.private_queue, frame_.private_outstanding_work);
385 487x reactor_context_stack.set(frame_.next);
386 487x }
387 };
388
389 // ---- Inline implementations ------------------------------------------------
390
// Initialize a context frame: empty private queue, zero budget
// (reset_inline_budget() grants the first budget), and the
// adaptive ceiling seeded from the scheduler's configuration.
inline
reactor_scheduler_context::reactor_scheduler_context(
    reactor_scheduler const* k,
    reactor_scheduler_context* n)
    : key(k)
    , next(n)
    , private_outstanding_work(0)
    , inline_budget(0)
    , inline_budget_max(
        static_cast<int>(k->inline_budget_initial()))
    , unassisted(false)
{
}
404
405 inline void
406 reactor_scheduler::configure_reactor(
407 unsigned max_events,
408 unsigned budget_init,
409 unsigned budget_max,
410 unsigned unassisted)
411 {
412 if (max_events < 1 ||
413 max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
414 throw std::out_of_range(
415 "max_events_per_poll must be in [1, INT_MAX]");
416 if (budget_max < 1 ||
417 budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
418 throw std::out_of_range(
419 "inline_budget_max must be in [1, INT_MAX]");
420
421 // Clamp initial and unassisted to budget_max.
422 if (budget_init > budget_max)
423 budget_init = budget_max;
424 if (unassisted > budget_max)
425 unassisted = budget_max;
426
427 max_events_per_poll_ = max_events;
428 inline_budget_initial_ = budget_init;
429 inline_budget_max_ = budget_max;
430 unassisted_budget_ = unassisted;
431 }
432
433 inline void
434 108842x reactor_scheduler::reset_inline_budget() const noexcept
435 {
436 108842x if (auto* ctx = reactor_find_context(this))
437 {
438 // Cap when no other thread absorbed queued work
439 108842x if (ctx->unassisted)
440 {
441 108842x ctx->inline_budget_max =
442 108842x static_cast<int>(unassisted_budget_);
443 108842x ctx->inline_budget =
444 108842x static_cast<int>(unassisted_budget_);
445 108842x return;
446 }
447 // Ramp up when previous cycle fully consumed budget
448 if (ctx->inline_budget == 0)
449 ctx->inline_budget_max = (std::min)(
450 ctx->inline_budget_max * 2,
451 static_cast<int>(inline_budget_max_));
452 else if (ctx->inline_budget < ctx->inline_budget_max)
453 ctx->inline_budget_max = (std::max)(
454 ctx->inline_budget_max / 2,
455 static_cast<int>(inline_budget_initial_));
456 ctx->inline_budget = ctx->inline_budget_max;
457 }
458 }
459
460 inline bool
461 460025x reactor_scheduler::try_consume_inline_budget() const noexcept
462 {
463 460025x if (auto* ctx = reactor_find_context(this))
464 {
465 460025x if (ctx->inline_budget > 0)
466 {
467 368028x --ctx->inline_budget;
468 368028x return true;
469 }
470 }
471 91997x return false;
472 }
473
474 inline void
475 2155x reactor_scheduler::post(std::coroutine_handle<> h) const
476 {
477 struct post_handler final : scheduler_op
478 {
479 std::coroutine_handle<> h_;
480
481 2155x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
482 4310x ~post_handler() override = default;
483
484 2146x void operator()() override
485 {
486 2146x auto saved = h_;
487 2146x delete this;
488 // Ensure stores from the posting thread are visible
489 std::atomic_thread_fence(std::memory_order_acquire);
490 2146x saved.resume();
491 2146x }
492
493 9x void destroy() override
494 {
495 9x auto saved = h_;
496 9x delete this;
497 9x saved.destroy();
498 9x }
499 };
500
501 2155x auto ph = std::make_unique<post_handler>(h);
502
503 2155x if (auto* ctx = reactor_find_context(this))
504 {
505 6x ++ctx->private_outstanding_work;
506 6x ctx->private_queue.push(ph.release());
507 6x return;
508 }
509
510 2149x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
511
512 2149x lock_type lock(mutex_);
513 2149x completed_ops_.push(ph.release());
514 2149x wake_one_thread_and_unlock(lock);
515 2155x }
516
517 inline void
518 110136x reactor_scheduler::post(scheduler_op* h) const
519 {
520 110136x if (auto* ctx = reactor_find_context(this))
521 {
522 109964x ++ctx->private_outstanding_work;
523 109964x ctx->private_queue.push(h);
524 109964x return;
525 }
526
527 172x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
528
529 172x lock_type lock(mutex_);
530 172x completed_ops_.push(h);
531 172x wake_one_thread_and_unlock(lock);
532 172x }
533
534 inline bool
535 1326x reactor_scheduler::running_in_this_thread() const noexcept
536 {
537 1326x return reactor_find_context(this) != nullptr;
538 }
539
540 inline void
541 436x reactor_scheduler::stop()
542 {
543 436x lock_type lock(mutex_);
544 436x if (!stopped_.load(std::memory_order_acquire))
545 {
546 400x stopped_.store(true, std::memory_order_release);
547 400x signal_all(lock);
548 400x interrupt_reactor();
549 }
550 436x }
551
552 inline bool
553 62x reactor_scheduler::stopped() const noexcept
554 {
555 62x return stopped_.load(std::memory_order_acquire);
556 }
557
558 inline void
559 111x reactor_scheduler::restart()
560 {
561 111x stopped_.store(false, std::memory_order_release);
562 111x }
563
564 inline std::size_t
565 412x reactor_scheduler::run()
566 {
567 824x if (outstanding_work_.load(std::memory_order_acquire) == 0)
568 {
569 26x stop();
570 26x return 0;
571 }
572
573 386x reactor_thread_context_guard ctx(this);
574 386x lock_type lock(mutex_);
575
576 386x std::size_t n = 0;
577 for (;;)
578 {
579 288698x if (!do_one(lock, -1, &ctx.frame_))
580 386x break;
581 288312x if (n != (std::numeric_limits<std::size_t>::max)())
582 288312x ++n;
583 288312x if (!lock.owns_lock())
584 187023x lock.lock();
585 }
586 386x return n;
587 386x }
588
589 inline std::size_t
590 2x reactor_scheduler::run_one()
591 {
592 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
593 {
594 stop();
595 return 0;
596 }
597
598 2x reactor_thread_context_guard ctx(this);
599 2x lock_type lock(mutex_);
600 2x return do_one(lock, -1, &ctx.frame_);
601 2x }
602
603 inline std::size_t
604 102x reactor_scheduler::wait_one(long usec)
605 {
606 204x if (outstanding_work_.load(std::memory_order_acquire) == 0)
607 {
608 10x stop();
609 10x return 0;
610 }
611
612 92x reactor_thread_context_guard ctx(this);
613 92x lock_type lock(mutex_);
614 92x return do_one(lock, usec, &ctx.frame_);
615 92x }
616
617 inline std::size_t
618 6x reactor_scheduler::poll()
619 {
620 12x if (outstanding_work_.load(std::memory_order_acquire) == 0)
621 {
622 1x stop();
623 1x return 0;
624 }
625
626 5x reactor_thread_context_guard ctx(this);
627 5x lock_type lock(mutex_);
628
629 5x std::size_t n = 0;
630 for (;;)
631 {
632 11x if (!do_one(lock, 0, &ctx.frame_))
633 5x break;
634 6x if (n != (std::numeric_limits<std::size_t>::max)())
635 6x ++n;
636 6x if (!lock.owns_lock())
637 6x lock.lock();
638 }
639 5x return n;
640 5x }
641
642 inline std::size_t
643 4x reactor_scheduler::poll_one()
644 {
645 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
646 {
647 2x stop();
648 2x return 0;
649 }
650
651 2x reactor_thread_context_guard ctx(this);
652 2x lock_type lock(mutex_);
653 2x return do_one(lock, 0, &ctx.frame_);
654 2x }
655
656 inline void
657 27044x reactor_scheduler::work_started() noexcept
658 {
659 27044x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
660 27044x }
661
662 inline void
663 38007x reactor_scheduler::work_finished() noexcept
664 {
665 76014x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
666 392x stop();
667 38007x }
668
669 inline void
670 159570x reactor_scheduler::compensating_work_started() const noexcept
671 {
672 159570x auto* ctx = reactor_find_context(this);
673 159570x if (ctx)
674 159570x ++ctx->private_outstanding_work;
675 159570x }
676
677 inline void
678 reactor_scheduler::drain_thread_queue(
679 op_queue& queue, std::int64_t count) const
680 {
681 if (count > 0)
682 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
683
684 lock_type lock(mutex_);
685 completed_ops_.splice(queue);
686 if (count > 0)
687 maybe_unlock_and_signal_one(lock);
688 }
689
690 inline void
691 16529x reactor_scheduler::post_deferred_completions(op_queue& ops) const
692 {
693 16529x if (ops.empty())
694 16529x return;
695
696 if (auto* ctx = reactor_find_context(this))
697 {
698 ctx->private_queue.splice(ops);
699 return;
700 }
701
702 lock_type lock(mutex_);
703 completed_ops_.splice(ops);
704 wake_one_thread_and_unlock(lock);
705 }
706
707 inline void
708 585x reactor_scheduler::shutdown_drain()
709 {
710 585x lock_type lock(mutex_);
711
712 1274x while (auto* h = completed_ops_.pop())
713 {
714 689x if (h == &task_op_)
715 585x continue;
716 104x lock.unlock();
717 104x h->destroy();
718 104x lock.lock();
719 689x }
720
721 585x signal_all(lock);
722 585x }
723
724 inline void
725 985x reactor_scheduler::signal_all(lock_type&) const
726 {
727 985x state_ |= signaled_bit;
728 985x cond_.notify_all();
729 985x }
730
731 inline bool
732 2321x reactor_scheduler::maybe_unlock_and_signal_one(
733 lock_type& lock) const
734 {
735 2321x state_ |= signaled_bit;
736 2321x if (state_ > signaled_bit)
737 {
738 lock.unlock();
739 cond_.notify_one();
740 return true;
741 }
742 2321x return false;
743 }
744
745 inline bool
746 343431x reactor_scheduler::unlock_and_signal_one(
747 lock_type& lock) const
748 {
749 343431x state_ |= signaled_bit;
750 343431x bool have_waiters = state_ > signaled_bit;
751 343431x lock.unlock();
752 343431x if (have_waiters)
753 cond_.notify_one();
754 343431x return have_waiters;
755 }
756
757 inline void
758 reactor_scheduler::clear_signal() const
759 {
760 state_ &= ~signaled_bit;
761 }
762
// Block until the signaled bit is latched. While blocked, the
// waiter count is published in state_ (waiter_increment units)
// so signalers can tell whether anyone needs a notify. The loop
// re-checks after each wake to absorb spurious wakeups.
inline void
reactor_scheduler::wait_for_signal(
    lock_type& lock) const
{
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
774
// Timed variant: a single wait with no re-check loop — a timeout
// or spurious wakeup simply returns to the caller, which re-checks
// its own conditions. The waiter count is published as above.
inline void
reactor_scheduler::wait_for_signal_for(
    lock_type& lock, long timeout_us) const
{
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
786
787 inline void
788 2321x reactor_scheduler::wake_one_thread_and_unlock(
789 lock_type& lock) const
790 {
791 2321x if (maybe_unlock_and_signal_one(lock))
792 return;
793
794 2321x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
795 {
796 56x task_interrupted_ = true;
797 56x lock.unlock();
798 56x interrupt_reactor();
799 }
800 else
801 {
802 2265x lock.unlock();
803 }
804 }
805
// Settle the work count after a handler ran in do_one. The handler
// that just completed is worth one work_finished; any work the
// handler produced privately offsets it:
//   produced > 1  -> net (produced - 1) new units added globally
//   produced == 1 -> exact wash, nothing to do
//   produced < 1  -> no offset, retire the completed unit
// Finally, any ops the handler queued privately are spliced to the
// global queue (re-acquiring the lock, which the handler ran without).
inline reactor_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just retire the completed unit.
        sched->work_finished();
    }
}
829
// Flush the context's private state after a reactor poll: publish
// any private work count to the global counter, then splice any
// privately queued ops into the global queue. The lock may or may
// not be held on entry (run_task can release it), so it is
// re-acquired only when there are ops to splice.
inline reactor_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
849
// Core event-loop step. Must be entered with `lock` held; may
// return with it released (callers re-lock as needed).
//
// Each iteration pops one item from the global queue:
//   - the task sentinel: run the reactor poll, then re-arm it;
//   - a real op: run it (unlocked) and return 1;
//   - nothing: drain the private queue, or block/return per timeout.
//
// timeout_us semantics: <0 block indefinitely, 0 never block,
// >0 wait at most that many microseconds.
inline std::size_t
reactor_scheduler::do_one(
    lock_type& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Nothing pending and either no work at all or the
            // caller forbids blocking: re-arm the sentinel and bail.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Poll without blocking when handlers are waiting; a
            // zero-timeout poll needs no interrupt, so mark it
            // pre-interrupted.
            long task_timeout_us = more_handlers ? 0 : timeout_us;
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread run the pending handlers while
            // this one polls the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                // Keep task_running_ accurate even on throw.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-arm the sentinel for the next cycle.
            completed_ops_.push(&task_op_);
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // If more work remains, try to hand it to another
            // thread; `unassisted` records whether anyone took it.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Settles work counts and splices the private queue
            // even if the handler throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            // Run the handler with the lock released.
            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
934
935 } // namespace boost::corosio::detail
936
937 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
938