BUG-HUNT: [DATA-INTEGRITY] State corruption in concurrent graph execution due to unsafe BehaviorSubject access #7122

Open
opened 2026-04-10 07:57:06 +00:00 by HAL9000 · 1 comment
Owner

Bug Report: [DATA-INTEGRITY] — StateManager.update_state() has no synchronization, enabling concurrent state corruption

Severity Assessment

  • Impact: When multiple nodes execute in parallel (enabled by the parallel_execution flag), they can simultaneously call StateManager.update_state(). Because there is no locking around the state mutation, execution_count increments race, intermediate GraphState objects are overwritten, and the corrupted state is immediately broadcast to all subscribers via state_stream.on_next(). Every downstream observer — including checkpoint triggers, history tracking, and any reactive pipeline — receives a corrupted snapshot.
  • Likelihood: High — parallel_execution is a first-class supported flag; any graph with two or more parallel nodes will trigger this path.
  • Priority: High — data corruption in core LangGraph state management

Location

  • File: src/cleveragents/langgraph/state.py
  • Function: StateManager.update_state
  • Lines: ~85–110

Description

The BehaviorSubject state stream can be accessed concurrently from multiple nodes, but there is no locking around state updates. Multiple nodes executing in parallel can corrupt the graph state.

def update_state(
    self,
    updates: dict[str, Any],
    mode: StateUpdateMode = StateUpdateMode.MERGE,
    node_id: str | None = None,
) -> GraphState:
    # No synchronization here!
    self.state.update(updates, mode)  # Could be called concurrently
    self.state.execution_count += 1  # Race condition
    self.state_stream.on_next(self.state)  # Broadcasting corrupted state

Current Behavior

When two or more nodes execute concurrently and both call update_state():

  1. Lost updates: the second writer's state.update() call may overwrite the first writer's changes before on_next() fires.
  2. Inconsistent execution counts: execution_count += 1 is a non-atomic read-modify-write; concurrent increments produce a final count lower than the true number of updates.
  3. Corrupted state broadcast: state_stream.on_next(self.state) emits a partially applied or overwritten state to all subscribers, propagating corruption to checkpoint triggers, history tracking, and any reactive pipeline.
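The lost-increment mechanism in item 2 can be reproduced deterministically. The current update_state() body contains no await. If every node runs as a coroutine on a single event loop, that body cannot be interleaved mid-update, so the race needs either worker threads or a suspension point inside the critical section. The sketch below forces a suspension point with await asyncio.sleep(0) between the read and the write. Counter, unsafe_increment and safe_increment are hypothetical names used only for illustration, not part of the codebase.

```python
import asyncio


class Counter:
    """Hypothetical stand-in for GraphState.execution_count (not the real class)."""

    def __init__(self) -> None:
        self.execution_count = 0


async def unsafe_increment(counter: Counter) -> None:
    # Read-modify-write split by a suspension point, as happens when the
    # update path awaits (or when two threads interleave between the steps).
    current = counter.execution_count
    await asyncio.sleep(0)  # yield to the other task mid-update
    counter.execution_count = current + 1


async def safe_increment(counter: Counter, lock: asyncio.Lock) -> None:
    # Same steps, but the whole read-modify-write runs under the lock.
    async with lock:
        current = counter.execution_count
        await asyncio.sleep(0)
        counter.execution_count = current + 1


async def run_demo() -> tuple[int, int]:
    unsafe = Counter()
    await asyncio.gather(unsafe_increment(unsafe), unsafe_increment(unsafe))

    safe = Counter()
    lock = asyncio.Lock()
    await asyncio.gather(safe_increment(safe, lock), safe_increment(safe, lock))
    return unsafe.execution_count, safe.execution_count


if __name__ == "__main__":
    print(asyncio.run(run_demo()))  # (1, 2): one increment is lost without the lock
```

Both unlocked tasks read 0 before either writes, so one increment disappears. With the lock, the second task cannot read until the first has written.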

Expected Behavior

update_state() must be atomic. Concurrent callers must serialize their updates so that each GraphState snapshot emitted on state_stream reflects exactly one complete, consistent update.
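Serializing the update is one requirement. A second is that state_stream.on_next(self.state) hands every subscriber a reference to the same mutable object. Any subscriber that retains the emitted value (for example, for history tracking) will see it change on later updates. The sketch below emits a deep copy taken inside the critical section. It assumes a hypothetical, simplified SnapshotManager with a plain callback list standing in for the BehaviorSubject; it is not the real StateManager API.

```python
import copy
import threading
from typing import Any, Callable


class SnapshotManager:
    """Hypothetical, simplified stand-in for StateManager (not the real API)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._data: dict[str, Any] = {}
        self._execution_count = 0
        self._subscribers: list[Callable[[dict[str, Any]], None]] = []

    def subscribe(self, callback: Callable[[dict[str, Any]], None]) -> None:
        self._subscribers.append(callback)

    def update_state(self, updates: dict[str, Any]) -> dict[str, Any]:
        with self._lock:
            self._data.update(updates)  # MERGE-style update
            self._execution_count += 1
            # Copy while still holding the lock so the emitted value reflects
            # exactly this update and is unaffected by later ones.
            snapshot = copy.deepcopy(self._data)
            snapshot["execution_count"] = self._execution_count
            for callback in self._subscribers:
                callback(snapshot)
            return snapshot


if __name__ == "__main__":
    history: list[dict[str, Any]] = []
    manager = SnapshotManager()
    manager.subscribe(history.append)
    manager.update_state({"plan": "draft"})
    manager.update_state({"plan": "final"})
    print(history)
    # [{'plan': 'draft', 'execution_count': 1}, {'plan': 'final', 'execution_count': 2}]
```

In this sketch, subscriber callbacks run while the lock is held, as the acceptance criteria require. A subscriber that calls update_state() re-entrantly would therefore deadlock on a non-reentrant lock.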

Acceptance Criteria

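  • StateManager.update_state() acquires a lock before mutating self.state or self.state.execution_count. Use an asyncio.Lock when concurrent updates come from coroutines on a single event loop. Use a threading.Lock (or equivalent) when parallel nodes run on worker threads, because asyncio.Lock is not thread-safe.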
  • The lock is held for the entire critical section: state.update() → execution_count += 1 → state_stream.on_next().
  • Parallel node execution produces a monotonically increasing execution_count with no gaps or duplicates.
  • No state update is silently lost when two nodes update simultaneously.
  • All existing BDD scenarios for StateManager continue to pass.
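The execution_count and no-lost-update criteria can be checked with a stress test along these lines. This sketch assumes parallel nodes run on worker threads, so it uses threading.Lock. LockedCounterState and run_parallel_updates are hypothetical names, not part of the codebase.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any


class LockedCounterState:
    """Hypothetical minimal state holder used only for this check."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.data: dict[str, Any] = {}
        self.execution_count = 0
        self.emitted_counts: list[int] = []  # what subscribers would observe

    def update_state(self, updates: dict[str, Any]) -> None:
        # The whole update -> increment -> emit sequence is one critical section.
        with self._lock:
            self.data.update(updates)
            self.execution_count += 1
            self.emitted_counts.append(self.execution_count)


def run_parallel_updates(workers: int = 8, updates_per_worker: int = 500) -> LockedCounterState:
    state = LockedCounterState()

    def node(worker_id: int) -> None:
        # Each simulated node writes its own keys so lost updates are detectable.
        for i in range(updates_per_worker):
            state.update_state({f"node{worker_id}_{i}": i})

    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(node, range(workers)))  # re-raises any worker exception
    return state


if __name__ == "__main__":
    result = run_parallel_updates()
    total = 8 * 500
    # Monotonic, gap-free, duplicate-free counts and no lost keys.
    assert result.emitted_counts == list(range(1, total + 1))
    assert result.execution_count == total
    assert len(result.data) == total
    print("ok")
```

On CPython, an unlocked version of this test may still pass because thread switches inside such a short section are infrequent. A deterministic reproduction that forces a suspension point between the read and the write is likely to be a more reliable @tdd_expected_fail scenario.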

Supporting Information

Related existing issues in the same module:

  • #6666: LangGraph.execute() bypasses StateManager.update_state() with direct .state = assignment
  • #6531: StateManager._save_checkpoint non-atomic write
  • #6556: PlanGenerationGraph._should_retry mutates state inside a LangGraph conditional edge

Suggested Fix

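import asyncio
from typing import Any

# StateUpdateMode and GraphState come from the existing state module.

class StateManager:
    def __init__(self) -> None:  # existing parameters elided
        ...  # existing initialization elided
        # Serializes update_state() calls between coroutines on one event loop.
        # asyncio.Lock is not thread-safe: if parallel nodes run on worker
        # threads, a threading.Lock is required instead.
        self._lock = asyncio.Lock()

    async def update_state(
        self,
        updates: dict[str, Any],
        mode: StateUpdateMode = StateUpdateMode.MERGE,
        node_id: str | None = None,
    ) -> GraphState:
        # update -> increment -> emit run as one critical section
        async with self._lock:
            self.state.update(updates, mode)
            self.state.execution_count += 1
            self.state_stream.on_next(self.state)
            return self.state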
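Making update_state() a coroutine changes its calling convention, which the "update all callers" subtask covers. The sketch below shows the caller side and assumes nodes are coroutines scheduled on one event loop. MiniStateManager, research_node and draft_node are hypothetical names that mirror the suggested fix; they are not the real StateManager.

```python
import asyncio
from typing import Any


class MiniStateManager:
    """Hypothetical stand-in mirroring the suggested async update_state()."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.state: dict[str, Any] = {}
        self.execution_count = 0
        self.emitted: list[int] = []

    async def update_state(self, updates: dict[str, Any]) -> dict[str, Any]:
        async with self._lock:
            self.state.update(updates)
            self.execution_count += 1
            self.emitted.append(self.execution_count)
            return dict(self.state)


async def research_node(manager: MiniStateManager) -> None:
    # Callers must now await the coroutine; calling it without await only
    # creates a coroutine object and never applies the update.
    await manager.update_state({"research": "done"})


async def draft_node(manager: MiniStateManager) -> None:
    await manager.update_state({"draft": "done"})


async def main() -> MiniStateManager:
    manager = MiniStateManager()
    # Run the two node coroutines concurrently on one event loop.
    await asyncio.gather(research_node(manager), draft_node(manager))
    return manager


if __name__ == "__main__":
    result = asyncio.run(main())
    print(result.execution_count, sorted(result.state))  # 2 ['draft', 'research']
```

A caller that forgets the await typically triggers a "coroutine ... was never awaited" RuntimeWarning rather than an error. Test coverage for each updated call site is therefore worth keeping.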

Backlog note: This issue was discovered during autonomous operation on milestone v3.2.0. It does not block milestone completion and has been placed in the backlog for human review and future milestone assignment.


Metadata

  • Branch: fix/langgraph-state-concurrent-update-lock
  • Commit Message: fix(langgraph): add asyncio.Lock to StateManager.update_state() to prevent concurrent state corruption
  • Milestone: (backlog — see note above)
  • Parent Epic: #7023

Subtasks

  • Reproduce the race condition with a BDD scenario using two concurrent nodes
  • Add asyncio.Lock to StateManager.__init__
  • Make update_state() async and acquire the lock for the full critical section
  • Update all callers of update_state() to await the call
  • Verify execution_count is monotonically increasing under parallel load
  • Confirm no state update is lost when two nodes update simultaneously
  • Update integration tests to exercise parallel node execution paths
  • Run full nox suite and confirm coverage ≥ 97%

Definition of Done

  • BDD scenario reproducing the race condition is tagged @tdd_expected_fail before the fix and passes after
  • StateManager.update_state() holds a lock for the entire critical section
  • All callers updated to await update_state()
  • execution_count is correct under concurrent load (verified by test)
  • No silent state loss under parallel node execution (verified by test)
  • All nox stages pass
  • Coverage ≥ 97%

Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: new-issue-creator

Author
Owner

Verified — Data integrity bug: state corruption in concurrent graph execution. MoSCoW: Should-have. Priority: Medium.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

Reference
cleveragents/cleveragents-core#7122