feat(acms): implement pipeline orchestrator and Phase 1 components (StrategySelector, BudgetAllocator, StrategyExecutor) #539

Closed
opened 2026-03-04 00:59:24 +00:00 by freemo · 1 comment
Owner

Metadata

  • Commit Message: feat(acms): implement pipeline orchestrator and Phase 1 components
  • Branch: feature/m5-pipeline-orchestrator-phase1
Field Value
Type Feature
Priority Critical
MoSCoW Must Have
Points 13
Milestone v3.4.0
Assignee freemo
Parent Epic #396 (Epic: ACMS Context Pipeline)
Depends On #538 (ContextFragment model), #191 (context strategy registry), #188 (ACMS v1 pipeline)
Blocks Phase 2 pipeline components, all context strategy implementations

Background

The specification (§ Architecture > ACMS > Context Assembly Pipeline, lines 42613-42653) defines a 10-component pluggable pipeline orchestrated by a ContextAssemblyPipeline mediator. Each component has a Protocol and a default implementation. This issue covers the orchestrator and the first 3 components that form "Phase 1" (strategy selection and execution):

  1. ContextAssemblyPipeline — Mediator that wires and invokes all 10 components in sequence.
  2. StrategySelector (StrategySelectorProtocolConfidenceWeightedSelector) — Selects which context strategies to run based on confidence weights and available backends.
  3. BudgetAllocator (BudgetAllocatorProtocolProportionalBudgetAllocator) — Splits the total token budget proportionally across selected strategies.
  4. StrategyExecutor (StrategyExecutorProtocolParallelStrategyExecutor) — Executes selected strategies in parallel with circuit breaking and timeout.

Existing issue #192 ("strategy coordinator and fusion engine") predates the 10-component pipeline spec and covers related but different terminology. This issue supersedes #192's orchestrator aspect.

Acceptance Criteria

  1. ContextAssemblyPipeline class exists implementing the full 10-stage pipeline flow.
  2. Pipeline stages are injected via Protocol-typed constructor params (pluggable).
  3. StrategySelectorProtocol defined; ConfidenceWeightedSelector default impl selects strategies by confidence score × backend availability.
  4. BudgetAllocatorProtocol defined; ProportionalBudgetAllocator splits budget proportional to strategy weights.
  5. StrategyExecutorProtocol defined; ParallelStrategyExecutor runs strategies concurrently with per-strategy timeout and circuit breaker.
  6. Pipeline accepts a ContextRequest and returns an assembled context string.
  7. Each component is independently replaceable via configuration (scope chain: plan > project > global).
  8. Config keys in config.toml for each component's implementation class.

Subtasks

1. Design

  • Define the 10-stage pipeline flow diagram
  • Design Protocol interfaces for StrategySelector, BudgetAllocator, StrategyExecutor
  • Design config resolution for pluggable components

2. Implementation

  • Create ContextAssemblyPipeline mediator class
  • Create StrategySelectorProtocol and ConfidenceWeightedSelector
  • Create BudgetAllocatorProtocol and ProportionalBudgetAllocator
  • Create StrategyExecutorProtocol and ParallelStrategyExecutor
  • Wire default implementations via DI/config

3. Testing

  • Unit tests for each component in isolation
  • Integration test for full pipeline flow
  • Test pluggability (swap a component, verify behavior change)
  • Test circuit breaker behavior under strategy failure

4. Documentation

  • Protocol docstrings with spec references
  • Config.toml example entries

