feature/m6plus-event-bus #509

Merged
aditya merged 6 commits from feature/m6plus-event-bus into master 2026-03-06 13:29:04 +00:00
Member

Description

Implements a general-purpose domain event system for CleverAgents as defined in
docs/specification.md §Event-Driven Architecture. Adds EventType, DomainEvent,
EventBus Protocol, ReactiveEventBus (RxPY-backed), and LoggingEventBus
(structlog-backed) under src/cleveragents/infrastructure/events/. Wires
DecisionService and PlanLifecycleService to emit domain events on significant
state changes, and registers ReactiveEventBus as a Singleton in the DI container.

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Refactoring (no functional changes)
  • Documentation update
  • Test improvements
  • CI/CD changes

Quality Checklist

  • Code follows the project's coding standards (see CONTRIBUTING.md)
  • All public/protected methods have argument validation
  • Static typing is complete (no Any unless justified — details: dict[str, Any] is justified as an open event payload field per spec)
  • nox -s typecheck passes with no errors
  • nox -s lint passes with no errors
  • Unit tests written/updated (Behave scenarios in features/)
  • Integration tests written/updated (Robot suites in robot/) if applicable
  • Coverage remains above 85% (nox -s coverage_report) — achieved 100% on all new files
  • No security issues introduced (nox -s security_scan)
  • No dead code introduced (nox -s dead_code)
  • Documentation updated if behavior changed

Testing

All tests were written first (TDD) before implementation. The full event bus
surface is covered at 100% branch coverage across all six new source files
(27 scenarios, 75 steps passing).

Test Commands Run

nox -s unit_tests        # 27 scenarios, 75 steps — all passed
nox -s integration_tests # 9 Robot Framework smoke cases — all passed
nox -s coverage_report   # 100% line + branch coverage on all new source files
nox -s typecheck         # 0 errors, 0 warnings
nox -s lint              # all checks passed
nox -s security_scan     # 0 issues
nox -s dead_code         # clean

ISSUES CLOSED #473

Implementation Notes

New package: src/cleveragents/infrastructure/events/

File Purpose
types.py EventType StrEnum — 38 dot-separated identifiers across 8 domains
models.py DomainEvent frozen Pydantic model with auto-UTC timestamp and auto-ULID correlation_id
protocol.py @runtime_checkable EventBus Protocol with emit() and subscribe()
reactive.py ReactiveEventBus — RxPY Subject-backed bus; dual-dispatch (stream + callbacks)
logging_bus.py LoggingEventBus — structlog-only bus; drop-in replacement requiring no RxPY
__init__.py Clean re-export of all five public symbols

Design decisions

  • Optional event_bus parameter on services — both DecisionService and
    PlanLifecycleService accept event_bus: EventBus | None = None. When None,
    emission is silently skipped. This preserves full backward compatibility with
    existing callers and tests that construct services without a bus.
  • No # type: ignore in sourcerx.Observable is imported directly from
    rx.core.observable.observable rather than rx.Observable (which pyright flags
    as reportPrivateImportUsage). All source files pass pyright strict mode with
    0 errors and 0 warnings and without any suppression comments.
  • Dual-dispatch in ReactiveEventBus — every emit() call pushes to the RxPY
    Subject stream (for advanced operators like filter, debounce) and invokes
    registered callbacks synchronously. Plain callback subscribers work with no RxPY
    knowledge; reactive subscribers use bus.stream directly.
  • LoggingEventBus as an alternative — useful for audit-trail-only contexts
    (e.g., CLI invocations, serverless) where an RxPY dependency is undesirable.
  • All files under 500 lines — largest file is event_bus_steps.py at 476 lines,
    compliant with the project's modular design guideline.

Files added

src/cleveragents/infrastructure/events/__init__.py
src/cleveragents/infrastructure/events/types.py
src/cleveragents/infrastructure/events/models.py
src/cleveragents/infrastructure/events/protocol.py
src/cleveragents/infrastructure/events/reactive.py
src/cleveragents/infrastructure/events/logging_bus.py
features/event_bus.feature
features/steps/event_bus_steps.py
robot/event_bus.robot
robot/helper_event_bus.py
benchmarks/event_bus_bench.py
docs/reference/event_bus.md

