UAT: EventBusBridge.start() fails with TypeError when used with ReactiveEventBus — incompatible subscribe() signature #3940

Open
opened 2026-04-06 07:39:11 +00:00 by freemo · 0 comments
Owner

Summary

EventBusBridge.start() calls self._event_bus.subscribe(self._on_domain_event) with a single argument, but ReactiveEventBus.subscribe() requires two arguments: event_type and handler. This causes a TypeError at runtime and means the bridge never subscribes to the reactive event bus, making SSE streaming completely non-functional when used with ReactiveEventBus.

What Was Tested

  • File: src/cleveragents/a2a/events.pyEventBusBridge.start() method
  • File: src/cleveragents/infrastructure/events/reactive.pyReactiveEventBus.subscribe() method

Expected Behavior (from spec)

Per docs/specification.md §Streaming Architecture and the A2A SSE feature:

The EventBusBridge should subscribe to the internal EventBus and forward translated A2aEvent instances to the A2aEventQueue. When a PLAN_CREATED domain event is emitted on the ReactiveEventBus, the bridge should translate it to a TaskStatusUpdateEvent and publish it to the queue for SSE streaming.

Actual Behavior

from cleveragents.infrastructure.events.reactive import ReactiveEventBus
from cleveragents.a2a.events import EventBusBridge, A2aEventQueue

bus = ReactiveEventBus()
queue = A2aEventQueue()
bridge = EventBusBridge(bus, queue)
bridge.start()
# Raises: TypeError: ReactiveEventBus.subscribe() missing 1 required positional argument: 'handler'

Root Cause

In src/cleveragents/a2a/events.py, EventBusBridge.start():

def start(self) -> None:
    """Subscribe to the event bus and begin forwarding."""
    if hasattr(self._event_bus, "subscribe"):
        self._subscription = self._event_bus.subscribe(self._on_domain_event)  # BUG: missing event_type
        logger.info("a2a.event_bridge.started")

But ReactiveEventBus.subscribe() requires:

def subscribe(self, event_type: EventType, handler: Callable[[DomainEvent], None]) -> None:

The bridge was apparently designed for a different EventBus interface (one that takes a single callback for all events), but the actual ReactiveEventBus requires per-type subscriptions.

Impact

  • SSE streaming is completely broken when EventBusBridge is used with ReactiveEventBus
  • No TaskStatusUpdateEvent or TaskArtifactUpdateEvent events are ever published to the A2aEventQueue
  • Real-time plan status updates via SSE are non-functional
  • The a2a_sse_streaming.feature scenarios that test bridge behavior pass only because they use a mock event bus with a single-argument subscribe() method

Steps to Reproduce

from cleveragents.infrastructure.events.reactive import ReactiveEventBus
from cleveragents.a2a.events import EventBusBridge, A2aEventQueue

bus = ReactiveEventBus()
queue = A2aEventQueue()
bridge = EventBusBridge(bus, queue)
bridge.start()  # TypeError: ReactiveEventBus.subscribe() missing 1 required positional argument: 'handler'

Fix Suggestion

The bridge needs to subscribe to all relevant event types individually, or the EventBus protocol needs a wildcard/all-events subscription method. One approach:

def start(self) -> None:
    """Subscribe to the event bus and begin forwarding."""
    if hasattr(self._event_bus, "subscribe"):
        # Subscribe to all status event types
        from cleveragents.infrastructure.events.types import EventType
        for event_type_name in self._STATUS_EVENT_TYPES | self._ARTIFACT_EVENT_TYPES:
            event_type = EventType(event_type_name)  # Convert name to enum
            self._event_bus.subscribe(event_type, self._on_domain_event)
        logger.info("a2a.event_bridge.started")

Note: This fix also requires fixing Bug #2 (enum names vs values in _STATUS_EVENT_TYPES).

  • Parent Epic: #397 (Server & Autonomy Infrastructure)
  • Related: EventBusBridge._STATUS_EVENT_TYPES uses enum names instead of values (companion bug)

Automated by CleverAgents Bot
Supervisor: UAT Testing | Agent: ca-uat-tester

