BUG-HUNT: [resource] Agent.dispose() does not cancel pending asyncio tasks — background tasks continue running after disposal, potentially emitting to a disposed output stream #6529

Open
opened 2026-04-09 21:15:22 +00:00 by HAL9000 · 1 comment
Owner

Bug Report: [resource] — Agent.dispose() leaves asyncio tasks running

Severity Assessment

  • Impact: After dispose() is called, any in-flight _process_wrapper tasks continue executing. When they complete, they call self.output_stream.on_next(result) on an already-disposed RxPY Subject, which can raise an unhandled exception inside the task or silently drop the result.
  • Likelihood: Medium — triggered when agents are disposed while messages are in-flight (e.g., during shutdown or agent replacement)
  • Priority: High

Location

  • File: src/cleveragents/agents/base.py
  • Class: Agent
  • Method: dispose
  • Lines: 86–90

Description

The dispose() method disposes the RxPY Subjects (input_stream and output_stream) but does not cancel the asyncio tasks stored in self._tasks:

# base.py lines 86-90
def dispose(self) -> None:
    if hasattr(self.input_stream, "dispose"):
        self.input_stream.dispose()
    if hasattr(self.output_stream, "dispose"):
        self.output_stream.dispose()
    # NOTE: self._tasks is NOT cancelled!

After output_stream.dispose() is called, any still-running task that later calls self.output_stream.on_next(result) will interact with a disposed Subject. Depending on the RxPY version, this can raise DisposedException or silently fail.

Additionally, the tasks are leaked resources: each one holds a reference to self and its state, so the agent cannot be garbage-collected until every outstanding task has finished.
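The failure mode described above can be reproduced without RxPY. The following is a minimal sketch, not the project's code: FakeSubject and LeakyAgent are hypothetical stand-ins, and FakeSubject records late emissions instead of raising, so the effect is easy to observe.

```python
import asyncio


class FakeSubject:
    """Hypothetical stand-in for an RxPY Subject; not the real API."""

    def __init__(self) -> None:
        self.disposed = False
        self.emitted_after_dispose = []

    def on_next(self, value) -> None:
        if self.disposed:
            # A real Subject may raise or drop the value at this point.
            self.emitted_after_dispose.append(value)

    def dispose(self) -> None:
        self.disposed = True


class LeakyAgent:
    """Mirrors the reported dispose(): streams are disposed, tasks are not."""

    def __init__(self) -> None:
        self.output_stream = FakeSubject()
        self._tasks = set()

    async def _process_wrapper(self, message: str) -> None:
        await asyncio.sleep(0.01)  # simulated in-flight work
        self.output_stream.on_next(message.upper())

    def submit(self, message: str) -> None:
        task = asyncio.create_task(self._process_wrapper(message))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def dispose(self) -> None:
        self.output_stream.dispose()  # self._tasks is left untouched


async def reproduce() -> list:
    agent = LeakyAgent()
    agent.submit("hello")
    agent.dispose()            # dispose while the message is in flight
    await asyncio.sleep(0.05)  # give the orphaned task time to finish
    return agent.output_stream.emitted_after_dispose


late_emissions = asyncio.run(reproduce())
print(late_emissions)  # ['HELLO']: the task emitted after disposal
```

With a real Subject the late on_next call would either raise or be dropped, as noted above; the stand-in only makes the late call visible.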

Expected Behavior

dispose() should cancel all pending tasks before disposing the streams:

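def dispose(self) -> None:
    # Request cancellation of in-flight tasks first so they cannot emit
    # to a stream that is about to be disposed. Iterate over a copy
    # because done-callbacks may mutate self._tasks.
    for task in list(self._tasks):
        task.cancel()
    self._tasks.clear()
    # Dispose the streams only after cancellation has been requested.
    if hasattr(self.input_stream, "dispose"):
        self.input_stream.dispose()
    if hasattr(self.output_stream, "dispose"):
        self.output_stream.dispose()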

Actual Behavior

Pending tasks continue running after disposal. When they finish, they attempt to emit to the already-disposed output_stream, which either raises DisposedException or fails silently, depending on the RxPY version.

Category

resource / concurrency

TDD Note

After this bug is verified, a Type/Testing issue will be created with a TDD test tagged @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

HAL9000 added this to the v3.2.0 milestone 2026-04-09 21:27:53 +00:00
Author
Owner

Verified — Valid resource management bug. Background tasks continue running after disposal, potentially emitting to a disposed output stream. MoSCoW: Should Have (confirmed) — resource leak that could cause subtle bugs in long-running sessions.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

Reference
cleveragents/cleveragents-core#6529