[Bug Hunt][Cycle 2][Reactive] Thread Safety Violation in ReactiveEventBus #7052

Open
opened 2026-04-10 07:27:07 +00:00 by HAL9000 · 2 comments
Owner

Metadata

  • Branch: bugfix/m3-reactive-event-bus-thread-safety
  • Commit Message: fix(events): add thread synchronization to ReactiveEventBus
  • Milestone: v3.2.0

Background and Context

ReactiveEventBus (src/cleveragents/infrastructure/events/reactive.py) is registered as a Singleton in the DI container. Its class docstring (lines 42–45) states that it is designed for single-threaded use and that callers must provide external synchronization. However, no call site enforces such synchronization, and in practice the bus is reachable from background workers, async tasks, and signal handlers, which do not all run on the same thread.

The _subscriptions dictionary and _audit_log deque are read and mutated without any locking. CPython's GIL makes a single dict.get() or deque.append() call atomic, but compound sequences are not: a read-modify-write spread across several operations, or iteration over a container while another thread mutates it, can be interleaved with other threads at any bytecode boundary. Under concurrent access this leads to race conditions.
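The gap between individually atomic operations and an atomic sequence can be replayed deterministically, without real threads, by stepping two callers through one unlucky interleaving by hand. This is a minimal sketch assuming a hypothetical copy-on-write subscribe(); the issue does not show how the real subscribe() updates _subscriptions, and the event type and handler names are placeholders.

```python
# Hypothetical copy-on-write subscribe() split into its read and write steps
# so that one bad thread interleaving can be replayed deterministically.
subscriptions: dict[str, tuple[str, ...]] = {}


def read_handlers(event_type: str) -> tuple[str, ...]:
    return subscriptions.get(event_type, ())  # step 1: a single atomic read


def write_handlers(event_type: str, current: tuple[str, ...], handler: str) -> None:
    subscriptions[event_type] = (*current, handler)  # step 2: a single atomic write


# "Thread A" and "thread B" both read before either one writes...
seen_by_a = read_handlers("task.completed")
seen_by_b = read_handlers("task.completed")
write_handlers("task.completed", seen_by_a, "handler_a")
write_handlers("task.completed", seen_by_b, "handler_b")  # ...so B overwrites A

print(subscriptions["task.completed"])  # ('handler_b',): handler_a was lost
```

Each step is atomic on its own, yet handler_a silently disappears; only holding one lock across both steps closes the window.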

Current Behavior

  • Line 125: self._audit_log.append(event.model_copy(deep=True)) runs with no lock held.
  • Line 127: for handler in list(self._subscriptions.get(event.event_type, ())) runs with no lock held, so a concurrent subscribe() or unsubscribe() can mutate _subscriptions between the get() and the list() copy.
```python
# Line 125: Audit log modification without synchronization
self._audit_log.append(event.model_copy(deep=True))

# Line 127: No synchronization when iterating over subscriptions
for handler in list(self._subscriptions.get(event.event_type, ())):
    try:
        handler(event)
    except Exception as exc:
        _logger.warning(
            "event_handler_failed",
            event_type=event.event_type.value,
            handler=getattr(handler, "__qualname__", repr(handler)),
            error_type=type(exc).__name__,
            error=str(exc),
        )
```

Expected Behavior

ReactiveEventBus must be safe for concurrent use. Either:

  1. A threading.RLock is acquired around all accesses to _subscriptions and _audit_log in emit(), subscribe(), and unsubscribe(), or
  2. The class enforces single-threaded access with a runtime guard and all call sites are audited to guarantee they never invoke the bus from multiple threads simultaneously.
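For comparison, option 2 could be enforced with a guard along these lines. This is a minimal sketch assuming the bus is constructed on the thread that will own it; SingleThreadGuard is a hypothetical helper, not an existing class in the codebase. A bus would call check() at the top of emit(), subscribe(), and unsubscribe().

```python
import threading


class SingleThreadGuard:
    """Hypothetical helper: binds an object to the thread that created it."""

    def __init__(self) -> None:
        # Remember which OS-level thread constructed the guarded object.
        self._owner = threading.get_ident()

    def check(self) -> None:
        """Raise RuntimeError if called from any thread other than the owner."""
        current = threading.get_ident()
        if current != self._owner:
            raise RuntimeError(
                f"ReactiveEventBus is bound to thread {self._owner} "
                f"but was called from thread {current}"
            )
```

Such a guard turns silent races into immediate errors, but it also rejects every background-worker call site described above, and async tasks running on the owner's event-loop thread would still pass it.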

Option 1 (adding a lock) is the preferred fix as it removes the implicit contract that callers must coordinate externally.
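Option 1 might take the following shape. This is a minimal sketch, not the real class: LockedEventBus is a hypothetical stand-in that uses plain strings for event types and payloads and the stdlib logging module, whereas the real bus dispatches typed events (event.event_type, model_copy) through a structured logger.

```python
import logging
import threading
from collections import deque
from typing import Callable

_logger = logging.getLogger(__name__)
Handler = Callable[[str], None]


class LockedEventBus:
    """Hypothetical, simplified stand-in for ReactiveEventBus."""

    def __init__(self) -> None:
        # RLock rather than Lock: a thread that already holds it can re-enter
        # instead of deadlocking, e.g. a signal handler that interrupts the
        # main thread in the middle of emit() and emits an event itself.
        self._lock = threading.RLock()
        self._subscriptions: dict[str, list[Handler]] = {}
        self._audit_log: deque[tuple[str, str]] = deque()

    def subscribe(self, event_type: str, handler: Handler) -> None:
        with self._lock:
            self._subscriptions.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: str, handler: Handler) -> None:
        with self._lock:
            handlers = self._subscriptions.get(event_type, [])
            if handler in handlers:
                handlers.remove(handler)
            if not handlers:
                self._subscriptions.pop(event_type, None)

    def emit(self, event_type: str, payload: str) -> None:
        # Record the event and snapshot the handler list as one atomic step...
        with self._lock:
            self._audit_log.append((event_type, payload))
            handlers = list(self._subscriptions.get(event_type, ()))
        # ...then dispatch outside the lock so a slow handler cannot stall
        # subscribe()/emit() calls made from other threads.
        for handler in handlers:
            try:
                handler(payload)
            except Exception:
                _logger.warning("event_handler_failed", exc_info=True)

    def audit_log(self) -> list[tuple[str, str]]:
        with self._lock:
            return list(self._audit_log)
```

Dispatching from a snapshot outside the lock is a design choice: it keeps critical sections short, but a handler removed while an emit() is already dispatching can still receive that one event. The lock makes that window well-defined rather than eliminating it; dispatching under the lock would close it at the cost of holding the lock during arbitrary handler code.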

Impact

  • RuntimeError: dictionary changed size during iteration crashes under concurrent subscribe()/emit() calls
  • Silent event loss — handlers registered just before emit() may be skipped
  • Duplicate dispatch — handlers deregistered during iteration may still be called
  • Data corruption in _audit_log under concurrent emit() calls
  • Non-deterministic failures that are hard to reproduce and diagnose
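The first impact item rests on a CPython check that can be reproduced deterministically in a single thread: iterating a live dict while its size changes raises RuntimeError, while iterating a snapshot does not. This sketch only demonstrates that mechanism, with placeholder keys; which ReactiveEventBus code paths iterate _subscriptions directly is not shown in this issue.

```python
subscriptions = {"task.started": ["h1"], "task.completed": ["h2"]}

# Adding a key while iterating the live dict (done here from the loop body,
# standing in for another thread) trips CPython's size-change check.
error = None
try:
    for event_type in subscriptions:
        subscriptions["task.failed"] = ["h3"]
except RuntimeError as exc:
    error = str(exc)
print(error)  # dictionary changed size during iteration

# Iterating a snapshot of the keys is unaffected by the same kind of mutation.
for event_type in list(subscriptions):
    subscriptions[event_type + ".retry"] = []
print(len(subscriptions))  # 6
```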

Acceptance Criteria

  • ReactiveEventBus holds a threading.RLock (or equivalent) around all accesses to _subscriptions and _audit_log
  • emit(), subscribe(), and unsubscribe() are all covered by the lock
  • The class docstring is updated to accurately reflect the thread-safety guarantees after the fix
  • BDD scenario added: concurrent emit() and subscribe() calls from multiple threads do not cause data corruption, missed dispatches, or crashes
  • Robot integration test added for multi-threaded event bus usage
  • All nox stages pass; coverage ≥ 97%
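The BDD and Robot criteria call for concurrent emit() and subscribe() calls from multiple threads. A minimal sketch of such a check follows, assuming a hypothetical locked stand-in (StubBus) in place of the real ReactiveEventBus and its typed events; the Behave step wiring and tags are not shown. A threading.Barrier releases all workers at once to maximise overlap, worker exceptions are collected instead of being lost inside the threads, and the run is checked for crashes, lost audit entries, and missed dispatches.

```python
import threading
from collections import deque
from typing import Callable

Handler = Callable[[str], None]


class StubBus:
    """Hypothetical locked stand-in; a real test would use ReactiveEventBus."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._subscriptions: dict[str, list[Handler]] = {}
        self._audit_log: deque[str] = deque()

    def subscribe(self, event_type: str, handler: Handler) -> None:
        with self._lock:
            self._subscriptions.setdefault(event_type, []).append(handler)

    def emit(self, event_type: str, payload: str) -> None:
        with self._lock:
            self._audit_log.append(payload)
            handlers = list(self._subscriptions.get(event_type, ()))
        for handler in handlers:
            handler(payload)

    def audit_size(self) -> int:
        with self._lock:
            return len(self._audit_log)


def stress(bus: StubBus, workers: int = 16) -> tuple[list[list[str]], list[BaseException]]:
    """Start all workers together; each subscribes, then emits its own event."""
    barrier = threading.Barrier(workers)
    received: list[list[str]] = [[] for _ in range(workers)]
    errors: list[BaseException] = []

    def worker(index: int) -> None:
        try:
            barrier.wait()  # release every worker at the same moment
            bus.subscribe("tick", received[index].append)
            bus.emit("tick", f"event-{index}")
        except Exception as exc:  # collect rather than die silently in the thread
            errors.append(exc)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return received, errors


bus = StubBus()
received, errors = stress(bus)
assert not errors  # no crashes
assert bus.audit_size() == 16  # no lost audit entries
# Each worker subscribed before emitting, so it must see at least its own event.
assert all(f"event-{i}" in got for i, got in enumerate(received))
```

Because races are timing-dependent, a single passing run proves little; a check like this is usually repeated many times.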

Subtasks

  • Add threading.RLock to ReactiveEventBus.__init__
  • Wrap _subscriptions and _audit_log accesses in emit() with the lock
  • Wrap _subscriptions accesses in subscribe() and unsubscribe() with the lock
  • Update class docstring to reflect thread-safety guarantees
  • Tests (Behave): Add scenario for concurrent emit/subscribe thread safety (tagged @tdd_issue @tdd_issue_<N>)
  • Tests (Robot): Add integration test for multi-threaded event bus usage
  • Verify coverage ≥ 97% via nox -s coverage_report
  • Run nox (all default sessions), fix any errors

Definition of Done

This issue is complete when:

  • All subtasks above are completed and checked off.
  • A Git commit is created where the first line of the commit message matches the Commit Message in Metadata exactly, followed by a blank line, then additional lines providing relevant details about the implementation.
  • The commit is pushed to the remote on the branch matching the Branch in Metadata exactly.
  • The commit is submitted as a pull request to master, reviewed, and merged before this issue is marked done.
  • All nox stages pass.
  • Coverage ≥ 97%.

Automated by CleverAgents Bot
Supervisor: Bug Hunt | Agent: new-issue-creator

HAL9000 added this to the v3.2.0 milestone 2026-04-10 07:27:41 +00:00
Author
Owner

⚠️ Orphan Issue — Needs Manual Parent Epic Linking

This issue was created during automated bug hunting (Bug Hunt Cycle 2) and no open parent Epic covering infrastructure/events or ReactiveEventBus was found in the repository. Per CONTRIBUTING.md, all non-Epic issues must be linked to a parent Epic.

Action required: A maintainer must identify or create the appropriate parent Epic and link this issue as a child (this issue should block the parent Epic).

TDD dependency: This bug issue depends on (is blocked by) TDD issue #7055, which must be merged first to prove the bug exists before the fix is implemented.


Automated by CleverAgents Bot
Supervisor: Bug Hunt | Agent: new-issue-creator

Author
Owner

Verified — Critical concurrency bug: thread safety violation in ReactiveEventBus. MoSCoW: Must-have. Priority: Critical.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

Reference
cleveragents/cleveragents-core#7052