BUG-HUNT: [concurrency] ValidationPipeline.run() globally replaces sys.stdout/sys.stderr — concurrent pipeline executions corrupt the stream replacement chain and cause data loss or crashes #6771

Open
opened 2026-04-10 02:05:01 +00:00 by HAL9000 · 2 comments
Owner

Bug Report: [concurrency] — ValidationPipeline.run() mutates global sys.stdout/sys.stderr without re-entrancy protection

Severity Assessment

  • Impact: Concurrent ValidationPipeline.run() calls corrupt the sys.stdout/sys.stderr stream chain — one pipeline's cleanup restores the wrong original streams, leaving other code writing to a detached or double-wrapped stream; validation output is lost and process stdout/stderr may become permanently corrupted
  • Likelihood: Medium — ValidationPipeline is a Factory provider; the FixThenRevalidateOrchestrator or multiple parallel plan executors can create concurrent instances; A2A server mode runs concurrent requests
  • Priority: High

Location

  • File: src/cleveragents/application/services/validation_pipeline.py
  • Function/Class: ValidationPipeline.run()
  • Lines: ~500–516

Description

ValidationPipeline.run() wraps sys.stdout and sys.stderr with _ThreadLocalStream instances to enable thread-local output capture during parallel validation execution. However, it does so via direct global mutation of sys.stdout/sys.stderr:

orig_stdout = sys.stdout
orig_stderr = sys.stderr
sys.stdout = _ThreadLocalStream(orig_stdout)   # global mutation!
sys.stderr = _ThreadLocalStream(orig_stderr)   # global mutation!
try:
    with ThreadPoolExecutor(...) as pool:
        ...
finally:
    sys.stdout = orig_stdout    # restores, but ONLY for this instance
    sys.stderr = orig_stderr

This pattern is NOT re-entrant safe. If two ValidationPipeline instances call run() concurrently:

  1. Thread A enters run(): reads orig_stdout = sys.stdout (the real stream) and sets sys.stdout = _ThreadLocalStream(real_stream).
  2. Thread B enters run() while A is still running: reads orig_stdout = sys.stdout (which is now A's _ThreadLocalStream) and sets sys.stdout = _ThreadLocalStream(_ThreadLocalStream(real_stream)).
  3. Thread A finishes and restores sys.stdout = real_stream. This is correct for A, but it also removes B's wrapper while B's workers are still running, so their output from this point on bypasses B's capture.
  4. Thread B finishes and "restores" sys.stdout = _ThreadLocalStream(real_stream). This is A's wrapper, which A has already discarded, so B leaves the global sys.stdout pointing at a stale _ThreadLocalStream.

After this sequence, every subsequent write to sys.stdout anywhere in the process goes through the orphaned _ThreadLocalStream from Thread A, which writes to real_stream. No thread-local capture is active any more, and the process's stdout stays wrapped in the stale stream object until something else reassigns sys.stdout. The same sequence applies to sys.stderr.
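The four steps above can be replayed deterministically in a single thread, which makes the end state easy to inspect. This is a minimal sketch, not the project's code: StandInThreadLocalStream is a hypothetical stand-in for _ThreadLocalStream that only forwards writes, and simulate_interleaving is an illustrative helper name.

```python
import sys


class StandInThreadLocalStream:
    """Hypothetical stand-in for _ThreadLocalStream: it only forwards writes."""

    def __init__(self, wrapped):
        self.wrapped = wrapped

    def write(self, text):
        return self.wrapped.write(text)

    def flush(self):
        self.wrapped.flush()


def simulate_interleaving():
    """Replay steps 1-4 in order and report what sys.stdout ends up as."""
    real_stream = sys.stdout
    try:
        # Step 1: A enters run() and installs its wrapper.
        a_orig = sys.stdout
        a_wrapper = StandInThreadLocalStream(a_orig)
        sys.stdout = a_wrapper

        # Step 2: B enters run() while A is active; its "original" is A's wrapper.
        b_orig = sys.stdout
        sys.stdout = StandInThreadLocalStream(b_orig)

        # Step 3: A's finally block runs.
        sys.stdout = a_orig

        # Step 4: B's finally block runs and reinstalls A's wrapper.
        sys.stdout = b_orig

        final = sys.stdout
        return {
            "final_is_real_stream": final is real_stream,
            "final_is_a_wrapper": final is a_wrapper,
        }
    finally:
        # Undo the damage so the demo itself does not leak a wrapper.
        sys.stdout = real_stream


result = simulate_interleaving()
print(result)  # {'final_is_real_stream': False, 'final_is_a_wrapper': True}
```

The single-threaded replay fixes the ordering that real threads would only hit intermittently, so it is suited to illustrating the end state rather than measuring how often it occurs.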

Additionally, if a third concurrent writer (e.g., the Rich logging framework, the asyncio event loop, another CLI command) restores sys.stdout to the "original" it captured earlier, it can undo one of the wrappers mid-stream, causing intermittent output corruption.

Evidence

# src/cleveragents/application/services/validation_pipeline.py, ~lines 501-516

def run(self) -> ValidationSummary:
    ...
    orig_stdout = sys.stdout       # ← reads global mutable state
    orig_stderr = sys.stderr
    sys.stdout = _ThreadLocalStream(orig_stdout)   # ← mutates global
    sys.stderr = _ThreadLocalStream(orig_stderr)
    try:
        with ThreadPoolExecutor(max_workers=self._max_workers) as pool:
            ...
    finally:
        sys.stdout = orig_stdout   # ← restores what THIS instance saw
        sys.stderr = orig_stderr   #   which may already be stale if
                                   #   another instance has mutated it

Expected Behavior

The output capture mechanism should be confined to each run() invocation without affecting other concurrent run() calls or other code in the process. The global sys.stdout/sys.stderr should remain consistent and unmodified from the perspective of all external callers.

Actual Behavior

Concurrent run() calls corrupt the sys.stdout/sys.stderr chain, with one call's cleanup leaving stale _ThreadLocalStream wrappers. This can cause:

  • Validation output silently lost to an abandoned thread-local buffer
  • Process stdout permanently wrapped in a stale _ThreadLocalStream
  • Rich console / structlog writing to the wrong stream
  • Third-party code (pytest, uvicorn) confused by the modified sys.stdout

Suggested Fix

Avoid unsynchronized global replacement of sys.stdout/sys.stderr. Either serialize concurrent run() calls with a lock, or redesign the capture mechanism so that it does not rely on global stream mutation. For example:

Option A — serialize with a module-level lock:

import sys
import threading

_STDOUT_LOCK = threading.Lock()   # module-level, shared by all pipeline instances

def run(self) -> ValidationSummary:
    with _STDOUT_LOCK:   # only one run() at a time can replace stdout/stderr
        orig_stdout = sys.stdout
        ...
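
A runnable sketch of the Option A idea, under stated assumptions: exclusive_stdout, _STREAM_LOCK, and run_once are hypothetical names, and a plain io.StringIO stands in for the real capturing stream. It only shows that holding a lock around the swap and the restore keeps the save/restore pairs from interleaving.

```python
import io
import sys
import threading
from contextlib import contextmanager

_STREAM_LOCK = threading.Lock()  # hypothetical module-level lock


@contextmanager
def exclusive_stdout(replacement):
    """Swap sys.stdout for `replacement` while holding the lock; always restore."""
    with _STREAM_LOCK:
        original = sys.stdout
        sys.stdout = replacement
        try:
            yield replacement
        finally:
            sys.stdout = original


def run_once(label, results):
    """Stand-in for one run() call: capture whatever is printed inside it."""
    buffer = io.StringIO()
    with exclusive_stdout(buffer):
        print(f"output from {label}")
    results[label] = buffer.getvalue()


stdout_before = sys.stdout
results = {}
threads = [
    threading.Thread(target=run_once, args=(f"run-{i}", results))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Two limitations are worth keeping in mind. threading.Lock is not re-entrant, so a nested run() on the same thread would deadlock; and the lock only coordinates code that acquires it, so any other code that prints or reassigns sys.stdout while a run is active is still affected.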

Option B — pass the capturing stream explicitly to _execute_single instead of relying on sys.stdout replacement.
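
A minimal sketch of what Option B could look like, assuming each validation step can be handed the stream it should write to. execute_single here is a hypothetical free function, not the real _execute_single, and its signature is an assumption made for illustration.

```python
import io
import sys
from concurrent.futures import ThreadPoolExecutor


def execute_single(name, out):
    """Hypothetical validation step: writes only to the stream it is given."""
    print(f"validating {name}", file=out)
    return name


def run_validations(names, max_workers=4):
    """Run each step in a worker thread with its own private buffer."""
    buffers = {name: io.StringIO() for name in names}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(execute_single, name, buffers[name]) for name in names
        ]
        for future in futures:
            future.result()  # re-raise any exception from the worker
    return {name: buf.getvalue() for name, buf in buffers.items()}


stdout_before = sys.stdout
captured = run_validations(["lint", "types", "tests"])
```

Because sys.stdout is never reassigned, concurrent callers cannot interfere with each other. The trade-off is that only output written to the passed stream is captured; code that prints to sys.stdout directly is not.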

Option C — use contextlib.redirect_stdout (which is also not thread-safe for concurrent use, but at least documents the limitation explicitly).
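
The limitation mentioned for Option C can be seen in a few lines: contextlib.redirect_stdout reassigns the process-wide sys.stdout, so output from any other thread that prints during the redirect lands in the same target. The sketch below uses only the standard library; unrelated_worker is an illustrative name.

```python
import io
import threading
from contextlib import redirect_stdout


def unrelated_worker():
    """Represents code in another thread that knows nothing about the redirect."""
    print("written by another thread")


buffer = io.StringIO()
with redirect_stdout(buffer):
    print("written by the redirecting thread")
    worker = threading.Thread(target=unrelated_worker)
    worker.start()
    worker.join()  # the worker prints while the redirect is still active

captured = buffer.getvalue()
```

Here captured contains both lines, which shows that the redirect is global rather than per-thread.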

Category

concurrency

TDD Note

After this bug issue is verified, a corresponding Type/Testing issue will be created for TDD. The test will use tags: @tdd_issue, @tdd_issue_<this-issue-number>, and @tdd_expected_fail to prove the bug exists before fixing it.


Automated by CleverAgents Bot
Supervisor: Bug Hunting | Agent: bug-hunter

Author
Owner

Label compliance fix applied: Added missing State/Unverified label per CONTRIBUTING.md requirements.


Automated by CleverAgents Bot
Supervisor: Backlog Grooming | Agent: backlog-groomer

Author
Owner

Verified — Critical concurrency bug: ValidationPipeline globally replaces sys.stdout/stderr — concurrent executions corrupt streams. MoSCoW: Must-have. Priority: High.


Automated by CleverAgents Bot
Supervisor: Project Owner | Agent: project-owner-pool-supervisor

Reference
cleveragents/cleveragents-core#6771