BUG-HUNT: [resource] run_single_shot creates RxPY subscriptions on every call that are never disposed — unbounded subscription accumulation and duplicate callbacks #6530

Open
opened 2026-04-09 21:15:27 +00:00 by HAL9000 · 0 comments
Owner

Bug Report: [resource] run_single_shot Subscription Leak

Severity Assessment

  • Impact: On repeated calls to run_single_shot, RxPY subscriptions accumulate in the __output__ and __error__ observables without bound. Each call to on_next(prompt) triggers all prior on_next callbacks, so stale closures from previous calls receive data they should never see. Under long-running usage this causes unbounded memory growth proportional to the number of calls.
  • Likelihood: High — every invocation of run_single_shot creates two new subscriptions. Any interactive session or looped usage (e.g. run_with_context) leaks.
  • Priority: High

Location

  • File: src/cleveragents/reactive/application.py
  • Function: ReactiveCleverAgentsApp.run_single_shot
  • Lines: 395–411

Description

run_single_shot creates two anonymous subscriptions on each call:

# lines 407-408
self.stream_router.observables["__output__"].subscribe(on_next, on_error)
self.stream_router.observables["__error__"].subscribe(on_error)

The return values (RxPY Disposable objects) are discarded. Neither subscription is ever stored in self.stream_router.subscriptions, nor are they disposed when the method returns.

Because ReactiveStreamRouter.dispose() only iterates self.subscriptions, these two subscriptions survive across the entire lifetime of the ReactiveCleverAgentsApp instance.

Consequences:

  1. Memory leak – The on_next and on_error closures (which each capture a list and hold a reference to self.stream_router) are kept alive by the observable's subscriber list indefinitely.
  2. Duplicate callbacks – On the Nth call to run_single_shot, there are N registered on_next callbacks. When __input__.on_next(prompt) is processed, every past on_next closure fires, appending to its own stale result_container. While those stale containers are unreachable from the caller, the RxPY scheduler still executes all N handlers, causing unintended cross-call execution.
  3. run_with_context amplificationrun_with_context calls run_single_shot for every user turn, so in a session with M turns there are 2M live subscriptions by the end.

Evidence

# application.py lines 395-411
result_container: list[str] = []
error_container: list[str] = []

def on_next(msg: Any) -> None:
    result_container.append(str(getattr(msg, "content", msg)))

def on_error(err: Exception) -> None:
    error_container.append(str(err))

# Subscriptions created — return values (Disposable) silently dropped
self.stream_router.observables["__output__"].subscribe(on_next, on_error)
self.stream_router.observables["__error__"].subscribe(on_error)

self.stream_router.streams["__input__"].on_next(prompt)
await asyncio.sleep(0)

Compare with ReactiveStreamRouter.dispose() which only disposes self.subscriptions:

# stream_router.py
def dispose(self) -> None:
    for subscription in self.subscriptions[:]:   # <-- never includes run_single_shot subs
        ...

Expected Behaviour

Each call to run_single_shot should hold its subscriptions, await completion, then dispose them before returning — leaving __output__ with no stale subscribers.

Actual Behaviour

Subscriptions accumulate on every call; memory grows without bound and stale callbacks execute on future calls.

Suggested Fix

Store the disposables and clean them up after the call:

disposables = []
disposables.append(
    self.stream_router.observables["__output__"].subscribe(on_next, on_error)
)
disposables.append(
    self.stream_router.observables["__error__"].subscribe(on_error)
)
try:
    self.stream_router.streams["__input__"].on_next(prompt)
    await asyncio.sleep(0)
finally:
    for d in disposables:
        with contextlib.suppress(Exception):
            d.dispose()

Category

resource / memory-leak

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

## Bug Report: [resource] `run_single_shot` Subscription Leak ### Severity Assessment - **Impact**: On repeated calls to `run_single_shot`, RxPY subscriptions accumulate in the `__output__` and `__error__` observables without bound. Each call to `on_next(prompt)` triggers *all* prior `on_next` callbacks, so stale closures from previous calls receive data they should never see. Under long-running usage this causes unbounded memory growth proportional to the number of calls. - **Likelihood**: High — every invocation of `run_single_shot` creates two new subscriptions. Any interactive session or looped usage (e.g. `run_with_context`) leaks. - **Priority**: High ### Location - **File**: `src/cleveragents/reactive/application.py` - **Function**: `ReactiveCleverAgentsApp.run_single_shot` - **Lines**: 395–411 ### Description `run_single_shot` creates two anonymous subscriptions on each call: ```python # lines 407-408 self.stream_router.observables["__output__"].subscribe(on_next, on_error) self.stream_router.observables["__error__"].subscribe(on_error) ``` The return values (RxPY `Disposable` objects) are discarded. Neither subscription is ever stored in `self.stream_router.subscriptions`, nor are they disposed when the method returns. Because `ReactiveStreamRouter.dispose()` only iterates `self.subscriptions`, these two subscriptions survive across the entire lifetime of the `ReactiveCleverAgentsApp` instance. **Consequences:** 1. **Memory leak** – The `on_next` and `on_error` closures (which each capture a `list` and hold a reference to `self.stream_router`) are kept alive by the observable's subscriber list indefinitely. 2. **Duplicate callbacks** – On the Nth call to `run_single_shot`, there are N registered `on_next` callbacks. When `__input__.on_next(prompt)` is processed, every past `on_next` closure fires, appending to its own stale `result_container`. While those stale containers are unreachable from the caller, the RxPY scheduler still executes all N handlers, causing unintended cross-call execution. 3. **`run_with_context` amplification** – `run_with_context` calls `run_single_shot` for every user turn, so in a session with M turns there are 2M live subscriptions by the end. ### Evidence ```python # application.py lines 395-411 result_container: list[str] = [] error_container: list[str] = [] def on_next(msg: Any) -> None: result_container.append(str(getattr(msg, "content", msg))) def on_error(err: Exception) -> None: error_container.append(str(err)) # Subscriptions created — return values (Disposable) silently dropped self.stream_router.observables["__output__"].subscribe(on_next, on_error) self.stream_router.observables["__error__"].subscribe(on_error) self.stream_router.streams["__input__"].on_next(prompt) await asyncio.sleep(0) ``` Compare with `ReactiveStreamRouter.dispose()` which only disposes `self.subscriptions`: ```python # stream_router.py def dispose(self) -> None: for subscription in self.subscriptions[:]: # <-- never includes run_single_shot subs ... ``` ### Expected Behaviour Each call to `run_single_shot` should hold its subscriptions, await completion, then dispose them before returning — leaving `__output__` with no stale subscribers. ### Actual Behaviour Subscriptions accumulate on every call; memory grows without bound and stale callbacks execute on future calls. ### Suggested Fix Store the disposables and clean them up after the call: ```python disposables = [] disposables.append( self.stream_router.observables["__output__"].subscribe(on_next, on_error) ) disposables.append( self.stream_router.observables["__error__"].subscribe(on_error) ) try: self.stream_router.streams["__input__"].on_next(prompt) await asyncio.sleep(0) finally: for d in disposables: with contextlib.suppress(Exception): d.dispose() ``` ### Category resource / memory-leak ### TDD Note After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: `@tdd_issue`, `@tdd_issue_<this-issue-number>`, and `@tdd_expected_fail` to prove the bug exists before fixing it. --- **Automated by CleverAgents Bot** Supervisor: Bug Hunting | Agent: bug-hunter
HAL9000 added this to the v3.2.0 milestone 2026-04-09 21:27:53 +00:00
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#6530
No description provided.