BUG-HUNT: [error-handling] All graph node stream subscriptions have no-op on_next handlers — node execution pipeline is entirely disconnected #6511

Closed
opened 2026-04-09 21:13:28 +00:00 by HAL9000 · 3 comments
Owner

Bug Report: [error-handling] — Node Stream on_next Handlers Are Empty No-ops

Severity Assessment

  • Impact: Every message delivered to any node's reactive stream is silently dropped. Even if LangGraph.execute() were fixed to await stream processing, the node execution callbacks would never fire. The entire reactive node-execution pipeline is permanently disconnected.
  • Likelihood: 100% — affects every stream-triggered node invocation.
  • Priority: Critical

Location

  • File: src/cleveragents/langgraph/graph.py
  • Class: LangGraph
  • Method: _setup_node_stream_subscriptions
  • Lines: ~127–145

Description

_setup_node_stream_subscriptions subscribes an Observer to every node's RxPy observable stream. The on_next handler — which is the callback that should trigger actual node execution — is defined as a complete no-op (pass). This means all messages arriving on any node stream are silently discarded without ever executing the node.

Evidence

def _setup_node_stream_subscriptions(self) -> None:
    for node_name in self.nodes:
        stream_name = f"__{self.name}_node_{node_name}__"
        if stream_name in self.stream_router.observables:
            observable = self.stream_router.observables[stream_name]

            def on_next(_msg: Any) -> None:
                pass  # <-- EMPTY: every message is silently dropped!

            def on_error(error: Exception, name: str = stream_name) -> None:
                self.logger.error("Error in node stream %s: %s", name, error)

            def on_completed() -> None:
                pass

            observer = Observer(
                on_next=on_next, on_error=on_error, on_completed=on_completed
            )
            observable.subscribe(observer)

Additionally, _register_node_executor (lines ~148–175) registers a sync_executor on the stream router via setattr(self.stream_router, f"_builtin_execute_node_{node_name}", sync_executor), but this executor is never wired into the subscription; the subscription uses only the empty on_next above. The registered executors are dead code from the subscription's perspective.

Expected Behavior

The on_next handler should invoke the corresponding node's execute() method (via the registered sync_executor or the async executor closure), update graph state, and propagate the result to downstream node streams.

Actual Behavior

Every message delivered to any node's stream is silently dropped. Graph nodes are never executed via the reactive stream path.
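The symptom can be reproduced without RxPy. The sketch below uses a hypothetical `MiniSubject` stand-in (not the RxPy API) and a stand-in `sync_executor`. A subscriber shaped like the quoted `on_next` accepts the message and discards it, while a subscriber that calls the executor runs the node.

```python
from typing import Any, Callable, List


class MiniSubject:
    """Hypothetical stand-in for an RxPy subject: pushes each value to its subscribers."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def subscribe(self, on_next: Callable[[Any], None]) -> None:
        self._observers.append(on_next)

    def on_next(self, value: Any) -> None:
        for observer in self._observers:
            observer(value)


executed: List[Any] = []


def sync_executor(msg: Any) -> None:
    # Stand-in for the per-node executor that _register_node_executor registers.
    executed.append(msg)


def noop_on_next(_msg: Any) -> None:
    pass  # same shape as the handler quoted in the Evidence section


broken = MiniSubject()
broken.subscribe(noop_on_next)
broken.on_next({"input": "hello"})
dropped_run = len(executed)  # 0: the message was accepted and discarded

wired = MiniSubject()
wired.subscribe(sync_executor)
wired.on_next({"input": "hello"})
print(dropped_run, executed)  # 0 [{'input': 'hello'}]
```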

Suggested Fix

The on_next callback should call the per-node executor registered in _register_node_executor. The simplest fix is to look up and call the sync executor:

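from typing import Any, Callable

# Defined inside _setup_node_stream_subscriptions, where `self` is the LangGraph instance.
def make_on_next(name: str) -> Callable[[Any], None]:
    executor_attr = f"_builtin_execute_node_{name}"
    def on_next(msg: Any) -> None:
        # Resolve the executor when a message arrives, not when the handler is built.
        executor = getattr(self.stream_router, executor_attr, None)
        if executor is not None:
            executor(msg)
        else:
            self.logger.warning("No executor found for node %s", name)
    return on_next

# In loop:
observer = Observer(on_next=make_on_next(node_name), ...)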

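Note: any handler that needs the node name must bind it per iteration. A closure that reads node_name directly would see only the last value once the loop finishes. The factory function above avoids this because name is a parameter of each make_on_next call.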
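The late-binding pitfall the note refers to is general Python behavior: closures look up free variables when they are called, not when they are created. A minimal sketch with hypothetical node names:

```python
from typing import Callable, Dict, List


def build_late_bound(names: List[str]) -> Dict[str, Callable[[], str]]:
    handlers: Dict[str, Callable[[], str]] = {}
    for name in names:
        # Buggy: the lambda reads `name` when called, after the loop has finished.
        handlers[name] = lambda: name
    return handlers


def make_handler(name: str) -> Callable[[], str]:
    # Factory: `name` is a parameter, so each call gets its own binding.
    return lambda: name


def build_factory_bound(names: List[str]) -> Dict[str, Callable[[], str]]:
    return {name: make_handler(name) for name in names}


nodes = ["planner", "coder", "reviewer"]
late = build_late_bound(nodes)
bound = build_factory_bound(nodes)
print([late[n]() for n in nodes])   # ['reviewer', 'reviewer', 'reviewer']
print([bound[n]() for n in nodes])  # ['planner', 'coder', 'reviewer']
```

Binding through a factory parameter (as make_on_next does) or through a default argument (as the quoted on_error does with name=stream_name) both give each handler its own value.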

Category

error-handling

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

hurui200320 added this to the v3.2.0 milestone 2026-04-15 12:11:15 +00:00
Member

Implementation Notes

Analysis & Design Decision

During implementation, I discovered that the node execution pipeline was not, as the bug report claims, entirely disconnected. The _create_graph_streams method registered a map operator on each stream's observable:

operators = [{"type": "map", "params": {"function": f"execute_node_{node_name}"}}]

This map operator was resolved by _create_operator via getattr(self, f"_builtin_{func_name}", lambda x: x), which captured a reference to the sync_executor at stream creation time. So node execution was happening via the operator pipeline — the on_next handler received the executor's result and discarded it.
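A minimal sketch of the creation-time resolution described above, assuming only what the comment states (a getattr lookup with a `lambda x: x` fallback). `RouterSketch` and `execute_node_a` are hypothetical stand-ins. The point is that an operator built before the executor is registered silently becomes a passthrough.

```python
from typing import Any, Callable, List


class RouterSketch:
    """Hypothetical stand-in for the stream router's operator resolution."""

    def create_map_operator(self, func_name: str) -> Callable[[Any], Any]:
        # Resolved once, when the stream is built. A missing attribute silently
        # becomes an identity function instead of raising.
        return getattr(self, f"_builtin_{func_name}", lambda x: x)


calls: List[Any] = []


def execute_node_a(msg: Any) -> str:
    calls.append(msg)
    return f"result:{msg}"


router = RouterSketch()
early_op = router.create_map_operator("execute_node_a")  # stream built first
setattr(router, "_builtin_execute_node_a", execute_node_a)
late_op = router.create_map_operator("execute_node_a")  # stream built after registration

early_result = early_op("m1")  # passthrough: the node never runs
late_result = late_op("m2")
print(early_result, late_result, calls)  # m1 result:m2 ['m2']
```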

However, there were real problems:

  1. Fragile coupling — the executor reference was captured at creation time by the map operator. If registration order changed, a passthrough lambda x: x would silently replace the executor.
  2. Closure-in-loop risk — while the on_next handler didn't use loop variables, the pattern was error-prone for future changes.
  3. Opaque architecture — the execution was hidden inside the operator pipeline; the subscription handler appeared to be a no-op.

Fix Applied

Rather than simply adding executor calls to on_next (which would cause double execution through both the map operator and the handler), I:

  1. Removed the map operator from _create_graph_streams — streams are now created with no operators
  2. Wired on_next to executors via _make_on_next(name) factory — uses getattr at invocation time (not closure time), wraps non-StreamMessage inputs, and warns on missing executors
  3. Preserved the _register_node_executor pattern — executors are still registered as stream router attributes, just looked up lazily
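The fixed wiring can be sketched as follows. This is an illustration under stated assumptions, not the merged code: `StreamMessage` here is a hypothetical minimal dataclass, and executors live in a `_node_executors` dict (the storage the Cycle 1 notes later move to) rather than as router attributes. The lookup happens on every message, so a handler built before registration still finds the executor afterwards.

```python
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

logger = logging.getLogger("langgraph_sketch")


@dataclass
class StreamMessage:
    """Hypothetical minimal message type; the real StreamMessage carries more fields."""

    content: Any
    metadata: Dict[str, Any] = field(default_factory=dict)


class GraphSketch:
    def __init__(self) -> None:
        self._node_executors: Dict[str, Callable[[StreamMessage], Any]] = {}
        self.results: List[Any] = []

    def _make_on_next(self, name: str) -> Callable[[Any], None]:
        def on_next(msg: Any) -> None:
            # Look the executor up per message, not when the handler is created.
            executor = self._node_executors.get(name)
            if executor is None:
                logger.warning("No executor found for node %s", name)
                return
            if not isinstance(msg, StreamMessage):
                msg = StreamMessage(content=msg)  # wrap raw payloads
            self.results.append(executor(msg))

        return on_next


graph = GraphSketch()
handler = graph._make_on_next("echo")  # built before any executor exists
handler("dropped")  # logs a warning; nothing runs
graph._node_executors["echo"] = lambda m: m.content.upper()
handler("hello")  # the raw string is wrapped, then executed
print(graph.results)  # ['HELLO']
```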

Key Code Locations

  • LangGraph._create_graph_streams (cleveragents.langgraph.graph, commit 41b5e863) — removed map operator
  • LangGraph._setup_node_stream_subscriptions (cleveragents.langgraph.graph, commit 41b5e863) — factory-based on_next wiring
  • _OnNextCallable type alias (cleveragents.langgraph.graph, commit 41b5e863) — added for type safety

Test Results

  • 4 new BDD scenarios in features/consolidated_langgraph.feature
  • All 15,270 unit test scenarios pass
  • Integration tests pass
  • Coverage: 97.18%

PR

PR #10795

Member

Self-QA Implementation Notes (Cycles 1–5)

This comment documents the internal self-QA review/fix loop run on PR #10795. Five review/fix cycles were completed. All quality gates pass: lint, typecheck, unit_tests (15,281 scenarios), integration_tests, and coverage (97.2%).


Cycle 1

Review findings (0C / 6M / 7m / 5n):

  • M1: Missing exception handling in on_next — executor errors propagate unhandled
  • M2: Fragile and incomplete warning assertion in Scenario 3 (str repr of call_args)
  • M3: Missing test scenario for executor exception propagation
  • M4: ThreadPoolExecutor created per message (latent, now exercised because the fix routes messages to the executor)
  • M5: asyncio.run() creates a new event loop per execution (latent, now exercised)
  • M6: state_manager.update_state and execution_history not thread-safe (latent, now exercised)
  • m1–m6: Various minor issues (silent no-op in execute(), missing metadata assertion, missing branch coverage, unbounded execution_history, setattr pollution, ruff format)
  • n1–n5: Nits (factory redefined in loop, underscore convention, inconsistent closure patterns, no-op on_completed, scenario titles)

Fixes applied:

  • Added try/except around executor(msg) in _make_on_next_handler
  • Fixed fragile warning assertion to use call_args tuple directly
  • Added BDD scenario for executor exception propagation
  • Created shared ThreadPoolExecutor as instance attribute, shut down in stop()
  • Used asyncio.run_coroutine_threadsafe when scheduler loop is running
  • Added threading.Lock to StateManager.update_state
  • Fixed all minor issues: ValueError in execute(), metadata assertion, observable branch coverage, execution_history cap, moved executors to self._node_executors dict, ran ruff format
  • Fixed all nits: extracted _make_on_next_handler as method, factory pattern for on_error too, on_completed before loop, renamed scenarios to describe behavior
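Two of the Cycle 1 fixes, the try/except around the executor call and a single ThreadPoolExecutor owned by the instance and shut down in stop(), can be sketched together. `RunnerSketch`, `run_blocking`, and `failing_node` are hypothetical; the sketch keeps a failure list purely so the behavior is observable.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List

logger = logging.getLogger("langgraph_sketch")


class RunnerSketch:
    def __init__(self, max_workers: int = 4) -> None:
        # One shared pool per graph instead of a new pool per message.
        self._executor_pool = ThreadPoolExecutor(max_workers=max_workers)
        self.failures: List[str] = []

    def _make_on_next_handler(
        self, name: str, executor: Callable[[Any], Any]
    ) -> Callable[[Any], None]:
        def on_next(msg: Any) -> None:
            try:
                executor(msg)
            except Exception as exc:
                # Record and log instead of letting the error escape the handler.
                self.failures.append(f"{name}: {exc}")
                logger.error("Node %s failed: %s", name, exc)

        return on_next

    def run_blocking(self, fn: Callable[[], Any]) -> Any:
        return self._executor_pool.submit(fn).result()

    def stop(self) -> None:
        self._executor_pool.shutdown(wait=True)


def failing_node(_msg: Any) -> None:
    raise ValueError("bad input")


runner = RunnerSketch()
on_next = runner._make_on_next_handler("parser", failing_node)
on_next("x")  # error is caught and logged
on_next("y")  # the handler keeps working for later messages
value = runner.run_blocking(lambda: 21 * 2)
runner.stop()
print(runner.failures, value)  # ['parser: bad input', 'parser: bad input'] 42
```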

Cycle 2

Review findings (1C / 6M / 9m / 7n):

  • C1: Deadlock in sync_executor when called from scheduler's event loop thread (run_coroutine_threadsafe + future.result() on same thread)
  • M1: Non-reentrant threading.Lock risks deadlock via state_stream.on_next inside lock
  • M2: No timeout on blocking future.result() calls
  • M3: _executor_pool.shutdown(wait=False) abandons in-flight tasks
  • M4: Thread pool resource leak if stop() is never called
  • M5: Zero test coverage for asyncio.run_coroutine_threadsafe path
  • M6: Scenario 6 assertion too weak
  • m1–m9: Thread-safety gaps, missing tests, event loop leak, type annotation

Fixes applied:

  • C1: Added asyncio.get_running_loop() detection; routes to thread pool when on scheduler loop thread
  • M1: Moved state_stream.on_next() outside lock in all 4 mutation methods; state snapshot captured inside lock
  • M2: Added configurable timeout (300s default) to both future.result() calls
  • M3: Changed to shutdown(wait=True, cancel_futures=True)
  • M4: Added __del__ safety net
  • M5: Added BDD scenario with background event loop thread
  • M6: Strengthened assertion with specific debug message, stream name, negative assertions
  • All minor issues and nits fixed
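The C1 fix (detect whether the caller is on the scheduler loop's thread, and use run_coroutine_threadsafe only when it is not) can be sketched like this. `run_sync`, `node_work`, and the parameter names are hypothetical; the 300 s default mirrors the M2 fix. Calling `future.result()` from the loop's own thread would block the very loop that must run the coroutine, which is the deadlock C1 describes.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Coroutine, Optional

DEFAULT_EXECUTOR_TIMEOUT = 300.0  # mirrors the 300 s default mentioned above


def run_sync(
    coro_factory: Callable[[], Coroutine[Any, Any, Any]],
    scheduler_loop: Optional[asyncio.AbstractEventLoop],
    pool: ThreadPoolExecutor,
    timeout: float = DEFAULT_EXECUTOR_TIMEOUT,
) -> Any:
    try:
        current: Optional[asyncio.AbstractEventLoop] = asyncio.get_running_loop()
    except RuntimeError:
        current = None  # no loop running in this thread

    if (
        scheduler_loop is not None
        and scheduler_loop.is_running()
        and current is not scheduler_loop
    ):
        # Off the loop thread: let the scheduler loop run the coroutine.
        future = asyncio.run_coroutine_threadsafe(coro_factory(), scheduler_loop)
        return future.result(timeout=timeout)

    # On the loop thread (or no scheduler loop): waiting on that loop here
    # would deadlock, so run the coroutine on a fresh loop in a worker thread.
    return pool.submit(lambda: asyncio.run(coro_factory())).result(timeout=timeout)


async def node_work(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2


loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()
pool = ThreadPoolExecutor(max_workers=2)

from_main = run_sync(lambda: node_work(2), loop, pool, timeout=5)


async def call_from_loop() -> int:
    # Runs on the scheduler loop thread, i.e. the C1 deadlock scenario.
    return run_sync(lambda: node_work(5), loop, pool, timeout=5)


from_loop = asyncio.run_coroutine_threadsafe(call_from_loop(), loop).result(timeout=5)

loop.call_soon_threadsafe(loop.stop)
loop_thread.join(timeout=5)
loop.close()
pool.shutdown(wait=True)
print(from_main, from_loop)  # 4 10
```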

Cycle 3

Review findings (0C / 5M / 9m / 7n):

  • M1: Mutable state reference emitted outside lock in all 4 StateManager mutation methods (state_snapshot = self.state is a reference alias, not a copy)
  • M2: execute() bypasses StateManager._lock by direct assignment self.state_manager.state = state
  • M3: Per-node timeout non-functional — getattr(node, "timeout", None) always returns None (attribute is on node.config)
  • M4: Scenario 8 doesn't verify which execution path was taken (both paths produce identical results)
  • M5: No test coverage for TimeoutError branches
  • m1–m9: Thread pool leak on repeated start(), dead _history_lock, clear_history() without lock, file I/O inside lock, post-stop executor error, duplicate subscriptions, no restart test, cleanup resource leak, misleading variable name

Fixes applied:

  • M1: All 4 mutation methods now capture self.state.model_copy(deep=True) inside lock, emit copy outside
  • M2: Added StateManager.replace_state() with lock; execute() calls replace_state()
  • M3: Fixed to node.config.timeout if node.config.timeout is not None else _DEFAULT_EXECUTOR_TIMEOUT
  • M4: Added patch("asyncio.run_coroutine_threadsafe", wraps=...) mock to verify code path
  • M5: Added 2 BDD scenarios for TimeoutError (both paths)
  • All minor issues and nits fixed; _history_lock removed, deque(maxlen=...) used for atomic bounded history
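The locking pattern that Cycles 1 to 3 converge on for StateManager (mutate and snapshot inside the lock, emit outside it) is sketched below. Assumptions: state is a plain dict, `copy.deepcopy` stands in for Pydantic's `model_copy(deep=True)`, and `StateManagerSketch` is hypothetical. The RLock matches the later Cycle 7 change.

```python
import copy
import threading
from typing import Any, Callable, Dict, List

State = Dict[str, Any]


class StateManagerSketch:
    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.state: State = {}
        self._subscribers: List[Callable[[State], None]] = []

    def subscribe(self, fn: Callable[[State], None]) -> None:
        self._subscribers.append(fn)

    def update_state(self, updates: State) -> None:
        with self._lock:
            self.state.update(updates)
            # Deep copy inside the lock; `snapshot = self.state` would only alias it.
            snapshot = copy.deepcopy(self.state)
        # Emit outside the lock so subscribers never run while it is held.
        for fn in self._subscribers:
            fn(snapshot)


seen: List[State] = []
manager = StateManagerSketch()
manager.subscribe(seen.append)
manager.update_state({"messages": ["hi"]})
manager.update_state({"step": 2})
manager.state["messages"].append("later")  # does not leak into emitted snapshots
print(seen)  # [{'messages': ['hi']}, {'messages': ['hi'], 'step': 2}]
```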

Cycle 4

Review findings (0C / 3M / 6m / 6n):

  • M1: replace_state() deep copy captured outside the lock — same race condition as cycle 3 M1
  • M2: Timed-out executor futures not cancelled — silent state corruption after TimeoutError
  • M3: start() with shutdown(wait=False) allows old-cycle tasks to corrupt new-cycle state
  • m1–m6: _save_checkpoint() fallback without lock, CancelledError not handled, execute() passes same mutable reference to both replace_state() and send_message(), missing is_running guard test, event loop cleanup leak, missing direct replace_state() test
  • n1–n6: Type annotation, docstring, public constant rename, patch comment, __del__ suppress scope, Scenario 13 scope comment

Fixes applied:

  • M1: Moved state_copy = new_state.model_copy(deep=True) inside with self._lock: in replace_state()
  • M2: Added future.cancel() / future_tp.cancel() before re-raising TimeoutError in both paths
  • M3: Changed start() to shutdown(wait=True, cancel_futures=True)
  • All minor issues and nits fixed; MAX_EXECUTION_HISTORY renamed public; 15 BDD scenarios total
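The Cycle 4 M2 fix (cancel the future before re-raising TimeoutError) has a limit that Cycle 5 M2 asks to document: `Future.cancel()` only succeeds for work that has not started. A sketch with `concurrent.futures`, where `wait_or_cancel` is a hypothetical helper:

```python
import logging
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Any, Dict

logger = logging.getLogger("langgraph_sketch")


def wait_or_cancel(future: "Future[Any]", timeout: float) -> Any:
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        if not future.cancel():
            # Already running: cancellation is best-effort, so the task may
            # still finish later and apply its side effects.
            logger.warning("Timed-out task is already running and was not cancelled")
        raise


started = threading.Event()
release = threading.Event()


def blocker() -> None:
    started.set()
    release.wait()


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocker)  # occupies the only worker
started.wait(timeout=5)
queued = pool.submit(lambda: "never runs")  # waits behind it

outcomes: Dict[str, bool] = {}
for label, fut in (("running", running), ("queued", queued)):
    try:
        wait_or_cancel(fut, timeout=0.05)
    except FutureTimeoutError:
        outcomes[label] = fut.cancelled()

release.set()
pool.shutdown(wait=True)
print(outcomes)  # {'running': False, 'queued': True}
```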

Cycle 5

Review findings (0C / 3M / 6m / 8n):

  • M1: replace_state() stores caller's mutable reference directly — self.state = new_state without deep-copying the input; external holder can mutate self.state without lock
  • M2: Timed-out executor futures can still complete and mutate state (best-effort cancellation limitation — needs documentation)
  • M3: Missing test for CancelledError path in sync_executor
  • m1–m6: _save_checkpoint fallback non-reentrant lock risk, on_next swallows all exceptions with no error propagation, missing __del__ test, Scenario 15 incomplete identity assertion, checkpoint filename second-precision collision, downstream propagation not implemented
  • n1–n8: Various nits

Fixes applied (pending — cycle 5 fix not yet run):

  • None yet — this is the checkpoint post after 5 cycles

Remaining Issues (after Cycle 5 review, before Cycle 5 fix)

Must fix before merge:

  1. M1: replace_state() must deep-copy the input: self.state = new_state.model_copy(deep=True) inside the lock (one-line fix)
  2. M2 — Document that timed-out futures may still complete and mutate state (Python cooperative cancellation limitation)
  3. M3 — Add BDD scenario for CancelledError path in sync_executor
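The M1 hazard is plain reference aliasing: after `self.state = new_state`, whoever passed `new_state` can still mutate the stored object without taking the lock. A sketch with a hypothetical `ReplaceStateSketch` class, again using `copy.deepcopy` as a stand-in for `model_copy(deep=True)`:

```python
import copy
import threading
from typing import Any, Dict

State = Dict[str, Any]


class ReplaceStateSketch:
    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.state: State = {}

    def replace_state_aliased(self, new_state: State) -> None:
        with self._lock:
            self.state = new_state  # keeps a reference the caller still holds

    def replace_state_copied(self, new_state: State) -> None:
        with self._lock:
            self.state = copy.deepcopy(new_state)  # store a private copy


incoming: State = {"messages": ["a"]}
aliased = ReplaceStateSketch()
aliased.replace_state_aliased(incoming)
copied = ReplaceStateSketch()
copied.replace_state_copied(incoming)

incoming["messages"].append("mutated without the lock")
print(aliased.state["messages"])  # ['a', 'mutated without the lock']
print(copied.state["messages"])  # ['a']
```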

Lower priority (can be follow-up tickets):

  • m1: _save_checkpoint fallback lock reentrancy risk
  • m2: No error propagation path for failed node executions
  • m3: Missing __del__ test
  • m4: Scenario 15 incomplete identity assertion
  • m5: Checkpoint filename second-precision collision
  • m6: Downstream propagation to successor nodes not implemented (pre-existing gap, ticket mentions it)

Self-QA loop run by hurui200320 on 2026-04-20. All quality gates pass on the current commit.

Member

Self-QA Implementation Notes (Cycles 6–8)

Continuation of the self-QA loop from the previous comment. Three additional cycles were run. The PR was approved at the end of Cycle 8.


Cycle 6

Review findings (0C / 0M / 6m / 8n):

  • m1: CancelledError not caught in run_coroutine_threadsafe path — inconsistent error semantics with thread pool path
  • m2: Missing test for TimeoutError warning-level log through stream path (cycle-5 fix not fully verified)
  • m3: TOCTOU race between is_running guard and _executor_pool.submit() in start()
  • m4: Event loop cleanup not exception-safe in step_execute_graph
  • m5: Restart scenario thread pool not cleaned up on assertion failure
  • m6: start() can hang indefinitely on stuck executor tasks — undocumented
  • n1–n8: Various nits (redundant assertions, ruff format, cast() redundancy, emission ordering docs, ThreadPoolExecutor sizing comment, _cleanup_bg_loop guard, TODO without ticket reference)

Fixes applied:

  • Added CancelledError catch in run_coroutine_threadsafe path; added BDD scenario
  • Added BDD scenario for TimeoutError warning-level log through stream path
  • start() now sets is_running=False before shutdown and True after new pool creation
  • step_execute_graph wrapped in try/finally for event loop cleanup
  • Restart scenario registers context.add_cleanup(context.graph.stop)
  • Added docstring to start() documenting hang limitation
  • All nits fixed; created follow-up issue #10799 for downstream propagation
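The Cycle 6 ordering for start() (clear is_running, drain the old pool, create the new pool, then set is_running) can be sketched as below. `RestartableRunner` is hypothetical, and its is_running flag is a plain attribute that relies on the GIL, as the Cycle 8 nits point out. The check in submit() is still a check-then-act race, which is why Cycle 7 also catches RuntimeError from submit().

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional


class RestartableRunner:
    def __init__(self, max_workers: int = 2) -> None:
        self._max_workers = max_workers
        self.is_running = False
        self._executor_pool: Optional[ThreadPoolExecutor] = None

    def start(self) -> None:
        """(Re)create the worker pool.

        Can block indefinitely if a task from the previous cycle never finishes,
        because shutdown(wait=True) waits for tasks that are already running.
        """
        self.is_running = False  # stop accepting work while the old pool drains
        if self._executor_pool is not None:
            # cancel_futures=True drops queued tasks; it needs Python 3.9+.
            self._executor_pool.shutdown(wait=True, cancel_futures=True)
        self._executor_pool = ThreadPoolExecutor(max_workers=self._max_workers)
        self.is_running = True  # only after the new pool exists

    def submit(self, fn: Callable[..., Any], *args: Any) -> "Future[Any]":
        if not self.is_running or self._executor_pool is None:
            raise RuntimeError("runner is not running")
        return self._executor_pool.submit(fn, *args)

    def stop(self) -> None:
        self.is_running = False
        if self._executor_pool is not None:
            self._executor_pool.shutdown(wait=True, cancel_futures=True)
            self._executor_pool = None


runner = RestartableRunner()
runner.start()
first = runner.submit(pow, 2, 10).result()
runner.start()  # drains the old pool and swaps in a new one
second = runner.submit(pow, 3, 3).result()
runner.stop()
try:
    runner.submit(pow, 1, 1)
    rejected = False
except RuntimeError:
    rejected = True
print(first, second, rejected)  # 1024 27 True
```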

Cycle 7

Review findings (0C / 1M / 7m / 5n):

  • M1: Ruff format violation — two @given/@then decorator strings wrapped across multiple lines (CI lint blocker)
  • m1: TOCTOU race in sync_executor produces misleading log during shutdown
  • m2: CancelledError during shutdown logged at exception level (noisy)
  • m3: Concurrent test barrier has no timeout — could hang indefinitely
  • m4: Double deep copy in replace_state() is redundant
  • m5: Checkpoint filename collision across concurrent processes
  • m6: Pyright parameter name mismatch in slow_execute test functions
  • m7: Type annotation on dynamic Behave context object
  • n1–n5: Various nits

Fixes applied:

  • M1: Collapsed decorator strings to single lines (CI unblocked)
  • m1: Wrapped submit() in try/except RuntimeError for shutdown detection
  • m2: Added a dedicated except RuntimeError clause in _make_on_next_handler that logs shutdown errors (messages containing "stopping" or "not running") at warning level
  • m3: Added timeout=10.0 to barrier.wait() and assert all threads completed
  • m4: Fixed double deep copy — both copies now from original input
  • m5: Added os.getpid() to checkpoint filename
  • m6: Renamed _state to state in slow_execute functions
  • m7: Removed type annotation from Behave context attribute
  • n1: Upgraded threading.Lock to threading.RLock in StateManager
  • n2–n5: All nits fixed
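The Cycle 7 m1 fix relies on standard-library behavior: `ThreadPoolExecutor.submit()` raises RuntimeError once `shutdown()` has been called. A sketch of treating that as a shutdown signal (`submit_or_skip` is a hypothetical helper); catching the exception type rather than matching its message also sidesteps the string-matching fragility that Cycle 8 m5 flags.

```python
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional

logger = logging.getLogger("langgraph_sketch")


def submit_or_skip(
    pool: ThreadPoolExecutor, fn: Callable[..., Any], *args: Any
) -> "Optional[Future[Any]]":
    try:
        return pool.submit(fn, *args)
    except RuntimeError:
        # ThreadPoolExecutor.submit raises RuntimeError after shutdown(); catching
        # it closes the gap between an is_running check and the submit call.
        logger.warning("Executor is shut down; skipping %s", getattr(fn, "__name__", fn))
        return None


pool = ThreadPoolExecutor(max_workers=1)
accepted = submit_or_skip(pool, sum, [1, 2, 3])
total = accepted.result() if accepted is not None else None
pool.shutdown(wait=True)
late = submit_or_skip(pool, sum, [4, 5])
print(total, late)  # 6 None
```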

Cycle 8

Review findings (0C / 0M / 5m / 5n) — APPROVED

The reviewer approved the PR with the following minor observations (not blocking):

  • m1: Missing cancel() return-value warning in run_coroutine_threadsafe timeout path (inconsistency with thread pool path)
  • m2: Missing test for RuntimeError shutdown detection through stream path
  • m3: Missing test for cancel() returning False warning path
  • m4: Subscription disposables discarded — subscriptions cannot be disposed on stop()
  • m5: Fragile string matching for RuntimeError classification in _make_on_next_handler
  • n1–n5: Various nits (phantom PR description item, comment wording, cleanup guard, is_running GIL reliance, commit body notation)

Verdict: Approve — "The PR is ready to merge. The minor issues above are recommended for follow-up in a subsequent ticket or addressed in a quick amendment if the author prefers to close them before merge."


Remaining Issues (post-approval, recommended for follow-up)

  1. Missing cancel() return-value warning in run_coroutine_threadsafe timeout path
  2. Missing test for RuntimeError shutdown detection through stream path
  3. Missing test for cancel() returning False warning path
  4. Subscription disposables not stored/disposed on stop()
  5. Fragile string matching for RuntimeError classification

These are observability/robustness gaps, not correctness bugs. The core fix is correct and safe to merge.
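Follow-up item 4 (store subscription disposables and dispose them on stop()) could look like the sketch below. RxPy's `subscribe()` returns a disposable; to keep the example self-contained it uses hypothetical `Disposable` and `MiniSubject` stand-ins rather than the library, and `GraphSubscriptions` is likewise illustrative.

```python
from typing import Any, Callable, List


class Disposable:
    """Hypothetical stand-in for the disposable an RxPy subscribe() call returns."""

    def __init__(self, action: Callable[[], None]) -> None:
        self._action = action
        self.is_disposed = False

    def dispose(self) -> None:
        if not self.is_disposed:
            self.is_disposed = True
            self._action()


class MiniSubject:
    """Hypothetical minimal subject whose subscribe() returns a Disposable."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def subscribe(self, on_next: Callable[[Any], None]) -> Disposable:
        self._observers.append(on_next)
        return Disposable(lambda: self._observers.remove(on_next))

    def on_next(self, value: Any) -> None:
        for observer in list(self._observers):
            observer(value)


class GraphSubscriptions:
    def __init__(self) -> None:
        self._subscriptions: List[Disposable] = []

    def subscribe_all(self, streams: List[MiniSubject], handler: Callable[[Any], None]) -> None:
        for stream in streams:
            # Keep the disposable instead of discarding subscribe()'s return value.
            self._subscriptions.append(stream.subscribe(handler))

    def stop(self) -> None:
        for subscription in self._subscriptions:
            subscription.dispose()
        self._subscriptions.clear()


received: List[Any] = []


def handler(msg: Any) -> None:
    received.append(msg)


streams = [MiniSubject(), MiniSubject()]
graph = GraphSubscriptions()
graph.subscribe_all(streams, handler)
streams[0].on_next("before stop")
graph.stop()
streams[1].on_next("after stop")
print(received)  # ['before stop']
```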


Final Quality Gate Status

| Gate | Status |
|------|--------|
| nox -e lint | Pass |
| nox -e typecheck | Pass (0 errors) |
| nox -e unit_tests | Pass (15,286 scenarios) |
| nox -e integration_tests | Pass (1,986 tests) |
| nox -e coverage_report | Pass (97.2%) |

Self-QA loop completed on 2026-04-20. Total: 8 review cycles, all quality gates pass.

## Self-QA Implementation Notes (Cycles 6–8)

Continuation of the self-QA loop from the previous comment. Three additional cycles were run, and the PR was **approved** at the end of Cycle 8.

---

### Cycle 6

**Review findings (0C / 0M / 6m / 8n):**

- m1: `CancelledError` not caught in the `run_coroutine_threadsafe` path, so its error semantics are inconsistent with the thread pool path
- m2: Missing test for the `TimeoutError` warning-level log through the stream path (cycle-5 fix not fully verified)
- m3: TOCTOU race between the `is_running` guard and `_executor_pool.submit()` in `start()`
- m4: Event loop cleanup not exception-safe in `step_execute_graph`
- m5: Restart scenario thread pool not cleaned up on assertion failure
- m6: `start()` can hang indefinitely on stuck executor tasks (undocumented)
- n1–n8: Various nits (redundant assertions, ruff format, `cast()` redundancy, emission ordering docs, `ThreadPoolExecutor` sizing comment, `_cleanup_bg_loop` guard, TODO without ticket reference)

**Fixes applied:**

- Added a `CancelledError` catch in the `run_coroutine_threadsafe` path, with a BDD scenario
- Added a BDD scenario for the `TimeoutError` warning-level log through the stream path
- `start()` now sets `is_running=False` before shutdown and `True` after the new pool is created
- Wrapped `step_execute_graph` in `try`/`finally` for event loop cleanup
- Restart scenario registers `context.add_cleanup(context.graph.stop)`
- Added a docstring to `start()` documenting the hang limitation
- All nits fixed; created follow-up issue #10799 for downstream propagation

---

### Cycle 7

**Review findings (0C / 1M / 7m / 5n):**

- M1: Ruff format violation: two `@given`/`@then` decorator strings wrapped across multiple lines (CI lint blocker)
- m1: TOCTOU race in `sync_executor` produces a misleading log during shutdown
- m2: `CancelledError` during shutdown logged at exception level (noisy)
- m3: Concurrent test barrier has no timeout and could hang indefinitely
- m4: Double deep copy in `replace_state()` is redundant
- m5: Checkpoint filename collision across concurrent processes
- m6: Pyright parameter name mismatch in `slow_execute` test functions
- m7: Type annotation on the dynamic Behave `context` object
- n1–n5: Various nits

**Fixes applied:**

- M1: Collapsed decorator strings to single lines (CI unblocked)
- m1: Wrapped `submit()` in `try`/`except RuntimeError` for shutdown detection
- m2: Added a dedicated `except RuntimeError` clause in `_make_on_next_handler` that logs at warning level when the error message contains "stopping" or "not running"
- m3: Added `timeout=10.0` to `barrier.wait()` and asserted that all threads completed
- m4: Fixed the double deep copy; both copies are now taken from the original input
- m5: Added `os.getpid()` to the checkpoint filename
- m6: Renamed `_state` to `state` in the `slow_execute` functions
- m7: Removed the type annotation from the Behave `context` attribute
- n1: Upgraded `threading.Lock` to `threading.RLock` in `StateManager`
- n2–n5: All nits fixed

---

### Cycle 8

**Review findings (0C / 0M / 5m / 5n): APPROVED**

The reviewer approved the PR with the following minor, non-blocking observations:

- m1: Missing `cancel()` return-value warning in the `run_coroutine_threadsafe` timeout path (inconsistent with the thread pool path)
- m2: Missing test for `RuntimeError` shutdown detection through the stream path
- m3: Missing test for the warning path when `cancel()` returns `False`
- m4: Subscription disposables are discarded, so subscriptions cannot be disposed on `stop()`
- m5: Fragile string matching for `RuntimeError` classification in `_make_on_next_handler`
- n1–n5: Various nits (phantom PR description item, comment wording, cleanup guard, `is_running` GIL reliance, commit body notation)

**Verdict: ✅ Approve.** "The PR is ready to merge. The minor issues above are recommended for follow-up in a subsequent ticket or addressed in a quick amendment if the author prefers to close them before merge."

---

### Remaining Issues (post-approval, recommended for follow-up)

1. Missing `cancel()` return-value warning in the `run_coroutine_threadsafe` timeout path
2. Missing test for `RuntimeError` shutdown detection through the stream path
3. Missing test for the warning path when `cancel()` returns `False`
4. Subscription disposables not stored or disposed on `stop()`
5. Fragile string matching for `RuntimeError` classification

These are observability and robustness gaps, not correctness bugs. The core fix is correct and safe to merge.

---

### Final Quality Gate Status

| Gate | Status |
|------|--------|
| `nox -e lint` | ✅ Pass |
| `nox -e typecheck` | ✅ Pass (0 errors) |
| `nox -e unit_tests` | ✅ Pass (15,286 scenarios) |
| `nox -e integration_tests` | ✅ Pass (1,986 tests) |
| `nox -e coverage_report` | ✅ 97.2% |

*Self-QA loop completed on 2026-04-20. Total: 8 review cycles, all quality gates pass.*
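The timeout and cancellation handling from Cycle 6 (m1) and the `cancel()` return-value gap from Cycle 8 (m1, m3) can be sketched with the standard library alone. This is a minimal sketch, not the PR's code. `start_background_loop` and `run_on_loop` are hypothetical helpers, and the log messages are illustrative. The sketch relies on documented behavior: `asyncio.run_coroutine_threadsafe` returns a `concurrent.futures.Future`. That future's `result(timeout=...)` raises `concurrent.futures.TimeoutError` on timeout and `concurrent.futures.CancelledError` if the task was cancelled. Its `cancel()` returns `False` when the call can no longer be cancelled.

```python
import asyncio
import concurrent.futures
import logging
import threading
from typing import Any, Coroutine, Optional

logger = logging.getLogger("node_exec_sketch")


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Hypothetical helper: run a fresh event loop forever on a daemon thread."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def run_on_loop(
    loop: asyncio.AbstractEventLoop,
    coro: Coroutine[Any, Any, Any],
    timeout: float,
) -> Optional[Any]:
    """Run coro on loop from another thread; return None on timeout or cancellation."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # cancel() returns False if the future can no longer be cancelled;
        # log that case distinctly instead of assuming the work stopped.
        if future.cancel():
            logger.warning("node timed out after %.1fs; task cancelled", timeout)
        else:
            logger.warning("node timed out after %.1fs; cancel() returned False", timeout)
        return None
    except concurrent.futures.CancelledError:
        # The coroutine was cancelled on the loop; handle it like the
        # thread pool path instead of letting it escape unhandled.
        logger.warning("node coroutine was cancelled")
        return None
```

Returning `None` on both paths keeps the two error semantics aligned. A real implementation might instead re-raise a project-specific exception.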
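Cycle 8 (m5) flags the string matching on "stopping"/"not running" as fragile. One way to classify a shutdown race without parsing the message is to record shutdown intent in a flag before calling `shutdown()`. The code then consults that flag when `submit()` raises `RuntimeError`, which `ThreadPoolExecutor.submit` does once shutdown has begun. This is a hypothetical sketch: `NodeDispatcher`, `dispatch`, and `stop` are illustrative names, not the project's API.

```python
import concurrent.futures
import logging
import threading
from typing import Any, Callable, Optional

logger = logging.getLogger("node_exec_sketch")


class NodeDispatcher:
    """Hypothetical stand-in for the graph's thread-pool dispatch path."""

    def __init__(self, max_workers: int = 4) -> None:
        self._stopping = threading.Event()
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    def dispatch(
        self, fn: Callable[[Any], Any], msg: Any
    ) -> Optional[concurrent.futures.Future]:
        try:
            return self._pool.submit(fn, msg)
        except RuntimeError:
            # submit() raises RuntimeError after shutdown(). Because stop() sets
            # the flag before shutting down, the flag identifies an expected
            # shutdown race without inspecting the exception text.
            if self._stopping.is_set():
                logger.warning("dropping message during shutdown: %r", msg)
                return None
            raise

    def stop(self) -> None:
        self._stopping.set()
        self._pool.shutdown(wait=True)
```

When the flag is not set, an unexpected `RuntimeError` is re-raised. Genuine failures are therefore not downgraded to warnings.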
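Cycle 8 (m4) notes that subscription disposables are discarded, so `stop()` cannot tear subscriptions down. In RxPY, `subscribe()` returns a disposable with a `dispose()` method. The sketch below keeps every returned handle and disposes of them on `stop()`. To stay dependency-free, it uses a minimal hypothetical `Stream` stand-in instead of a real RxPY observable. `GraphSubscriptions` and `make_handler` are illustrative names. The handler factory binds each node name when the handler is created. This avoids the late-binding closure pitfall that an `on_next` defined inside the loop would hit.

```python
from typing import Any, Callable, Dict, List


class Subscription:
    """Stand-in for the disposable handle an Rx subscribe() call returns."""

    def __init__(
        self, handlers: List[Callable[[Any], None]], handler: Callable[[Any], None]
    ) -> None:
        self._handlers = handlers
        self._handler = handler

    def dispose(self) -> None:
        # Idempotent: a second dispose() is a no-op.
        if self._handler in self._handlers:
            self._handlers.remove(self._handler)


class Stream:
    """Minimal hypothetical stand-in for a node stream observable."""

    def __init__(self) -> None:
        self._handlers: List[Callable[[Any], None]] = []

    def subscribe(self, on_next: Callable[[Any], None]) -> Subscription:
        self._handlers.append(on_next)
        return Subscription(self._handlers, on_next)

    def emit(self, msg: Any) -> None:
        for handler in list(self._handlers):
            handler(msg)


class GraphSubscriptions:
    """Keeps every subscription handle so stop() can dispose of them."""

    def __init__(self) -> None:
        self._subscriptions: List[Subscription] = []

    def subscribe_all(
        self,
        streams: Dict[str, Stream],
        make_handler: Callable[[str], Callable[[Any], None]],
    ) -> None:
        for node_name, stream in streams.items():
            # make_handler(node_name) binds the name now, not when the loop ends.
            self._subscriptions.append(stream.subscribe(make_handler(node_name)))

    def stop(self) -> None:
        for subscription in self._subscriptions:
            subscription.dispose()
        self._subscriptions.clear()
```

With a real RxPY observable, the same shape applies: store the value returned by `observable.subscribe(observer)` and call its `dispose()` in `stop()`.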
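The Cycle 7 fix for m3 (a barrier timeout plus an assertion that all threads completed) follows a standard `threading` pattern. `Barrier.wait(timeout=...)` raises `threading.BrokenBarrierError` instead of blocking forever when a peer never arrives. This minimal sketch assumes a hypothetical `run_in_lockstep` test helper, not the project's actual Behave step code.

```python
import threading
from typing import Callable, List


def run_in_lockstep(
    n_threads: int, work: Callable[[int], None], timeout: float = 10.0
) -> List[int]:
    """Start n_threads that begin work() together; return indices that finished."""
    barrier = threading.Barrier(n_threads)
    finished: List[int] = []
    lock = threading.Lock()

    def worker(i: int) -> None:
        try:
            # Raises BrokenBarrierError instead of hanging if a peer never arrives.
            barrier.wait(timeout=timeout)
        except threading.BrokenBarrierError:
            return
        work(i)
        with lock:
            finished.append(i)

    threads = [
        threading.Thread(target=worker, args=(i,), daemon=True)
        for i in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        # Each join waits at most `timeout` seconds, so a stuck worker cannot hang the test.
        t.join(timeout=timeout)
    return sorted(finished)
```

A test would then assert `run_in_lockstep(n, work) == list(range(n))`. A broken barrier or a stuck worker then fails quickly instead of hanging CI.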
hurui200320 2026-04-22 07:53:54 +00:00
Reference
cleveragents/cleveragents-core#6511