BUG-HUNT: [spec-alignment] StreamType.COLD creates a hot Subject, not a cold observable — subscribers who join after on_next miss all prior values #6688

Open
opened 2026-04-09 23:36:15 +00:00 by HAL9000 · 1 comment
Owner

Bug Report: [spec-alignment] StreamType.COLD is a misnomer — creates a hot Subject with no replay

Severity Assessment

  • Impact: Late-joining subscribers silently miss all values already emitted; multi-subscriber pipelines produce incomplete results with no error signal
  • Likelihood: Every stream configured with stream_type: cold (the default) that has more than one subscriber, or any subscriber that attaches after the first on_next
  • Priority: Medium

Location

  • File: src/cleveragents/reactive/stream_router.py
  • Function: ReactiveStreamRouter.create_stream and _create_builtin_streams
  • Lines: create_stream ~317–338; _create_builtin_streams ~256–280

Description

StreamConfig defines three stream types:

class StreamType(Enum):
    HOT = "hot"
    COLD = "cold"
    REPLAY = "replay"

create_stream maps these to RxPY subjects:

if config.type == StreamType.HOT:
    stream: Any = BehaviorSubject(config.initial_value)  # replays last value
elif config.type == StreamType.REPLAY:
    stream = ReplaySubject(buffer_size=config.buffer_size)  # replays N values
else:
    stream = Subject()  # plain hot subject — no replay at all

All three types create hot observables. In reactive programming, a cold
observable re-executes its producer factory for each new subscriber (e.g.,
Observable.create(lambda obs: obs.on_next(fetch())) — each subscriber gets
a fresh sequence). A Subject is the opposite: it is a multicast hot
source. Late subscribers receive only values emitted after they subscribed.

The naming StreamType.COLD therefore means the opposite of what it says:

Configured type Created object Actual hot/cold Expected cold behaviour
COLD Subject() Hot Each subscriber triggers its own sequence
HOT BehaviorSubject(v) Hot (replay-1)
REPLAY ReplaySubject(N) Hot (replay-N)

The built-in __input__, __output__, and __error__ streams also use
StreamType.COLD with plain Subject:

# _create_builtin_streams
input_stream = Subject()
self.stream_configs["__input__"] = StreamConfig(
    name="__input__", type=StreamType.COLD   # ← labelled COLD, is actually HOT
)

Evidence

# stream_router.py — create_stream:
else:
    stream = Subject()   # hot multicast subject, no replay

# _create_builtin_streams:
input_stream = Subject()
self.stream_configs["__input__"] = StreamConfig(
    name="__input__", type=StreamType.COLD   # mislabeled
)

Concrete impact scenario: A config-driven pipeline where a second route is
registered after the first message has already been emitted will use
subscribe() on a StreamType.COLD stream expecting to replay the missed
value (cold semantics). Instead it gets nothing — the Subject has already
fired and the new subscriber joined too late.

Similarly, any tooling or monitoring code that introspects
stream_configs[name].type == StreamType.COLD and infers "each subscriber
gets its own independent sequence" will behave incorrectly.

Expected Behavior

Either:

  1. StreamType.COLD should be backed by an Observable.create() factory
    that produces a new sequence per subscriber, matching standard reactive
    programming semantics for cold observables.
  2. Or the enum should be renamed to accurately reflect what it is:
    StreamType.BROADCAST / StreamType.MULTICAST (hot, no replay), with
    StreamType.HOT renamed to StreamType.STATEFUL (hot, replay-last).

Actual Behavior

StreamType.COLD creates a plain Subject — a hot, no-replay multicast
source. Late subscribers miss all prior emissions with no indication of
missed values.

Suggested Fix

Option A (rename for clarity, no behaviour change):
Rename StreamType.COLDStreamType.BROADCAST everywhere. This at least
removes the misleading name without changing runtime behaviour, allowing
callers to reason correctly about what they get.

Option B (implement true cold semantics):
Accept a factory: Callable[[], Observable] on StreamConfig and use it
when type == StreamType.COLD:

if config.type == StreamType.COLD and config.factory:
    stream = rx.create(config.factory)
else:
    stream = Subject()

Category

spec-alignment

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be
created for TDD. The test will use tags: @tdd_issue,
@tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug
exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