5. Integration

  • Wire pipeline into existing ACMS entry points
  • Verify compatibility with existing strategy registry (#191)

6. Observability

  • Structured logging for each pipeline stage
  • Timing metrics per component

7. Security

  • Budget enforcement prevents unbounded context assembly

Definition of Done

  • All acceptance criteria met
  • All subtask checkboxes checked
  • Tests pass in CI
  • Code reviewed and approved
  • No regressions
## Metadata - **Commit Message**: `feat(acms): implement pipeline orchestrator and Phase 1 components` - **Branch**: `feature/m5-pipeline-orchestrator-phase1` | Field | Value | |-------|-------| | **Type** | Feature | | **Priority** | Critical | | **MoSCoW** | Must Have | | **Points** | 13 | | **Milestone** | v3.4.0 | | **Assignee** | freemo | | **Parent Epic** | #396 (Epic: ACMS Context Pipeline) | | **Depends On** | #538 (ContextFragment model), #191 (context strategy registry), #188 (ACMS v1 pipeline) | | **Blocks** | Phase 2 pipeline components, all context strategy implementations | ## Background The specification (§ Architecture > ACMS > Context Assembly Pipeline, lines 42613-42653) defines a **10-component pluggable pipeline** orchestrated by a `ContextAssemblyPipeline` mediator. Each component has a Protocol and a default implementation. This issue covers the orchestrator and the first 3 components that form "Phase 1" (strategy selection and execution): 1. **ContextAssemblyPipeline** — Mediator that wires and invokes all 10 components in sequence. 2. **StrategySelector** (`StrategySelectorProtocol` → `ConfidenceWeightedSelector`) — Selects which context strategies to run based on confidence weights and available backends. 3. **BudgetAllocator** (`BudgetAllocatorProtocol` → `ProportionalBudgetAllocator`) — Splits the total token budget proportionally across selected strategies. 4. **StrategyExecutor** (`StrategyExecutorProtocol` → `ParallelStrategyExecutor`) — Executes selected strategies in parallel with circuit breaking and timeout. Existing issue #192 ("strategy coordinator and fusion engine") predates the 10-component pipeline spec and covers related but different terminology. This issue supersedes #192's orchestrator aspect. ## Acceptance Criteria 1. `ContextAssemblyPipeline` class exists implementing the full 10-stage pipeline flow. 2. Pipeline stages are injected via Protocol-typed constructor params (pluggable). 3. `StrategySelectorProtocol` defined; `ConfidenceWeightedSelector` default impl selects strategies by confidence score × backend availability. 4. `BudgetAllocatorProtocol` defined; `ProportionalBudgetAllocator` splits budget proportional to strategy weights. 5. `StrategyExecutorProtocol` defined; `ParallelStrategyExecutor` runs strategies concurrently with per-strategy timeout and circuit breaker. 6. Pipeline accepts a `ContextRequest` and returns an assembled context string. 7. Each component is independently replaceable via configuration (scope chain: plan > project > global). 8. Config keys in `config.toml` for each component's implementation class. ## Subtasks ### 1. Design - [ ] Define the 10-stage pipeline flow diagram - [ ] Design Protocol interfaces for StrategySelector, BudgetAllocator, StrategyExecutor - [ ] Design config resolution for pluggable components ### 2. Implementation - [ ] Create `ContextAssemblyPipeline` mediator class - [ ] Create `StrategySelectorProtocol` and `ConfidenceWeightedSelector` - [ ] Create `BudgetAllocatorProtocol` and `ProportionalBudgetAllocator` - [ ] Create `StrategyExecutorProtocol` and `ParallelStrategyExecutor` - [ ] Wire default implementations via DI/config ### 3. Testing - [ ] Unit tests for each component in isolation - [ ] Integration test for full pipeline flow - [ ] Test pluggability (swap a component, verify behavior change) - [ ] Test circuit breaker behavior under strategy failure ### 4. Documentation - [ ] Protocol docstrings with spec references - [ ] Config.toml example entries ### 5. Integration - [ ] Wire pipeline into existing ACMS entry points - [ ] Verify compatibility with existing strategy registry (#191) ### 6. Observability - [ ] Structured logging for each pipeline stage - [ ] Timing metrics per component ### 7. Security - [ ] Budget enforcement prevents unbounded context assembly ## Definition of Done - [ ] All acceptance criteria met - [ ] All subtask checkboxes checked - [ ] Tests pass in CI - [ ] Code reviewed and approved - [ ] No regressions
freemo added this to the v3.2.0 milestone 2026-03-04 00:59:50 +00:00
freemo modified the milestone from v3.2.0 to v3.4.0 2026-03-04 01:09:35 +00:00
freemo self-assigned this 2026-03-04 01:41:12 +00:00
Author
Owner

Implementation Notes

PR: #602
Branch: feature/m5-pipeline-orchestrator-phase1
Commit: ee8f74bafeat(acms): implement pipeline orchestrator and Phase 1 components

Source: src/cleveragents/application/services/acms_pipeline.py (680 lines)

Six components implemented per spec §42613-42729 and §42918-42945:

  1. ConfidenceWeightedSelector — Implements StrategySelector protocol. Scores strategies via can_handle(), applies optional preference_boost multiplier for preferred strategies (capped at 1.0), filters zero-confidence entries, returns descending-sorted (strategy, confidence) pairs.

  2. ProportionalBudgetAllocator — Implements BudgetAllocator protocol. Distributes token budget proportionally to confidence scores with largest-remainder integer rounding. Enforces min_useful_budget (default 64 tokens) to prevent fragmentation. Three fallback paths: equal split for zero-confidence, best-candidate for all-excluded, direct pass-through for single candidate.

  3. ParallelStrategyExecutor — Executes strategies via ThreadPoolExecutor(max_workers=4) with configurable per-strategy timeout (default 30s). Single-strategy fast path avoids thread pool overhead. Failed strategies log warnings and return empty — no pipeline failure. Integrates circuit breaker for consecutive failure tracking.

  4. CircuitBreaker — Regular class (NOT dataclass/Pydantic — required by architecture.feature:38). Tracks consecutive failures per strategy name. Opens after failure_threshold (default 5) failures. record_success() resets counter. Supports reset(name) and reset_all().

  5. StageTimings — Frozen Pydantic BaseModel with 10 named float fields (one per pipeline stage), all defaulting to 0.0ms. Fields: strategy_selection_ms, budget_allocation_ms, strategy_execution_ms, deduplication_ms, depth_resolution_ms, scoring_ms, packing_ms, ordering_ms, preamble_generation_ms, total_ms.

  6. ContextAssemblyPipeline — Extends existing ACMSPipeline (inherits strategy registry, dedup, scorer, packer, default strategies). Adds Phase 1 production components (selector, allocator, executor) injected via constructor. assemble() validates plan_id as ULID, runs 3-phase pipeline with per-stage timing, exposes last_timings property.

Design Decisions

  • Extended ACMSPipeline rather than replacing it to maintain backward compatibility with existing 64 scenarios in features/acms_pipeline.feature
  • CircuitBreaker uses regular class (not dataclass) because features/architecture.feature:38 prohibits @dataclass in src/
  • Single-strategy fast path in executor avoids unnecessary ThreadPoolExecutor overhead for the common case
  • Pipeline validates ULID format for plan_id (26 chars, Crockford's base32) to catch misuse early

Test Coverage

  • 28 Behave scenarios (features/acms_pipeline_orchestrator.feature): selector, allocator, circuit breaker, executor (single + parallel + failure), pipeline integration, timings, edge cases
  • 9 Robot Framework tests (robot/acms_pipeline_orchestrator.robot): module imports, component creation, execution flow, pipeline assembly
  • ASV benchmarks (benchmarks/acms_pipeline_orchestrator_bench.py): latency benchmarks for all components

Verification

Check Result
nox -s lint 0 issues
nox -s typecheck 0 errors (pyright strict)
nox -s unit_tests 8528 scenarios, 0 failures
nox -s integration_tests 1203 tests, 0 failures
nox -s coverage_report 97.0% (≥97% threshold)
Architecture test No @dataclass in src/
## Implementation Notes **PR**: #602 **Branch**: `feature/m5-pipeline-orchestrator-phase1` **Commit**: `ee8f74ba` — `feat(acms): implement pipeline orchestrator and Phase 1 components` ### Source: `src/cleveragents/application/services/acms_pipeline.py` (680 lines) Six components implemented per spec §42613-42729 and §42918-42945: 1. **`ConfidenceWeightedSelector`** — Implements `StrategySelector` protocol. Scores strategies via `can_handle()`, applies optional `preference_boost` multiplier for preferred strategies (capped at 1.0), filters zero-confidence entries, returns descending-sorted `(strategy, confidence)` pairs. 2. **`ProportionalBudgetAllocator`** — Implements `BudgetAllocator` protocol. Distributes token budget proportionally to confidence scores with largest-remainder integer rounding. Enforces `min_useful_budget` (default 64 tokens) to prevent fragmentation. Three fallback paths: equal split for zero-confidence, best-candidate for all-excluded, direct pass-through for single candidate. 3. **`ParallelStrategyExecutor`** — Executes strategies via `ThreadPoolExecutor(max_workers=4)` with configurable per-strategy timeout (default 30s). Single-strategy fast path avoids thread pool overhead. Failed strategies log warnings and return empty — no pipeline failure. Integrates circuit breaker for consecutive failure tracking. 4. **`CircuitBreaker`** — Regular class (NOT dataclass/Pydantic — required by `architecture.feature:38`). Tracks consecutive failures per strategy name. Opens after `failure_threshold` (default 5) failures. `record_success()` resets counter. Supports `reset(name)` and `reset_all()`. 5. **`StageTimings`** — Frozen Pydantic BaseModel with 10 named float fields (one per pipeline stage), all defaulting to 0.0ms. Fields: `strategy_selection_ms`, `budget_allocation_ms`, `strategy_execution_ms`, `deduplication_ms`, `depth_resolution_ms`, `scoring_ms`, `packing_ms`, `ordering_ms`, `preamble_generation_ms`, `total_ms`. 6. **`ContextAssemblyPipeline`** — Extends existing `ACMSPipeline` (inherits strategy registry, dedup, scorer, packer, default strategies). Adds Phase 1 production components (selector, allocator, executor) injected via constructor. `assemble()` validates plan_id as ULID, runs 3-phase pipeline with per-stage timing, exposes `last_timings` property. ### Design Decisions - Extended `ACMSPipeline` rather than replacing it to maintain backward compatibility with existing 64 scenarios in `features/acms_pipeline.feature` - `CircuitBreaker` uses regular class (not dataclass) because `features/architecture.feature:38` prohibits `@dataclass` in `src/` - Single-strategy fast path in executor avoids unnecessary `ThreadPoolExecutor` overhead for the common case - Pipeline validates ULID format for plan_id (26 chars, Crockford's base32) to catch misuse early ### Test Coverage - **28 Behave scenarios** (`features/acms_pipeline_orchestrator.feature`): selector, allocator, circuit breaker, executor (single + parallel + failure), pipeline integration, timings, edge cases - **9 Robot Framework tests** (`robot/acms_pipeline_orchestrator.robot`): module imports, component creation, execution flow, pipeline assembly - **ASV benchmarks** (`benchmarks/acms_pipeline_orchestrator_bench.py`): latency benchmarks for all components ### Verification | Check | Result | |-------|--------| | `nox -s lint` | ✅ 0 issues | | `nox -s typecheck` | ✅ 0 errors (pyright strict) | | `nox -s unit_tests` | ✅ 8528 scenarios, 0 failures | | `nox -s integration_tests` | ✅ 1203 tests, 0 failures | | `nox -s coverage_report` | ✅ 97.0% (≥97% threshold) | | Architecture test | ✅ No `@dataclass` in `src/` |
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Reference
cleveragents/cleveragents-core#539
No description provided.