BUG-HUNT: [resource] _build_routes creates stream subscriptions that are not tracked in stream_router.subscriptions — subscriptions leak on dispose() #6540

Open
opened 2026-04-09 21:16:32 +00:00 by HAL9000 · 0 comments
Owner

Bug Report: [resource] Untracked Stream Subscriptions in _build_routes

Severity Assessment

  • Impact: ReactiveStreamRouter.dispose() only disposes subscriptions stored in self.subscriptions. The three subscription sites in _build_routes bypass this list entirely, so their RxPY Subject/Observable subscriber chains are never cleaned up. After the router is torn down, the subjects still hold live subscriber references, which prevents garbage collection of the subscribers and keeps their callbacks alive.
  • Likelihood: Medium — triggered whenever a YAML config with stream routes is loaded and the application is later torn down (e.g. in tests, server restarts, or any code that calls dispose()).
  • Priority: Medium

Location

  • File: src/cleveragents/reactive/application.py
  • Function: ReactiveCleverAgentsApp._build_routes
  • Lines: 252–296

Description

_build_routes creates three kinds of subscriptions whose disposables are silently discarded:

1. __input__ → stream subscription (line 269):

if not stream_cfg.subscriptions:
    self.stream_router.streams["__input__"].subscribe(
        self.stream_router.streams[stream_cfg.name]
    )

2. Stream → __output__ subscription (line 273):

if stream_cfg.name != "__output__":
    self.stream_router.observables[stream_cfg.name].subscribe(
        self.stream_router.streams["__output__"]
    )

3. Merge subscriptions (line 292):

self.stream_router.streams[src].subscribe(
    lambda msg, tgt=target: self.stream_router.streams[tgt].on_next(msg)
)

In all three cases the return value of .subscribe(), an RxPY Disposable, is discarded. Compare with ReactiveStreamRouter._setup_subscriptions, which correctly stores its disposables:

# stream_router.py _setup_subscriptions (correct pattern)
subscription = source_stream.subscribe(self.streams[config.name])
self.subscriptions.append(subscription)   # tracked for later disposal

And ReactiveStreamRouter.dispose():

def dispose(self) -> None:
    for subscription in self.subscriptions[:]:   # <-- only these are cleaned up
        with contextlib.suppress(Exception):
            if hasattr(subscription, "dispose"):
                subscription.dispose()

Subscriptions created in _build_routes never appear in self.subscriptions, so dispose() cannot clean them up.
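
The effect can be reproduced in isolation. The following is a minimal sketch, not the project's code: MiniSubject and MiniDisposable are hypothetical stand-ins for an RxPY Subject and its Disposable, and the plain list `subscriptions` plays the role of stream_router.subscriptions. It only illustrates that a subscription whose disposable is never stored keeps receiving messages after every tracked subscription has been disposed.

```python
class MiniDisposable:
    """Hypothetical stand-in for an RxPY Disposable."""

    def __init__(self, subject, observer):
        self._subject = subject
        self._observer = observer

    def dispose(self):
        # Detach the observer if it is still attached.
        if self._observer in self._subject.observers:
            self._subject.observers.remove(self._observer)


class MiniSubject:
    """Hypothetical stand-in for an RxPY Subject: holds strong refs to observers."""

    def __init__(self):
        self.observers = []

    def subscribe(self, on_next):
        self.observers.append(on_next)
        return MiniDisposable(self, on_next)

    def on_next(self, value):
        for observer in list(self.observers):
            observer(value)


received = []
source = MiniSubject()
subscriptions = []  # plays the role of stream_router.subscriptions

# Tracked, following the _setup_subscriptions pattern.
subscriptions.append(source.subscribe(lambda m: received.append(("tracked", m))))

# Untracked, following the _build_routes pattern: the disposable is discarded.
source.subscribe(lambda m: received.append(("untracked", m)))

# Same loop shape as the dispose() excerpt: only tracked entries are reached.
for subscription in subscriptions[:]:
    subscription.dispose()

source.on_next("hello")
print(received)              # only the untracked callback still fires
print(len(source.observers)) # the subject still references one observer
```

In this sketch the untracked callback remains attached to the subject, which is the same shape of leak the issue describes for the real router.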

Expected Behaviour

All subscriptions created during route building should be registered in self.stream_router.subscriptions so that dispose() fully tears down the reactive graph.

Actual Behaviour

dispose() leaves the __input__ → stream, stream → __output__, and merge subscriptions live; the subjects retain references to their subscriber callables, which therefore cannot be garbage-collected.

Suggested Fix

Capture the return value of each .subscribe() call in _build_routes and append it to self.stream_router.subscriptions:

sub = self.stream_router.streams["__input__"].subscribe(
    self.stream_router.streams[stream_cfg.name]
)
self.stream_router.subscriptions.append(sub)

Apply the same pattern to all three subscription sites.
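
As an illustration of applying the pattern to all three sites, here is a hedged sketch. FakeRouter, FakeStream, FakeDisposable, and build_routes_tracked are hypothetical names invented for this example; the function signature and the `has_upstream` and `merges` parameters are assumptions standing in for the real stream_cfg handling, which is not reproduced here. Only the three .subscribe() shapes are taken from the snippets in this issue.

```python
import contextlib


class FakeDisposable:
    """Hypothetical stand-in for the Disposable returned by .subscribe()."""

    def __init__(self):
        self.disposed = False

    def dispose(self):
        self.disposed = True


class FakeStream:
    """Hypothetical stand-in for a Subject; subscribe() just hands back a disposable."""

    def subscribe(self, observer):
        return FakeDisposable()

    def on_next(self, msg):
        pass


class FakeRouter:
    """Hypothetical stand-in exposing the attributes referenced in this issue."""

    def __init__(self, names):
        self.streams = {name: FakeStream() for name in names}
        self.observables = dict(self.streams)
        self.subscriptions = []

    def dispose(self):
        # Mirrors the dispose() excerpt quoted in the Description.
        for subscription in self.subscriptions[:]:
            with contextlib.suppress(Exception):
                if hasattr(subscription, "dispose"):
                    subscription.dispose()


def build_routes_tracked(router, name, has_upstream, merges):
    """Assumed simplification of _build_routes with every disposable tracked."""
    track = router.subscriptions.append

    # Site 1: __input__ -> stream, only when the stream has no other source.
    if not has_upstream:
        track(router.streams["__input__"].subscribe(router.streams[name]))

    # Site 2: stream -> __output__.
    if name != "__output__":
        track(router.observables[name].subscribe(router.streams["__output__"]))

    # Site 3: merge subscriptions, one per (source, target) pair.
    for src, target in merges:
        track(
            router.streams[src].subscribe(
                lambda msg, tgt=target: router.streams[tgt].on_next(msg)
            )
        )


router = FakeRouter(["__input__", "__output__", "a", "b"])
build_routes_tracked(router, "a", has_upstream=False, merges=[("a", "b")])
tracked = list(router.subscriptions)
router.dispose()
print(len(tracked), all(s.disposed for s in tracked))
```

Binding `router.subscriptions.append` to a short local name keeps each call site to one expression; a small helper method on the router would work equally well if the maintainers prefer that.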

Category

resource / subscription-leak

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.
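
One possible shape for such a test is sketched below. The test framework behind the @tdd_* tags is not stated in this issue, so this sketch uses the standard library's unittest.expectedFailure as a stand-in for @tdd_expected_fail. MiniSubject, MiniDisposable, MiniRouter, and build_routes_untracked are hypothetical names that imitate the buggy behaviour; they are not the project's classes.

```python
import unittest


class MiniDisposable:
    """Hypothetical stand-in for an RxPY Disposable."""

    def __init__(self, undo):
        self._undo = undo

    def dispose(self):
        self._undo()


class MiniSubject:
    """Hypothetical stand-in for an RxPY Subject."""

    def __init__(self):
        self.observers = []

    def subscribe(self, observer):
        self.observers.append(observer)
        return MiniDisposable(lambda: self.observers.remove(observer))


class MiniRouter:
    """Hypothetical stand-in for the router: disposes only tracked entries."""

    def __init__(self):
        self.streams = {"__input__": MiniSubject()}
        self.subscriptions = []

    def dispose(self):
        for subscription in self.subscriptions[:]:
            subscription.dispose()


def build_routes_untracked(router):
    # Imitates the reported bug: the disposable is not stored anywhere.
    router.streams["__input__"].subscribe(lambda msg: None)


class DisposeLeakTest(unittest.TestCase):
    @unittest.expectedFailure  # expected to fail until the bug is fixed
    def test_dispose_leaves_no_observers(self):
        router = MiniRouter()
        build_routes_untracked(router)
        router.dispose()
        self.assertEqual(router.streams["__input__"].observers, [])


result = unittest.TestResult()
unittest.defaultTestLoader.loadTestsFromTestCase(DisposeLeakTest).run(result)
print(len(result.expectedFailures), result.wasSuccessful())
```

Once the disposables are tracked, the assertion would pass and the expected-failure marker would report an unexpected success, signalling that the marker can be removed.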


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

HAL9000 added this to the v3.2.0 milestone 2026-04-09 21:27:55 +00:00
Reference
cleveragents/cleveragents-core#6540