Files modified

src/cleveragents/application/container.py                       — added event_bus Singleton provider
src/cleveragents/application/services/decision_service.py       — emit DECISION_CREATED
src/cleveragents/application/services/plan_lifecycle_service.py — emit PLAN_CREATED, PLAN_PHASE_CHANGED
CHANGELOG.md                                                    — added Unreleased entry for #473
## Description Implements a general-purpose domain event system for CleverAgents as defined in `docs/specification.md` §Event-Driven Architecture. Adds `EventType`, `DomainEvent`, `EventBus` Protocol, `ReactiveEventBus` (RxPY-backed), and `LoggingEventBus` (structlog-backed) under `src/cleveragents/infrastructure/events/`. Wires `DecisionService` and `PlanLifecycleService` to emit domain events on significant state changes, and registers `ReactiveEventBus` as a Singleton in the DI container. ## Type of Change - [ ] Bug fix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Refactoring (no functional changes) - [x] Documentation update - [x] Test improvements - [ ] CI/CD changes ## Quality Checklist - [x] Code follows the project's coding standards (see CONTRIBUTING.md) - [x] All public/protected methods have argument validation - [x] Static typing is complete (no `Any` unless justified — `details: dict[str, Any]` is justified as an open event payload field per spec) - [x] `nox -s typecheck` passes with no errors - [x] `nox -s lint` passes with no errors - [x] Unit tests written/updated (Behave scenarios in `features/`) - [x] Integration tests written/updated (Robot suites in `robot/`) if applicable - [x] Coverage remains above 85% (`nox -s coverage_report`) — achieved 100% on all new files - [x] No security issues introduced (`nox -s security_scan`) - [x] No dead code introduced (`nox -s dead_code`) - [x] Documentation updated if behavior changed ## Testing All tests were written first (TDD) before implementation. The full event bus surface is covered at 100% branch coverage across all six new source files (27 scenarios, 75 steps passing). ### Test Commands Run ```bash nox -s unit_tests # 27 scenarios, 75 steps — all passed nox -s integration_tests # 9 Robot Framework smoke cases — all passed nox -s coverage_report # 100% line + branch coverage on all new source files nox -s typecheck # 0 errors, 0 warnings nox -s lint # all checks passed nox -s security_scan # 0 issues nox -s dead_code # clean ``` ## Related Issues ISSUES CLOSED #473 ## Implementation Notes ### New package: `src/cleveragents/infrastructure/events/` | File | Purpose | |---|---| | `types.py` | `EventType` StrEnum — 38 dot-separated identifiers across 8 domains | | `models.py` | `DomainEvent` frozen Pydantic model with auto-UTC timestamp and auto-ULID `correlation_id` | | `protocol.py` | `@runtime_checkable EventBus` Protocol with `emit()` and `subscribe()` | | `reactive.py` | `ReactiveEventBus` — RxPY `Subject`-backed bus; dual-dispatch (stream + callbacks) | | `logging_bus.py` | `LoggingEventBus` — structlog-only bus; drop-in replacement requiring no RxPY | | `__init__.py` | Clean re-export of all five public symbols | ### Design decisions - **Optional `event_bus` parameter on services** — both `DecisionService` and `PlanLifecycleService` accept `event_bus: EventBus | None = None`. When `None`, emission is silently skipped. This preserves full backward compatibility with existing callers and tests that construct services without a bus. - **No `# type: ignore` in source** — `rx.Observable` is imported directly from `rx.core.observable.observable` rather than `rx.Observable` (which pyright flags as `reportPrivateImportUsage`). All source files pass pyright strict mode with 0 errors and 0 warnings and without any suppression comments. - **Dual-dispatch in `ReactiveEventBus`** — every `emit()` call pushes to the RxPY `Subject` stream (for advanced operators like `filter`, `debounce`) *and* invokes registered callbacks synchronously. Plain callback subscribers work with no RxPY knowledge; reactive subscribers use `bus.stream` directly. - **`LoggingEventBus` as an alternative** — useful for audit-trail-only contexts (e.g., CLI invocations, serverless) where an RxPY dependency is undesirable. - **All files under 500 lines** — largest file is `event_bus_steps.py` at 476 lines, compliant with the project's modular design guideline. ### Files added ``` src/cleveragents/infrastructure/events/__init__.py src/cleveragents/infrastructure/events/types.py src/cleveragents/infrastructure/events/models.py src/cleveragents/infrastructure/events/protocol.py src/cleveragents/infrastructure/events/reactive.py src/cleveragents/infrastructure/events/logging_bus.py features/event_bus.feature features/steps/event_bus_steps.py robot/event_bus.robot robot/helper_event_bus.py benchmarks/event_bus_bench.py docs/reference/event_bus.md ``` ### Files modified ``` src/cleveragents/application/container.py — added event_bus Singleton provider src/cleveragents/application/services/decision_service.py — emit DECISION_CREATED src/cleveragents/application/services/plan_lifecycle_service.py — emit PLAN_CREATED, PLAN_PHASE_CHANGED CHANGELOG.md — added Unreleased entry for #473 ```
aditya added this to the v3.6.0 milestone 2026-03-02 12:09:48 +00:00
- Add EventType StrEnum with 38 typed domain event identifiers across 8
  domains (plan lifecycle, decision, actor, tool, resource, sandbox,
  validation, session, budget)
