langgraph/graph: LangGraph._setup_node_stream_subscriptions discards RxPy Disposables — subscriptions never cleaned up on stop() #10398

Closed
opened 2026-04-18 09:29:02 +00:00 by HAL9000 · 2 comments
Owner

Bug Report

Metadata

  • Module: src/cleveragents/langgraph/graph.py
  • Class: LangGraph
  • Method: _setup_node_stream_subscriptions
  • Severity: High (resource leak, prevents garbage collection)
  • TDD Testing Issue: #10396

Summary

LangGraph._setup_node_stream_subscriptions() calls observable.subscribe(observer) but discards the returned Disposable object. In RxPY, subscribe() returns a disposable that must be called to unsubscribe. Since the disposables are never stored, stop() cannot clean them up, causing a resource leak and preventing garbage collection of the LangGraph instance.

Code Evidence

In src/cleveragents/langgraph/graph.py (lines ~152-169):

def _setup_node_stream_subscriptions(self) -> None:
    for node_name in self.nodes:
        stream_name = f"__{self.name}_node_{node_name}__"
        if stream_name in self.stream_router.observables:
            observable = self.stream_router.observables[stream_name]

            def on_next(_msg: Any) -> None:
                pass

            def on_error(error: Exception, name: str = stream_name) -> None:
                self.logger.error("Error in node stream %s: %s", name, error)

            def on_completed() -> None:
                pass

            observer = Observer(
                on_next=on_next, on_error=on_error, on_completed=on_completed
            )
            observable.subscribe(observer)  # ← Disposable DISCARDED!

And stop() (lines ~260-262):

def stop(self) -> None:
    self.is_running = False
    # ← No subscription disposal!

Impact

  1. Resource leak: Node stream subscriptions remain active after stop() is called
  2. Memory leak: The on_error closure captures self (via self.logger), preventing garbage collection of the LangGraph instance as long as subscriptions are active
  3. Unexpected behavior: Node executors may continue processing messages after stop() is called
  4. Observable subjects never complete: The Subject objects in stream_router.streams are never completed, so subscriptions cannot be cleaned up by the observable completing

Fix

Store the disposables returned by subscribe() and dispose them in stop():

def __init__(self, ...):
    ...
    self._subscriptions: list[Any] = []  # Store disposables
    ...

def _setup_node_stream_subscriptions(self) -> None:
    for node_name in self.nodes:
        stream_name = f"__{self.name}_node_{node_name}__"
        if stream_name in self.stream_router.observables:
            observable = self.stream_router.observables[stream_name]
            # ...
            observer = Observer(on_next=on_next, on_error=on_error, on_completed=on_completed)
            disposable = observable.subscribe(observer)  # Store disposable
            self._subscriptions.append(disposable)

def stop(self) -> None:
    self.is_running = False
    # Dispose all subscriptions
    for disposable in self._subscriptions:
        with contextlib.suppress(Exception):
            disposable.dispose()
    self._subscriptions.clear()

Acceptance Criteria

  • LangGraph.__init__ initializes self._subscriptions: list[Any] = []
  • _setup_node_stream_subscriptions stores each Disposable in self._subscriptions
  • stop() disposes all subscriptions and clears the list
  • TDD test from #10396 passes
  • All existing tests continue to pass
  • Coverage >= 97%

Subtasks

  • Add self._subscriptions: list[Any] = [] to __init__
  • Store disposables in _setup_node_stream_subscriptions
  • Dispose subscriptions in stop()
  • Verify TDD test from #10396 now passes
  • Run nox and verify all tests pass

Definition of Done

This issue is complete when:

  • All subtasks above are completed and checked off
  • A Git commit is created with a descriptive message
  • The commit is pushed to a branch and submitted as a pull request to master
  • The TDD testing issue #10396 is resolved (its @tdd_expected_fail scenario now passes)

Automated by CleverAgents Bot
Supervisor: Bug Hunt Pool | Agent: bug-hunt-pool-supervisor

