How to Configure A2A Agents

A step-by-step guide to setting up the Agent-to-Agent (A2A) protocol for delegating tasks to external agents.

Prerequisites

  • A working ToolWeaver project
  • A basic understanding of Agent Delegation
  • An external agent endpoint (or a sample agent for testing)

What You'll Accomplish

By the end of this guide, you'll have:

✅ Created agents.yaml configuration file
✅ Registered external agents with capabilities
✅ Implemented agent discovery
✅ Delegated tasks to agents
✅ Set up error handling and fallbacks

Estimated time: 20 minutes


Step 1: Create agents.yaml Configuration

1.1 Basic Configuration Structure

File: agents.yaml

# Global A2A settings
a2a:
  enabled: true
  default_timeout: 300  # 5 minutes
  discovery_cache_ttl: 3600  # Cache agent list for 1 hour

agents:
  # Agent 1: Data Analyst
  - agent_id: data_analyst
    name: Data Analysis Agent
    description: Analyzes datasets and provides statistical insights
    endpoint: http://localhost:8001/analyze
    protocol: http
    capabilities:
      - data_analysis
      - statistical_modeling
      - visualization

    # Input/output schema
    input_schema:
      type: object
      properties:
        dataset:
          type: string
          description: Path or ID of dataset to analyze
        metrics:
          type: array
          description: Metrics to compute (e.g., mean, median, variance)
      required: [dataset]

    output_schema:
      type: object
      properties:
        results:
          type: object
        summary:
          type: string

    # Cost and performance estimates
    cost_estimate: 0.05  # $0.05 per call
    latency_estimate: 120  # ~120 seconds

    # Metadata
    metadata:
      tags: [analytics, statistics]
      version: 1.0.0
      sla: 99.5
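
The input_schema above follows JSON Schema conventions, so a request can be sanity-checked locally before it is delegated. The following is a minimal sketch under that assumption: check_request and TYPE_MAP are hypothetical helpers (not part of ToolWeaver or the A2A client), and they cover only the required list and top-level type fields.

```python
# Hypothetical pre-flight check; not part of the A2A client API.
TYPE_MAP = {"string": str, "array": list, "object": dict}

# Mirrors the input_schema of the data_analyst agent in agents.yaml
INPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "dataset": {"type": "string"},
        "metrics": {"type": "array"},
    },
    "required": ["dataset"],
}

def check_request(request: dict, schema: dict) -> list:
    """Return a list of human-readable problems (empty if the request looks valid)."""
    problems = []
    # Every field listed under `required` must be present
    for field in schema.get("required", []):
        if field not in request:
            problems.append(f"missing required field: {field}")
    # Fields that are present must match the declared top-level type
    for field, spec in schema.get("properties", {}).items():
        expected = TYPE_MAP.get(spec.get("type"))
        if field in request and expected and not isinstance(request[field], expected):
            problems.append(f"{field} should be of type {spec['type']}")
    return problems

print(check_request({"metrics": "mean"}, INPUT_SCHEMA))
# ['missing required field: dataset', 'metrics should be of type array']
```

For full validation (nested properties, formats, enums), a dedicated JSON Schema validator is a better fit than a hand-rolled check like this one.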

1.2 Multiple Agents Example

a2a:
  enabled: true
  default_timeout: 300

agents:
  # Agent 1: Data Analyst
  - agent_id: data_analyst
    name: Data Analysis Agent
    endpoint: http://localhost:8001/analyze
    capabilities: [data_analysis]
    cost_estimate: 0.05

  # Agent 2: Code Generator
  - agent_id: code_generator
    name: Code Generation Agent
    endpoint: http://localhost:8002/generate
    capabilities: [code_generation, testing]
    cost_estimate: 0.10

  # Agent 3: Legal Reviewer
  - agent_id: legal_reviewer
    name: Legal Document Reviewer
    endpoint: http://localhost:8003/review
    capabilities: [legal_analysis, compliance]
    cost_estimate: 0.20

Step 2: Environment Variable Configuration

2.1 Use Environment Variables for Production

agents.yaml (with env var placeholders):

agents:
  - agent_id: data_analyst
    # Use env var, fallback to localhost
    endpoint: ${DATA_ANALYST_ENDPOINT:-http://localhost:8001/analyze}
    # Authentication token from env
    auth_token: ${DATA_ANALYST_TOKEN}
    cost_estimate: ${DATA_ANALYST_COST:-0.05}
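
The ${VAR:-default} syntax mirrors shell parameter expansion. How the config loader resolves it internally is not shown in this guide; the sketch below only illustrates the semantics, assuming shell-like behavior. expand_placeholders is a hypothetical helper, and in real use you would pass os.environ as env.

```python
import re

# Matches ${NAME} and ${NAME:-default}
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")

def expand_placeholders(text: str, env: dict) -> str:
    """Replace ${VAR} / ${VAR:-default} in text using values from env."""
    def substitute(match):
        name, default = match.group(1), match.group(2)
        value = env.get(name)
        if value:
            return value
        # Shell semantics: ':-' applies when the variable is unset OR empty
        if default is not None:
            return default
        if value is None:
            raise KeyError(f"environment variable {name} is not set and has no default")
        return value  # set but empty, and no default given
    return _PLACEHOLDER.sub(substitute, text)

line = "endpoint: ${DATA_ANALYST_ENDPOINT:-http://localhost:8001/analyze}"
print(expand_placeholders(line, {}))
# endpoint: http://localhost:8001/analyze
print(expand_placeholders(line, {"DATA_ANALYST_ENDPOINT": "https://prod-analyst.company.com/analyze"}))
# endpoint: https://prod-analyst.company.com/analyze
```

Raising on a missing variable without a default (as with auth_token above) surfaces configuration mistakes at load time rather than as a 401 at call time.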

2.2 Set Environment Variables

Linux/macOS:

export DATA_ANALYST_ENDPOINT=https://prod-analyst.company.com/analyze
export DATA_ANALYST_TOKEN=sk-abc123xyz789
export DATA_ANALYST_COST=0.05

Windows PowerShell:

$env:DATA_ANALYST_ENDPOINT = "https://prod-analyst.company.com/analyze"
$env:DATA_ANALYST_TOKEN = "sk-abc123xyz789"
$env:DATA_ANALYST_COST = "0.05"

.env file:

# .env
DATA_ANALYST_ENDPOINT=https://prod-analyst.company.com/analyze
DATA_ANALYST_TOKEN=sk-abc123xyz789
DATA_ANALYST_COST=0.05

Load with python-dotenv:

from dotenv import load_dotenv
load_dotenv()

Step 3: Initialize A2A Client

3.1 Basic Client Setup

File: client/a2a_client.py

from orchestrator.a2a.client import A2AClient

# Load configuration from agents.yaml
client = A2AClient(config_path="agents.yaml")

# Or specify custom path
client = A2AClient(config_path="config/production_agents.yaml")

3.2 Programmatic Configuration (No YAML)

from orchestrator.a2a.client import A2AClient
from orchestrator.a2a.config import AgentConfig, A2AConfig

# Define agent configuration
data_analyst_config = AgentConfig(
    agent_id="data_analyst",
    name="Data Analysis Agent",
    endpoint="http://localhost:8001/analyze",
    capabilities=["data_analysis", "statistics"],
    cost_estimate=0.05,
    latency_estimate=120
)

# Create A2A config
a2a_config = A2AConfig(
    enabled=True,
    default_timeout=300,
    agents=[data_analyst_config]
)

# Initialize client with config object
client = A2AClient(config=a2a_config)

Step 4: Discover Available Agents

4.1 List All Agents

async def discover_agents():
    """Discover all configured agents."""

    client = A2AClient(config_path="agents.yaml")

    agents = await client.discover_agents()

    print(f"Found {len(agents)} agents:\n")

    for agent in agents:
        print(f"Agent: {agent.name}")
        print(f"  ID: {agent.agent_id}")
        print(f"  Endpoint: {agent.endpoint}")
        print(f"  Capabilities: {', '.join(agent.capabilities)}")
        print(f"  Cost: ${agent.cost_estimate}/call")
        print(f"  Latency: ~{agent.latency_estimate}s")
        print()

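# Run discovery (script entry point; in a notebook or async REPL,
# use `await discover_agents()` directly)
import asyncio
asyncio.run(discover_agents())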

Example output (abridged; exact values depend on your agents.yaml):

Found 3 agents:

Agent: Data Analysis Agent
  ID: data_analyst
  Endpoint: http://localhost:8001/analyze
  Capabilities: data_analysis, statistical_modeling
  Cost: $0.05/call
  Latency: ~120s

Agent: Code Generation Agent
  ID: code_generator
  Endpoint: http://localhost:8002/generate
  Capabilities: code_generation, testing
  Cost: $0.10/call
  Latency: ~60s

4.2 Find Agent by Capability

async def find_agents_by_capability(capability: str):
    """Find agents with specific capability."""

    client = A2AClient(config_path="agents.yaml")
    agents = await client.discover_agents()

    matching = [a for a in agents if capability in a.capabilities]

    if not matching:
        print(f"No agents found with capability: {capability}")
        return None

    # Return cheapest agent
    cheapest = min(matching, key=lambda a: a.cost_estimate)
    print(f"Found {len(matching)} agents, cheapest: {cheapest.name} (${cheapest.cost_estimate})")
    return cheapest

# Usage (script entry point; in a notebook, `await` the call directly)
import asyncio
agent = asyncio.run(find_agents_by_capability("data_analysis"))

Step 5: Delegate Tasks to Agents

5.1 Basic Delegation

async def delegate_analysis(dataset_path: str):
    """Delegate data analysis to external agent."""

    client = A2AClient(config_path="agents.yaml")

    result = await client.delegate(
        agent_id="data_analyst",
        request={
            "dataset": dataset_path,
            "metrics": ["mean", "median", "std_dev"]
        }
    )

    return result

# Usage (script entry point; in a notebook, `await` the call directly)
import asyncio
result = asyncio.run(delegate_analysis("sales_2024.csv"))
print(result)

5.2 Delegation with Idempotency Key

async def delegate_with_idempotency(dataset_path: str):
    """Delegate with idempotency key to prevent duplicate work."""

    client = A2AClient(config_path="agents.yaml")

    # Generate idempotency key
    idempotency_key = f"analysis-{dataset_path}-v1"

    result = await client.delegate(
        agent_id="data_analyst",
        request={"dataset": dataset_path},
        idempotency_key=idempotency_key,  # Repeated calls with the same key can reuse the cached result
        timeout=300
    )

    return result
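
A key built only from the dataset path will collide when the same dataset is analyzed with different parameters. One option, sketched here as an assumption rather than as the library's own scheme, is to derive the key from a hash of the full request. make_idempotency_key is a hypothetical helper.

```python
import hashlib
import json

def make_idempotency_key(agent_id: str, request: dict, version: str = "v1") -> str:
    """Derive a stable key: same agent + same request content => same key."""
    # Canonical JSON (sorted keys, no extra whitespace) so dict ordering is irrelevant
    canonical = json.dumps(request, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"{agent_id}-{digest}-{version}"

key_a = make_idempotency_key("data_analyst", {"dataset": "sales_2024.csv", "metrics": ["mean"]})
key_b = make_idempotency_key("data_analyst", {"metrics": ["mean"], "dataset": "sales_2024.csv"})
key_c = make_idempotency_key("data_analyst", {"dataset": "sales_2024.csv", "metrics": ["median"]})

print(key_a == key_b)  # True: key order does not matter
print(key_a == key_c)  # False: different metrics give a different key
```

Bumping the version suffix is a simple way to force a fresh run when the agent's behavior changes but the request does not.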

5.3 Streaming Delegation

async def delegate_streaming(specification: str):
    """Stream results from agent for long-running tasks."""

    client = A2AClient(config_path="agents.yaml")

    print("Generating code...")

    async for chunk in client.delegate_streaming(
        agent_id="code_generator",
        request={
            "specification": specification,
            "language": "python",
            "include_tests": True
        }
    ):
        print(chunk, end="", flush=True)

    print("\n✓ Code generation complete")

# Usage (script entry point; in a notebook, `await` the call directly)
import asyncio
asyncio.run(delegate_streaming("REST API for user management"))

Step 6: Add Authentication

6.1 Bearer Token Authentication

agents.yaml:

agents:
  - agent_id: secure_agent
    endpoint: https://api.company.com/analyze
    auth:
      type: bearer
      token: ${AGENT_AUTH_TOKEN}  # From environment variable

Python code:

import os

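# For a quick local test only; in real deployments set AGENT_AUTH_TOKEN
# outside the code (shell, .env file, or a secret manager)
os.environ["AGENT_AUTH_TOKEN"] = "sk-abc123xyz789"

# Client will automatically use the token from config
# (the await below must run inside an async function)
client = A2AClient(config_path="agents.yaml")
result = await client.delegate(agent_id="secure_agent", request={...})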

6.2 API Key Authentication

agents.yaml:

agents:
  - agent_id: secure_agent
    endpoint: https://api.company.com/analyze
    auth:
      type: api_key
      header: X-API-Key
      key: ${API_KEY}
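
To make the two auth types concrete, here is a sketch of how an auth block could translate into HTTP request headers. The mapping is an assumption based on common conventions (an Authorization: Bearer header for bearer tokens, a configurable header for API keys); build_auth_headers is a hypothetical helper, not the client's actual implementation.

```python
def build_auth_headers(auth: dict) -> dict:
    """Translate an `auth` config block into HTTP headers (assumed mapping)."""
    auth_type = auth.get("type")
    if auth_type == "bearer":
        return {"Authorization": f"Bearer {auth['token']}"}
    if auth_type == "api_key":
        # Header name comes from the config; X-API-Key is used if it is omitted
        return {auth.get("header", "X-API-Key"): auth["key"]}
    raise ValueError(f"unsupported auth type: {auth_type!r}")

print(build_auth_headers({"type": "bearer", "token": "example-token"}))
# {'Authorization': 'Bearer example-token'}
print(build_auth_headers({"type": "api_key", "header": "X-API-Key", "key": "example-key"}))
# {'X-API-Key': 'example-key'}
```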

6.3 Custom Headers

agents.yaml:

agents:
  - agent_id: custom_agent
    endpoint: https://api.company.com/analyze
    headers:
      Authorization: Bearer ${AUTH_TOKEN}
      X-Client-ID: ${CLIENT_ID}
      X-Request-Origin: ToolWeaver

Step 7: Configure Fallback Endpoints

7.1 Multiple Endpoints with Priority

agents.yaml:

agents:
  - agent_id: data_analyst
    name: Data Analysis Agent

    # Primary endpoint
    endpoints:
      - url: https://primary.company.com/analyze
        priority: 1
        region: us-east-1

      # Fallback endpoint
      - url: https://backup.company.com/analyze
        priority: 2
        region: us-west-2

      # Local fallback
      - url: http://localhost:8001/analyze
        priority: 3
        region: local

    fallback_strategy: sequential  # Try in order of priority
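
The sequential strategy can be pictured as "sort by priority, try each endpoint, stop at the first success". The sketch below illustrates that idea only; call_with_fallback and fake_send are hypothetical stand-ins, and the real client may differ in which errors trigger a fallback.

```python
import asyncio

# Same endpoints as in agents.yaml, deliberately listed out of order
ENDPOINTS = [
    {"url": "http://localhost:8001/analyze", "priority": 3},
    {"url": "https://primary.company.com/analyze", "priority": 1},
    {"url": "https://backup.company.com/analyze", "priority": 2},
]

async def call_with_fallback(endpoints, send):
    """Try endpoints in ascending priority; return the first successful result."""
    errors = []
    for endpoint in sorted(endpoints, key=lambda e: e["priority"]):
        try:
            return await send(endpoint["url"])
        except ConnectionError as exc:
            # Remember why this endpoint failed, then move on to the next one
            errors.append(f"{endpoint['url']}: {exc}")
    raise ConnectionError("all endpoints failed: " + "; ".join(errors))

async def fake_send(url):
    # Stand-in transport for the demo: only the backup endpoint is "up"
    if "backup" not in url:
        raise ConnectionError("unreachable")
    return {"served_by": url}

print(asyncio.run(call_with_fallback(ENDPOINTS, fake_send)))
# {'served_by': 'https://backup.company.com/analyze'}
```

Collecting the per-endpoint errors makes the final exception far more useful for debugging than reporting only the last failure.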

7.2 Implement Fallback Logic

async def delegate_with_fallback(dataset: str):
    """Delegate with automatic fallback to alternative endpoints."""

    client = A2AClient(config_path="agents.yaml")

    try:
        result = await client.delegate(
            agent_id="data_analyst",
            request={"dataset": dataset},
            enable_fallback=True  # Try fallback endpoints on failure
        )
        return result

    except Exception as e:
        print(f"All endpoints failed: {e}")
        # Final fallback: local tool (execute_local_analysis is your own
        # local implementation; it is not provided by the A2A client)
        return await execute_local_analysis(dataset)

Step 8: Error Handling

8.1 Handle Common Errors

from orchestrator.a2a.client import (
    A2AClient,
    A2ATimeoutError,
    A2AConnectionError,
    A2AValidationError
)

async def delegate_with_error_handling(dataset: str):
    """Delegate with comprehensive error handling."""

    client = A2AClient(config_path="agents.yaml")

    try:
        result = await client.delegate(
            agent_id="data_analyst",
            request={"dataset": dataset},
            timeout=300
        )
        return result

    except A2ATimeoutError:
        print("Agent took too long, trying faster alternative...")
        return await client.delegate(
            agent_id="fast_analyst",
            request={"dataset": dataset},
            timeout=60
        )

    except A2AConnectionError:
        print("Agent unavailable, using local tool...")
        return await execute_local_tool("analysis", {"dataset": dataset})

    except A2AValidationError as e:
        print(f"Invalid request: {e}")
        # Chain the original error so the validation details are not lost
        raise ValueError(f"Dataset format invalid: {dataset}") from e

8.2 Retry Logic for Transient Failures

import asyncio

from orchestrator.a2a.client import A2AClient, A2AConnectionError

async def delegate_with_retry(dataset: str, max_retries: int = 3):
    """Delegate with retry logic for transient failures."""

    client = A2AClient(config_path="agents.yaml")

    for attempt in range(max_retries):
        try:
            result = await client.delegate(
                agent_id="data_analyst",
                request={"dataset": dataset}
            )
            return result

        except A2AConnectionError as e:
            if attempt == max_retries - 1:
                raise

            delay = 2 ** attempt  # Exponential backoff: 1s, then 2s, doubling each attempt
            print(f"Retry {attempt + 1} after {delay}s...")
            await asyncio.sleep(delay)
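
When many callers retry on the same schedule, their retries arrive at the agent in synchronized bursts. A common refinement is to cap the delay and add random jitter. The sketch below computes such a schedule; backoff_delays is a hypothetical helper, and the base and cap values are illustrative assumptions.

```python
import random

def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0, rng=None):
    """Return the delay before each retry: exponential growth, capped, with full jitter."""
    rng = rng or random.Random()
    delays = []
    # No sleep is needed after the final attempt, hence max_retries - 1 delays
    for attempt in range(max_retries - 1):
        ceiling = min(cap, base * 2 ** attempt)
        # Full jitter: pick uniformly between 0 and the exponential ceiling
        delays.append(rng.uniform(0, ceiling))
    return delays

# Four attempts => three delays, bounded by 1s, 2s, and 4s respectively
print(backoff_delays(4))
```

Inside the retry loop above, `await asyncio.sleep(delay)` would then take its delay from this schedule instead of the fixed `2 ** attempt`.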

Step 9: Monitor Agent Performance

9.1 Track Delegation Metrics

import time

from orchestrator.a2a.client import A2AClient
from orchestrator.monitoring.metrics import MetricsCollector

metrics = MetricsCollector()

async def delegate_with_monitoring(agent_id: str, request: dict):
    """Delegate with performance monitoring."""

    client = A2AClient(config_path="agents.yaml")

    start_time = time.time()

    try:
        result = await client.delegate(agent_id=agent_id, request=request)

        # Log success
        metrics.record({
            "agent_id": agent_id,
            "status": "success",
            "latency": time.time() - start_time,
            "cost": result.get("cost", 0)
        })

        return result

    except Exception as e:
        # Log failure
        metrics.record({
            "agent_id": agent_id,
            "status": "failed",
            "latency": time.time() - start_time,
            "error": str(e)
        })
        raise

# Analyze metrics
print(f"Agent 'data_analyst':")
print(f"  Avg latency: {metrics.avg_latency('data_analyst'):.1f}s")
print(f"  Success rate: {metrics.success_rate('data_analyst'):.1%}")
print(f"  Total cost: ${metrics.total_cost('data_analyst'):.2f}")
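
To show what the three summary figures mean, here is a minimal in-memory stand-in with the same method names used above. It is a sketch for illustration and local testing only: InMemoryMetrics is hypothetical, and the real MetricsCollector may store or aggregate records differently.

```python
from collections import defaultdict

class InMemoryMetrics:
    """Hypothetical stand-in that aggregates delegation records per agent."""

    def __init__(self):
        self._records = defaultdict(list)

    def record(self, entry: dict) -> None:
        self._records[entry["agent_id"]].append(entry)

    def avg_latency(self, agent_id: str) -> float:
        rows = self._records[agent_id]
        return sum(r["latency"] for r in rows) / len(rows) if rows else 0.0

    def success_rate(self, agent_id: str) -> float:
        rows = self._records[agent_id]
        return sum(r["status"] == "success" for r in rows) / len(rows) if rows else 0.0

    def total_cost(self, agent_id: str) -> float:
        # Failed calls carry no "cost" field, so they count as 0
        return sum(r.get("cost", 0) for r in self._records[agent_id])

m = InMemoryMetrics()
m.record({"agent_id": "data_analyst", "status": "success", "latency": 2.0, "cost": 0.05})
m.record({"agent_id": "data_analyst", "status": "failed", "latency": 4.0, "error": "timeout"})
print(m.avg_latency("data_analyst"), m.success_rate("data_analyst"), m.total_cost("data_analyst"))
# 3.0 0.5 0.05
```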

Step 10: Real-World Example

Complete A2A setup for multi-agent pipeline.

File: pipeline/financial_analysis.py

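from orchestrator.a2a.client import A2AClient

# execute_tool (used in Step 4 of the pipeline) is assumed to be your project's
# local tool runner; import it from wherever it is defined in your codebase.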

async def analyze_company_financials(company_id: str):
    """Multi-agent financial analysis pipeline."""

    client = A2AClient(config_path="agents.yaml")

    # Step 1: Extract data (agent)
    print("Step 1: Extracting financial data...")
    raw_data = await client.delegate(
        agent_id="data_extractor",
        request={
            "company_id": company_id,
            "sources": ["sec_filings", "earnings_calls"]
        },
        idempotency_key=f"extract-{company_id}",
        timeout=300
    )

    # Step 2: Financial analysis (agent)
    print("Step 2: Analyzing financials...")
    financial_analysis = await client.delegate(
        agent_id="financial_analyst",
        request={
            "data": raw_data,
            "metrics": ["revenue_growth", "profit_margins", "debt_ratio"]
        },
        idempotency_key=f"analyze-{company_id}",
        timeout=180
    )

    # Step 3: Risk assessment (agent)
    print("Step 3: Assessing risks...")
    risk_analysis = await client.delegate(
        agent_id="risk_analyst",
        request={
            "company_id": company_id,
            "financials": financial_analysis
        },
        idempotency_key=f"risk-{company_id}",
        timeout=120
    )

    # Step 4: Generate report (local tool - deterministic)
    print("Step 4: Generating report...")
    report = await execute_tool(
        "format_report",
        {
            "financials": financial_analysis,
            "risks": risk_analysis,
            "template": "executive_summary"
        }
    )

    return report

# Usage (script entry point)
if __name__ == "__main__":
    import asyncio

    report = asyncio.run(analyze_company_financials("AAPL"))
    print(report)

Verification

Test your A2A setup:

async def verify_a2a_setup():
    """Verify A2A configuration is working."""

    client = A2AClient(config_path="agents.yaml")

    # Test 1: Discovery
    print("Test 1: Agent discovery...")
    agents = await client.discover_agents()
    assert len(agents) > 0, "No agents discovered"
    print(f"✓ Discovered {len(agents)} agents")

    # Test 2: Basic delegation (assumes data_analyst points at a test or echo
    # agent that accepts this payload)
    print("\nTest 2: Basic delegation...")
    try:
        result = await client.delegate(
            agent_id="data_analyst",
            request={"test": "data"},
            timeout=30
        )
        print("✓ Delegation successful")
    except Exception as e:
        print(f"✗ Delegation failed: {e}")
        raise  # Stop here so the final "All checks passed" message stays truthful

    # Test 3: Idempotency
    print("\nTest 3: Idempotency...")
    key = "test-idempotency-123"

    result1 = await client.delegate(
        agent_id="data_analyst",
        request={"test": "data"},
        idempotency_key=key
    )

    result2 = await client.delegate(
        agent_id="data_analyst",
        request={"test": "data"},
        idempotency_key=key
    )

    assert result1 == result2, "Idempotency not working"
    print("✓ Idempotency working")

    print("\n✅ All checks passed!")

# Run verification (script entry point; in a notebook, `await` the call directly)
import asyncio
asyncio.run(verify_a2a_setup())

Common Issues

Issue 1: Agent Not Found

Symptom: AgentNotFoundError: Agent 'data_analyst' not found

Solution: Check that the agent_id in agents.yaml matches the one used in your code

# List the agent_id values defined in the config
grep agent_id agents.yaml

Issue 2: Connection Refused

Symptom: A2AConnectionError: Connection refused to http://localhost:8001

Solution: Verify that the agent endpoint is running and reachable

# Check if agent is running
curl http://localhost:8001/health

# Or check with telnet
telnet localhost 8001
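
If curl or telnet is not available (for example inside a minimal container), the same reachability check can be done from Python with the standard library. This is a sketch: port_is_open is a hypothetical helper, and it only confirms that something accepts TCP connections on the port, not that the agent itself is healthy.

```python
import socket

def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and DNS resolution failures
        return False

# Prints True only if something is listening on the agent's port
print(port_is_open("localhost", 8001))
```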

Issue 3: Authentication Failed

Symptom: 401 Unauthorized

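Solution: Verify that the auth token is set correctly

# Debug: confirm the token is set without printing the secret itself
import os
token = os.getenv("DATA_ANALYST_TOKEN")
print(f"Token set: {token is not None} (length {len(token or '')})")

# Verify the auth settings loaded from config (run inside an async function;
# the output may include the token, so keep it out of shared logs)
client = A2AClient(config_path="agents.yaml")
agent = await client.get_agent("data_analyst")
print(f"Auth config: {agent.auth}")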

Next Steps