Memory Events
Real-time pub/sub system for memory changes with pattern-based subscriptions and event history
Real-time event system that notifies agents when memory changes occur. Subscribe to memory changes with wildcard patterns, filter by scope, and query historical events.
Zero-Config Pub/Sub: Unlike traditional message brokers (Redis, RabbitMQ), Agentfield's event system requires no setup. Events are automatically published when memory changes, with automatic scope filtering based on execution context.
Basic Example
from agentfield import Agent
app = Agent(node_id="notification-agent")
# Subscribe to customer data changes
@app.memory.on_change("customer_*")
async def handle_customer_update(event):
print(f"Customer {event.key} changed")
print(f"Action: {event.action}") # "set" or "delete"
print(f"New value: {event.data}")
print(f"Previous value: {event.previous_data}")
# Trigger notification
if event.action == "set":
await send_notification(event.data)
# Memory changes trigger the event
await app.memory.set("customer_123", {"name": "Alice", "tier": "premium"})
# → Triggers handle_customer_updatePattern Matching
Subscribe to memory changes using wildcard patterns for flexible event filtering.
Single Pattern
# Match all keys starting with "order_"
@app.memory.on_change("order_*")
async def handle_order_changes(event):
    order_id = event.key.replace("order_", "")
    await process_order_update(order_id, event.data)

Multiple Patterns
# Autonomous fraud detection - AI agents coordinate without human rules
@app.memory.on_change(["transaction_*", "user_behavior_*", "risk_signal_*"])
async def autonomous_fraud_prevention(event):
"""Multi-agent AI system detects and prevents fraud autonomously.
Replaces: Traditional fraud detection with hardcoded rules,
separate risk scoring services, and manual review queues.
"""
# AI analyzes the event in context of fraud patterns
fraud_assessment = await app.ai(
system="You are a fraud detection specialist. Analyze events for suspicious patterns.",
user=f"Event: {event.key}\nData: {event.data}\nPrevious: {event.previous_data}",
schema=FraudAssessment
)
if fraud_assessment.risk_level == "high":
# AI decides response strategy - no manual intervention
response_plan = await app.ai(
system="Determine fraud prevention actions based on risk assessment.",
user=f"Assessment: {fraud_assessment}",
schema=FraudResponse
)
# Execute AI's autonomous decision
await app.memory.set(f"fraud_action_{event.id}", {
"action": response_plan.action,
"reasoning": response_plan.reasoning,
"auto_executed": True
})Pattern Rules
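There is no formal grammar listed here, but the examples below and the PatternMatcher helper (see Advanced) imply shell-style wildcards: the asterisk (*) matches any run of characters, and everything else matches literally. A rough stand-in using Python's standard fnmatch module — an illustrative sketch, not Agentfield's actual matcher:

from fnmatch import fnmatchcase

# Shell-style wildcard matching: "*" matches any run of characters.
# This mirrors the observed behavior of Agentfield patterns; the real
# PatternMatcher may differ in edge cases (e.g. whether "*" crosses dots).
assert fnmatchcase("customer_123", "customer_*")
assert fnmatchcase("user_123.preferences", "user_*.preferences")
assert fnmatchcase("order_123.payment.status", "order_*.*.status")
assert not fnmatchcase("customer_123", "order_*")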
# Pattern matching examples
@app.memory.on_change("customer_*") # customer_123, customer_456
@app.memory.on_change("user_*.preferences") # user_123.preferences
@app.memory.on_change("order_*.*.status") # order_123.payment.status
@app.memory.on_change("*_priority") # ticket_priority, task_priorityScoped Subscriptions
Filter events by memory scope to receive only relevant changes.
Session-scoped event subscriptions - Only receive events for a specific user session.
# Subscribe to events in a specific session
session_mem = app.memory.session("user_123")
@session_mem.on_change("preferences.*")
async def handle_session_preferences(event):
    # Only receives events for session "user_123"
    await update_user_ui(event.data)

# Trigger event
await session_mem.set("preferences.theme", "dark")
# → Triggers handle_session_preferences

Use cases:
- User preference changes
- Session-specific notifications
- User activity tracking
Actor-scoped event subscriptions - Only receive events for a specific agent instance.
# Subscribe to events for a specific agent
actor_mem = app.memory.actor("support-agent-001")
@actor_mem.on_change("queue_*")
async def handle_queue_changes(event):
    # Only receives events for actor "support-agent-001"
    await update_agent_dashboard(event.data)

Use cases:
- Agent-specific configuration changes
- Internal cache invalidation
- Agent health monitoring
Workflow-scoped event subscriptions - Only receive events for a specific workflow execution.
# Subscribe to events in a specific workflow
workflow_mem = app.memory.workflow("wf-abc123")
@workflow_mem.on_change("task_*")
async def handle_workflow_tasks(event):
    # Only receives events for workflow "wf-abc123"
    await track_workflow_progress(event.data)

Use cases:
- Workflow progress tracking
- Task completion notifications
- Cross-agent coordination within workflow
Global event subscriptions - Receive events for system-wide memory changes.
# Subscribe to global memory changes
@app.memory.global_scope.on_change("feature_flags")
async def handle_feature_flag_changes(event):
    # Receives events for global scope changes
    await reload_feature_flags(event.data)

Use cases:
- Feature flag updates
- System configuration changes
- Global rate limit adjustments
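Scope filtering means the same key can exist independently in each scope, and a subscription in one scope never fires for writes made in another. A short sketch of that isolation (handler names are illustrative):

session_mem = app.memory.session("user_123")

# Session-scoped subscriber: sees only this session's writes
@session_mem.on_change("theme")
async def on_session_theme(event):
    print(f"session theme → {event.data}")

# Global subscriber: sees only global-scope writes to the same key
@app.memory.global_scope.on_change("theme")
async def on_global_theme(event):
    print(f"global theme → {event.data}")

# Fires on_session_theme but NOT on_global_theme
await session_mem.set("theme", "dark")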
Event Object
Every event handler receives a MemoryChangeEvent object with complete change information.
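Its fields, as used throughout this page, can be sketched roughly as follows — an approximation for reference, with types inferred from the examples rather than taken from an official definition:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

@dataclass
class MemoryChangeEvent:
    """Approximate shape, inferred from the examples on this page."""
    id: str                 # unique event identifier
    timestamp: datetime     # when the change occurred
    scope: str              # "session", "actor", "workflow", or "global"
    scope_id: Optional[str] # e.g. "user_123" or "wf-abc123"
    key: str                # the memory key that changed
    action: str             # "set" or "delete"
    data: Any               # new value (None for deletes)
    previous_data: Any      # prior value, if any
    metadata: dict = field(default_factory=dict)  # e.g. agent_node_id

    def to_dict(self) -> dict:
        ...  # serialization helper used in the error-handling example below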
Event Object Example
@app.memory.on_change("customer_*")
async def handle_customer_event(event):
    # Access event properties
    print(f"Event ID: {event.id}")
    print(f"Timestamp: {event.timestamp}")
    print(f"Scope: {event.scope}/{event.scope_id}")
    print(f"Key: {event.key}")
    print(f"Action: {event.action}")

    # Compare old and new values
    if event.action == "set":
        if event.previous_data:
            print(f"Changed from: {event.previous_data}")
        print(f"Changed to: {event.data}")

    # Access metadata
    agent_id = event.metadata.get("agent_node_id")
    print(f"Changed by agent: {agent_id}")

Event History
Query historical memory change events with filtering and time-based queries.
Basic History Query
from datetime import datetime, timedelta
# Get last 100 events
events = await app.memory.events.history(limit=100)
for event in events:
    print(f"{event.timestamp}: {event.key} {event.action}")

Filtered History
# Get events matching patterns
events = await app.memory.events.history(
    patterns=["customer_*", "order_*"],
    limit=50
)

# Get events since specific time
since_time = datetime.now() - timedelta(hours=1)
events = await app.memory.events.history(
    since=since_time,
    limit=100
)

# Get events for specific scope
events = await app.memory.events.history(
    scope="session",
    scope_id="user_123",
    limit=50
)

Scoped History
# Get history for specific session
session_mem = app.memory.session("user_123")
events = await session_mem.events.history(
    patterns=["preferences.*"],
    limit=20
)

# Get history for specific workflow
workflow_mem = app.memory.workflow("wf-abc123")
events = await workflow_mem.events.history(
    since=datetime.now() - timedelta(minutes=30)
)

Common Patterns
Autonomous Content Moderation
AI agents autonomously moderate content across platforms without human-defined rules.
@app.memory.on_change("user_content_*")
async def autonomous_content_moderation(event):
"""AI moderates content in real-time, learning from context.
Replaces: Manual moderation queues, hardcoded filter lists,
separate ML inference services, and rule-based content policies.
"""
# AI analyzes content with full context awareness
moderation_decision = await app.ai(
system="You are a content moderator. Analyze content for policy violations considering context, intent, and cultural nuances.",
user=f"Content: {event.data}\nUser history: {event.metadata.get('user_history')}\nContext: {event.metadata.get('context')}",
schema=ModerationDecision
)
if moderation_decision.action != "approve":
# AI determines appropriate response - no manual escalation
response_strategy = await app.ai(
system="Determine how to handle flagged content - remove, warn, educate, or escalate.",
user=f"Decision: {moderation_decision}",
schema=ModerationResponse
)
# Execute autonomous moderation action
await app.memory.set(f"moderation_action_{event.key}", {
"action": response_strategy.action,
"reasoning": moderation_decision.reasoning,
"user_message": response_strategy.user_notification,
"auto_executed": True
})Autonomous Trading System
AI agents coordinate trading decisions based on real-time market intelligence.
# Agent A: Market Intelligence
@app.reasoner
async def analyze_market_conditions(symbol: str):
"""AI analyzes market and stores intelligence for other agents."""
market_analysis = await app.ai(
system="You are a quantitative analyst. Analyze market conditions and identify opportunities.",
user=f"Symbol: {symbol}\nMarket data: {await fetch_market_data(symbol)}",
schema=MarketAnalysis
)
await app.memory.set(f"market_intel_{symbol}", {
"analysis": market_analysis,
"timestamp": datetime.now().isoformat()
})
# → Triggers autonomous trading agent
# Agent B: Autonomous Trading
@app.memory.on_change("market_intel_*")
async def autonomous_trading_decision(event):
"""AI makes trading decisions autonomously - no human intervention.
Replaces: Traditional algorithmic trading with hardcoded strategies,
separate risk management systems, and manual trade approval workflows.
"""
symbol = event.key.replace("market_intel_", "")
# AI decides whether to trade based on market intelligence
trading_decision = await app.ai(
system="You are an autonomous trader. Make trading decisions based on market analysis and risk parameters.",
user=f"Market analysis: {event.data}\nPortfolio: {await get_portfolio()}\nRisk limits: {await get_risk_limits()}",
schema=TradingDecision
)
if trading_decision.should_trade:
# AI executes trade autonomously
await app.memory.set(f"trade_execution_{symbol}", {
"action": trading_decision.action,
"quantity": trading_decision.quantity,
"reasoning": trading_decision.reasoning,
"confidence": trading_decision.confidence,
"executed_at": datetime.now().isoformat()
})Intelligent Resource Scaling
AI autonomously scales infrastructure based on predicted demand patterns.
@app.memory.on_change(["system_load_*", "request_rate_*", "latency_*"])
async def autonomous_infrastructure_scaling(event):
"""AI predicts demand and scales infrastructure autonomously.
Replaces: Manual scaling policies, threshold-based autoscaling,
separate monitoring systems, and DevOps intervention.
"""
# AI analyzes system metrics and predicts scaling needs
scaling_decision = await app.ai(
system="You are an infrastructure optimizer. Predict demand patterns and determine optimal scaling strategy.",
user=f"Current metrics: {event.data}\nHistorical patterns: {await get_historical_metrics()}\nCost constraints: {await get_budget_limits()}",
schema=ScalingDecision
)
if scaling_decision.should_scale:
# AI determines exact scaling parameters
scaling_plan = await app.ai(
system="Determine precise scaling configuration - instance types, counts, regions.",
user=f"Decision: {scaling_decision}\nCurrent infrastructure: {await get_current_infrastructure()}",
schema=ScalingPlan
)
# Execute autonomous scaling
await app.memory.set(f"scaling_action_{event.timestamp}", {
"plan": scaling_plan,
"predicted_load": scaling_decision.predicted_load,
"cost_impact": scaling_decision.estimated_cost,
"reasoning": scaling_decision.reasoning,
"auto_executed": True
})Autonomous Incident Response
AI detects anomalies and responds to incidents without human intervention.
@app.memory.on_change(["error_rate_*", "latency_spike_*", "service_health_*"])
async def autonomous_incident_response(event):
"""AI detects, diagnoses, and resolves incidents autonomously.
Replaces: Manual on-call rotations, runbook procedures,
separate incident management tools, and escalation workflows.
"""
# AI analyzes the incident with full system context
incident_analysis = await app.ai(
system="You are an SRE expert. Analyze incidents, determine root cause, and recommend remediation.",
user=f"Alert: {event.key}\nMetrics: {event.data}\nSystem state: {await get_system_state()}\nRecent changes: {await get_recent_deployments()}",
schema=IncidentAnalysis
)
if incident_analysis.severity in ["critical", "high"]:
# AI determines remediation strategy
remediation_plan = await app.ai(
system="Create incident remediation plan with specific actions and rollback strategies.",
user=f"Analysis: {incident_analysis}",
schema=RemediationPlan
)
# Execute autonomous remediation
await app.memory.set(f"incident_response_{event.id}", {
"analysis": incident_analysis,
"remediation": remediation_plan,
"actions_taken": remediation_plan.automated_actions,
"human_notification": remediation_plan.requires_human_review,
"executed_at": datetime.now().isoformat()
})
# AI can autonomously execute safe remediation actions
if remediation_plan.safe_to_auto_execute:
await execute_remediation(remediation_plan)State Machine Transitions
React to state changes in workflows.
@app.memory.on_change("workflow_state")
async def handle_state_transition(event):
    old_state = event.previous_data
    new_state = event.data

    print(f"Workflow transitioned: {old_state} → {new_state}")

    # Trigger state-specific actions
    if new_state == "completed":
        await finalize_workflow()
    elif new_state == "failed":
        await handle_workflow_failure()

Storage & Transport
Agentfield's event system uses different storage backends based on deployment environment.
Storage Backends
BoltDB + SQLite
- Memory Data: BoltDB (embedded key-value store)
- Event History: SQLite (embedded relational database)
- File Location: agentfield.db in working directory
- Zero Configuration: Works out of the box
- Use Case: Development, testing, single-server deployments
# Events stored in SQLite table
# Automatic cleanup after 7 days (default TTL)

PostgreSQL
- Memory Data: PostgreSQL JSONB columns for flexible storage
- Event History: Dedicated memory_events table with indexing
- Scalability: Supports distributed Agentfield servers sharing the same database
- Performance: Optimized queries with automatic indexing on scope, scope_id, and key
- Use Case: Production, multi-server deployments
PostgreSQL configuration and connection details are managed in Agentfield server settings. See Deployment Guide for database setup.
Event TTL and Cleanup
Events are automatically cleaned up after a retention period to prevent unbounded growth.
# Default event retention: 7 days
# Events older than 7 days are automatically deleted
# Query recent events (within retention period)
events = await app.memory.events.history(
    since=datetime.now() - timedelta(days=6),
    limit=1000
)

Event TTL is configured at the Agentfield server level. See Deployment Guide for production configuration options.
Transport Mechanisms
Agentfield supports two real-time transport mechanisms for event delivery.
- WebSocket (default): persistent bidirectional connection, managed automatically by the Python SDK
- Server-Sent Events (SSE): one-way HTTP streaming alternative
# WebSocket (default) - automatic in Python SDK
@app.memory.on_change("customer_*")
async def handle_event(event):
    # Uses WebSocket connection automatically
    pass

# For transport details, see:
# - WebSocket documentation: /docs/websockets
# - SSE documentation: /docs/server-sent-events

The Python SDK automatically manages WebSocket connections with reconnection logic. See WebSockets and Server-Sent Events for transport-specific details.
Memory Retention Coming Soon
Automatic memory cleanup policies based on scope and age.
Planned Features:
- Configurable retention periods per scope (workflow: 1 day, session: 30 days, etc.)
- Automatic cleanup of expired memory entries
- Retention policy configuration in agent config
- Manual retention overrides per memory key
# Future API (not yet implemented)
app = Agent(
    node_id="my-agent",
    memory_config={
        "retention": {
            "workflow": "1d",   # 1 day
            "session": "30d",   # 30 days
            "actor": "90d",     # 90 days
            "global": "never"   # Never expire
        }
    }
)

Advanced
Direct MemoryEventClient Usage
For advanced use cases, you can access the underlying MemoryEventClient directly.
from agentfield.memory_events import MemoryEventClient
from agentfield.execution_context import ExecutionContext
# Create event client manually
event_client = MemoryEventClient(
    base_url="http://localhost:8080",
    execution_context=ExecutionContext(
        workflow_id="wf-123",
        session_id="session-456"
    )
)

# Connect to event stream
await event_client.connect(
    patterns=["customer_*"],
    scope="workflow",
    scope_id="wf-123"
)

# Programmatic subscription
def my_handler(event):
    print(f"Event: {event.key}")

subscription = event_client.subscribe(
    patterns=["order_*"],
    callback=my_handler,
    scope="session",
    scope_id="session-456"
)

# Unsubscribe
subscription.unsubscribe()

# Close connection
await event_client.close()

Reconnection Behavior
The event client automatically reconnects when connections drop.
# Automatic reconnection with exponential backoff
# - Max attempts: 5
# - Delays: 1s, 2s, 4s, 8s, 16s
# - Subscriptions preserved across reconnections
# Connection lifecycle
event_client = MemoryEventClient(base_url, context)
await event_client.connect() # Initial connection
# If connection drops:
# 1. Client detects disconnection
# 2. Waits with exponential backoff
# 3. Reconnects automatically
# 4. Resubscribes to all patterns
# 5. Resumes event delivery
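If you need the same policy around your own connection code, the schedule above is plain exponential backoff. A standalone sketch — connect_with_backoff is hypothetical, not an SDK helper:

import asyncio

async def connect_with_backoff(connect_once, max_retries: int = 5):
    """Retry a connect coroutine with growing delays: 1s, 2s, 4s, 8s, 16s."""
    attempt = 0
    while True:
        try:
            return await connect_once()
        except ConnectionError:
            if attempt >= max_retries:
                raise  # give up after the final delay
            await asyncio.sleep(2 ** attempt)  # 1, 2, 4, 8, 16 seconds
            attempt += 1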
Pattern Matcher
Custom pattern matching for advanced filtering.
from agentfield.memory_events import PatternMatcher
# Check if key matches pattern
matches = PatternMatcher.matches_pattern("customer_*", "customer_123")
# → True
matches = PatternMatcher.matches_pattern("user_*.preferences", "user_456.preferences")
# → True
# Use in custom logic
def should_process_event(event, patterns):
    for pattern in patterns:
        if PatternMatcher.matches_pattern(pattern, event.key):
            return True
    return False

Best Practices
Event Handler Performance
Keep event handlers fast to avoid blocking event delivery.
# ✅ Good - async and non-blocking
@app.memory.on_change("customer_*")
async def handle_customer_update(event):
    # Quick processing
    await send_notification(event.data)

# ❌ Bad - blocking operations
@app.memory.on_change("customer_*")
async def handle_customer_update(event):
    # Slow synchronous operation blocks the event loop
    time.sleep(5)  # Don't do this!
    process_data(event.data)
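When slow work is unavoidable, move it off the handler's critical path instead of blocking the loop. A sketch using standard asyncio; process_data and send_notification stand in for your own functions:

import asyncio

background_tasks: set[asyncio.Task] = set()

@app.memory.on_change("customer_*")
async def handle_customer_update(event):
    # Offload blocking, CPU-ish work to a thread so the event
    # loop keeps delivering events (Python 3.9+).
    await asyncio.to_thread(process_data, event.data)

    # Fire-and-forget follow-up; hold a reference so the task
    # isn't garbage-collected before it finishes.
    task = asyncio.create_task(send_notification(event.data))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)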
Pattern Specificity
Use specific patterns to reduce unnecessary event processing.
# ✅ Good - specific pattern
@app.memory.on_change("customer_*.payment_method")
async def handle_payment_changes(event):
    await update_billing(event.data)

# ❌ Bad - overly broad pattern
@app.memory.on_change("*")
async def handle_all_changes(event):
    # Receives ALL memory changes - very inefficient
    if "payment" in event.key:
        await update_billing(event.data)

Error Handling
Handle errors gracefully to prevent event handler failures.
@app.memory.on_change("order_*")
async def handle_order_event(event):
    try:
        await process_order(event.data)
    except Exception as e:
        # Log error but don't crash
        print(f"Error processing order event: {e}")

        # Optionally store failed event for retry
        await app.memory.set(f"failed_event_{event.id}", {
            "event": event.to_dict(),
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        })
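Since the same try/except shape tends to repeat across handlers, it can be factored into a decorator. A small sketch — safe_handler is hypothetical, not an SDK helper:

import functools
from datetime import datetime

def safe_handler(func):
    """Wrap an event handler so one bad event can't kill the subscription."""
    @functools.wraps(func)
    async def wrapper(event):
        try:
            await func(event)
        except Exception as e:
            print(f"Error in {func.__name__}: {e}")
            # Park the failed event for later inspection or replay
            await app.memory.set(f"failed_event_{event.id}", {
                "event": event.to_dict(),
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
            })
    return wrapper

@app.memory.on_change("order_*")
@safe_handler
async def handle_order_event(event):
    await process_order(event.data)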
Scope Selection
Choose appropriate scopes for event subscriptions.
# Workflow scope - for task coordination
@app.memory.on_change("task_*")
async def handle_workflow_tasks(event):
    # Only receives events in current workflow
    pass

# Session scope - for user-specific events
session_mem = app.memory.session(user_id)

@session_mem.on_change("preferences.*")
async def handle_user_preferences(event):
    # Only receives events for this user's session
    pass

# Global scope - for system-wide events
@app.memory.global_scope.on_change("feature_flags")
async def handle_feature_flags(event):
    # Receives global configuration changes
    pass

Related
- app.memory - Memory operations and scope management
- WebSockets - WebSocket transport details
- Server-Sent Events - SSE transport alternative
- Execution Context - Understanding automatic scope propagation
- Deployment - Production event storage configuration