## Summary `EventBusBridge.start()` calls `self._event_bus.subscribe(self._on_domain_event)` with a **single argument**, but `ReactiveEventBus.subscribe()` requires **two arguments**: `event_type` and `handler`. This causes a `TypeError` at runtime and means the bridge **never subscribes** to the reactive event bus, making SSE streaming completely non-functional when used with `ReactiveEventBus`. ## What Was Tested - File: `src/cleveragents/a2a/events.py` — `EventBusBridge.start()` method - File: `src/cleveragents/infrastructure/events/reactive.py` — `ReactiveEventBus.subscribe()` method ## Expected Behavior (from spec) Per `docs/specification.md` §Streaming Architecture and the A2A SSE feature: The `EventBusBridge` should subscribe to the internal `EventBus` and forward translated `A2aEvent` instances to the `A2aEventQueue`. When a `PLAN_CREATED` domain event is emitted on the `ReactiveEventBus`, the bridge should translate it to a `TaskStatusUpdateEvent` and publish it to the queue for SSE streaming. ## Actual Behavior ```python from cleveragents.infrastructure.events.reactive import ReactiveEventBus from cleveragents.a2a.events import EventBusBridge, A2aEventQueue bus = ReactiveEventBus() queue = A2aEventQueue() bridge = EventBusBridge(bus, queue) bridge.start() # Raises: TypeError: ReactiveEventBus.subscribe() missing 1 required positional argument: 'handler' ``` ## Root Cause In `src/cleveragents/a2a/events.py`, `EventBusBridge.start()`: ```python def start(self) -> None: """Subscribe to the event bus and begin forwarding.""" if hasattr(self._event_bus, "subscribe"): self._subscription = self._event_bus.subscribe(self._on_domain_event) # BUG: missing event_type logger.info("a2a.event_bridge.started") ``` But `ReactiveEventBus.subscribe()` requires: ```python def subscribe(self, event_type: EventType, handler: Callable[[DomainEvent], None]) -> None: ``` The bridge was apparently designed for a different `EventBus` interface (one that takes a single callback for all events), but the actual `ReactiveEventBus` requires per-type subscriptions. ## Impact - SSE streaming is completely broken when `EventBusBridge` is used with `ReactiveEventBus` - No `TaskStatusUpdateEvent` or `TaskArtifactUpdateEvent` events are ever published to the `A2aEventQueue` - Real-time plan status updates via SSE are non-functional - The `a2a_sse_streaming.feature` scenarios that test bridge behavior pass only because they use a mock event bus with a single-argument `subscribe()` method ## Steps to Reproduce ```python from cleveragents.infrastructure.events.reactive import ReactiveEventBus from cleveragents.a2a.events import EventBusBridge, A2aEventQueue bus = ReactiveEventBus() queue = A2aEventQueue() bridge = EventBusBridge(bus, queue) bridge.start() # TypeError: ReactiveEventBus.subscribe() missing 1 required positional argument: 'handler' ``` ## Fix Suggestion The bridge needs to subscribe to **all relevant event types** individually, or the `EventBus` protocol needs a wildcard/all-events subscription method. One approach: ```python def start(self) -> None: """Subscribe to the event bus and begin forwarding.""" if hasattr(self._event_bus, "subscribe"): # Subscribe to all status event types from cleveragents.infrastructure.events.types import EventType for event_type_name in self._STATUS_EVENT_TYPES | self._ARTIFACT_EVENT_TYPES: event_type = EventType(event_type_name) # Convert name to enum self._event_bus.subscribe(event_type, self._on_domain_event) logger.info("a2a.event_bridge.started") ``` Note: This fix also requires fixing Bug #2 (enum names vs values in `_STATUS_EVENT_TYPES`). ## Related Issues - Parent Epic: #397 (Server & Autonomy Infrastructure) - Related: EventBusBridge._STATUS_EVENT_TYPES uses enum names instead of values (companion bug) --- **Automated by CleverAgents Bot** Supervisor: UAT Testing | Agent: ca-uat-tester
HAL9000 added this to the v3.5.0 milestone 2026-04-09 15:28:18 +00:00
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#3940
No description provided.