fix(langgraph): wire node stream on_next handlers to registered executors #10795
cleveragents/cleveragents-core!10795
Branch: bugfix/m3-node-stream-on-next-noop
Summary
Fixes the disconnected node execution pipeline in LangGraph._setup_node_stream_subscriptions, where all on_next handlers were no-ops (pass) that silently dropped every message delivered to node streams. It also addresses critical deadlock, thread-safety, resource-management, and error-handling issues in the executor path that this fix activates.
Closes #6511
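The central change can be pictured with a small, self-contained model. This is an illustrative sketch only: MiniGraph, register and the log messages are hypothetical, handlers are called directly rather than through an RxPy Subject, and only _make_on_next_handler and _node_executors echo names from this PR. It shows the factory method binding each node name per call, the executor being looked up when a message arrives, and exception handling that logs failures instead of letting them propagate back to the publisher.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("mini_graph")

Executor = Callable[[Any], Any]


class MiniGraph:
    """Hypothetical stand-in for LangGraph; models only the on_next wiring."""

    def __init__(self) -> None:
        # A plain dict keyed by node name instead of attributes set on a router object.
        self._node_executors: dict[str, Executor] = {}

    def register(self, name: str, executor: Executor) -> None:
        self._node_executors[name] = executor

    def _make_on_next_handler(self, name: str) -> Callable[[Any], None]:
        # `name` is a parameter of this factory call, so each handler keeps its own value.
        def on_next(msg: Any) -> None:
            executor = self._node_executors.get(name)  # looked up when the message arrives
            if executor is None:
                logger.warning("no executor registered for node %r", name)
                return
            try:
                executor(msg)
            except TimeoutError:
                logger.warning("node %r timed out", name)
            except RuntimeError as exc:
                if "stopping" in str(exc) or "not running" in str(exc):
                    logger.warning("node %r skipped during shutdown: %s", name, exc)
                else:
                    logger.exception("node %r failed", name)
            except Exception:
                # Log and swallow so the error does not propagate back to the publisher.
                logger.exception("node %r failed", name)

        return on_next


seen: list[tuple[str, Any]] = []
graph = MiniGraph()
handlers = {n: graph._make_on_next_handler(n) for n in ("a", "b")}
# Executors may be registered after the handlers exist, because lookup is deferred.
graph.register("a", lambda m: seen.append(("a", m)))
graph.register("b", lambda m: seen.append(("b", m)))
handlers["a"](1)
handlers["b"](2)
print(seen)  # [('a', 1), ('b', 2)]

# For contrast: lambdas created directly in a loop read the loop variable when called.
late = [lambda: n for n in ("a", "b")]
print([f() for f in late])  # ['b', 'b']
```

Routing handler creation through a method call is what gives each closure its own name; the late-binding contrast at the end is standard Python behavior, not code from this PR.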
Changes
Problem
- _setup_node_stream_subscriptions subscribed observers with empty on_next handlers
- The per-node sync_executor (set via _register_node_executor) was reachable only through an implicit map operator in the observable pipeline, not through the subscription handler
- Closures referenced the node_name variable rather than capturing a per-node value
- Latent issues in _register_node_executor (a ThreadPoolExecutor per message, asyncio.run() per message, and the non-thread-safe StateManager.update_state) were dormant but became reachable once on_next was wired
Fix
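Several of the StateManager fixes in this section share one pattern: mutate and copy under a re-entrant lock, then notify subscribers after releasing it. A minimal sketch, assuming a dataclass stand-in for GraphState and copy.deepcopy in place of pydantic's model_copy(deep=True); MiniStateManager, State, subscribe and _emit are hypothetical names, not the project's API.

```python
import copy
import threading
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class State:
    """Hypothetical stand-in for the pydantic GraphState model."""
    values: dict[str, Any] = field(default_factory=dict)


class MiniStateManager:
    """Hypothetical sketch of the copy-under-lock, emit-outside-lock pattern."""

    def __init__(self, initial: State) -> None:
        self._lock = threading.RLock()
        self._state = copy.deepcopy(initial)  # never keep the caller's object
        self._subscribers: list[Callable[[State], None]] = []

    def subscribe(self, callback: Callable[[State], None]) -> None:
        with self._lock:
            self._subscribers.append(callback)

    def _emit(self, snapshot: State) -> None:
        with self._lock:
            callbacks = list(self._subscribers)
        for callback in callbacks:  # invoked with the lock released
            callback(snapshot)

    def update_state(self, **changes: Any) -> State:
        with self._lock:
            self._state.values.update(changes)
            snapshot = copy.deepcopy(self._state)  # point-in-time copy taken under the lock
        self._emit(snapshot)
        return snapshot

    def replace_state(self, new_state: State) -> State:
        with self._lock:
            # Both copies come from the caller's object; neither is a copy of the other.
            self._state = copy.deepcopy(new_state)
            snapshot = copy.deepcopy(new_state)
        self._emit(snapshot)
        return snapshot

    def get_state(self) -> State:
        with self._lock:
            return copy.deepcopy(self._state)


manager = MiniStateManager(State({"step": 0}))
# The callback re-enters the manager; this is safe because emission happens after release.
manager.subscribe(lambda s: print("emitted", s.values, manager.get_state().values))
snap = manager.update_state(step=1)
snap.values["step"] = 99  # mutating the returned copy...
print(manager.get_state().values)  # ...leaves internal state alone: {'step': 1}
```

Because callers and subscribers only ever see copies, holding a returned object cannot bypass the lock; the cost is one deep copy per mutation and per read.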
- Wired on_next to the executor via _make_on_next_handler(name), a proper method on LangGraph that creates a closure which looks up the executor in self._node_executors at invocation time
- executor(msg) is wrapped in try/except so failures are logged via logger.exception rather than propagating unhandled through the RxPy Subject call chain. TimeoutError is distinguished from other exceptions and logged at warning level for clearer diagnostics. A shutdown-related RuntimeError (whose message contains "stopping" or "not running") is also logged at warning level to reduce noise during normal graph shutdown
- Each on_next closure captures its own name parameter via the factory method
- self._node_executors: dict[str, Callable] replaces setattr on the stream router, avoiding namespace pollution
- Single ThreadPoolExecutor: self._executor_pool is created once in __init__ and shut down in stop(), replacing per-message pool creation
- sync_executor uses asyncio.run_coroutine_threadsafe when the scheduler's loop is running and the caller is on a different thread; it falls back to the thread pool when called on the loop's own thread to avoid deadlock
- StateManager.update_state acquires a threading.RLock, takes a model_copy(deep=True) under the lock, then emits and returns the copy outside the lock. This prevents re-entrant deadlock from subscriber callbacks while ensuring subscribers receive independent point-in-time snapshots. The same deep-copy-under-lock pattern is applied to all five mutation methods: update_state, load_checkpoint, time_travel, reset, and replace_state
- StateManager.replace_state() deep-copies the input inside the lock before storing it, so external holders of the original reference cannot mutate internal state without acquiring the lock. The stored copy and the emission copy are each created from the original input, avoiding a redundant copy of a copy. LangGraph.execute() now calls it instead of assigning to state_manager.state directly, closing a race in which execute() bypassed the locking discipline
- StateManager.reset() deep-copies the provided initial_state before storing it, applying the same defensive-copy pattern as replace_state()
- execute() passes a deep copy of the state to send_message() so stream subscribers cannot hold a reference to the internal state object
- sync_executor reads the timeout from node.config.timeout (where NodeConfig defines it) instead of getattr(node, "timeout", None), which always returned None
- execution_history uses collections.deque(maxlen=1000) for atomic bounded growth, eliminating TOCTOU race conditions
- execute() now raises ValueError (matching start()) instead of silently returning unmodified state
- future.result() calls use a configurable timeout (default 300 s) and raise a descriptive TimeoutError on expiry
- On TimeoutError, future.cancel() is called before re-raising. A warning is logged when cancel() returns False (the task is already running), indicating thread pool pressure. Inline comments document that cancellation is best-effort
- sync_executor catches CancelledError explicitly in both the run_coroutine_threadsafe path and the thread pool path, producing a clear "graph stopping" message instead of a misleading generic error
- stop() and start() use shutdown(wait=True, cancel_futures=True), which cancels queued tasks and waits for running ones, so tasks from a previous run cannot corrupt the state of the next one
- LangGraph.__del__ shuts down _executor_pool if stop() was never called, suppressing any Exception so it stays safe during interpreter shutdown; start() shuts down the previous pool before creating a new one to prevent thread leaks on repeated calls
- start() re-creates _executor_pool so the graph can be restarted after stop()
- sync_executor and execute() raise a descriptive RuntimeError when the graph is not running, instead of surfacing a misleading error from submitting to a shut-down pool
- start() sets is_running = False before shutting down the old pool and restores it after creating the new one, preventing new submissions during the pool swap
- sync_executor wraps _executor_pool.submit() in try/except RuntimeError to detect shutdown when stop() runs between the is_running guard and the submit() call, re-raising with the same descriptive message as the is_running guard
- clear_history() now acquires _lock, consistent with the other mutation methods
- The _save_checkpoint fallback path acquires the lock when called without pre-built data, ensuring a consistent snapshot
- StateManager._lock upgraded from threading.Lock to threading.RLock, eliminating the deadlock risk when _save_checkpoint(None) is called while the same thread already holds the lock
- Checkpoint filenames include os.getpid() in addition to update_count and the timestamp, preventing silent data loss when multiple processes share the same checkpoint_dir
- _history_lock (declared but never used) removed
- _make_on_next_handler and _make_on_error_handler are proper methods with consistent type aliases (_OnNextCallable, _OnErrorCallable)
- get_state() returns model_copy(deep=True) under the lock so callers never observe partially mutated state
- MAX_EXECUTION_HISTORY renamed from _MAX_EXECUTION_HISTORY because it is part of the testable contract
- run_async annotated as -> StreamMessage instead of -> Any; the replace_state() docstring reworded to describe its general purpose
- _make_on_next_handler carries a TODO referencing follow-up ticket #10799 for routing executor return values to successor node streams
- update_state() and replace_state() docstrings now warn explicitly that emission order is not guaranteed to match mutation order under concurrent access
- A comment on ThreadPoolExecutor() documents the default min(32, cpu_count + 4) worker count
- cast(dict[str, Any], input_data) removed from execute() because Pyright already narrows the type after the isinstance check
- step_execute_graph now uses try/finally, matching the pattern in step_execute_expecting_error
- context.add_cleanup(context.graph.stop) is registered in the @when step so the thread pool is cleaned up even if assertions fail
- _cleanup_bg_loop wraps call_soon_threadsafe in try/except RuntimeError so the thread join and loop close always run, even if the loop is already closed
- The start() docstring warns that shutdown(wait=True) blocks indefinitely if a running task is stuck in blocking I/O
- An inline comment notes that deque.append() is thread-safe under the CPython GIL
- a2a-sdk 1.0.0 removed the legacy A2AClient class from a2a.client, breaking the TDD test at features/tdd_a2a_sdk_dependency.feature:21. The dependency is pinned to >=0.3.0,<1.0.0 in pyproject.toml so CI cannot install the breaking version. Migration to 1.0.0 is separate work
Files Changed
- src/cleveragents/langgraph/graph.py: core fix plus deadlock prevention, timeouts, future cancellation, CancelledError handling in both paths, resource management, deque, type annotations, is_running guard on execute(), dead-code removal, public constant rename, best-effort cancellation documentation, TimeoutError vs. Exception log-level distinction, shutdown RuntimeError detection in the on_next handler, TOCTOU-safe submit() with try/except, cancel()-failure warning log, TODO(#10799) for downstream propagation, version-pinned comment for scheduler._loop, TOCTOU-safe start(), start() hang documentation, redundant cast removed, default pool sizing documented, deque thread-safety comment
- src/cleveragents/langgraph/state.py: RLock-based thread safety on all mutation methods, deep-copied emission snapshots taken inside the lock for all five mutation methods, replace_state() creating both copies from the original input (no redundant double copy), reset() deep-copying its input, file I/O outside the lock with a lock-safe fallback, clear_history() locking, process-safe checkpoint filenames with PID, emission-ordering documentation
- features/consolidated_langgraph.feature: 20 new BDD scenarios (21 total including pre-existing)
- features/steps/langgraph_graph_coverage_steps.py: step definitions with strengthened assertions, cleanup safety, timeout tests, is_running guard test, a direct replace_state test with an internal-state identity assertion, CancelledError tests for both paths, __del__ test, execute() not-running guard test, TimeoutError-through-stream test, exception-safe event loop cleanup, restart cleanup registration, background loop cleanup guard, barrier timeout and thread completion assertion, corrected parameter names in the slow_execute functions, plain assignment for Behave context attributes
- pyproject.toml: pinned a2a-sdk>=0.3.0,<1.0.0 so the breaking A2AClient removal in 1.0.0 cannot reach CI
Testing
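The sync_executor dispatch rule (use asyncio.run_coroutine_threadsafe when the scheduler's loop runs on another thread, otherwise fall back to the thread pool) and a mock check of which path ran can be sketched as follows. dispatch, double and check_dispatch_paths are hypothetical names, and detecting the loop's own thread with asyncio.get_running_loop() is an assumption; this description does not say how graph.py performs that check.

```python
import asyncio
import concurrent.futures
import threading
from unittest import mock


def dispatch(coro_fn, msg, loop, pool, timeout=300.0):
    """Hypothetical helper: run coro_fn(msg) to completion from synchronous code."""
    try:
        # Raises RuntimeError when no event loop is running in the current thread.
        on_loop_thread = asyncio.get_running_loop() is loop
    except RuntimeError:
        on_loop_thread = False
    if loop is not None and loop.is_running() and not on_loop_thread:
        # Hand the coroutine to the scheduler's loop, which runs on another thread.
        fut = asyncio.run_coroutine_threadsafe(coro_fn(msg), loop)
    else:
        # Blocking on the loop from its own thread would deadlock, so run the
        # coroutine on a fresh event loop inside a pool worker instead.
        fut = pool.submit(asyncio.run, coro_fn(msg))
    try:
        return fut.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        fut.cancel()  # best effort: a pool task that has already started cannot be cancelled
        raise


async def double(x):
    await asyncio.sleep(0)
    return x * 2


def check_dispatch_paths():
    loop = asyncio.new_event_loop()
    started = threading.Event()
    loop.call_soon_threadsafe(started.set)  # runs once run_forever has started
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    assert started.wait(5)
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    try:
        # Loop running on another thread: the threadsafe path must be taken.
        real = asyncio.run_coroutine_threadsafe
        with mock.patch("asyncio.run_coroutine_threadsafe", wraps=real) as spy:
            assert dispatch(double, 21, loop, pool) == 42
            assert spy.call_count == 1
        # No running loop: the thread pool fallback must be taken.
        with mock.patch("asyncio.run_coroutine_threadsafe") as spy:
            assert dispatch(double, 5, None, pool) == 10
            spy.assert_not_called()
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join(5)
        loop.close()
        pool.shutdown(wait=True, cancel_futures=True)


check_dispatch_paths()
```

Patching with wraps= keeps the real behavior while recording the call, so the test asserts on the code path taken rather than only on the result.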
- run_coroutine_threadsafe path with mock verification that the correct code path was taken
Quality Gates
- nox -e lint
- nox -e typecheck
- nox -e unit_tests
- nox -e integration_tests
- nox -e e2e_tests
- nox -e coverage_report
Review Issues Addressed (Cycle 8)
Major (M1 — CI Blocker, Fixed)
a2a-sdk 1.0.0 was released upstream, removing the legacy A2AClient class from a2a.client. The existing dependency constraint >=0.3.0 allowed CI to install 1.0.0, breaking the TDD test features/tdd_a2a_sdk_dependency.feature:21 ("a2a SDK provides the A2AClient class"). Pinned to a2a-sdk>=0.3.0,<1.0.0 in pyproject.toml. Migration to 1.0.0 is out of scope for this ticket
Review Issues Addressed (Cycle 7)
Major (M1 — CI Blocker, Fixed)
features/steps/langgraph_graph_coverage_steps.py: two @given/@then decorator strings were wrapped across multiple lines, causing ruff format --check to reject them. Collapsed both to single lines (E501 is suppressed for features/steps/*.py)
Minor (All Fixed)
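The TOCTOU fix around submit() relies on documented concurrent.futures behavior: once a ThreadPoolExecutor has been shut down, submit() raises RuntimeError. A hypothetical sketch of the pool lifecycle follows; MiniRunner and its error text are assumptions, not code from graph.py, and the sketch does not lock around is_running the way a production version might.

```python
import concurrent.futures


class MiniRunner:
    """Hypothetical model of the executor-pool lifecycle described in this PR."""

    NOT_RUNNING = "graph is not running"  # assumed wording, not copied from graph.py

    def __init__(self) -> None:
        self.is_running = False
        self._pool = concurrent.futures.ThreadPoolExecutor()

    def start(self) -> None:
        self.is_running = False  # refuse new submissions while the pools are swapped
        self._pool.shutdown(wait=True, cancel_futures=True)
        self._pool = concurrent.futures.ThreadPoolExecutor()
        self.is_running = True

    def stop(self) -> None:
        self.is_running = False
        # Cancel queued work, then wait for tasks that are already running.
        self._pool.shutdown(wait=True, cancel_futures=True)

    def submit(self, fn, *args):
        if not self.is_running:
            raise RuntimeError(self.NOT_RUNNING)
        try:
            return self._pool.submit(fn, *args)
        except RuntimeError as exc:
            # stop() ran after the guard passed: a shut-down pool rejects new work.
            raise RuntimeError(self.NOT_RUNNING) from exc


runner = MiniRunner()
runner.start()
print(runner.submit(pow, 2, 5).result())  # 32
runner._pool.shutdown()  # simulate stop() landing between the guard and submit()
try:
    runner.submit(pow, 2, 5)
except RuntimeError as exc:
    print(exc)  # graph is not running
runner.stop()
```

Re-raising with the guard's message means callers, including an on_next handler that checks for "not running", see one consistent error regardless of which side of the race they hit.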
- TOCTOU race in sync_executor between the is_running guard and submit(): wrapped _executor_pool.submit() in try/except RuntimeError, which detects the shutdown condition and re-raises with the same descriptive message as the is_running guard
- CancelledError during shutdown logged at exception level: added a dedicated except RuntimeError clause in _make_on_next_handler that checks for the "stopping"/"not running" patterns and logs at warning level instead of exception level
- Added timeout=10.0 to barrier.wait() and added assert all(not t.is_alive() for t in threads) after the join loop
- replace_state(): the emission copy is now created from the original new_state input rather than from the already-copied stored state, eliminating redundant serialization
- Added os.getpid() to the checkpoint filename format: checkpoint_{timestamp}_{update_count}_{pid}.json
- slow_execute test functions: renamed _state to state in both functions
- Changed context.stream_emissions: list[GraphState] = [] to a plain assignment with a comment: context.stream_emissions = []  # list[GraphState]
Nits (All Fixed)
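The _save_checkpoint(None) deadlock nit comes down to re-entrancy: a threading.Lock cannot be acquired a second time by the thread that already holds it, while a threading.RLock can. A short demonstration, using a timeout so the Lock case returns False instead of hanging; reacquire_ok, save_checkpoint and mutate_then_checkpoint are hypothetical helpers that only mimic the shape of the fallback path.

```python
import threading


def reacquire_ok(lock) -> bool:
    """Return True if the current thread can acquire `lock` while already holding it."""
    with lock:
        # Second acquisition by the same thread; the timeout keeps a plain Lock from hanging.
        acquired = lock.acquire(timeout=0.1)
        if acquired:
            lock.release()
        return acquired


def save_checkpoint(lock, data=None):
    """Hypothetical fallback path: build the snapshot under the lock if no data was given."""
    if data is None:
        with lock:
            data = {"snapshot": "built under lock"}
    return data


def mutate_then_checkpoint(lock):
    with lock:
        # With an RLock this nested acquisition succeeds; with a Lock it would block forever.
        return save_checkpoint(lock)


print(reacquire_ok(threading.Lock()))   # False
print(reacquire_ok(threading.RLock()))  # True
print(mutate_then_checkpoint(threading.RLock()))
```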
- _save_checkpoint(None) deadlock risk: upgraded StateManager._lock from threading.Lock to threading.RLock (a reentrant lock), making the deadlock impossible. Updated the docstring to reflect the change
- deque.append() thread safety undocumented: added the inline comment # Thread-safe under CPython GIL; deque.append is atomic.
- Added a warning log when future_tp.cancel() returns False, which indicates the timed-out task could not be cancelled and its thread slot remains occupied
- replace_state test accesses the internal state attribute without a comment: added the explanatory comment # Direct access intentional: identity check requires the actual internal object, not a get_state() copy.
Previous Review Cycles
Cycle 6 (All Fixed)
- CancelledError not caught in the run_coroutine_threadsafe path: added an except concurrent.futures.CancelledError clause mirroring the thread pool path
- TimeoutError warning-level log through the stream path: added a BDD scenario
- TOCTOU window in start(): start() now sets is_running = False before shutting down the old pool
- try/finally
- context.add_cleanup()
- start() can hang indefinitely: added a docstring warning
Cycle 5 (All Fixed)
- replace_state() stores the caller's mutable reference directly: fixed by deep-copying the input
- CancelledError path: added a BDD scenario
Cycles 1–4
See git history for earlier review cycle fixes (all addressed in prior amendments).
Deferred (Pre-existing, Out of Scope)
- close() does not acquire _lock before setting is_closed = True (pre-existing)
- Node.execution_count increment is not thread-safe (pre-existing, newly reachable)
- asyncio.run() creates a new event loop per thread-pool invocation (acceptable for the current use case)
- Migration from a2a-sdk 0.3.x to 1.0.0 (separate ticket scope: 1.0.0 removes the legacy A2AClient class and introduces Client as its replacement)
@HAL9000 This PR is ready for review: rebased onto the latest master, and all CI checks passed.
hurui200320 referenced this pull request 2026-04-22 06:01:13 +00:00
Review Summary
All required checks have passed and this PR successfully addresses bug #6511 by wiring the on_next handlers, improving thread-safety, resource management, and error handling in LangGraph. Tests have been added and updated, coverage remains ≥97%, and no prohibited patterns (e.g., # type: ignore) were introduced. The code adheres to project specifications and conventions. No blocking issues found. Approving.
Automated by CleverAgents Bot
Supervisor: PR Review | Agent: pr-review-worker