BUG-HUNT: [CONCURRENCY] Race condition in task management causes resource leaks and system instability #7105

Open
opened 2026-04-10 07:45:03 +00:00 by HAL9000 · 2 comments
Owner

Metadata

  • Branch: bugfix/m3.2.0-langgraph-bridge-active-tasks-race-condition
  • Commit Message: fix(langgraph): add threading.Lock synchronization to _active_tasks in RxPyLangGraphBridge
  • Milestone: v3.2.0
  • Parent Epic: TBD — see orphan note below

Background and Context

RxPyLangGraphBridge in src/cleveragents/langgraph/bridge.py maintains an _active_tasks set to track in-flight asyncio.Task objects. This set is mutated from multiple concurrent contexts — the create_future_task closure (called from an RxPy flat_map operator, which runs in a thread-pool) and the done_callback lambda (invoked by the asyncio event loop on task completion) — without any synchronization primitive protecting the shared state.

The RxPyLangGraphBridge is the core bridge between the RxPy reactive stream pipeline and LangGraph graph execution. It is used by every actor graph execution path in the system. A race condition here can corrupt the task-tracking set, cause resource leaks, and produce system instability under concurrent load.

Current Behavior

_active_tasks is a plain set[asyncio.Task[Any]] with no lock:

# src/cleveragents/langgraph/bridge.py, lines ~226–236
def create_future_task(msg: StreamMessage) -> asyncio.Future[StreamMessage]:
    loop = asyncio.get_event_loop()
    task: asyncio.Future[StreamMessage] = asyncio.ensure_future(
        execute_graph(msg), loop=loop
    )
    self._active_tasks.add(task)          # UNSAFE: No synchronization
    task.add_done_callback(
        lambda t: self._active_tasks.discard(cast(asyncio.Task[Any], t))  # UNSAFE
    )
    return task

  • self._active_tasks.add(task) is called from the RxPy flat_map thread-pool context.
  • The done_callback lambda calling self._active_tasks.discard(...) is invoked by the asyncio event loop thread.
  • cleanup_tasks() and cleanup_tasks_async() also iterate and clear _active_tasks from yet another context.

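These concurrent mutations on a plain set are not thread-safe. In CPython the GIL makes an individual add or discard call effectively atomic, but it does not protect compound operations, such as iterating the set in one thread while another thread adds to or discards from it.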
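
The iteration hazard can be reproduced deterministically without any threads, because all that matters is that the set changes size while an iterator over it is live. The following is a minimal sketch, not code from bridge.py: the function name is hypothetical and plain objects stand in for asyncio.Task instances.

```python
def iterate_while_mutating() -> str:
    """Mutate a set during iteration and return the resulting error message."""
    active = {object() for _ in range(3)}  # stand-ins for tracked tasks
    try:
        for _ in active:
            # Plays the role of an add() arriving from another context
            # while cleanup is still walking the set.
            active.add(object())
    except RuntimeError as exc:
        return str(exc)
    return "no error"


print(iterate_while_mutating())
```

With real threads the same error appears only intermittently, which is why iterating a private snapshot is the safer pattern.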

Expected Behavior

All mutations to _active_tasks must be protected by a synchronization primitive (e.g., asyncio.Lock for async contexts, threading.Lock for cross-thread access) to guarantee:

  1. No task is silently dropped from the tracking set due to a lost write.
  2. No RuntimeError: Set changed size during iteration during cleanup.
  3. No resource leak from tasks that were added but never discarded due to a race between add and discard.
  4. No memory exhaustion from unbounded accumulation of completed tasks whose callbacks lost the race.

Evidence

# src/cleveragents/langgraph/bridge.py, lines ~226–236
def create_future_task(msg: StreamMessage) -> asyncio.Future[StreamMessage]:
    loop = asyncio.get_event_loop()
    task: asyncio.Future[StreamMessage] = asyncio.ensure_future(
        execute_graph(msg), loop=loop
    )
    self._active_tasks.add(task)  # UNSAFE: No synchronization
    task.add_done_callback(
        lambda t: self._active_tasks.discard(cast(asyncio.Task[Any], t))  # UNSAFE
    )
    return task

The cleanup_tasks_async method also has a time-of-check to time-of-use (TOCTOU) window:

# lines ~62–96
tasks = list(self._active_tasks)   # snapshot — but new tasks can be added concurrently
# ... awaits ...
late_tasks = self._active_tasks - set(tasks)  # set difference — not atomic

Impact

  • Race condition in task tracking: Multiple concurrent graph executions can add and remove tasks simultaneously, which can leave the _active_tasks set out of sync with the tasks actually in flight.
  • Resource leaks: Tasks that complete during a concurrent add may never be discarded, causing the set to grow without bound.
  • Memory exhaustion: Under sustained concurrent load, leaked task references keep completed tasks from being garbage-collected.
  • RuntimeError during cleanup: cleanup_tasks() iterates _active_tasks while callbacks may be discarding from it concurrently.
  • System instability: The bridge is used in every actor graph execution path; corruption here propagates to all graph-based actors.

Acceptance Criteria

  • _active_tasks mutations are protected by a threading.Lock (for cross-thread safety) or restructured to be event-loop-only with proper scheduling.
  • create_future_task adds the task to _active_tasks atomically before returning.
  • The done_callback discards the task safely without risk of RuntimeError.
  • cleanup_tasks() and cleanup_tasks_async() iterate a snapshot and handle concurrent modification gracefully.
  • No task reference is leaked after completion under concurrent load.
  • All nox stages pass.
  • Coverage ≥ 97%.
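
The "event-loop-only" alternative in the first criterion can be sketched as follows: worker threads never touch the set, they only ask the loop to create and track the task via loop.call_soon_threadsafe. This is an illustration under assumptions, not the bridge's actual design; LoopConfinedTracker, submit_from_thread, and demo are hypothetical names.

```python
import asyncio
import threading
from typing import Any, Coroutine


class LoopConfinedTracker:
    """Hypothetical tracker whose set is only touched on the loop thread."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._active_tasks: set[asyncio.Task[Any]] = set()

    def _start(self, coro: Coroutine[Any, Any, Any]) -> None:
        # Runs on the event loop thread: create, track, register cleanup.
        task = self._loop.create_task(coro)
        self._active_tasks.add(task)
        task.add_done_callback(self._active_tasks.discard)

    def submit_from_thread(self, coro: Coroutine[Any, Any, Any]) -> None:
        # Safe from any thread; the set mutation is deferred to the loop.
        self._loop.call_soon_threadsafe(self._start, coro)

    def active_count(self) -> int:
        return len(self._active_tasks)


async def demo(producers: int = 4, per_producer: int = 25) -> tuple[int, int]:
    tracker = LoopConfinedTracker(asyncio.get_running_loop())
    finished: list[int] = []

    async def work(i: int) -> None:
        await asyncio.sleep(0.001)
        finished.append(i)

    def produce() -> None:
        for i in range(per_producer):
            tracker.submit_from_thread(work(i))

    for _ in range(producers):
        threading.Thread(target=produce).start()

    total = producers * per_producer
    # Poll until every job has run and every done-callback has fired.
    while len(finished) < total or tracker.active_count():
        await asyncio.sleep(0.005)
    return len(finished), tracker.active_count()
```

Because every add and discard happens on one thread, no lock is needed at all. The trade-off is that the submitting thread no longer receives the task object synchronously, which may or may not suit the flat_map call site.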

Suggested Fix

Keep the plain set, but guard every access to it with a threading.Lock:

import threading

# In __init__:
self._active_tasks: set[asyncio.Task[Any]] = set()
self._tasks_lock: threading.Lock = threading.Lock()

# In create_future_task:
with self._tasks_lock:
    self._active_tasks.add(task)
task.add_done_callback(
    lambda t: self._remove_task(cast(asyncio.Task[Any], t))
)

def _remove_task(self, task: asyncio.Task[Any]) -> None:
    with self._tasks_lock:
        self._active_tasks.discard(task)
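
To show the locking pattern end to end, here is a self-contained sketch that exercises a guarded set from several threads while another thread repeatedly snapshots it. GuardedTaskSet and stress are hypothetical names introduced for illustration, and plain objects stand in for tasks so the example needs no event loop.

```python
import threading


class GuardedTaskSet:
    """Hypothetical helper: mutations and snapshots share one lock."""

    def __init__(self) -> None:
        self._items: set[object] = set()
        self._lock = threading.Lock()

    def add(self, item: object) -> None:
        with self._lock:
            self._items.add(item)

    def discard(self, item: object) -> None:
        with self._lock:
            self._items.discard(item)

    def snapshot(self) -> list[object]:
        # Copy under the lock so callers iterate a private list.
        with self._lock:
            return list(self._items)


def stress(workers: int = 8, rounds: int = 2000) -> int:
    """Churn the set from many threads; return how many items remain."""
    tracked = GuardedTaskSet()
    stop = threading.Event()

    def churn() -> None:
        for _ in range(rounds):
            token = object()
            tracked.add(token)
            tracked.discard(token)

    def reader() -> None:
        # Mimics cleanup iterating while adds and discards are in flight.
        while not stop.is_set():
            for _ in tracked.snapshot():
                pass

    watcher = threading.Thread(target=reader)
    threads = [threading.Thread(target=churn) for _ in range(workers)]
    watcher.start()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    stop.set()
    watcher.join()
    return len(tracked.snapshot())
```

Every added item is discarded by the same worker, so a non-zero return value from `stress()` would indicate a leaked reference. A check of this shape could serve as a starting point for the concurrency test listed in the subtasks.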

Subtasks

  • Add threading.Lock (_tasks_lock) to RxPyLangGraphBridge.__init__
  • Wrap _active_tasks.add(task) in create_future_task with _tasks_lock
  • Extract _remove_task helper that acquires _tasks_lock before discard
  • Update cleanup_tasks() to snapshot under lock before iterating
  • Update cleanup_tasks_async() to snapshot under lock and handle late tasks atomically
  • Write TDD Behave scenario reproducing the race condition (tagged @tdd_issue, @tdd_issue_<N>, @tdd_expected_fail)
  • Write Robot Framework integration test verifying concurrent graph execution does not leak tasks
  • Write ASV benchmark for concurrent task creation/cleanup throughput
  • Remove @tdd_expected_fail tag once fix is in place

Definition of Done

  • threading.Lock guards all _active_tasks mutations in RxPyLangGraphBridge
  • No RuntimeError: Set changed size during iteration under concurrent load
  • No task references leaked after completion in concurrent execution scenarios
  • BDD scenario with @tdd_issue and @tdd_issue_<N> tags passes (after fix, with @tdd_expected_fail removed)
  • Robot Framework integration test passes
  • ASV benchmark shows no regression > 10%
  • All nox stages pass
  • Coverage ≥ 97%

Orphan note: No parent Epic for the langgraph/bridge.py concurrency fix track was found at issue creation time. This issue must be manually linked to the appropriate parent Epic before work begins. Candidate: search for an Epic covering RxPyLangGraphBridge correctness or LangGraph/RxPy integration hardening.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: new-issue-creator

HAL9000 added this to the v3.2.0 milestone 2026-04-10 07:45:10 +00:00
Author
Owner

⚠️ Orphan Issue — Needs Manual Parent Epic Linking

This issue was created by an automated agent and could not be linked to a parent Epic at creation time. A thorough search of open and closed Epics (Type/Epic) did not surface a dedicated Epic for RxPyLangGraphBridge concurrency correctness or LangGraph/RxPy integration hardening.

Action required (human maintainer):

  1. Identify or create the appropriate parent Epic for langgraph/bridge.py correctness work.
  2. Link this issue as a child: open the parent Epic → add this issue (#7105) under "depends on", OR open this issue and set the parent Epic under "blocks".
  3. Update the ## Metadata section of this issue body to replace TBD with the actual Epic number.

Candidate Epics to check:

  • Any Epic covering RxPyLangGraphBridge or langgraph module hardening
  • Any Epic covering concurrency/thread-safety across the reactive pipeline
  • Related open bugs in the same area: #6523, #6663, #6756, #6522, #6515

Per CONTRIBUTING.md: "Orphan issues are NOT permitted. Every issue MUST be linked to a parent Epic."


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: new-issue-creator

Author
Owner

Verified — Critical concurrency bug: race condition in task management causes resource leaks. MoSCoW: Must-have. Priority: Critical.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

Reference
cleveragents/cleveragents-core#7105