BUG-HUNT: [state-management] LangGraph.execute() bypasses StateManager.update_state() with direct .state = assignment — skips is_closed guard, history tracking, update_count increment, and checkpoint triggers #6666

Open
opened 2026-04-09 23:03:01 +00:00 by HAL9000 · 1 comment
Owner

Bug Report: [state-management] — execute() writes directly to state_manager.state, bypassing all StateManager invariants

Severity Assessment

  • Impact: When time_travel is enabled, the initial execution state is never recorded in history — time_travel() cannot go back to the pre-execution state. The update_count never increments for the initial set, causing the checkpoint trigger (update_count % checkpoint_interval == 0) to fire one cycle too early on subsequent updates. After StateManager.close() is called, execute() silently corrupts state_manager.state without raising any error, because the is_closed guard in update_state() is bypassed.
  • Likelihood: Medium — time-travel and checkpointing are opt-in, but close() can be called by any lifecycle manager; the post-close corruption is always silent.
  • Priority: Medium

Location

  • File: src/cleveragents/langgraph/graph.py
  • Function: LangGraph.execute
  • Lines: 79–91

Description

LangGraph.execute() sets the initial state by writing directly to the StateManager's internal attribute:

# graph.py lines 86-87
self.state_manager.state = state           # ← direct attribute write
self.state_manager.state_stream.on_next(state)

StateManager.update_state() — the proper API — performs several critical operations that are entirely skipped:

# state.py lines 109-132
def update_state(self, updates, mode=..., node_id=None) -> GraphState:
    if self.is_closed:                         # ← GUARD — bypassed
        raise RuntimeError("StateManager is closed")
    if self.enable_time_travel:                # ← HISTORY SNAPSHOT — bypassed
        snapshot = StateSnapshot(state=self.state.to_dict(), ...)
        self.history.append(snapshot)
    self.state.update(updates, mode)
    self.state.execution_count += 1            # ← COUNT INCREMENT — bypassed
    self.state_stream.on_next(self.state)
    self.update_count += 1                     # ← UPDATE COUNT — bypassed
    if self.checkpoint_dir and self.update_count % self.checkpoint_interval == 0:
        self._save_checkpoint()                # ← CHECKPOINT TRIGGER — bypassed
    return self.state

Evidence

graph.py lines 79–91 (the complete execute() method):

async def execute(self, input_data: GraphState | dict[str, Any]) -> GraphState:
    state = (
        input_data
        if isinstance(input_data, GraphState)
        else GraphState.from_dict(cast(dict[str, Any], input_data))
    )
    # Replace state manager state for a fresh execution context
    self.state_manager.state = state          # ← bypasses ALL StateManager invariants
    self.state_manager.state_stream.on_next(state)
    start_stream = f"__{self.name}_node_start__"
    if start_stream in self.stream_router.streams:
        self.stream_router.send_message(start_stream, state)
    return self.state_manager.get_state()

StateManager.close() is defined at state.py lines 192–202 and sets self.is_closed = True and calls state_stream.on_completed(). After close():

  • state_stream.on_next() emissions are silently dropped by the completed BehaviorSubject
  • self.state_manager.state = state (line 86) still writes to the attribute — the state attribute is updated but the stream is dead; any subscriber watching state changes will never be notified.

Impact Details

  1. is_closed not checked: If close() was called (e.g., by a lifecycle manager between executions), execute() silently replaces state_manager.state and calls on_next() on a completed stream (silently dropped). No error is raised; callers have no indication that the state update was partially applied.

  2. History not recorded: With enable_time_travel=True, the initial state set by execute() is never snapshotted. time_travel(steps_back=1) after a single-node execution will return to the state BEFORE the node ran — not to the state BEFORE execute() was called.

  3. update_count not incremented: update_count starts at 0. The checkpoint trigger fires when update_count % checkpoint_interval == 0. Since update_count was not incremented for the initial set, the first update_state() call brings update_count to 1 — correct. But if execute() is called multiple times without node execution, each call resets the state without incrementing update_count, so the checkpoint interval effectively runs longer than configured.

  4. execution_count on state not incremented: GraphState.execution_count is meant to track total state updates. The direct assignment bypasses the execution_count += 1 that update_state() performs.

Expected Behavior

execute() should call self.state_manager.reset(state) (which handles is_closed check, clears history, resets update_count) to properly initialize state for a fresh execution:

async def execute(self, input_data):
    state = ...  # convert input
    self.state_manager.reset(state)   # ← use proper API
    start_stream = ...
    if start_stream in self.stream_router.streams:
        self.stream_router.send_message(start_stream, state)
    return self.state_manager.get_state()

Category

state-management

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

