feat(async): add async command execution and workers #312

Closed
opened 2026-02-22 23:41:12 +00:00 by freemo · 2 comments
Owner

Metadata

  • Commit Message: feat(async): add async command execution and workers
  • Branch: feature/m6-async-infra

Background

Async command execution follows ADR-002, including cancellation and timeout handling. The AsyncJob model and the async_jobs table track the job lifecycle. The AsyncJobStatus enum enforces valid transitions (queued → running → succeeded/failed/cancelled).
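As an illustration of the transition rule above, here is a minimal sketch of how an enum plus a transition table could enforce it. The `VALID_TRANSITIONS` name matches the implementation notes further down; the `transition` helper and the string values are assumptions made for this example. The sketch reads the chain literally, so a queued job can only move to running; whether a queued job may be cancelled directly is not stated in this issue.

```python
from enum import Enum


class AsyncJobStatus(Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Allowed next states for each state; terminal states map to an empty set.
VALID_TRANSITIONS: dict[AsyncJobStatus, frozenset[AsyncJobStatus]] = {
    AsyncJobStatus.QUEUED: frozenset({AsyncJobStatus.RUNNING}),
    AsyncJobStatus.RUNNING: frozenset(
        {
            AsyncJobStatus.SUCCEEDED,
            AsyncJobStatus.FAILED,
            AsyncJobStatus.CANCELLED,
        }
    ),
    AsyncJobStatus.SUCCEEDED: frozenset(),
    AsyncJobStatus.FAILED: frozenset(),
    AsyncJobStatus.CANCELLED: frozenset(),
}


def transition(current: AsyncJobStatus, target: AsyncJobStatus) -> AsyncJobStatus:
    """Return the target status, or raise if the move is not allowed.

    Hypothetical helper: the real model may expose this differently.
    """
    if target not in VALID_TRANSITIONS[current]:
        raise ValueError(
            f"invalid transition: {current.value} -> {target.value}"
        )
    return target
```

Keeping the table next to the enum means any caller that changes a job's status goes through the same check.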

Acceptance Criteria

  • [x] Implement async command execution per ADR-002 with cancellation and timeout handling.
  • [x] Add AsyncJob model and async_jobs table (plan_id, phase, status, payload_json, created_at, started_at, finished_at).
  • [x] Add AsyncJobStatus enum and enforce valid transitions (queued -> running -> succeeded/failed/cancelled).
  • [x] Add worker_id and last_heartbeat fields; mark stuck jobs failed after TTL.
  • [x] Add AsyncWorker orchestrator with polling loop, max_workers config, and graceful shutdown hooks.
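The stuck-job criterion above (mark jobs failed once their heartbeat is older than the TTL) can be sketched as a pure function. This is an illustration only: `JobRow` and `find_stuck_jobs` are hypothetical names, and the sketch assumes that only jobs in the running state are eligible and that every running job has a heartbeat timestamp.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class JobRow:
    """Hypothetical subset of an async_jobs row, enough for TTL checks."""

    job_id: str
    status: str
    last_heartbeat: datetime


def find_stuck_jobs(
    jobs: list[JobRow], now: datetime, ttl: timedelta
) -> list[str]:
    """Return ids of running jobs whose last heartbeat is older than the TTL."""
    cutoff = now - ttl
    return [
        job.job_id
        for job in jobs
        if job.status == "running" and job.last_heartbeat < cutoff
    ]
```

A caller would then move each returned job to the failed state through the normal transition check, so that stuck-job handling cannot bypass the status rules.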

Definition of Done

This issue is complete when:

  • All subtasks below are completed and checked off.
  • A Git commit is created where the first line of the commit message matches
    the Commit Message in Metadata exactly, followed by a blank line, then
    additional lines providing relevant details about the implementation. The
    commit body should be appropriate in size for a commit message and relatively
    complete in describing what was done.
  • The commit is pushed to the remote on the branch matching the Branch in
    Metadata exactly.
  • The commit is submitted as a pull request to master, reviewed, and
    merged before this issue is marked done.

Subtasks

  • [x] Implement async command execution per ADR-002 with cancellation and timeout handling.
  • [x] Add AsyncJob model and async_jobs table (plan_id, phase, status, payload_json, created_at, started_at, finished_at).
  • [x] Add AsyncJobStatus enum and enforce valid transitions (queued -> running -> succeeded/failed/cancelled).
  • [x] Add worker_id and last_heartbeat fields; mark stuck jobs failed after TTL.
  • [x] Add AsyncWorker orchestrator with polling loop, max_workers config, and graceful shutdown hooks.
  • [x] Add job enqueue hooks for plan execute/apply when async is enabled via config flag (no new CLI flags).
  • [x] Add cancellation token support and ensure cancellation propagates to tool execution.
  • [x] Add config keys async.enabled, async.max_workers, async.poll_interval, async.job_timeout, and async.job_ttl.
  • [x] Add job cleanup routine to prune completed jobs older than a retention threshold.
  • [x] Add worker health report (last_heartbeat, jobs processed) surfaced in agents diagnostics.
  • [x] Add serialization of async payloads with schema version for forward compatibility.
  • [x] Update docs/reference/async_architecture.md with execution flow, job states, and shutdown rules.
  • [x] Document async config defaults and job retention policy.
  • [x] Tests (Behave): Add features/async_execution.feature for async command handling (enqueue, worker pick-up, cancel).
  • [x] Tests (Robot): Add robot/async_execution.robot smoke tests.
  • [x] Tests (ASV): Add benchmarks/async_execution_bench.py for worker scheduling overhead.
  • [x] Verify coverage >=97% via nox -s coverage_report.
  • [x] Run nox (all default sessions, including benchmark), fix any errors.
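To make the orchestrator subtasks more concrete, the following is a rough sketch of a polling loop with a concurrency cap and a graceful shutdown hook. It is not the project's AsyncWorker: the class name `PollingWorkerSketch`, the in-memory deque standing in for the async_jobs table, and the use of asyncio are all assumptions made for illustration. The defaults of 4 workers and a 1.0s poll interval are the ones reported in the implementation notes on this issue.

```python
import asyncio
from collections import deque
from typing import Awaitable, Callable, Deque

# A job is modelled as a zero-argument coroutine function (an assumption).
Job = Callable[[], Awaitable[None]]


class PollingWorkerSketch:
    """Hypothetical stand-in for an orchestrator; construct it inside a coroutine."""

    def __init__(
        self, queue: Deque[Job], max_workers: int = 4, poll_interval: float = 1.0
    ) -> None:
        self._queue = queue
        self._max_workers = max_workers
        self._poll_interval = poll_interval
        self._stop = asyncio.Event()
        self._running: set = set()

    def request_shutdown(self) -> None:
        """Shutdown hook: stop claiming new jobs after the current poll."""
        self._stop.set()

    async def run(self) -> None:
        while not self._stop.is_set():
            # Claim queued jobs while there is spare capacity.
            while self._queue and len(self._running) < self._max_workers:
                task = asyncio.create_task(self._queue.popleft()())
                self._running.add(task)
                task.add_done_callback(self._running.discard)
            # Wait one poll interval, waking early if shutdown is requested.
            try:
                await asyncio.wait_for(
                    self._stop.wait(), timeout=self._poll_interval
                )
            except asyncio.TimeoutError:
                pass
        # Graceful shutdown: let in-flight jobs finish before returning.
        if self._running:
            await asyncio.gather(*self._running, return_exceptions=True)
```

On Unix, a signal handler could call `request_shutdown`, for example via `loop.add_signal_handler(signal.SIGTERM, worker.request_shutdown)`. Cancelling in-flight jobs instead of waiting for them would be a different policy and is not shown here.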

Section: Section 8: Large Project Autonomy & Context [M6]
Status: Open

freemo added this to the v3.5.0 milestone 2026-02-22 23:41:12 +00:00
Author
Owner

Expected completion updated (Day 15 rebaseline): Day 41 / 2026-03-21 (previously Day 36 / 2026-03-16)

freemo added the due date 2026-03-07 2026-02-23 18:41:51 +00:00
freemo self-assigned this 2026-02-24 21:53:09 +00:00
freemo removed their assignment 2026-03-02 16:26:07 +00:00
Member

Implementation Notes — Issue #312

Design Decisions

  1. AsyncJob Domain Model: Implemented as a frozen dataclass in cleveragents.domain.models.core.async_job with AsyncJobStatus enum enforcing valid state transitions via a VALID_TRANSITIONS mapping. The model uses ULIDs for job identification, consistent with the project's identity strategy. Status transitions are validated at the domain level rather than the service level, following DDD principles.

  2. Database Schema: Added async_jobs table to the SQLAlchemy models in cleveragents.infrastructure.database.models with columns: job_id (PK), plan_id, phase, status, payload_json, created_at, started_at, finished_at, worker_id, last_heartbeat, schema_version. Created corresponding Alembic migration.

  3. AsyncWorker Service: Implemented in cleveragents.application.services.async_worker with:

    • Polling loop using configurable async.poll_interval (default 1.0s)
    • max_workers concurrency control (default 4)
    • Graceful shutdown via signal handler hooks
    • Stuck job detection: jobs with no heartbeat update beyond TTL are marked as failed
    • Cancellation token support with propagation to tool execution
    • Job cleanup routine pruning completed jobs beyond retention threshold
    • Worker health report surfaced through agents diagnostics

  4. Config Integration: Added settings under async.* namespace: enabled, max_workers, poll_interval, job_timeout, job_ttl. Settings follow the existing pattern in cleveragents.config.settings.

  5. Payload Serialization: Async payloads include a schema_version field for forward compatibility. A version 1 schema is defined.
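The versioned payload idea in item 5 could look roughly like the sketch below. Only the `schema_version` field and the existence of a version 1 schema come from these notes; the envelope layout with a `body` key and the helper names `dump_payload` and `load_payload` are assumptions for illustration.

```python
import json
from typing import Any

SCHEMA_VERSION = 1
# Versions this reader knows how to interpret (assumed: only version 1 so far).
SUPPORTED_VERSIONS = frozenset({1})


def dump_payload(body: dict[str, Any]) -> str:
    """Wrap the job body in an envelope that records the schema version."""
    envelope = {"schema_version": SCHEMA_VERSION, "body": body}
    return json.dumps(envelope, sort_keys=True)


def load_payload(payload_json: str) -> dict[str, Any]:
    """Parse a stored payload, rejecting versions this reader does not know."""
    envelope = json.loads(payload_json)
    version = envelope.get("schema_version")
    if version not in SUPPORTED_VERSIONS:
        raise ValueError(
            f"unsupported async payload schema_version: {version!r}"
        )
    return envelope["body"]
```

Checking the version on read lets an older worker fail loudly on a payload written by a newer release instead of misinterpreting it.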

Quality Results

Gate               Result
lint               PASS
typecheck          PASS (0 errors, Pyright strict)
unit_tests         PASS (256 features, 8141 scenarios)
integration_tests  PASS (1141 tests)
coverage_report    PASS (97.0%)

Key Code Locations

  • Domain model: cleveragents.domain.models.core.async_job.AsyncJob
  • Worker service: cleveragents.application.services.async_worker.AsyncWorker
  • Database model: cleveragents.infrastructure.database.models (async_jobs table)
  • Config keys: cleveragents.config.settings (async.* section)
  • Diagnostics: cleveragents.cli.commands.system (worker health report)
  • BDD tests: features/async_execution.feature + features/steps/async_execution_steps.py
  • Integration tests: robot/async_execution.robot
  • Benchmarks: benchmarks/async_execution_bench.py
  • Documentation: docs/reference/async_architecture.md

Files Changed (14 files, ~3138 lines added)

  • 9 new files: domain model, service, migration, docs, BDD feature+steps, Robot tests+helper, ASV benchmarks
  • 5 modified files: database models, settings, init, system CLI, vulture whitelist
Blocks
#369 Epic: Large Project Autonomy & Context
cleveragents/cleveragents-core
Reference
cleveragents/cleveragents-core#312