Shared Memory

Zero-config distributed state management for multi-agent systems

Shared Memory

Zero-config distributed state with automatic scoping

When multiple agents coordinate across process boundaries, they need to share state. Agent A analyzes a customer ticket. Agent B needs that analysis to decide on escalation. Agent C monitors sentiment trends. Agent D generates reports.

Traditional approach: you build the state coordination yourself. Set up Redis or a shared database. Write synchronization logic. Handle race conditions. Implement pub/sub for reactive patterns. Manage memory lifecycle and cleanup.

Agentfield treats memory as infrastructure. Agents share state automatically, with zero configuration. Write in one agent, read in another. It just works.

What You'd Otherwise Build

Traditional State Sharing

What you build:

  • Redis or shared database setup
  • Key naming conventions (manual namespacing)
  • Synchronization logic
  • Pub/sub for reactive patterns
  • Lifecycle management and cleanup
  • Consistency handling
  • Race condition management

Then you write business logic.

Agentfield State Sharing

What you write:

await app.memory.set("key", value)  # Agent A
value = await app.memory.get("key")  # Agent B

Agentfield provides:

  • ✓ Zero-config distributed state
  • ✓ Hierarchical scoping
  • ✓ Real-time change events
  • ✓ Automatic cleanup
  • ✓ Consistency guarantees
  • ✓ Durable storage
  • ✓ Full observability

The State Coordination Problem

Here's what breaks in production multi-agent systems:

Scenario: Customer support workflow with 5 agents:

  1. Triage Agent analyzes ticket sentiment
  2. History Agent checks past issues
  3. Routing Agent decides which team handles it
  4. Escalation Agent creates high-priority cases
  5. Analytics Agent tracks patterns

Each agent needs context from the others. How do they share state?

Traditional approach requires you to:

  • Set up shared database (Redis, PostgreSQL, MongoDB)
  • Design key naming conventions (manual namespacing)
  • Write synchronization code (handle concurrent writes)
  • Implement pub/sub (notify agents of changes)
  • Manage cleanup (prevent memory leaks)
  • Handle failures (what if Redis is down?)

Agentfield's approach: await app.memory.set("key", value) in one agent, await app.memory.get("key") in another. The control plane handles everything else.

How It Works: Automatic State Sharing

Memory in Agentfield works across distributed agent nodes without setup:

# Agent 1: Customer Triage (Team A, Server 1)
from agentfield import Agent
from pydantic import BaseModel

app = Agent("customer-triage")

class SentimentAnalysis(BaseModel):
    mood: str  # "frustrated", "angry", "neutral", "happy"
    urgency: str  # "low", "medium", "high", "critical"
    confidence: float

@app.reasoner()
async def analyze_ticket(ticket_id: str, message: str, customer_id: str):
    """
    Analyzes customer ticket and stores context for other agents.
    Memory is automatically shared across the workflow.
    """

    # AI analyzes sentiment
    sentiment = await app.ai(
        system="You are a customer sentiment expert.",
        user=f"Analyze this support ticket:\n{message}",
        schema=SentimentAnalysis
    )

    # Store in shared memory (other agents can access this)
    await app.memory.set(f"ticket_{ticket_id}_sentiment", {
        "mood": sentiment.mood,
        "urgency": sentiment.urgency,
        "confidence": sentiment.confidence,
        "analyzed_at": datetime.now().isoformat()
    })

    # Also store customer context
    await app.memory.set(f"customer_{customer_id}_last_interaction", {
        "ticket_id": ticket_id,
        "sentiment": sentiment.mood,
        "timestamp": datetime.now().isoformat()
    })

    app.note(f"📊 Sentiment: {sentiment.mood}, Urgency: {sentiment.urgency}", tags=["analysis"])

    return sentiment.dict()
# Agent 2: Escalation Handler (Team B, Server 2, different codebase)
from agentfield import Agent

app = Agent("escalation-handler")

@app.reasoner()
async def check_escalation(ticket_id: str, customer_id: str):
    """
    Decides if escalation is needed.
    Accesses memory set by triage agent automatically.
    """

    # Read sentiment from memory (set by triage agent)
    sentiment = await app.memory.get(f"ticket_{ticket_id}_sentiment")

    # Read customer history
    last_interaction = await app.memory.get(f"customer_{customer_id}_last_interaction")

    # AI decides escalation strategy
    if sentiment and sentiment["urgency"] in ["high", "critical"]:
        strategy = await app.ai(
            system="You determine escalation strategies.",
            user=f"""
            Ticket sentiment: {sentiment['mood']}
            Urgency: {sentiment['urgency']}
            Last interaction: {last_interaction}
            """,
            schema=EscalationStrategy
        )

        if strategy.escalate:
            # Create escalation case
            case = await create_escalation_case(ticket_id, strategy)

            # Store escalation decision in memory
            await app.memory.set(f"ticket_{ticket_id}_escalated", {
                "case_id": case.id,
                "strategy": strategy.dict(),
                "escalated_at": datetime.now().isoformat()
            })

            app.note(f"⚠️ Escalated to case {case.id}", tags=["escalation"])

            return {"escalated": True, "case_id": case.id}

    return {"escalated": False}