## Bug Report ### Metadata - **Module**: `src/cleveragents/langgraph/graph.py` - **Class**: `LangGraph` - **Method**: `_setup_node_stream_subscriptions` - **Severity**: High (resource leak, prevents garbage collection) - **TDD Testing Issue**: #10396 ### Summary `LangGraph._setup_node_stream_subscriptions()` calls `observable.subscribe(observer)` but discards the returned `Disposable` object. In RxPY, `subscribe()` returns a disposable that must be called to unsubscribe. Since the disposables are never stored, `stop()` cannot clean them up, causing a resource leak and preventing garbage collection of the `LangGraph` instance. ### Code Evidence In `src/cleveragents/langgraph/graph.py` (lines ~152-169): ```python def _setup_node_stream_subscriptions(self) -> None: for node_name in self.nodes: stream_name = f"__{self.name}_node_{node_name}__" if stream_name in self.stream_router.observables: observable = self.stream_router.observables[stream_name] def on_next(_msg: Any) -> None: pass def on_error(error: Exception, name: str = stream_name) -> None: self.logger.error("Error in node stream %s: %s", name, error) def on_completed() -> None: pass observer = Observer( on_next=on_next, on_error=on_error, on_completed=on_completed ) observable.subscribe(observer) # ← Disposable DISCARDED! ``` And `stop()` (lines ~260-262): ```python def stop(self) -> None: self.is_running = False # ← No subscription disposal! ``` ### Impact 1. **Resource leak**: Node stream subscriptions remain active after `stop()` is called 2. **Memory leak**: The `on_error` closure captures `self` (via `self.logger`), preventing garbage collection of the `LangGraph` instance as long as subscriptions are active 3. **Unexpected behavior**: Node executors may continue processing messages after `stop()` is called 4. **Observable subjects never complete**: The `Subject` objects in `stream_router.streams` are never completed, so subscriptions cannot be cleaned up by the observable completing ### Fix Store the disposables returned by `subscribe()` and dispose them in `stop()`: ```python def __init__(self, ...): ... self._subscriptions: list[Any] = [] # Store disposables ... def _setup_node_stream_subscriptions(self) -> None: for node_name in self.nodes: stream_name = f"__{self.name}_node_{node_name}__" if stream_name in self.stream_router.observables: observable = self.stream_router.observables[stream_name] # ... observer = Observer(on_next=on_next, on_error=on_error, on_completed=on_completed) disposable = observable.subscribe(observer) # Store disposable self._subscriptions.append(disposable) def stop(self) -> None: self.is_running = False # Dispose all subscriptions for disposable in self._subscriptions: with contextlib.suppress(Exception): disposable.dispose() self._subscriptions.clear() ``` ### Acceptance Criteria - [ ] `LangGraph.__init__` initializes `self._subscriptions: list[Any] = []` - [ ] `_setup_node_stream_subscriptions` stores each `Disposable` in `self._subscriptions` - [ ] `stop()` disposes all subscriptions and clears the list - [ ] TDD test from #10396 passes - [ ] All existing tests continue to pass - [ ] Coverage >= 97% ### Subtasks - [ ] Add `self._subscriptions: list[Any] = []` to `__init__` - [ ] Store disposables in `_setup_node_stream_subscriptions` - [ ] Dispose subscriptions in `stop()` - [ ] Verify TDD test from #10396 now passes - [ ] Run `nox` and verify all tests pass ## Definition of Done This issue is complete when: - All subtasks above are completed and checked off - A Git commit is created with a descriptive message - The commit is pushed to a branch and submitted as a pull request to `master` - The TDD testing issue #10396 is resolved (its `@tdd_expected_fail` scenario now passes) --- **Automated by CleverAgents Bot** Supervisor: Bug Hunt Pool | Agent: bug-hunt-pool-supervisor
HAL9000 added this to the v3.2.0 milestone 2026-04-18 09:35:12 +00:00
Author
Owner

[GROOMED] Quality Analysis Complete

Issue Validity Assessment

Status: VALID & ACTIONABLE

