Skip to content

How to Aggregate Agent Results

Step-by-step guide to combine results from multiple parallel agents for consensus, voting, or best-answer selection.

Prerequisites: a working orchestrator setup with multiple agents registered in agents.yaml and reachable through the A2A client.

What You'll Accomplish

By the end of this guide, you'll have:

✅ Parallel agent execution
✅ Result aggregation strategies (consensus, voting, best-of-N)
✅ Conflict resolution logic
✅ Quality scoring for agent outputs
✅ Complete multi-agent coordination pipeline

Estimated time: 25 minutes


Step 1: Execute Agents in Parallel

1.1 Basic Parallel Execution

import asyncio
from orchestrator.a2a.client import A2AClient

async def execute_agents_parallel(task: dict, agent_ids: list):
    """Fan the same task out to several agents and gather their results.

    Each delegation gets a per-agent idempotency key built from the agent id
    and the task id. Failed delegations come back as exception objects in the
    returned list (return_exceptions=True) instead of being raised.
    """
    client = A2AClient(config_path="agents.yaml")

    pending = [
        client.delegate(
            agent_id=agent,
            request=task,
            idempotency_key=f"{agent}-{task['id']}",
        )
        for agent in agent_ids
    ]

    # One result (or exception) per agent, in the same order as agent_ids.
    return await asyncio.gather(*pending, return_exceptions=True)

# Usage
task = {"query": "Analyze Q4 revenue trends", "id": "task-123"}
agent_ids = ["analyst_1", "analyst_2", "analyst_3"]

# NOTE: top-level await requires an async context (e.g. an async main() or a REPL that supports it).
results = await execute_agents_parallel(task, agent_ids)

Step 2: Consensus Aggregation

2.1 Majority Voting

from collections import Counter

def aggregate_by_voting(results: list, extract_answer_fn):
    """Pick the answer most agents agree on (simple majority vote).

    Exception entries in ``results`` are ignored. Raises ValueError when no
    valid answers remain.
    """
    valid = (r for r in results if not isinstance(r, Exception))
    answers = [extract_answer_fn(r) for r in valid]

    if not answers:
        raise ValueError("No valid answers from agents")

    tally = Counter(answers)
    # most_common(1) gives the leading (answer, votes) pair.
    winner, winner_votes = tally.most_common(1)[0]

    return {
        "answer": winner,
        "confidence": winner_votes / len(answers),
        "votes": dict(tally),
        "total_agents": len(answers),
    }

# Usage
def extract_risk_level(result):
    """Return the agent's reported risk level, or "unknown" when absent."""
    # Fixed: the docstring quotes were export-escaped (\"\"\"), which is a
    # SyntaxError in real Python.
    return result.get("risk_level", "unknown")

# Get consensus across all successful agent results
consensus = aggregate_by_voting(results, extract_risk_level)
print(f"Consensus: {consensus['answer']} ({consensus['confidence']:.0%} agreement)")
# Output: Consensus: medium (67% agreement)

2.2 Weighted Voting

def aggregate_by_weighted_voting(results: list, agent_weights: dict, agent_ids: list = None):
    """Weight votes by agent reliability.

    Args:
        results: Agent results aligned positionally with ``agent_ids``;
            exception entries are skipped.
        agent_weights: Mapping of agent id -> reliability weight (missing
            agents default to 1.0).
        agent_ids: Agent ids in the same order as ``results``. Defaults to
            the keys of ``agent_weights`` — previously this function read an
            undeclared module-level ``agent_ids``, which failed with a
            NameError when used standalone.

    Returns:
        dict with the winning answer, its weighted confidence, and all
        weighted scores.

    Raises:
        ValueError: If no valid (non-exception) results are present.
    """
    if agent_ids is None:
        agent_ids = list(agent_weights)

    vote_scores = {}

    for agent_id, result in zip(agent_ids, results):
        if isinstance(result, Exception):
            continue

        answer = result.get("risk_level")
        weight = agent_weights.get(agent_id, 1.0)

        vote_scores[answer] = vote_scores.get(answer, 0.0) + weight

    if not vote_scores:
        # Previously max() on an empty dict raised a confusing ValueError.
        raise ValueError("No valid results to aggregate")

    # Select answer with highest weighted score (first-seen wins on ties).
    winner = max(vote_scores, key=vote_scores.get)
    total_weight = sum(vote_scores.values())
    confidence = vote_scores[winner] / total_weight

    return {
        "answer": winner,
        "confidence": confidence,
        "weighted_scores": vote_scores
    }

