feat(streaming): add Executor.execute_stream() returning AsyncIterator[str] for token-by-token delivery #16

Open
opened 2026-06-03 06:00:17 +00:00 by hurui200320 · 0 comments
Member

Background

The actor lifecycle requires both non-streaming and streaming execution (step 6). Currently LLMAgent.process_message() calls ainvoke() only, which buffers the full LLM response before returning. There is no token-by-token delivery path anywhere in PureLangGraph or LLMAgent.
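
To make the gap concrete, here is a minimal sketch contrasting buffered and token-by-token delivery. `FakeChatModel` and `FakeChunk` are hypothetical stand-ins for a LangChain chat model and its message chunks, not the project's real classes; only the `ainvoke()` and `astream()` method names come from this issue.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class FakeChunk:
    """Hypothetical stand-in for a chat-model message chunk."""
    content: str


class FakeChatModel:
    """Hypothetical stand-in for a chat model exposing ainvoke() and astream()."""

    def __init__(self, tokens: list[str]) -> None:
        self._tokens = tokens

    async def ainvoke(self, messages: list[str]) -> FakeChunk:
        # Buffered path: the caller sees nothing until the whole reply is ready.
        return FakeChunk("".join(self._tokens))

    async def astream(self, messages: list[str]) -> AsyncIterator[FakeChunk]:
        # Streaming path: each token is handed over as soon as it exists.
        for token in self._tokens:
            await asyncio.sleep(0)  # yield control, as a real network read would
            yield FakeChunk(token)


async def buffered(model: FakeChatModel) -> list[str]:
    """Return the deliveries a caller observes on the buffered path."""
    reply = await model.ainvoke(["hi"])
    return [reply.content]  # a single delivery containing everything


async def streamed(model: FakeChatModel) -> list[str]:
    """Return the deliveries a caller observes on the streaming path."""
    return [chunk.content async for chunk in model.astream(["hi"])]
```

With the same model, `buffered()` produces one delivery holding the full reply, while `streamed()` produces one delivery per token. The streamed form is what the router would forward to end-users.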

The CleverThis router needs to stream partial responses to end-users for a better experience on long-running LLM calls.

Spec reference: Actor Lifecycle Step 6 (streaming path)

Depends on: #13 (Executor must exist); #14 (recommended — last_result: ActorResult populated after stream for billing)

What Is Currently Missing

  • No execute_stream() method on Executor.
  • No astream() usage in LLMAgent or PureLangGraph.
  • No AsyncIterator path through the graph execution pipeline.

Acceptance Criteria

from collections.abc import AsyncIterator

class Executor:
    async def execute_stream(self, message: str) -> AsyncIterator[str]: ...
    # After the iterator is exhausted: self.last_result -> ActorResult

  1. LLMAgent gains stream_message(messages) -> AsyncIterator[str] using self.chat_model.astream(messages), yielding chunk.content per chunk.
  2. For multi-node graphs, tokens are yielded from the terminal node only; intermediate nodes use ainvoke().
  3. Token counts for streaming are collected from the final streaming chunk's usage_metadata where available; otherwise they fall back to 0 and a warning is logged.
  4. After the stream is exhausted, executor.last_result exposes the ActorResult (for billing by the router).
  5. Execution limits from C6 apply: timeout_ms wraps the stream; max_model_calls/max_tool_calls counters increment as normal.
  6. execute_stream is accessible on the Executor object. No new top-level package export is required.
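
Criteria 1, 3, and 4 can be sketched together as follows. This is an illustration under stated assumptions, not the intended implementation: `Chunk`, `FakeChatModel`, the `ActorResult` fields, and the `last_usage` attribute are hypothetical, and the `input_tokens`/`output_tokens` keys are assumed to match what the provider reports in `usage_metadata`. Multi-node routing and limit enforcement are left out.

```python
import asyncio
import logging
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class Chunk:
    """Hypothetical stand-in for a streamed message chunk."""
    content: str
    usage_metadata: Optional[dict] = None


@dataclass
class ActorResult:
    """Hypothetical shape; the real ActorResult is specified in #14."""
    text: str
    input_tokens: int
    output_tokens: int


class FakeChatModel:
    """Hypothetical model whose astream() replays prepared chunks."""

    def __init__(self, chunks: list[Chunk]) -> None:
        self._chunks = chunks

    async def astream(self, messages: list[str]) -> AsyncIterator[Chunk]:
        for chunk in self._chunks:
            yield chunk


class LLMAgent:
    def __init__(self, chat_model: FakeChatModel) -> None:
        self.chat_model = chat_model
        self.last_usage: Optional[dict] = None  # assumed attribute

    async def stream_message(self, messages: list[str]) -> AsyncIterator[str]:
        self.last_usage = None
        async for chunk in self.chat_model.astream(messages):
            # Keep the most recent usage report; providers that send one
            # typically attach it to the final chunk.
            if chunk.usage_metadata is not None:
                self.last_usage = chunk.usage_metadata
            if chunk.content:
                yield chunk.content


class Executor:
    def __init__(self, agent: LLMAgent) -> None:
        self.agent = agent
        self.last_result: Optional[ActorResult] = None

    async def execute_stream(self, message: str) -> AsyncIterator[str]:
        self.last_result = None
        parts: list[str] = []
        async for token in self.agent.stream_message([message]):
            parts.append(token)
            yield token
        # Reached only once the consumer has drained the stream.
        usage = self.agent.last_usage
        if usage is None:
            logger.warning("No usage_metadata in stream; token counts default to 0")
            usage = {}
        self.last_result = ActorResult(
            text="".join(parts),
            input_tokens=usage.get("input_tokens", 0),
            output_tokens=usage.get("output_tokens", 0),
        )
```

In this sketch `last_result` is assigned after the final `yield`, so a consumer that stops iterating early leaves it as `None`. Whether partial streams should still produce a billable `ActorResult` is a design question the sketch does not settle.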

Subtasks

  • Add stream_message(messages) -> AsyncIterator[str] to LLMAgent using astream()
  • Add execute_stream(message) -> AsyncIterator[str] to Executor
  • Route terminal node through streaming path; intermediate nodes stay on ainvoke
  • Collect token counts from the final stream chunk; populate executor.last_result (ActorResult) with them after stream exhaustion
  • Apply limit enforcement (timeout_ms, model/tool call counters) to streaming path
  • Write BDD/unit tests for streaming output (mock LangChain astream)
  • Write test confirming executor.last_result is populated after stream exhaustion
  • Verify all existing non-streaming tests still pass
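
One possible shape for the mocked-`astream` test is sketched below using only the standard library. The free function `stream_message` is a hypothetical stand-in for `LLMAgent.stream_message`, and `make_mock_model` is an illustrative helper; the project's actual BDD framework and fixtures are not assumed here.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import MagicMock


async def stream_message(chat_model, messages):
    """Minimal stand-in for LLMAgent.stream_message (hypothetical)."""
    async for chunk in chat_model.astream(messages):
        yield chunk.content


def make_mock_model(tokens):
    """Build a mock chat model whose astream() yields one chunk per token."""

    async def fake_astream(messages):
        for token in tokens:
            yield SimpleNamespace(content=token)

    model = MagicMock()
    # side_effect makes each call return a fresh async generator
    # while the mock still records the call arguments.
    model.astream = MagicMock(side_effect=fake_astream)
    return model


async def test_stream_yields_tokens_in_order():
    model = make_mock_model(["a", "b", "c"])
    received = [token async for token in stream_message(model, ["hello"])]
    assert received == ["a", "b", "c"]
    model.astream.assert_called_once_with(["hello"])
```

The test coroutine can be driven with `asyncio.run(test_stream_yields_tokens_in_order())` or by whatever async test runner the project already uses.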

Definition of Done

  • All subtasks checked off.
  • async for token in executor.execute_stream(msg) yields string tokens.
  • executor.last_result is an ActorResult after the stream finishes.
  • Limits are enforced in the streaming path.
  • All tests pass. Coverage at or above project threshold.
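
As an illustration of limit enforcement in the streaming path, `timeout_ms` could wrap the stream as one overall deadline rather than a per-token timeout. The sketch below assumes that reading; `with_deadline` is a hypothetical helper name, and the model/tool call counters are not covered.

```python
import asyncio
from collections.abc import AsyncIterator


async def with_deadline(stream: AsyncIterator[str], timeout_ms: int) -> AsyncIterator[str]:
    """Re-yield items from `stream`, raising asyncio.TimeoutError once
    timeout_ms has elapsed since iteration started."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_ms / 1000
    iterator = stream.__aiter__()
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError("stream exceeded timeout_ms")
        try:
            # Wait for the next token, but never past the overall deadline.
            token = await asyncio.wait_for(iterator.__anext__(), timeout=remaining)
        except StopAsyncIteration:
            return  # source stream finished normally
        yield token
```

A consumer would iterate `with_deadline(executor.execute_stream(msg), timeout_ms)` and handle `asyncio.TimeoutError`. Tokens delivered before the deadline have already reached the caller, so the router would need to decide how to signal a truncated response.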
Reference
cleveragents/cleveractors-core#16