BUG-HUNT: [concurrency] A2aEventQueue.publish() crashes with RuntimeError when callback modifies subscriptions dict during iteration #7725

Open
opened 2026-04-12 03:21:11 +00:00 by HAL9000 · 3 comments
Owner

Bug Report: [Concurrency] — Dict Mutation During Callback Iteration in publish()

Severity Assessment

  • Impact: publish() raises RuntimeError: dictionary changed size during iteration, which propagates to the caller and skips every remaining subscriber notification for that event
  • Likelihood: Medium. Any subscriber that calls unsubscribe() inside its own callback triggers this immediately; concurrent calls to subscribe_local() or unsubscribe() from another thread can also trigger it
  • Priority: High

Location

  • File: src/cleveragents/a2a/events.py
  • Function/Class: A2aEventQueue.publish()
  • Lines: ~73–83

Description

publish() iterates over self._subscriptions.items() while dispatching callbacks. If any callback (or a concurrent thread) calls unsubscribe(), which does self._subscriptions.pop(subscription_id, None), the dict iterator raises RuntimeError: dictionary changed size during iteration the next time the loop advances. That aborts the notification loop and leaves the remaining subscribers without the event. No threading lock protects _subscriptions or _events.
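
The underlying Python behaviour can be reproduced without any project code. The following is a minimal sketch using a plain dict; the subscription ids and values are made up for illustration and only stand in for the real callbacks.

```python
# Plain-dict reproduction; the keys and values are illustrative placeholders.
subscriptions = {"sub-1": "first", "sub-2": "second", "sub-3": "third"}
visited = []
error = None

try:
    for sub_id, _ in subscriptions.items():
        visited.append(sub_id)
        if sub_id == "sub-1":
            # Same effect as unsubscribe() popping an entry mid-loop.
            subscriptions.pop(sub_id, None)
except RuntimeError as exc:
    error = str(exc)

print(visited)  # ['sub-1']
print(error)    # dictionary changed size during iteration
```

The error surfaces when the loop tries to fetch the next item, so "sub-2" and "sub-3" are never visited.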

Evidence

# events.py — publish() iterates live dict
def publish(self, event: A2aEvent) -> None:
    if self._is_closed:
        raise RuntimeError("Cannot publish to a closed event queue")
    self._events.append(event)
    for sub_id, callback in self._subscriptions.items():  # <-- live dict
        try:
            callback(event)  # callback may call unsubscribe() → dict.pop() → RuntimeError
        except Exception:
            logger.exception(...)

# unsubscribe() modifies the same dict
def unsubscribe(self, subscription_id: str) -> bool:
    removed = self._subscriptions.pop(subscription_id, None) is not None
    ...

Expected Behavior

A subscriber should be able to unsubscribe itself (or a different subscription) from within a callback without crashing the publish loop. Remaining subscribers should still receive the event.

Actual Behavior

RuntimeError: dictionary changed size during iteration is raised inside publish() and propagates to the caller. The inner except Exception guard does not catch it, because the error comes from the for statement advancing the dict iterator, outside the try block in the loop body. All subsequent subscriber callbacks are skipped.
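
To illustrate why the inner guard does not help, here is a small sketch with a hypothetical stand-in class, NaiveQueue, that copies only the loop shape quoted under Evidence. The class name, its subscribe() method, and the string event are assumptions made for this example, not the project's API.

```python
from typing import Any, Callable


class NaiveQueue:
    """Hypothetical stand-in mirroring the publish() loop quoted in Evidence."""

    def __init__(self) -> None:
        self._subscriptions: dict[str, Callable[[Any], Any]] = {}

    def subscribe(self, sub_id: str, callback: Callable[[Any], Any]) -> None:
        self._subscriptions[sub_id] = callback

    def unsubscribe(self, sub_id: str) -> bool:
        return self._subscriptions.pop(sub_id, None) is not None

    def publish(self, event: Any) -> None:
        for sub_id, callback in self._subscriptions.items():  # live dict view
            try:
                callback(event)
            except Exception:
                pass  # the real code logs here; the guard only covers callback()


queue = NaiveQueue()
received: list[str] = []


def leaving(event: Any) -> None:
    received.append("a")
    queue.unsubscribe("a")  # self-unsubscribe inside the callback


queue.subscribe("a", leaving)
queue.subscribe("b", lambda event: received.append("b"))

escaped = False
try:
    queue.publish("evt")
except RuntimeError:
    escaped = True  # the error escapes publish() despite the inner try/except
```

After this runs, received contains only "a" and escaped is True: subscriber "b" never sees the event.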

Suggested Fix

Iterate over a snapshot of subscriptions, and optionally add a threading lock:

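import threading
from typing import Any, Callable

# A2aEvent and logger are assumed to be defined elsewhere in events.py.

class A2aEventQueue:
    def __init__(self) -> None:
        self._events: list[A2aEvent] = []
        self._subscriptions: dict[str, Callable[[A2aEvent], Any]] = {}
        self._is_closed: bool = False
        # Guards _events and _subscriptions; subscribe_local() and unsubscribe()
        # must acquire it too for the lock to have any effect.
        self._lock = threading.Lock()

    def publish(self, event: A2aEvent) -> None:
        with self._lock:
            if self._is_closed:
                raise RuntimeError("Cannot publish to a closed event queue")
            self._events.append(event)
            # Snapshot so callbacks can mutate _subscriptions during dispatch
            callbacks = list(self._subscriptions.items())
        # Dispatch outside the lock: a callback that calls unsubscribe() would
        # otherwise deadlock on the non-reentrant Lock.
        for sub_id, callback in callbacks:
            try:
                callback(event)
            except Exception:
                logger.exception("a2a.event.callback_error", subscription_id=sub_id)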
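
For the lock to protect the dict, the mutating methods have to take it as well. The sketch below shows one way that could look, using a hypothetical SnapshotQueue class; its subscribe() signature, the uuid-based ids, and the int return value of publish() are assumptions for illustration and are not taken from events.py.

```python
import threading
import uuid
from typing import Any, Callable

Callback = Callable[[Any], Any]


class SnapshotQueue:
    """Hypothetical stand-in for A2aEventQueue; names and signatures are assumed."""

    def __init__(self) -> None:
        self._subscriptions: dict[str, Callback] = {}
        self._lock = threading.Lock()

    def subscribe(self, callback: Callback) -> str:
        sub_id = uuid.uuid4().hex
        with self._lock:
            self._subscriptions[sub_id] = callback
        return sub_id

    def unsubscribe(self, sub_id: str) -> bool:
        with self._lock:
            return self._subscriptions.pop(sub_id, None) is not None

    def publish(self, event: Any) -> int:
        # Copy under the lock, then release it before running callbacks so a
        # callback can call unsubscribe() without deadlocking on the plain Lock.
        with self._lock:
            callbacks = list(self._subscriptions.values())
        for callback in callbacks:
            try:
                callback(event)
            except Exception:
                pass  # the real code would log here
        return len(callbacks)  # number of callbacks invoked for this event


queue = SnapshotQueue()
seen: list[str] = []
ids: dict[str, str] = {}


def once(event: Any) -> None:
    seen.append("once")
    queue.unsubscribe(ids["once"])  # self-unsubscribe inside the callback


ids["once"] = queue.subscribe(once)
queue.subscribe(lambda event: seen.append("always"))

first = queue.publish("evt-1")   # both callbacks run
second = queue.publish("evt-2")  # only the remaining subscriber runs
```

One consequence of the snapshot approach: a subscription removed while a publish is in flight can still receive that event if it was already in the snapshot. Whether that is acceptable is a design decision for the maintainers.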

Category

concurrency

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

HAL9000 added this to the v3.2.0 milestone 2026-04-12 03:42:25 +00:00
Author
Owner

Verified — Concurrency bug: A2aEventQueue.publish() crashes when callback modifies subscriptions during iteration. MoSCoW: Must-have. Priority: High — runtime crash in event system.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

Reference
cleveragents/cleveragents-core#7725