[Bug Hunt][Cycle 2][A2A] Concurrent access to event subscriptions causes dictionary modification errors #7043

Open
opened 2026-04-10 07:23:49 +00:00 by HAL9000 (Owner) · 1 comment

Bug Report: [Concurrency] — Concurrent access to event subscriptions causes dictionary modification errors

Severity Assessment

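  • Impact: publish() raises RuntimeError to its caller, and subscribers after the point of failure never receive the event. In a multi-threaded service this shows up as intermittent crashes and lost events under load.
  • Likelihood: High (triggered whenever a subscription is added or removed while publish() is delivering an event, including from inside a subscriber callback)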
  • Priority: Critical

Location

  • File: src/cleveragents/a2a/events.py
  • Function/Class: A2aEventQueue.publish() and unsubscribe()
  • Lines: 56-72 and 79-86

Description

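A2aEventQueue.publish() iterates directly over the live self._subscriptions.items() view and invokes each callback inside that loop. If a callback calls unsubscribe(), or anything else that adds or removes a subscription, the dictionary changes size mid-iteration. The next step of the loop then raises RuntimeError: dictionary changed size during iteration. The error comes from the for statement itself, outside the per-callback try/except, so it propagates out of publish() and the remaining subscribers are skipped. The single-threaded reentrant case is deterministic. The same failure can also occur nondeterministically when another thread mutates _subscriptions while a publish is in progress, because nothing guards the dictionary.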

Evidence

```python
def publish(self, event: A2aEvent) -> None:
    """Append *event* to the local queue and notify subscribers."""
    if self._is_closed:
        raise RuntimeError("Cannot publish to a closed event queue")
    if not isinstance(event, A2aEvent):
        raise TypeError("event must be an A2aEvent instance")
    self._events.append(event)
    logger.debug(
        "a2a.event.published",
        event_id=event.event_id,
        event_type=event.event_type,
    )
    for sub_id, callback in self._subscriptions.items():  # ← UNSAFE: iterates the live dict; a size change raises here, outside the try
        try:
            callback(event)  # ← callback may call unsubscribe()
        except Exception:
            logger.exception(
                "a2a.event.callback_error",
                subscription_id=sub_id,
            )

def unsubscribe(self, subscription_id: str) -> bool:
    """Remove a subscription.  Returns ``True`` if it existed."""
    if not subscription_id or not isinstance(subscription_id, str):
        raise ValueError("subscription_id must be a non-empty string")
    removed = self._subscriptions.pop(subscription_id, None) is not None  # ← MODIFIES DICT, no lock
    if removed:
        logger.debug("a2a.event.unsubscribed", subscription_id=subscription_id)
    return removed
```

Expected Behavior

publish() should complete normally and notify subscribers even when a callback, or another thread, subscribes or unsubscribes during delivery. Subscription management should be safe to call from multiple threads.

Actual Behavior

When a callback calls unsubscribe() during publish(), publish() raises RuntimeError: dictionary changed size during iteration. The same happens when another thread adds or removes a subscription while publish() is iterating. Subscribers that had not yet been called do not receive the event.
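The reentrant case can be reproduced without threads. The sketch below is a minimal stand-in, not the real class. StubEventQueue, its subscribe() signature and ID format, and the use of plain strings instead of A2aEvent are all assumptions made to keep it self-contained. Its delivery loop has the same shape as the publish() loop in the Evidence section.

```python
import itertools
from collections.abc import Callable
from typing import Any


class StubEventQueue:
    """Hypothetical stand-in mirroring the current A2aEventQueue.publish() loop."""

    def __init__(self) -> None:
        self._subscriptions: dict[str, Callable[[Any], Any]] = {}
        self._ids = itertools.count()

    def subscribe(self, callback: Callable[[Any], Any]) -> str:
        # Assumed signature; the real subscribe() is not shown in this issue.
        sub_id = f"sub-{next(self._ids)}"
        self._subscriptions[sub_id] = callback
        return sub_id

    def unsubscribe(self, subscription_id: str) -> bool:
        return self._subscriptions.pop(subscription_id, None) is not None

    def publish(self, event: Any) -> None:
        # Same shape as the buggy loop: iterate the live dict, call each callback.
        for sub_id, callback in self._subscriptions.items():
            try:
                callback(event)
            except Exception:
                pass  # real code logs here; this only covers errors from callback(event)


def reproduce() -> tuple[str, list[str]]:
    queue = StubEventQueue()
    received: list[str] = []

    # A one-shot subscriber that removes itself on first delivery.
    def once(event: Any) -> None:
        received.append(f"once:{event}")
        queue.unsubscribe(once_id)

    once_id = queue.subscribe(once)
    queue.subscribe(lambda event: received.append(f"other:{event}"))

    try:
        queue.publish("evt-1")
    except RuntimeError as exc:  # raised by the for statement, not by a callback
        return str(exc), received
    return "no error", received


print(reproduce())
# → ('dictionary changed size during iteration', ['once:evt-1'])
```

The second subscriber never runs. The exception escapes publish() even though every callback is wrapped in try/except.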

Suggested Fix

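Take a snapshot of _subscriptions while holding a lock, then run the callbacks over the snapshot with the lock released. The copy alone makes the reentrant (callback calls unsubscribe()) case safe. The lock makes the copy, and every add and remove, safe across threads. Any code that adds entries to _subscriptions must take the same lock.

```python
import threading
from collections.abc import Callable
from typing import Any

# Excerpt from src/cleveragents/a2a/events.py; A2aEvent and logger come from that module.


class A2aEventQueue:
    def __init__(self) -> None:
        self._events: list[A2aEvent] = []
        self._subscriptions: dict[str, Callable[[A2aEvent], Any]] = {}
        self._is_closed: bool = False
        # Guards every read/write of _subscriptions; code that adds entries must use it too.
        self._subscriptions_lock = threading.RLock()

    def publish(self, event: A2aEvent) -> None:
        # ... existing validation and debug logging ...
        self._events.append(event)

        # Copy under the lock, then call back with the lock released, so callbacks
        # can subscribe/unsubscribe without invalidating this loop.
        with self._subscriptions_lock:
            subscriptions_snapshot = dict(self._subscriptions)

        for sub_id, callback in subscriptions_snapshot.items():
            try:
                callback(event)
            except Exception:
                logger.exception("a2a.event.callback_error", subscription_id=sub_id)

    def unsubscribe(self, subscription_id: str) -> bool:
        """Remove a subscription.  Returns ``True`` if it existed."""
        if not subscription_id or not isinstance(subscription_id, str):
            raise ValueError("subscription_id must be a non-empty string")
        with self._subscriptions_lock:
            removed = self._subscriptions.pop(subscription_id, None) is not None
        if removed:
            logger.debug("a2a.event.unsubscribed", subscription_id=subscription_id)
        return removed
```

Because callbacks run outside the lock, a plain threading.Lock would also be sufficient. RLock only matters if code that already holds the lock calls another method that acquires it. The snapshot also fixes the delivery semantics. A subscriber removed by an earlier callback during the same publish() still receives that event. A subscriber added during it does not.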
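To check the snapshot-under-lock approach in isolation, here is a hypothetical stub with the suggested fix applied. FixedStubQueue, its subscribe() signature, and the demo()/stress() helpers are illustrative assumptions, not project code. The skip_removed flag is an optional variant that is not part of the proposal: it re-checks membership before each call, so subscribers removed mid-publish are skipped. The threaded stress() run cannot prove the absence of races, but it should finish with zero errors.

```python
import itertools
import threading
from collections.abc import Callable
from typing import Any


class FixedStubQueue:
    """Hypothetical stub with the suggested snapshot-under-lock fix applied."""

    def __init__(self, skip_removed: bool = False) -> None:
        self._subscriptions: dict[str, Callable[[Any], Any]] = {}
        self._lock = threading.RLock()
        self._ids = itertools.count()
        # Optional variant (not in the proposal): skip subscribers removed mid-publish.
        self._skip_removed = skip_removed

    def subscribe(self, callback: Callable[[Any], Any]) -> str:
        with self._lock:
            sub_id = f"sub-{next(self._ids)}"
            self._subscriptions[sub_id] = callback
        return sub_id

    def unsubscribe(self, subscription_id: str) -> bool:
        with self._lock:
            return self._subscriptions.pop(subscription_id, None) is not None

    def publish(self, event: Any) -> None:
        with self._lock:
            snapshot = dict(self._subscriptions)  # copy while holding the lock
        for sub_id, callback in snapshot.items():  # iterate the copy, lock released
            if self._skip_removed:
                with self._lock:
                    if sub_id not in self._subscriptions:
                        continue
            try:
                callback(event)
            except Exception:
                pass  # real code logs via logger.exception(...)


def demo(skip_removed: bool) -> list[str]:
    queue = FixedStubQueue(skip_removed=skip_removed)
    received: list[str] = []
    ids: dict[str, str] = {}

    def first(event: Any) -> None:
        received.append("first")
        queue.unsubscribe(ids["first"])  # remove itself
        queue.unsubscribe(ids["second"])  # and a later subscriber
        queue.subscribe(lambda e: received.append("late"))  # add one mid-publish

    ids["first"] = queue.subscribe(first)
    ids["second"] = queue.subscribe(lambda e: received.append("second"))
    queue.publish("evt-1")
    return received


def stress(rounds: int = 2000) -> int:
    queue = FixedStubQueue()
    errors: list[BaseException] = []

    def churn() -> None:
        for _ in range(rounds):
            queue.unsubscribe(queue.subscribe(lambda e: None))

    def publisher() -> None:
        try:
            for i in range(rounds):
                queue.publish(i)
        except RuntimeError as exc:
            errors.append(exc)

    threads = [threading.Thread(target=churn), threading.Thread(target=publisher)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(errors)


print(demo(False))  # → ['first', 'second']
print(demo(True))   # → ['first']
print(stress())     # → 0
```

With the default behaviour, "second" is still notified because it was in the snapshot, and "late" is not. The skip_removed variant trades one extra lock acquisition per callback for not delivering to already-removed subscribers. Which behaviour is right depends on the queue's intended semantics.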

Category

concurrency

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use the tags `@tdd_issue`, `@tdd_issue_<this-issue-number>`, and `@tdd_expected_fail` to prove the bug exists before it is fixed.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

Comment by HAL9000 (Author, Owner)

Verified — Critical concurrency bug: concurrent access to event subscriptions causes dictionary modification errors. MoSCoW: Must-have. Priority: Critical.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

No milestone
No project
No assignees
1 participant

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#7043