TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/capy/ex/execution_context.hpp>
15 :
16 : #include <boost/corosio/detail/scheduler.hpp>
17 : #include <boost/corosio/detail/scheduler_op.hpp>
18 : #include <boost/corosio/detail/thread_local_ptr.hpp>
19 :
20 : #include <atomic>
21 : #include <chrono>
22 : #include <coroutine>
23 : #include <cstddef>
24 : #include <cstdint>
25 : #include <limits>
26 : #include <memory>
27 : #include <stdexcept>
28 :
29 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 : #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : // Forward declarations
35 : class reactor_scheduler;
36 : class timer_service;
37 :
/** Per-thread state for a reactor scheduler.

    Each thread running a scheduler's event loop has one of these
    on a thread-local stack. It holds a private work queue and
    inline completion budget for speculative I/O fast paths.

    Frames are linked through @ref next so one thread may run
    several schedulers (nested run() calls) simultaneously.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
{
    /// Scheduler this context belongs to (lookup key on the stack).
    reactor_scheduler const* key;

    /// Next context frame on this thread's stack (nullptr at the bottom).
    reactor_scheduler_context* next;

    /// Private work queue for reduced contention.
    op_queue private_queue;

    /// Unflushed work count for the private queue; published to the
    /// scheduler's global counter by the cleanup guards.
    std::int64_t private_outstanding_work;

    /// Remaining inline completions allowed this cycle.
    int inline_budget;

    /// Maximum inline budget (adaptive, 2-16).
    int inline_budget_max;

    /// True if no other thread absorbed queued work last cycle.
    bool unassisted;

    /// Construct a context frame linked to @a n.
    reactor_scheduler_context(
        reactor_scheduler const* k,
        reactor_scheduler_context* n);
};
72 :
/// Thread-local context stack for reactor schedulers. Frames are pushed
/// and popped by reactor_thread_context_guard; the chain allows one
/// thread to run multiple schedulers at once.
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75 :
76 : /// Find the context frame for a scheduler on this thread.
77 : inline reactor_scheduler_context*
78 HIT 842054 : reactor_find_context(reactor_scheduler const* self) noexcept
79 : {
80 842054 : for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 : {
82 839006 : if (c->key == self)
83 839006 : return c;
84 : }
85 3048 : return nullptr;
86 : }
87 :
88 : /// Flush private work count to global counter.
89 : inline void
90 MIS 0 : reactor_flush_private_work(
91 : reactor_scheduler_context* ctx,
92 : std::atomic<std::int64_t>& outstanding_work) noexcept
93 : {
94 0 : if (ctx && ctx->private_outstanding_work > 0)
95 : {
96 0 : outstanding_work.fetch_add(
97 : ctx->private_outstanding_work, std::memory_order_relaxed);
98 0 : ctx->private_outstanding_work = 0;
99 : }
100 0 : }
101 :
102 : /** Drain private queue to global queue, flushing work count first.
103 :
104 : @return True if any ops were drained.
105 : */
106 : inline bool
107 0 : reactor_drain_private_queue(
108 : reactor_scheduler_context* ctx,
109 : std::atomic<std::int64_t>& outstanding_work,
110 : op_queue& completed_ops) noexcept
111 : {
112 0 : if (!ctx || ctx->private_queue.empty())
113 0 : return false;
114 :
115 0 : reactor_flush_private_work(ctx, outstanding_work);
116 0 : completed_ops.splice(ctx->private_queue);
117 0 : return true;
118 : }
119 :
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;
    using mutex_type = conditionally_enabled_mutex;
    using lock_type = mutex_type::scoped_lock;
    using event_type = conditionally_enabled_event;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

    /** Apply runtime configuration to the scheduler.

        Called by `io_context` after construction. Values that do
        not apply to this backend are silently ignored.

        @param max_events Event buffer size for epoll/kqueue.
        @param budget_init Starting inline completion budget.
        @param budget_max Hard ceiling on adaptive budget ramp-up.
        @param unassisted Budget when single-threaded.
    */
    virtual void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted);

    /// Return the configured initial inline budget.
    unsigned inline_budget_initial() const noexcept
    {
        return inline_budget_initial_;
    }

    /// Return true if single-threaded (lockless) mode is active.
    bool is_single_threaded() const noexcept
    {
        return single_threaded_;
    }

    /** Enable or disable single-threaded (lockless) mode.

        When enabled, all scheduler mutex and condition variable
        operations become no-ops. Cross-thread post() is
        undefined behavior.
    */
    void configure_single_threaded(bool v) noexcept
    {
        single_threaded_ = v;
        mutex_.set_enabled(!v);
        cond_.set_enabled(!v);
    }