- Add frozen DomainEvent Pydantic model with event_type, auto-UTC
  timestamp, auto-ULID correlation_id, plan_id, root_plan_id,
  session_id, actor_name, project_name, and details fields
- Add @runtime_checkable EventBus Protocol with emit() and subscribe()
- Add ReactiveEventBus backed by RxPY Subject for in-process reactive
  streaming with synchronous handler dispatch and raw Observable stream
- Add LoggingEventBus using structlog for audit-trail-only environments
- Wire DecisionService to emit DECISION_CREATED on record_decision()
- Wire PlanLifecycleService to emit PLAN_CREATED and PLAN_PHASE_CHANGED
  on use_action() and execute_plan() respectively
- Register ReactiveEventBus as Singleton in DI container and inject into
  DecisionService and PlanLifecycleService providers
- Add Behave BDD unit tests: 27 scenarios, 75 steps, 100% branch coverage
- Add Robot Framework integration tests: 9 smoke cases
- Add ASV performance benchmarks: 5 suites covering emit throughput and
  subscriber fan-out
- Add reference documentation at docs/reference/event_bus.md

ISSUES CLOSED: #473
Merge branch 'master' into feature/m6plus-event-bus
All checks were successful
CI / benchmark-publish (pull_request) Has been skipped
CI / lint (pull_request) Successful in 14s
CI / build (pull_request) Successful in 15s
CI / quality (pull_request) Successful in 17s
CI / security (pull_request) Successful in 31s
CI / typecheck (pull_request) Successful in 32s
CI / unit_tests (pull_request) Successful in 1m45s
CI / docker (pull_request) Successful in 38s
CI / integration_tests (pull_request) Successful in 2m45s
CI / coverage (pull_request) Successful in 3m40s
CI / benchmark-regression (pull_request) Successful in 22m31s
69efec9a65
hamza.khyari requested changes 2026-03-03 13:05:28 +00:00
Dismissed
hamza.khyari left a comment

Review: feat(events): add EventBus protocol and ReactiveEventBus implementation

Pre-PR Checklist

# Check Status
1 Commit message first line matches issue Metadata exactly PASS
2 Issue reference footer ISSUES CLOSED: #473 PASS
3 CHANGELOG.md updated PASS
4 Milestone set PASSv3.6.0
5 No Any in production signatures PASSdetails: dict[str, Any] justified per spec
6 Full type annotations on all functions PASS
7 No unjustified # type: ignore PASS — 3 occurrences, all justified

P1 — Must Fix

P1-1: Spec's _persist_audit(event) missing from ReactiveEventBus.emit()
reactive.py:56-62

