Shared Memory
Zero-config distributed state management for multi-agent systems
Zero-config distributed state with automatic scoping
When multiple agents coordinate across process boundaries, they need to share state. Agent A analyzes a customer ticket. Agent B needs that analysis to decide on escalation. Agent C monitors sentiment trends. Agent D generates reports.
Traditional approach: you build the state coordination yourself. Set up Redis or a shared database. Write synchronization logic. Handle race conditions. Implement pub/sub for reactive patterns. Manage memory lifecycle and cleanup.
Agentfield treats memory as infrastructure. Agents share state with zero configuration: write a value in one agent, read it in another, and the control plane handles the coordination.
What You'd Otherwise Build
Traditional State Sharing
What you build:
- Redis or shared database setup
- Key naming conventions (manual namespacing)
- Synchronization logic
- Pub/sub for reactive patterns
- Lifecycle management and cleanup
- Consistency handling
- Race condition management
Then you write business logic.
Agentfield State Sharing
What you write:
await app.memory.set("key", value)   # Agent A
value = await app.memory.get("key")  # Agent B

Agentfield provides:
- ✓ Zero-config distributed state
- ✓ Hierarchical scoping
- ✓ Real-time change events
- ✓ Automatic cleanup
- ✓ Consistency guarantees
- ✓ Durable storage
- ✓ Full observability
The State Coordination Problem
Here's what breaks in production multi-agent systems:
Scenario: Customer support workflow with 5 agents:
- Triage Agent analyzes ticket sentiment
- History Agent checks past issues
- Routing Agent decides which team handles it
- Escalation Agent creates high-priority cases
- Analytics Agent tracks patterns
Each agent needs context from the others. How do they share state?
Traditional approach requires you to:
- Set up shared database (Redis, PostgreSQL, MongoDB)
- Design key naming conventions (manual namespacing)
- Write synchronization code (handle concurrent writes)
- Implement pub/sub (notify agents of changes)
- Manage cleanup (prevent memory leaks)
- Handle failures (what if Redis is down?)
Agentfield's approach: await app.memory.set("key", value) in one agent, await app.memory.get("key") in another. The control plane handles everything else.
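The contract here is small: one coroutine writes a key, another reads it. The sketch below is a hypothetical in-process stand-in (`FakeMemory` is not part of Agentfield) that mimics only the `set` and `get` calls shown on this page. It can be handy for unit-testing agent logic without a control plane. It assumes `get` returns `None` for a missing key, which this page does not state explicitly.

```python
import asyncio
from typing import Any, Optional


class FakeMemory:
    """Hypothetical in-process stand-in for app.memory (not Agentfield code)."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    async def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    async def get(self, key: str) -> Optional[Any]:
        # Assumption: a missing key yields None rather than raising.
        return self._data.get(key)


async def triage(memory: FakeMemory, ticket_id: str) -> None:
    # Plays the role of "Agent A": writes an analysis result.
    await memory.set(f"ticket_{ticket_id}_sentiment", {"urgency": "high"})


async def needs_escalation(memory: FakeMemory, ticket_id: str) -> bool:
    # Plays the role of "Agent B": reads whatever Agent A wrote.
    sentiment = await memory.get(f"ticket_{ticket_id}_sentiment")
    return bool(sentiment) and sentiment["urgency"] in ("high", "critical")


async def demo() -> tuple[bool, bool]:
    memory = FakeMemory()
    await triage(memory, "42")
    return (
        await needs_escalation(memory, "42"),       # key was written
        await needs_escalation(memory, "unknown"),  # key was never written
    )
```

Passing the memory object in as a parameter, rather than reaching for a module-level `app`, is what makes the two functions testable against the stand-in.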
How It Works: Automatic State Sharing
Memory in Agentfield works across distributed agent nodes without setup:
# Agent 1: Customer Triage (Team A, Server 1)
from datetime import datetime

from agentfield import Agent
from pydantic import BaseModel

app = Agent("customer-triage")

class SentimentAnalysis(BaseModel):
    mood: str        # "frustrated", "angry", "neutral", "happy"
    urgency: str     # "low", "medium", "high", "critical"
    confidence: float

@app.reasoner()
async def analyze_ticket(ticket_id: str, message: str, customer_id: str):
    """
    Analyzes customer ticket and stores context for other agents.
    Memory is automatically shared across the workflow.
    """
    # AI analyzes sentiment
    sentiment = await app.ai(
        system="You are a customer sentiment expert.",
        user=f"Analyze this support ticket:\n{message}",
        schema=SentimentAnalysis
    )
    # Store in shared memory (other agents can access this)
    await app.memory.set(f"ticket_{ticket_id}_sentiment", {
        "mood": sentiment.mood,
        "urgency": sentiment.urgency,
        "confidence": sentiment.confidence,
        "analyzed_at": datetime.now().isoformat()
    })
    # Also store customer context
    await app.memory.set(f"customer_{customer_id}_last_interaction", {
        "ticket_id": ticket_id,
        "sentiment": sentiment.mood,
        "timestamp": datetime.now().isoformat()
    })
    app.note(f"📊 Sentiment: {sentiment.mood}, Urgency: {sentiment.urgency}", tags=["analysis"])
    return sentiment.dict()

# Agent 2: Escalation Handler (Team B, Server 2, different codebase)
from datetime import datetime

from agentfield import Agent
from pydantic import BaseModel

app = Agent("escalation-handler")

class EscalationStrategy(BaseModel):
    # Minimal shape implied by strategy.escalate below; other fields are application-specific
    escalate: bool

@app.reasoner()
async def check_escalation(ticket_id: str, customer_id: str):
    """
    Decides if escalation is needed.
    Accesses memory set by triage agent automatically.
    """
    # Read sentiment from memory (set by triage agent)
    sentiment = await app.memory.get(f"ticket_{ticket_id}_sentiment")
    # Read customer history
    last_interaction = await app.memory.get(f"customer_{customer_id}_last_interaction")
    # AI decides escalation strategy
    if sentiment and sentiment["urgency"] in ["high", "critical"]:
        strategy = await app.ai(
            system="You determine escalation strategies.",
            user=f"""
            Ticket sentiment: {sentiment['mood']}
            Urgency: {sentiment['urgency']}
            Last interaction: {last_interaction}
            """,
            schema=EscalationStrategy
        )
        if strategy.escalate:
            # Create escalation case (create_escalation_case is an application-specific helper, not shown)
            case = await create_escalation_case(ticket_id, strategy)
            # Store escalation decision in memory
            await app.memory.set(f"ticket_{ticket_id}_escalated", {
                "case_id": case.id,
                "strategy": strategy.dict(),
                "escalated_at": datetime.now().isoformat()
            })
            app.note(f"⚠️ Escalated to case {case.id}", tags=["escalation"])
            return {"escalated": True, "case_id": case.id}
    return {"escalated": False}

What just happened?
- Triage Agent (Server 1) analyzed sentiment and stored it in memory
- Escalation Agent (Server 2, different team) read that sentiment
- No configuration. No shared database setup. No manual synchronization.
- Agentfield's control plane handled the state coordination automatically
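Both agents rebuild the same key strings by hand (`ticket_{ticket_id}_sentiment`, `customer_{customer_id}_last_interaction`), so a typo in either codebase would make a read miss silently. One way to reduce that risk is a small shared helper module. The functions below are hypothetical helpers, not part of Agentfield; they only reproduce the key format used in the example.

```python
def _build_key(prefix: str, entity_id: str, field: str) -> str:
    """Join the parts with underscores, rejecting empty components."""
    if not entity_id or not field:
        raise ValueError("entity_id and field must be non-empty")
    return f"{prefix}_{entity_id}_{field}"


def ticket_key(ticket_id: str, field: str) -> str:
    """Build keys such as 'ticket_123_sentiment'."""
    return _build_key("ticket", ticket_id, field)


def customer_key(customer_id: str, field: str) -> str:
    """Build keys such as 'customer_c9_last_interaction'."""
    return _build_key("customer", customer_id, field)
```

With helpers like these, the writer calls `app.memory.set(ticket_key(ticket_id, "sentiment"), ...)` and the reader calls `app.memory.get(ticket_key(ticket_id, "sentiment"))`, so the format lives in one place.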
Hierarchical Memory Scoping
Agentfield organizes memory in a hierarchy that mirrors how production systems actually work:
Global Scope ← System-wide configuration
↓
Actor Scope ← Agent-private state
↓
Session Scope ← User session data
↓
Workflow Scope ← Task-specific state (most specific)

When you call app.memory.get("key"), Agentfield searches scopes in order: workflow → session → actor → global. First match wins.
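To make "first match wins" concrete, here is a toy model of that lookup order using plain dictionaries. `ScopedStore` is an illustration only, not Agentfield's implementation, and the behavior of an explicit `scope` argument (search that scope alone) is an assumption.

```python
from typing import Any, Optional

# Lookup order described above: most specific scope first.
LOOKUP_ORDER = ("workflow", "session", "actor", "global")


class ScopedStore:
    """Toy model of first-match-wins lookup; not Agentfield's implementation."""

    def __init__(self) -> None:
        self._scopes: dict[str, dict[str, Any]] = {name: {} for name in LOOKUP_ORDER}

    def set(self, key: str, value: Any, scope: str = "workflow") -> None:
        if scope not in self._scopes:
            raise ValueError(f"unknown scope: {scope}")
        self._scopes[scope][key] = value

    def get(self, key: str, scope: Optional[str] = None) -> Any:
        if scope is not None and scope not in self._scopes:
            raise ValueError(f"unknown scope: {scope}")
        # Assumption: an explicit scope restricts the search to that scope only.
        scopes = (scope,) if scope else LOOKUP_ORDER
        for name in scopes:
            if key in self._scopes[name]:
                return self._scopes[name][key]
        return None


store = ScopedStore()
store.set("language", "en", scope="global")
store.set("language", "fr", scope="session")

resolved = store.get("language")                # session value shadows global
pinned = store.get("language", scope="global")  # explicit scope skips the search
missing = store.get("timezone")                 # no scope holds this key
```

The practical consequence is shadowing: a session-level value hides a global value with the same key until it is removed.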
Workflow Scope (Most Common)
Shared across all agents in a single task
@app.reasoner()
async def process_order(order_id: str):
    # Store in workflow scope (default)
    await app.memory.set(f"order_{order_id}_status", "processing")
    # Call another agent (same workflow)
    inventory = await app.call("inventory-agent.check_stock", order_id=order_id)
    # That agent can read the status
    # status = await app.memory.get(f"order_{order_id}_status")  # Gets "processing"

Use for:
- Multi-step workflows (order processing, document analysis)
- Agent coordination within a task
- Temporary state that expires with the workflow
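"Expires with the workflow" can be pictured as grouping every write under its workflow and dropping the whole group at once. The sketch below is a toy model under that assumption; `WorkflowScopedStore` and the explicit `workflow_id` argument are invented for illustration, and this page does not describe how Agentfield performs cleanup internally.

```python
from collections import defaultdict
from typing import Any, Optional


class WorkflowScopedStore:
    """Toy model of workflow-scoped state; not Agentfield's implementation."""

    def __init__(self) -> None:
        # workflow_id -> {key: value}
        self._by_workflow: dict[str, dict[str, Any]] = defaultdict(dict)

    def set(self, workflow_id: str, key: str, value: Any) -> None:
        self._by_workflow[workflow_id][key] = value

    def get(self, workflow_id: str, key: str) -> Optional[Any]:
        # Use .get on the outer dict so reads never create empty entries.
        return self._by_workflow.get(workflow_id, {}).get(key)

    def end_workflow(self, workflow_id: str) -> int:
        """Drop everything the workflow wrote; return the number of keys removed."""
        return len(self._by_workflow.pop(workflow_id, {}))


store = WorkflowScopedStore()
store.set("wf-1", "order_1001_status", "processing")
store.set("wf-2", "order_2002_status", "paid")

removed = store.end_workflow("wf-1")
after_cleanup = store.get("wf-1", "order_1001_status")
other_workflow = store.get("wf-2", "order_2002_status")
```

Because cleanup is per workflow, ending one workflow leaves state written by other workflows untouched.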
Session Scope
Shared across multiple workflows for one user
@app.reasoner()
async def handle_user_request(user_id: str, request: str):
    # Store in session scope
    await app.memory.set(
        f"user_{user_id}_preferences",
        {"language": "en", "timezone": "UTC"},
        scope="session"
    )
    # Available across all workflows for this user

Use for:
- User preferences and settings
- Conversation history
- Session-specific context
Actor Scope
Private to a specific agent instance
@app.reasoner()
async def process_batch(batch_id: str):
    # Store in actor scope (private to this agent)
    await app.memory.set(
        "processing_queue",
        {"current_batch": batch_id, "items_processed": 0},
        scope="actor"
    )
    # Other agents can't see this

Use for:
- Agent's internal state
- Processing queues
- Cache and optimization data
Global Scope
System-wide configuration
@app.reasoner()
async def check_feature_flag(feature: str):
    # Read from global scope
    flags = await app.memory.get("feature_flags", scope="global")
    # True only if the flags exist and this feature is enabled system-wide;
    # returns False (not None) when the flag is missing or disabled
    return bool(flags and flags.get(feature))

Use for:
- Feature flags
- System configuration
- Shared datasets
Reactive Patterns with Memory Events
Agentfield publishes real-time events for all memory changes. Build agents that react to state changes automatically.
# Agent A: Monitoring System
from agentfield import Agent

app = Agent("system-monitor")

@app.reasoner()
async def check_system_health(metrics: dict):
    """Monitors system and sets state when issues detected."""
    if metrics['error_rate'] > 0.05:
        # Write the details first so they already exist when
        # subscribers react to the "system_status" change
        await app.memory.set("error_details", {
            'error_rate': metrics['error_rate'],
            'timestamp': metrics['timestamp'],
            'affected_services': metrics['services']
        })
        # Set state in memory (triggers change handlers)
        await app.memory.set("system_status", "degraded")
        app.note("⚠️ System degraded - error rate exceeded threshold", tags=["alert"])

# Agent B: Incident Response (different process, different team)
from agentfield import Agent

app = Agent("incident-response")

@app.memory.on_change("system_status")
async def handle_system_status_change(event):
    """
    Reacts to system status changes automatically.
    Triggered when any agent writes to "system_status".
    """
    if event.data == "degraded":
        # Get error details from memory
        details = await app.memory.get("error_details")
        # AI determines incident severity and response
        assessment = await app.ai(
            system="You assess incident severity and recommend actions.",
            user=f"System degraded. Error rate: {details['error_rate']}. Affected: {details['affected_services']}",
            schema=IncidentAssessment
        )
        # Create incident automatically
        incident = await app.call(
            "incident-management.create_incident",
            severity=assessment.severity,
            details=details,
            recommended_actions=assessment.actions
        )
        # Notify on-call team
        await app.call(
            "notification-agent.page_oncall",
            incident_id=incident['id'],
            severity=assessment.severity
        )
        app.note(f"🚨 Incident {incident['id']} created, on-call notified", tags=["incident"])

Pattern matching:
# React to multiple patterns
@app.memory.on_change(["customer_*_sentiment", "customer_*_escalated"])
async def track_customer_events(event):
    """Reacts to any customer sentiment or escalation change."""
    customer_id = event.key.split("_")[1]
    # AI analyzes the pattern
    analysis = await app.ai(
        f"Analyze customer {customer_id} event: {event.key} changed to {event.data}",
        schema=CustomerEventAnalysis
    )
    await log_customer_event(customer_id, analysis)

Event object:
{
    "key": "system_status",
    "data": "degraded",
    "previous_data": "healthy",
    "scope": "workflow",
    "timestamp": "2024-07-08T18:20:00Z"
}

Production Patterns
Pattern 1: Multi-Agent Coordination
# Agent 1: Document Processor
@app.reasoner()
async def process_document(doc_id: str):
    """Processes document and stores results for other agents."""
    # Extract text
    text = await extract_text(doc_id)
    await app.memory.set(f"doc_{doc_id}_text", text)
    # Call legal analyzer
    legal = await app.call("legal-agent.analyze", doc_id=doc_id)
    # Call financial analyzer
    financial = await app.call("financial-agent.analyze", doc_id=doc_id)
    return {"legal": legal, "financial": financial}

# Agent 2: Legal Analyzer
@app.reasoner()
async def analyze_legal(doc_id: str):
    """Reads text from memory, analyzes legal compliance."""
    # Get text from memory (set by document processor)
    text = await app.memory.get(f"doc_{doc_id}_text")
    # AI analyzes legal compliance
    analysis = await app.ai(
        "Analyze legal compliance",
        text,
        schema=LegalAnalysis
    )
    # Store results for other agents
    await app.memory.set(f"doc_{doc_id}_legal", analysis.dict())
    return analysis.dict()

# Agent 3: Financial Analyzer
@app.reasoner()
async def analyze_financial(doc_id: str):
    """Reads text from memory, extracts financial terms."""
    # Get text from memory
    text = await app.memory.get(f"doc_{doc_id}_text")
    # AI extracts financial terms
    terms = await app.ai(
        "Extract financial terms and obligations",
        text,
        schema=FinancialTerms
    )
    # Store results
    await app.memory.set(f"doc_{doc_id}_financial", terms.dict())
    return terms.dict()

All three agents coordinate through memory. No manual state passing. No shared database configuration.
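In `process_document`, the legal and financial analyses both depend only on the stored text, not on each other, so they could in principle run concurrently. Below is a minimal sketch of that idea with the standard library's `asyncio.gather`. `fake_call` is a stand-in invented for this example in place of `app.call`, and whether concurrent `app.call` invocations suit your deployment is an assumption to verify.

```python
import asyncio


async def fake_call(target: str, doc_id: str) -> dict:
    """Stand-in for app.call; sleeps briefly to simulate a remote agent."""
    await asyncio.sleep(0.01)
    return {"target": target, "doc_id": doc_id}


async def analyze_concurrently(doc_id: str) -> dict:
    # Both calls are started together; gather returns results in argument order.
    legal, financial = await asyncio.gather(
        fake_call("legal-agent.analyze", doc_id),
        fake_call("financial-agent.analyze", doc_id),
    )
    return {"legal": legal, "financial": financial}
```

The shared text must be written to memory before either call starts, which the original ordering (`set` first, then the calls) already ensures.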
Pattern 2: Event-Driven Workflows
# Agent A: Order Processor
@app.reasoner()
async def process_order(order_id: str):
    """Processes order and updates status."""
    await app.memory.set(f"order_{order_id}_status", "processing")
    # Process payment
    payment = await process_payment(order_id)
    if payment.success:
        # Update status (triggers event)
        await app.memory.set(f"order_{order_id}_status", "paid")
    else:
        await app.memory.set(f"order_{order_id}_status", "payment_failed")

# Agent B: Fulfillment (reacts to status changes)
@app.memory.on_change("order_*_status")
async def handle_order_status_change(event):
    """Automatically triggered when order status changes."""
    order_id = event.key.split("_")[1]
    if event.data == "paid":
        # AI determines fulfillment strategy
        strategy = await app.ai(
            f"Determine fulfillment strategy for order {order_id}",
            schema=FulfillmentStrategy
        )
        # Start fulfillment
        await app.call(
            "warehouse-agent.fulfill_order",
            order_id=order_id,
            strategy=strategy.dict()
        )

# Agent C: Analytics (also reacts to status changes)
@app.memory.on_change("order_*_status")
async def track_order_metrics(event):
    """Tracks order metrics for analytics."""
    order_id = event.key.split("_")[1]
    await update_metrics({
        "order_id": order_id,
        "status": event.data,
        "previous_status": event.previous_data,
        "timestamp": event.timestamp
    })

Multiple agents react to the same state changes. No manual event wiring. No message queue setup.
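The fan-out can be modeled in a few lines of plain Python. The sketch below is a toy, not Agentfield's implementation: `ToyEventMemory` and `order_id_from_key` are invented names, and it assumes the `*` in patterns behaves like a shell-style glob (matched here with the standard library's `fnmatchcase`). It also shows a key parser that tolerates ids containing underscores, which `event.key.split("_")[1]` does not.

```python
import asyncio
from dataclasses import dataclass
from fnmatch import fnmatchcase
from typing import Any, Awaitable, Callable, Optional


@dataclass
class ChangeEvent:
    key: str
    data: Any
    previous_data: Optional[Any]


Handler = Callable[[ChangeEvent], Awaitable[None]]


class ToyEventMemory:
    """Toy store that notifies wildcard subscribers on every write."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._subscribers: list[tuple[str, Handler]] = []

    def on_change(self, pattern: str) -> Callable[[Handler], Handler]:
        def register(handler: Handler) -> Handler:
            self._subscribers.append((pattern, handler))
            return handler
        return register

    async def set(self, key: str, value: Any) -> None:
        event = ChangeEvent(key, value, self._data.get(key))
        self._data[key] = value
        # Fan out to every handler whose pattern matches the key.
        matching = [h for p, h in self._subscribers if fnmatchcase(key, p)]
        await asyncio.gather(*(h(event) for h in matching))


def order_id_from_key(key: str) -> str:
    """Extract the id from 'order_<id>_status', even if the id contains underscores."""
    return key.removeprefix("order_").removesuffix("_status")


async def demo() -> list[str]:
    memory = ToyEventMemory()
    log: list[str] = []

    @memory.on_change("order_*_status")
    async def fulfillment(event: ChangeEvent) -> None:
        if event.data == "paid":
            log.append(f"fulfill {order_id_from_key(event.key)}")

    @memory.on_change("order_*_status")
    async def analytics(event: ChangeEvent) -> None:
        log.append(f"{event.previous_data} -> {event.data}")

    await memory.set("order_A_17_status", "processing")
    await memory.set("order_A_17_status", "paid")
    await memory.set("unrelated_key", 1)  # matches no pattern, no handler runs
    return log
```

For the key `order_A_17_status`, `order_id_from_key` yields `A_17`, whereas `split("_")[1]` would yield only `A`. If your ids can contain underscores, prefer prefix and suffix stripping or a different separator.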
What This Enables
For Multi-Agent Systems
Agents coordinate without central orchestration. Triage agent sets sentiment, escalation agent reads it, analytics agent tracks trends. All automatic.
For Team Collaboration
Marketing team's agents share campaign data. Sales team's agents access it. Both teams deploy independently. Memory fabric connects them automatically.
For Complex Workflows
Build workflows where agents respond to state changes intelligently. No manual event wiring. No custom message passing. Reactive patterns built-in.
For Production Systems
Memory is durable, scoped, and observable. Every write is tracked. Full history for audit. No data loss on agent restart. Production-grade guarantees.
Comparison: Traditional vs Agentfield
| Capability | Traditional Multi-Agent | Agentfield |
|---|---|---|
| Setup | Configure Redis, database, or message queue | Zero configuration required |
| Sync | Write custom synchronization logic | Automatic synchronization |
| Scoping | Manual key prefixing and namespacing | Hierarchical scopes built-in |
| Events | Build pub/sub system yourself | Real-time change events included |
| Consistency | Handle race conditions manually | Control plane manages consistency |
| Cleanup | Implement lifecycle management | Automatic scope-based cleanup |
| Debugging | Add logging for every memory operation | Full history and audit trails |
| Failures | Handle Redis/database downtime yourself | Durable, fault-tolerant storage |
Next Steps
You now understand how Agentfield enables zero-config state sharing. Related topics:
- Cross-Agent Communication - See how memory works with multi-agent calls
- Async Execution - Share state across async workflows
- Identity & Trust - Cryptographic proof of memory operations
Or start building with the Quick Start Guide.