What just happened?

  1. Triage Agent (Server 1) analyzed sentiment and stored it in memory
  2. Escalation Agent (Server 2, different team) read that sentiment
  3. No configuration. No shared database setup. No manual synchronization.
  4. Agentfield's control plane handled the state coordination automatically

Hierarchical Memory Scoping

Agentfield organizes memory in a hierarchy that mirrors how production systems actually work:

Global Scope          ← System-wide configuration

Actor Scope          ← Agent-private state

Session Scope        ← User session data

Workflow Scope       ← Task-specific state (most specific)

When you call app.memory.get("key"), Agentfield searches scopes in order: workflow → session → actor → global. First match wins.

Workflow Scope (Most Common)

Shared across all agents in a single task

@app.reasoner()
async def process_order(order_id: str):
    # Store in workflow scope (default)
    await app.memory.set(f"order_{order_id}_status", "processing")

    # Call another agent (same workflow)
    inventory = await app.call("inventory-agent.check_stock", order_id=order_id)

    # That agent can read the status
    # status = await app.memory.get(f"order_{order_id}_status")  # Gets "processing"

Use for:

  • Multi-step workflows (order processing, document analysis)
  • Agent coordination within a task
  • Temporary state that expires with the workflow

Session Scope

Shared across multiple workflows for one user

@app.reasoner()
async def handle_user_request(user_id: str, request: str):
    # Store in session scope
    await app.memory.set(
        f"user_{user_id}_preferences",
        {"language": "en", "timezone": "UTC"},
        scope="session"
    )

    # Available across all workflows for this user

Use for:

  • User preferences and settings
  • Conversation history
  • Session-specific context

Actor Scope

Private to a specific agent instance

@app.reasoner()
async def process_batch(batch_id: str):
    # Store in actor scope (private to this agent)
    await app.memory.set(
        "processing_queue",
        {"current_batch": batch_id, "items_processed": 0},
        scope="actor"
    )

    # Other agents can't see this

Use for:

  • Agent's internal state
  • Processing queues
  • Cache and optimization data

Global Scope

System-wide configuration

@app.reasoner()
async def check_feature_flag(feature: str):
    # Read from global scope
    flags = await app.memory.get("feature_flags", scope="global")

    if flags and flags.get(feature):
        # Feature is enabled system-wide
        return True

Use for:

  • Feature flags
  • System configuration
  • Shared datasets

Reactive Patterns with Memory Events

Agentfield publishes real-time events for all memory changes. Build agents that react to state changes automatically.

# Agent A: Monitoring System
from agentfield import Agent

app = Agent("system-monitor")

@app.reasoner()
async def check_system_health(metrics: dict):
    """Monitors system and sets state when issues detected."""

    if metrics['error_rate'] > 0.05:
        # Set state in memory
        await app.memory.set("system_status", "degraded")
        await app.memory.set("error_details", {
            'error_rate': metrics['error_rate'],
            'timestamp': metrics['timestamp'],
            'affected_services': metrics['services']
        })

        app.note("⚠️ System degraded - error rate exceeded threshold", tags=["alert"])
# Agent B: Incident Response (different process, different team)
from agentfield import Agent

app = Agent("incident-response")

@app.memory.on_change("system_status")
async def handle_system_status_change(event):
    """
    Reacts to system status changes automatically.
    Triggered when any agent writes to "system_status".
    """

    if event.data == "degraded":
        # Get error details from memory
        details = await app.memory.get("error_details")

        # AI determines incident severity and response
        assessment = await app.ai(
            system="You assess incident severity and recommend actions.",
            user=f"System degraded. Error rate: {details['error_rate']}. Affected: {details['affected_services']}",
            schema=IncidentAssessment
        )

        # Create incident automatically
        incident = await app.call(
            "incident-management.create_incident",
            severity=assessment.severity,
            details=details,
            recommended_actions=assessment.actions
        )

        # Notify on-call team
        await app.call(
            "notification-agent.page_oncall",
            incident_id=incident['id'],
            severity=assessment.severity
        )

        app.note(f"🚨 Incident {incident['id']} created, on-call notified", tags=["incident"])

Pattern matching:

# React to multiple patterns
@app.memory.on_change(["customer_*_sentiment", "customer_*_escalated"])
async def track_customer_events(event):
    """Reacts to any customer sentiment or escalation change."""
    customer_id = event.key.split("_")[1]

    # AI analyzes the pattern
    analysis = await app.ai(
        f"Analyze customer {customer_id} event: {event.key} changed to {event.data}",
        schema=CustomerEventAnalysis
    )

    await log_customer_event(customer_id, analysis)