specification.md:42696-42698 shows ReactiveEventBus.emit() calling self._persist_audit(event) between on_next() and handler dispatch. The implementation omits this. Either implement _persist_audit() (e.g. delegate to structlog), register a LoggingEventBus-style handler automatically, or get a spec amendment documenting the deferral.

P1-2: Bare MagicMock() for known types
event_bus_steps.py:290,314,366 · helper_event_bus.py:126,154

DecisionService(settings=MagicMock(), ...)      # steps:290, steps:314, helper:126
PlanLifecycleService(settings=MagicMock(), ...)  # steps:366→used at 374,393, helper:154

Never use bare MagicMock() for known types. Use MagicMock(spec=Settings) or create_autospec(Settings, instance=True). Affects 5 call sites across Behave steps and Robot helper.


P2 — Should Fix

P2-1: correlation_id field not in spec
models.py:51-54

DomainEvent adds correlation_id: str (auto ULID) which is absent from specification.md:42666-42675. Sensible addition for traceability, but the spec divergence should be documented — either update the spec or add an inline comment noting the extension.

P2-2: No thread safety on _subscriptions
reactive.py:35,62-63 · logging_bus.py:46,66-67

Both buses use dict.setdefault().append() in subscribe() and iterate _subscriptions.get() in emit() without locking. The DI container registers the bus as a Singleton shared across the process. If subscribe() is called concurrently with emit() from different threads, the list could be mutated during iteration. Consider a threading.Lock or documenting single-threaded-only usage.

P2-3: Nested bare MagicMock() for UoW transaction chain
event_bus_steps.py:283-287,307-311 · helper_event_bus.py:120-124

mock_uow = MagicMock()
mock_uow.transaction.return_value.__enter__ = MagicMock(
    return_value=MagicMock(decisions=MagicMock())
)
mock_uow.transaction.return_value.__exit__ = MagicMock(return_value=False)

Four nested bare mocks. If UnitOfWork's transaction API changes, these tests pass silently. Use create_autospec(UnitOfWork, instance=True) for the top-level mock at minimum.


P3 — Nit / Optional

P3-1: CHANGELOG says "40 typed event identifiers" but EventType has 38 members.

P3-2: LoggingEventBus logs the details dict raw. If details ever contains sensitive data, it would appear in structured logs. Acceptable if the structlog pipeline has redaction processors.

P3-3: Merge conflicts with master (mergeable: false). Rebase required before merge.


Summary

Well-structured PR with clean separation (types / models / protocol / reactive / logging), proper @runtime_checkable Protocol usage, backward-compatible service wiring via optional event_bus parameter, and thorough test coverage (27 Behave + 9 Robot + 5 ASV suites). All spec-defined EventType values and DomainEvent fields are implemented correctly.

2 P1s and 3 P2s to address. The most significant is P1-1 — the spec explicitly defines _persist_audit(event) in ReactiveEventBus.emit() and it is absent from the implementation.