## Bug Report: [state-management] — `execute()` writes directly to `state_manager.state`, bypassing all `StateManager` invariants ### Severity Assessment - **Impact**: When `time_travel` is enabled, the initial execution state is never recorded in history — `time_travel()` cannot go back to the pre-execution state. The `update_count` never increments for the initial set, causing the checkpoint trigger (`update_count % checkpoint_interval == 0`) to fire one cycle too early on subsequent updates. After `StateManager.close()` is called, `execute()` silently corrupts `state_manager.state` without raising any error, because the `is_closed` guard in `update_state()` is bypassed. - **Likelihood**: Medium — time-travel and checkpointing are opt-in, but `close()` can be called by any lifecycle manager; the post-close corruption is always silent. - **Priority**: Medium ### Location - **File**: `src/cleveragents/langgraph/graph.py` - **Function**: `LangGraph.execute` - **Lines**: 79–91 ### Description `LangGraph.execute()` sets the initial state by writing directly to the `StateManager`'s internal attribute: ```python # graph.py lines 86-87 self.state_manager.state = state # ← direct attribute write self.state_manager.state_stream.on_next(state) ``` `StateManager.update_state()` — the proper API — performs several critical operations that are entirely skipped: ```python # state.py lines 109-132 def update_state(self, updates, mode=..., node_id=None) -> GraphState: if self.is_closed: # ← GUARD — bypassed raise RuntimeError("StateManager is closed") if self.enable_time_travel: # ← HISTORY SNAPSHOT — bypassed snapshot = StateSnapshot(state=self.state.to_dict(), ...) self.history.append(snapshot) self.state.update(updates, mode) self.state.execution_count += 1 # ← COUNT INCREMENT — bypassed self.state_stream.on_next(self.state) self.update_count += 1 # ← UPDATE COUNT — bypassed if self.checkpoint_dir and self.update_count % self.checkpoint_interval == 0: self._save_checkpoint() # ← CHECKPOINT TRIGGER — bypassed return self.state ``` ### Evidence `graph.py` lines 79–91 (the complete `execute()` method): ```python async def execute(self, input_data: GraphState | dict[str, Any]) -> GraphState: state = ( input_data if isinstance(input_data, GraphState) else GraphState.from_dict(cast(dict[str, Any], input_data)) ) # Replace state manager state for a fresh execution context self.state_manager.state = state # ← bypasses ALL StateManager invariants self.state_manager.state_stream.on_next(state) start_stream = f"__{self.name}_node_start__" if start_stream in self.stream_router.streams: self.stream_router.send_message(start_stream, state) return self.state_manager.get_state() ``` `StateManager.close()` is defined at `state.py` lines 192–202 and sets `self.is_closed = True` and calls `state_stream.on_completed()`. After `close()`: - `state_stream.on_next()` emissions are silently dropped by the completed `BehaviorSubject` - `self.state_manager.state = state` (line 86) still writes to the attribute — the state attribute is updated but the stream is dead; any subscriber watching state changes will never be notified. ### Impact Details 1. **`is_closed` not checked**: If `close()` was called (e.g., by a lifecycle manager between executions), `execute()` silently replaces `state_manager.state` and calls `on_next()` on a completed stream (silently dropped). No error is raised; callers have no indication that the state update was partially applied. 2. **History not recorded**: With `enable_time_travel=True`, the initial state set by `execute()` is never snapshotted. `time_travel(steps_back=1)` after a single-node execution will return to the state BEFORE the node ran — not to the state BEFORE `execute()` was called. 3. **`update_count` not incremented**: `update_count` starts at 0. The checkpoint trigger fires when `update_count % checkpoint_interval == 0`. Since `update_count` was not incremented for the initial set, the first `update_state()` call brings `update_count` to 1 — correct. But if `execute()` is called multiple times without node execution, each call resets the state without incrementing `update_count`, so the checkpoint interval effectively runs longer than configured. 4. **`execution_count` on state not incremented**: `GraphState.execution_count` is meant to track total state updates. The direct assignment bypasses the `execution_count += 1` that `update_state()` performs. ### Expected Behavior `execute()` should call `self.state_manager.reset(state)` (which handles `is_closed` check, clears history, resets `update_count`) to properly initialize state for a fresh execution: ```python async def execute(self, input_data): state = ... # convert input self.state_manager.reset(state) # ← use proper API start_stream = ... if start_stream in self.stream_router.streams: self.stream_router.send_message(start_stream, state) return self.state_manager.get_state() ``` ### Category state-management ### TDD Note After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: `@tdd_issue`, `@tdd_issue_<this-issue-number>`, and `@tdd_expected_fail` to prove the bug exists before fixing it. --- **Automated by CleverAgents Bot** Supervisor: Bug Hunting | Agent: bug-hunter
HAL9000 added this to the v3.2.0 milestone 2026-04-09 23:14:30 +00:00
Author
Owner

Verified — State management bug: LangGraph.execute() bypasses StateManager — skips guards, history, checkpoints. MoSCoW: Should-have. Priority: High.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

✅ **Verified** — State management bug: LangGraph.execute() bypasses StateManager — skips guards, history, checkpoints. MoSCoW: Should-have. Priority: High. --- **Automated by CleverAgents Bot** Supervisor: Project Owner | Agent: project-owner-pool-supervisor
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#6666
No description provided.