BUG-HUNT: [concurrency] Race condition in stream processing completion detection causes incomplete results #7203

Open
opened 2026-04-10 08:53:54 +00:00 by HAL9000 · 3 comments
Owner

Background and Context

The run_single_shot() method in src/cleveragents/reactive/application.py (lines 395–415) sets up RxPY observable streams, emits the prompt, and then calls await asyncio.sleep(0) once. It assumes that stream processing will have finished by the time control returns. That assumption is incorrect: a single event-loop yield does not guarantee that all downstream RxPY operators have finished processing.

This is a concurrency design flaw in the reactive application layer. RxPY pipelines do not necessarily run synchronously. When an operator runs on a scheduler (for example via observe_on) or waits on a coroutine or future, on_next() only schedules the downstream work and returns before that work completes. A single await asyncio.sleep(0) lets the event loop run only the callbacks that are ready at that moment. Any stage that schedules further callbacks, or waits on I/O, can still be running when the results are collected.
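The mechanism described above can be reproduced without RxPY. The sketch below is a stdlib-only model, not CleverAgents code. make_stage, emit_and_yield_many and emit_and_yield_once are hypothetical names. Each stage hands its output to the next with loop.call_soon, standing in for an operator that re-schedules work onto the event loop. In this model a value reaches the sink only after one loop turn per stage, so a single asyncio.sleep(0) returns an empty snapshot once the pipeline is more than one stage deep.

```python
from __future__ import annotations

import asyncio


def make_stage(loop: asyncio.AbstractEventLoop, remaining: int,
               results: list[str], value: str) -> None:
    """Hypothetical pipeline stage: forwards its output to the next stage via call_soon."""
    if remaining == 0:
        results.append(value)  # sink: the value lands in the result container
        return
    # Hand off to the next stage on a later turn of the event loop.
    loop.call_soon(make_stage, loop, remaining - 1, results, value.upper())


async def emit_and_yield_many(stages: int, turns: int) -> list[str]:
    """Emit one value, yield to the loop `turns` times, then snapshot the results."""
    loop = asyncio.get_running_loop()
    results: list[str] = []
    # Stand-in for on_next(): the first stage is scheduled, not run inline.
    loop.call_soon(make_stage, loop, stages, results, "hello")
    for _ in range(turns):
        await asyncio.sleep(0)
    return list(results)


async def emit_and_yield_once(stages: int) -> list[str]:
    """Mirrors the current run_single_shot(): exactly one asyncio.sleep(0)."""
    return await emit_and_yield_many(stages, turns=1)
```

In this model a one-stage pipeline (stages=0) finishes within a single yield. With three forwarding stages the snapshot stays empty until the fourth yield. Adding more sleep(0) calls would not be a fix: the number of hops is not known in advance, and stages that wait on I/O need wall-clock time rather than loop turns.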

Current Behavior

async def run_single_shot(
    self,
    prompt: str,
    *,
    context_manager: ContextManager | None = None,
    allow_rxpy_in_run_mode: bool = False,
) -> str:
    # ... stream setup ...
    self.stream_router.streams["__input__"].on_next(prompt)
    await asyncio.sleep(0)  # This is insufficient for stream completion

    if error_container:
        raise CleverAgentsException("; ".join(error_container))

    if not result_container:
        return ""

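await asyncio.sleep(0) yields control once; it does not wait for stream processing to complete. If the streams are still running when result_container is read, the method returns an empty string or a partial result. An error raised by a stage after that single yield is missed in the same way, because error_container is checked at the same point.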

Expected Behavior

The method should wait for stream processing to complete before collecting and returning results. It must use proper completion signaling to ensure that all stream operators have finished before results are read. Options include an RxPY on_completed callback, an asyncio.Event, or an explicit awaitable completion marker.
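Below is a minimal sketch of the asyncio.Event approach. It assumes the final stream in the pipeline delivers on_next, on_error and on_completed to a sink observer on the event-loop thread. FakePipeline and run_single_shot_sketch are hypothetical stand-ins, not the CleverAgents or RxPY API, and RuntimeError stands in for CleverAgentsException. The ordering matters: the completion event is created before the prompt is emitted, and the containers are read only after the event is set.

```python
from __future__ import annotations

import asyncio
from collections.abc import Callable


class FakePipeline:
    """Hypothetical stand-in for the stream graph: one stage that awaits slow work."""

    def __init__(self, on_next: Callable[[str], None],
                 on_error: Callable[[Exception], None],
                 on_completed: Callable[[], None], fail: bool = False) -> None:
        self._on_next = on_next
        self._on_error = on_error
        self._on_completed = on_completed
        self._fail = fail
        self._task: asyncio.Task[None] | None = None  # keep a reference to the task

    def emit(self, prompt: str) -> None:
        # Like on_next() on a scheduled pipeline: returns before processing finishes.
        self._task = asyncio.get_running_loop().create_task(self._process(prompt))

    async def _process(self, prompt: str) -> None:
        try:
            await asyncio.sleep(0.01)  # takes longer than one loop turn
            if self._fail:
                raise ValueError("stage failed")
            self._on_next(prompt.upper())
            self._on_completed()
        except Exception as exc:  # route failures to the error callback
            self._on_error(exc)


async def run_single_shot_sketch(prompt: str, *, fail: bool = False,
                                 timeout: float = 5.0) -> str:
    done = asyncio.Event()  # created before emitting, so completion cannot be missed
    result_container: list[str] = []
    error_container: list[str] = []

    def on_error(exc: Exception) -> None:
        error_container.append(str(exc))
        done.set()  # an error also ends the wait

    pipeline = FakePipeline(result_container.append, on_error, done.set, fail=fail)
    pipeline.emit(prompt)
    # Wait for completion instead of yielding once; the timeout guards against a stalled stream.
    await asyncio.wait_for(done.wait(), timeout=timeout)

    if error_container:
        raise RuntimeError("; ".join(error_container))
    return "".join(result_container)
```

asyncio.Event is not thread-safe. If the real sink callbacks fire on a worker thread, done.set() would have to be marshalled onto the loop with loop.call_soon_threadsafe(done.set). The sketch also assumes the sink completes once per prompt. If the router's streams are long-lived and never call on_completed, the event would instead need to be set by a per-request marker, such as the callback that stores the final result.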

Acceptance Criteria

  • run_single_shot() reliably returns complete results when asynchronous stream processing is involved
  • A proper completion-detection mechanism (e.g., asyncio.Event, RxPY on_completed, or equivalent) is used instead of await asyncio.sleep(0)
  • No race condition exists between stream emission and result collection
  • All existing BDD scenarios for run_single_shot() continue to pass
  • New BDD scenarios cover the race condition regression case
  • All nox stages pass; coverage ≥ 97%

Supporting Information

  • File: src/cleveragents/reactive/application.py
  • Function: run_single_shot()
  • Lines: 395–415
  • Likelihood: High — occurs whenever asynchronous stream processing is used
  • Impact: Stream processing may return incomplete or incorrect results due to premature completion detection
  • Suggested Fix: Implement proper stream completion detection using RxPY completion notifications (the observer's on_completed callback) or explicit asyncio.Event signaling
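A sketch of the explicit-awaitable alternative follows. Here the completion marker is an asyncio.Future that carries either the result or the error. The example assumes, hypothetically, that the sink callbacks run on a worker thread, as they would if an operator used a thread-pool scheduler. Every callback is therefore handed to the event loop with loop.call_soon_threadsafe. start_worker_pipeline, collect and from_worker are illustrative names, not part of RxPY or CleverAgents.

```python
from __future__ import annotations

import asyncio
import threading
from collections.abc import Callable


def start_worker_pipeline(prompt: str,
                          on_next: Callable[[str], None],
                          on_error: Callable[[Exception], None],
                          on_completed: Callable[[], None]) -> threading.Thread:
    """Hypothetical pipeline whose callbacks fire on a background thread."""
    def work() -> None:
        try:
            if not prompt:
                raise ValueError("empty prompt")
            on_next(prompt[::-1])  # stand-in transformation
            on_completed()
        except Exception as exc:
            on_error(exc)

    thread = threading.Thread(target=work, daemon=True)
    thread.start()
    return thread


async def collect(prompt: str, timeout: float = 5.0) -> str:
    loop = asyncio.get_running_loop()
    finished: asyncio.Future[str] = loop.create_future()  # the completion marker
    results: list[str] = []

    def resolve() -> None:
        if not finished.done():  # ignore completions after a timeout or an error
            finished.set_result("".join(results))

    def reject(exc: Exception) -> None:
        if not finished.done():
            finished.set_exception(exc)

    def from_worker(callback: Callable[..., None]) -> Callable[..., None]:
        """Wrap a loop-thread callback so the worker thread can trigger it safely."""
        def schedule(*args: object) -> None:
            loop.call_soon_threadsafe(callback, *args)
        return schedule

    # Callbacks scheduled from one thread run in order, so every append precedes resolve().
    start_worker_pipeline(
        prompt,
        on_next=from_worker(results.append),
        on_error=from_worker(reject),
        on_completed=from_worker(resolve),
    )
    return await asyncio.wait_for(finished, timeout=timeout)
```

Compared with an Event plus separate containers, the Future delivers the result and the error through a single awaitable. The error then surfaces at the await site rather than through a second check.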

Backlog note: This issue was discovered during autonomous operation
on milestone v3.2.0. It does not block milestone completion and has been
placed in the backlog for human review and future milestone assignment.

Metadata

  • Branch: fix/concurrency-stream-completion-detection
  • Commit Message: fix(reactive): replace asyncio.sleep(0) with proper stream completion signaling in run_single_shot
  • Milestone: (backlog — see note above)
  • Parent Epic: #7052

Subtasks

  • Reproduce the race condition with a failing BDD scenario tagged @tdd_issue and @tdd_expected_fail
  • Identify the correct completion-signaling mechanism for the RxPY stream pipeline
  • Implement proper awaitable completion detection in run_single_shot()
  • Remove @tdd_expected_fail tag once the fix is in place
  • Update integration tests to cover the async completion path
  • Verify no performance regression via ASV benchmarks
  • Update relevant documentation / inline comments

Definition of Done

  • A BDD scenario tagged @tdd_issue and @tdd_issue_<N> exists and passes after the fix
  • run_single_shot() no longer uses bare await asyncio.sleep(0) as a completion guard
  • Proper completion signaling is implemented and documented
  • No new type errors (nox -e typecheck passes)
  • All nox stages pass
  • Coverage ≥ 97%

Automated by CleverAgents Bot
Supervisor: Acting on behalf of: Bug Hunter | Agent: new-issue-creator

Author
Owner

Verified — Concurrency bug: race condition in stream processing completion detection. MoSCoW: Should-have. Priority: Medium.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

No milestone
No project
No assignees
1 participant
Due date

No due date set.

Reference
cleveragents/cleveragents-core#7203