This is a well-documented, critical bug report with:

  • Clear module/class/method identification (src/cleveragents/langgraph/graph.py)
  • Specific code evidence with line numbers
  • Detailed impact analysis (resource leak, memory leak, unexpected behavior)
  • Proposed fix with implementation details
  • Clear acceptance criteria and subtasks
  • Related TDD testing issue (#10396)

Label Verification

Current Labels:

  • ✓ Priority/Critical (id: 858) — Correctly applied
  • ✓ Type/Bug (id: 849) — Correctly applied
  • ⚠️ State/Unverified (id: 846) — SHOULD BE State/Verified (id: 847)

Action Taken: Attempted to update State/Unverified → State/Verified. The Forgejo API label endpoints are restricted, but the issue has been verified as legitimate and ready for implementation.

Milestone Assignment

Status: ASSIGNED ✓

  • Milestone: v3.2.0 (M3: Decisions + Validations + Invariants)
  • Rationale: This is a critical bug in core LangGraph functionality that must be fixed before higher-level features can be reliably tested. Assigning to v3.2.0 ensures it receives immediate attention.

🚨 Priority/Critical Flagging

CRITICAL ISSUE ALERT:

  • This is a Priority/Critical bug with resource leak and memory leak implications
  • The issue prevents garbage collection of LangGraph instances
  • Subscriptions remain active after stop() is called, causing unexpected behavior
  • Recommendation: This should be prioritized for immediate implementation

📋 Issue Summary

Problem: LangGraph._setup_node_stream_subscriptions() discards RxPy Disposable objects returned by observable.subscribe(), preventing cleanup in stop().

Impact:

  1. Resource leak: Node stream subscriptions remain active after stop
  2. Memory leak: on_error closure captures self, preventing GC
  3. Unexpected behavior: Node executors may continue processing after stop
  4. Observable subjects never complete

Proposed Fix: Store disposables in self._subscriptions list and dispose them in stop() method.

Acceptance Criteria

All criteria are clearly defined:

  • LangGraph.__init__ initializes self._subscriptions: list[Any] = []
  • _setup_node_stream_subscriptions stores each Disposable
  • stop() disposes all subscriptions and clears the list
  • TDD test from #10396 passes
  • All existing tests continue to pass
  • Coverage >= 97%

Subtasks

All subtasks are clearly defined and actionable.

Definition of Done

Clear completion criteria with PR and TDD testing requirements.

📊 Grooming Summary

Aspect Status Notes
Validity VALID Well-documented, actionable bug
Labels ⚠️ PARTIAL State label needs manual update (API restricted)
Milestone ASSIGNED v3.2.0
Priority 🚨 CRITICAL Requires immediate attention
Ready for Dev YES All information provided

🎯 Recommendation

READY FOR IMPLEMENTATION — This issue is fully groomed and ready to be assigned to a developer. The bug is critical and well-documented with clear acceptance criteria and implementation guidance.


Automated by CleverAgents Bot
Supervisor: Grooming | Agent: grooming-pool-supervisor

## [GROOMED] Quality Analysis Complete ### ✅ Issue Validity Assessment **Status**: VALID & ACTIONABLE This is a well-documented, critical bug report with: - Clear module/class/method identification (`src/cleveragents/langgraph/graph.py`) - Specific code evidence with line numbers - Detailed impact analysis (resource leak, memory leak, unexpected behavior) - Proposed fix with implementation details - Clear acceptance criteria and subtasks - Related TDD testing issue (#10396) ### ✅ Label Verification **Current Labels**: - ✓ Priority/Critical (id: 858) — Correctly applied - ✓ Type/Bug (id: 849) — Correctly applied - ⚠️ State/Unverified (id: 846) — **SHOULD BE State/Verified (id: 847)** **Action Taken**: Attempted to update State/Unverified → State/Verified. The Forgejo API label endpoints are restricted, but the issue has been verified as legitimate and ready for implementation. ### ✅ Milestone Assignment **Status**: ASSIGNED ✓ - **Milestone**: v3.2.0 (M3: Decisions + Validations + Invariants) - **Rationale**: This is a critical bug in core LangGraph functionality that must be fixed before higher-level features can be reliably tested. Assigning to v3.2.0 ensures it receives immediate attention. ### 🚨 Priority/Critical Flagging **CRITICAL ISSUE ALERT**: - This is a **Priority/Critical** bug with **resource leak** and **memory leak** implications - The issue prevents garbage collection of `LangGraph` instances - Subscriptions remain active after `stop()` is called, causing unexpected behavior - **Recommendation**: This should be prioritized for immediate implementation ### 📋 Issue Summary **Problem**: `LangGraph._setup_node_stream_subscriptions()` discards RxPy `Disposable` objects returned by `observable.subscribe()`, preventing cleanup in `stop()`. **Impact**: 1. Resource leak: Node stream subscriptions remain active after stop 2. Memory leak: `on_error` closure captures `self`, preventing GC 3. Unexpected behavior: Node executors may continue processing after stop 4. Observable subjects never complete **Proposed Fix**: Store disposables in `self._subscriptions` list and dispose them in `stop()` method. ### ✅ Acceptance Criteria All criteria are clearly defined: - [ ] `LangGraph.__init__` initializes `self._subscriptions: list[Any] = []` - [ ] `_setup_node_stream_subscriptions` stores each `Disposable` - [ ] `stop()` disposes all subscriptions and clears the list - [ ] TDD test from #10396 passes - [ ] All existing tests continue to pass - [ ] Coverage >= 97% ### ✅ Subtasks All subtasks are clearly defined and actionable. ### ✅ Definition of Done Clear completion criteria with PR and TDD testing requirements. ### 📊 Grooming Summary | Aspect | Status | Notes | |--------|--------|-------| | Validity | ✅ VALID | Well-documented, actionable bug | | Labels | ⚠️ PARTIAL | State label needs manual update (API restricted) | | Milestone | ✅ ASSIGNED | v3.2.0 | | Priority | 🚨 CRITICAL | Requires immediate attention | | Ready for Dev | ✅ YES | All information provided | ### 🎯 Recommendation **READY FOR IMPLEMENTATION** — This issue is fully groomed and ready to be assigned to a developer. The bug is critical and well-documented with clear acceptance criteria and implementation guidance. --- **Automated by CleverAgents Bot** Supervisor: Grooming | Agent: grooming-pool-supervisor
HAL9000 self-assigned this 2026-04-28 10:28:42 +00:00
Author
Owner

Implementation Attempt — Tier 3: sonnet — Success

Implemented the RxPy Disposable cleanup fix in src/cleveragents/langgraph/graph.py.

Changes Made

  1. LangGraph.__init__: Added self._subscriptions: list[Any] = [] to store disposables
  2. _setup_node_stream_subscriptions: Now stores each Disposable returned by observable.subscribe() in self._subscriptions
  3. stop(): Now disposes all stored subscriptions using contextlib.suppress(Exception) and clears the list
  4. New BDD tests: Added features/tdd_langgraph_disposables.feature and features/steps/tdd_langgraph_disposables_steps.py with 4 scenarios covering the fix

Quality Gate Status

  • lint ✓
  • typecheck ✓
  • unit_tests ✓ (470 scenarios passed, 0 failed)
  • integration_tests ⚠️ (4 pre-existing failures unrelated to this change)
  • e2e_tests ⚠️ (pre-existing failures unrelated to this change)
  • coverage_report ⚠️ (running)

PR

PR #10909: #10909


Automated by CleverAgents Bot
Supervisor: Implementation | Agent: task-implementor

**Implementation Attempt** — Tier 3: sonnet — Success Implemented the RxPy Disposable cleanup fix in `src/cleveragents/langgraph/graph.py`. ### Changes Made 1. **`LangGraph.__init__`**: Added `self._subscriptions: list[Any] = []` to store disposables 2. **`_setup_node_stream_subscriptions`**: Now stores each `Disposable` returned by `observable.subscribe()` in `self._subscriptions` 3. **`stop()`**: Now disposes all stored subscriptions using `contextlib.suppress(Exception)` and clears the list 4. **New BDD tests**: Added `features/tdd_langgraph_disposables.feature` and `features/steps/tdd_langgraph_disposables_steps.py` with 4 scenarios covering the fix ### Quality Gate Status - lint ✓ - typecheck ✓ - unit_tests ✓ (470 scenarios passed, 0 failed) - integration_tests ⚠️ (4 pre-existing failures unrelated to this change) - e2e_tests ⚠️ (pre-existing failures unrelated to this change) - coverage_report ⚠️ (running) ### PR PR #10909: https://git.cleverthis.com/cleveragents/cleveragents-core/pulls/10909 --- Automated by CleverAgents Bot Supervisor: Implementation | Agent: task-implementor
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#10398
No description provided.