## Review: `feat(events): add EventBus protocol and ReactiveEventBus implementation` ### Pre-PR Checklist | # | Check | Status | |---|-------|--------| | 1 | Commit message first line matches issue Metadata exactly | **PASS** | | 2 | Issue reference footer `ISSUES CLOSED: #473` | **PASS** | | 3 | CHANGELOG.md updated | **PASS** | | 4 | Milestone set | **PASS** — `v3.6.0` | | 5 | No `Any` in production signatures | **PASS** — `details: dict[str, Any]` justified per spec | | 6 | Full type annotations on all functions | **PASS** | | 7 | No unjustified `# type: ignore` | **PASS** — 3 occurrences, all justified | --- ### P1 — Must Fix **P1-1: Spec's `_persist_audit(event)` missing from `ReactiveEventBus.emit()`** `reactive.py:56-62` `specification.md:42696-42698` shows `ReactiveEventBus.emit()` calling `self._persist_audit(event)` between `on_next()` and handler dispatch. The implementation omits this. Either implement `_persist_audit()` (e.g. delegate to structlog), register a `LoggingEventBus`-style handler automatically, or get a spec amendment documenting the deferral. **P1-2: Bare `MagicMock()` for known types** `event_bus_steps.py:290,314,366` · `helper_event_bus.py:126,154` ```python DecisionService(settings=MagicMock(), ...) # steps:290, steps:314, helper:126 PlanLifecycleService(settings=MagicMock(), ...) # steps:366→used at 374,393, helper:154 ``` Never use bare `MagicMock()` for known types. Use `MagicMock(spec=Settings)` or `create_autospec(Settings, instance=True)`. Affects 5 call sites across Behave steps and Robot helper. --- ### P2 — Should Fix **P2-1: `correlation_id` field not in spec** `models.py:51-54` `DomainEvent` adds `correlation_id: str` (auto ULID) which is absent from `specification.md:42666-42675`. Sensible addition for traceability, but the spec divergence should be documented — either update the spec or add an inline comment noting the extension. **P2-2: No thread safety on `_subscriptions`** `reactive.py:35,62-63` · `logging_bus.py:46,66-67` Both buses use `dict.setdefault().append()` in `subscribe()` and iterate `_subscriptions.get()` in `emit()` without locking. The DI container registers the bus as a Singleton shared across the process. If `subscribe()` is called concurrently with `emit()` from different threads, the list could be mutated during iteration. Consider a `threading.Lock` or documenting single-threaded-only usage. **P2-3: Nested bare `MagicMock()` for UoW transaction chain** `event_bus_steps.py:283-287,307-311` · `helper_event_bus.py:120-124` ```python mock_uow = MagicMock() mock_uow.transaction.return_value.__enter__ = MagicMock( return_value=MagicMock(decisions=MagicMock()) ) mock_uow.transaction.return_value.__exit__ = MagicMock(return_value=False) ``` Four nested bare mocks. If `UnitOfWork`'s transaction API changes, these tests pass silently. Use `create_autospec(UnitOfWork, instance=True)` for the top-level mock at minimum. --- ### P3 — Nit / Optional **P3-1:** CHANGELOG says "40 typed event identifiers" but `EventType` has 38 members. **P3-2:** `LoggingEventBus` logs the `details` dict raw. If `details` ever contains sensitive data, it would appear in structured logs. Acceptable if the structlog pipeline has redaction processors. **P3-3:** Merge conflicts with master (`mergeable: false`). Rebase required before merge. --- ### Summary Well-structured PR with clean separation (types / models / protocol / reactive / logging), proper `@runtime_checkable` Protocol usage, backward-compatible service wiring via optional `event_bus` parameter, and thorough test coverage (27 Behave + 9 Robot + 5 ASV suites). All spec-defined EventType values and DomainEvent fields are implemented correctly. **2 P1s** and **3 P2s** to address. The most significant is **P1-1** — the spec explicitly defines `_persist_audit(event)` in `ReactiveEventBus.emit()` and it is absent from the implementation.
- Fix CHANGELOG EventType count from 40 to 38 and add missing domains
  (invariant, context) for accurate documentation
- Document correlation_id field as spec extension for request traceability
  in DomainEvent model
- Add thread safety documentation to ReactiveEventBus and LoggingEventBus
  noting single-threaded design and external sync requirements
- Document _persist_audit deferral in ReactiveEventBus.emit() explaining
  audit logging via handler subscription or LoggingEventBus following
  single-responsibility principle
- Replace bare MagicMock() with create_autospec(Settings, instance=True)
  at 5 locations for type-safe test mocking
- Replace nested bare MagicMock for UnitOfWork with create_autospec at 3
  locations to prevent silent breakage on API changes

