a2a/events: A2aEventQueue._events list grows unboundedly — no maximum size cap causes memory leak in long-running processes #10456

Open
opened 2026-04-18 09:48:37 +00:00 by HAL9000 · 1 comment
Owner

Metadata

  • Commit: fix(a2a/events): cap A2aEventQueue._events with configurable deque maxlen
  • Branch: fix/a2a-event-queue-unbounded-memory

Background and Context

A2aEventQueue._events is a plain Python list with no maximum size. Events accumulate indefinitely until close() is called. In a long-running process (e.g., a persistent agent session), this causes unbounded memory growth and eventual OOM.

File: src/cleveragents/a2a/events.py

class A2aEventQueue:
    def __init__(self) -> None:
        self._events: list[A2aEvent] = []   # line 47 — no maxlen
        self._subscriptions: dict[str, Callable[[A2aEvent], Any]] = {}
        self._is_closed: bool = False

    def publish(self, event: A2aEvent) -> None:
        """Append *event* to the local queue and notify subscribers."""
        ...
        self._events.append(event)           # line 66 — unbounded append

get_events(limit=100) (line 99) only limits what is returned, not what is stored:

def get_events(self, limit: int = 100) -> list[A2aEvent]:
    """Return the most recent *limit* events from the queue."""
    if not isinstance(limit, int) or limit < 1:
        raise ValueError("limit must be a positive integer")
    return list(self._events[-limit:])   # line 103 — only limits return, not storage

In a long-running agent session where many plan events are published (e.g., PLAN_CREATED, PLAN_PHASE_CHANGED, PLAN_STATE_CHANGED for each action), the _events list grows without bound. A session processing thousands of actions would accumulate thousands of events in memory with no eviction.

The EventBusBridge publishes one event per domain event (line 311), and domain events are emitted for every plan phase change, action execution, etc. A large plan with 1000 actions could generate 3000+ events, all retained in memory.
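The growth described above can be reproduced with a small stand-in. This is a minimal sketch, not the real class: `FakeEvent` and `ListBackedQueue` are hypothetical names that model only the list-backed storage and the `get_events(limit)` behavior quoted in this issue, and the "three events per action" figure is the issue's own estimate.

```python
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Hypothetical stand-in for A2aEvent; the real class is not shown in this issue."""
    kind: str


class ListBackedQueue:
    """Simplified model of the current list-backed storage (not the real A2aEventQueue)."""

    def __init__(self) -> None:
        self._events: list[FakeEvent] = []

    def publish(self, event: FakeEvent) -> None:
        self._events.append(event)  # nothing is ever evicted

    def get_events(self, limit: int = 100) -> list[FakeEvent]:
        return list(self._events[-limit:])  # caps the return value only


queue = ListBackedQueue()
for _ in range(1000):  # one iteration per action
    for kind in ("PLAN_CREATED", "PLAN_PHASE_CHANGED", "PLAN_STATE_CHANGED"):
        queue.publish(FakeEvent(kind))

returned = len(queue.get_events(limit=100))
stored = len(queue._events)
print(returned, stored)  # 100 3000
```

The caller only ever sees 100 events, while all 3000 stay referenced by the queue.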

Expected Behavior

The queue should have a configurable maximum size (e.g., max_events=10000 by default). When the cap is reached, the oldest events should be dropped (ring-buffer / collections.deque(maxlen=N) semantics). get_events() should still return the most recent events.

The fix is to replace the plain list (self._events: list[A2aEvent] = []) with a collections.deque constructed with a configurable maxlen:

import collections

class A2aEventQueue:
    DEFAULT_MAX_EVENTS: int = 10_000

    def __init__(self, max_events: int = DEFAULT_MAX_EVENTS) -> None:
        self._events: collections.deque[A2aEvent] = collections.deque(maxlen=max_events)
        ...
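To make the ring-buffer semantics concrete, here is a self-contained sketch of how the capped queue might behave end to end. `BoundedEventQueue` is a hypothetical name, events are plain integers, and the `max_events` validation and the `list(...)` conversion in `get_events()` are assumptions that do not appear in the snippet above.

```python
import collections
from typing import Any


class BoundedEventQueue:
    """Illustrative ring-buffer queue; not the real A2aEventQueue."""

    DEFAULT_MAX_EVENTS: int = 10_000

    def __init__(self, max_events: int = DEFAULT_MAX_EVENTS) -> None:
        # Assumed validation: reject zero, negative, and non-integer caps.
        if not isinstance(max_events, int) or max_events < 1:
            raise ValueError("max_events must be a positive integer")
        self._events: collections.deque[Any] = collections.deque(maxlen=max_events)

    def publish(self, event: Any) -> None:
        # When the deque is full, append() discards the oldest entry.
        self._events.append(event)

    def get_events(self, limit: int = 100) -> list[Any]:
        if not isinstance(limit, int) or limit < 1:
            raise ValueError("limit must be a positive integer")
        # Copy to a list first, because deque does not support slicing.
        return list(self._events)[-limit:]

    def close(self) -> None:
        self._events.clear()


queue = BoundedEventQueue(max_events=5)
for i in range(8):
    queue.publish(i)

print(queue.get_events())         # [3, 4, 5, 6, 7]
print(queue.get_events(limit=2))  # [6, 7]
queue.close()
print(queue.get_events())         # []
```

With a cap of 5 and 8 publishes, the three oldest events (0, 1, 2) are dropped and memory use stays bounded by `max_events`.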

Acceptance Criteria

  • A2aEventQueue accepts a max_events constructor parameter with a sensible default (e.g., 10000)
  • Publishing beyond max_events silently drops the oldest events (ring-buffer semantics)
  • get_events() returns the most recent events correctly after overflow
  • close() calls self._events.clear()
  • TDD tests from #10382 pass
  • nox -s unit_tests passes with coverage ≥ 97%

Subtasks

  • Replace list with collections.deque(maxlen=max_events) in A2aEventQueue.__init__
  • Add max_events constructor parameter with a sensible default (e.g., 10000)
  • Update get_events() to work with deque (deque does not support slicing, so self._events[-limit:] must be replaced, e.g. with list(self._events)[-limit:])
  • Update close() to call self._events.clear()
  • All TDD tests from #10382 pass
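One caveat for the get_events() subtask: `collections.deque` does not support slice indexing, so an expression like `self._events[-limit:]` raises `TypeError` once the storage is a deque. The sketch below shows the failure and two possible replacements; the variable names are illustrative, and which variant the implementation should use is left open.

```python
import collections
from itertools import islice

events = collections.deque(range(10), maxlen=10)

try:
    events[-3:]  # slice indexing is not supported by deque
    slicing_works = True
except TypeError:
    slicing_works = False

limit = 3
# Option 1: copy the whole deque into a list, then slice.
via_list = list(events)[-limit:]
# Option 2: walk the deque with islice and materialize only the tail.
via_islice = list(islice(events, max(len(events) - limit, 0), None))

print(slicing_works, via_list, via_islice)  # False [7, 8, 9] [7, 8, 9]
```

Both options return the most recent `limit` events in publish order; the `islice` variant avoids building a full intermediate list when the deque is large.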

Definition of Done

  • A2aEventQueue accepts max_events parameter
  • Publishing beyond max_events drops oldest events
  • get_events() returns most recent events after overflow
  • TDD tests from #10382 pass
  • nox -s unit_tests passes with coverage ≥ 97%

Blocked By

#10382


Automated by CleverAgents Bot
Agent: new-issue-creator

HAL9000 added this to the v3.5.0 milestone 2026-04-18 10:01:22 +00:00
Author
Owner

[GROOMED] Quality Analysis Complete

Validity Assessment: VALID

This is a legitimate, well-documented Priority/Critical bug with clear reproduction scenarios and actionable acceptance criteria.

Issue Summary:

  • Problem: A2aEventQueue._events is a plain Python list with no maximum size, causing unbounded memory growth in long-running processes
  • Impact: Memory leak in persistent agent sessions; OOM risk with thousands of events
  • Root Cause: No maxlen enforcement on the event queue
  • Solution: Replace list with collections.deque(maxlen=max_events) with configurable default

Label Verification

All required labels present and correct:

  • State: State/Unverified → State/Verified (updated)
  • Type: Type/Bug (correct)
  • Priority: Priority/Critical (correct)
  • MoSCoW: MoSCoW/Must have (auto-applied, appropriate for Critical priority)

Milestone Assignment

  • Assigned to: v3.5.0 (M6: Autonomy Hardening)
  • Rationale: This issue is part of the A2A event queue work stream; v3.5.0 explicitly requires "Event queue publish/subscribe operational"
  • Dependency: Blocked by #10382 (noted in issue)

🚨 Priority/Critical Flag

ATTENTION: This is a Priority/Critical issue that impacts system stability. The unbounded memory growth directly threatens long-running agent sessions. Recommend:

  1. Prioritize implementation after #10382 is resolved
  2. Include in v3.5.0 sprint planning
  3. Consider for early implementation given memory safety implications

Issue Quality

  • Documentation: Excellent — includes background, code examples, acceptance criteria, subtasks, and definition of done
  • Actionability: High — clear implementation path using collections.deque
  • Test Coverage: Specified (TDD tests from #10382, nox unit tests with ≥97% coverage)
  • Acceptance Criteria: Well-defined with 6 specific requirements

Summary of Changes

  • Moved from State/Unverified → State/Verified
  • Assigned to v3.5.0 milestone
  • Verified all required labels present
  • Flagged as Priority/Critical for immediate attention

Status: Ready for implementation planning. No further grooming needed.


Automated by CleverAgents Bot
Supervisor: Grooming | Agent: grooming-pool-supervisor

Reference
cleveragents/cleveragents-core#10456