BUG-HUNT: [CONCURRENCY] Race condition in CircuitBreaker shared state access #7088

Open
opened 2026-04-10 07:35:12 +00:00 by HAL9000 · 1 comment
Owner

Metadata

  • Branch: fix/circuit-breaker-thread-safety
  • Commit Message: fix(acms): add threading.Lock to CircuitBreaker for thread-safe state access
  • Milestone: (none — backlog, see note below)
  • Parent Epic: #396

Backlog note: This issue was discovered during autonomous operation
on milestone v3.4.0. It does not block milestone completion and has been
placed in the backlog for human review and future milestone assignment.


Background and Context

The CircuitBreaker class in src/cleveragents/application/services/acms_pipeline.py (lines 91–134) is used by ParallelStrategyExecutor to track per-strategy failure counts and temporarily disable misbehaving strategies. ParallelStrategyExecutor executes strategies concurrently via ThreadPoolExecutor (see _execute_parallel(), line ~417).

The CircuitBreaker docstring at line 98 claims:

"Thread-safe: internal state is only mutated from the executor thread that owns the futures, never concurrently."

This claim is incorrect as a general guarantee. The as_completed() loop in _execute_parallel() calls self._circuit_breaker.record_success(name) and self._circuit_breaker.record_failure(name) from the calling (main) thread while worker threads are still running. Within a single invocation these calls are serialized, because the loop body runs sequentially in that one thread. They would overlap only if outcome recording were moved into Future.add_done_callback() callbacks, which can run on worker threads. The docstring's guarantee therefore holds only while a single thread uses a given CircuitBreaker instance.
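The thread-placement point can be checked in isolation. The standalone sketch below uses hypothetical work() and run_batch() helpers, not code from acms_pipeline.py. It records which thread executes the body of an as_completed() loop and which threads run the submitted tasks. It assumes nothing about _execute_parallel() beyond its use of ThreadPoolExecutor and as_completed().

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(n: int) -> tuple[int, int]:
    # Runs on a pool thread; report which thread executed it.
    return n, threading.get_ident()


def run_batch() -> tuple[int, set[int], set[int]]:
    caller = threading.get_ident()
    loop_threads: set[int] = set()
    worker_threads: set[int] = set()
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(work, n) for n in range(20)]
        for future in as_completed(futures):
            # record_success()/record_failure() would be called at this point.
            loop_threads.add(threading.get_ident())
            _, worker = future.result()
            worker_threads.add(worker)
    return caller, loop_threads, worker_threads


caller, loop_threads, worker_threads = run_batch()
print(loop_threads == {caller})  # True: the loop body stays on the calling thread
print(caller in worker_threads)  # False: tasks ran on pool threads
```

This explains why a single invocation does not race with itself. The hazard comes from several threads each running such a loop against the same shared breaker.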

More critically, the CircuitBreaker instance is shared across pipeline invocations (it lives on the ParallelStrategyExecutor instance), so if the pipeline is called concurrently from multiple threads (e.g., multiple plan executions), all threads share the same _failures dict and _disabled set with no synchronization.

Current Behavior

  • File: src/cleveragents/application/services/acms_pipeline.py
  • Class: CircuitBreaker (lines 91–134)
  • Shared mutable state: self._failures: dict[str, int] and self._disabled: set[str]
  • Unsynchronized methods:
    • record_success() (line 110): reads and writes _failures, mutates _disabled
    • record_failure() (line 115): read-modify-write on _failures, conditionally mutates _disabled
    • is_open() (line 122): reads _disabled
    • reset() (line 126): mutates both _failures and _disabled
    • reset_all() (line 131): clears both collections

Race condition example in record_failure():

def record_failure(self, name: str) -> None:
    count = self._failures.get(name, 0) + 1   # Thread A reads 2, computes 3
    # --- Thread B also reads 2 here and computes 3 ---
    self._failures[name] = count               # Thread A writes 3
    # Thread B writes 3 (lost increment: should be 4)
    if count >= self.failure_threshold:
        self._disabled.add(name)
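The lost update above can be reproduced deterministically. This sketch uses a hypothetical UnsafeCircuitBreaker that copies the current record_failure() logic and adds a test-only hook between the read and the write. A threading.Barrier(2) in that hook forces both threads to read the old count before either one writes.

```python
import threading


class UnsafeCircuitBreaker:
    """Illustrative copy of the unsynchronized logic, plus a test hook."""

    def __init__(self, failure_threshold: int = 3) -> None:
        self.failure_threshold = failure_threshold
        self._failures: dict[str, int] = {}
        self._disabled: set[str] = set()
        # Test-only hook, called between the read and the write.
        self.between_read_and_write = lambda: None

    def record_failure(self, name: str) -> None:
        count = self._failures.get(name, 0) + 1
        self.between_read_and_write()
        self._failures[name] = count
        if count >= self.failure_threshold:
            self._disabled.add(name)


breaker = UnsafeCircuitBreaker(failure_threshold=4)
breaker._failures["s"] = 2  # two failures already recorded

# Neither thread may write until both have read the old count.
barrier = threading.Barrier(2)
breaker.between_read_and_write = barrier.wait

threads = [threading.Thread(target=breaker.record_failure, args=("s",)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(breaker._failures["s"])    # 3, although four failures were recorded
print("s" in breaker._disabled)  # False: the threshold of 4 is never reached
```

With a threshold of 4, the strategy should be disabled after its fourth failure. Instead, the final count is 3 and the strategy stays enabled.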

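Unsynchronized access from multiple threads can leave the failure counts and the disabled set wrong or mutually inconsistent, leading to:

  • Strategies never being disabled, or disabled late (lost failure increments)
  • Strategies disabled or re-enabled out of step with their recorded failure count (interleaved record_success() / record_failure() calls)
  • _failures and _disabled disagreeing with each other. Under CPython's GIL each individual dict or set operation is atomic, so the containers themselves are not corrupted. However, multi-step read-modify-write and check-then-act sequences are not atomic.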
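The inconsistent-state case can be replayed by hand in a single thread. Comments mark which simulated thread runs each step. UnsafeCircuitBreaker is again a hypothetical stand-in for the current class. Its record_success() body is taken from the suggested fix sketch and is assumed to match the existing method.

```python
class UnsafeCircuitBreaker:
    """Illustrative copy of the current, unsynchronized logic."""

    def __init__(self, failure_threshold: int = 3) -> None:
        self.failure_threshold = failure_threshold
        self._failures: dict[str, int] = {}
        self._disabled: set[str] = set()

    def record_success(self, name: str) -> None:
        self._failures[name] = 0
        self._disabled.discard(name)


breaker = UnsafeCircuitBreaker(failure_threshold=3)
breaker._failures["s"] = 2  # two failures already recorded

# Thread A: first half of record_failure()
count = breaker._failures.get("s", 0) + 1  # 3
breaker._failures["s"] = count

# Thread B: record_success("s") runs to completion in between
breaker.record_success("s")

# Thread A: second half of record_failure(), acting on its stale local count
if count >= breaker.failure_threshold:
    breaker._disabled.add("s")

print(breaker._failures["s"], "s" in breaker._disabled)  # 0 True
```

The result is a strategy marked disabled even though its recorded failure count is 0.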

Expected Behavior

All reads and writes of _failures and _disabled in CircuitBreaker must happen while holding a threading.Lock. This makes read-modify-write sequences atomic and ensures the two collections are always updated together. The class docstring must accurately describe this thread-safety mechanism.

Acceptance Criteria

  • CircuitBreaker holds a threading.Lock instance initialized in __init__
  • All methods that read or mutate _failures or _disabled acquire the lock before accessing state
  • The class docstring accurately describes the thread-safety guarantee
  • Concurrent calls to record_failure() from multiple threads produce correct cumulative counts
  • Concurrent calls to record_success() and record_failure() do not corrupt the disabled set
  • No deadlocks introduced (lock is non-reentrant; no nested lock acquisition)
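The fourth and fifth criteria can be exercised with a plain threading stress test. The sketch below includes a standalone copy of the locked class from the suggested fix (not imported from the project), plus hypothetical fail_many() and mix() workers. The real tests would live in the Behave and Robot suites listed under Subtasks.

```python
import random
import threading
from concurrent.futures import ThreadPoolExecutor


class CircuitBreaker:
    """Locked breaker mirroring the suggested fix (illustrative copy)."""

    def __init__(self, failure_threshold: int = 3) -> None:
        self.failure_threshold = failure_threshold
        self._failures: dict[str, int] = {}
        self._disabled: set[str] = set()
        self._lock = threading.Lock()

    def record_success(self, name: str) -> None:
        with self._lock:
            self._failures[name] = 0
            self._disabled.discard(name)

    def record_failure(self, name: str) -> None:
        with self._lock:
            count = self._failures.get(name, 0) + 1
            self._failures[name] = count
            if count >= self.failure_threshold:
                self._disabled.add(name)

    def is_open(self, name: str) -> bool:
        with self._lock:
            return name in self._disabled


def run_concurrently(fn, workers: int) -> None:
    # Start `workers` copies of fn and re-raise any exception they hit.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(fn, i) for i in range(workers)]
        for future in futures:
            future.result()


def fail_many(worker_id: int) -> None:
    for _ in range(calls):
        breaker.record_failure("s")


def mix(worker_id: int) -> None:
    rng = random.Random(worker_id)  # per-thread RNG, seeded for repeatability
    for _ in range(calls):
        if rng.random() < 0.3:
            mixed_breaker.record_success("m")
        else:
            mixed_breaker.record_failure("m")


workers, calls = 8, 10_000

# Concurrent record_failure() calls must produce the exact cumulative count.
breaker = CircuitBreaker(failure_threshold=3)
run_concurrently(fail_many, workers)
print(breaker._failures["s"])  # 80000 (= workers * calls)
print(breaker.is_open("s"))    # True

# Mixed calls must keep _failures and _disabled consistent with each other.
mixed_breaker = CircuitBreaker(failure_threshold=3)
run_concurrently(mix, workers)
count = mixed_breaker._failures.get("m", 0)
print(mixed_breaker.is_open("m") == (count >= mixed_breaker.failure_threshold))  # True
```

The second check relies on an invariant that the lock preserves: a strategy is in _disabled exactly when its count is at or above failure_threshold. Without the lock, an interleaved record_success() / record_failure() pair can break that invariant. Because such races are timing-dependent, an unlocked version may still pass this test by luck. A barrier-forced interleaving is therefore the more reliable way to prove the bug.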

Supporting Information

  • Used in: ParallelStrategyExecutor._execute_parallel(), which runs strategies on a ThreadPoolExecutor with up to max_workers=4 concurrent strategy executions (line ~417)
  • Category: concurrency
  • Severity: High — can cause incorrect strategy disabling/enabling, leading to strategies being silently skipped or incorrectly retried
  • Fix: Add self._lock = threading.Lock() in __init__ and wrap the body of every method that reads or mutates _failures or _disabled in a with self._lock: block
  • Related issues: #7060 (race condition in SandboxManager), #7036 (AsyncJob heartbeat race), #7076 (race condition in tool execution)
  • Spec reference: docs/specification.md ~line 42936 (ParallelStrategyExecutor circuit-breaker pattern)

Suggested fix sketch:

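import threading


class CircuitBreaker:
    """Per-strategy failure counter that disables a strategy after repeated failures.

    Thread-safe: every read and write of _failures and _disabled happens while
    holding self._lock (a non-reentrant threading.Lock), so read-modify-write
    sequences are atomic and the two collections are always updated together.
    """

    def __init__(self, failure_threshold: int = 3) -> None:
        self.failure_threshold = failure_threshold
        self._failures: dict[str, int] = {}
        self._disabled: set[str] = set()
        self._lock = threading.Lock()

    def record_success(self, name: str) -> None:
        with self._lock:
            self._failures[name] = 0
            self._disabled.discard(name)

    def record_failure(self, name: str) -> None:
        with self._lock:
            # Read, increment, and write under a single lock acquisition.
            count = self._failures.get(name, 0) + 1
            self._failures[name] = count
            if count >= self.failure_threshold:
                self._disabled.add(name)

    def is_open(self, name: str) -> bool:
        with self._lock:
            return name in self._disabled

    def reset(self, name: str) -> None:
        # Signature and body assumed; mirror the existing reset() logic.
        with self._lock:
            self._failures.pop(name, None)
            self._disabled.discard(name)

    def reset_all(self) -> None:
        # Clear directly instead of calling reset(): the lock is not reentrant.
        with self._lock:
            self._failures.clear()
            self._disabled.clear()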
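Because threading.Lock is not reentrant, a method that already holds self._lock must not call another locking method. For example, reset_all() must not call reset() inside its with block. A standalone check with a hypothetical try_reacquire() helper shows the difference from threading.RLock without actually hanging:

```python
import threading


def try_reacquire(lock) -> bool:
    """Acquire `lock`, then try to acquire it again from the same thread."""
    with lock:
        # A plain Lock would block forever here without a timeout; the timeout
        # turns the would-be self-deadlock into an observable False.
        acquired_again = lock.acquire(timeout=0.1)
        if acquired_again:
            lock.release()  # balance the extra acquire (RLock case)
        return acquired_again


print(try_reacquire(threading.Lock()))   # False: Lock is not reentrant
print(try_reacquire(threading.RLock()))  # True: RLock counts nested acquires
```

Keeping a plain Lock and acquiring it exactly once per public method satisfies the no-nested-acquisition criterion. Switching to RLock would only be needed if locked methods had to call each other.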

Subtasks

  • Add self._lock = threading.Lock() to CircuitBreaker.__init__
  • Wrap record_success() body with with self._lock:
  • Wrap record_failure() body with with self._lock: (ensuring atomic read-modify-write)
  • Wrap is_open() body with with self._lock:
  • Wrap reset() body with with self._lock:
  • Wrap reset_all() body with with self._lock:
  • Update CircuitBreaker docstring to accurately describe the threading.Lock mechanism
  • Tests (Behave): Add BDD scenarios for concurrent CircuitBreaker access (verify no lost increments under concurrent record_failure calls)
  • Tests (Robot): Add integration test for ParallelStrategyExecutor thread safety under concurrent pipeline invocations
  • Verify coverage ≥ 97% via nox -s coverage_report
  • Run nox (all default sessions), fix any errors

Definition of Done

This issue is complete when:

  • All subtasks above are completed and checked off
  • A Git commit is created where the first line of the commit message matches the Commit Message in Metadata exactly (fix(acms): add threading.Lock to CircuitBreaker for thread-safe state access), followed by a blank line, then additional lines providing relevant implementation details
  • The commit is pushed to the remote on the branch matching the Branch in Metadata exactly (fix/circuit-breaker-thread-safety)
  • The commit is submitted as a pull request to master, reviewed, and merged before this issue is marked done
  • All nox stages pass
  • Coverage ≥ 97%

Automated by CleverAgents Bot
Supervisor: Bug Hunt | Agent: new-issue-creator

Author
Owner

Verified — Concurrency bug: race condition in CircuitBreaker shared state access. MoSCoW: Should-have. Priority: Medium.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

No milestone
No project
No assignees
1 participant
Due date

No due date set.

Blocks
#396 Epic: ACMS Context Pipeline
cleveragents/cleveragents-core
Reference
cleveragents/cleveragents-core#7088