# Usage with agent reliability weights
agent_weights = {
    "analyst_1": 1.5,  # 50% more weight (more reliable)
    "analyst_2": 1.0,  # Standard weight
    "analyst_3": 0.8   # 20% less weight (less reliable)
}

# The agent order must line up with `results` (here the module-level `agent_ids`).
consensus = aggregate_by_weighted_voting(results, agent_weights)

Step 3: Best-of-N Selection

3.1 Score-Based Selection

def select_best_result(results: list, scoring_fn):
    """Return the highest-scoring result along with its index and score.

    Exception entries are skipped. Raises ValueError when nothing scorable
    remains.
    """
    candidates = [
        {"agent_index": idx, "result": res, "score": scoring_fn(res)}
        for idx, res in enumerate(results)
        if not isinstance(res, Exception)
    ]

    if not candidates:
        raise ValueError("No valid results to score")

    # max() keeps the earliest candidate on ties — same outcome as the
    # stable descending sort it replaces.
    return max(candidates, key=lambda entry: entry["score"])

# Usage
def score_analysis(result: dict) -> float:
    """Score an analysis: completeness (0-50 points) plus confidence (0-50).

    Fixed: the docstring quotes were export-escaped (\"\"\"), which is a
    SyntaxError in real Python.
    """
    score = 0.0

    # Completeness (0-50 points)
    if "summary" in result:
        score += 20
    if "recommendations" in result:
        score += 20
    if "data_points" in result:
        score += 10

    # Confidence (0-50 points) — assumes confidence is in [0, 1].
    confidence = result.get("confidence", 0.0)
    score += confidence * 50

    return score

# Pick the single highest-scoring analysis out of all agent results.
best = select_best_result(results, score_analysis)
print(f"Best result from agent {best['agent_index']} (score: {best['score']:.1f})")

3.2 Multi-Criteria Selection

def select_best_multicriteria(results: list, criteria_weights: dict):
    """Rank results by a weighted blend of four quality criteria.

    Criteria and fallback weights: completeness 0.3, confidence 0.3,
    detail 0.2, speed 0.2. Missing entries in ``criteria_weights`` use
    those defaults.
    """

    def composite_score(result: dict) -> float:
        # Completeness: fraction of an expected 5 sections.
        completeness = len(result.get("sections", [])) / 5.0
        # Detail: response length normalized to 1000 chars, capped at 1.0.
        detail = min(len(result.get("details", "")) / 1000.0, 1.0)
        # Speed: linear falloff; 30s or slower scores 0. Missing latency
        # is treated as 10s.
        latency = result.get("latency", 10.0)
        speed = max(0, 1.0 - (latency / 30.0))

        return (
            completeness * criteria_weights.get("completeness", 0.3)
            + result.get("confidence", 0.0) * criteria_weights.get("confidence", 0.3)
            + detail * criteria_weights.get("detail", 0.2)
            + speed * criteria_weights.get("speed", 0.2)
        )

    return select_best_result(results, composite_score)

# Usage — weights should sum to 1.0 so the composite score stays comparable.
criteria_weights = {
    "completeness": 0.4,  # 40%
    "confidence": 0.3,    # 30%
    "detail": 0.2,        # 20%
    "speed": 0.1          # 10%
}

best = select_best_multicriteria(results, criteria_weights)

Step 4: Ensemble Aggregation

4.1 Average Numerical Results

import statistics

