No description
.docker | ||
ci | ||
docs | ||
examples | ||
src/cleveragents | ||
tests | ||
.bumpversion.cfg | ||
.cookiecutterrc | ||
.coveragerc | ||
.cz-config.js | ||
.cz.json | ||
.editorconfig | ||
.gitattributes | ||
.gitignore | ||
.python-version | ||
ATTRIBUTIONS.rst | ||
CHANGELOG.rst | ||
CODE_OF_CONDUCT.rst | ||
CONTRIBUTING.rst | ||
CONTRIBUTORS.rst | ||
docker-compose.yml | ||
Dockerfile | ||
Dockerfile.dev | ||
LICENSE | ||
MANIFEST.in | ||
NOTICE | ||
README.rst | ||
setup.cfg | ||
setup.py | ||
tox.ini |
============ CleverAgents ============ A powerful, reactive Agent Framework using RxPy streams for complex AI agent orchestration and message routing. .. image:: https://img.shields.io/pypi/v/cleveragents.svg :target: https://pypi.org/project/cleveragents/ :alt: PyPI Package .. image:: https://img.shields.io/travis/cleverthis/cleveragents.svg :target: https://travis-ci.org/cleverthis/cleveragents :alt: Travis-CI Build Status 🌊 Reactive Architecture ======================== CleverAgents is built on **RxPy reactive streams**, providing powerful stream processing capabilities: - **Full RxPy Integration**: Access to all RxPy operators (map, filter, merge, split, buffer, throttle, etc.) - **Hot & Cold Streams**: Support for different stream types with replay capabilities - **Async/Await Support**: Native async processing throughout the pipeline - **Stream Composition**: Complex routing patterns with merge and split operations 🚀 Unified Routes System ======================== CleverAgents provides a unified "routes" system that combines streams and graphs under a single configuration section. This provides a consistent interface for all data flow patterns. **Key Benefits:** - **Unified Interface**: Single "routes" section for all data flow - **Clear Type System**: Explicit type field makes intent clear - **Dynamic Adaptation**: Routes can convert between types at runtime - **Better Organization**: Related concepts in one place - **Complexity Guidance**: Built-in analyzer helps choose route type **Route Configuration Format:** .. code-block:: yaml routes: # Stream route my_stream: type: stream # REQUIRED field stream_type: cold operators: - type: map params: agent: my_agent # Graph route my_graph: type: graph # REQUIRED field nodes: process: type: agent agent: processor edges: - source: start target: process - source: process target: end **Route Types:** 1. **Stream Routes**: For reactive, stateless processing 2. **Graph Routes**: For stateful workflows with conditional logic 3. **Bridge Routes**: For dynamic type conversion **Dynamic Type Conversion:** .. code-block:: yaml routes: adaptive_route: type: stream operators: - type: map params: agent: processor bridge: upgrade_conditions: needs_state: true message_count: 5 downgrade_conditions: idle_time: 300 **Important Note**: The system does NOT maintain backward compatibility. The old ``streams:`` and ``graphs:`` sections are completely removed. Only the new ``routes:`` section is supported. 🔄 Why RxPy + LangGraph? ======================== CleverAgents uniquely combines RxPy's reactive streams with LangGraph's stateful workflows. This integration is more powerful than either library alone: **What Only RxPy Can Do:** 1. **Real-time Stream Processing** .. code-block:: yaml # Throttle API calls to 10/second while buffering bursts operators: - type: throttle params: {duration: 0.1} - type: buffer params: {time: 1.0} 2. **Hot Streams with Live Updates** .. code-block:: yaml # Dashboard that shows latest value to new subscribers dashboard_feed: type: hot initial_value: "System OK" 3. **Time-Window Aggregations** .. code-block:: yaml # Calculate 5-minute rolling averages operators: - type: buffer params: {time: 300} - type: scan params: {accumulator: {type: average}} **What Only LangGraph Can Do:** 1. **Stateful Conversations with Memory** .. code-block:: yaml # Remember context across messages graphs: chat: checkpointing: true enable_time_travel: true 2. **Conditional Workflow Routing** .. code-block:: yaml # Route based on runtime decisions edges: - source: classify target: urgent_handler condition: {type: priority_check} 3. **Iterative Refinement Loops** .. code-block:: yaml # Retry until quality threshold met edges: - source: review target: refine condition: {type: needs_improvement} - source: refine target: review # Loop back **The Power of Both Together:** 1. **Streaming LangGraph Results**: Process partial results from long-running graphs in real-time 2. **Reactive Graph Triggers**: Use RxPy's debounce to prevent graph overload while maintaining responsiveness 3. **Windowed State Checkpoints**: Checkpoint graph state only when stream windows complete 4. **Parallel Stream-Graph Pipelines**: Run multiple graphs in parallel based on stream splits This combination enables building AI systems that are both **reactive** (responding to real-time events) and **stateful** (maintaining context and memory) - something neither library can achieve alone. 🎯 LangGraph Integration ======================== CleverAgents provides full LangGraph integration, allowing you to combine stateful workflows with reactive stream processing: **Key Features:** - **Stateful Workflows**: Build complex, stateful agent graphs with memory and context - **Conditional Routing**: Dynamic flow control based on state and conditions - **Checkpointing**: Save and restore graph state with time travel capabilities - **Cycles and Loops**: Support for iterative refinement and feedback loops - **Subgraphs**: Compose complex workflows from reusable components - **Parallel Execution**: Run multiple nodes concurrently for better performance - **Seamless RxPy Integration**: Use LangGraphs as RxPy operators and vice versa **LangGraph Definition in YAML:** .. code-block:: yaml routes: my_workflow: type: graph # Required field name: my_workflow entry_point: start checkpointing: true enable_time_travel: true nodes: process: type: agent agent: my_agent validate: type: function function: validate route: type: conditional condition: type: has_messages edges: - source: start target: process - source: process target: validate - source: validate target: route - source: route target: end condition: type: metadata_check key: valid value: true **Node Types:** - **Agent Nodes**: Execute CleverAgents agents - **Function Nodes**: Run built-in or custom functions - **Tool Nodes**: Execute tools with parallel support - **Conditional Nodes**: Make routing decisions - **Subgraph Nodes**: Embed other graphs **State Management:** LangGraph state flows through the graph and can be: - Checkpointed for persistence - Restored from checkpoints - Time-traveled to previous states - Updated incrementally or replaced **Using LangGraphs as RxPy Operators:** .. code-block:: yaml routes: my_stream: type: stream stream_type: cold operators: - type: graph_execute params: graph: my_workflow - type: state_checkpoint params: graph: my_workflow **Hybrid Pipelines:** Combine RxPy streams and LangGraphs in complex pipelines: .. code-block:: yaml pipelines: hybrid_flow: stages: - type: stream name: preprocess operators: - type: debounce params: duration: 0.5 - type: graph config: name: process_graph nodes: analyze: type: agent agent: analyzer input_from: preprocess output_to: postprocess - type: stream name: postprocess operators: - type: buffer params: count: 5 **CLI Graph Commands:** .. code-block:: bash # Execute graphs interactively cleveragents interactive -c my_config.yaml # Then in the interactive session: /graph my_workflow "Process this message" **Programmatic Usage:** .. code-block:: python from cleveragents import ReactiveCleverAgentsApp app = ReactiveCleverAgentsApp() app.load_configuration([Path("my_config.yaml")]) # Get a graph graph = app.langgraph_bridge.get_graph("my_workflow") # Execute with input result = await graph.execute({ "messages": [{"role": "user", "content": "Hello"}] }) # Access state state = graph.get_state() print(state.messages) **Advanced Patterns:** 1. **Iterative Refinement**: Use cycles to refine outputs until quality threshold is met 2. **Parallel Research**: Execute multiple research tasks in parallel and combine results 3. **Streaming Responses**: Process streaming LLM responses with real-time updates 4. **A/B Testing**: Route traffic between different graph variants for experimentation 5. **Error Recovery**: Implement retry policies and fallback paths for robustness **Best Practices:** 1. **State Design**: Keep state minimal and well-structured 2. **Checkpointing**: Enable for long-running or critical workflows 3. **Parallel Execution**: Mark independent nodes as parallel 4. **Error Handling**: Use retry policies and error edges 5. **Monitoring**: Connect state streams to monitoring systems **Integration with Existing Features:** - **Templates**: Use Jinja2/Mustache templates in prompts - **Tools**: Integrate existing CleverAgents tools in graphs - **Agents**: All agent types work seamlessly in graphs - **Streams**: Graphs can publish to and subscribe from streams - **Context**: Global context is available in all graph nodes **Troubleshooting:** *Graph Not Executing:* - Check edge definitions connect all nodes - Verify entry point exists - Ensure agents referenced in nodes are defined *State Not Persisting:* - Enable checkpointing in graph config - Specify checkpoint directory - Check filesystem permissions *Parallel Execution Issues:* - Mark nodes with ``parallel: true`` - Enable ``parallel_execution`` in graph config - Check for dependencies between parallel nodes *Stream Integration Problems:* - Ensure stream router has LangGraph bridge reference - Use correct operator names (graph_execute, state_update, etc.) - Verify graph exists before referencing in streams 🤖 Agent Types ============== **LLM Agents (via LangChain)** - OpenAI GPT models (GPT-4, GPT-4o, GPT-3.5-turbo, o1-preview, o1-mini) - Anthropic Claude models (Claude-3.5-Sonnet, Claude-3-Opus, Claude-3-Haiku) - Google Gemini models (Gemini-1.5-Pro, Gemini-1.5-Flash, Gemini-2.0-Flash) - All LangChain-supported LLM providers - Structured output with JSON schema enforcement - Memory management and conversation history - Built-in error handling and retry logic **Tool Agents** - Built-in tools: math, HTTP requests, file operations, JSON parsing - Shell command execution (with safety controls) - Custom tool integration **Reactive Processing** - Agents work as stream processors - Can be used in map operations or connect between streams - Support for both sync and async processing 📋 Quick Start ============== Installation ------------ .. code-block:: bash pip install cleveragents Basic Usage ----------- 1. **Create a configuration file** (``config.yaml``): .. code-block:: yaml agents: chat_agent: type: llm config: provider: openai model: gpt-4 temperature: 0.7 routes: chat_stream: type: stream # Required field stream_type: cold operators: - type: map params: agent: chat_agent publications: - __output__ merges: - sources: [__input__] target: chat_stream 2. **Run single-shot processing**: .. code-block:: bash cleveragents run -c config.yaml -p "Hello, how are you?" 3. **Start interactive session**: .. code-block:: bash cleveragents interactive -c config.yaml 🔧 API Key Configuration ======================== CleverAgents uses LangChain for LLM integration, supporting all LangChain-compatible providers. Configure API keys in two ways: 1. **In Agent Configuration**: .. code-block:: yaml agents: my_agent: type: llm config: provider: openai model: gpt-4 api_key: "sk-your-key-here" 2. **Via Environment Variables** (LangChain handles these automatically): - **OpenAI**: ``OPENAI_API_KEY`` - **Anthropic**: ``ANTHROPIC_API_KEY`` - **Google Gemini**: ``GOOGLE_API_KEY`` or ``GOOGLE_GEMINI_API_KEY`` - **Plus**: All other LangChain-supported provider environment variables **Supported Models:** - **OpenAI**: gpt-4, gpt-4o, gpt-3.5-turbo, o1-preview, o1-mini, and more - **Anthropic**: claude-3-5-sonnet-20241022, claude-3-opus-20240229, claude-3-haiku-20240307 - **Google**: gemini-1.5-pro, gemini-1.5-flash, gemini-2.0-flash-exp - **Plus**: Any model supported by LangChain providers 🌊 Stream Processing ==================== **Stream Types** .. code-block:: yaml routes: cold_stream: type: stream stream_type: cold # Starts when subscribed hot_stream: type: stream stream_type: hot # Always active, replays last value replay_stream: type: stream stream_type: replay # Replays all previous values **RxPy Operators** .. code-block:: yaml routes: processing_stream: type: stream stream_type: cold operators: - type: map params: agent: my_agent - type: filter params: condition: type: content_contains text: "important" - type: debounce params: duration: 1.0 - type: buffer params: count: 5 - type: throttle params: duration: 2.0 **Stream Operations** .. code-block:: yaml # Merge multiple streams merges: - sources: [stream1, stream2, stream3] target: combined_stream # Split stream based on conditions splits: - source: input_stream targets: questions: type: content_contains text: "?" commands: type: content_contains text: "execute" 🏗️ Advanced Patterns ==================== **Multi-Agent Collaboration** .. code-block:: yaml # Research → Analysis → Writing → Editing pipeline routes: research_stream: type: stream stream_type: cold operators: - type: map params: agent: researcher publications: - analysis_stream analysis_stream: type: stream stream_type: cold operators: - type: map params: agent: analyzer publications: - writing_stream **Real-time Analytics** .. code-block:: yaml routes: analytics_stream: type: stream stream_type: hot operators: - type: scan params: accumulator: type: collect - type: sample params: interval: 10.0 **Error Handling & Retry** .. code-block:: yaml routes: robust_processing: type: stream stream_type: cold operators: - type: map params: agent: my_agent - type: catch - type: retry params: count: 3 📊 Route Complexity Ladder ========================== This guide helps you choose the right route type for your use case. Routes in CleverAgents can be either **streams** (reactive, stateless) or **graphs** (stateful, conditional). Quick Decision Guide -------------------- Ask yourself these questions: 1. **Do you need to maintain state between messages?** → Use a graph 2. **Do you need conditional logic or branching?** → Use a graph 3. **Is your processing linear and stateless?** → Use a stream 4. **Do you need real-time reactive processing?** → Use a stream 5. **Unsure or requirements might change?** → Use a stream with bridge configuration Complexity Levels ----------------- **Level 1: Simple Stream** Use when: Basic data transformation with a single agent .. code-block:: yaml routes: simple_processor: type: stream stream_type: cold operators: - type: map params: agent: my_agent Good for: - Simple chat interfaces - Basic text transformation - Single-step processing **Level 2: Stream with Multiple Operators** Use when: Multi-step processing pipeline without state .. code-block:: yaml routes: pipeline: type: stream stream_type: cold operators: - type: debounce params: duration: 0.5 - type: map params: agent: preprocessor - type: filter params: condition: valid_input - type: map params: agent: main_processor Good for: - Data preprocessing pipelines - Multi-agent workflows (sequential) - Stream filtering and transformation **Level 3: Stream with Routing** Use when: Need to split/merge data flows .. code-block:: yaml routes: router_stream: type: stream stream_type: cold operators: - type: map params: agent: classifier publications: - questions_stream - commands_stream splits: - source: router_stream targets: questions: questions_stream commands: commands_stream Good for: - Content routing based on classification - Parallel processing paths - Fan-out patterns **Level 4: Basic Graph** Use when: Need conditional logic or simple state .. code-block:: yaml routes: conditional_workflow: type: graph nodes: classify: type: agent agent: classifier handle_a: type: agent agent: handler_a handle_b: type: agent agent: handler_b edges: - source: start target: classify - source: classify target: handle_a condition: "type == 'A'" - source: classify target: handle_b condition: "type == 'B'" Good for: - Workflows with decision points - Conditional processing - Simple state machines **Level 5: Stateful Graph** Use when: Need to maintain conversation or process state .. code-block:: yaml routes: stateful_workflow: type: graph checkpointing: true state_class: "myapp.states.ConversationState" nodes: update_context: type: function function: update_conversation_state process_with_context: type: agent agent: contextual_processor edges: - source: start target: update_context - source: update_context target: process_with_context Good for: - Conversational agents with memory - Multi-turn interactions - Complex state management **Level 6: Graph with Persistence** Use when: Need to save/restore state, handle failures .. code-block:: yaml routes: persistent_workflow: type: graph checkpointing: true checkpoint_dir: "./checkpoints" enable_time_travel: true nodes: # Complex node structure edges: # Complex conditional routing Good for: - Long-running workflows - Fault-tolerant processing - Auditable state changes Dynamic Type Conversion ----------------------- Use bridging when requirements might change: .. code-block:: yaml routes: adaptive_route: type: stream # Start simple stream_type: cold operators: - type: map params: agent: processor bridge: # Automatically upgrade to graph when needed upgrade_conditions: needs_state: true message_count: 10 # Downgrade back to stream when possible downgrade_conditions: idle_time: 300 state_size: 1 Best Practices -------------- 1. **Start Simple**: Begin with streams and upgrade to graphs only when needed 2. **Use Bridges**: Configure bridge conditions for routes that might need to change 3. **Monitor Complexity**: Use the RouteComplexityAnalyzer to track route complexity 4. **Document Decisions**: Explain why you chose a particular route type Performance Considerations -------------------------- **Streams:** - ✅ Lower memory footprint - ✅ Better for high-throughput scenarios - ✅ Excellent for real-time processing - ❌ No built-in state management - ❌ Limited conditional logic **Graphs:** - ✅ Rich state management - ✅ Complex conditional logic - ✅ Checkpointing and recovery - ❌ Higher memory usage - ❌ More complex to debug Migration Guide --------------- **From Streams to Graphs:** 1. Identify state requirements 2. Define state class 3. Convert operators to nodes 4. Add conditional edges **From Graphs to Streams:** 1. Ensure state can be flattened 2. Convert nodes to operators 3. Replace conditional edges with stream splits 4. Test thoroughly Choose the simplest route type that meets your requirements. Use streams for stateless, reactive processing and graphs for stateful, conditional workflows. Configure bridges for routes that might need to adapt over time. 📊 CLI Commands =============== **Run Commands** .. code-block:: bash # Single-shot processing cleveragents run -c config.yaml -p "Your prompt here" # Interactive session cleveragents interactive -c config.yaml # With verbose output cleveragents run -c config.yaml -p "Hello" --verbose # Unsafe mode (for file operations) cleveragents run -c config.yaml -p "Command" --unsafe **Utility Commands** .. code-block:: bash # Generate example configurations cleveragents generate-examples -o ./my-examples # Visualize stream network cleveragents visualize -c config.yaml -f mermaid -o diagram.md cleveragents visualize -c config.yaml -f dot -o network.dot cleveragents visualize -c config.yaml -f ascii 📚 Examples =========== The repository includes comprehensive examples: **RxPy Stream Examples:** - **basic_chat.yaml**: Simple conversational agent - **advanced_pipeline.yaml**: Complex multi-stage processing with RxPy operators - **multi_agent_collaboration.yaml**: Agent-to-agent communication - **stream_analytics.yaml**: Real-time monitoring and analytics **LangGraph Examples:** - **simple_langgraph.yaml**: Basic linear workflow with a single agent - **langgraph_conditional.yaml**: Conditional routing based on classification - **langgraph_stateful.yaml**: Stateful conversations with checkpointing and time travel - **hybrid_rxpy_langgraph.yaml**: Combine reactive streams with stateful graphs **Advanced LangGraph Examples:** - **advanced_langgraph_cycles.yaml**: Iterative refinement with cycles, parallel execution, and subgraphs - **advanced_multi_graph_system.yaml**: Multi-graph orchestration with stream-based routing - **advanced_streaming_langgraph.yaml**: Real-time streaming with partial results and windowed aggregation 🛡️ Safety & Security ==================== - **Safe Mode**: Blocks dangerous shell commands by default - **Unsafe Flag**: Required for file operations and risky commands - **API Key Protection**: Environment variable support - **Error Boundaries**: Graceful error handling and recovery 🔄 Stream Visualization ======================= Generate diagrams of your stream networks: **Mermaid Format**: .. code-block:: bash cleveragents visualize -c config.yaml -f mermaid **Graphviz DOT**: .. code-block:: bash cleveragents visualize -c config.yaml -f dot **ASCII Diagram**: .. code-block:: bash cleveragents visualize -c config.yaml -f ascii 🎯 Use Cases ============ - **Conversational AI**: Multi-stage chat processing - **Content Generation**: Research → Analysis → Writing workflows - **Data Processing**: ETL pipelines with LLM processing - **Decision Support**: Multi-agent decision making - **Real-time Analytics**: Stream monitoring and alerting - **API Orchestration**: Complex service integration 📋 Template System ================== CleverAgents includes a powerful template system for creating reusable, parameterizable components across agents, graphs, and streams. **Template Features** - **Parameter Support**: Define parameters with types, defaults, and descriptions - **Jinja2 Integration**: Full Jinja2 templating for dynamic content - **Component References**: Reference agents, graphs, and streams across templates - **Inheritance**: Build complex templates from simpler ones - **Type Safety**: Parameter validation and type conversion **Template Definition** .. code-block:: yaml templates: agents: my_template: type: llm parameters: model: description: "LLM model to use" type: string default: "gpt-3.5-turbo" temperature: description: "Temperature setting" type: number default: 0.7 config: provider: openai model: "{{ model }}" temperature: "{{ temperature }}" **Template Instantiation** .. code-block:: yaml agents: my_agent: template: my_template params: model: "gpt-4" temperature: 0.9 **Template Types** 1. **Agent Templates**: Create reusable agent configurations 2. **Graph Templates**: Define parameterizable LangGraph workflows 3. **Stream Templates**: Build configurable stream processing pipelines 4. **Composite Templates**: Combine multiple components into reusable units **Advanced Templating** .. code-block:: yaml templates: graphs: adaptive_workflow: parameters: stages: type: number default: 3 enable_validation: type: boolean default: true nodes: {% for i in range(stages) %} stage_{{ i }}: type: agent agent: processor_{{ i }} {% endfor %} {% if enable_validation %} validator: type: function function: validate {% endif %} edges: - source: start target: stage_0 {% for i in range(stages - 1) %} - source: stage_{{ i }} target: stage_{{ i + 1 }} {% endfor %} {% if enable_validation %} - source: stage_{{ stages - 1 }} target: validator - source: validator target: end {% else %} - source: stage_{{ stages - 1 }} target: end {% endif %} **Composite Agent Templates** .. code-block:: yaml templates: agents: research_assistant: type: composite parameters: research_depth: type: string default: "detailed" enum: ["quick", "standard", "detailed"] components: agents: researcher: type: llm config: model: gpt-4 system_prompt: | Research depth: {{ research_depth }} Provide {{ research_depth }} analysis. summarizer: type: llm config: model: gpt-3.5-turbo graphs: workflow: nodes: research: type: agent agent: researcher summarize: type: agent agent: summarizer edges: - source: start target: research - source: research target: summarize - source: summarize target: end routing: input: workflow output: workflow **Template Registry** .. code-block:: python from cleveragents.templates import TemplateRegistry # Access templates programmatically registry = app.template_registry # Get a template template = registry.get_template("agent", "my_template") # Instantiate with parameters agent_config = template.instantiate({ "model": "gpt-4", "temperature": 0.9 }) **Template Best Practices** 1. **Parameter Naming**: Use descriptive parameter names 2. **Default Values**: Provide sensible defaults for all parameters 3. **Type Constraints**: Define parameter types for validation 4. **Documentation**: Include descriptions for all parameters 5. **Modularity**: Build complex templates from simpler ones **Use Cases** 1. **Multi-Environment Configs**: Same template, different parameters for dev/prod 2. **A/B Testing**: Create variants with different parameter values 3. **Dynamic Workflows**: Generate workflows based on runtime parameters 4. **Agent Libraries**: Build reusable agent components for teams 5. **Configuration Management**: Centralize common patterns **Template Limitations** 1. **YAML Parsing**: Complex Jinja2 may require quoted strings 2. **Type Conversion**: Values rendered as strings need conversion 3. **Debugging**: Template errors may be harder to trace 🧪 Development ============== **Running Tests** .. code-block:: bash # Run all tests python -m pytest # Run with coverage python -m pytest --cov=cleveragents # Run BDD tests python -m behave # Run with tox for multiple environments tox **Contributing** 1. Fork the repository 2. Create a feature branch 3. Add tests for new functionality 4. Ensure all tests pass 5. Submit a pull request 📄 License ========== Apache License 2.0 🔗 Links ======== - **Documentation**: https://cleveragents.readthedocs.io/ - **PyPI**: https://pypi.org/project/cleveragents/ - **GitHub**: https://github.com/cleverthis/cleveragents - **Issues**: https://github.com/cleverthis/cleveragents/issues