protected:
    /// Associated timer service, if any (set by derived backends).
    timer_service* timer_svc_ = nullptr;

    /// Cached single-threaded flag; mirrors the mutex/event enable state.
    bool single_threaded_ = false;

    reactor_scheduler() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler const* sched;  // owning scheduler
        lock_type* lock;                 // caller's mutex lock
        context_type* ctx;               // thread's context frame
        ~task_cleanup();
    };

    /// Protects completed_ops_, state_, and task_interrupted_.
    mutable mutex_type mutex_{true};
    /// Condition variable used by the signal state machine below.
    mutable event_type cond_{true};
    /// Global queue of ready-to-run operations (guarded by mutex_).
    mutable op_queue completed_ops_;
    /// Outstanding work count; scheduler stops when it reaches zero.
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    /// Set by stop(); cleared by restart().
    std::atomic<bool> stopped_{false};
    /// True while a thread is inside run_task().
    mutable std::atomic<bool> task_running_{false};
    /// True once interrupt_reactor() has been requested this cycle
    /// (guarded by mutex_).
    mutable bool task_interrupted_ = false;

    // Runtime-configurable reactor tuning parameters.
    // Defaults match the library's built-in values.
    unsigned max_events_per_poll_ = 128;
    unsigned inline_budget_initial_ = 2;
    unsigned inline_budget_max_ = 16;
    unsigned unassisted_budget_ = 4;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;
    /// Signal bit plus waiter count, guarded by mutex_.
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    /// Singleton sentinel instance; never destroyed via the queue.
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run after each handler to settle work accounting.
    struct work_cleanup
    {
        reactor_scheduler* sched;
        lock_type* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    /// Core event loop step; see the inline definition below.
    std::size_t do_one(
        lock_type& lock, long timeout_us, context_type* ctx);

    // Signal state machine helpers; all require mutex_ to be held on entry.
    void signal_all(lock_type& lock) const;
    bool maybe_unlock_and_signal_one(lock_type& lock) const;
    bool unlock_and_signal_one(lock_type& lock) const;
    void clear_signal() const;
    void wait_for_signal(lock_type& lock) const;
    void wait_for_signal_for(
        lock_type& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(lock_type& lock) const;
};
359 :
/** RAII guard that pushes/pops a scheduler context frame.

    On construction, pushes a new context frame onto the
    thread-local stack. On destruction, drains any remaining
    private queue items to the global queue and pops the frame.
*/
struct reactor_thread_context_guard
{
    /// The context frame managed by this guard.
    reactor_scheduler_context frame_;

    /// Construct the guard, pushing a frame for @a sched.
    explicit reactor_thread_context_guard(
        reactor_scheduler const* sched) noexcept
        : frame_(sched, reactor_context_stack.get())
    {
        reactor_context_stack.set(&frame_);
    }

    /// Destroy the guard, draining private work and popping the frame.
    ~reactor_thread_context_guard() noexcept
    {
        // Drain before popping the frame so queued ops (and their work
        // counts) are never lost with the frame.
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        reactor_context_stack.set(frame_.next);
    }
};
388 :
389 : // ---- Inline implementations ------------------------------------------------
390 :
// Initialize a frame for scheduler `k`, chained onto frame `n`.
// inline_budget starts at 0; reset_inline_budget() grants the first
// budget. inline_budget_max seeds from the scheduler's configured
// initial value and then adapts per cycle.
inline
reactor_scheduler_context::reactor_scheduler_context(
    reactor_scheduler const* k,
    reactor_scheduler_context* n)
    : key(k)
    , next(n)
    , private_outstanding_work(0)
    , inline_budget(0)
    , inline_budget_max(
        static_cast<int>(k->inline_budget_initial()))
    , unassisted(false)
{
}
404 :
405 : inline void
406 MIS 0 : reactor_scheduler::configure_reactor(
407 : unsigned max_events,
408 : unsigned budget_init,
409 : unsigned budget_max,
410 : unsigned unassisted)
411 : {
412 0 : if (max_events < 1 ||
413 0 : max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
414 : throw std::out_of_range(
415 0 : "max_events_per_poll must be in [1, INT_MAX]");
416 0 : if (budget_max < 1 ||
417 0 : budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
418 : throw std::out_of_range(
419 0 : "inline_budget_max must be in [1, INT_MAX]");
420 :
421 : // Clamp initial and unassisted to budget_max.
422 0 : if (budget_init > budget_max)
423 0 : budget_init = budget_max;
424 0 : if (unassisted > budget_max)
425 0 : unassisted = budget_max;
426 :
427 0 : max_events_per_poll_ = max_events;
428 0 : inline_budget_initial_ = budget_init;
429 0 : inline_budget_max_ = budget_max;
430 0 : unassisted_budget_ = unassisted;
431 0 : }
432 :
433 : inline void
434 HIT 108842 : reactor_scheduler::reset_inline_budget() const noexcept
435 : {
436 108842 : if (auto* ctx = reactor_find_context(this))
437 : {
438 : // Cap when no other thread absorbed queued work
439 108842 : if (ctx->unassisted)
440 : {
441 108842 : ctx->inline_budget_max =
442 108842 : static_cast<int>(unassisted_budget_);
443 108842 : ctx->inline_budget =
444 108842 : static_cast<int>(unassisted_budget_);
445 108842 : return;
446 : }
447 : // Ramp up when previous cycle fully consumed budget
448 MIS 0 : if (ctx->inline_budget == 0)
449 0 : ctx->inline_budget_max = (std::min)(
450 0 : ctx->inline_budget_max * 2,
451 0 : static_cast<int>(inline_budget_max_));
452 0 : else if (ctx->inline_budget < ctx->inline_budget_max)
453 0 : ctx->inline_budget_max = (std::max)(
454 0 : ctx->inline_budget_max / 2,
455 0 : static_cast<int>(inline_budget_initial_));
456 0 : ctx->inline_budget = ctx->inline_budget_max;
457 : }
458 : }
459 :
460 : inline bool
461 HIT 460025 : reactor_scheduler::try_consume_inline_budget() const noexcept
462 : {
463 460025 : if (auto* ctx = reactor_find_context(this))
464 : {
465 460025 : if (ctx->inline_budget > 0)
466 : {
467 368028 : --ctx->inline_budget;
468 368028 : return true;
469 : }
470 : }
471 91997 : return false;
472 : }
473 :
474 : inline void
475 2155 : reactor_scheduler::post(std::coroutine_handle<> h) const
476 : {
477 : struct post_handler final : scheduler_op
478 : {
479 : std::coroutine_handle<> h_;
480 :
481 2155 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
482 4310 : ~post_handler() override = default;
483 :
484 2146 : void operator()() override
485 : {
486 2146 : auto saved = h_;
487 2146 : delete this;
488 : // Ensure stores from the posting thread are visible
489 : std::atomic_thread_fence(std::memory_order_acquire);
490 2146 : saved.resume();
491 2146 : }
492 :
493 9 : void destroy() override
494 : {
495 9 : auto saved = h_;
496 9 : delete this;
497 9 : saved.destroy();
498 9 : }
499 : };
500 :
501 2155 : auto ph = std::make_unique<post_handler>(h);
502 :
503 2155 : if (auto* ctx = reactor_find_context(this))
504 : {
505 6 : ++ctx->private_outstanding_work;
506 6 : ctx->private_queue.push(ph.release());
507 6 : return;
508 : }
509 :
510 2149 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
511 :
512 2149 : lock_type lock(mutex_);
513 2149 : completed_ops_.push(ph.release());
514 2149 : wake_one_thread_and_unlock(lock);
515 2155 : }
516 :
517 : inline void
518 110136 : reactor_scheduler::post(scheduler_op* h) const
519 : {
520 110136 : if (auto* ctx = reactor_find_context(this))
521 : {
522 109964 : ++ctx->private_outstanding_work;
523 109964 : ctx->private_queue.push(h);
524 109964 : return;
525 : }
526 :
527 172 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
528 :
529 172 : lock_type lock(mutex_);
530 172 : completed_ops_.push(h);
531 172 : wake_one_thread_and_unlock(lock);
532 172 : }
533 :
534 : inline bool
535 1326 : reactor_scheduler::running_in_this_thread() const noexcept
536 : {
537 1326 : return reactor_find_context(this) != nullptr;
538 : }
539 :
540 : inline void
541 436 : reactor_scheduler::stop()
542 : {
543 436 : lock_type lock(mutex_);
544 436 : if (!stopped_.load(std::memory_order_acquire))
545 : {
546 400 : stopped_.store(true, std::memory_order_release);
547 400 : signal_all(lock);
548 400 : interrupt_reactor();
549 : }
550 436 : }
551 :
inline bool
reactor_scheduler::stopped() const noexcept
{
    // Acquire pairs with the release store in stop() so the caller
    // observes everything written before the scheduler stopped.
    return stopped_.load(std::memory_order_acquire);
}
557 :
inline void
reactor_scheduler::restart()
{
    // Only clears the stopped flag; queued ops and work counts are
    // untouched, so a following run() resumes where stop() cut off.
    stopped_.store(false, std::memory_order_release);
}
563 :
564 : inline std::size_t
565 412 : reactor_scheduler::run()
566 : {
567 824 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
568 : {
569 26 : stop();
570 26 : return 0;
571 : }
572 :
573 386 : reactor_thread_context_guard ctx(this);
574 386 : lock_type lock(mutex_);
575 :
576 386 : std::size_t n = 0;
577 : for (;;)
578 : {
579 288698 : if (!do_one(lock, -1, &ctx.frame_))
580 386 : break;
581 288312 : if (n != (std::numeric_limits<std::size_t>::max)())
582 288312 : ++n;
583 288312 : if (!lock.owns_lock())
584 187023 : lock.lock();
585 : }
586 386 : return n;
587 386 : }
588 :
589 : inline std::size_t
590 2 : reactor_scheduler::run_one()
591 : {
592 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
593 : {
594 MIS 0 : stop();
595 0 : return 0;
596 : }
597 :
598 HIT 2 : reactor_thread_context_guard ctx(this);
599 2 : lock_type lock(mutex_);
600 2 : return do_one(lock, -1, &ctx.frame_);
601 2 : }
602 :
603 : inline std::size_t
604 102 : reactor_scheduler::wait_one(long usec)
605 : {
606 204 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
607 : {
608 10 : stop();
609 10 : return 0;
610 : }
611 :
612 92 : reactor_thread_context_guard ctx(this);
613 92 : lock_type lock(mutex_);
614 92 : return do_one(lock, usec, &ctx.frame_);
615 92 : }
616 :
617 : inline std::size_t
618 6 : reactor_scheduler::poll()
619 : {
620 12 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
621 : {
622 1 : stop();
623 1 : return 0;
624 : }
625 :
626 5 : reactor_thread_context_guard ctx(this);
627 5 : lock_type lock(mutex_);
628 :
629 5 : std::size_t n = 0;
630 : for (;;)
631 : {
632 11 : if (!do_one(lock, 0, &ctx.frame_))
633 5 : break;
634 6 : if (n != (std::numeric_limits<std::size_t>::max)())
635 6 : ++n;
636 6 : if (!lock.owns_lock())
637 6 : lock.lock();
638 : }
639 5 : return n;
640 5 : }
641 :
642 : inline std::size_t
643 4 : reactor_scheduler::poll_one()
644 : {
645 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
646 : {
647 2 : stop();
648 2 : return 0;
649 : }
650 :
651 2 : reactor_thread_context_guard ctx(this);
652 2 : lock_type lock(mutex_);
653 2 : return do_one(lock, 0, &ctx.frame_);
654 2 : }
655 :
inline void
reactor_scheduler::work_started() noexcept
{
    // Relaxed suffices: the count only gates shutdown, it does not
    // publish data between threads.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
661 :
inline void
reactor_scheduler::work_finished() noexcept
{
    // acq_rel so the thread that takes the count to zero synchronizes
    // with all earlier decrements before it stops the scheduler.
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
668 :
669 : inline void
670 159570 : reactor_scheduler::compensating_work_started() const noexcept
671 : {
672 159570 : auto* ctx = reactor_find_context(this);
673 159570 : if (ctx)
674 159570 : ++ctx->private_outstanding_work;
675 159570 : }
676 :
677 : inline void
678 MIS 0 : reactor_scheduler::drain_thread_queue(
679 : op_queue& queue, std::int64_t count) const
680 : {
681 0 : if (count > 0)
682 0 : outstanding_work_.fetch_add(count, std::memory_order_relaxed);
683 :
684 0 : lock_type lock(mutex_);
685 0 : completed_ops_.splice(queue);
686 0 : if (count > 0)
687 0 : maybe_unlock_and_signal_one(lock);
688 0 : }
689 :
690 : inline void
691 HIT 16529 : reactor_scheduler::post_deferred_completions(op_queue& ops) const
692 : {
693 16529 : if (ops.empty())
694 16529 : return;
695 :
696 MIS 0 : if (auto* ctx = reactor_find_context(this))
697 : {
698 0 : ctx->private_queue.splice(ops);
699 0 : return;
700 : }
701 :
702 0 : lock_type lock(mutex_);
703 0 : completed_ops_.splice(ops);
704 0 : wake_one_thread_and_unlock(lock);
705 0 : }
706 :
inline void
reactor_scheduler::shutdown_drain()
{
    lock_type lock(mutex_);

    // Destroy (not run) every queued op. The task sentinel is owned by
    // this scheduler and must be skipped, not destroyed.
    while (auto* h = completed_ops_.pop())
    {
        if (h == &task_op_)
            continue;
        // destroy() may run arbitrary teardown code; never invoke it
        // while holding the scheduler mutex.
        lock.unlock();
        h->destroy();
        lock.lock();
    }

    // Release any threads still blocked in wait_for_signal().
    signal_all(lock);
}
723 :
inline void
reactor_scheduler::signal_all(lock_type&) const
{
    // Caller holds the mutex (witnessed by the lock parameter). The
    // sticky bit ensures threads that wait after this call also return.
    state_ |= signaled_bit;
    cond_.notify_all();
}
730 :
731 : inline bool
732 2321 : reactor_scheduler::maybe_unlock_and_signal_one(
733 : lock_type& lock) const
734 : {
735 2321 : state_ |= signaled_bit;
736 2321 : if (state_ > signaled_bit)
737 : {
738 MIS 0 : lock.unlock();
739 0 : cond_.notify_one();
740 0 : return true;
741 : }
742 HIT 2321 : return false;
743 : }
744 :
745 : inline bool
746 343431 : reactor_scheduler::unlock_and_signal_one(
747 : lock_type& lock) const
748 : {
749 343431 : state_ |= signaled_bit;
750 343431 : bool have_waiters = state_ > signaled_bit;
751 343431 : lock.unlock();
752 343431 : if (have_waiters)
753 MIS 0 : cond_.notify_one();
754 HIT 343431 : return have_waiters;
755 : }
756 :
inline void
reactor_scheduler::clear_signal() const
{
    // Must be called with the mutex held, just before waiting, so the
    // next signal is not consumed by a stale sticky bit.
    state_ &= ~signaled_bit;
}
762 :
inline void
reactor_scheduler::wait_for_signal(
    lock_type& lock) const
{
    // Loop guards against spurious wakeups. The waiter count (state_
    // above bit 0, in waiter_increment steps) lets signalers know a
    // notify is needed; it is maintained under the mutex.
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
774 :
inline void
reactor_scheduler::wait_for_signal_for(
    lock_type& lock, long timeout_us) const
{
    // Deliberately a single-shot wait (if, not while): on timeout or a
    // spurious wakeup the caller re-checks its own predicate, so an
    // early return is harmless and avoids over-waiting.
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
786 :
inline void
reactor_scheduler::wake_one_thread_and_unlock(
    lock_type& lock) const
{
    // Prefer waking a thread blocked on the condvar; that call unlocks
    // on success.
    if (maybe_unlock_and_signal_one(lock))
        return;

    // No condvar waiters: if a thread is blocked inside the reactor
    // poll and has not already been interrupted this cycle, interrupt
    // it after releasing the mutex.
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}
805 :
inline reactor_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        // The handler that just ran consumed one unit of outstanding
        // work, so the net adjustment is (produced - 1): produced == 1
        // is a wash; more than 1 publishes the surplus; less than 1
        // is a real decrement that may stop the scheduler.
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        // Hand privately queued ops to the global queue. do_one
        // released the mutex before running the handler, so retake it.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just account for the completed handler.
        sched->work_finished();
    }
}
829 :
inline reactor_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    // Publish any work produced during the reactor poll to the global
    // counter before its ops become globally visible.
    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    // Transfer privately queued ops to the global queue; run_task may
    // have dropped the mutex, so reacquire it before splicing.
    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
849 :
// Execute one step of the event loop: run at most one handler, or
// perform one reactor poll, or wait for a signal.
//
// Preconditions: `lock` holds mutex_. timeout_us < 0 means wait
// indefinitely, 0 means never block, > 0 bounds a single wait.
// Returns 1 if a handler ran (lock released), 0 otherwise.
inline std::size_t
reactor_scheduler::do_one(
    lock_type& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Idle and either no work left or non-blocking mode:
            // re-arm the sentinel and bail out.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // With handlers pending, poll without blocking; a zero
            // timeout also means the reactor needs no interrupt.
            long task_timeout_us = more_handlers ? 0 : timeout_us;
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread run the pending handlers meanwhile.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                // Keep task_running_ consistent even on throw.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-arm the sentinel for the next cycle.
            completed_ops_.push(&task_op_);
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // unassisted records whether another thread picked up the
            // remaining work; it feeds the inline-budget adaptation.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup settles work counts and drains the private
            // queue after the handler returns (or throws).
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            // Run the handler with the mutex released.
            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Nothing runnable: wait for a producer or the reactor thread.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
934 :
935 : } // namespace boost::corosio::detail
936 :
937 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
|