BUG-HUNT: [concurrency] _create_operator never passes AsyncIOScheduler to time-based operators — debounce, delay, throttle, buffer fire from thread-pool threads, racing with the asyncio event loop #6687

Open
opened 2026-04-09 23:34:50 +00:00 by HAL9000 · 1 comment

Bug Report: [concurrency] Time-based RxPY operators use the default thread-based scheduler instead of the configured AsyncIOScheduler

Severity Assessment

  • Impact: Results silently dropped in run_single_shot; result_container and error_container are mutated from a non-asyncio thread while the asyncio coroutine reads them (data race, lost results)
  • Likelihood: Triggered whenever a stream config uses debounce, delay, or buffer operators; throttle (throttle_first) emits synchronously in RxPY 3 and does not fire from a timer thread
  • Priority: High

Location

  • File: src/cleveragents/reactive/stream_router.py
  • Function: ReactiveStreamRouter._create_operator
  • Lines: ~390–430

Also relevant:

  • File: src/cleveragents/reactive/application.py
  • Function: ReactiveCleverAgentsApp.run_single_shot
  • Lines: ~277–316

Description

ReactiveStreamRouter stores an AsyncIOScheduler in self.scheduler, and
run_single_shot sets it before sending the first message:

# application.py ~line 285
loop = asyncio.get_running_loop()
scheduler = AsyncIOScheduler(loop=loop)
self.stream_router.scheduler = scheduler

However, _create_operator never reads self.scheduler. All four
time-based operators are created without a scheduler argument:

# stream_router.py
if op_type == "debounce":
    debounce_time = min(duration, 0.05)
    return ops.debounce(debounce_time)          # ← no scheduler

if op_type == "throttle":
    return ops.throttle_first(duration)         # ← no scheduler

if op_type == "delay":
    return ops.delay(duration)                  # ← no scheduler

if op_type == "buffer":
    return ops.buffer_with_time_or_count(timeout, count)  # ← no scheduler

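In RxPY 3, when no scheduler is passed to the operator or at subscribe time,
debounce, delay, and buffer_with_time_or_count fall back to TimeoutScheduler,
which is backed by Python's threading.Timer. Each scheduled callback runs on
its own daemon timer thread, not on the asyncio event loop thread.
throttle_first is the exception: it only reads the scheduler's clock and
emits synchronously on whichever thread called on_next.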
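The threading difference described above can be reproduced without RxPY. The sketch below is a stdlib-only analogue, assuming (as stated in this report) that TimeoutScheduler schedules through threading.Timer and that AsyncIOScheduler schedules through the loop's call_later. The function name callback_threads is hypothetical; it only records which thread each kind of callback runs on.

```python
import asyncio
import threading


async def callback_threads(delay: float = 0.01) -> dict:
    """Record which thread runs a threading.Timer callback vs. a loop.call_later callback."""
    loop = asyncio.get_running_loop()
    seen = {"loop": threading.get_ident()}
    timer_done = asyncio.Event()
    loop_done = asyncio.Event()

    def from_timer() -> None:
        # Runs on a separate Timer thread, as TimeoutScheduler callbacks do.
        seen["timer"] = threading.get_ident()
        # asyncio.Event is not thread-safe, so set it from the loop thread instead.
        loop.call_soon_threadsafe(timer_done.set)

    def from_loop() -> None:
        # Runs on the event loop thread, as AsyncIOScheduler callbacks do.
        seen["call_later"] = threading.get_ident()
        loop_done.set()

    timer = threading.Timer(delay, from_timer)
    timer.daemon = True
    timer.start()
    loop.call_later(delay, from_loop)

    await asyncio.wait_for(asyncio.gather(timer_done.wait(), loop_done.wait()), timeout=2.0)
    return seen


if __name__ == "__main__":
    ids = asyncio.run(callback_threads())
    print("Timer callback on loop thread:", ids["timer"] == ids["loop"])            # False
    print("call_later callback on loop thread:", ids["call_later"] == ids["loop"])  # True
```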

Two failures result:

  1. Race condition in run_single_shot: After streams["__input__"].on_next(prompt),
    the coroutine does await asyncio.sleep(0) and then immediately checks
    result_container. The debounce or delay timer has not fired yet (debounce
    is capped at 50 ms and delay defaults to 100 ms, and both fire on a timer
    thread). result_container is therefore empty at check time, so the
    function returns "", silently discarding the real output.

  2. Thread-safety violation: When the timer callback fires, it calls
    Subject.on_next(value) on the downstream Subject from the timer thread.
    Every downstream observer then runs on that thread, appending to
    result_container or error_container (plain Python lists) while the
    asyncio coroutine reads them, with no locking and no hand-off to the
    event loop. This is an unsynchronized data race.
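Failure 1 can be sketched with the standard library alone. This is a hypothetical, simplified stand-in for run_single_shot (fake_run_single_shot is not the project's code): the output is scheduled 50 ms out, mirroring the capped debounce time, and the coroutine reads its result list after a single await asyncio.sleep(0). The stand-in schedules with loop.call_later, so it also shows that moving callbacks onto the loop does not by itself fix the empty result; the coroutine still has to wait.

```python
import asyncio


async def fake_run_single_shot(prompt: str, emit_after: float = 0.05, wait: float = 0.0) -> str:
    """Hypothetical stand-in: schedule the output, wait `wait` seconds, then read results."""
    loop = asyncio.get_running_loop()
    result_container: list = []

    # Stand-in for the debounced pipeline: the output lands emit_after seconds later.
    loop.call_later(emit_after, result_container.append, f"echo: {prompt}")

    # With wait=0.0 this yields for a single loop iteration, like asyncio.sleep(0).
    await asyncio.sleep(wait)
    return result_container[-1] if result_container else ""


if __name__ == "__main__":
    print(repr(asyncio.run(fake_run_single_shot("hi"))))            # '' (output not yet delivered)
    print(repr(asyncio.run(fake_run_single_shot("hi", wait=0.2))))  # 'echo: hi'
```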

Evidence

# stream_router.py — no scheduler passed to any time-based op:
if op_type == "debounce":
    duration = params.get("duration", 1.0)
    debounce_time = min(duration, 0.05)
    return ops.debounce(debounce_time)        # TimeoutScheduler used by default

if op_type == "throttle":
    duration = params.get("duration", 0.2)
    return ops.throttle_first(duration)       # TimeoutScheduler used by default

if op_type == "delay":
    duration = params.get("duration", 0.1)
    return ops.delay(duration)                # TimeoutScheduler used by default

if op_type == "buffer":
    if "count" in params:
        count = params["count"]
        timeout = params.get("timeout", 0.1)
        return ops.buffer_with_time_or_count(timeout, count)  # TimeoutScheduler

# application.py: scheduler is set but operators never read it:
scheduler = AsyncIOScheduler(loop=loop)
self.stream_router.scheduler = scheduler
# ... but _create_operator never references self.scheduler

Expected Behavior

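All time-based operators should be created with the AsyncIOScheduler so
that their callbacks fire on the asyncio event loop thread. Additionally,
run_single_shot should not read result_container until the scheduler has
flushed all pending work, because a callback scheduled 50 ms out on the loop
still runs after asyncio.sleep(0) has returned. A fixed wait (e.g., await
asyncio.sleep(max_debounce_time * 2)) works as a rough bound; an explicit
completion signal avoids guessing.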
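If a fixed wait is used, the bound has to account for chained operators, whose delays add up rather than overlap. The helper below is a hypothetical sketch: the name chain_delay_bound and the config shape {"type": ..., "params": {...}} are assumptions, while the defaults mirror the _create_operator excerpt in this report.

```python
from typing import Iterable, Mapping


def chain_delay_bound(operator_configs: Iterable[Mapping]) -> float:
    """Worst-case seconds one item can be held by a single chain of time-based operators.

    Hypothetical helper: assumes each config looks like {"type": ..., "params": {...}}
    and mirrors the defaults in the _create_operator excerpt. Delays add up along a
    chain, so this sums them rather than taking the max.
    """
    total = 0.0
    for cfg in operator_configs:
        op_type = cfg.get("type")
        params = cfg.get("params", {})
        if op_type == "debounce":
            total += min(params.get("duration", 1.0), 0.05)  # capped as in _create_operator
        elif op_type == "delay":
            total += params.get("duration", 0.1)
        elif op_type == "buffer" and "count" in params:
            total += params.get("timeout", 0.1)  # flushes on timeout if count is not reached
        # throttle_first forwards the first item immediately, so it adds no wait.
        # Other buffer forms are not shown in the excerpt and are not covered here.
    return total


# Usage inside a coroutine, with the 2x safety margin suggested above:
#     await asyncio.sleep(2 * chain_delay_bound(configs))
```

This remains a timing heuristic; under load the loop can run late, which is why an explicit completion signal is the more robust option.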

Actual Behavior

Time-based operators use TimeoutScheduler (one daemon threading.Timer thread
per scheduled callback). Their callbacks race with the asyncio coroutine in
run_single_shot, producing:

  • Silent empty-string returns when asyncio.sleep(0) finishes before the
    timer fires
  • Unsynchronized mutations of result_container from off-loop threads

Suggested Fix

Pass self.scheduler to each time-based operator in _create_operator:

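# None makes RxPY 3 fall back to the subscribe-time scheduler, then TimeoutScheduler
scheduler = self.scheduler  # AsyncIOScheduler once run_single_shot has set it

if op_type == "debounce":
    duration = params.get("duration", 1.0)
    debounce_time = min(duration, 0.05)
    return ops.debounce(debounce_time, scheduler=scheduler)

if op_type == "throttle":
    duration = params.get("duration", 0.2)
    # throttle_first only reads scheduler.now; passing it keeps one consistent clock
    return ops.throttle_first(duration, scheduler=scheduler)

if op_type == "delay":
    duration = params.get("duration", 0.1)
    return ops.delay(duration, scheduler=scheduler)

if op_type == "buffer":
    if "count" in params:
        count = params["count"]
        timeout = params.get("timeout", 0.1)
        return ops.buffer_with_time_or_count(timeout, count, scheduler=scheduler)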

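And in run_single_shot, wait longer than the largest configured delay
(delays add up across chained operators) before checking results, or await
a Future/asyncio.Event signalled from on_next. Neither asyncio.Future nor
asyncio.Event is thread-safe, so if any callback can still arrive from
another thread, signal it via loop.call_soon_threadsafe.

Note that _create_operator reads self.scheduler when each operator is built,
so this fix only helps if the streams are built after run_single_shot
assigns the scheduler; operators built earlier will have captured None.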
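A minimal sketch of the Future-based wait, assuming the final stream can be given a plain on_next callback. The helper make_result_waiter and the Timer-driven producer in demo are hypothetical; the Timer simulates a TimeoutScheduler-style emission from another thread. The callback resolves the future through loop.call_soon_threadsafe, so it is safe from any thread, and it is harmless (just slightly indirect) once all callbacks run on the loop.

```python
import asyncio
import threading
from typing import Any, Callable, Tuple


def make_result_waiter(loop: asyncio.AbstractEventLoop) -> Tuple[asyncio.Future, Callable[[Any], None]]:
    """Return (future, on_next). on_next may be called from any thread; the first value wins."""
    future = loop.create_future()

    def _resolve(value: Any) -> None:
        # Runs on the loop thread; ignore values that arrive after the first one.
        if not future.done():
            future.set_result(value)

    def on_next(value: Any) -> None:
        # Marshal onto the event loop instead of touching the future from this thread.
        loop.call_soon_threadsafe(_resolve, value)

    return future, on_next


async def demo(timeout: float = 1.0) -> str:
    loop = asyncio.get_running_loop()
    future, on_next = make_result_waiter(loop)
    # Simulate an emission arriving 50 ms later on a threading.Timer thread.
    timer = threading.Timer(0.05, on_next, args=("final output",))
    timer.daemon = True
    timer.start()
    # Raises asyncio.TimeoutError if nothing arrives, instead of silently returning "".
    return await asyncio.wait_for(future, timeout)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # final output
```

Keeping the first value matches a single-shot request; a caller that needs the last of several emissions would instead resolve on completion.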

Category

concurrency

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be
created for TDD. The test will use tags: @tdd_issue,
@tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug
exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

HAL9000 added this to the v3.2.0 milestone 2026-04-09 23:40:29 +00:00

Verified — Concurrency bug: time-based operators fire from thread-pool threads, racing with asyncio event loop. MoSCoW: Should-have. Priority: High.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

Reference
cleveragents/cleveragents-core#6687