BUG-HUNT: [concurrency] bridge.py _create_node_operator calls loop.create_task() on a non-running event loop — raises RuntimeError when no loop is active #6523

Open
opened 2026-04-09 21:14:56 +00:00 by HAL9000 · 0 comments
Owner

Bug Report: [concurrency] — create_task() Called on Non-Running Event Loop in _create_node_operator

Severity Assessment

  • Impact: When _create_node_operator is used outside of a running event loop context (e.g., in a new thread or at startup), the fallback path creates a new event loop but immediately calls loop.create_task() on it. create_task() requires the loop to be running; calling it on a non-running loop raises RuntimeError: This event loop is not running. The operator silently fails without executing the node.
  • Likelihood: Medium — triggered whenever the operator is invoked in a non-async context (e.g., synchronous stream processing pipeline, test setup).
  • Priority: Medium

Location

  • File: src/cleveragents/langgraph/bridge.py
  • Class: RxPyLangGraphBridge
  • Method: _create_node_operator
  • Lines: ~195–212

Description

The create_future_task closure in _create_node_operator has a fallback path that creates a new event loop when no running loop is detected. However, asyncio.new_event_loop() creates an idle (non-running) loop. Calling loop.create_task(execute_node(msg)) on a non-running loop raises RuntimeError: This event loop is not running.

Evidence

def create_future_task(msg: StreamMessage) -> Any:
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop → create a new idle loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.create_task(execute_node(msg))   # <-- RuntimeError: loop is not running!

asyncio.new_event_loop() returns an event loop that is not running. loop.create_task() requires a running loop (loop.is_running() must be True). The correct approach in the fallback case is asyncio.run(execute_node(msg)) or loop.run_until_complete(execute_node(msg)).

Additionally, there is a secondary issue: asyncio.set_event_loop(loop) sets a new loop as the thread's current loop, but this loop is never started or closed, which leaks the event loop resource.

Expected Behavior

The fallback path should either:

  1. Run the coroutine synchronously: loop.run_until_complete(execute_node(msg))
  2. Or schedule it via asyncio.run(execute_node(msg)) which handles loop lifecycle correctly

Actual Behavior

When called outside a running event loop, loop.create_task() raises RuntimeError: This event loop is not running, the exception propagates through the RxPy pipeline, and the node is never executed.

Suggested Fix

def create_future_task(msg: StreamMessage) -> Any:
    try:
        loop = asyncio.get_running_loop()
        return loop.create_task(execute_node(msg))
    except RuntimeError:
        # No running loop — run synchronously
        return asyncio.run(execute_node(msg))

Or, if a Future must always be returned, wrap in a completed future:

    except RuntimeError:
        loop = asyncio.new_event_loop()
        try:
            result = loop.run_until_complete(execute_node(msg))
            f: asyncio.Future = loop.create_future()
            f.set_result(result)
            return f
        finally:
            loop.close()

Category

concurrency

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

## Bug Report: [concurrency] — `create_task()` Called on Non-Running Event Loop in `_create_node_operator` ### Severity Assessment - **Impact**: When `_create_node_operator` is used outside of a running event loop context (e.g., in a new thread or at startup), the fallback path creates a new event loop but immediately calls `loop.create_task()` on it. `create_task()` requires the loop to be **running**; calling it on a non-running loop raises `RuntimeError: This event loop is not running`. The operator silently fails without executing the node. - **Likelihood**: Medium — triggered whenever the operator is invoked in a non-async context (e.g., synchronous stream processing pipeline, test setup). - **Priority**: Medium ### Location - **File**: `src/cleveragents/langgraph/bridge.py` - **Class**: `RxPyLangGraphBridge` - **Method**: `_create_node_operator` - **Lines**: ~195–212 ### Description The `create_future_task` closure in `_create_node_operator` has a fallback path that creates a new event loop when no running loop is detected. However, `asyncio.new_event_loop()` creates an idle (non-running) loop. Calling `loop.create_task(execute_node(msg))` on a non-running loop raises `RuntimeError: This event loop is not running`. ### Evidence ```python def create_future_task(msg: StreamMessage) -> Any: try: loop = asyncio.get_running_loop() except RuntimeError: # No running loop → create a new idle loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.create_task(execute_node(msg)) # <-- RuntimeError: loop is not running! ``` `asyncio.new_event_loop()` returns an event loop that is **not running**. `loop.create_task()` requires a running loop (`loop.is_running()` must be `True`). The correct approach in the fallback case is `asyncio.run(execute_node(msg))` or `loop.run_until_complete(execute_node(msg))`. Additionally, there is a secondary issue: `asyncio.set_event_loop(loop)` sets a new loop as the thread's current loop, but this loop is never started or closed, which leaks the event loop resource. ### Expected Behavior The fallback path should either: 1. Run the coroutine synchronously: `loop.run_until_complete(execute_node(msg))` 2. Or schedule it via `asyncio.run(execute_node(msg))` which handles loop lifecycle correctly ### Actual Behavior When called outside a running event loop, `loop.create_task()` raises `RuntimeError: This event loop is not running`, the exception propagates through the RxPy pipeline, and the node is never executed. ### Suggested Fix ```python def create_future_task(msg: StreamMessage) -> Any: try: loop = asyncio.get_running_loop() return loop.create_task(execute_node(msg)) except RuntimeError: # No running loop — run synchronously return asyncio.run(execute_node(msg)) ``` Or, if a `Future` must always be returned, wrap in a completed future: ```python except RuntimeError: loop = asyncio.new_event_loop() try: result = loop.run_until_complete(execute_node(msg)) f: asyncio.Future = loop.create_future() f.set_result(result) return f finally: loop.close() ``` ### Category `concurrency` ### TDD Note After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: `@tdd_issue`, `@tdd_issue_<this-issue-number>`, and `@tdd_expected_fail` to prove the bug exists before fixing it. --- **Automated by CleverAgents Bot** Supervisor: Bug Hunting | Agent: bug-hunter
HAL9000 added this to the v3.2.0 milestone 2026-04-09 21:27:54 +00:00
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#6523
No description provided.