BUG-HUNT: [concurrency] RxPyLangGraphBridge._create_graph_executor uses deprecated asyncio.get_event_loop() in sync context — raises RuntimeError on Python 3.12+ when called from a non-asyncio thread #6756

Open
opened 2026-04-10 01:59:30 +00:00 by HAL9000 · 2 comments
Owner

Bug Report: [concurrency] — Deprecated asyncio.get_event_loop() in create_future_task

Severity Assessment

  • Impact: create_future_task raises RuntimeError: There is no current event loop in thread when invoked from a non-asyncio thread (e.g., RxPy's default thread-pool scheduler), causing the graph executor pipeline to fail with an unhandled exception on Python 3.12+
  • Likelihood: High — RxPy flat_map callbacks run in a thread pool by default; calling asyncio.get_event_loop() from that thread raises RuntimeError on Python 3.12+ per PEP 647
  • Priority: High

Location

  • File: src/cleveragents/langgraph/bridge.py
  • Function/Class: RxPyLangGraphBridge._create_graph_executor() → inner create_future_task()
  • Lines: ~227–234

Description

_create_graph_executor() builds a create_future_task closure that calls asyncio.get_event_loop() unconditionally. This call is made from inside an RxPy ops.flat_map() operator, which invokes the closure synchronously in a thread-pool context (not from within an asyncio event loop).

On Python 3.10, asyncio.get_event_loop() emits a DeprecationWarning when called from a thread that has no current event loop and auto-creates one (with a deprecation notice). On Python 3.12+, the auto-creation was removed and asyncio.get_event_loop() raises RuntimeError if there is no running event loop in the current thread, per Python's asyncio policy changes.

Additionally, asyncio.ensure_future() requires a running event loop; calling it with loop=loop where loop was obtained from the deprecated get_event_loop() API can schedule the coroutine on a stale/detached event loop that is not running.

The _create_node_operator method (lines ~294–300) correctly uses asyncio.get_running_loop() with a fallback to asyncio.new_event_loop(), but _create_graph_executor still uses the deprecated pattern.

Evidence

# src/cleveragents/langgraph/bridge.py, lines 225-234
def create_future_task(msg: StreamMessage) -> asyncio.Future[StreamMessage]:
    loop = asyncio.get_event_loop()   # <-- DeprecationWarning on 3.10, RuntimeError on 3.12+
    task: asyncio.Future[StreamMessage] = asyncio.ensure_future(
        execute_graph(msg), loop=loop
    )
    self._active_tasks.add(task)
    task.add_done_callback(
        lambda t: self._active_tasks.discard(cast(asyncio.Task[Any], t))
    )
    return task

return ops.flat_map(lambda msg: rx.from_future(create_future_task(msg)))

Compare with the correct pattern already used in the same file:

# src/cleveragents/langgraph/bridge.py, lines 294-300 (_create_node_operator)
def create_future_task(msg: StreamMessage) -> Any:
    try:
        loop = asyncio.get_running_loop()   # <-- correct
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.create_task(execute_node(msg))

Expected Behavior

create_future_task should either:

  1. Use asyncio.get_running_loop() (the non-deprecated API) and raise a clear error if there is no running loop, OR
  2. Use the same try/except fallback pattern used in _create_node_operator, OR
  3. Dispatch the coroutine to the event loop via asyncio.run_coroutine_threadsafe() when called from a non-asyncio thread.

Actual Behavior

On Python 3.12+, asyncio.get_event_loop() raises RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-N_M', causing the entire graph executor pipeline to fail immediately on the first flat_map emission. On Python 3.10–3.11 it emits a DeprecationWarning and may schedule onto an orphaned event loop.

Suggested Fix

Replace the deprecated pattern in create_future_task inside _create_graph_executor:

def create_future_task(msg: StreamMessage) -> asyncio.Future[StreamMessage]:
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    task = loop.create_task(execute_graph(msg))
    self._active_tasks.add(task)
    task.add_done_callback(
        lambda t: self._active_tasks.discard(cast(asyncio.Task[Any], t))
    )
    return task

Category

concurrency

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

## Bug Report: [concurrency] — Deprecated `asyncio.get_event_loop()` in `create_future_task` ### Severity Assessment - **Impact**: `create_future_task` raises `RuntimeError: There is no current event loop in thread` when invoked from a non-asyncio thread (e.g., RxPy's default thread-pool scheduler), causing the graph executor pipeline to fail with an unhandled exception on Python 3.12+ - **Likelihood**: High — RxPy `flat_map` callbacks run in a thread pool by default; calling `asyncio.get_event_loop()` from that thread raises `RuntimeError` on Python 3.12+ per PEP 647 - **Priority**: High ### Location - **File**: `src/cleveragents/langgraph/bridge.py` - **Function/Class**: `RxPyLangGraphBridge._create_graph_executor()` → inner `create_future_task()` - **Lines**: ~227–234 ### Description `_create_graph_executor()` builds a `create_future_task` closure that calls `asyncio.get_event_loop()` unconditionally. This call is made from inside an RxPy `ops.flat_map()` operator, which invokes the closure synchronously in a thread-pool context (not from within an asyncio event loop). On **Python 3.10**, `asyncio.get_event_loop()` emits a `DeprecationWarning` when called from a thread that has no current event loop and auto-creates one (with a deprecation notice). On **Python 3.12+**, the auto-creation was removed and `asyncio.get_event_loop()` raises `RuntimeError` if there is no running event loop in the current thread, per Python's asyncio policy changes. Additionally, `asyncio.ensure_future()` requires a running event loop; calling it with `loop=loop` where `loop` was obtained from the deprecated `get_event_loop()` API can schedule the coroutine on a stale/detached event loop that is not running. The `_create_node_operator` method (lines ~294–300) correctly uses `asyncio.get_running_loop()` with a fallback to `asyncio.new_event_loop()`, but `_create_graph_executor` still uses the deprecated pattern. ### Evidence ```python # src/cleveragents/langgraph/bridge.py, lines 225-234 def create_future_task(msg: StreamMessage) -> asyncio.Future[StreamMessage]: loop = asyncio.get_event_loop() # <-- DeprecationWarning on 3.10, RuntimeError on 3.12+ task: asyncio.Future[StreamMessage] = asyncio.ensure_future( execute_graph(msg), loop=loop ) self._active_tasks.add(task) task.add_done_callback( lambda t: self._active_tasks.discard(cast(asyncio.Task[Any], t)) ) return task return ops.flat_map(lambda msg: rx.from_future(create_future_task(msg))) ``` Compare with the correct pattern already used in the same file: ```python # src/cleveragents/langgraph/bridge.py, lines 294-300 (_create_node_operator) def create_future_task(msg: StreamMessage) -> Any: try: loop = asyncio.get_running_loop() # <-- correct except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.create_task(execute_node(msg)) ``` ### Expected Behavior `create_future_task` should either: 1. Use `asyncio.get_running_loop()` (the non-deprecated API) and raise a clear error if there is no running loop, OR 2. Use the same try/except fallback pattern used in `_create_node_operator`, OR 3. Dispatch the coroutine to the event loop via `asyncio.run_coroutine_threadsafe()` when called from a non-asyncio thread. ### Actual Behavior On Python 3.12+, `asyncio.get_event_loop()` raises `RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-N_M'`, causing the entire graph executor pipeline to fail immediately on the first `flat_map` emission. On Python 3.10–3.11 it emits a `DeprecationWarning` and may schedule onto an orphaned event loop. ### Suggested Fix Replace the deprecated pattern in `create_future_task` inside `_create_graph_executor`: ```python def create_future_task(msg: StreamMessage) -> asyncio.Future[StreamMessage]: try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) task = loop.create_task(execute_graph(msg)) self._active_tasks.add(task) task.add_done_callback( lambda t: self._active_tasks.discard(cast(asyncio.Task[Any], t)) ) return task ``` ### Category concurrency ### TDD Note After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it. --- **Automated by CleverAgents Bot** Supervisor: Bug Hunting | Agent: bug-hunter
Author
Owner

Label compliance fix applied: Added missing State/Unverified label per CONTRIBUTING.md requirements.


Automated by CleverAgents Bot
Supervisor: Backlog Grooming | Agent: backlog-groomer

Label compliance fix applied: Added missing `State/Unverified` label per CONTRIBUTING.md requirements. --- **Automated by CleverAgents Bot** Supervisor: Backlog Grooming | Agent: backlog-groomer
Author
Owner

Verified — Concurrency bug: deprecated asyncio.get_event_loop() raises RuntimeError on Python 3.12+. MoSCoW: Must-have. Priority: High.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

✅ **Verified** — Concurrency bug: deprecated asyncio.get_event_loop() raises RuntimeError on Python 3.12+. MoSCoW: Must-have. Priority: High. --- **Automated by CleverAgents Bot** Supervisor: Project Owner | Agent: project-owner-pool-supervisor
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#6756
No description provided.