All changes maintain backward compatibility. No linter errors.
Merge branch 'master' into feature/m6plus-event-bus
Some checks failed
CI / benchmark-publish (pull_request) Has been skipped
CI / lint (pull_request) Successful in 15s
CI / build (pull_request) Successful in 16s
CI / quality (pull_request) Successful in 22s
CI / security (pull_request) Successful in 32s
CI / typecheck (pull_request) Successful in 51s
CI / unit_tests (pull_request) Failing after 3m13s
CI / docker (pull_request) Has been skipped
CI / integration_tests (pull_request) Failing after 3m58s
CI / coverage (pull_request) Successful in 3m57s
CI / benchmark-regression (pull_request) Has been cancelled
5bd7a507b8
fix(events): update tests for DecisionService.record_decision API change
All checks were successful
CI / benchmark-publish (pull_request) Has been skipped
CI / lint (pull_request) Successful in 14s
CI / build (pull_request) Successful in 14s
CI / quality (pull_request) Successful in 16s
CI / security (pull_request) Successful in 30s
CI / typecheck (pull_request) Successful in 47s
CI / unit_tests (pull_request) Successful in 2m13s
CI / docker (pull_request) Successful in 38s
CI / integration_tests (pull_request) Successful in 3m3s
CI / coverage (pull_request) Successful in 3m58s
CI / benchmark-regression (pull_request) Successful in 24m40s
7348ca1911
- Update event_bus_steps.py to call record_decision() with individual
  parameters (plan_id, decision_type, question, chosen_option, rationale)
  instead of passing a Decision object, matching the master branch API
- Remove ctx.test_decision construction from DecisionService setup steps
  as it is no longer needed with the new signature
- Update robot/helper_event_bus.py decision_service_emits_event() to use
  the new record_decision() parameter signature
- All 9 Robot Framework event bus tests passing
- Behave scenarios at lines 125 and 131 now passing

Fixes event bus test failures after master merge.
hamza.khyari approved these changes 2026-03-03 20:05:49 +00:00
Dismissed
freemo left a comment

PM Review — Day 25

Status

  • Merge conflicts — @aditya please rebase feature/m6plus-event-bus onto current master.
  • Review: @hamza.khyari initially submitted REQUEST_CHANGES (2 P1s, 3 P2s), then dismissed and submitted APPROVED on the latest commit. @CoreRasurae has a pending (stale) review request but has not reviewed yet.
  • Properly labeled and milestoned (v3.6.0).

Action Items

  1. @aditya: Rebase onto master to resolve conflicts.
  2. @CoreRasurae: If you have bandwidth, please review. Otherwise, Hamza's approval may be sufficient given this is v3.6.0 (post-MVP).

Priority

Low urgency — v3.6.0 is not due until Mar 28. Rebase at convenience.

## PM Review — Day 25 ### Status - **Merge conflicts** — @aditya please rebase `feature/m6plus-event-bus` onto current `master`. - **Review**: @hamza.khyari initially submitted REQUEST_CHANGES (2 P1s, 3 P2s), then dismissed and submitted APPROVED on the latest commit. @CoreRasurae has a pending (stale) review request but has not reviewed yet. - Properly labeled and milestoned (v3.6.0). ### Action Items 1. **@aditya**: Rebase onto `master` to resolve conflicts. 2. **@CoreRasurae**: If you have bandwidth, please review. Otherwise, Hamza's approval may be sufficient given this is v3.6.0 (post-MVP). ### Priority Low urgency — v3.6.0 is not due until Mar 28. Rebase at convenience.
Merge branch 'master' into feature/m6plus-event-bus
All checks were successful
CI / benchmark-publish (pull_request) Has been skipped
CI / lint (pull_request) Successful in 15s
CI / build (pull_request) Successful in 17s
CI / quality (pull_request) Successful in 23s
CI / security (pull_request) Successful in 35s
CI / typecheck (pull_request) Successful in 1m16s
CI / unit_tests (pull_request) Successful in 2m10s
CI / docker (pull_request) Successful in 39s
CI / integration_tests (pull_request) Successful in 3m6s
CI / coverage (pull_request) Successful in 4m36s
CI / benchmark-regression (pull_request) Successful in 29m17s
3d2b138379
aditya dismissed hamza.khyari's review 2026-03-06 12:57:29 +00:00
Reason:

New commits pushed, approval review dismissed automatically according to repository settings

aditya merged commit 2bf35e5403 into master 2026-03-06 13:29:04 +00:00
aditya deleted branch feature/m6plus-event-bus 2026-03-06 13:29:04 +00:00
Sign in to join this conversation.
No reviewers
No milestone
No project
No assignees
3 participants
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
cleveragents/cleveragents-core!509
No description provided.