feat(a2a): implement SSE streaming for task updates and artifacts #875

Open
opened 2026-03-13 22:59:12 +00:00 by freemo · 1 comment
Owner

Metadata

  • Commit Message: feat(a2a): implement SSE streaming for task updates and artifacts
  • Branch: feature/m6-a2a-sse-streaming

Background and Context

The A2A protocol specification defines Server-Sent Events (SSE) for streaming task status updates and artifact delivery. The spec defines two event types: TaskStatusUpdateEvent and TaskArtifactUpdateEvent, delivered via SSE over the HTTP transport.

Currently, an in-memory A2aEventQueue exists with subscribe_local() (Python callback) and subscribe_remote() (raises A2aNotAvailableError). There is no HTTP SSE endpoint, no text/event-stream content type handling, and no EventSource protocol implementation.

Expected Behavior

  • SSE endpoint at the A2A HTTP transport that streams events to subscribed clients
  • TaskStatusUpdateEvent emitted on plan phase transitions, decision creation, and completion
  • TaskArtifactUpdateEvent emitted when plans produce artifacts (diffs, files, reports)
  • Local mode: events delivered via A2aEventQueue callbacks (already partial)
  • Server mode: events delivered via HTTP SSE

Acceptance Criteria

  • SSE endpoint returns text/event-stream content type
  • TaskStatusUpdateEvent events are emitted on plan lifecycle transitions
  • TaskArtifactUpdateEvent events are emitted when artifacts are produced
  • Clients can subscribe/unsubscribe to event streams
  • Events include A2A-compliant task IDs and metadata
  • Local mode event queue works for in-process subscribers
  • Integration with the A2A HTTP transport (#692)

Subtasks

  • Define SSE event schema aligned with A2A protocol
  • Implement SSE endpoint in the HTTP transport layer
  • Emit TaskStatusUpdateEvent from plan lifecycle service
  • Emit TaskArtifactUpdateEvent from plan execution
  • Wire A2aEventQueue to emit events to SSE subscribers
  • Tests (Behave): Add scenarios for event subscription and delivery
  • Tests (Robot): Integration test for SSE over HTTP
  • Verify coverage >= 97% via nox -s coverage_report
  • Run nox (all default sessions), fix any errors

Definition of Done

This issue is complete when:

  • All subtasks above are completed and checked off.
  • A Git commit is created where the first line of the commit message matches the Commit Message in Metadata exactly, followed by a blank line, then additional lines providing relevant details about the implementation.
  • The commit is pushed to the remote on the branch matching the Branch in Metadata exactly.
  • The commit is submitted as a pull request to master, reviewed, and merged before this issue is marked done.
## Metadata - **Commit Message**: `feat(a2a): implement SSE streaming for task updates and artifacts` - **Branch**: `feature/m6-a2a-sse-streaming` ## Background and Context The A2A protocol specification defines Server-Sent Events (SSE) for streaming task status updates and artifact delivery. The spec defines two event types: `TaskStatusUpdateEvent` and `TaskArtifactUpdateEvent`, delivered via SSE over the HTTP transport. Currently, an in-memory `A2aEventQueue` exists with `subscribe_local()` (Python callback) and `subscribe_remote()` (raises `A2aNotAvailableError`). There is no HTTP SSE endpoint, no `text/event-stream` content type handling, and no EventSource protocol implementation. ## Expected Behavior - SSE endpoint at the A2A HTTP transport that streams events to subscribed clients - `TaskStatusUpdateEvent` emitted on plan phase transitions, decision creation, and completion - `TaskArtifactUpdateEvent` emitted when plans produce artifacts (diffs, files, reports) - Local mode: events delivered via `A2aEventQueue` callbacks (already partial) - Server mode: events delivered via HTTP SSE ## Acceptance Criteria - [ ] SSE endpoint returns `text/event-stream` content type - [ ] `TaskStatusUpdateEvent` events are emitted on plan lifecycle transitions - [ ] `TaskArtifactUpdateEvent` events are emitted when artifacts are produced - [ ] Clients can subscribe/unsubscribe to event streams - [ ] Events include A2A-compliant task IDs and metadata - [ ] Local mode event queue works for in-process subscribers - [ ] Integration with the A2A HTTP transport (#692) ## Subtasks - [ ] Define SSE event schema aligned with A2A protocol - [ ] Implement SSE endpoint in the HTTP transport layer - [ ] Emit `TaskStatusUpdateEvent` from plan lifecycle service - [ ] Emit `TaskArtifactUpdateEvent` from plan execution - [ ] Wire `A2aEventQueue` to emit events to SSE subscribers - [ ] Tests (Behave): Add scenarios for event subscription and delivery - [ ] Tests (Robot): Integration test for SSE over HTTP - [ ] Verify coverage >= 97% via `nox -s coverage_report` - [ ] Run `nox` (all default sessions), fix any errors ## Definition of Done This issue is complete when: - All subtasks above are completed and checked off. - A Git commit is created where the **first line** of the commit message matches the Commit Message in Metadata exactly, followed by a blank line, then additional lines providing relevant details about the implementation. - The commit is pushed to the remote on the branch matching the **Branch** in Metadata exactly. - The commit is submitted as a **pull request** to `master`, reviewed, and **merged** before this issue is marked done.
freemo added this to the v3.6.0 milestone 2026-03-13 23:00:10 +00:00
freemo self-assigned this 2026-03-14 04:27:30 +00:00
Author
Owner

Implementation Notes — Day 37

Architecture

Three new components in a2a/events.py:

  1. SseEventFormatter — stateless formatter converting A2aEvent to text/event-stream strings
  2. EventBusBridge — subscriber that translates DomainEvent→A2aEvent with type mapping
  3. SSE type constants — TASK_STATUS_UPDATE, TASK_ARTIFACT_UPDATE

Key Code Locations

  • SseEventFormatter.format() / format_keepalive()src/cleveragents/a2a/events.py (commit da4f8202)
  • EventBusBridge._on_domain_event() — same file, translation logic
  • _STATUS_EVENT_TYPES / _ARTIFACT_EVENT_TYPES — same file, mapping tables

Quality Gates

Lint: | Typecheck: 0 errors | Unit tests: 387 features, 11115 scenarios | Integration: all passed | Coverage: 97%

## Implementation Notes — Day 37 ### Architecture Three new components in `a2a/events.py`: 1. **SseEventFormatter** — stateless formatter converting A2aEvent to text/event-stream strings 2. **EventBusBridge** — subscriber that translates DomainEvent→A2aEvent with type mapping 3. **SSE type constants** — TASK_STATUS_UPDATE, TASK_ARTIFACT_UPDATE ### Key Code Locations - `SseEventFormatter.format()` / `format_keepalive()` — `src/cleveragents/a2a/events.py` (commit da4f8202) - `EventBusBridge._on_domain_event()` — same file, translation logic - `_STATUS_EVENT_TYPES` / `_ARTIFACT_EVENT_TYPES` — same file, mapping tables ### Quality Gates Lint: ✅ | Typecheck: ✅ 0 errors | Unit tests: ✅ 387 features, 11115 scenarios | Integration: ✅ all passed | Coverage: ✅ 97%
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Blocks
#397 Epic: Server & Autonomy Infrastructure
cleveragents/cleveragents-core
Depends on
Reference
cleveragents/cleveragents-core#875
No description provided.