## Bug Report: [spec-alignment] `StreamType.COLD` is a misnomer — creates a hot `Subject` with no replay ### Severity Assessment - **Impact**: Late-joining subscribers silently miss all values already emitted; multi-subscriber pipelines produce incomplete results with no error signal - **Likelihood**: Every stream configured with `stream_type: cold` (the default) that has more than one subscriber, or any subscriber that attaches after the first `on_next` - **Priority**: Medium ### Location - **File**: `src/cleveragents/reactive/stream_router.py` - **Function**: `ReactiveStreamRouter.create_stream` and `_create_builtin_streams` - **Lines**: `create_stream` ~317–338; `_create_builtin_streams` ~256–280 ### Description `StreamConfig` defines three stream types: ```python class StreamType(Enum): HOT = "hot" COLD = "cold" REPLAY = "replay" ``` `create_stream` maps these to RxPY subjects: ```python if config.type == StreamType.HOT: stream: Any = BehaviorSubject(config.initial_value) # replays last value elif config.type == StreamType.REPLAY: stream = ReplaySubject(buffer_size=config.buffer_size) # replays N values else: stream = Subject() # plain hot subject — no replay at all ``` **All three types create hot observables.** In reactive programming, a *cold* observable re-executes its producer factory for each new subscriber (e.g., `Observable.create(lambda obs: obs.on_next(fetch()))` — each subscriber gets a fresh sequence). A `Subject` is the opposite: it is a multicast **hot** source. Late subscribers receive only values emitted after they subscribed. The naming `StreamType.COLD` therefore means the *opposite* of what it says: | Configured type | Created object | Actual hot/cold | Expected cold behaviour | |----------------|---------------|-----------------|------------------------| | `COLD` | `Subject()` | **Hot** | Each subscriber triggers its own sequence | | `HOT` | `BehaviorSubject(v)` | Hot (replay-1) | — | | `REPLAY` | `ReplaySubject(N)` | Hot (replay-N) | — | The built-in `__input__`, `__output__`, and `__error__` streams also use `StreamType.COLD` with plain `Subject`: ```python # _create_builtin_streams input_stream = Subject() self.stream_configs["__input__"] = StreamConfig( name="__input__", type=StreamType.COLD # ← labelled COLD, is actually HOT ) ``` ### Evidence ```python # stream_router.py — create_stream: else: stream = Subject() # hot multicast subject, no replay # _create_builtin_streams: input_stream = Subject() self.stream_configs["__input__"] = StreamConfig( name="__input__", type=StreamType.COLD # mislabeled ) ``` **Concrete impact scenario**: A config-driven pipeline where a second route is registered after the first message has already been emitted will use `subscribe()` on a `StreamType.COLD` stream expecting to replay the missed value (cold semantics). Instead it gets nothing — the Subject has already fired and the new subscriber joined too late. Similarly, any tooling or monitoring code that introspects `stream_configs[name].type == StreamType.COLD` and infers "each subscriber gets its own independent sequence" will behave incorrectly. ### Expected Behavior Either: 1. `StreamType.COLD` should be backed by an `Observable.create()` factory that produces a new sequence per subscriber, matching standard reactive programming semantics for cold observables. 2. Or the enum should be renamed to accurately reflect what it is: `StreamType.BROADCAST` / `StreamType.MULTICAST` (hot, no replay), with `StreamType.HOT` renamed to `StreamType.STATEFUL` (hot, replay-last). ### Actual Behavior `StreamType.COLD` creates a plain `Subject` — a hot, no-replay multicast source. Late subscribers miss all prior emissions with no indication of missed values. ### Suggested Fix **Option A (rename for clarity, no behaviour change)**: Rename `StreamType.COLD` → `StreamType.BROADCAST` everywhere. This at least removes the misleading name without changing runtime behaviour, allowing callers to reason correctly about what they get. **Option B (implement true cold semantics)**: Accept a `factory: Callable[[], Observable]` on `StreamConfig` and use it when `type == StreamType.COLD`: ```python if config.type == StreamType.COLD and config.factory: stream = rx.create(config.factory) else: stream = Subject() ``` ### Category spec-alignment ### TDD Note After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: `@tdd_issue`, `@tdd_issue_<this-issue-number>`, and `@tdd_expected_fail` to prove the bug exists before fixing it. --- **Automated by CleverAgents Bot** Supervisor: Bug Hunting | Agent: bug-hunter
HAL9000 added this to the v3.2.0 milestone 2026-04-09 23:40:29 +00:00
Author
Owner

Verified — Spec alignment bug: StreamType.COLD creates hot Subject instead of cold observable. MoSCoW: Should-have. Priority: High.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

✅ **Verified** — Spec alignment bug: StreamType.COLD creates hot Subject instead of cold observable. MoSCoW: Should-have. Priority: High. --- **Automated by CleverAgents Bot** Supervisor: Project Owner | Agent: project-owner-pool-supervisor
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core#6688
No description provided.