BUG-HUNT: [concurrency] ReactiveEventBus documented as single-threaded but used as DI Singleton — no thread safety for subscriptions dict #7409

Open
opened 2026-04-10 19:05:43 +00:00 by HAL9000 · 0 comments
Owner

Bug Report: Concurrency — ReactiveEventBus Thread Safety Gap

Severity Assessment

  • Impact: Concurrent subscribe() and emit() calls from multiple threads can corrupt the _subscriptions dict, leading to missed event deliveries, RuntimeError: dictionary changed size during iteration, or duplicate handler invocations
  • Likelihood: Medium — occurs whenever two threads concurrently call emit() and subscribe() to the same ReactiveEventBus singleton
  • Priority: High

Location

  • File: src/cleveragents/infrastructure/events/reactive.py
  • Functions: ReactiveEventBus.emit(), ReactiveEventBus.subscribe()
  • Lines: 72–121

Description

The ReactiveEventBus docstring explicitly states:

Thread safety: This implementation is designed for single-threaded use within the main application event loop. The DI container registers it as a Singleton. If concurrent access from multiple threads is required, callers must provide external synchronization.

However, since the DI container registers it as a singleton and multiple services (SessionService, CheckpointService, CostBudgetService, etc.) all hold references and emit events, concurrent access is inevitable in practice when the AsyncWorker is running background jobs that trigger events.

The critical path in emit() does:

for handler in list(self._subscriptions.get(event.event_type, ())):

Copying the handlers with list() protects the loop from later changes to the handler list, but the lookup and the copy are two separate steps: subscribe() on another thread can mutate self._subscriptions between the .get() and list() calls.
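To make the aliasing concrete: dict.get() returns the live handler list, not a copy, so until list() finishes, emit() and subscribe() are looking at the same object. The single-threaded sketch below replays that interleaving by hand. The dictionary and handler names are illustrative stand-ins, not the real ReactiveEventBus internals.

```python
# Illustrative stand-in for ReactiveEventBus._subscriptions (hypothetical data).
subscriptions = {"SESSION_MESSAGE_SENT": []}


def handler_a(event):
    """Hypothetical handler."""


def handler_b(event):
    """Hypothetical handler."""


subscriptions["SESSION_MESSAGE_SENT"].append(handler_a)

# Step 1 of emit(): look up the handler list. This is the live list object.
live = subscriptions.get("SESSION_MESSAGE_SENT", ())

# A subscribe() call from another thread could run here, between the steps.
subscriptions.setdefault("SESSION_MESSAGE_SENT", []).append(handler_b)

# Step 2 of emit(): copy. The copy already contains handler_b.
snapshot = list(live)

# Subscriptions made after the copy no longer affect the snapshot.
subscriptions["SESSION_MESSAGE_SENT"].append(handler_a)

same_object = live is subscriptions["SESSION_MESSAGE_SENT"]
print(same_object, len(snapshot), len(live))
```

Whether a handler subscribed during emit() receives the current event therefore depends purely on timing, which is the "miss newly subscribed handlers" symptom described in this report.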

Evidence

# src/cleveragents/infrastructure/events/reactive.py

def emit(self, event: DomainEvent) -> None:
    # ...
    for handler in list(self._subscriptions.get(event.event_type, ())):  # line ~109
        # ↑ Between .get() and list(), another thread may call subscribe()
        # modifying _subscriptions dict
        try:
            handler(event)
        except Exception as exc:
            ...

def subscribe(self, event_type: EventType, handler: ...) -> None:
    # ...
    self._subscriptions.setdefault(event_type, []).append(handler)  # line ~121
    # ↑ Mutates _subscriptions without any lock

The AsyncWorker service uses ThreadPoolExecutor and jobs run on separate threads. When a job completes and the session service emits SESSION_MESSAGE_SENT, that emit happens on a worker thread while the main thread may be calling subscribe().
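As a rough illustration of why the emit lands off the main thread, the sketch below submits a stand-in job to a ThreadPoolExecutor and compares thread identifiers. The emitting_job function is hypothetical and only marks where a real job would call emit(); it does not reproduce AsyncWorker.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def emitting_job() -> int:
    # Stand-in for a background job that would end by calling bus.emit(...).
    # It reports which thread it ran on instead.
    return threading.get_ident()


main_thread_id = threading.get_ident()

with ThreadPoolExecutor(max_workers=1) as pool:
    worker_thread_id = pool.submit(emitting_job).result()

# The job body, and any emit() inside it, ran on a pool thread.
emitted_off_main_thread = worker_thread_id != main_thread_id
print(emitted_off_main_thread)
```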

Expected Behavior

Either the documentation should be enforced (singleton used only from a single thread via an event loop), or the _subscriptions dict should be protected with a threading.RLock().
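For the first option, one possible way to enforce the documented contract is to record the owning thread and fail fast on cross-thread calls. This is a minimal sketch under that assumption; SingleThreadGuard is a hypothetical helper, not something that exists in the codebase.

```python
import threading


class SingleThreadGuard:
    """Hypothetical helper: remembers the creating thread, rejects others."""

    def __init__(self) -> None:
        self._owner = threading.get_ident()

    def check(self, operation: str) -> None:
        # Raise instead of silently racing when called from another thread.
        if threading.get_ident() != self._owner:
            raise RuntimeError(f"{operation}() called from a non-owner thread")


guard = SingleThreadGuard()
guard.check("emit")  # same thread as the creator: passes silently

errors = []


def call_from_worker() -> None:
    try:
        guard.check("emit")
    except RuntimeError as exc:
        errors.append(str(exc))


worker = threading.Thread(target=call_from_worker)
worker.start()
worker.join()
print(errors)
```

A guard like this turns the silent race into an immediate, attributable failure, at the cost of requiring worker threads to hand events back to the owning thread.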

Actual Behavior

Concurrent subscribe() + emit() calls can cause RuntimeError: dictionary changed size during iteration or miss newly subscribed handlers.

Suggested Fix

Add threading.RLock() protection:

import threading

def __init__(self, ...):
    self._lock = threading.RLock()  # guards every access to self._subscriptions

def subscribe(self, event_type, handler):
    with self._lock:
        self._subscriptions.setdefault(event_type, []).append(handler)

def emit(self, event):
    with self._lock:
        # Snapshot under the lock so the lookup and the copy happen as one step.
        handlers = list(self._subscriptions.get(event.event_type, ()))
    for handler in handlers:  # call outside lock to avoid deadlocks
        ...
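The suggested fix can be exercised end to end with a small self-contained stand-in. LockedEventBus below is an illustrative class written for this sketch, not the real ReactiveEventBus: it uses plain strings for event types and omits the error handling of the real emit().

```python
import threading


class LockedEventBus:
    """Illustrative stand-in that applies the lock-and-snapshot pattern."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._subscriptions = {}

    def subscribe(self, event_type, handler) -> None:
        with self._lock:
            self._subscriptions.setdefault(event_type, []).append(handler)

    def emit(self, event_type, payload) -> None:
        with self._lock:
            handlers = list(self._subscriptions.get(event_type, ()))
        for handler in handlers:  # invoked outside the lock
            handler(payload)


received = []
received_lock = threading.Lock()


def record(payload) -> None:
    with received_lock:
        received.append(payload)


bus = LockedEventBus()


def subscriber() -> None:
    for _ in range(100):
        bus.subscribe("tick", record)


def emitter() -> None:
    for i in range(100):
        bus.emit("tick", i)


threads = [threading.Thread(target=subscriber) for _ in range(4)]
threads.append(threading.Thread(target=emitter))
for t in threads:
    t.start()
for t in threads:
    t.join()

# Once all threads have finished, every subscription must be registered.
bus.emit("tick", "final")
final_deliveries = received.count("final")
print(final_deliveries)
```

Because handlers run after the lock is released, a handler may itself call subscribe() without deadlocking. With that structure a plain threading.Lock would also work; RLock is simply more forgiving if a later change ends up calling into the bus while the lock is held.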

Category

concurrency

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD with tags: @tdd_issue, @tdd_issue_<this-issue-number>, @tdd_expected_fail.


Automated by CleverAgents Bot
Supervisor: Bug Detection Pool | Agent: bug-hunt-pool-supervisor

HAL9000 added this to the v3.5.0 milestone 2026-04-10 19:35:38 +00:00
HAL9000 self-assigned this 2026-04-11 03:21:08 +00:00
Reference
cleveragents/cleveragents-core#7409