fix(security): close async resources and leaks #435

Merged
hamza.khyari merged 2 commits from feature/m4-security-async-cleanup into master 2026-02-26 15:19:16 +00:00
Member

Summary

Closes #321. Adds a unified async resource lifecycle and cleanup subsystem.

  • AsyncResourceTracker (core/async_cleanup.py) — central registry for async resources with timeout-bounded close_all(), async context manager, and __del__ finalizer that logs leaked resources by name
  • LangGraphBridge.cleanup_tasks_async() — awaits in-flight tasks with a deadline instead of fire-and-forget cancel; tracks cancellation_reasons
  • StateManager.close() — properly releases checkpoint file handles and completes the RxPY BehaviorSubject
  • AcpEventQueue.close() — disposes all subscriptions with logged count

Files Changed

| File | Status | Description |
|------|--------|-------------|
| src/cleveragents/core/async_cleanup.py | NEW | AsyncResourceTracker + AsyncResource protocol |
| src/cleveragents/langgraph/bridge.py | MODIFIED | Graceful task cancellation with timeout |
| src/cleveragents/langgraph/state.py | MODIFIED | StateManager.close() for checkpoint cleanup |
| src/cleveragents/acp/events.py | MODIFIED | AcpEventQueue.close() |
| features/security_async.feature | NEW | 14 BDD scenarios |
| features/steps/security_async_steps.py | NEW | Step definitions (67 steps) |
| docs/reference/async_safety.md | NEW | Reference documentation |
| robot/security_async.robot | NEW | Integration smoke tests |
| benchmarks/security_async_cleanup_bench.py | NEW | ASV benchmarks |

Quality

  • 14 scenarios, 67 steps — all passing
  • Pyright: 0 errors
  • Ruff: all checks passed

Dependencies

  • Blocks issue #321 (this PR must be merged before the issue can be closed)
freemo added this to the v3.3.0 milestone 2026-02-25 18:11:24 +00:00
brent.edwards approved these changes 2026-02-25 22:31:10 +00:00
Dismissed
brent.edwards left a comment

Approved so that you can merge.

Member

Code Review — PR #435: fix(security): close async resources and leaks

Reviewer: @brent.edwards | Review type: Comment-only (not blocking)

Good work on this, Hamza. The AsyncResourceTracker pattern is solid and the test/bench/doc coverage is well above average for a first pass. I found a few issues that should be addressed, organized by severity per the review playbook.


P0 — Critical (must fix before merge)

#1 · src/cleveragents/core/async_cleanup.py:59–78 — register() accepts resources after close_all()

close_all() sets self._closed = True and clears _resources, but register() never checks _closed. A resource registered after shutdown is silently accepted into a tracker that has already run its cleanup, so it will never be closed: a permanent leak.

# Suggested fix — add at the top of register():
with self._lock:
    if self._closed:
        raise RuntimeError("Cannot register resource after tracker is closed")
    ...

The docs (async_safety.md line 42) and the idempotent close_all scenario both assume this can't happen, but nothing enforces it.
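A minimal sketch of the guard suggested in #1, assuming a simplified tracker. `SketchTracker` and its method bodies are illustrative stand-ins, not the code in async_cleanup.py; only the `_lock`, `_closed`, and `_resources` names come from the review.

```python
import asyncio
import threading
from typing import Any


class SketchTracker:
    """Hypothetical stand-in for AsyncResourceTracker (not the PR's code)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._resources: dict = {}
        self._closed = False

    def register(self, name: str, resource: Any) -> None:
        with self._lock:
            # Reject late registrations so nothing lands in a dead tracker.
            if self._closed:
                raise RuntimeError("Cannot register resource after tracker is closed")
            self._resources[name] = resource

    async def close_all(self) -> None:
        with self._lock:
            # Flip the flag and take ownership of the resources in one step.
            self._closed = True
            resources, self._resources = self._resources, {}
        for resource in resources.values():
            await resource.close()
```

Setting the flag and swapping the dict under the same lock means a concurrent register() either lands before the swap (and gets closed) or fails loudly afterwards.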


P1 — High (must fix before merge)

#2 · async_cleanup.py:104–123 — except Exception misses CancelledError

On Python ≥ 3.8, CancelledError inherits from BaseException, not Exception. If a resource's close() is cancelled (e.g., during interpreter shutdown or outer task cancellation), the except Exception on line 119 won't catch it, so the exception propagates up and skips all remaining resources. Consider:

except (Exception, asyncio.CancelledError):

or catching BaseException with appropriate re-raise logic.
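One possible shape for the "re-raise" variant, as a sketch only: the loop records a cancellation, keeps closing the remaining resources, and re-raises once the loop is finished. `close_all_sketch` and its return value are hypothetical and not taken from the PR.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def close_all_sketch(resources: dict, timeout: float = 5.0) -> list:
    """Close every resource; return the names that failed to close cleanly."""
    failed = []
    cancelled = None
    for name, resource in resources.items():
        try:
            await asyncio.wait_for(resource.close(), timeout)
        except asyncio.CancelledError as exc:
            # CancelledError is a BaseException, so `except Exception` misses it.
            cancelled = exc
            failed.append(name)
        except Exception:
            logger.exception("Failed to close %s", name)
            failed.append(name)
    if cancelled is not None:
        # Propagate the cancellation only after every resource was attempted.
        raise cancelled
    return failed
```

Deferring the re-raise keeps cancellation semantics intact for the caller while still giving each resource a chance to close.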

#3 · langgraph/bridge.py:32,97 — cancellation_reasons dict leaks memory

cancellation_reasons uses Task objects as keys (strong references). Completed/cancelled tasks are never removed from this dict, so they (and their coroutine frames) are kept alive indefinitely. This is an unbounded memory leak proportional to the number of ever-cancelled tasks. Consider using a WeakKeyDictionary or explicitly discarding entries in cleanup_tasks_async() and the done_callback.
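The explicit-discard option could look roughly like this. `ReasonLog` and `_forget` are hypothetical names; the sketch assumes the reason is only needed while the task is still alive.

```python
import asyncio


class ReasonLog:
    """Hypothetical holder for cancellation reasons, keyed by task."""

    def __init__(self) -> None:
        self.cancellation_reasons = {}

    def cancel_with_reason(self, task: asyncio.Task, reason: str) -> None:
        self.cancellation_reasons[task] = reason
        # Drop the entry once the task finishes so the dict cannot grow forever.
        task.add_done_callback(self._forget)
        task.cancel()

    def _forget(self, task: asyncio.Task) -> None:
        self.cancellation_reasons.pop(task, None)
```

If the reason has to outlive the task (for post-mortem logging, say), it would need to be copied somewhere bounded before the entry is dropped.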

#4 · langgraph/state.py:109–125,151,169,182 — State mutation after close() silently corrupts

update_state(), reset(), load_checkpoint(), and time_travel() all call self.state_stream.on_next() without checking self.is_closed. After close() completes the BehaviorSubject via on_completed(), subsequent on_next() calls are silently dropped by RxPY. This means the in-memory self.state is mutated but subscribers never see the update — silent data corruption.

The docs say close() "prevents further state updates" (async_safety.md:84) but the code doesn't enforce this. Add a guard:

def update_state(self, ...):
    if self.is_closed:
        raise RuntimeError("StateManager is closed")
    ...

Same for reset(), load_checkpoint(), and time_travel().
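To avoid repeating the check in four methods, the guard could be factored into a decorator. This is a sketch under stated assumptions: `require_open` and `SketchStateManager` are hypothetical, and a plain list of callbacks stands in for the RxPY BehaviorSubject.

```python
import functools


def require_open(method):
    """Reject calls on a closed object (hypothetical helper, not in the PR)."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if self.is_closed:
            raise RuntimeError(f"{type(self).__name__} is closed")
        return method(self, *args, **kwargs)

    return wrapper


class SketchStateManager:
    """Simplified stand-in: a list of callbacks replaces the RxPY subject."""

    def __init__(self) -> None:
        self.state = {}
        self.is_closed = False
        self._subscribers = []

    @require_open
    def update_state(self, **changes) -> None:
        self.state.update(changes)
        for notify in self._subscribers:
            notify(dict(self.state))

    @require_open
    def reset(self) -> None:
        self.state.clear()

    def close(self) -> None:
        self.is_closed = True
        self._subscribers.clear()
```

Because the check runs before the method body, the in-memory state is never mutated after close(), which is the corruption path described in #4.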


P2 — Medium (fix in follow-up PR within 3 days)

#5 · async_cleanup.py:145–148 — __del__ risks AttributeError on partial construction

If __init__ fails partway through, __del__ will fire and self._resources may not exist yet, raising AttributeError during GC. Wrap in try/except AttributeError or use getattr(self, '_resources', {}).
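A small sketch of the getattr variant, assuming the finalizer only needs the resource names. `FinalizerSketch`, `leaked_names`, and the `fail` flag are hypothetical and exist only to simulate a constructor that raises early.

```python
import logging

logger = logging.getLogger(__name__)


class FinalizerSketch:
    """Hypothetical class whose finalizer tolerates partial construction."""

    def __init__(self, fail: bool = False) -> None:
        if fail:
            raise ValueError("simulated failure before _resources exists")
        self._resources = {}

    def leaked_names(self) -> list:
        # getattr with a default avoids AttributeError on half-built instances.
        return sorted(getattr(self, "_resources", {}))

    def __del__(self) -> None:
        for name in self.leaked_names():
            logger.warning("Leaked async resource: %s", name)
```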

#6 · async_cleanup.py:104–106 — Sequential close is O(n × timeout)

Resources are closed one-by-one. With 500 resources and a 30s timeout, worst case is 4+ hours. Consider asyncio.gather(*[wait_for(r.close(), timeout) for r in ...]) for concurrent close.
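A sketch of the concurrent variant, assuming each resource exposes an async close(). `close_concurrently` is a hypothetical name; `return_exceptions=True` is added here so that one failure does not abort the rest.

```python
import asyncio


async def close_concurrently(resources: dict, timeout: float) -> dict:
    """Close all resources at once; map each name to None or its exception."""
    names = list(resources)
    results = await asyncio.gather(
        *(asyncio.wait_for(resources[n].close(), timeout) for n in names),
        # Collect failures instead of aborting on the first one.
        return_exceptions=True,
    )
    return dict(zip(names, results))
```

With this shape the worst case is roughly one timeout in total rather than one timeout per resource.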

#7 · bridge.py:62,70–77 — Pending tasks orphaned after timeout

In cleanup_tasks_async(), tasks that land in pending after asyncio.wait() are logged but never re-cancelled, then _active_tasks.clear() drops all references. These orphaned tasks continue running in the background with no owner.
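One way to avoid orphaning the stragglers, sketched with a hypothetical `drain_tasks` helper: cancel whatever is still pending after the deadline, then await those tasks so they finish unwinding before references are dropped.

```python
import asyncio


async def drain_tasks(tasks: set, timeout: float) -> int:
    """Wait up to `timeout`, then cancel stragglers; return how many were cancelled."""
    if not tasks:
        return 0
    _done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Await the cancelled tasks so they finish unwinding before we drop them.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)
```

A task that suppresses cancellation would still block the final gather, so a second bounded wait may be worth considering there.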

#8 · bridge.py:54,62,77 — Tasks added during await asyncio.wait() window are lost

tasks = list(self._active_tasks) snapshots at line 54, but new tasks can be added to _active_tasks during the await asyncio.wait() call. The _active_tasks.clear() on line 77 then silently drops them.
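A sketch of one fix for the snapshot window: remove only the tasks that were actually waited on instead of clearing the whole set. `TaskOwnerSketch`, `spawn`, and `cleanup` are hypothetical; only `_active_tasks` comes from the review.

```python
import asyncio


class TaskOwnerSketch:
    """Hypothetical owner of background tasks (stand-in for the bridge)."""

    def __init__(self) -> None:
        self._active_tasks = set()

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.ensure_future(coro)
        self._active_tasks.add(task)
        return task

    async def cleanup(self, timeout: float) -> None:
        snapshot = set(self._active_tasks)
        if snapshot:
            await asyncio.wait(snapshot, timeout=timeout)
        # Remove only what was waited on; later arrivals stay tracked.
        self._active_tasks.difference_update(snapshot)
```

Tasks added during the wait remain in `_active_tasks`, so a later cleanup pass (or a loop until the set is empty) can still find them.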

#9 · bridge.py:35–36 — __del__ lacks try/except guard

Same pattern as #5 — if __init__ fails partway or during interpreter shutdown, cleanup_tasks() may raise.

#10 · acp/events.py:42 — publish() after close() leaks events

close() clears _events and _subscriptions, but doesn't set any closed flag. Subsequent publish() calls will re-accumulate events in _events with no subscribers — an unbounded memory leak. Add an _is_closed flag consistent with StateManager.

#11 · acp/events.py:27–92 — No is_closed attribute

StateManager exposes is_closed for callers to check, but AcpEventQueue has no equivalent. Inconsistent API across the cleanup surface.
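Items #10 and #11 could be addressed together, roughly as follows. `SketchEventQueue` is a hypothetical stand-in; whether publish() should raise or silently drop after close() is a design choice, and raising is assumed here to mirror the StateManager guard.

```python
class SketchEventQueue:
    """Hypothetical stand-in for AcpEventQueue with a closed flag."""

    def __init__(self) -> None:
        self._events = []
        self._subscriptions = []
        self._is_closed = False

    @property
    def is_closed(self) -> bool:
        return self._is_closed

    def publish(self, event) -> None:
        if self._is_closed:
            raise RuntimeError("AcpEventQueue is closed")
        self._events.append(event)

    def close(self) -> None:
        # Idempotent: a second close() is a no-op.
        if self._is_closed:
            return
        self._is_closed = True
        self._subscriptions.clear()
        self._events.clear()
```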


P3 — Low (author discretion)

#12 · async_cleanup.py:95–100 — Lock design note

The lock correctly serializes register() vs close_all() — but only contingent on fixing #1. Currently a thread can register() while another thread's close_all() has already set _closed and cleared the dict.

#13 · bridge.py:31,212,214 — _active_tasks is a plain set (GIL-dependent)

Thread safety relies on the GIL. If this code ever runs on a free-threaded build (PEP 703), the set mutations will race. Low priority since CPython 3.13 and later still enable the GIL by default.

#14 · bridge.py:79–103 — cancel_task_with_reason doesn't discard from _active_tasks

After cancelling a task, it remains in _active_tasks until the done callback fires. If cleanup_tasks_async() runs before the callback, the task appears twice. Minor, since asyncio.wait handles this, but worth a comment.

#15 · state.py:190–192 — close() idempotency relies on plain bool

The if self.is_closed: return check is TOCTOU-racy if called from two threads simultaneously. Low priority since StateManager is typically single-threaded.

#16 · acp/events.py:88–89 — Non-atomic clear of two structures

_subscriptions.clear() then _events.clear() — a concurrent publish() between the two calls could see cleared subscriptions but non-cleared events. Minor since the code is single-threaded.


Test Coverage Gaps

T1 (aligns with #1, P0) — No scenario for register-after-close. The Behave suite has 14 scenarios but none calls register() after close_all(). This is the most critical missing test — it would have caught the P0 immediately.

T2 (aligns with #4, P1) — No scenario for state mutation after close. The tests verify is_closed is set, but never call update_state(), reset(), or time_travel() after close() to verify the guard.

T3 (aligns with #10, P2) — No scenario for publish-after-close on AcpEventQueue. Same pattern — verify behavior when the object is used after close().

T4 (aligns with #3, P1) — No scenario for cancellation_reasons cleanup. No test verifies that completed tasks are removed from the dict.

T5 — asyncio.new_event_loop() pattern in steps is fragile. Steps at lines 103, 141, 171, 177, 253, 265 create event loops that are not always closed on failure. Consider a shared fixture or after_scenario hook.

T6 — Log handler never removed. The _CapturingHandler attached in the Background step (line 45) is never removed, so handlers accumulate across scenarios. Add cleanup in after_scenario.
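For T5 and T6, a Behave `after_scenario` hook in environment.py could centralise the teardown. This is a sketch: `context.loop` and `context.log_handler` are assumed attribute names, and attaching the handler to the root logger is also an assumption, since the step file is not shown here.

```python
import logging


def after_scenario(context, scenario):
    """Behave hook sketch; `context.loop` and `context.log_handler` are assumed names."""
    handler = getattr(context, "log_handler", None)
    if handler is not None:
        # Detach the capturing handler so handlers do not pile up.
        logging.getLogger().removeHandler(handler)
        context.log_handler = None
    loop = getattr(context, "loop", None)
    if loop is not None and not loop.is_closed():
        # Close the loop even when the scenario failed partway through.
        loop.close()
        context.loop = None
```

Because Behave calls the hook whether the scenario passed or failed, the loop and handler are released on both paths.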

Benchmark Issue

B1 — TimeRegisterBatch crashes on second ASV iteration. time_register_100() registers names batch-0 through batch-99. Since number is not set to 1 (unlike TimeCloseAll), ASV will call this method multiple times per repeat. The second call raises ValueError on duplicate names. Fix: add number = 1 or reset the tracker in each call.
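The fix might combine both suggestions, as in the sketch below. `_StubTracker` is a hypothetical stand-in that only reproduces the duplicate-name ValueError described above; the real benchmark would use AsyncResourceTracker instead.

```python
class _StubTracker:
    """Hypothetical tracker that rejects duplicate names, as B1 describes."""

    def __init__(self) -> None:
        self._resources = {}

    def register(self, name: str, resource: object) -> None:
        if name in self._resources:
            raise ValueError(f"duplicate resource name: {name}")
        self._resources[name] = resource


class TimeRegisterBatch:
    # One call per sample, so setup() provides a fresh tracker every time.
    number = 1

    def setup(self) -> None:
        self.tracker = _StubTracker()

    def time_register_100(self) -> None:
        for i in range(100):
            self.tracker.register(f"batch-{i}", object())
```

ASV runs setup() before each sample and then calls the timed method `number` times, so with `number = 1` no call ever sees names left over from a previous one.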

Doc Accuracy (async_safety.md)

D1 (aligns with #4) — Line 84 claims close() "prevents further state updates" — this is false. The code does not guard update_state() et al. Either fix the code (recommended) or correct the doc.

D2 — Thread Safety section (lines 90–92) doesn't mention the register-after-close gap. Readers will assume register() is safe at all times.


Summary

| Severity | Count | Verdict |
|----------|-------|---------|
| P0 | 1 | Must fix |
| P1 | 3 | Must fix |
| P2 | 7 | Follow-up PR (3 days) |
| P3 | 5 | Author discretion |
| Test gaps | 6 | T1 + T2 critical |
| Bench bug | 1 | Quick fix |
| Doc fixes | 2 | Align with code changes |

The P0 (#1) and P1s (#2, #3, #4) should be resolved before merge. Everything else can go in a follow-up. Happy to re-review once those are addressed.

cc @hamza.khyari

hamza.khyari force-pushed feature/m4-security-async-cleanup from d68171fd2c
All checks were successful
CI / lint (pull_request) Successful in 29s
CI / security (pull_request) Successful in 36s
CI / typecheck (pull_request) Successful in 37s
CI / quality (pull_request) Successful in 42s
CI / benchmark-publish (pull_request) Has been skipped
CI / build (pull_request) Successful in 24s
CI / integration_tests (pull_request) Successful in 4m59s
CI / unit_tests (pull_request) Successful in 15m13s
CI / docker (pull_request) Successful in 1m4s
CI / benchmark-regression (pull_request) Successful in 22m15s
CI / coverage (pull_request) Successful in 1h2m38s
to 376b681d70
Some checks failed
CI / lint (pull_request) Successful in 24s
CI / typecheck (pull_request) Successful in 1m0s
CI / security (pull_request) Successful in 53s
CI / quality (pull_request) Successful in 33s
CI / benchmark-publish (pull_request) Has been skipped
CI / build (pull_request) Successful in 24s
CI / integration_tests (pull_request) Has been cancelled
CI / unit_tests (pull_request) Has been cancelled
CI / docker (pull_request) Has been cancelled
CI / benchmark-regression (pull_request) Has been cancelled
CI / coverage (pull_request) Has been cancelled
2026-02-26 00:57:54 +00:00
Compare
hamza.khyari dismissed brent.edwards's review 2026-02-26 00:57:54 +00:00
Reason:

New commits pushed, approval review dismissed automatically according to repository settings

hamza.khyari force-pushed feature/m4-security-async-cleanup from 00793532ae
All checks were successful
CI / lint (pull_request) Successful in 22s
CI / typecheck (pull_request) Successful in 58s
CI / security (pull_request) Successful in 49s
CI / quality (pull_request) Successful in 29s
CI / benchmark-publish (pull_request) Has been skipped
CI / build (pull_request) Successful in 24s
CI / integration_tests (pull_request) Successful in 4m35s
CI / unit_tests (pull_request) Successful in 13m56s
CI / benchmark-regression (pull_request) Successful in 23m29s
CI / docker (pull_request) Successful in 1m3s
CI / coverage (pull_request) Successful in 1h13m19s
to c406781b86
All checks were successful
CI / benchmark-publish (pull_request) Has been skipped
CI / lint (pull_request) Successful in 15s
CI / quality (pull_request) Successful in 17s
CI / build (pull_request) Successful in 20s
CI / typecheck (pull_request) Successful in 41s
CI / security (pull_request) Successful in 57s
CI / integration_tests (pull_request) Successful in 2m53s
CI / unit_tests (pull_request) Successful in 19m17s
CI / docker (pull_request) Successful in 1m1s
CI / benchmark-regression (pull_request) Successful in 26m11s
CI / coverage (pull_request) Successful in 47m20s
2026-02-26 14:31:12 +00:00
Compare
hamza.khyari scheduled this pull request to auto merge when all checks succeed 2026-02-26 14:32:21 +00:00
Reference
cleveragents/cleveragents-core!435