[events/ReactiveEventBus] TDD: ReactiveEventBus._subject is never completed — no close/shutdown method causes RxPY subscriber resource leaks #10377

Open
opened 2026-04-18 09:17:17 +00:00 by HAL9000 · 0 comments
Owner

Metadata

  • Commit: fix(events): add close() method to ReactiveEventBus to complete RxPY subject
  • Branch: fix/reactive-event-bus-close-method

Background and Context

ReactiveEventBus in src/cleveragents/infrastructure/events/reactive.py creates an RxPY Subject but never calls on_completed() on it. There is no close() or shutdown() method. RxPY subscribers that wait for stream completion (e.g., using to_list(), last(), or buffering operators) will wait forever, and internal RxPY state is never cleaned up.

Expected Behavior

A failing test (tagged @tdd_issue, @tdd_issue_N, @tdd_expected_fail) exists that verifies ReactiveEventBus provides a close() method that completes the RxPY subject. The test fails before the fix and passes after.

Acceptance Criteria

  • Test is written and tagged with @tdd_issue, @tdd_issue_N, @tdd_expected_fail
  • Test fails before the fix is applied (AttributeError on close() call)
  • Test passes after the fix is applied
  • Context manager test verifies automatic cleanup

Subtasks

  • Write failing test: ReactiveEventBus.close() completes the RxPY stream
  • Write failing test: ReactiveEventBus.close() prevents further emit() calls
  • Write failing test: ReactiveEventBus implements context manager protocol
  • Tag all tests with @tdd_issue, @tdd_issue_N, @tdd_expected_fail
  • Confirm tests fail with AttributeError: 'ReactiveEventBus' object has no attribute 'close'

Definition of Done

This issue is closed when:

  1. All three failing tests are written and tagged correctly
  2. Tests fail before the fix (confirming the bug exists)
  3. Tests pass after the fix is applied (see the linked bug issue)

Summary

ReactiveEventBus in src/cleveragents/infrastructure/events/reactive.py creates an RxPY Subject but never calls on_completed() on it. There is no close() or shutdown() method. RxPY subscribers that wait for stream completion (e.g., using to_list(), last(), or buffering operators) will wait forever, and internal RxPY state is never cleaned up.

Test Specification

Write a failing test (tagged @tdd_issue, @tdd_issue_N, @tdd_expected_fail) that verifies ReactiveEventBus provides a close() method that completes the RxPY subject.

Scenario

@tdd_issue @tdd_issue_N @tdd_expected_fail
Scenario: ReactiveEventBus.close() completes the RxPY stream
  Given a ReactiveEventBus instance
  And a subscriber collecting events from bus.stream using to_list()
  When bus.close() is called
  Then the subscriber receives the on_completed signal
  And the collected events list is finalized

@tdd_issue @tdd_issue_N @tdd_expected_fail
Scenario: ReactiveEventBus.close() prevents further emit() calls
  Given a ReactiveEventBus instance
  When bus.close() is called
  And an event is emitted after close
  Then a RuntimeError or similar exception is raised
  Or the event is silently dropped with a warning log

@tdd_issue @tdd_issue_N @tdd_expected_fail
Scenario: ReactiveEventBus implements context manager protocol
  Given a ReactiveEventBus instance used as a context manager
  When the context manager exits
  Then bus.close() is called automatically
  And the RxPY stream is completed

Expected Failure Reason

The tests will fail because ReactiveEventBus has no close() method. Calling bus.close() will raise AttributeError: 'ReactiveEventBus' object has no attribute 'close'.

Fix Path

Add a close() method to ReactiveEventBus:

def close(self) -> None:
    """Complete the RxPY stream and prevent further event emission.
    
    Signals on_completed() to all RxPY subscribers, allowing them to
    finalize their state. After close(), emit() calls will raise
    RuntimeError or be silently dropped.
    """
    self._subject.on_completed()

def __enter__(self) -> "ReactiveEventBus":
    return self

def __exit__(self, *args: object) -> None:
    self.close()

References

  • src/cleveragents/infrastructure/events/reactive.pyReactiveEventBus (no close() method, _subject never completed)

Automated by CleverAgents Bot
Supervisor: Bug Hunt Pool | Agent: bug-hunt-pool-supervisor

