UAT: A2aEventQueue is not thread-safe — _events list and _subscriptions dict have no locking, causing race conditions under concurrent publish/subscribe #4873

Open
opened 2026-04-08 20:11:20 +00:00 by HAL9000 · 1 comment
Owner

Bug Report

Tested by: UAT tester instance uat-tester-a2a-protocol
Feature area: A2A Protocol — SSE streaming / event queue
Severity: Medium — race conditions under concurrent A2A event publishing and subscription management


What Was Tested

Code-level analysis of src/cleveragents/a2a/events.py — the A2aEventQueue class for thread safety.

Expected Behavior (from spec)

The A2A event queue is used in multi-threaded contexts where plan execution (running in background threads) publishes events while the main thread or other threads subscribe/unsubscribe. The queue should be thread-safe.

Actual Behavior

A2aEventQueue.__init__ creates mutable shared state with no locking:

def __init__(self) -> None:
    self._events: list[A2aEvent] = []
    self._subscriptions: dict[str, Callable[[A2aEvent], Any]] = {}
    self._is_closed: bool = False
    # ← No threading.Lock() or threading.RLock() here

The following operations are not thread-safe:

  1. publish() — appends to self._events and iterates self._subscriptions without a lock. If another thread calls subscribe_local() or unsubscribe() concurrently, the dict can change size during iteration, raising RuntimeError: dictionary changed size during iteration.

  2. subscribe_local() — writes to self._subscriptions without a lock. Concurrent calls from multiple threads can cause lost updates.

  3. unsubscribe() — calls self._subscriptions.pop() without a lock. If this runs while publish() is iterating the dict, publish() can raise the RuntimeError above.

  4. close() — clears both _subscriptions and _events without a lock. If this runs while publish() is iterating the dict, publish() can raise the same RuntimeError.

  5. get_events() — slices self._events without a lock. If publish() appends concurrently, the caller can get an inconsistent view of the event list.
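
The failure mode in items 1, 3 and 4 does not need real threads to be observed. The sketch below is a single-threaded stand-in, not the actual A2aEventQueue code: `subscriptions`, `unsafe_publish` and `remove_one` are hypothetical names, and the `mutate()` call simulates another thread unsubscribing while the loop is in progress.

```python
# Hypothetical stand-in for A2aEventQueue._subscriptions.
subscriptions = {
    "sub-1": lambda event: None,
    "sub-2": lambda event: None,
}


def unsafe_publish(event, mutate):
    """Iterate the live dict, as an unlocked publish() is assumed to do."""
    for sub_id, callback in subscriptions.items():
        callback(event)
        mutate()  # simulates a concurrent unsubscribe() landing mid-iteration


def remove_one():
    subscriptions.pop("sub-2", None)


try:
    unsafe_publish({"i": 0}, remove_one)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(error_message)
```

With real threads the same error depends on where the interpreter switches threads, which is why the race is intermittent rather than deterministic.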

Code Location

  • src/cleveragents/a2a/events.py: A2aEventQueue class

Contrast with Similar Classes

The ToolRegistry in src/cleveragents/tool/registry.py explicitly uses threading.RLock() for thread safety. The A2aEventQueue should follow the same pattern.
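
One reason an RLock may be a better fit than a plain Lock here is re-entrancy: a subscriber callback that calls back into the queue (for example, to unsubscribe itself) would run on the thread that already holds the lock. The snippet below only illustrates that standard-library behaviour. Whether ToolRegistry or A2aEventQueue actually depends on re-entrancy is an assumption, not something the analysis confirmed.

```python
import threading

rlock = threading.RLock()
with rlock:
    # The owning thread can acquire an RLock again without blocking.
    rlock_reacquired = rlock.acquire(blocking=False)
    if rlock_reacquired:
        rlock.release()

lock = threading.Lock()
with lock:
    # A plain Lock is not re-entrant; a blocking acquire here would deadlock.
    lock_reacquired = lock.acquire(blocking=False)
    if lock_reacquired:
        lock.release()

print(rlock_reacquired, lock_reacquired)
```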

Steps to Reproduce

import threading
from cleveragents.a2a.events import A2aEventQueue
from cleveragents.a2a.models import A2aEvent

queue = A2aEventQueue()

def publisher():
    for i in range(1000):
        queue.publish(A2aEvent(event_type="test", data={"i": i}))

def subscriber():
    for i in range(1000):
        sub_id = queue.subscribe_local(lambda e: None)
        queue.unsubscribe(sub_id)

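# Run concurrently. The race is timing-dependent, so the RuntimeError may take
# several runs to appear. It is raised inside publish(), so it shows up as a
# traceback from the publisher thread, not as an exception in the main thread.
t1 = threading.Thread(target=publisher)
t2 = threading.Thread(target=subscriber)
t1.start()
t2.start()
t1.join()
t2.join()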

Expected Fix

Add a threading.RLock() to A2aEventQueue.__init__ and acquire it in all mutating and iterating methods:

import threading

def __init__(self) -> None:
    self._events: list[A2aEvent] = []
    self._subscriptions: dict[str, Callable[[A2aEvent], Any]] = {}
    self._is_closed: bool = False
    self._lock = threading.RLock()
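
The snippet above only adds the lock; the methods still have to acquire it. As a rough sketch of what that could look like, the class below is a self-contained stand-in, not the real A2aEventQueue. `Event`, `LockedEventQueue`, the uuid-based subscription IDs and the behaviour of publish() after close() are all assumptions made for illustration.

```python
import threading
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Event:
    """Hypothetical stand-in for A2aEvent."""
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)


class LockedEventQueue:
    """Illustrative only; method bodies are assumed, not taken from events.py."""

    def __init__(self) -> None:
        self._events: list[Event] = []
        self._subscriptions: dict[str, Callable[[Event], Any]] = {}
        self._is_closed = False
        self._lock = threading.RLock()

    def subscribe_local(self, callback: Callable[[Event], Any]) -> str:
        sub_id = uuid.uuid4().hex  # assumed ID scheme
        with self._lock:
            self._subscriptions[sub_id] = callback
        return sub_id

    def unsubscribe(self, sub_id: str) -> None:
        with self._lock:
            self._subscriptions.pop(sub_id, None)

    def publish(self, event: Event) -> None:
        with self._lock:
            if self._is_closed:  # assumed: publishing after close() is a no-op
                return
            self._events.append(event)
            # Snapshot under the lock so iteration never sees a resize.
            callbacks = list(self._subscriptions.values())
        for callback in callbacks:
            callback(event)

    def get_events(self) -> list[Event]:
        with self._lock:
            return list(self._events)  # copy, so callers never share the list

    def close(self) -> None:
        with self._lock:
            self._is_closed = True
            self._subscriptions.clear()
            self._events.clear()


queue = LockedEventQueue()
errors: list[BaseException] = []


def guarded(fn: Callable[[], None]) -> Callable[[], None]:
    """Record exceptions raised in a worker thread so they can be checked."""
    def run() -> None:
        try:
            fn()
        except BaseException as exc:
            errors.append(exc)
    return run


def publisher() -> None:
    for i in range(1000):
        queue.publish(Event("test", {"i": i}))


def subscriber() -> None:
    for _ in range(1000):
        queue.unsubscribe(queue.subscribe_local(lambda e: None))


threads = [threading.Thread(target=guarded(t)) for t in (publisher, subscriber)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(len(errors), len(queue.get_events()))
```

Here the callbacks run on a snapshot taken under the lock and are invoked after it is released, so a slow subscriber does not block subscribe or unsubscribe calls. The trade-off is that a callback can still receive one event just after it has been unsubscribed. Invoking the callbacks while holding the RLock would close that window at the cost of longer lock hold times.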

Automated by CleverAgents Bot
Supervisor: UAT Testing | Agent: uat-tester

HAL9000 added this to the v3.5.0 milestone 2026-04-08 20:16:32 +00:00
Author
Owner

Implementation Attempt — Tier 0: qwen — Failed

The PR #4873 and associated branch fix/rate-limiting-api-endpoints do not exist on Forgejo.

  • PR GET to /api/v1/repos/cleveragents/cleveragents-core/pulls/4873 returned 404 ("The target could not be found.")
  • Branch fix/rate-limiting-api-endpoints was not found after fetching all remote references
  • The branch could not be checked out or created as a fix target

Environment info:

  • Forgejo URL: https://git.cleverthis.com
  • Repo: cleveragents/cleveragents-core
  • Base branch (master) is accessible — HEAD is at caf146e1
  • Issue #4873 (thread safety of A2aEventQueue) exists but is unrelated to rate limiting

No code changes or PR fixes could be applied since there is no PR or branch to fix.


Automated by CleverAgents Bot
Supervisor: Implementation | Agent: task-implementor
