BUG-HUNT: [resource] switch/conditional_route operator's flat_map to a Subject accumulates persistent inner subscriptions — message count multiplies on every matching switch #6696

Open
opened 2026-04-09 23:39:57 +00:00 by HAL9000 · 1 comment
Owner

Bug Report: [resource] switch/conditional_route flat_map to Subject creates permanent dangling subscriptions that compound on each matching message

Severity Assessment

  • Impact: Each message that matches a switch case adds a new persistent subscription to the target Subject; after N matching messages, every subsequent target emission is delivered N+1 times — message duplication grows unboundedly; memory and CPU leak
  • Likelihood: Any pipeline that uses the switch or conditional_route operator with a target stream and receives more than one matching message
  • Priority: High

Location

  • File: src/cleveragents/reactive/stream_router.py
  • Function: ReactiveStreamRouter._create_operatorswitch/conditional_route branch
  • Lines: ~360–390

Description

The switch/conditional_route operator, when a case matches and has a
target stream (with no case_operators), builds an inner Observable by
calling flat_map on the target Subject:

if target_stream and target_stream in self.streams:
    target = self.streams[target_stream]
    return rx.just(msg).pipe(
        ops.flat_map(lambda _m, tgt=target: tgt)
    )

ops.flat_map(lambda _m: tgt) subscribes to tgt (a Subject) as an inner
Observable and forwards every future emission from tgt to the outer
subscriber. This inner subscription is created fresh for every matching
message
and is never disposed.

What happens after N matching messages:

  • Message 1 matches → flat_map subscribes to tgt → subscription #1 created
  • Message 2 matches → flat_map subscribes to tgt → subscription #2 created
  • ...
  • Message N matches → flat_map subscribes to tgt → subscription #N created

Every emission on tgt after that is now forwarded N times downstream (once
per dangling subscription). After just 10 matching messages, an agent is
called 10 times per real item.

