fix(langgraph): wire node stream on_next handlers to registered executors #10795

Merged
hurui200320 merged 2 commits from bugfix/m3-node-stream-on-next-noop into master 2026-04-22 07:53:54 +00:00

Summary

Fixes the disconnected node execution pipeline in LangGraph._setup_node_stream_subscriptions where all on_next handlers were no-op (pass), silently dropping every message delivered to node streams. Additionally addresses critical deadlock, thread-safety, resource management, and error-handling issues in the now-activated executor path.

Closes #6511

Changes

Problem

  • _setup_node_stream_subscriptions subscribed observers with empty on_next handlers
  • Messages arriving on node streams were silently discarded
  • The registered sync_executor per node (set via _register_node_executor) was only reachable through an implicit map operator in the observable pipeline — not the subscription handler
  • The closure-in-loop pattern risked all handlers sharing the same node_name variable
  • Pre-existing issues in _register_node_executor (per-message ThreadPoolExecutor, per-message asyncio.run(), non-thread-safe StateManager.update_state) were dormant but became reachable once on_next was wired

Fix

  1. Wired on_next to the executor via _make_on_next_handler(name) — a proper method on LangGraph that creates a closure looking up the executor from self._node_executors at invocation time
  2. Added exception handling: executor(msg) is wrapped in try/except so failures are logged via logger.exception rather than propagating unhandled through the RxPy Subject call chain. TimeoutError is distinguished from other exceptions with a warning-level log for clearer diagnostics. Shutdown-related RuntimeError (containing "stopping" or "not running") is also logged at warning level to reduce noise during normal graph shutdown
  3. Eliminated the closure-in-loop bug — each on_next closure captures its own name parameter via the factory method
  4. Moved executors to a dedicated dict: self._node_executors (a dict[str, Callable]) replaces setattr on the stream router, avoiding namespace pollution
  5. Shared ThreadPoolExecutor: self._executor_pool is created once in __init__ and shut down in stop(), replacing per-message pool creation
  6. Prefer existing event loop with deadlock prevention: sync_executor uses asyncio.run_coroutine_threadsafe when the scheduler's loop is running and the caller is on a different thread; it falls back to the thread pool when on the same thread to avoid deadlock
  7. Thread-safe state updates with immutable snapshots: StateManager.update_state acquires a threading.RLock, captures a model_copy(deep=True) under the lock, then emits and returns the copy outside the lock. This prevents re-entrant deadlock from subscriber callbacks while ensuring subscribers receive immutable point-in-time snapshots. The same deep-copy-under-lock pattern is applied to all five mutation methods: update_state, load_checkpoint, time_travel, reset, and replace_state
  8. Lock-safe state replacement: StateManager.replace_state() deep-copies the input inside the lock before storing it, preventing external holders of the original reference from mutating internal state without acquiring the lock. Both the stored copy and the emission copy are created independently from the original input to avoid redundant double-copying. LangGraph.execute() uses this instead of directly assigning to state_manager.state, closing a race condition where execute() bypassed the locking discipline
  9. Lock-safe reset: StateManager.reset() deep-copies the provided initial_state before storing it, applying the same defensive copy pattern as replace_state()
  10. Defensive copy in execute(): execute() passes a deep copy of state to send_message() so stream subscribers cannot hold a reference to the internal state object
  11. Functional per-node timeout: sync_executor reads timeout from node.config.timeout (where NodeConfig defines it) instead of getattr(node, "timeout", None), which always returned None
  12. Bounded execution history: execution_history uses collections.deque(maxlen=1000) for atomic bounded growth, eliminating TOCTOU race conditions
  13. Consistent error on missing start stream: execute() now raises ValueError (matching start()) instead of silently returning unmodified state
  14. Configurable executor timeout — All blocking future.result() calls use a configurable timeout (default 300s) with descriptive TimeoutError on expiry
  15. Timed-out futures are cancelled — After catching TimeoutError, future.cancel() is called before re-raising. A warning is logged when cancel() returns False (task already running), indicating thread pool pressure. Inline comments document that cancellation is best-effort
  16. CancelledError handling in both paths: sync_executor catches CancelledError explicitly in both the run_coroutine_threadsafe path and the thread pool path to produce a clear "graph stopping" message instead of a misleading generic error
  17. Clean shutdown — Both stop() and start() use shutdown(wait=True, cancel_futures=True) to properly drain in-flight tasks and prevent old-cycle tasks from corrupting new-cycle state
  18. Resource leak safety net: LangGraph.__del__ shuts down _executor_pool if stop() was never called, with broad Exception suppression for interpreter shutdown resilience; start() shuts down the previous pool before creating a new one to prevent thread leaks on repeated calls
  19. Restartable after stop: start() re-creates _executor_pool so the graph can be restarted after stop()
  20. is_running guard — Both sync_executor and execute() raise a descriptive RuntimeError when the graph is not running, preventing misleading errors from submitting to a shut-down pool
  21. TOCTOU-safe start(): start() sets is_running = False before shutting down the old pool and restores it after creating the new pool, preventing new submissions during the pool swap window
  22. TOCTOU-safe submit(): sync_executor wraps _executor_pool.submit() in try/except RuntimeError to detect the shutdown condition when stop() is called between the is_running guard and the submit() call, re-raising with the same descriptive message used by the is_running guard
  23. Consistent locking: clear_history() now acquires _lock for consistency with other mutation methods
  24. File I/O outside lock — Checkpoint save/load operations perform file I/O outside the lock to reduce contention under high-throughput checkpointing. The _save_checkpoint fallback path acquires the lock when called without pre-built data to ensure consistent snapshots
  25. Reentrant lock: StateManager._lock upgraded from threading.Lock to threading.RLock, eliminating the deadlock risk when _save_checkpoint(None) is called while the lock is already held by the same thread
  26. Process-safe checkpoint filenames — Checkpoint filenames now include os.getpid() in addition to update_count and timestamp, preventing silent data loss when multiple processes share the same checkpoint_dir
  27. Removed dead code: _history_lock (declared but never used) removed
  28. Extracted handler factories: _make_on_next_handler and _make_on_error_handler are proper methods with consistent type aliases (_OnNextCallable, _OnErrorCallable)
  29. Thread-safe get_state() — Returns model_copy(deep=True) under the lock to prevent observing partially-mutated state
  30. Public constant: MAX_EXECUTION_HISTORY renamed from _MAX_EXECUTION_HISTORY since it is part of the testable contract
  31. Precise type annotations: run_async annotated as -> StreamMessage instead of -> Any; replace_state() docstring reworded to describe general purpose
  32. Deferred downstream propagation — TODO comment in _make_on_next_handler references follow-up ticket #10799 for routing executor return values to successor node streams
  33. Emission ordering documented: update_state() and replace_state() docstrings now explicitly warn that emission order is not guaranteed to match mutation order under concurrent access
  34. Default pool sizing documented — Comment on ThreadPoolExecutor() documents the default min(32, cpu_count+4) sizing
  35. Redundant cast removed: cast(dict[str, Any], input_data) removed from execute() since Pyright already narrows the type after the isinstance check
  36. Exception-safe event loop cleanup: step_execute_graph now uses try/finally matching the pattern in step_execute_expecting_error
  37. Restart scenario cleanup safety: context.add_cleanup(context.graph.stop) registered in the @when step so the thread pool is cleaned up even if assertions fail
  38. Background loop cleanup guarded: _cleanup_bg_loop wraps call_soon_threadsafe in try/except RuntimeError to ensure thread join and loop close always execute even if the loop is already closed
  39. start() hang limitation documented: start() docstring warns that shutdown(wait=True) blocks indefinitely if a running task is stuck in blocking I/O
  40. Thread-safe deque.append documented — Inline comment documents that deque.append() is thread-safe under CPython GIL
  41. Pinned a2a-sdk<1.0.0: a2a-sdk 1.0.0 removed the legacy A2AClient class from a2a.client, breaking the TDD test at features/tdd_a2a_sdk_dependency.feature:21. Pinned to >=0.3.0,<1.0.0 in pyproject.toml to prevent the breaking version from being installed in CI. Migration to 1.0.0 is separate work
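
Items 1 to 3 describe a handler factory that resolves the executor at invocation time and routes failures to different log levels. A minimal sketch of that shape follows, under stated assumptions: `GraphSketch` is a hypothetical stand-in for `LangGraph`, the log messages are invented, and the RxPy subscription itself is left out.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("langgraph_sketch")  # hypothetical logger name


class GraphSketch:
    """Hypothetical stand-in for LangGraph; only the handler factory is modelled."""

    def __init__(self) -> None:
        self._node_executors: dict[str, Callable[[Any], Any]] = {}

    def _make_on_next_handler(self, name: str) -> Callable[[Any], None]:
        def on_next(msg: Any) -> None:
            # Look the executor up when a message arrives, not when subscribing,
            # so executors registered after subscription are still found.
            executor = self._node_executors.get(name)
            if executor is None:
                logger.warning("No executor registered for node %r", name)
                return
            try:
                executor(msg)
            except TimeoutError as exc:
                logger.warning("Node %r timed out: %s", name, exc)
            except RuntimeError as exc:
                text = str(exc)
                if "stopping" in text or "not running" in text:
                    # Expected during shutdown; keep the noise down.
                    logger.warning("Node %r skipped during shutdown: %s", name, exc)
                else:
                    logger.exception("Node %r failed", name)
            except Exception:
                # Log instead of propagating into the stream's call chain.
                logger.exception("Node %r failed", name)

        return on_next
```

Because the handler swallows executor errors after logging them, one failing node does not tear down the subscription that delivers messages to it.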

Files Changed

  • src/cleveragents/langgraph/graph.py — Core fix + deadlock prevention, timeout, future cancellation, CancelledError handling in both paths, resource management, deque, type annotations, is_running guard on execute(), dead code removal, public constant rename, best-effort cancellation documentation, TimeoutError vs Exception log level distinction, RuntimeError shutdown detection in on_next handler, TOCTOU-safe submit() with try/except, cancel() failure warning log, TODO(#10799) for downstream propagation, version-pinned comment for scheduler._loop, TOCTOU-safe start(), start() hang documentation, redundant cast removed, default pool sizing documented, deque thread-safety comment
  • src/cleveragents/langgraph/state.py — Thread-safety RLock on all mutation methods, deep-copy emission inside lock for all five mutation methods, replace_state() creates both copies from original input (no redundant double-copy), reset() deep-copy input, file I/O outside lock with lock-safe fallback, clear_history locking, process-safe checkpoint filenames with PID, emission ordering documentation
  • features/consolidated_langgraph.feature — 20 new BDD scenarios (21 total including pre-existing)
  • features/steps/langgraph_graph_coverage_steps.py: Step definitions with strengthened assertions, cleanup safety, timeout tests, is_running guard test, replace_state direct test with internal state identity assertion, CancelledError test for both paths, __del__ test, execute() not-running guard test, TimeoutError-through-stream test, exception-safe event loop cleanup, restart cleanup registration, background loop cleanup guard, barrier timeout and thread completion assertion, correct parameter names in slow_execute functions, plain assignment for Behave context attributes
  • pyproject.toml — Pinned a2a-sdk>=0.3.0,<1.0.0 to prevent breaking A2AClient removal in 1.0.0

Testing

  • 21 BDD scenarios covering the on_next wiring and review fixes:
    • Node stream messages are processed by the node executor — verifies wiring
    • Raw stream values are wrapped before node processing — verifies StreamMessage wrapping + metadata
    • Missing executor triggers a diagnostic warning — verifies warning path with node name
    • Each node stream dispatches to its own executor — verifies closure isolation
    • Node stream on_next handles executor exceptions gracefully — verifies exception catch + logging + non-propagation
    • Subscription is skipped when observable is missing — verifies missing observable branch with strengthened assertions (debug message, no error/warning)
    • Execute raises when start stream is missing — verifies consistent error behavior
    • Executor completes via run_coroutine_threadsafe on running loop — verifies the run_coroutine_threadsafe path with mock verification that the correct code path was taken
    • Execution history is capped at maximum size — verifies deque maxlen enforcement
    • Concurrent state updates are serialized by the lock — verifies thread safety with 20 threads × 10 updates
    • Executor times out via run_coroutine_threadsafe path — verifies TimeoutError with node name and duration in message
    • Executor times out via thread pool path — verifies TimeoutError from thread pool fallback path
    • Graph can be restarted after stop — verifies stop() → start() → execute cycle works
    • Executor raises when graph is not running — verifies is_running guard produces RuntimeError with node name
    • StateManager replace_state atomically replaces and emits deep copy — verifies replace_state acquires lock, replaces state, emits deep copy on stream, and emitted copy is distinct from both the input and the internal state
    • CancelledError in thread pool path raises descriptive RuntimeError — verifies CancelledError catch produces RuntimeError with "stopping" message
    • LangGraph __del__ shuts down executor pool without error: verifies the __del__ safety net
    • Execute raises when graph is not running via execute method — verifies is_running guard on execute() entry point
    • TimeoutError through stream path logs warning not exception — verifies TimeoutError in on_next handler logs at warning level, not exception level (cycle 6 M2 fix)
    • CancelledError in run_coroutine_threadsafe path raises descriptive RuntimeError — verifies CancelledError catch in the coroutine path (cycle 6 M1 fix)
  • All 15,286 unit test scenarios pass (0 failures)
  • All 1,986 integration tests pass
  • E2e test failures are pre-existing (M6 infrastructure timeouts, unrelated to this change)
  • Coverage: 97.2% (threshold: 97%)
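
The concurrency scenario (20 threads × 10 updates) can be sketched with the standard library alone. This is an illustration of the test shape, not the project's step definition: `Counter` and `run_concurrent_updates` are hypothetical names, and a plain integer stands in for the real state's update count.

```python
import threading


class Counter:
    """Hypothetical stand-in for a lock-protected state object."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.update_count = 0

    def update(self) -> None:
        with self._lock:
            self.update_count += 1


def run_concurrent_updates(n_threads: int = 20, n_updates: int = 10) -> int:
    counter = Counter()
    barrier = threading.Barrier(n_threads)

    def worker() -> None:
        # The timeout keeps a missing thread from hanging the whole test run.
        barrier.wait(timeout=10.0)
        for _ in range(n_updates):
            counter.update()

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=10.0)
    # join() with a timeout returns silently, so check completion explicitly.
    assert all(not t.is_alive() for t in threads)
    return counter.update_count
```

The barrier releases all workers at once to maximise contention; if updates are serialized correctly, the final count equals `n_threads * n_updates`.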

Quality Gates

Gate | Status
nox -e lint | ✅ Pass
nox -e typecheck | ✅ Pass (0 errors)
nox -e unit_tests | ✅ Pass (15,286 scenarios)
nox -e integration_tests | ✅ Pass (1,986 tests)
nox -e e2e_tests | ⚠️ Pre-existing M6 failures (unrelated)
nox -e coverage_report | ✅ 97.2%

Review Issues Addressed (Cycle 8)

Major (M1 — CI Blocker, Fixed)

  • M1: a2a-sdk 1.0.0 was released upstream, removing the legacy A2AClient class from a2a.client. The existing dependency constraint >=0.3.0 allowed CI to install 1.0.0, breaking the TDD test features/tdd_a2a_sdk_dependency.feature:21 ("a2a SDK provides the A2AClient class"). Pinned to a2a-sdk>=0.3.0,<1.0.0 in pyproject.toml. Migration to 1.0.0 is out of scope for this ticket

Review Issues Addressed (Cycle 7)

Major (M1 — CI Blocker, Fixed)

  • M1 — Ruff format violation in features/steps/langgraph_graph_coverage_steps.py: Two @given/@then decorator strings were wrapped across multiple lines, causing ruff format --check to reject them. Collapsed both to single lines (E501 is suppressed for features/steps/*.py)

Minor (All Fixed)

  • m1 — TOCTOU race in sync_executor between is_running guard and submit(): Wrapped _executor_pool.submit() in try/except RuntimeError that detects the shutdown condition and re-raises with the same descriptive message used by the is_running guard
  • m2: CancelledError during shutdown logged at exception level. Added a dedicated except RuntimeError clause in _make_on_next_handler that checks for "stopping"/"not running" patterns and logs at warning level instead of exception level
  • m3 — Concurrent test barrier has no timeout: Added timeout=10.0 to barrier.wait() and added assert all(not t.is_alive() for t in threads) after the join loop
  • m4 — Double deep copy in replace_state(): Changed emission copy to be created from the original new_state input rather than from the already-copied stored state, eliminating redundant serialization
  • m5 — Checkpoint filename collision across processes: Added os.getpid() to checkpoint filename format: checkpoint_{timestamp}_{update_count}_{pid}.json
  • m6 — Pyright parameter name mismatch in slow_execute test functions: Renamed _state to state in both functions
  • m7 — Type annotation on dynamic Behave context object: Changed context.stream_emissions: list[GraphState] = [] to plain assignment with comment: context.stream_emissions = [] # list[GraphState]
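
The m1 fix closes the window between the `is_running` check and `submit()`. The sketch below shows one way that guard can look, assuming a hypothetical `PoolOwner` class and an invented error message; it relies on the documented behaviour that `ThreadPoolExecutor.submit()` raises `RuntimeError` once the pool has been shut down.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class PoolOwner:
    """Hypothetical owner of a shared pool, modelled on the description above."""

    def __init__(self) -> None:
        self.is_running = True
        self._executor_pool = ThreadPoolExecutor(max_workers=2)

    def stop(self) -> None:
        self.is_running = False
        self._executor_pool.shutdown(wait=True, cancel_futures=True)

    def submit(self, node_name: str, fn: Callable[..., Any], *args: Any) -> Future:
        message = f"Cannot execute node {node_name!r}: graph is not running"
        if not self.is_running:
            raise RuntimeError(message)
        try:
            # stop() may run between the check above and this call; the pool
            # then rejects the task with its own, less descriptive RuntimeError.
            return self._executor_pool.submit(fn, *args)
        except RuntimeError as exc:
            raise RuntimeError(message) from exc
```

Callers see the same message whether they lose the race or arrive after shutdown, so a log-level check based on "not running" covers both cases.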

Nits (All Fixed)

  • n1: _save_checkpoint(None) deadlock risk. Upgraded StateManager._lock from threading.Lock to threading.RLock (reentrant lock), making the deadlock impossible. Updated docstring to reflect the change
  • n2: deque.append() thread safety undocumented. Added inline comment: # Thread-safe under CPython GIL; deque.append is atomic.
  • n3 — Thread pool exhaustion under sustained timeouts: Added warning log when future_tp.cancel() returns False, indicating the timed-out task could not be cancelled and the thread slot remains occupied
  • n4 — PR description scenario count off by one: Corrected to "20 new BDD scenarios (21 total including pre-existing)"
  • n5: replace_state test accesses internal state attribute without comment. Added explanatory comment: # Direct access intentional: identity check requires the actual internal object, not a get_state() copy.

Previous Review Cycles

Cycle 6 (All Fixed)

  • M1: CancelledError not caught in run_coroutine_threadsafe path. Added an except concurrent.futures.CancelledError clause mirroring the thread pool path
  • M2 — Missing test for TimeoutError warning-level log through stream path: Added BDD scenario
  • M3 — TOCTOU race in start(): start() now sets is_running = False before shutting down the old pool
  • M4 — Event loop cleanup not exception-safe: Wrapped in try/finally
  • M5 — Restart scenario thread pool not cleaned up on assertion failure: Registered cleanup via context.add_cleanup()
  • M6: start() can hang indefinitely. Added docstring warning
  • n1–n8 — All addressed
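
The CancelledError and timeout handling described across these cycles can be sketched as a single wait helper. `wait_for_result` and its messages are hypothetical; the sketch only assumes standard `concurrent.futures` behaviour, where `Future.result()` raises `CancelledError` for a cancelled future and `TimeoutError` when the wait expires.

```python
import concurrent.futures
import logging
from typing import Any

logger = logging.getLogger("executor_sketch")  # hypothetical logger name


def wait_for_result(
    future: concurrent.futures.Future, node_name: str, timeout: float = 300.0
) -> Any:
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.CancelledError as exc:
        # Typically seen when the pool is shut down with cancel_futures=True.
        raise RuntimeError(
            f"Node {node_name!r} was cancelled: graph is stopping"
        ) from exc
    except concurrent.futures.TimeoutError as exc:
        # Best-effort: cancel() only succeeds if the task has not started yet.
        if not future.cancel():
            logger.warning(
                "Node %r timed out but is still running; its worker stays busy",
                node_name,
            )
        raise TimeoutError(
            f"Node {node_name!r} timed out after {timeout}s"
        ) from exc
```

Both branches name the node in the raised error, which is what lets a downstream handler distinguish shutdown noise from real failures.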

Cycle 5 (All Fixed)

  • M1: replace_state() stores caller's mutable reference directly. Fixed by deep-copying input
  • M2 — Timed-out executor futures can still complete: Documented best-effort cancellation
  • M3 — Missing test for CancelledError path: Added BDD scenario
  • m1–m6, n1–n8 — All addressed
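
The Cycle 5 M1 fix (deep-copying the input of `replace_state()`) can be illustrated without the real `StateManager`. In this sketch `StateHolder` is hypothetical, a plain dict stands in for the state model, and `copy.deepcopy` is swapped in for `model_copy(deep=True)`; subscribers are plain callbacks rather than a stream.

```python
import copy
import threading
from typing import Callable


class StateHolder:
    """Hypothetical, dict-based stand-in for a lock-protected state manager."""

    def __init__(self, initial: dict) -> None:
        self._lock = threading.RLock()
        self._state = copy.deepcopy(initial)
        self._subscribers: list[Callable[[dict], None]] = []

    def subscribe(self, callback: Callable[[dict], None]) -> None:
        self._subscribers.append(callback)

    def replace_state(self, new_state: dict) -> dict:
        with self._lock:
            # Store a private copy so the caller's reference cannot mutate it.
            self._state = copy.deepcopy(new_state)
            # Build the emitted copy from the input as well, under the lock.
            snapshot = copy.deepcopy(new_state)
        # Emit outside the lock so callbacks cannot deadlock against it.
        for callback in list(self._subscribers):
            callback(snapshot)
        return snapshot

    def get_state(self) -> dict:
        with self._lock:
            return copy.deepcopy(self._state)
```

After `replace_state(caller_dict)`, later mutations of `caller_dict` or of the emitted snapshot leave the stored state unchanged.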

Cycles 1–4

See git history for earlier review cycle fixes (all addressed in prior amendments).

Deferred (Pre-existing, Out of Scope)

  • close() does not acquire _lock before setting is_closed = True (pre-existing)
  • Node.execution_count increment is not thread-safe (pre-existing, newly reachable)
  • asyncio.run() creates a new event loop per thread-pool invocation (acceptable for current use case)
  • Migration from a2a-sdk 0.3.x to 1.0.0 (separate ticket scope — 1.0.0 removes the legacy A2AClient class and introduces Client as the replacement)
Updated docstring to reflect the change - **n2** — `deque.append()` thread safety undocumented: Added inline comment: `# Thread-safe under CPython GIL; deque.append is atomic.` - **n3** — Thread pool exhaustion under sustained timeouts: Added warning log when `future_tp.cancel()` returns `False`, indicating the timed-out task could not be cancelled and the thread slot remains occupied - **n4** — PR description scenario count off by one: Corrected to "20 new BDD scenarios (21 total including pre-existing)" - **n5** — `replace_state` test accesses internal `state` attribute without comment: Added explanatory comment: `# Direct access intentional: identity check requires the actual internal object, not a get_state() copy.` ## Previous Review Cycles ### Cycle 6 (All Fixed) - **M1** — `CancelledError` not caught in `run_coroutine_threadsafe` path: Added `except concurrent.futures.CancelledError` catch mirroring the thread pool path - **M2** — Missing test for `TimeoutError` warning-level log through stream path: Added BDD scenario - **M3** — TOCTOU race in `start()`: `start()` now sets `is_running = False` before shutting down the old pool - **M4** — Event loop cleanup not exception-safe: Wrapped in `try/finally` - **M5** — Restart scenario thread pool not cleaned up on assertion failure: Registered cleanup via `context.add_cleanup()` - **M6** — `start()` can hang indefinitely: Added docstring warning - **n1–n8** — All addressed ### Cycle 5 (All Fixed) - **M1** — `replace_state()` stores caller's mutable reference directly: Fixed by deep-copying input - **M2** — Timed-out executor futures can still complete: Documented best-effort cancellation - **M3** — Missing test for `CancelledError` path: Added BDD scenario - **m1–m6, n1–n8** — All addressed ### Cycles 1–4 See git history for earlier review cycle fixes (all addressed in prior amendments). 
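
The Cycle 7 **m1** fix (closing the window between the `is_running` guard and `submit()`) can be sketched roughly as below. This is a minimal illustration, not the code from this PR: `submit_guarded`, its parameters, and the exact error message are hypothetical names chosen for the example. It relies only on the documented behaviour that `ThreadPoolExecutor.submit()` raises `RuntimeError` once the pool has been shut down, and for simplicity it treats any `RuntimeError` from `submit()` as the shutdown condition.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


def submit_guarded(
    pool: ThreadPoolExecutor,
    is_running: Callable[[], bool],
    node_name: str,
    fn: Callable[..., Any],
    *args: Any,
) -> Future:
    """Submit fn to the pool, reporting shutdown the same way as the guard.

    Hypothetical helper: the names and message text are assumptions made
    for this sketch, not identifiers taken from the PR.
    """
    message = f"Cannot execute node '{node_name}': graph is not running"

    # First check: the cheap guard on the running flag.
    if not is_running():
        raise RuntimeError(message)

    # Second check: the pool may have been shut down between the guard
    # above and this call. submit() raises RuntimeError in that case, so
    # re-raise with the same descriptive message the guard would use.
    try:
        return pool.submit(fn, *args)
    except RuntimeError as exc:
        raise RuntimeError(message) from exc
```

With this shape, a caller sees one consistent "not running" error whether the stop was observed by the flag check or only by the pool itself, which is what lets the `on_next` handler recognise it as a shutdown condition and log it at warning level.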
## Deferred (Pre-existing, Out of Scope)

- `close()` does not acquire `_lock` before setting `is_closed = True` (pre-existing)
- `Node.execution_count` increment is not thread-safe (pre-existing, newly reachable)
- `asyncio.run()` creates a new event loop per thread-pool invocation (acceptable for current use case)
- Migration from `a2a-sdk` 0.3.x to 1.0.0 (separate ticket scope; 1.0.0 removes the legacy `A2AClient` class and introduces `Client` as the replacement)
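
For context on the deferred `Node.execution_count` item: an unguarded `count += 1` is a read-modify-write sequence, so concurrent executors can lose increments. One possible follow-up, sketched here under assumptions, is to guard the increment with a lock. `CountedNode`, `record_execution`, and `run_concurrently` are hypothetical names for illustration; nothing in this PR implements this.

```python
import threading


class CountedNode:
    """Hypothetical stand-in for a node that tracks how often it ran."""

    def __init__(self) -> None:
        self.execution_count = 0
        self._count_lock = threading.Lock()

    def record_execution(self) -> None:
        # Serialize the read-modify-write so no increment is lost.
        with self._count_lock:
            self.execution_count += 1


def run_concurrently(node: CountedNode, threads: int = 8, per_thread: int = 1000) -> int:
    """Hammer the counter from several threads and return the final count."""

    def worker() -> None:
        for _ in range(per_thread):
            node.record_execution()

    workers = [threading.Thread(target=worker) for _ in range(threads)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    return node.execution_count
```

With the lock in place the final count always equals `threads * per_thread`; whether a lock, an atomic counter, or something else fits the real `Node` class is left to the separate ticket.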
hurui200320 added this to the v3.2.0 milestone 2026-04-20 07:55:36 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from 41b5e8633f
Some checks failed
CI / lint (pull_request) Failing after 1m20s
CI / helm (pull_request) Successful in 28s
CI / quality (pull_request) Successful in 4m21s
CI / typecheck (pull_request) Successful in 4m43s
CI / security (pull_request) Successful in 4m54s
CI / coverage (pull_request) Has been skipped
CI / e2e_tests (pull_request) Successful in 8m56s
CI / push-validation (pull_request) Successful in 22s
CI / integration_tests (pull_request) Successful in 10m58s
CI / unit_tests (pull_request) Successful in 12m8s
CI / docker (pull_request) Has been skipped
CI / status-check (pull_request) Failing after 3s
CI / build (pull_request) Successful in 3m34s
to ebd47fc5f8
All checks were successful
CI / helm (pull_request) Successful in 43s
CI / push-validation (pull_request) Successful in 23s
CI / build (pull_request) Successful in 3m59s
CI / lint (pull_request) Successful in 4m12s
CI / quality (pull_request) Successful in 4m33s
CI / typecheck (pull_request) Successful in 4m53s
CI / security (pull_request) Successful in 4m51s
CI / integration_tests (pull_request) Successful in 7m57s
CI / e2e_tests (pull_request) Successful in 8m21s
CI / unit_tests (pull_request) Successful in 9m36s
CI / coverage (pull_request) Successful in 14m50s
CI / docker (pull_request) Successful in 1m37s
CI / status-check (pull_request) Successful in 2s
2026-04-20 09:31:42 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from ebd47fc5f8
to ed93e19f45
Some checks failed
CI / status-check (pull_request) Blocked by required conditions
CI / helm (pull_request) Successful in 38s
CI / push-validation (pull_request) Successful in 22s
CI / build (pull_request) Successful in 3m52s
CI / lint (pull_request) Successful in 4m1s
CI / quality (pull_request) Successful in 4m23s
CI / typecheck (pull_request) Successful in 4m33s
CI / security (pull_request) Successful in 4m38s
CI / e2e_tests (pull_request) Successful in 6m52s
CI / integration_tests (pull_request) Successful in 8m9s
CI / unit_tests (pull_request) Successful in 9m8s
CI / coverage (pull_request) Successful in 13m21s
CI / docker (pull_request) Has been cancelled
2026-04-20 10:36:03 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from ed93e19f45
to fb2680266e
Some checks failed
CI / unit_tests (pull_request) Failing after 10s
CI / helm (pull_request) Successful in 36s
CI / push-validation (pull_request) Successful in 21s
CI / lint (pull_request) Successful in 4m5s
CI / typecheck (pull_request) Successful in 4m20s
CI / quality (pull_request) Successful in 4m20s
CI / build (pull_request) Successful in 3m32s
CI / security (pull_request) Successful in 4m47s
CI / docker (pull_request) Has been skipped
CI / integration_tests (pull_request) Successful in 6m50s
CI / e2e_tests (pull_request) Successful in 6m43s
CI / coverage (pull_request) Successful in 13m31s
CI / status-check (pull_request) Failing after 3s
2026-04-20 11:30:25 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from fb2680266e
to 08b7ba627b
Some checks failed
CI / helm (pull_request) Successful in 35s
CI / quality (pull_request) Successful in 4m11s
CI / typecheck (pull_request) Successful in 4m26s
CI / security (pull_request) Successful in 4m32s
CI / build (pull_request) Successful in 3m47s
CI / unit_tests (pull_request) Successful in 7m17s
CI / integration_tests (pull_request) Successful in 7m12s
CI / e2e_tests (pull_request) Successful in 7m28s
CI / push-validation (pull_request) Successful in 21s
CI / status-check (pull_request) Failing after 11s
CI / lint (pull_request) Failing after 55s
CI / docker (pull_request) Has been skipped
CI / coverage (pull_request) Has been skipped
2026-04-20 12:30:19 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from 08b7ba627b
to 68a2e06f32
Some checks failed
CI / unit_tests (pull_request) Failing after 12s
CI / helm (pull_request) Successful in 41s
CI / lint (pull_request) Failing after 1m24s
CI / push-validation (pull_request) Successful in 23s
CI / build (pull_request) Successful in 3m50s
CI / quality (pull_request) Successful in 4m23s
CI / typecheck (pull_request) Successful in 4m37s
CI / security (pull_request) Successful in 4m43s
CI / coverage (pull_request) Has been skipped
CI / docker (pull_request) Has been skipped
CI / integration_tests (pull_request) Successful in 6m52s
CI / e2e_tests (pull_request) Successful in 6m54s
CI / status-check (pull_request) Failing after 3s
2026-04-20 14:28:22 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from 68a2e06f32
to 00a3630fc0
Some checks failed
CI / lint (pull_request) Failing after 1m28s
CI / helm (pull_request) Successful in 48s
CI / push-validation (pull_request) Successful in 21s
CI / quality (pull_request) Successful in 4m16s
CI / security (pull_request) Successful in 4m43s
CI / typecheck (pull_request) Successful in 5m2s
CI / coverage (pull_request) Has been skipped
CI / build (pull_request) Successful in 3m39s
CI / e2e_tests (pull_request) Successful in 6m58s
CI / integration_tests (pull_request) Successful in 7m4s
CI / unit_tests (pull_request) Successful in 8m1s
CI / docker (pull_request) Has been skipped
CI / status-check (pull_request) Failing after 2s
2026-04-20 15:17:34 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from 00a3630fc0
to 7455ca6a6d
Some checks failed
CI / helm (pull_request) Successful in 39s
CI / lint (pull_request) Successful in 3m58s
CI / build (pull_request) Successful in 3m53s
CI / quality (pull_request) Successful in 4m27s
CI / typecheck (pull_request) Successful in 4m34s
CI / security (pull_request) Successful in 5m19s
CI / push-validation (pull_request) Successful in 21s
CI / e2e_tests (pull_request) Successful in 6m56s
CI / unit_tests (pull_request) Failing after 8m26s
CI / docker (pull_request) Has been skipped
CI / integration_tests (pull_request) Successful in 10m4s
CI / coverage (pull_request) Successful in 14m3s
CI / status-check (pull_request) Failing after 3s
2026-04-20 16:15:15 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from 7455ca6a6d
to c96ba34f14
Some checks failed
CI / helm (pull_request) Successful in 32s
CI / push-validation (pull_request) Successful in 21s
CI / build (pull_request) Successful in 3m49s
CI / lint (pull_request) Successful in 4m7s
CI / unit_tests (pull_request) Failing after 4m25s
CI / quality (pull_request) Successful in 4m37s
CI / typecheck (pull_request) Successful in 4m37s
CI / security (pull_request) Successful in 4m53s
CI / docker (pull_request) Has been skipped
CI / e2e_tests (pull_request) Successful in 6m46s
CI / integration_tests (pull_request) Successful in 7m44s
CI / coverage (pull_request) Successful in 14m6s
CI / status-check (pull_request) Failing after 3s
2026-04-20 18:00:09 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from c96ba34f14
to 92dfd2f6f3
Some checks failed
CI / helm (pull_request) Successful in 44s
CI / push-validation (pull_request) Successful in 44s
CI / lint (pull_request) Successful in 4m12s
CI / build (pull_request) Successful in 3m55s
CI / quality (pull_request) Successful in 4m33s
CI / typecheck (pull_request) Successful in 4m52s
CI / security (pull_request) Successful in 5m1s
CI / unit_tests (pull_request) Failing after 6m5s
CI / docker (pull_request) Has been skipped
CI / e2e_tests (pull_request) Successful in 8m5s
CI / integration_tests (pull_request) Successful in 8m42s
CI / coverage (pull_request) Successful in 15m0s
CI / status-check (pull_request) Failing after 3s
2026-04-21 02:49:59 +00:00
hurui200320 force-pushed bugfix/m3-node-stream-on-next-noop from 92dfd2f6f3
to 0d267934a7
All checks were successful
CI / push-validation (pull_request) Successful in 38s
CI / helm (pull_request) Successful in 39s
CI / lint (pull_request) Successful in 3m55s
CI / build (pull_request) Successful in 3m50s
CI / typecheck (pull_request) Successful in 4m33s
CI / quality (pull_request) Successful in 4m58s
CI / security (pull_request) Successful in 5m18s
CI / e2e_tests (pull_request) Successful in 8m59s
CI / integration_tests (pull_request) Successful in 10m48s
CI / unit_tests (pull_request) Successful in 11m51s
CI / docker (pull_request) Successful in 1m55s
CI / coverage (pull_request) Successful in 16m37s
CI / status-check (pull_request) Successful in 3s
2026-04-21 05:14:14 +00:00
Author
Member

@HAL9000 This PR is ready to review: rebased onto the latest master, all CI checks passed.

HAL9000 scheduled this pull request to auto merge when all checks succeed 2026-04-21 06:22:23 +00:00
HAL9001 approved these changes 2026-04-22 07:20:19 +00:00
HAL9001 left a comment

Review Summary

All required checks have passed and this PR successfully addresses bug #6511 by wiring the on_next handlers, improving thread-safety, resource management, and error handling in LangGraph. Tests have been added and updated, coverage remains ≥97%, and no prohibited patterns (e.g., # type: ignore) were introduced. The code adheres to project specifications and conventions. No blocking issues found. Approving.


Automated by CleverAgents Bot
Supervisor: PR Review | Agent: pr-review-worker

hurui200320 canceled auto merging this pull request when all checks succeed 2026-04-22 07:33:39 +00:00
Merge branch 'master' into bugfix/m3-node-stream-on-next-noop
Some checks failed
CI / push-validation (pull_request) Successful in 23s
CI / helm (pull_request) Successful in 33s
CI / build (pull_request) Successful in 3m48s
CI / lint (pull_request) Successful in 3m53s
CI / quality (pull_request) Successful in 4m18s
CI / security (pull_request) Successful in 4m35s
CI / typecheck (pull_request) Successful in 4m38s
CI / integration_tests (pull_request) Successful in 6m48s
CI / e2e_tests (pull_request) Successful in 7m16s
CI / unit_tests (pull_request) Successful in 8m49s
CI / docker (pull_request) Successful in 1m28s
CI / coverage (pull_request) Successful in 15m26s
CI / benchmark-regression (push) Waiting to run
CI / benchmark-publish (push) Waiting to run
CI / status-check (pull_request) Successful in 2s
CI / push-validation (push) Successful in 22s
CI / helm (push) Successful in 29s
CI / build (push) Successful in 3m52s
CI / lint (push) Successful in 3m57s
CI / quality (push) Successful in 4m21s
CI / typecheck (push) Successful in 4m29s
CI / security (push) Successful in 4m46s
CI / e2e_tests (push) Successful in 7m22s
CI / integration_tests (push) Successful in 7m28s
CI / unit_tests (push) Successful in 8m38s
CI / docker (push) Successful in 1m36s
CI / coverage (push) Successful in 15m44s
CI / status-check (push) Successful in 3s
CI / benchmark-publish (pull_request) Has been skipped
CI / benchmark-regression (pull_request) Failing after 37m45s
58fa90b1b5
hurui200320 scheduled this pull request to auto merge when all checks succeed 2026-04-22 07:33:59 +00:00
hurui200320 deleted branch bugfix/m3-node-stream-on-next-noop 2026-04-22 07:53:54 +00:00
Reference
cleveragents/cleveragents-core!10795