BUG-HUNT: [concurrency] Agent._process_wrapper raises ExecutionError inside asyncio.Task — exception is silently lost and never routed to output_stream #6521

Open
opened 2026-04-09 21:14:35 +00:00 by HAL9000 · 0 comments
Owner

Bug Report: [concurrency] — _process_wrapper exception swallowed in asyncio.Task

Severity Assessment

  • Impact: Agent processing errors are silently discarded. Callers and subscribers receive no notification that processing failed. The output_stream error channel is never invoked, so reactive consumers can't distinguish success from failure.
  • Likelihood: High — any exception in process_message() triggers this path
  • Priority: Critical

Location

  • File: src/cleveragents/agents/base.py
  • Class: Agent
  • Method: _process_wrapper, _setup_processing_pipeline
  • Lines: 23–35

Description

_setup_processing_pipeline subscribes to input_stream and dispatches messages to _process_wrapper via asyncio.create_task(). Inside _process_wrapper, any exception from process_message() is caught and re-raised as ExecutionError:

# base.py lines 23-34
def _setup_processing_pipeline(self) -> None:
    def _on_next(msg: Any) -> None:
        task = asyncio.create_task(self._process_wrapper(msg))
        self._tasks.append(task)

    self.input_stream.subscribe(
        on_next=_on_next,
        on_error=self.output_stream.on_error,
    )

async def _process_wrapper(self, message_data) -> None:
    try:
        ...
        result = await self.process_message(message, context)
        self.output_stream.on_next(result)
    except Exception as exc:
        raise ExecutionError(f"Agent {self.name} processing failed: {exc}") from exc  # BUG

When _process_wrapper raises inside an asyncio.Task, the exception is not propagated anywhere — the task stores it internally, and unless someone explicitly awaits the task and catches the exception (nobody does, since the tasks are fire-and-forget), it is silently discarded (or emits an "unhandled exception in task" warning at task GC time).

The output_stream.on_error handler is wired to the input_stream.subscribe(on_error=...), which handles errors emitted by the observable itself, not errors from tasks spawned inside on_next. So output_stream.on_error is never called when _process_wrapper fails.

Expected Behavior

When process_message() raises, the error should be forwarded to self.output_stream.on_error(exc) so reactive subscribers can react to it.

Actual Behavior

The exception is re-raised inside a fire-and-forget asyncio Task. It is silently swallowed at runtime. No error notification is sent to output_stream.

Suggested Fix

Replace the raise in _process_wrapper with a call to output_stream.on_error:

async def _process_wrapper(self, message_data) -> None:
    try:
        ...
        result = await self.process_message(message, context)
        self.output_stream.on_next(result)
    except Exception as exc:
        error = ExecutionError(f"Agent {self.name} processing failed: {exc}") from exc
        self.output_stream.on_error(error)  # Route to error channel instead of raising

Category

concurrency / error-handling

TDD Note

After this bug is verified, a Type/Testing issue will be created with a TDD test tagged @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

## Bug Report: [concurrency] — `_process_wrapper` exception swallowed in asyncio.Task ### Severity Assessment - **Impact**: Agent processing errors are silently discarded. Callers and subscribers receive no notification that processing failed. The `output_stream` error channel is never invoked, so reactive consumers can't distinguish success from failure. - **Likelihood**: High — any exception in `process_message()` triggers this path - **Priority**: Critical ### Location - **File**: `src/cleveragents/agents/base.py` - **Class**: `Agent` - **Method**: `_process_wrapper`, `_setup_processing_pipeline` - **Lines**: 23–35 ### Description `_setup_processing_pipeline` subscribes to `input_stream` and dispatches messages to `_process_wrapper` via `asyncio.create_task()`. Inside `_process_wrapper`, any exception from `process_message()` is caught and **re-raised** as `ExecutionError`: ```python # base.py lines 23-34 def _setup_processing_pipeline(self) -> None: def _on_next(msg: Any) -> None: task = asyncio.create_task(self._process_wrapper(msg)) self._tasks.append(task) self.input_stream.subscribe( on_next=_on_next, on_error=self.output_stream.on_error, ) async def _process_wrapper(self, message_data) -> None: try: ... result = await self.process_message(message, context) self.output_stream.on_next(result) except Exception as exc: raise ExecutionError(f"Agent {self.name} processing failed: {exc}") from exc # BUG ``` When `_process_wrapper` raises inside an `asyncio.Task`, the exception is **not propagated** anywhere — the task stores it internally, and unless someone explicitly awaits the task and catches the exception (nobody does, since the tasks are fire-and-forget), it is silently discarded (or emits an "unhandled exception in task" warning at task GC time). The `output_stream.on_error` handler is wired to the `input_stream.subscribe(on_error=...)`, which handles errors emitted by the *observable itself*, not errors from tasks spawned inside `on_next`. So `output_stream.on_error` is never called when `_process_wrapper` fails. ### Expected Behavior When `process_message()` raises, the error should be forwarded to `self.output_stream.on_error(exc)` so reactive subscribers can react to it. ### Actual Behavior The exception is re-raised inside a fire-and-forget asyncio Task. It is silently swallowed at runtime. No error notification is sent to `output_stream`. ### Suggested Fix Replace the `raise` in `_process_wrapper` with a call to `output_stream.on_error`: ```python async def _process_wrapper(self, message_data) -> None: try: ... result = await self.process_message(message, context) self.output_stream.on_next(result) except Exception as exc: error = ExecutionError(f"Agent {self.name} processing failed: {exc}") from exc self.output_stream.on_error(error) # Route to error channel instead of raising ``` ### Category concurrency / error-handling ### TDD Note After this bug is verified, a Type/Testing issue will be created with a TDD test tagged `@tdd_issue`, `@tdd_issue_<this-issue-number>`, and `@tdd_expected_fail` to prove the bug exists before fixing it. --- **Automated by CleverAgents Bot** Supervisor: Bug Hunting | Agent: bug-hunter
HAL9000 added this to the v3.2.0 milestone 2026-04-09 21:27:54 +00:00
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#6521
No description provided.