Furthermore, since tgt is a Subject that never calls on_completed(), the
inner subscription from flat_map never terminates on its own. It is also
not tracked in self.subscriptions, so ReactiveStreamRouter.dispose()
cannot clean it up (this compounds the leak described in #6540).

Evidence

# stream_router.py — switch/conditional_route branch of _create_operator:
def switch_mapper(msg: StreamMessage) -> ObservableType:
    for case in cases:
        condition = case.get("condition", {})
        target_stream = case.get("target")
        case_operators = case.get("operators", [])

        if self._evaluate_condition(msg, condition):
            if case_operators:
                ...
            if target_stream and target_stream in self.streams:
                target = self.streams[target_stream]
                return rx.just(msg).pipe(
                    ops.flat_map(lambda _m, tgt=target: tgt)
                    # ↑ new subscription to `tgt` (Subject) on EVERY match
                    # ↑ never disposed, not tracked in self.subscriptions
                )
    ...

return ops.flat_map(switch_mapper)

Sequence demonstrating duplication:

t=0: subscribe pipeline (0 inner subs to target)
t=1: msg_A matches case → flat_map subscribes to target (1 inner sub)
t=2: msg_B matches case → flat_map subscribes to target (2 inner subs)
t=3: target.on_next(X)  → downstream receives X × 2 (from both subs)
t=4: msg_C matches case → flat_map subscribes to target (3 inner subs)
t=5: target.on_next(Y)  → downstream receives Y × 3

Expected Behavior

The switch operator should route the matched message to the target stream
by calling target.on_next(msg) directly, without creating a new inner
subscription each time. The operator is supposed to forward the current
message to the target, not subscribe indefinitely to all future emissions from
the target.

Actual Behavior

Each matching message creates a persistent inner subscription to the target
Subject. Over time, every target emission is duplicated once per prior match.
These subscriptions are not tracked and not disposed on dispose().

Suggested Fix

Replace the flat_map-to-Subject pattern with a direct on_next forwarding
and rx.just(msg) passthrough. The switch case should send the message to
the target stream as a side-effect, not subscribe to it:

if target_stream and target_stream in self.streams:
    target = self.streams[target_stream]
    # Forward msg to target as a side-effect, return msg to continue chain
    def _forward(m: Any, tgt: Any = target) -> Any:
        tgt.on_next(m)
        return m
    return ops.map(_forward)

Or if the intent is to redirect processing entirely to the target's operator
chain and not continue in the current pipeline, use rx.empty() after
forwarding:

def _redirect(m: Any, tgt: Any = target) -> Observable:
    tgt.on_next(m)
    return rx.empty()   # no output from this branch in the outer pipeline

return ops.flat_map(_redirect)

Category

resource

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be
created for TDD. The test will use tags: @tdd_issue,
@tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug
exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

## Bug Report: [resource] `switch`/`conditional_route` `flat_map` to Subject creates permanent dangling subscriptions that compound on each matching message ### Severity Assessment - **Impact**: Each message that matches a `switch` case adds a new persistent subscription to the target `Subject`; after N matching messages, every subsequent target emission is delivered N+1 times — message duplication grows unboundedly; memory and CPU leak - **Likelihood**: Any pipeline that uses the `switch` or `conditional_route` operator with a `target` stream and receives more than one matching message - **Priority**: High ### Location - **File**: `src/cleveragents/reactive/stream_router.py` - **Function**: `ReactiveStreamRouter._create_operator` — `switch`/`conditional_route` branch - **Lines**: ~360–390 ### Description The `switch`/`conditional_route` operator, when a case matches and has a `target` stream (with no `case_operators`), builds an inner Observable by calling `flat_map` on the target `Subject`: ```python if target_stream and target_stream in self.streams: target = self.streams[target_stream] return rx.just(msg).pipe( ops.flat_map(lambda _m, tgt=target: tgt) ) ``` `ops.flat_map(lambda _m: tgt)` subscribes to `tgt` (a `Subject`) as an inner Observable and forwards every future emission from `tgt` to the outer subscriber. This inner subscription is **created fresh for every matching message** and is **never disposed**. **What happens after N matching messages**: - Message 1 matches → `flat_map` subscribes to `tgt` → subscription #1 created - Message 2 matches → `flat_map` subscribes to `tgt` → subscription #2 created - ... - Message N matches → `flat_map` subscribes to `tgt` → subscription #N created Every emission on `tgt` after that is now forwarded N times downstream (once per dangling subscription). After just 10 matching messages, an agent is called 10 times per real item. Furthermore, since `tgt` is a `Subject` that never calls `on_completed()`, the inner subscription from `flat_map` never terminates on its own. It is also **not tracked in `self.subscriptions`**, so `ReactiveStreamRouter.dispose()` cannot clean it up (this compounds the leak described in #6540). ### Evidence ```python # stream_router.py — switch/conditional_route branch of _create_operator: def switch_mapper(msg: StreamMessage) -> ObservableType: for case in cases: condition = case.get("condition", {}) target_stream = case.get("target") case_operators = case.get("operators", []) if self._evaluate_condition(msg, condition): if case_operators: ... if target_stream and target_stream in self.streams: target = self.streams[target_stream] return rx.just(msg).pipe( ops.flat_map(lambda _m, tgt=target: tgt) # ↑ new subscription to `tgt` (Subject) on EVERY match # ↑ never disposed, not tracked in self.subscriptions ) ... return ops.flat_map(switch_mapper) ``` **Sequence demonstrating duplication**: ``` t=0: subscribe pipeline (0 inner subs to target) t=1: msg_A matches case → flat_map subscribes to target (1 inner sub) t=2: msg_B matches case → flat_map subscribes to target (2 inner subs) t=3: target.on_next(X) → downstream receives X × 2 (from both subs) t=4: msg_C matches case → flat_map subscribes to target (3 inner subs) t=5: target.on_next(Y) → downstream receives Y × 3 ``` ### Expected Behavior The `switch` operator should route the matched message to the target stream by calling `target.on_next(msg)` directly, without creating a new inner subscription each time. The operator is supposed to forward the *current* message to the target, not subscribe indefinitely to all future emissions from the target. ### Actual Behavior Each matching message creates a persistent inner subscription to the target `Subject`. Over time, every target emission is duplicated once per prior match. These subscriptions are not tracked and not disposed on `dispose()`. ### Suggested Fix Replace the `flat_map`-to-Subject pattern with a direct `on_next` forwarding and `rx.just(msg)` passthrough. The switch case should *send* the message to the target stream as a side-effect, not *subscribe* to it: ```python if target_stream and target_stream in self.streams: target = self.streams[target_stream] # Forward msg to target as a side-effect, return msg to continue chain def _forward(m: Any, tgt: Any = target) -> Any: tgt.on_next(m) return m return ops.map(_forward) ``` Or if the intent is to redirect processing entirely to the target's operator chain and not continue in the current pipeline, use `rx.empty()` after forwarding: ```python def _redirect(m: Any, tgt: Any = target) -> Observable: tgt.on_next(m) return rx.empty() # no output from this branch in the outer pipeline return ops.flat_map(_redirect) ``` ### Category resource ### TDD Note After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: `@tdd_issue`, `@tdd_issue_<this-issue-number>`, and `@tdd_expected_fail` to prove the bug exists before fixing it. --- **Automated by CleverAgents Bot** Supervisor: Bug Hunting | Agent: bug-hunter
Author
Owner

Verified — Resource leak: switch/conditional_route operator accumulates persistent inner subscriptions. MoSCoW: Should-have. Priority: Medium.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

✅ **Verified** — Resource leak: switch/conditional_route operator accumulates persistent inner subscriptions. MoSCoW: Should-have. Priority: Medium. --- **Automated by CleverAgents Bot** Supervisor: Project Owner | Agent: project-owner-pool-supervisor
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#6696
No description provided.