fix(events): add unsubscribe() to EventBus protocol and implementations (#10356) #11195

Closed
freemo wants to merge 1 commit from feature/eventbus-unsubscribe into master
Owner

Summary

Adds unsubscribe(event_type, handler) -> bool method to the EventBus protocol and both concrete implementations (ReactiveEventBus, LoggingEventBus) to resolve memory leaks in long-running processes.

Problem

The EventBus protocol only defined emit() and subscribe(), with no way to remove individual handlers. Handler references stored as strong references were retained indefinitely inside the bus internal _subscriptions dict, causing memory leaks especially since:

  • The ReactiveEventBus is registered as a Singleton in DI (lives for app lifetime)
  • AuditEventSubscriber registers 10 security event type subscriptions that are never cleaned up
  • EventBusBridge.stop() was broken — it tried to call .dispose() on the return value of subscribe(), which returned None

Changes

Protocol (protocol.py)

  • Added unsubscribe(event_type: EventType, handler: Optional[Callable]) -> bool method signature
  • Updated docstring to note memory management concerns (references issue #10356)

ReactiveEventBus (reactive.py)

  • Implemented unsubscribe() — removes the specific (event_type, handler) pair from per-event-type lists
  • Returns True if a match was found and removed, False otherwise
  • Auto-cleans empty list entries from _subscriptions dict
  • Added validation: TypeError for invalid event_type or non-callable handler

LoggingEventBus (logging_bus.py)

  • Same unsubscribe() implementation as ReactiveEventBus

AuditEventSubscriber (audit_event_subscriber.py)

  • Added stop() method that calls EventBus.unsubscribe() for all event types originally subscribed to
  • Stored subscribed event types in _subscribed_types frozenset for cleanup
  • Method is idempotent — safe to call multiple times

EventBusBridge (a2a/events.py)

  • Fixed stop() to use the new unsubscribe() mechanism instead of trying .dispose() on None
  • Now iterates over all EventType values and calls unsubscribe(event_type, handler) for each
  • Added _subscription boolean flag to track whether bridge is actually started

Tests (BDD)

  • 10 new scenarios for ReactiveEventBus unsubscribe: removal, False return, selective unsubscription, type validation
  • 5 new scenarios for LoggingEventBus unsubscribe: same coverage
  • Updated protocol conformance checks to verify unsubscribe method exists
  • Added step definitions for I unsubscribe the handler..., unsubcribing should return True/False, and TypeError validations

Documentation (docs/reference/event_bus.md)

  • Updated EventBus Protocol section to include unsubscribe() in the interface table
  • Added memory management guidance section explaining when/how to call unsubscribe()
  • Added section about AuditEventSubscriber.stop() for lifecycle-aware consumers
# Summary Adds `unsubscribe(event_type, handler) -> bool` method to the EventBus protocol and both concrete implementations (ReactiveEventBus, LoggingEventBus) to resolve memory leaks in long-running processes. ## Problem The EventBus protocol only defined emit() and subscribe(), with no way to remove individual handlers. Handler references stored as strong references were retained indefinitely inside the bus internal _subscriptions dict, causing memory leaks especially since: - The ReactiveEventBus is registered as a Singleton in DI (lives for app lifetime) - AuditEventSubscriber registers 10 security event type subscriptions that are never cleaned up - EventBusBridge.stop() was broken — it tried to call .dispose() on the return value of subscribe(), which returned None ## Changes ### Protocol (protocol.py) - Added unsubscribe(event_type: EventType, handler: Optional[Callable]) -> bool method signature - Updated docstring to note memory management concerns (references issue #10356) ### ReactiveEventBus (reactive.py) - Implemented unsubscribe() — removes the specific (event_type, handler) pair from per-event-type lists - Returns True if a match was found and removed, False otherwise - Auto-cleans empty list entries from _subscriptions dict - Added validation: TypeError for invalid event_type or non-callable handler ### LoggingEventBus (logging_bus.py) - Same unsubscribe() implementation as ReactiveEventBus ### AuditEventSubscriber (audit_event_subscriber.py) - Added stop() method that calls EventBus.unsubscribe() for all event types originally subscribed to - Stored subscribed event types in _subscribed_types frozenset for cleanup - Method is idempotent — safe to call multiple times ### EventBusBridge (a2a/events.py) - Fixed stop() to use the new unsubscribe() mechanism instead of trying .dispose() on None - Now iterates over all EventType values and calls unsubscribe(event_type, handler) for each - Added _subscription boolean flag to track whether bridge is actually started ### Tests (BDD) - 10 new scenarios for ReactiveEventBus unsubscribe: removal, False return, selective unsubscription, type validation - 5 new scenarios for LoggingEventBus unsubscribe: same coverage - Updated protocol conformance checks to verify unsubscribe method exists - Added step definitions for I unsubscribe the handler..., unsubcribing should return True/False, and TypeError validations ### Documentation (docs/reference/event_bus.md) - Updated EventBus Protocol section to include unsubscribe() in the interface table - Added memory management guidance section explaining when/how to call unsubscribe() - Added section about AuditEventSubscriber.stop() for lifecycle-aware consumers
fix(events): add unsubscribe() to EventBus protocol and implementations (#10356)
Some checks failed
CI / lint (pull_request) Failing after 1m15s
CI / typecheck (pull_request) Successful in 1m37s
CI / security (pull_request) Successful in 1m36s
CI / push-validation (pull_request) Successful in 39s
CI / helm (pull_request) Successful in 42s
CI / build (pull_request) Successful in 51s
CI / quality (pull_request) Successful in 1m15s
CI / integration_tests (pull_request) Successful in 5m19s
CI / unit_tests (pull_request) Failing after 7m14s
CI / docker (pull_request) Has been skipped
CI / coverage (pull_request) Has been skipped
CI / status-check (pull_request) Failing after 3s
4dc3300577
Added unsubscribe(event_type, handler) -> bool method to the EventBus protocol for both ReactiveEventBus and LoggingEventBus implementations. This resolves memory leaks caused by handler references persisting indefinitely in long-running processes.
HAL9001 left a comment

Review Summary

The core idea here is solid — adding unsubscribe() to the EventBus protocol and both implementations is the right fix for the memory leak described in #10356. The ReactiveEventBus and LoggingEventBus implementations are clean and correct, the docstrings are thorough, and the documentation update is a welcome addition.

However, there are 4 blocking issues that need to be fixed before this can be approved, plus several CI failures that confirm two of them.

CI Failures

  • CI / lint — FAILING: Likely caused by the # type: ignore suppressions added in features/steps/event_bus_steps.py (project policy: zero tolerance, all # type: ignore are prohibited) or formatting issues.
  • CI / unit_tests — FAILING: Almost certainly caused by the broken step_subscribe_two step — ctx.subscribed_handlers is always empty, so the "selective unsubscribe" scenario silently skips the actual unsubscribe call and likely fails on assertion.
  • CI / coverage — SKIPPED: Depends on unit_tests passing.

Blocking Issues Found

  1. BLOCKER — EventBusBridge.start() calls subscribe() with wrong arity (src/cleveragents/a2a/events.py): The call self._event_bus.subscribe(self._on_domain_event) passes only the handler, but EventBus.subscribe(event_type, handler) requires two arguments. This will raise TypeError at runtime whenever EventBusBridge.start() is called. While this bug existed before this PR, this PR explicitly claims to fix EventBusBridge.stop() — yet start() remains broken. The PR description says "Fixed stop() to use the new unsubscribe() mechanism" but does not acknowledge that start() also needs fixing.

  2. BLOCKER — step_subscribe_two builds empty ctx.subscribed_handlers (features/steps/event_bus_steps.py): The step initialises ctx.subscribed_handlers = [] but never populates it. The two collectors are placed in ctx.collectors but are not also appended to ctx.subscribed_handlers. Consequently the selective-unsubscribe step can never reach the branch that removes the first handler, causing the "other handlers preserved" scenario to pass vacuously or fail.

  3. BLOCKER — New # type: ignore suppressions added (features/steps/event_bus_steps.py): Several new # type: ignore[arg-type] comments were added in the unsubscribe test step definitions. Project policy is zero tolerance for # type: ignore — this is a hard rejection criterion.

  4. BLOCKER — Commit footer missing ISSUES CLOSED: #10356: The commit body does not contain the required ISSUES CLOSED: #N footer. Per CONTRIBUTING.md every commit footer must reference the closed issue. The PR body contains the closing keyword but the commit itself does not.

Non-blocking Observations

  • CHANGELOG.md was not updated. CONTRIBUTING.md requires a changelog entry per commit.
  • PR has no milestone assigned and no Type/ label — both are required per CONTRIBUTING.md checklist items 11 and 12.
  • The Gherkin step name unsubcribing (missing second s) is a typo — should be unsubscribing. Not blocking, but worth fixing.
  • The AuditEventSubscriber.stop() implementation is well-structured and idempotent — good work.
  • The test coverage for ReactiveEventBus.unsubscribe() and LoggingEventBus.unsubscribe() is thorough in concept; once the bugs are fixed, the scenarios will provide solid regression protection.
## Review Summary The core idea here is solid — adding `unsubscribe()` to the `EventBus` protocol and both implementations is the right fix for the memory leak described in #10356. The `ReactiveEventBus` and `LoggingEventBus` implementations are clean and correct, the docstrings are thorough, and the documentation update is a welcome addition. However, there are **4 blocking issues** that need to be fixed before this can be approved, plus several CI failures that confirm two of them. ### CI Failures - **`CI / lint` — FAILING**: Likely caused by the `# type: ignore` suppressions added in `features/steps/event_bus_steps.py` (project policy: zero tolerance, all `# type: ignore` are prohibited) or formatting issues. - **`CI / unit_tests` — FAILING**: Almost certainly caused by the broken `step_subscribe_two` step — `ctx.subscribed_handlers` is always empty, so the "selective unsubscribe" scenario silently skips the actual unsubscribe call and likely fails on assertion. - **`CI / coverage` — SKIPPED**: Depends on `unit_tests` passing. ### Blocking Issues Found 1. **BLOCKER — `EventBusBridge.start()` calls `subscribe()` with wrong arity** (`src/cleveragents/a2a/events.py`): The call `self._event_bus.subscribe(self._on_domain_event)` passes only the handler, but `EventBus.subscribe(event_type, handler)` requires two arguments. This will raise `TypeError` at runtime whenever `EventBusBridge.start()` is called. While this bug existed before this PR, this PR explicitly claims to fix `EventBusBridge.stop()` — yet `start()` remains broken. The PR description says "Fixed stop() to use the new unsubscribe() mechanism" but does not acknowledge that `start()` also needs fixing. 2. **BLOCKER — `step_subscribe_two` builds empty `ctx.subscribed_handlers`** (`features/steps/event_bus_steps.py`): The step initialises `ctx.subscribed_handlers = []` but never populates it. The two collectors are placed in `ctx.collectors` but are not also appended to `ctx.subscribed_handlers`. Consequently the selective-unsubscribe step can never reach the branch that removes the first handler, causing the "other handlers preserved" scenario to pass vacuously or fail. 3. **BLOCKER — New `# type: ignore` suppressions added** (`features/steps/event_bus_steps.py`): Several new `# type: ignore[arg-type]` comments were added in the unsubscribe test step definitions. Project policy is **zero tolerance** for `# type: ignore` — this is a hard rejection criterion. 4. **BLOCKER — Commit footer missing `ISSUES CLOSED: #10356`**: The commit body does not contain the required `ISSUES CLOSED: #N` footer. Per CONTRIBUTING.md every commit footer must reference the closed issue. The PR body contains the closing keyword but the commit itself does not. ### Non-blocking Observations - CHANGELOG.md was not updated. CONTRIBUTING.md requires a changelog entry per commit. - PR has no milestone assigned and no `Type/` label — both are required per CONTRIBUTING.md checklist items 11 and 12. - The Gherkin step name `unsubcribing` (missing second `s`) is a typo — should be `unsubscribing`. Not blocking, but worth fixing. - The `AuditEventSubscriber.stop()` implementation is well-structured and idempotent — good work. - The test coverage for `ReactiveEventBus.unsubscribe()` and `LoggingEventBus.unsubscribe()` is thorough in concept; once the bugs are fixed, the scenarios will provide solid regression protection.
Owner

BLOCKER — ctx.subscribed_handlers is always empty

ctx.subscribed_handlers is initialised as [] here but is never populated. The two collectors created in the loop are appended to ctx.collectors, but none are appended to ctx.subscribed_handlers. This means step_unsubscribe_handler can never satisfy len(ctx.subscribed_handlers) >= 2, so the selective unsubscribe branch is never executed and the scenario silently skips the actual unsubscribe call.

How to fix:

@when('I subscribe two handlers to "{et}" events')
def step_subscribe_two(ctx: Context, et: str) -> None:
    ctx.subscribed_handlers: list[_EventCollector] = []
    for _ in range(2):
        collector = _EventCollector()
        ctx.collectors.append(collector)
        ctx.subscribed_handlers.append(collector)  # <- missing
        ctx.bus.subscribe(EventType(et), collector)
**BLOCKER — `ctx.subscribed_handlers` is always empty** `ctx.subscribed_handlers` is initialised as `[]` here but is never populated. The two collectors created in the loop are appended to `ctx.collectors`, but none are appended to `ctx.subscribed_handlers`. This means `step_unsubscribe_handler` can never satisfy `len(ctx.subscribed_handlers) >= 2`, so the selective unsubscribe branch is never executed and the scenario silently skips the actual unsubscribe call. **How to fix:** ```python @when('I subscribe two handlers to "{et}" events') def step_subscribe_two(ctx: Context, et: str) -> None: ctx.subscribed_handlers: list[_EventCollector] = [] for _ in range(2): collector = _EventCollector() ctx.collectors.append(collector) ctx.subscribed_handlers.append(collector) # <- missing ctx.bus.subscribe(EventType(et), collector) ```
Owner

BLOCKER — # type: ignore suppressions are prohibited

This project has a zero-tolerance policy for # type: ignore — this is listed as a hard rejection criterion in the review checklist and in CONTRIBUTING.md. Several new # type: ignore[arg-type] comments have been added in the unsubscribe test step definitions (lines ~313, ~325, ~335, ~348 in the file).

The suppressions exist because the test steps intentionally pass wrong types to verify TypeError is raised. The correct approach is to use explicit cast() with the wrong type, or to restructure the test so the type system understands that intentionally-wrong arguments are being passed without needing # type: ignore.

For example:

# Instead of:
ctx.bus.unsubscribe(
    "plan.created",  # type: ignore[arg-type]
    _EventCollector(),  # type: ignore[arg-type]
)

# Use a typed variable to avoid the suppression:
bad_event_type: Any = "plan.created"
bad_handler: Any = _EventCollector()
ctx.bus.unsubscribe(bad_event_type, bad_handler)

All # type: ignore additions in this diff must be removed before this PR can be approved.

**BLOCKER — `# type: ignore` suppressions are prohibited** This project has a **zero-tolerance policy** for `# type: ignore` — this is listed as a hard rejection criterion in the review checklist and in CONTRIBUTING.md. Several new `# type: ignore[arg-type]` comments have been added in the unsubscribe test step definitions (lines ~313, ~325, ~335, ~348 in the file). The suppressions exist because the test steps intentionally pass wrong types to verify `TypeError` is raised. The correct approach is to use explicit `cast()` with the wrong type, or to restructure the test so the type system understands that intentionally-wrong arguments are being passed without needing `# type: ignore`. For example: ```python # Instead of: ctx.bus.unsubscribe( "plan.created", # type: ignore[arg-type] _EventCollector(), # type: ignore[arg-type] ) # Use a typed variable to avoid the suppression: bad_event_type: Any = "plan.created" bad_handler: Any = _EventCollector() ctx.bus.unsubscribe(bad_event_type, bad_handler) ``` All `# type: ignore` additions in this diff must be removed before this PR can be approved.
Owner

BLOCKER — Wrong arity in subscribe() call

This calls subscribe() with only one argument (the handler), but EventBus.subscribe(event_type, handler) requires two arguments. At runtime, this will raise TypeError: subscribe() missing 1 required positional argument: 'handler' whenever EventBusBridge.start() is called.

This was pre-existing broken code, but this PR explicitly touches start() and stop() — it should fix this too. The PR description says stop() was "broken" because it tried to call .dispose() on None, but the same issue with start() receiving the wrong number of arguments is equally broken.

How to fix: The bridge needs to subscribe to every EventType (as stop() already iterates), or alternatively the EventBus protocol should be extended to support a wildcard/all-events subscription. The simplest fix consistent with the existing stop() implementation:

def start(self) -> None:
    """Subscribe to the event bus and begin forwarding."""
    if hasattr(self._event_bus, "subscribe"):
        from cleveragents.infrastructure.events.types import EventType
        for event_type in EventType:
            self._event_bus.subscribe(event_type, self._on_domain_event)
        self._subscription = True
        logger.info("a2a.event_bridge.started")
**BLOCKER — Wrong arity in `subscribe()` call** This calls `subscribe()` with only one argument (the handler), but `EventBus.subscribe(event_type, handler)` requires two arguments. At runtime, this will raise `TypeError: subscribe() missing 1 required positional argument: 'handler'` whenever `EventBusBridge.start()` is called. This was pre-existing broken code, but this PR explicitly touches `start()` and `stop()` — it should fix this too. The PR description says `stop()` was "broken" because it tried to call `.dispose()` on `None`, but the same issue with `start()` receiving the wrong number of arguments is equally broken. **How to fix:** The bridge needs to subscribe to every `EventType` (as `stop()` already iterates), or alternatively the `EventBus` protocol should be extended to support a wildcard/all-events subscription. The simplest fix consistent with the existing `stop()` implementation: ```python def start(self) -> None: """Subscribe to the event bus and begin forwarding.""" if hasattr(self._event_bus, "subscribe"): from cleveragents.infrastructure.events.types import EventType for event_type in EventType: self._event_bus.subscribe(event_type, self._on_domain_event) self._subscription = True logger.info("a2a.event_bridge.started") ```
Owner

Automated by CleverAgents Bot
Supervisor: PR Review | Agent: pr-review-worker

--- Automated by CleverAgents Bot Supervisor: PR Review | Agent: pr-review-worker
Member

close since dup with !11197

close since dup with !11197
hurui200320 closed this pull request 2026-05-15 09:08:25 +00:00
Some checks failed
CI / lint (pull_request) Failing after 1m15s
Required
Details
CI / typecheck (pull_request) Successful in 1m37s
Required
Details
CI / security (pull_request) Successful in 1m36s
Required
Details
CI / push-validation (pull_request) Successful in 39s
CI / helm (pull_request) Successful in 42s
CI / build (pull_request) Successful in 51s
Required
Details
CI / quality (pull_request) Successful in 1m15s
Required
Details
CI / integration_tests (pull_request) Successful in 5m19s
Required
Details
CI / unit_tests (pull_request) Failing after 7m14s
Required
Details
CI / docker (pull_request) Has been skipped
Required
Details
CI / coverage (pull_request) Has been skipped
Required
Details
CI / status-check (pull_request) Failing after 3s

Pull request closed

Sign in to join this conversation.
No reviewers
No milestone
No project
No assignees
3 participants
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core!11195
No description provided.