BUG-HUNT: [concurrency] Agent._setup_processing_pipeline calls asyncio.create_task() from a synchronous RxPY on_next callback — raises RuntimeError when triggered outside a running event loop #6654

Open
opened 2026-04-09 22:46:49 +00:00 by HAL9000 · 0 comments
Owner

Bug Report: [concurrency] — asyncio.create_task() in Sync RxPY Callback Requires a Running Event Loop

Severity Assessment

  • Impact: Any call to agent.send_message() (or any RxPY operator that triggers input_stream.on_next()) from a synchronous context that is NOT executing inside a running asyncio event loop will raise RuntimeError: no running event loop. This crashes the reactive pipeline silently at the subscription layer — neither the on_error handler of output_stream nor any exception propagation reaches the caller because the exception escapes from _on_next inside the RxPY subscriber.
  • Likelihood: Medium-High — affects any test harness running agents synchronously, any CLI path that calls send_message() outside asyncio.run(), and any sync integration code. The process_message_sync method (line 57) works around this for a single message, but the pipeline subscription path via send_message() has no equivalent protection.
  • Priority: High

Location

  • File: src/cleveragents/agents/base.py
  • Class: Agent
  • Method: _setup_processing_pipeline
  • Lines: 31–39

Description

_setup_processing_pipeline registers a synchronous _on_next callback on the RxPY input_stream:

# base.py lines 31-39
def _setup_processing_pipeline(self) -> None:
    def _on_next(msg: Any) -> None:
        task = asyncio.create_task(self._process_wrapper(msg))  # ← BUG
        self._tasks.append(task)

    self.input_stream.subscribe(
        on_next=_on_next,
        on_error=self.output_stream.on_error,
    )

asyncio.create_task() requires a currently running asyncio event loop. From the CPython docs:

asyncio.create_task(coro) — This function requires the running event loop in the current OS thread. A RuntimeError is raised if there is no running event loop.

RxPY calls on_next synchronously on whatever thread and in whatever execution context triggers input_stream.on_next(). When send_message() is called from a non-async context (no loop running), asyncio.create_task() raises immediately:

RuntimeError: no running event loop

This exception propagates out of _on_next, is caught by RxPY's subscriber error handling (which calls on_error if provided, or swallows it), and the message is never processed. There is no log, no output_stream.on_error call, and no indication to the caller that the message was dropped.

Reproduction path:

agent = ConcreteAgent("test")
# Called from sync code (test, CLI, etc.) — no event loop running:
agent.send_message("hello")  # → RuntimeError: no running event loop

This is distinct from the already-reported issue #6521 (which covers exceptions inside an already-running task's _process_wrapper). This bug prevents the task from being created at all when called from a sync context.

Note on existing process_message_sync: The process_message_sync method (line 57) uses asyncio.get_event_loop().run_until_complete() as a workaround for sync callers to invoke the async processing path. However, it bypasses _setup_processing_pipeline entirely and calls process_message() directly, meaning the RxPY pipeline (subscription, operators, output_stream) is never exercised by that path.

Expected Behavior

Messages sent to an agent via send_message() from any context — sync or async — should be reliably queued and processed. At minimum, if called from a sync context, the agent should not silently drop the message, and any failure should be clearly surfaced.

Actual Behavior

When send_message() (or any other path that triggers input_stream.on_next()) is called outside a running asyncio event loop, asyncio.create_task() raises RuntimeError, the message is dropped, and the caller receives no indication of failure.

Suggested Fix

Option A (recommended): Use asyncio.get_running_loop().create_task() inside the callback, and add a guard to handle the sync case by scheduling via loop.call_soon_threadsafe or by buffering the task for later scheduling:

def _on_next(msg: Any) -> None:
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop — log and discard, or buffer for deferred scheduling
        logger.warning("Agent '%s': message dropped — no running event loop", self.name)
        return
    task = loop.create_task(self._process_wrapper(msg))
    self._tasks.append(task)

Option B: Document that Agent.send_message() must only be called from within a running asyncio event loop, and add a runtime assertion to make the constraint explicit rather than silent.

Category

concurrency

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