def aggregate_numerical_results(results: list, field: str):
    """Summarize a numeric field across multiple agent results.

    Args:
        results: Agent results; exception entries are skipped.
        field: Key holding the numeric prediction in each result dict.

    Returns:
        dict with mean, median, sample stdev (0 for a single value),
        min, max, count, and the individual values used.

    Raises:
        ValueError: If no result contains a usable numeric value.
    """
    values = []

    for result in results:
        if isinstance(result, Exception):
            continue

        value = result.get(field)
        # bool is a subclass of int — exclude it so True/False aren't
        # silently averaged as 1/0. (The isinstance check also rejects None.)
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            values.append(value)

    if not values:
        raise ValueError(f"No valid numerical values for field '{field}'")

    return {
        "mean": statistics.mean(values),
        "median": statistics.median(values),
        "stdev": statistics.stdev(values) if len(values) > 1 else 0,
        "min": min(values),
        "max": max(values),
        "count": len(values),
        "individual_values": values
    }

# Usage - aggregate revenue predictions
# NOTE(review): this task has no "id" key, but execute_agents_parallel builds
# its idempotency key from task['id'] — confirm an id is added upstream.
task = {"query": "Predict Q1 2026 revenue"}
results = await execute_agents_parallel(task, ["forecaster_1", "forecaster_2", "forecaster_3"])

aggregated = aggregate_numerical_results(results, "predicted_revenue")
print(f"Revenue prediction: ${aggregated['mean']:,.0f} ± ${aggregated['stdev']:,.0f}")
print(f"Range: ${aggregated['min']:,.0f} - ${aggregated['max']:,.0f}")

4.2 Merge Text Responses

def merge_text_responses(results: list, max_length: int = 500):
    """Concatenate the text responses of all successful agents.

    Exceptions and empty responses are skipped, but agent numbering follows
    the original 1-based result positions. The combined text is truncated to
    ``max_length`` characters with a trailing ellipsis when too long.
    """
    labelled = [
        f"Agent {idx + 1}: {item.get('response', '')}"
        for idx, item in enumerate(results)
        if not isinstance(item, Exception) and item.get("response", "")
    ]

    combined = "\n\n".join(labelled)

    if len(combined) > max_length:
        combined = combined[:max_length] + "..."

    return {
        "merged_text": combined,
        "source_count": len(labelled),
        # Length AFTER truncation (includes the ellipsis when applied).
        "total_length": len(combined)
    }

# Usage — allow up to 1000 characters before truncation
merged = merge_text_responses(results, max_length=1000)
print(merged["merged_text"])

Step 5: Conflict Resolution

5.1 Detect Conflicts

def detect_conflicts(results: list, extract_answer_fn):
    """Detect when agents disagree significantly.

    Args:
        results: Raw agent results; exception entries are ignored.
        extract_answer_fn: Callable mapping one result to a comparable answer.

    Returns:
        dict always containing "has_conflict"; whenever any answers exist it
        also contains "winner" (the leading answer), so callers can read a
        resolution regardless of which branch produced the dict. Previously
        the unanimous branch only exposed "answer", which made downstream
        ``conflict_info["winner"]`` lookups raise KeyError.
    """
    answers = [extract_answer_fn(r) for r in results if not isinstance(r, Exception)]

    if not answers:
        # No usable results at all — nothing to disagree about.
        return {"has_conflict": False, "winner": None}

    # Count unique answers
    unique_answers = set(answers)

    # Check if unanimous
    if len(unique_answers) == 1:
        sole_answer = answers[0]
        return {
            "has_conflict": False,
            "unanimous": True,
            "answer": sole_answer,
            # Alias so consumers that only read "winner" work here too.
            "winner": sole_answer
        }

    # Check vote distribution
    vote_counts = Counter(answers)
    winner, winner_votes = vote_counts.most_common(1)[0]
    confidence = winner_votes / len(answers)

    # Flag as conflict if no clear majority (< 60%)
    has_conflict = confidence < 0.6

    return {
        "has_conflict": has_conflict,
        "unanimous": False,
        "confidence": confidence,
        "winner": winner,
        "vote_distribution": dict(vote_counts),
        "conflict_severity": 1.0 - confidence
    }

# Usage — severity is 1 - majority share, so higher means stronger disagreement.
conflict_info = detect_conflicts(results, extract_risk_level)

