[BUG] Race condition in Agent class due to non-thread-safe _tasks list #9409

Open
opened 2026-04-14 16:52:54 +00:00 by HAL9000 · 1 comment

Metadata

  • Commit Message: fix(agents): protect _tasks list with threading.Lock in Agent._on_next
  • Branch: fix/agents-base-tasks-list-thread-safety

Background and Context

The Agent class in src/cleveragents/agents/base.py has a potential race condition in its _setup_processing_pipeline method. The _on_next callback appends to self._tasks without any synchronisation primitive:

def _setup_processing_pipeline(self) -> None:
    def _on_next(msg: Any) -> None:
        task = asyncio.create_task(self._process_wrapper(msg))
        self._tasks.append(task)   # ← unprotected shared-state mutation

    self.input_stream.subscribe(
        on_next=_on_next,
        on_error=self.output_stream.on_error,
    )

While a typical asyncio application runs on a single thread and therefore rarely triggers this path concurrently, the RxPY Subject used as input_stream does not guarantee single-threaded delivery. If on_next is called from multiple threads (e.g. via send_message from a thread pool, or when the observable source is backed by a multi-threaded scheduler), appends to _tasks can overlap with each other and with unlocked reads, and nothing in the code guarantees that the list stays consistent.
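One caveat is worth checking when reproducing the multi-threaded path: asyncio.create_task needs a running event loop in the calling thread, so a call from a plain worker thread would be expected to fail before the append is ever reached. The sketch below is independent of the Agent class; _noop and try_create_task_in_thread are hypothetical names used only for illustration.

```python
import asyncio
import threading
from typing import Dict


async def _noop() -> None:
    """Placeholder coroutine standing in for Agent._process_wrapper."""


def try_create_task_in_thread() -> str:
    """Call asyncio.create_task from a thread that has no running loop."""
    outcome: Dict[str, str] = {}

    def worker() -> None:
        coro = _noop()
        try:
            asyncio.create_task(coro)
            outcome["result"] = "created"
        except RuntimeError as exc:
            outcome["result"] = f"RuntimeError: {exc}"
            coro.close()  # avoid a "coroutine was never awaited" warning

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return outcome["result"]


if __name__ == "__main__":
    print(try_create_task_in_thread())
```

If this holds for the interpreter versions the project supports, the concurrent scenario is most likely to arise when each calling thread runs its own event loop, which may be worth stating in the reproduction steps.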

In CPython, list.append runs under the GIL and is therefore effectively atomic today, but:

  1. This is an implementation detail of CPython, not a language guarantee.
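  2. Alternative Python runtimes (PyPy, Jython, GraalPy) are not obliged to provide the same guarantee; some of them, such as PyPy, also use a GIL, but that is equally an implementation detail.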
  3. The project's own AgentWithMemory subclass already uses asyncio.Lock for its memory dict, establishing the pattern that shared mutable state should be explicitly protected.
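  4. The _tasks list is also read (e.g. during dispose) without any lock, creating a potential TOCTOU (time-of-check to time-of-use) hazard: the list can change between the moment it is read and the moment that read is acted on.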
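The TOCTOU hazard in point 4 can be illustrated by replaying one unlucky interleaving step by step in a single thread. The issue does not show the body of dispose, so the read-then-clear shape below is an assumption, and dispose_without_lock is a hypothetical name.

```python
from typing import List


def dispose_without_lock(tasks: List[str], late_arrival: str) -> List[str]:
    """Replay one possible interleaving of dispose and _on_next.

    Returns the tasks that were dropped from the list without ever
    being seen by the (assumed) cancellation step.
    """
    to_cancel = list(tasks)      # dispose: read the current tasks
    tasks.append(late_arrival)   # _on_next on another thread: append lands here
    tasks.clear()                # dispose: clear the list, losing the late task
    seen = set(to_cancel)
    return [t for t in [*to_cancel, late_arrival] if t not in seen]


if __name__ == "__main__":
    print(dispose_without_lock(["t1", "t2"], "t3"))
```

Holding one lock across both the read and the clear would force the late append to land either before the read (so the task is cancelled) or after the clear (so it stays in the list).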

Expected Behavior

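self._tasks should be protected by a threading.Lock (or equivalent) so that concurrent calls to _on_next from multiple threads cannot corrupt the list. The fix should follow the same principle as AgentWithMemory, which guards its shared state explicitly. Note that AgentWithMemory uses asyncio.Lock, which is not thread-safe, so the primitive chosen here has to differ even though the pattern is the same.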
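A minimal sketch of the expected behaviour, assuming the list and its lock live side by side. GuardedTaskList and its method names are hypothetical and stand in for the relevant part of Agent; the real change would add the lock to Agent.__init__ as described in the subtasks.

```python
import threading
from typing import Any, List


class GuardedTaskList:
    """Hypothetical stand-in for the _tasks handling inside Agent."""

    def __init__(self) -> None:
        self._tasks: List[Any] = []
        self._tasks_lock = threading.Lock()

    def add(self, task: Any) -> None:
        """Append while holding the lock (what _on_next would do)."""
        with self._tasks_lock:
            self._tasks.append(task)

    def snapshot(self) -> List[Any]:
        """Return a copy so callers never iterate the live list."""
        with self._tasks_lock:
            return list(self._tasks)

    def drain(self) -> List[Any]:
        """Atomically take every task and leave an empty list behind."""
        with self._tasks_lock:
            tasks, self._tasks = self._tasks, []
        return tasks
```

A dispose-style method could call drain() and cancel the returned tasks outside the lock, which keeps the critical section short.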

Actual Behavior

self._tasks.append(task) in Agent._on_next (src/cleveragents/agents/base.py) is executed without any lock, leaving the list vulnerable to concurrent mutation.

Acceptance Criteria

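  • A threading.Lock is introduced in Agent.__init__ to guard _tasks. An asyncio.Lock with appropriate await is only an option if every access happens in coroutines on a single event loop, since asyncio.Lock is not thread-safe and _on_next is a synchronous callback that cannot await.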
  • All mutations of _tasks (append in _on_next) are performed while holding the lock.
  • All reads of _tasks that could race with mutations are also performed while holding the lock.
  • Existing tests continue to pass with no regressions.
  • New BDD scenarios cover concurrent send_message calls and verify _tasks integrity.
  • Test coverage remains ≥ 97%.
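The integrity check behind the concurrency criterion could look roughly like the following, written in plain Python rather than as Behave steps. run_concurrently and check_integrity are hypothetical helpers; in the real scenario the callback would be the agent's send_message instead of a locked append.

```python
import threading
from typing import Callable, List


def run_concurrently(
    callback: Callable[[int], None], n_threads: int, per_thread: int
) -> None:
    """Invoke callback from n_threads threads, each with distinct integers."""
    barrier = threading.Barrier(n_threads)

    def worker(thread_id: int) -> None:
        barrier.wait()  # release all threads together to maximise contention
        for i in range(per_thread):
            callback(thread_id * per_thread + i)

    threads = [
        threading.Thread(target=worker, args=(tid,)) for tid in range(n_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


def check_integrity(items: List[int], expected_count: int) -> bool:
    """True if every value 0..expected_count-1 is present exactly once."""
    return sorted(items) == list(range(expected_count))
```

On CPython a test like this is likely to pass even without the lock, so it guards against regressions more than it proves the bug; the lock's presence may need a separate, more direct assertion.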

Supporting Information

  • Code location: cleveragents.agents.base.Agent._setup_processing_pipeline (commit 339f8a8e13bfcf419fd72dca8aea173c8b28da0d)
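  • Related pattern: AgentWithMemory._memory_lock in the same file already demonstrates the convention of explicitly locking shared state. It is an asyncio.Lock, so it shows the pattern rather than the exact primitive needed for cross-thread safety.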
  • Severity: Low — unlikely to trigger in a standard asyncio application, but a correctness hazard in multi-threaded or mixed-scheduler environments.

Subtasks

  • Add self._tasks_lock = threading.Lock() in Agent.__init__
  • Wrap self._tasks.append(task) in _on_next in a "with self._tasks_lock:" block
  • Audit any other reads/writes of _tasks (e.g. in dispose) and protect them
  • Tests (Behave): Add BDD scenarios for concurrent send_message calls verifying _tasks list integrity
  • Tests (Robot): Add integration test for multi-threaded agent message delivery
  • Verify coverage ≥ 97% via nox -s coverage_report
  • Run nox (all default sessions), fix any errors

Definition of Done

This issue is complete when:

  • All subtasks above are completed and checked off.
  • A Git commit is created where the first line of the commit message matches the Commit Message in Metadata exactly (fix(agents): protect _tasks list with threading.Lock in Agent._on_next), followed by a blank line, then additional lines providing relevant details about the implementation.
  • The commit is pushed to the remote on the branch matching the Branch in Metadata exactly (fix/agents-base-tasks-list-thread-safety).
  • The commit is submitted as a pull request to master, reviewed, and merged before this issue is marked done.

Automated by CleverAgents Bot
Agent: new-issue-creator

HAL9000 added this to the v3.5.0 milestone 2026-04-14 17:39:00 +00:00

Triage Decision [AUTO-OWNR-1]: Verified as a valid race condition bug. Non-thread-safe _tasks list in the Agent class can cause data corruption under concurrent access. This is a Must Have fix for v3.5.0 (Autonomy Hardening) where parallel execution is a core requirement.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

Reference
cleveragents/cleveragents-core#9409