## Metadata - **Commit**: `fix(events): add close() method to ReactiveEventBus to complete RxPY subject` - **Branch**: `fix/reactive-event-bus-close-method` ## Background and Context `ReactiveEventBus` in `src/cleveragents/infrastructure/events/reactive.py` creates an RxPY `Subject` but never calls `on_completed()` on it. There is no `close()` or `shutdown()` method. RxPY subscribers that wait for stream completion (e.g., using `to_list()`, `last()`, or buffering operators) will wait forever, and internal RxPY state is never cleaned up. ## Expected Behavior A failing test (tagged `@tdd_issue`, `@tdd_issue_N`, `@tdd_expected_fail`) exists that verifies `ReactiveEventBus` provides a `close()` method that completes the RxPY subject. The test fails before the fix and passes after. ## Acceptance Criteria - [ ] Test is written and tagged with `@tdd_issue`, `@tdd_issue_N`, `@tdd_expected_fail` - [ ] Test fails before the fix is applied (AttributeError on close() call) - [ ] Test passes after the fix is applied - [ ] Context manager test verifies automatic cleanup ## Subtasks - [ ] Write failing test: `ReactiveEventBus.close()` completes the RxPY stream - [ ] Write failing test: `ReactiveEventBus.close()` prevents further `emit()` calls - [ ] Write failing test: `ReactiveEventBus` implements context manager protocol - [ ] Tag all tests with `@tdd_issue`, `@tdd_issue_N`, `@tdd_expected_fail` - [ ] Confirm tests fail with `AttributeError: 'ReactiveEventBus' object has no attribute 'close'` ## Definition of Done This issue is closed when: 1. All three failing tests are written and tagged correctly 2. Tests fail before the fix (confirming the bug exists) 3. Tests pass after the fix is applied (see the linked bug issue) --- ## Summary `ReactiveEventBus` in `src/cleveragents/infrastructure/events/reactive.py` creates an RxPY `Subject` but never calls `on_completed()` on it. There is no `close()` or `shutdown()` method. RxPY subscribers that wait for stream completion (e.g., using `to_list()`, `last()`, or buffering operators) will wait forever, and internal RxPY state is never cleaned up. ## Test Specification Write a failing test (tagged `@tdd_issue`, `@tdd_issue_N`, `@tdd_expected_fail`) that verifies `ReactiveEventBus` provides a `close()` method that completes the RxPY subject. ### Scenario ```gherkin @tdd_issue @tdd_issue_N @tdd_expected_fail Scenario: ReactiveEventBus.close() completes the RxPY stream Given a ReactiveEventBus instance And a subscriber collecting events from bus.stream using to_list() When bus.close() is called Then the subscriber receives the on_completed signal And the collected events list is finalized @tdd_issue @tdd_issue_N @tdd_expected_fail Scenario: ReactiveEventBus.close() prevents further emit() calls Given a ReactiveEventBus instance When bus.close() is called And an event is emitted after close Then a RuntimeError or similar exception is raised Or the event is silently dropped with a warning log @tdd_issue @tdd_issue_N @tdd_expected_fail Scenario: ReactiveEventBus implements context manager protocol Given a ReactiveEventBus instance used as a context manager When the context manager exits Then bus.close() is called automatically And the RxPY stream is completed ``` ### Expected Failure Reason The tests will fail because `ReactiveEventBus` has no `close()` method. Calling `bus.close()` will raise `AttributeError: 'ReactiveEventBus' object has no attribute 'close'`. ### Fix Path Add a `close()` method to `ReactiveEventBus`: ```python def close(self) -> None: """Complete the RxPY stream and prevent further event emission. Signals on_completed() to all RxPY subscribers, allowing them to finalize their state. After close(), emit() calls will raise RuntimeError or be silently dropped. """ self._subject.on_completed() def __enter__(self) -> "ReactiveEventBus": return self def __exit__(self, *args: object) -> None: self.close() ``` ## References - `src/cleveragents/infrastructure/events/reactive.py` — `ReactiveEventBus` (no `close()` method, `_subject` never completed) --- **Automated by CleverAgents Bot** Supervisor: Bug Hunt Pool | Agent: bug-hunt-pool-supervisor
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#10377
No description provided.