fix(security): close async resources and leaks #435
Blocks: #321 fix(security): close async resources and leaks (cleveragents/cleveragents-core)
Reference: cleveragents/cleveragents-core!435
Branch: `feature/m4-security-async-cleanup`
Summary
Closes #321. Adds a unified async resource lifecycle and cleanup subsystem.
- `AsyncResourceTracker` (`core/async_cleanup.py`): central registry for async resources with a timeout-bounded `close_all()`, async context manager support, and a `__del__` finalizer that logs leaked resources by name.
- `LangGraphBridge.cleanup_tasks_async()`: awaits in-flight tasks with a deadline instead of a fire-and-forget cancel; tracks `cancellation_reasons`.
- `StateManager.close()`: properly releases checkpoint file handles and completes the RxPY `BehaviorSubject`.
- `AcpEventQueue.close()`: disposes all subscriptions and logs how many were disposed.

Files Changed
- `src/cleveragents/core/async_cleanup.py`
- `src/cleveragents/langgraph/bridge.py`
- `src/cleveragents/langgraph/state.py`
- `src/cleveragents/acp/events.py`
- `features/security_async.feature`
- `features/steps/security_async_steps.py`
- `docs/reference/async_safety.md`
- `robot/security_async.robot`
- `benchmarks/security_async_cleanup_bench.py`
Approved so that you can merge.
Code Review — PR #435: fix(security): close async resources and leaks
Reviewer: @brent.edwards | Review type: Comment-only (not blocking)
Good work on this, Hamza. The `AsyncResourceTracker` pattern is solid, and the test/bench/doc coverage is well above average for a first pass. I found a few issues that should be addressed, organized by severity per the review playbook.

P0: Critical (must fix before merge)

#1 · `src/cleveragents/core/async_cleanup.py:59–78`: `register()` accepts resources after `close_all()`

`close_all()` sets `self._closed = True` and clears `_resources`, but `register()` never checks `_closed`. Any resource registered after shutdown is silently accepted by a tracker that has already run its cleanup, so it will never be closed: a permanent leak.

The docs (`async_safety.md` line 42) and the idempotent `close_all` scenario both assume this cannot happen, but nothing enforces it.

P1: High (must fix before merge)
#2 · `async_cleanup.py:104–123`: `except Exception` misses `CancelledError`

On Python 3.8 and later, `asyncio.CancelledError` inherits from `BaseException`, not `Exception`. If a resource's `close()` is cancelled (e.g., during interpreter shutdown or outer task cancellation), the `except Exception` on line 119 won't catch it: the exception propagates up and all remaining resources are skipped. Consider handling `asyncio.CancelledError` in its own `except` clause, or catching `BaseException` with appropriate re-raise logic.

#3 · `langgraph/bridge.py:32,97`: `cancellation_reasons` dict leaks memory

`cancellation_reasons` uses `Task` objects as keys (strong references). Completed and cancelled tasks are never removed from this dict, so they (and their coroutine frames) are kept alive indefinitely. This is an unbounded memory leak proportional to the number of tasks ever cancelled. Consider using a `WeakKeyDictionary` or explicitly discarding entries in `cleanup_tasks_async()` and the done callback.

#4 · `langgraph/state.py:109–125,151,169,182`: state mutation after `close()` silently corrupts

`update_state()`, `reset()`, `load_checkpoint()`, and `time_travel()` all call `self.state_stream.on_next()` without checking `self.is_closed`. After `close()` completes the `BehaviorSubject` via `on_completed()`, subsequent `on_next()` calls are silently dropped by RxPY. The in-memory `self.state` is still mutated, but subscribers never see the update: silent data corruption.

The docs say `close()` "prevents further state updates" (`async_safety.md:84`), but the code doesn't enforce this. Add a guard at the top of `update_state()` that rejects the call when `self.is_closed` is set, and do the same for `reset()`, `load_checkpoint()`, and `time_travel()`.

P2: Medium (fix in follow-up PR within 3 days)
#5 · `async_cleanup.py:145–148`: `__del__` risks `AttributeError` on partial construction

If `__init__` fails partway through, `__del__` still fires and `self._resources` may not exist yet, raising `AttributeError` during GC. Wrap the body in `try/except AttributeError` or use `getattr(self, '_resources', {})`.

#6 · `async_cleanup.py:104–106`: sequential close is O(n × timeout)

Resources are closed one by one. With 500 resources and a 30 s timeout, the worst case is 500 × 30 s = 15,000 s, i.e. more than 4 hours. Consider `asyncio.gather(*[wait_for(r.close(), timeout) for r in ...])` for concurrent close.

#7 · `bridge.py:62,70–77`: pending tasks orphaned after timeout

In `cleanup_tasks_async()`, tasks that land in `pending` after `asyncio.wait()` are logged but never re-cancelled, and `_active_tasks.clear()` then drops all references. These orphaned tasks keep running in the background with no owner.

#8 · `bridge.py:54,62,77`: tasks added during the `await asyncio.wait()` window are lost

`tasks = list(self._active_tasks)` takes a snapshot at line 54, but new tasks can be added to `_active_tasks` while `await asyncio.wait()` is suspended. The `_active_tasks.clear()` on line 77 then silently drops them.

#9 · `bridge.py:35–36`: `__del__` lacks a try/except guard

Same pattern as #5: if `__init__` fails partway, or the object is finalized during interpreter shutdown, `cleanup_tasks()` may raise.

#10 · `acp/events.py:42`: `publish()` after `close()` leaks events

`close()` clears `_events` and `_subscriptions` but doesn't set any closed flag. Subsequent `publish()` calls re-accumulate events in `_events` with no subscribers: an unbounded memory leak. Add an `_is_closed` flag consistent with `StateManager`.

#11 · `acp/events.py:27–92`: no `is_closed` attribute

`StateManager` exposes `is_closed` for callers to check, but `AcpEventQueue` has no equivalent, so the API is inconsistent across the cleanup surface.

P3: Low (author discretion)
#12 · `async_cleanup.py:95–100`: lock design note

The lock correctly serializes `register()` against `close_all()`, but only once #1 is fixed. Currently, one thread can call `register()` after another thread's `close_all()` has already set `_closed` and cleared the dict.

#13 · `bridge.py:31,212,214`: `_active_tasks` is a plain `set` (GIL-dependent)

Thread safety relies on the GIL. If this code ever runs on a free-threaded build (PEP 703), the set mutations will race. Low priority, since the default CPython 3.13 build still has the GIL.
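Items #3, #7, #8, and #13 all concern the same task bookkeeping in `bridge.py`. The sketch below shows one way they could be addressed together, assuming Python 3.9+; `TaskBridgeSketch`, `track()`, and the `grace` parameter are hypothetical names, not the PR's `LangGraphBridge` API. Finished tasks drop out of both `_active_tasks` and `cancellation_reasons` in the done callback (the explicit-discard option from #3, rather than a `WeakKeyDictionary`), the wait loop re-reads the set until it is empty, and tasks still running at the deadline are cancelled instead of orphaned.

```python
from __future__ import annotations

import asyncio
import logging
import threading
from typing import Any, Coroutine

logger = logging.getLogger(__name__)


class TaskBridgeSketch:
    """Hypothetical stand-in for the task bookkeeping in LangGraphBridge."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # guards both structures below (#13)
        self._active_tasks: set[asyncio.Task] = set()
        self.cancellation_reasons: dict[asyncio.Task, str] = {}

    def track(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        task = asyncio.create_task(coro)
        with self._lock:
            self._active_tasks.add(task)
        task.add_done_callback(self._on_done)
        return task

    def cancel_task_with_reason(self, task: asyncio.Task, reason: str) -> None:
        if task.done():
            return  # nothing to cancel, and no entry that could leak
        with self._lock:
            self.cancellation_reasons[task] = reason
        task.cancel()

    def _on_done(self, task: asyncio.Task) -> None:
        # Drop every strong reference once the task finishes (#3).
        with self._lock:
            self._active_tasks.discard(task)
            reason = self.cancellation_reasons.pop(task, None)
        if reason is not None:
            logger.debug("task %s ended after cancel: %s", task.get_name(), reason)

    async def cleanup_tasks_async(
        self, timeout: float = 5.0, grace: float = 1.0
    ) -> set[asyncio.Task]:
        """Wait up to `timeout`, then cancel stragglers; return tasks that ignored cancel."""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        stuck: set[asyncio.Task] = set()
        while True:
            # Re-snapshot on every pass so tasks added while we awaited are not lost (#8).
            with self._lock:
                snapshot = set(self._active_tasks)
            if not snapshot:
                return stuck
            remaining = deadline - loop.time()
            if remaining > 0:
                await asyncio.wait(snapshot, timeout=remaining)
                continue
            # Past the deadline: cancel instead of orphaning (#7).
            for task in snapshot:
                self.cancel_task_with_reason(task, "cleanup deadline exceeded")
            _done, still_pending = await asyncio.wait(snapshot, timeout=grace)
            stuck |= still_pending  # reported to the caller rather than silently dropped
            with self._lock:
                self._active_tasks.difference_update(still_pending)
```

A `threading.Lock` (rather than `asyncio.Lock`) fits here because no critical section awaits and the done callback enters them synchronously; routing every mutation through it also keeps the set from racing on a free-threaded build. With this shape, a T4-style check reduces to asserting that `cancellation_reasons` is empty after cleanup.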
#14 · `bridge.py:79–103`: `cancel_task_with_reason` doesn't discard from `_active_tasks`

After a task is cancelled, it remains in `_active_tasks` until the done callback fires. If `cleanup_tasks_async()` runs before the callback, the already-cancelled task is still included in the wait set. Minor, since `asyncio.wait` handles this, but worth a comment.

#15 · `state.py:190–192`: `close()` idempotency relies on a plain `bool`

The `if self.is_closed: return` check is a TOCTOU race if two threads call `close()` simultaneously. Low priority, since `StateManager` is typically single-threaded.

#16 · `acp/events.py:88–89`: non-atomic clear of two structures

`_subscriptions.clear()` runs before `_events.clear()`, so a concurrent `publish()` between the two calls could see cleared subscriptions but uncleared events. Minor, since the code is single-threaded.

Test Coverage Gaps
T1 (aligns with #1, P0): no scenario for register-after-close. The Behave suite has 14 scenarios, but none calls `register()` after `close_all()`. This is the most critical missing test; it would have caught the P0 immediately.

T2 (aligns with #4, P1): no scenario for state mutation after close. The tests verify that `is_closed` is set, but never call `update_state()`, `reset()`, or `time_travel()` after `close()` to verify the guard.

T3 (aligns with #10, P2): no scenario for publish-after-close on `AcpEventQueue`. Same pattern: verify behavior when the object is used after `close()`.

T4 (aligns with #3, P1): no scenario for `cancellation_reasons` cleanup. No test verifies that completed tasks are removed from the dict.
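T2 and T3 both exercise the contract that #4, #10, and #11 ask for: once `close()` has run, mutating calls are rejected and `is_closed` reports it. A minimal sketch, assuming that "rejected" means raising (logging and returning early would also satisfy the review); `ClosableMixin`, `ClosedError`, `EventQueueSketch`, and `StateSketch` are hypothetical names, and the sketch uses a plain callback list instead of an RxPY `BehaviorSubject`.

```python
from __future__ import annotations

import threading
from typing import Any, Callable

# Hypothetical names throughout; these are not the PR's classes.


class ClosedError(RuntimeError):
    """Raised when a closed component is used."""


class ClosableMixin:
    """Shared closed-flag handling so every cleanup surface exposes `is_closed` (#11)."""

    def __init__(self) -> None:
        self._close_lock = threading.Lock()
        self._is_closed = False

    @property
    def is_closed(self) -> bool:
        return self._is_closed

    def _ensure_open(self, operation: str) -> None:
        if self._is_closed:
            raise ClosedError(f"{type(self).__name__}.{operation}() called after close()")

    def _mark_closed(self) -> bool:
        """Atomically set the flag; return False if it was already set (#15)."""
        with self._close_lock:
            if self._is_closed:
                return False
            self._is_closed = True
            return True


class EventQueueSketch(ClosableMixin):
    """Stand-in for AcpEventQueue."""

    def __init__(self) -> None:
        super().__init__()
        self._events: list[Any] = []
        self._subscriptions: list[Callable[[Any], None]] = []

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        self._ensure_open("subscribe")
        self._subscriptions.append(callback)

    def publish(self, event: Any) -> None:
        self._ensure_open("publish")  # stops events piling up after close (#10)
        self._events.append(event)
        for callback in list(self._subscriptions):
            callback(event)

    def close(self) -> None:
        if self._mark_closed():  # flag is set before clearing, so publish() fails fast
            self._subscriptions.clear()
            self._events.clear()


class StateSketch(ClosableMixin):
    """Stand-in for StateManager's update path (#4)."""

    def __init__(self) -> None:
        super().__init__()
        self.state: dict[str, Any] = {}

    def update_state(self, **changes: Any) -> None:
        # The same guard would go in reset(), load_checkpoint(), and time_travel().
        self._ensure_open("update_state")
        self.state.update(changes)

    def close(self) -> None:
        self._mark_closed()
```

A T2 or T3 scenario then only needs to call `update_state()` or `publish()` after `close()` and assert that `ClosedError` is raised and no state changed. Keeping the flag in one mixin is one way to give `StateManager` and `AcpEventQueue` the same `is_closed` surface.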
T5: the `asyncio.new_event_loop()` pattern in the steps is fragile. Steps at lines 103, 141, 171, 177, 253, and 265 create event loops that are not always closed on failure. Consider a shared fixture or an `after_scenario` hook.

T6: the log handler is never removed. The `_CapturingHandler` attached in the Background step (line 45) stays attached, so handlers accumulate across scenarios. Add cleanup in `after_scenario`.

Benchmark Issue
B1: `TimeRegisterBatch` crashes on the second ASV iteration. `time_register_100()` registers the names `batch-0` through `batch-99`. Since `number` is not set to `1` (unlike `TimeCloseAll`), ASV can call this method several times per repeat against the same tracker, and the second call raises `ValueError` on the duplicate names. Fix: add `number = 1` or reset the tracker in each call.

Doc Accuracy (`async_safety.md`)

D1 (aligns with #4): line 84 claims `close()` "prevents further state updates", which is false. The code does not guard `update_state()` et al. Either fix the code (recommended) or correct the doc.

D2: the Thread Safety section (lines 90–92) doesn't mention the register-after-close gap, so readers will assume `register()` is safe at all times.

Summary
The P0 (#1) and P1s (#2, #3, #4) should be resolved before merge. Everything else can go in a follow-up. Happy to re-review once those are addressed.
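For reference, here is a minimal sketch of how the tracker-side fixes (#1, #2, #5, #6) could fit together in `register()` and `close_all()`. `ResourceTrackerSketch`, its `timeout` constructor argument, and `_close_one()` are hypothetical; the PR's `AsyncResourceTracker` may be structured differently. For #2 it swaps in `asyncio.gather(..., return_exceptions=True)` rather than a per-resource `except` clause: gather records a `CancelledError` raised by one resource's `close()` without cancelling the other closes, while cancelling `close_all()` itself still propagates to the caller.

```python
from __future__ import annotations

import asyncio
import logging
import threading
from typing import Any, Protocol

logger = logging.getLogger(__name__)


class AsyncClosable(Protocol):
    async def close(self) -> None: ...


class ResourceTrackerSketch:
    """Hypothetical stand-in for AsyncResourceTracker with the #1, #2, #5, #6 fixes."""

    def __init__(self, timeout: float = 30.0) -> None:
        self._timeout = timeout
        self._lock = threading.Lock()
        self._closed = False
        self._resources: dict[str, AsyncClosable] = {}

    def register(self, name: str, resource: AsyncClosable) -> None:
        # Same lock close_all() holds while setting _closed, so no late registration slips in (#1, #12).
        with self._lock:
            if self._closed:
                raise RuntimeError(f"cannot register {name!r}: tracker already closed")
            if name in self._resources:
                raise ValueError(f"duplicate resource name {name!r}")
            self._resources[name] = resource

    async def _close_one(self, name: str, resource: AsyncClosable) -> None:
        try:
            await asyncio.wait_for(resource.close(), self._timeout)
        except asyncio.TimeoutError:
            logger.warning("closing %s timed out after %ss", name, self._timeout)
        except Exception:
            logger.exception("closing %s failed", name)

    async def close_all(self) -> None:
        with self._lock:
            if self._closed:
                return  # idempotent
            self._closed = True
            items = list(self._resources.items())
            self._resources.clear()
        # Concurrent close (#6): worst case is about one timeout, not n * timeout.
        # return_exceptions=True keeps a CancelledError from one close() from
        # skipping the rest (#2).
        results = await asyncio.gather(
            *(self._close_one(name, res) for name, res in items),
            return_exceptions=True,
        )
        for (name, _), result in zip(items, results):
            if isinstance(result, asyncio.CancelledError):
                logger.warning("close of %s was cancelled", name)

    async def __aenter__(self) -> ResourceTrackerSketch:
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.close_all()

    def __del__(self) -> None:
        # getattr tolerates a partially constructed instance (#5).
        leaked = getattr(self, "_resources", None)
        if leaked:
            logger.warning("tracker collected with unclosed resources: %s", sorted(leaked))
```

The duplicate-name `ValueError` matches the behavior B1 describes, and because `register()` and `close_all()` share one lock, a registration after shutdown fails loudly instead of leaking.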
cc @hamza.khyari
d68171fd2c376b681d70
New commits pushed, approval review dismissed automatically according to repository settings.
00793532aec406781b86