if conflict_info["has_conflict"]:
    print(f"⚠️  Conflict detected (severity: {conflict_info['conflict_severity']:.0%})")
    print(f"Vote distribution: {conflict_info['vote_distribution']}")

5.2 Resolve with Tiebreaker Agent

async def resolve_conflict_with_tiebreaker(results: list, tiebreaker_agent: str):
    """Use a tiebreaker agent when results conflict.

    When there is no conflict, the majority (or unanimous) answer is returned
    directly; otherwise all valid results are handed to ``tiebreaker_agent``
    and its resolution is returned.
    """
    # Check for conflict
    conflict_info = detect_conflicts(results, extract_risk_level)

    if not conflict_info["has_conflict"]:
        # No conflict: the unanimous branch of detect_conflicts exposes the
        # result under "answer", the majority branch under "winner" — accept
        # either (the bare ["winner"] lookup raised KeyError when unanimous).
        return conflict_info.get("winner", conflict_info.get("answer"))

    # Escalate: hand every valid result to the tiebreaker agent.
    client = A2AClient(config_path="agents.yaml")

    tiebreaker_result = await client.delegate(
        agent_id=tiebreaker_agent,
        request={
            "task": "resolve_conflict",
            "agent_results": [r for r in results if not isinstance(r, Exception)],
            "conflict_info": conflict_info
        }
    )

    return tiebreaker_result.get("resolution")

# Usage — only escalate to the (more expensive) tiebreaker when needed.
if conflict_info["has_conflict"]:
    resolution = await resolve_conflict_with_tiebreaker(results, "senior_analyst")
    print(f"Tiebreaker resolution: {resolution}")

Step 6: Real-World Example

Complete multi-agent analysis with aggregation.

File: pipeline/multi_agent_analysis.py

import asyncio
from orchestrator.a2a.client import A2AClient

class MultiAgentAnalyzer:
    """Coordinates several analyst agents and aggregates their output."""

    def __init__(self):
        self.client = A2AClient(config_path="agents.yaml")
        # Reliability weights used for weighted voting when agents disagree.
        self.agent_weights = {
            "analyst_senior": 1.5,
            "analyst_mid": 1.0,
            "analyst_junior": 0.7
        }

    async def analyze_with_consensus(self, company_id: str):
        """Analyze company using multiple agents, aggregate results."""

        # Step 1: Parallel agent execution
        print("Step 1: Executing parallel analysis...")

        agents = ["analyst_senior", "analyst_mid", "analyst_junior"]
        task = {
            "company_id": company_id,
            "analysis_type": "financial_health"
        }

        delegations = [
            self.client.delegate(
                agent_id=name,
                request=task,
                idempotency_key=f"{name}-{company_id}"
            )
            for name in agents
        ]
        results = await asyncio.gather(*delegations, return_exceptions=True)

        # Step 2: Check for conflicts
        print("Step 2: Checking for conflicts...")

        conflict_info = detect_conflicts(results, lambda r: r.get("risk_rating"))

        if conflict_info["has_conflict"]:
            print(f"⚠️  Conflict detected ({conflict_info['conflict_severity']:.0%} disagreement)")
            # Disagreement: weight each agent's vote by its reliability.
            consensus = aggregate_by_weighted_voting(results, self.agent_weights)
        else:
            print("✓ Agents in agreement")
            consensus = {"answer": conflict_info["winner"], "confidence": 1.0}

        # Step 3: Aggregate numerical predictions
        print("Step 3: Aggregating predictions...")

        revenue_pred = aggregate_numerical_results(results, "revenue_forecast")

        # Step 4: Select best detailed analysis
        print("Step 4: Selecting best detailed analysis...")

        def score_detail(result):
            # Reward more recommendations and higher self-reported confidence.
            return (
                len(result.get("recommendations", [])) * 10 +
                result.get("confidence", 0) * 50
            )

        best_detail = select_best_result(results, score_detail)

        # Step 5: Compile final report
        return {
            "risk_rating": consensus["answer"],
            "risk_confidence": consensus["confidence"],
            "revenue_forecast": {
                "mean": revenue_pred["mean"],
                "range": (revenue_pred["min"], revenue_pred["max"])
            },
            "detailed_analysis": best_detail["result"],
            "conflict_detected": conflict_info["has_conflict"]
        }

