BUG-HUNT: [concurrency] A2aEventQueue.publish() can raise RuntimeError when close() is called concurrently #6382

Open
opened 2026-04-09 20:59:33 +00:00 by HAL9000 · 0 comments
Owner

Bug Report: Concurrency — A2aEventQueue.publish() race condition with close()

Severity Assessment

  • Impact: Unexpected RuntimeError: dictionary changed size during iteration in any code that calls publish() directly while the queue is being closed from another thread. EventBusBridge suppresses RuntimeError broadly, which masks this race, but other callers see the exception propagate.
  • Likelihood: Medium — the EventBusBridge calls publish() from domain-event callbacks (potentially a separate event-bus thread) while the main thread or test teardown might call close().
  • Priority: Medium

Location

  • File: src/cleveragents/a2a/events.py
  • Class: A2aEventQueue
  • Lines: ~79–95 (publish), ~118–127 (close)

Description

A2aEventQueue contains no threading locks. The publish() method iterates over self._subscriptions as a dict view:

```python
for sub_id, callback in self._subscriptions.items():
```

Meanwhile, close() unconditionally clears the same dict:

```python
self._subscriptions.clear()
```

If close() runs in one thread while publish() is mid-iteration in another (for example, while a subscriber callback is executing), the next step of the loop fails with:

```
RuntimeError: dictionary changed size during iteration
```

Evidence

publish() — iterates subscriptions without a lock (events.py ~lines 79–95):

```python
def publish(self, event: A2aEvent) -> None:
    if self._is_closed:
        raise RuntimeError("Cannot publish to a closed event queue")
    ...
    self._events.append(event)
    for sub_id, callback in self._subscriptions.items():   # ← unsafe iteration
        try:
            callback(event)
        except Exception:
            logger.exception(...)
```
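Note that the per-callback try/except in publish() does not contain this failure: the RuntimeError is raised by the loop's iterator when it advances, outside the try body. A minimal single-thread sketch of that shape, where `publish_like` and `callback_during_close` are illustrative names (not project code) and the callback's `clear()` stands in for a concurrent close():

```python
import logging

logger = logging.getLogger("publish_sketch")

# Stand-in for A2aEventQueue._subscriptions: subscription id -> callback.
subscriptions = {}


def publish_like(event):
    # Same shape as the excerpt: the try/except wraps only callback(event).
    for sub_id, callback in subscriptions.items():
        try:
            callback(event)
        except Exception:
            logger.exception("subscriber %s failed", sub_id)


def callback_during_close(event):
    # Simulates close() clearing the dict while this callback is running.
    subscriptions.clear()


subscriptions["sub-1"] = callback_during_close
subscriptions["sub-2"] = lambda event: None

escaped = None
try:
    publish_like("event")
except RuntimeError as exc:
    escaped = exc  # raised by the for statement, not by any callback

print(escaped)  # dictionary changed size during iteration
```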

close() — clears dict while publish() may be iterating (events.py ~lines 118–127):

```python
def close(self) -> None:
    self._is_closed = True
    count = len(self._subscriptions)
    self._subscriptions.clear()   # ← can race with publish() iterator
    self._events.clear()
```
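A deterministic threaded reproduction, sketched against a hypothetical `MiniQueue` that copies the relevant lines of the excerpted publish() and close() (including the missing lock). The two `threading.Event` objects exist only to force close() to land while publish() is inside a callback; in production that interleaving would be a timing accident:

```python
import threading


class MiniQueue:
    """Hypothetical stand-in mirroring the excerpted publish()/close(); no locks."""

    def __init__(self):
        self._is_closed = False
        self._subscriptions = {}
        self._events = []

    def subscribe(self, sub_id, callback):
        self._subscriptions[sub_id] = callback

    def publish(self, event):
        if self._is_closed:
            raise RuntimeError("Cannot publish to a closed event queue")
        self._events.append(event)
        for sub_id, callback in self._subscriptions.items():
            try:
                callback(event)
            except Exception:
                pass  # the real code logs here

    def close(self):
        self._is_closed = True
        self._subscriptions.clear()
        self._events.clear()


queue = MiniQueue()
in_callback = threading.Event()
closed = threading.Event()
errors = []


def slow_callback(event):
    in_callback.set()       # signal: publish() is now mid-iteration
    closed.wait(timeout=5)  # keep the iterator open until close() has run


def publisher():
    try:
        queue.publish("event")
    except RuntimeError as exc:
        errors.append(exc)


queue.subscribe("sub-1", slow_callback)
queue.subscribe("sub-2", lambda event: None)

worker = threading.Thread(target=publisher)
worker.start()
in_callback.wait(timeout=5)
queue.close()               # runs on the main thread during publish()'s loop
closed.set()
worker.join(timeout=5)
```

After the join, `errors` holds the dict-iteration RuntimeError raised in the worker thread.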

EventBusBridge._on_domain_event — broadly suppresses RuntimeError (events.py ~lines 238–243):

```python
with contextlib.suppress(RuntimeError):
    self._event_queue.publish(a2a_event)
```

The contextlib.suppress(RuntimeError) here catches BOTH the intentional "queue closed" error and the unintentional dict-iteration RuntimeError, which masks the race condition from EventBusBridge callers. Direct callers of publish() (outside of EventBusBridge) have no such guard.
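One way to stop the bridge from hiding unrelated RuntimeErrors, sketched under the assumption that the queue could raise a dedicated subclass for the closed case. `QueueClosedError`, `publish_closed`, `publish_racy` and `bridge_forward` are hypothetical names, not part of the current code. Because the subclass still derives from RuntimeError, existing `except RuntimeError` callers keep working, while the bridge suppresses only the intentional error:

```python
import contextlib


class QueueClosedError(RuntimeError):
    """Hypothetical: raised only when publishing to a closed queue."""


def publish_closed(event):
    raise QueueClosedError("Cannot publish to a closed event queue")


def publish_racy(event):
    # Stands in for the dict-iteration failure described above.
    raise RuntimeError("dictionary changed size during iteration")


def bridge_forward(publish, event):
    # Narrow suppression: swallow the intentional "closed" error only.
    with contextlib.suppress(QueueClosedError):
        publish(event)


bridge_forward(publish_closed, "event")  # swallowed, as today

leaked = None
try:
    bridge_forward(publish_racy, "event")
except RuntimeError as exc:
    leaked = exc  # the race now surfaces instead of being hidden
```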

Expected Behavior

A2aEventQueue should be safe to use from concurrent threads. Calling close() while publish() is running should either (a) block until publish completes, or (b) be detected cleanly and raise only the "queue is closed" error without causing a dict-iteration crash.

Actual Behavior

If close() is called from a different thread during publish()'s subscription iteration, Python raises RuntimeError: dictionary changed size during iteration. The EventBusBridge silently swallows this via contextlib.suppress(RuntimeError), masking the bug; other direct callers of publish() would see the exception propagate unexpectedly.

Suggested Fix

Add a threading.Lock (or threading.RLock) to A2aEventQueue and acquire it in both publish() and close(). Alternatively, iterate over a snapshot of the subscriptions:

```python
for sub_id, callback in list(self._subscriptions.items()):
    ...
```
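A quick check of the snapshot variant, using a single-thread simulation of close() landing mid-loop (the dict and the `first`/`second` callbacks are illustrative stand-ins). `list(...)` copies the items before the loop starts, so clearing the dict no longer breaks the iteration:

```python
# Stand-in for A2aEventQueue._subscriptions.
subscriptions = {}
calls = []


def first(event):
    calls.append("sub-1")
    subscriptions.clear()  # simulates a concurrent close()


def second(event):
    calls.append("sub-2")


subscriptions["sub-1"] = first
subscriptions["sub-2"] = second

# Iterate over a copy, not a live view of the dict.
for sub_id, callback in list(subscriptions.items()):
    callback("event")

print(calls)  # ['sub-1', 'sub-2']
```

The crash is gone, but `sub-2` is still invoked after the simulated close, and the copy itself is not formally guaranteed to be atomic with respect to other threads.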

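A snapshot is the minimal fix, but it only removes the iteration crash. A full lock is more correct because it also makes the _is_closed check, the _events.append, and the iteration atomic with respect to close(); without it, publish() can pass the closed check, lose the race to close(), and then append an event to the already-cleared _events list.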
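A sketch of the lock-based fix, assuming a hypothetical `LockedEventQueue` rather than the real class (logging omitted). The lock makes the closed check, the append to `_events`, and the subscription snapshot a single step relative to close(); callbacks then run outside the lock, so a slow subscriber cannot hold up close() or other publishers. The trade-off is that close() does not wait for callbacks already in flight, which is closer to option (b) under Expected Behavior than to option (a):

```python
import threading


class LockedEventQueue:
    """Hypothetical lock-guarded queue; a sketch, not the project's code."""

    def __init__(self):
        self._lock = threading.Lock()
        self._is_closed = False
        self._subscriptions = {}
        self._events = []

    def subscribe(self, sub_id, callback):
        with self._lock:
            self._subscriptions[sub_id] = callback

    def publish(self, event):
        # Closed check, append and snapshot happen atomically w.r.t. close().
        with self._lock:
            if self._is_closed:
                raise RuntimeError("Cannot publish to a closed event queue")
            self._events.append(event)
            callbacks = list(self._subscriptions.items())
        # Callbacks run without the lock held.
        for sub_id, callback in callbacks:
            try:
                callback(event)
            except Exception:
                pass  # the real code logs here

    def close(self):
        with self._lock:
            self._is_closed = True
            self._subscriptions.clear()
            self._events.clear()


# Same interleaving as the failing case: close() lands mid-callback.
queue = LockedEventQueue()
in_callback = threading.Event()
closed = threading.Event()
errors = []


def slow_callback(event):
    in_callback.set()
    closed.wait(timeout=5)


def publisher():
    try:
        queue.publish("event")
    except RuntimeError as exc:
        errors.append(exc)


queue.subscribe("sub-1", slow_callback)
queue.subscribe("sub-2", lambda event: None)
worker = threading.Thread(target=publisher)
worker.start()
in_callback.wait(timeout=5)
queue.close()
closed.set()
worker.join(timeout=5)

# Publishing after close() yields only the intentional error.
late_error = None
try:
    queue.publish("late event")
except RuntimeError as exc:
    late_error = str(exc)
```

Holding the lock across the callback loop instead would give option (a), but the lock would then need to be a `threading.RLock` if callbacks can call back into the queue on the same thread, and it could still deadlock if a callback waits on another thread that needs the queue.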

Category

concurrency

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

HAL9000 added this to the v3.2.0 milestone 2026-04-09 21:09:09 +00:00
Reference
cleveragents/cleveragents-core#6382