feature/m6plus-event-bus #509
No reviewers
Labels
No labels
auto/needs-reevaluation
controller-managed
auto/blocked-by-deps
auto/ci-timeout
auto/claimed-implementer
auto/claimed-merge
auto/claimed-reviewer
auto/driver-down
auto/invariant-violation
auto/last-attempt-tier-0
auto/last-attempt-tier-1
auto/last-attempt-tier-2
auto/last-attempt-tier-min
Automation Tracking
auto/needs-conflict-resolution
auto/needs-implementer
auto/postmortem
auto/ready-to-merge
auto/restart-throttled
auto/revert
auto/sentinel
auto/stale-inactivity
auto/unstable
Blocked
Bounty
$100
Bounty
$1000
Bounty
$10000
Bounty
$20
Bounty
$2000
Bounty
$250
Bounty
$50
Bounty
$500
Bounty
$5000
Bounty
$750
MoSCoW
Could have
MoSCoW
Must have
MoSCoW
Should have
Needs Feedback
Points
1
Points
13
Points
2
Points
21
Points
3
Points
34
Points
5
Points
55
Points
8
Points
88
Priority
Backlog
Priority
CI Blocker
Priority
Critical
Priority
High
Priority
Low
Priority
Medium
Signed-off: Owner
Signed-off: Scrum Master
Signed-off: Tech Lead
Spike
State
Completed
State
Duplicate
State
In Progress
State
In Review
State
Paused
State
Unverified
State
Verified
State
Wont Do
Type
Automation
Type
Bug
Type
Discussion
Type
Documentation
Type
Epic
Type
Feature
Type
Legendary
Type
Refactor
Type
Support
Type
Task
Type
Testing
No project
No assignees
3 participants
Notifications
Due date
No due date set.
Dependencies
No dependencies set.
Reference
cleveragents/cleveragents-core!509
Loading…
Add table
Add a link
Reference in a new issue
No description provided.
Delete branch "feature/m6plus-event-bus"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Description
Implements a general-purpose domain event system for CleverAgents as defined in
docs/specification.md§Event-Driven Architecture. AddsEventType,DomainEvent,EventBusProtocol,ReactiveEventBus(RxPY-backed), andLoggingEventBus(structlog-backed) under
src/cleveragents/infrastructure/events/. WiresDecisionServiceandPlanLifecycleServiceto emit domain events on significantstate changes, and registers
ReactiveEventBusas a Singleton in the DI container.Type of Change
Quality Checklist
Anyunless justified —details: dict[str, Any]is justified as an open event payload field per spec)nox -s typecheckpasses with no errorsnox -s lintpasses with no errorsfeatures/)robot/) if applicablenox -s coverage_report) — achieved 100% on all new filesnox -s security_scan)nox -s dead_code)Testing
All tests were written first (TDD) before implementation. The full event bus
surface is covered at 100% branch coverage across all six new source files
(27 scenarios, 75 steps passing).
Test Commands Run
Related Issues
ISSUES CLOSED #473
Implementation Notes
New package:
src/cleveragents/infrastructure/events/types.pyEventTypeStrEnum — 38 dot-separated identifiers across 8 domainsmodels.pyDomainEventfrozen Pydantic model with auto-UTC timestamp and auto-ULIDcorrelation_idprotocol.py@runtime_checkable EventBusProtocol withemit()andsubscribe()reactive.pyReactiveEventBus— RxPYSubject-backed bus; dual-dispatch (stream + callbacks)logging_bus.pyLoggingEventBus— structlog-only bus; drop-in replacement requiring no RxPY__init__.pyDesign decisions
event_busparameter on services — bothDecisionServiceandPlanLifecycleServiceacceptevent_bus: EventBus | None = None. WhenNone,emission is silently skipped. This preserves full backward compatibility with
existing callers and tests that construct services without a bus.
# type: ignorein source —rx.Observableis imported directly fromrx.core.observable.observablerather thanrx.Observable(which pyright flagsas
reportPrivateImportUsage). All source files pass pyright strict mode with0 errors and 0 warnings and without any suppression comments.
ReactiveEventBus— everyemit()call pushes to the RxPYSubjectstream (for advanced operators likefilter,debounce) and invokesregistered callbacks synchronously. Plain callback subscribers work with no RxPY
knowledge; reactive subscribers use
bus.streamdirectly.LoggingEventBusas an alternative — useful for audit-trail-only contexts(e.g., CLI invocations, serverless) where an RxPY dependency is undesirable.
event_bus_steps.pyat 476 lines,compliant with the project's modular design guideline.
Files added
Files modified
Review:
feat(events): add EventBus protocol and ReactiveEventBus implementationPre-PR Checklist
ISSUES CLOSED: #473v3.6.0Anyin production signaturesdetails: dict[str, Any]justified per spec# type: ignoreP1 — Must Fix
P1-1: Spec's
_persist_audit(event)missing fromReactiveEventBus.emit()reactive.py:56-62specification.md:42696-42698showsReactiveEventBus.emit()callingself._persist_audit(event)betweenon_next()and handler dispatch. The implementation omits this. Either implement_persist_audit()(e.g. delegate to structlog), register aLoggingEventBus-style handler automatically, or get a spec amendment documenting the deferral.P1-2: Bare
MagicMock()for known typesevent_bus_steps.py:290,314,366·helper_event_bus.py:126,154Never use bare
MagicMock()for known types. UseMagicMock(spec=Settings)orcreate_autospec(Settings, instance=True). Affects 5 call sites across Behave steps and Robot helper.P2 — Should Fix
P2-1:
correlation_idfield not in specmodels.py:51-54DomainEventaddscorrelation_id: str(auto ULID) which is absent fromspecification.md:42666-42675. Sensible addition for traceability, but the spec divergence should be documented — either update the spec or add an inline comment noting the extension.P2-2: No thread safety on
_subscriptionsreactive.py:35,62-63·logging_bus.py:46,66-67Both buses use
dict.setdefault().append()insubscribe()and iterate_subscriptions.get()inemit()without locking. The DI container registers the bus as a Singleton shared across the process. Ifsubscribe()is called concurrently withemit()from different threads, the list could be mutated during iteration. Consider athreading.Lockor documenting single-threaded-only usage.P2-3: Nested bare
MagicMock()for UoW transaction chainevent_bus_steps.py:283-287,307-311·helper_event_bus.py:120-124Four nested bare mocks. If
UnitOfWork's transaction API changes, these tests pass silently. Usecreate_autospec(UnitOfWork, instance=True)for the top-level mock at minimum.P3 — Nit / Optional
P3-1: CHANGELOG says "40 typed event identifiers" but
EventTypehas 38 members.P3-2:
LoggingEventBuslogs thedetailsdict raw. Ifdetailsever contains sensitive data, it would appear in structured logs. Acceptable if the structlog pipeline has redaction processors.P3-3: Merge conflicts with master (
mergeable: false). Rebase required before merge.Summary
Well-structured PR with clean separation (types / models / protocol / reactive / logging), proper
@runtime_checkableProtocol usage, backward-compatible service wiring via optionalevent_busparameter, and thorough test coverage (27 Behave + 9 Robot + 5 ASV suites). All spec-defined EventType values and DomainEvent fields are implemented correctly.2 P1s and 3 P2s to address. The most significant is P1-1 — the spec explicitly defines
_persist_audit(event)inReactiveEventBus.emit()and it is absent from the implementation.PM Review — Day 25
Status
feature/m6plus-event-busonto currentmaster.Action Items
masterto resolve conflicts.Priority
Low urgency — v3.6.0 is not due until Mar 28. Rebase at convenience.
New commits pushed, approval review dismissed automatically according to repository settings