langgraph/graph: add failing test proving LangGraph._setup_node_stream_subscriptions never disposes RxPy subscriptions #10396

Open
opened 2026-04-18 09:28:38 +00:00 by HAL9000 · 0 comments
Owner

TDD Test Specification

Tags

@tdd_issue @tdd_issue_2 @tdd_expected_fail

Test Description

Add a failing unit test that proves LangGraph._setup_node_stream_subscriptions() discards the Disposable returned by observable.subscribe(), leaving subscriptions active after stop() is called.

Expected Failing Test

# tests/unit/langgraph/test_graph_subscription_disposal.py
import pytest
from unittest.mock import MagicMock, patch
from cleveragents.langgraph.graph import LangGraph, GraphConfig

def test_subscriptions_are_disposed_on_stop():
    """LangGraph.stop() should dispose all node stream subscriptions."""
    config = GraphConfig(name="test_graph")
    
    disposable_mocks = []
    
    original_subscribe = None
    
    def mock_subscribe(observer):
        disposable = MagicMock()
        disposable_mocks.append(disposable)
        return disposable
    
    with patch.object(
        LangGraph, '_setup_node_stream_subscriptions',
        wraps=lambda self: _patched_setup(self, mock_subscribe)
    ):
        graph = LangGraph(config=config)
        graph.stop()
    
    # All subscriptions should have been disposed
    assert len(disposable_mocks) > 0, "Expected at least one subscription"
    for disposable in disposable_mocks:
        disposable.dispose.assert_called_once(), f"Subscription was not disposed on stop()"


def _patched_setup(graph, mock_subscribe):
    """Patched version that tracks subscription disposables."""
    for node_name in graph.nodes:
        stream_name = f"__{graph.name}_node_{node_name}__"
        if stream_name in graph.stream_router.observables:
            observable = graph.stream_router.observables[stream_name]
            # Patch subscribe to track disposables
            original = observable.subscribe
            observable.subscribe = mock_subscribe

Why This Test Currently Fails

In src/cleveragents/langgraph/graph.py, _setup_node_stream_subscriptions (lines ~152-169):

def _setup_node_stream_subscriptions(self) -> None:
    for node_name in self.nodes:
        stream_name = f"__{self.name}_node_{node_name}__"
        if stream_name in self.stream_router.observables:
            observable = self.stream_router.observables[stream_name]
            # ...
            observer = Observer(on_next=on_next, on_error=on_error, on_completed=on_completed)
            observable.subscribe(observer)  # ← Return value (Disposable) is DISCARDED!

The observable.subscribe(observer) call returns a Disposable object in RxPY. This return value is discarded, so:

  1. Subscriptions are never stored
  2. stop() cannot dispose them
  3. Node executors continue running after stop() is called
  4. The LangGraph instance cannot be garbage collected (subscriptions hold references via self.logger)

Acceptance Criteria

  • Test file exists at tests/unit/langgraph/test_graph_subscription_disposal.py
  • Test is tagged @tdd_expected_fail and fails before the fix
  • Test passes after the bug fix in the corresponding Bug issue

Subtasks

  • Write the failing test
  • Verify it fails with current code
  • Link to the Bug issue

Definition of Done

This issue is complete when:

  • The test file exists and is tagged @tdd_expected_fail
  • The test fails with current code (proving the bug exists)
  • The corresponding Bug issue fix makes this test pass

Automated by CleverAgents Bot
Supervisor: Bug Hunt Pool | Agent: bug-hunt-pool-supervisor

## TDD Test Specification ### Tags @tdd_issue @tdd_issue_2 @tdd_expected_fail ### Test Description Add a failing unit test that proves `LangGraph._setup_node_stream_subscriptions()` discards the `Disposable` returned by `observable.subscribe()`, leaving subscriptions active after `stop()` is called. ### Expected Failing Test ```python # tests/unit/langgraph/test_graph_subscription_disposal.py import pytest from unittest.mock import MagicMock, patch from cleveragents.langgraph.graph import LangGraph, GraphConfig def test_subscriptions_are_disposed_on_stop(): """LangGraph.stop() should dispose all node stream subscriptions.""" config = GraphConfig(name="test_graph") disposable_mocks = [] original_subscribe = None def mock_subscribe(observer): disposable = MagicMock() disposable_mocks.append(disposable) return disposable with patch.object( LangGraph, '_setup_node_stream_subscriptions', wraps=lambda self: _patched_setup(self, mock_subscribe) ): graph = LangGraph(config=config) graph.stop() # All subscriptions should have been disposed assert len(disposable_mocks) > 0, "Expected at least one subscription" for disposable in disposable_mocks: disposable.dispose.assert_called_once(), f"Subscription was not disposed on stop()" def _patched_setup(graph, mock_subscribe): """Patched version that tracks subscription disposables.""" for node_name in graph.nodes: stream_name = f"__{graph.name}_node_{node_name}__" if stream_name in graph.stream_router.observables: observable = graph.stream_router.observables[stream_name] # Patch subscribe to track disposables original = observable.subscribe observable.subscribe = mock_subscribe ``` ### Why This Test Currently Fails In `src/cleveragents/langgraph/graph.py`, `_setup_node_stream_subscriptions` (lines ~152-169): ```python def _setup_node_stream_subscriptions(self) -> None: for node_name in self.nodes: stream_name = f"__{self.name}_node_{node_name}__" if stream_name in self.stream_router.observables: observable = self.stream_router.observables[stream_name] # ... observer = Observer(on_next=on_next, on_error=on_error, on_completed=on_completed) observable.subscribe(observer) # ← Return value (Disposable) is DISCARDED! ``` The `observable.subscribe(observer)` call returns a `Disposable` object in RxPY. This return value is discarded, so: 1. Subscriptions are never stored 2. `stop()` cannot dispose them 3. Node executors continue running after `stop()` is called 4. The `LangGraph` instance cannot be garbage collected (subscriptions hold references via `self.logger`) ### Acceptance Criteria - [ ] Test file exists at `tests/unit/langgraph/test_graph_subscription_disposal.py` - [ ] Test is tagged `@tdd_expected_fail` and fails before the fix - [ ] Test passes after the bug fix in the corresponding Bug issue ### Subtasks - [ ] Write the failing test - [ ] Verify it fails with current code - [ ] Link to the Bug issue ## Definition of Done This issue is complete when: - The test file exists and is tagged `@tdd_expected_fail` - The test fails with current code (proving the bug exists) - The corresponding Bug issue fix makes this test pass --- **Automated by CleverAgents Bot** Supervisor: Bug Hunt Pool | Agent: bug-hunt-pool-supervisor
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#10396
No description provided.