UAT: ReactiveEventBus documented as single-threaded but used as a Singleton in multi-threaded context — race condition risk #3935

Open
opened 2026-04-06 07:36:54 +00:00 by freemo · 0 comments
Owner

Metadata

  • Branch: fix/concurrency/reactive-event-bus-thread-safety
  • Commit Message: fix(concurrency): add threading.Lock to ReactiveEventBus for multi-threaded safety
  • Milestone: None — backlog
  • Parent Epic: #368

Backlog note: This issue was discovered during autonomous UAT operation
on milestone v3.4.0. It does not block milestone completion and has been
placed in the backlog for human review and future milestone assignment.


Bug Report

Feature Area: Async and Concurrency Patterns

Severity: Medium — race condition risk in multi-threaded deployments; the event bus is registered as a Singleton in the DI container and can be called from ThreadPoolExecutor worker threads (e.g., AsyncWorker, SubplanExecutionService)

What Was Tested

Code-level analysis of src/cleveragents/infrastructure/events/reactive.py against concurrency requirements.

Expected Behavior (from spec)

The ReactiveEventBus is registered as a Singleton in the DI container and is used by multiple services. When those services are called from worker threads (e.g., AsyncWorker uses a ThreadPoolExecutor with up to max_workers=4 threads, and SubplanExecutionService uses ThreadPoolExecutor for parallel subplan execution), the event bus must be thread-safe.

Actual Behavior (from code analysis)

ReactiveEventBus explicitly documents itself as not thread-safe:

# src/cleveragents/infrastructure/events/reactive.py, lines 38-41
Thread safety: This implementation is designed for single-threaded use
within the main application event loop. The DI container registers it as
a Singleton. If concurrent access from multiple threads is required,
callers must provide external synchronization.

However, the codebase has no external synchronization around event bus calls. The AsyncWorker service runs jobs in a ThreadPoolExecutor (up to 4 concurrent threads), and those jobs can call services that emit domain events via the event bus. Similarly, SubplanExecutionService._execute_parallel() runs subplans concurrently in a ThreadPoolExecutor, and subplan executors may emit events.

The internal state that is not protected includes:

  • self._subscriptions: dict[EventType, list[Callable]] — mutated by subscribe(), iterated by emit()
  • self._audit_log: deque[DomainEvent] — appended by emit() from multiple threads
  • self._subject (RxPY Subject) — on_next() called from multiple threads
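
To make the first bullet concrete, here is a minimal single-threaded sketch of what CPython does when a dict grows while it is being iterated. It is a stand-in only: `iterate_while_subscribing` and the event-type strings are hypothetical, and this report does not show whether the real emit() iterates the dict itself or only a per-event-type list. A subscribe() call arriving from another thread mid-iteration could produce the same interleaving, just non-deterministically.

```python
def iterate_while_subscribing(subscriptions: dict[str, list]) -> str:
    """Iterate the mapping while adding a key, as a concurrent subscribe() could."""
    try:
        for event_type in subscriptions:
            # Stand-in for another thread registering a new event type mid-iteration.
            subscriptions[f"{event_type}.extra"] = []
    except RuntimeError as exc:
        return str(exc)
    return "no error"


message = iterate_while_subscribing({"plan.created": []})
print(message)
```

Iterating over a snapshot (for example `list(subscriptions)`) taken while holding a lock avoids this particular failure.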

Steps to Reproduce (Code Analysis)

  1. AsyncWorker.start() creates a ThreadPoolExecutor(max_workers=4)
  2. Worker threads call pickup_and_execute(job) which calls job_executor(job, token)
  3. Job executors call application services (e.g., PlanLifecycleService) which emit domain events via ReactiveEventBus.emit()
  4. Concurrent emit() calls from multiple threads mutate _audit_log and iterate _subscriptions without any lock
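
The call path in steps 1 to 3 can be modelled with a small sketch. `RecordingBus` and `run_jobs` are hypothetical stand-ins, not the real ReactiveEventBus or AsyncWorker. The sketch only shows that work submitted to a ThreadPoolExecutor(max_workers=4) reaches emit() on worker threads rather than on the submitting thread. The lock inside the stand-in protects the demo's own bookkeeping and is not a claim about the real class.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class RecordingBus:
    """Hypothetical stand-in for the shared Singleton bus; records caller threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # guards only this demo's bookkeeping
        self.caller_threads: set[int] = set()

    def emit(self, event: str) -> None:
        with self._lock:
            self.caller_threads.add(threading.get_ident())


def run_jobs(bus: RecordingBus, jobs: int = 16) -> None:
    # Mirrors steps 1-3: a pool of four workers, each job ending in bus.emit().
    with ThreadPoolExecutor(max_workers=4) as pool:
        for n in range(jobs):
            pool.submit(bus.emit, f"job-{n}-finished")


bus = RecordingBus()
run_jobs(bus)
submitting_thread = threading.get_ident()
print(submitting_thread in bus.caller_threads)
```

The same recording approach could support the first subtask (auditing which call sites run off the main thread), for example by logging the thread name inside emit() during a test run.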

Code Location

  • src/cleveragents/infrastructure/events/reactive.py: ReactiveEventBus class
  • src/cleveragents/application/services/async_worker.py: AsyncWorker._poll_loop() and ThreadPoolExecutor
  • src/cleveragents/application/services/subplan_execution_service.py: _execute_parallel() and _execute_wave()

Recommended Fix

Add a threading.Lock to ReactiveEventBus to protect _subscriptions and _audit_log mutations, or document clearly which callers are responsible for external synchronization and add assertions/warnings when called from non-main threads.
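
A minimal sketch of the locking option, under stated assumptions: `LockedEventBus` is a hypothetical simplification, not the real class. It uses string event types, has no RxPY Subject, and its `audit_size` parameter is invented for illustration. The point is the shape: mutate and snapshot shared state under the lock, then invoke handlers after releasing it.

```python
import threading
from collections import defaultdict, deque
from typing import Any, Callable

Handler = Callable[[Any], None]


class LockedEventBus:
    """Illustrative only: not the real ReactiveEventBus (no RxPY Subject here)."""

    def __init__(self, audit_size: int = 1000) -> None:
        self._lock = threading.Lock()
        self._subscriptions: dict[str, list[Handler]] = defaultdict(list)
        self._audit_log: deque[Any] = deque(maxlen=audit_size)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        with self._lock:
            self._subscriptions[event_type].append(handler)

    def emit(self, event_type: str, event: Any) -> None:
        with self._lock:
            self._audit_log.append(event)
            # Copy the handler list so callbacks run without the lock held.
            handlers = list(self._subscriptions.get(event_type, ()))
        for handler in handlers:
            handler(event)


bus = LockedEventBus()
received: list[Any] = []
bus.subscribe("plan.created", received.append)
bus.emit("plan.created", {"id": 1})
```

Because handlers run outside the lock, a handler that calls subscribe() or emit() does not deadlock, so a plain threading.Lock is enough in this sketch. If handlers were invoked while holding the lock, threading.RLock would be needed for re-entrant calls. The trade-off is that a handler subscribed while an emit() is in flight only sees later events.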


Subtasks

  • Audit all call sites of ReactiveEventBus.emit() and subscribe() to confirm which are called from non-main threads
  • Add threading.Lock (or threading.RLock) to ReactiveEventBus protecting _subscriptions and _audit_log
  • Verify RxPY Subject.on_next() thread-safety and add synchronization if needed
  • Update docstring to reflect thread-safe status after fix
  • Tests (unit): Add concurrent emit() test using ThreadPoolExecutor to assert no data corruption
  • Tests (unit): Add concurrent subscribe() + emit() interleaving test
  • Verify coverage >= 97% via nox -s coverage_report
  • Run nox (all default sessions), fix any errors
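
The concurrent emit() test subtask could look roughly like the sketch below, assuming pytest-style test functions. `StubBus` is a hypothetical stand-in so the example runs on its own; the real test would construct ReactiveEventBus and use its actual subscribe()/emit() signatures, which are not shown in this report.

```python
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor


class StubBus:
    """Hypothetical stand-in; the real test would use ReactiveEventBus."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._handlers: list = []
        self.audit_log: deque = deque()

    def subscribe(self, handler) -> None:
        with self._lock:
            self._handlers.append(handler)

    def emit(self, event) -> None:
        with self._lock:
            self.audit_log.append(event)
            handlers = list(self._handlers)
        for handler in handlers:
            handler(event)


def test_concurrent_emit_keeps_every_event(workers: int = 4, per_worker: int = 500) -> None:
    bus = StubBus()
    received: list = []
    received_lock = threading.Lock()

    def handler(event) -> None:
        with received_lock:
            received.append(event)

    bus.subscribe(handler)
    start = threading.Barrier(workers)

    def producer(worker_id: int) -> None:
        start.wait(timeout=10)  # release all workers together to maximise overlap
        for seq in range(per_worker):
            bus.emit((worker_id, seq))

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(producer, w) for w in range(workers)]
        for future in futures:
            future.result()  # re-raise any exception from a worker thread

    expected = {(w, s) for w in range(workers) for s in range(per_worker)}
    assert len(bus.audit_log) == len(expected)  # nothing lost or duplicated
    assert set(bus.audit_log) == expected
    assert sorted(received) == sorted(expected)
```

A passing run does not prove the absence of races, since the interleaving is up to the scheduler. Calling `future.result()` matters because exceptions raised inside worker threads are otherwise swallowed by the executor.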

Definition of Done

This issue is complete when:

  • All subtasks above are completed and checked off.
  • A Git commit is created where the first line of the commit message matches the Commit Message in Metadata exactly (fix(concurrency): add threading.Lock to ReactiveEventBus for multi-threaded safety), followed by a blank line, then additional lines providing relevant details about the implementation.
  • The commit is pushed to the remote on the branch matching the Branch in Metadata exactly (fix/concurrency/reactive-event-bus-thread-safety).
  • The commit is submitted as a pull request to master, reviewed, and merged before this issue is marked done.
  • All nox stages pass
  • Coverage >= 97%

Automated by CleverAgents Bot
Supervisor: UAT Testing | Agent: ca-new-issue-creator

Blocks
#368 Epic: Subplans & Parallelism
cleveragents/cleveragents-core
Reference
cleveragents/cleveragents-core#3935