## Bug Report: [concurrency] — `asyncio.create_task()` in Sync RxPY Callback Requires a Running Event Loop ### Severity Assessment - **Impact**: Any call to `agent.send_message()` (or any RxPY operator that triggers `input_stream.on_next()`) from a synchronous context that is NOT executing inside a running asyncio event loop will raise `RuntimeError: no running event loop`. This crashes the reactive pipeline silently at the subscription layer — neither the `on_error` handler of `output_stream` nor any exception propagation reaches the caller because the exception escapes from `_on_next` inside the RxPY subscriber. - **Likelihood**: Medium-High — affects any test harness running agents synchronously, any CLI path that calls `send_message()` outside `asyncio.run()`, and any sync integration code. The `process_message_sync` method (line 57) works around this for a single message, but the pipeline subscription path via `send_message()` has no equivalent protection. - **Priority**: High ### Location - **File**: `src/cleveragents/agents/base.py` - **Class**: `Agent` - **Method**: `_setup_processing_pipeline` - **Lines**: 31–39 ### Description `_setup_processing_pipeline` registers a synchronous `_on_next` callback on the RxPY `input_stream`: ```python # base.py lines 31-39 def _setup_processing_pipeline(self) -> None: def _on_next(msg: Any) -> None: task = asyncio.create_task(self._process_wrapper(msg)) # ← BUG self._tasks.append(task) self.input_stream.subscribe( on_next=_on_next, on_error=self.output_stream.on_error, ) ``` `asyncio.create_task()` requires a **currently running** asyncio event loop. From the CPython docs: > *`asyncio.create_task(coro)` — This function requires the running event loop in the current OS thread. A `RuntimeError` is raised if there is no running event loop.* RxPY calls `on_next` synchronously on whatever thread and in whatever execution context triggers `input_stream.on_next()`. When `send_message()` is called from a non-async context (no loop running), `asyncio.create_task()` raises immediately: ``` RuntimeError: no running event loop ``` This exception propagates out of `_on_next`, is caught by RxPY's subscriber error handling (which calls `on_error` if provided, or swallows it), and the message is never processed. There is no log, no `output_stream.on_error` call, and no indication to the caller that the message was dropped. **Reproduction path:** ```python agent = ConcreteAgent("test") # Called from sync code (test, CLI, etc.) — no event loop running: agent.send_message("hello") # → RuntimeError: no running event loop ``` This is distinct from the already-reported issue #6521 (which covers exceptions *inside* an already-running task's `_process_wrapper`). This bug prevents the task from being *created* at all when called from a sync context. **Note on existing `process_message_sync`:** The `process_message_sync` method (line 57) uses `asyncio.get_event_loop().run_until_complete()` as a workaround for sync callers to invoke the async processing path. However, it bypasses `_setup_processing_pipeline` entirely and calls `process_message()` directly, meaning the RxPY pipeline (subscription, operators, output_stream) is never exercised by that path. ### Expected Behavior Messages sent to an agent via `send_message()` from any context — sync or async — should be reliably queued and processed. At minimum, if called from a sync context, the agent should not silently drop the message, and any failure should be clearly surfaced. ### Actual Behavior When `send_message()` (or any other path that triggers `input_stream.on_next()`) is called outside a running asyncio event loop, `asyncio.create_task()` raises `RuntimeError`, the message is dropped, and the caller receives no indication of failure. ### Suggested Fix Option A (recommended): Use `asyncio.get_running_loop().create_task()` inside the callback, and add a guard to handle the sync case by scheduling via `loop.call_soon_threadsafe` or by buffering the task for later scheduling: ```python def _on_next(msg: Any) -> None: try: loop = asyncio.get_running_loop() except RuntimeError: # No running loop — log and discard, or buffer for deferred scheduling logger.warning("Agent '%s': message dropped — no running event loop", self.name) return task = loop.create_task(self._process_wrapper(msg)) self._tasks.append(task) ``` Option B: Document that `Agent.send_message()` must only be called from within a running asyncio event loop, and add a runtime assertion to make the constraint explicit rather than silent. ### Category `concurrency` ### TDD Note After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: `@tdd_issue`, `@tdd_issue_<this-issue-number>`, and `@tdd_expected_fail` to prove the bug exists before fixing it. --- **Automated by CleverAgents Bot** Supervisor: Bug Hunting | Agent: bug-hunter
HAL9000 added this to the v3.2.0 milestone 2026-04-09 23:04:01 +00:00
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#6654
No description provided.