# Usage — requires an async context for the top-level await.
analyzer = MultiAgentAnalyzer()
report = await analyzer.analyze_with_consensus("AAPL")

print(f"Risk Rating: {report['risk_rating']} ({report['risk_confidence']:.0%} confidence)")
print(f"Revenue Forecast: ${report['revenue_forecast']['mean']:,.0f}")

Verification

Test your aggregation logic:

async def verify_aggregation():
    """Verify aggregation strategies."""
    # Smoke-tests voting, numerical aggregation, and best-of-N selection
    # against hand-built mock results; raises AssertionError on any failure.

    # Mock results
    results = [
        {"risk_level": "medium", "confidence": 0.85, "revenue_forecast": 1000000},
        {"risk_level": "medium", "confidence": 0.90, "revenue_forecast": 1100000},
        {"risk_level": "low", "confidence": 0.75, "revenue_forecast": 950000}
    ]

    # Test 1: Voting — "medium" holds 2 of 3 votes.
    consensus = aggregate_by_voting(results, lambda r: r["risk_level"])
    assert consensus["answer"] == "medium", "Voting failed"
    # Exact float equality is safe: both sides compute the same 2/3 division.
    assert consensus["confidence"] == 2/3, "Confidence calculation failed"
    print("✓ Voting aggregation working")

    # Test 2: Numerical aggregation — mean must lie within the value range.
    agg = aggregate_numerical_results(results, "revenue_forecast")
    assert 950000 <= agg["mean"] <= 1100000, "Mean out of range"
    print("✓ Numerical aggregation working")

    # Test 3: Best selection — highest confidence (0.90) should win.
    best = select_best_result(results, lambda r: r["confidence"])
    assert best["result"]["confidence"] == 0.90, "Best selection failed"
    print("✓ Best-of-N selection working")

    print("\n✅ All checks passed!")

await verify_aggregation()

Common Issues

Issue 1: All Agents Fail

Symptom: No valid results to aggregate

Solution: Implement graceful degradation

# Graceful degradation: keep only successful results; if every agent failed,
# delegate once to a known-reliable fallback agent instead of aggregating.
# NOTE: the bare `return` means this snippet belongs inside an async function.
results = await execute_agents_parallel(task, agent_ids)
valid_results = [r for r in results if not isinstance(r, Exception)]

if not valid_results:
    # Fallback to single reliable agent
    return await client.delegate(agent_id="fallback_agent", request=task)

Issue 2: Tie in Voting

Symptom: Equal votes for multiple answers

Solution: Use weighted voting or tiebreaker

# Compare the two leading vote counts; equal counts mean no clear winner.
# NOTE: the bare `return`/`await` mean this snippet belongs inside an async function.
vote_counts = Counter(answers)
top_votes = vote_counts.most_common(2)

if len(top_votes) > 1 and top_votes[0][1] == top_votes[1][1]:
    # Tie detected, use tiebreaker
    return await resolve_conflict_with_tiebreaker(results, "tiebreaker_agent")

Issue 3: Outlier Results

Symptom: One agent returns wildly different value

Solution: Filter outliers using standard deviation

import statistics

def filter_outliers(values: list, std_threshold: float = 2.0):
    """Remove values more than ``std_threshold`` standard deviations from the mean.

    With fewer than 3 values there is not enough data to judge outliers, so
    the input is returned unchanged. If filtering would discard everything,
    the original list is returned instead. (Fixed: the docstring quotes were
    export-escaped \"\"\", a SyntaxError in real Python.)
    """
    if len(values) < 3:
        return values

    mean = statistics.mean(values)
    stdev = statistics.stdev(values)

    filtered = [
        v for v in values
        if abs(v - mean) <= std_threshold * stdev
    ]

    return filtered if filtered else values  # Return original if all filtered

Next Steps