Event object:

{
    "key": "system_status",
    "data": "degraded",
    "previous_data": "healthy",
    "scope": "workflow",
    "timestamp": "2024-07-08T18:20:00Z"
}

Production Patterns

Pattern 1: Multi-Agent Coordination

# Agent 1: Document Processor
@app.reasoner()
async def process_document(doc_id: str):
    """Processes document and stores results for other agents."""

    # Extract text
    text = await extract_text(doc_id)
    await app.memory.set(f"doc_{doc_id}_text", text)

    # Call legal analyzer
    legal = await app.call("legal-agent.analyze", doc_id=doc_id)

    # Call financial analyzer
    financial = await app.call("financial-agent.analyze", doc_id=doc_id)

    return {"legal": legal, "financial": financial}

# Agent 2: Legal Analyzer
@app.reasoner()
async def analyze_legal(doc_id: str):
    """Reads text from memory, analyzes legal compliance."""

    # Get text from memory (set by document processor)
    text = await app.memory.get(f"doc_{doc_id}_text")

    # AI analyzes legal compliance
    analysis = await app.ai(
        "Analyze legal compliance",
        text,
        schema=LegalAnalysis
    )

    # Store results for other agents
    await app.memory.set(f"doc_{doc_id}_legal", analysis.dict())

    return analysis.dict()

# Agent 3: Financial Analyzer
@app.reasoner()
async def analyze_financial(doc_id: str):
    """Reads text from memory, extracts financial terms."""

    # Get text from memory
    text = await app.memory.get(f"doc_{doc_id}_text")

    # AI extracts financial terms
    terms = await app.ai(
        "Extract financial terms and obligations",
        text,
        schema=FinancialTerms
    )

    # Store results
    await app.memory.set(f"doc_{doc_id}_financial", terms.dict())

    return terms.dict()

All three agents coordinate through memory. No manual state passing. No shared database configuration.

Pattern 2: Event-Driven Workflows

# Agent A: Order Processor
@app.reasoner()
async def process_order(order_id: str):
    """Processes order and updates status."""

    await app.memory.set(f"order_{order_id}_status", "processing")

    # Process payment
    payment = await process_payment(order_id)

    if payment.success:
        # Update status (triggers event)
        await app.memory.set(f"order_{order_id}_status", "paid")
    else:
        await app.memory.set(f"order_{order_id}_status", "payment_failed")

# Agent B: Fulfillment (reacts to status changes)
@app.memory.on_change("order_*_status")
async def handle_order_status_change(event):
    """Automatically triggered when order status changes."""

    order_id = event.key.split("_")[1]

    if event.data == "paid":
        # AI determines fulfillment strategy
        strategy = await app.ai(
            f"Determine fulfillment strategy for order {order_id}",
            schema=FulfillmentStrategy
        )

        # Start fulfillment
        await app.call(
            "warehouse-agent.fulfill_order",
            order_id=order_id,
            strategy=strategy.dict()
        )

# Agent C: Analytics (also reacts to status changes)
@app.memory.on_change("order_*_status")
async def track_order_metrics(event):
    """Tracks order metrics for analytics."""

    order_id = event.key.split("_")[1]

    await update_metrics({
        "order_id": order_id,
        "status": event.data,
        "previous_status": event.previous_data,
        "timestamp": event.timestamp
    })

Multiple agents react to the same state changes. No manual event wiring. No message queue setup.

What This Enables

For Multi-Agent Systems

Agents coordinate without central orchestration. Triage agent sets sentiment, escalation agent reads it, analytics agent tracks trends. All automatic.

For Team Collaboration

Marketing team's agents share campaign data. Sales team's agents access it. Both teams deploy independently. Memory fabric connects them automatically.

For Complex Workflows

Build workflows where agents respond to state changes intelligently. No manual event wiring. No custom message passing. Reactive patterns built-in.

For Production Systems

Memory is durable, scoped, and observable. Every write is tracked. Full history for audit. No data loss on agent restart. Production-grade guarantees.

Comparison: Traditional vs Agentfield

CapabilityTraditional Multi-AgentAgentfield
SetupConfigure Redis, database, or message queueZero configuration required
SyncWrite custom synchronization logicAutomatic synchronization
ScopingManual key prefixing and namespacingHierarchical scopes built-in
EventsBuild pub/sub system yourselfReal-time change events included
ConsistencyHandle race conditions manuallyControl plane manages consistency
CleanupImplement lifecycle managementAutomatic scope-based cleanup
DebuggingAdd logging for every memory operationFull history and audit trails
FailuresHandle Redis/database downtime yourselfDurable, fault-tolerant storage

Next Steps

You now understand how Agentfield enables zero-config state sharing:

Or start building with the Quick Start Guide.