Memory Events

Real-time pub/sub system for memory changes with pattern-based subscriptions and event history

Memory Events

Real-time pub/sub system for memory changes with pattern-based subscriptions and event history

Real-time event system that notifies agents when memory changes occur. Subscribe to memory changes with wildcard patterns, filter by scope, and query historical events.

Zero-Config Pub/Sub: Unlike traditional message brokers (Redis, RabbitMQ), Agentfield's event system requires no setup. Events are automatically published when memory changes, with automatic scope filtering based on execution context.

Basic Example

from agentfield import Agent

app = Agent(node_id="notification-agent")

# Subscribe to customer data changes
@app.memory.on_change("customer_*")
async def handle_customer_update(event):
    print(f"Customer {event.key} changed")
    print(f"Action: {event.action}")  # "set" or "delete"
    print(f"New value: {event.data}")
    print(f"Previous value: {event.previous_data}")

    # Trigger notification
    if event.action == "set":
        await send_notification(event.data)

# Memory changes trigger the event
await app.memory.set("customer_123", {"name": "Alice", "tier": "premium"})
# → Triggers handle_customer_update

Pattern Matching

Subscribe to memory changes using wildcard patterns for flexible event filtering.

Single Pattern

# Match all keys starting with "order_"
@app.memory.on_change("order_*")
async def handle_order_changes(event):
    order_id = event.key.replace("order_", "")
    await process_order_update(order_id, event.data)

Multiple Patterns

# Autonomous fraud detection - AI agents coordinate without human rules
@app.memory.on_change(["transaction_*", "user_behavior_*", "risk_signal_*"])
async def autonomous_fraud_prevention(event):
    """Multi-agent AI system detects and prevents fraud autonomously.

    Replaces: Traditional fraud detection with hardcoded rules,
    separate risk scoring services, and manual review queues.
    """

    # AI analyzes the event in context of fraud patterns
    fraud_assessment = await app.ai(
        system="You are a fraud detection specialist. Analyze events for suspicious patterns.",
        user=f"Event: {event.key}\nData: {event.data}\nPrevious: {event.previous_data}",
        schema=FraudAssessment
    )

    if fraud_assessment.risk_level == "high":
        # AI decides response strategy - no manual intervention
        response_plan = await app.ai(
            system="Determine fraud prevention actions based on risk assessment.",
            user=f"Assessment: {fraud_assessment}",
            schema=FraudResponse
        )

        # Execute AI's autonomous decision
        await app.memory.set(f"fraud_action_{event.id}", {
            "action": response_plan.action,
            "reasoning": response_plan.reasoning,
            "auto_executed": True
        })

Pattern Rules

Prop

Type

# Pattern matching examples
@app.memory.on_change("customer_*")           # customer_123, customer_456
@app.memory.on_change("user_*.preferences")   # user_123.preferences
@app.memory.on_change("order_*.*.status")     # order_123.payment.status
@app.memory.on_change("*_priority")           # ticket_priority, task_priority

Scoped Subscriptions

Filter events by memory scope to receive only relevant changes.

Session-scoped event subscriptions - Only receive events for a specific user session.

# Subscribe to events in a specific session
session_mem = app.memory.session("user_123")

@session_mem.on_change("preferences.*")
async def handle_session_preferences(event):
    # Only receives events for session "user_123"
    await update_user_ui(event.data)

# Trigger event
await session_mem.set("preferences.theme", "dark")
# → Triggers handle_session_preferences

Use cases:

  • User preference changes
  • Session-specific notifications
  • User activity tracking

Actor-scoped event subscriptions - Only receive events for a specific agent instance.

# Subscribe to events for a specific agent
actor_mem = app.memory.actor("support-agent-001")

@actor_mem.on_change("queue_*")
async def handle_queue_changes(event):
    # Only receives events for actor "support-agent-001"
    await update_agent_dashboard(event.data)

Use cases:

  • Agent-specific configuration changes
  • Internal cache invalidation
  • Agent health monitoring

Workflow-scoped event subscriptions - Only receive events for a specific workflow execution.

# Subscribe to events in a specific workflow
workflow_mem = app.memory.workflow("wf-abc123")

@workflow_mem.on_change("task_*")
async def handle_workflow_tasks(event):
    # Only receives events for workflow "wf-abc123"
    await track_workflow_progress(event.data)

Use cases:

  • Workflow progress tracking
  • Task completion notifications
  • Cross-agent coordination within workflow

Global event subscriptions - Receive events for system-wide memory changes.

# Subscribe to global memory changes
@app.memory.global_scope.on_change("feature_flags")
async def handle_feature_flag_changes(event):
    # Receives events for global scope changes
    await reload_feature_flags(event.data)

Use cases:

  • Feature flag updates
  • System configuration changes
  • Global rate limit adjustments

Event Object

Every event handler receives a MemoryChangeEvent object with complete change information.

Prop

Type

Event Object Example

@app.memory.on_change("customer_*")
async def handle_customer_event(event):
    # Access event properties
    print(f"Event ID: {event.id}")
    print(f"Timestamp: {event.timestamp}")
    print(f"Scope: {event.scope}/{event.scope_id}")
    print(f"Key: {event.key}")
    print(f"Action: {event.action}")

    # Compare old and new values
    if event.action == "set":
        if event.previous_data:
            print(f"Changed from: {event.previous_data}")
        print(f"Changed to: {event.data}")

    # Access metadata
    agent_id = event.metadata.get("agent_node_id")
    print(f"Changed by agent: {agent_id}")

Event History

Query historical memory change events with filtering and time-based queries.

Basic History Query

from datetime import datetime, timedelta

# Get last 100 events
events = await app.memory.events.history(limit=100)

for event in events:
    print(f"{event.timestamp}: {event.key} {event.action}")

Filtered History

# Get events matching patterns
events = await app.memory.events.history(
    patterns=["customer_*", "order_*"],
    limit=50
)

# Get events since specific time
since_time = datetime.now() - timedelta(hours=1)
events = await app.memory.events.history(
    since=since_time,
    limit=100
)

# Get events for specific scope
events = await app.memory.events.history(
    scope="session",
    scope_id="user_123",
    limit=50
)

Scoped History

# Get history for specific session
session_mem = app.memory.session("user_123")
events = await session_mem.events.history(
    patterns=["preferences.*"],
    limit=20
)

# Get history for specific workflow
workflow_mem = app.memory.workflow("wf-abc123")
events = await workflow_mem.events.history(
    since=datetime.now() - timedelta(minutes=30)
)

Common Patterns

Autonomous Content Moderation

AI agents autonomously moderate content across platforms without human-defined rules.

@app.memory.on_change("user_content_*")
async def autonomous_content_moderation(event):
    """AI moderates content in real-time, learning from context.

    Replaces: Manual moderation queues, hardcoded filter lists,
    separate ML inference services, and rule-based content policies.
    """

    # AI analyzes content with full context awareness
    moderation_decision = await app.ai(
        system="You are a content moderator. Analyze content for policy violations considering context, intent, and cultural nuances.",
        user=f"Content: {event.data}\nUser history: {event.metadata.get('user_history')}\nContext: {event.metadata.get('context')}",
        schema=ModerationDecision
    )

    if moderation_decision.action != "approve":
        # AI determines appropriate response - no manual escalation
        response_strategy = await app.ai(
            system="Determine how to handle flagged content - remove, warn, educate, or escalate.",
            user=f"Decision: {moderation_decision}",
            schema=ModerationResponse
        )

        # Execute autonomous moderation action
        await app.memory.set(f"moderation_action_{event.key}", {
            "action": response_strategy.action,
            "reasoning": moderation_decision.reasoning,
            "user_message": response_strategy.user_notification,
            "auto_executed": True
        })

Autonomous Trading System

AI agents coordinate trading decisions based on real-time market intelligence.

# Agent A: Market Intelligence
@app.reasoner
async def analyze_market_conditions(symbol: str):
    """AI analyzes market and stores intelligence for other agents."""

    market_analysis = await app.ai(
        system="You are a quantitative analyst. Analyze market conditions and identify opportunities.",
        user=f"Symbol: {symbol}\nMarket data: {await fetch_market_data(symbol)}",
        schema=MarketAnalysis
    )

    await app.memory.set(f"market_intel_{symbol}", {
        "analysis": market_analysis,
        "timestamp": datetime.now().isoformat()
    })
    # → Triggers autonomous trading agent

# Agent B: Autonomous Trading
@app.memory.on_change("market_intel_*")
async def autonomous_trading_decision(event):
    """AI makes trading decisions autonomously - no human intervention.

    Replaces: Traditional algorithmic trading with hardcoded strategies,
    separate risk management systems, and manual trade approval workflows.
    """

    symbol = event.key.replace("market_intel_", "")

    # AI decides whether to trade based on market intelligence
    trading_decision = await app.ai(
        system="You are an autonomous trader. Make trading decisions based on market analysis and risk parameters.",
        user=f"Market analysis: {event.data}\nPortfolio: {await get_portfolio()}\nRisk limits: {await get_risk_limits()}",
        schema=TradingDecision
    )

    if trading_decision.should_trade:
        # AI executes trade autonomously
        await app.memory.set(f"trade_execution_{symbol}", {
            "action": trading_decision.action,
            "quantity": trading_decision.quantity,
            "reasoning": trading_decision.reasoning,
            "confidence": trading_decision.confidence,
            "executed_at": datetime.now().isoformat()
        })

Intelligent Resource Scaling

AI autonomously scales infrastructure based on predicted demand patterns.

@app.memory.on_change(["system_load_*", "request_rate_*", "latency_*"])
async def autonomous_infrastructure_scaling(event):
    """AI predicts demand and scales infrastructure autonomously.

    Replaces: Manual scaling policies, threshold-based autoscaling,
    separate monitoring systems, and DevOps intervention.
    """

    # AI analyzes system metrics and predicts scaling needs
    scaling_decision = await app.ai(
        system="You are an infrastructure optimizer. Predict demand patterns and determine optimal scaling strategy.",
        user=f"Current metrics: {event.data}\nHistorical patterns: {await get_historical_metrics()}\nCost constraints: {await get_budget_limits()}",
        schema=ScalingDecision
    )

    if scaling_decision.should_scale:
        # AI determines exact scaling parameters
        scaling_plan = await app.ai(
            system="Determine precise scaling configuration - instance types, counts, regions.",
            user=f"Decision: {scaling_decision}\nCurrent infrastructure: {await get_current_infrastructure()}",
            schema=ScalingPlan
        )

        # Execute autonomous scaling
        await app.memory.set(f"scaling_action_{event.timestamp}", {
            "plan": scaling_plan,
            "predicted_load": scaling_decision.predicted_load,
            "cost_impact": scaling_decision.estimated_cost,
            "reasoning": scaling_decision.reasoning,
            "auto_executed": True
        })

Autonomous Incident Response

AI detects anomalies and responds to incidents without human intervention.

@app.memory.on_change(["error_rate_*", "latency_spike_*", "service_health_*"])
async def autonomous_incident_response(event):
    """AI detects, diagnoses, and resolves incidents autonomously.

    Replaces: Manual on-call rotations, runbook procedures,
    separate incident management tools, and escalation workflows.
    """

    # AI analyzes the incident with full system context
    incident_analysis = await app.ai(
        system="You are an SRE expert. Analyze incidents, determine root cause, and recommend remediation.",
        user=f"Alert: {event.key}\nMetrics: {event.data}\nSystem state: {await get_system_state()}\nRecent changes: {await get_recent_deployments()}",
        schema=IncidentAnalysis
    )

    if incident_analysis.severity in ["critical", "high"]:
        # AI determines remediation strategy
        remediation_plan = await app.ai(
            system="Create incident remediation plan with specific actions and rollback strategies.",
            user=f"Analysis: {incident_analysis}",
            schema=RemediationPlan
        )

        # Execute autonomous remediation
        await app.memory.set(f"incident_response_{event.id}", {
            "analysis": incident_analysis,
            "remediation": remediation_plan,
            "actions_taken": remediation_plan.automated_actions,
            "human_notification": remediation_plan.requires_human_review,
            "executed_at": datetime.now().isoformat()
        })

        # AI can autonomously execute safe remediation actions
        if remediation_plan.safe_to_auto_execute:
            await execute_remediation(remediation_plan)

State Machine Transitions

React to state changes in workflows.

@app.memory.on_change("workflow_state")
async def handle_state_transition(event):
    old_state = event.previous_data
    new_state = event.data

    print(f"Workflow transitioned: {old_state}{new_state}")

    # Trigger state-specific actions
    if new_state == "completed":
        await finalize_workflow()
    elif new_state == "failed":
        await handle_workflow_failure()

Storage & Transport

Agentfield's event system uses different storage backends based on deployment environment.

Storage Backends

BoltDB + SQLite

  • Memory Data: BoltDB (embedded key-value store)
  • Event History: SQLite (embedded relational database)
  • File Location: agentfield.db in working directory
  • Zero Configuration: Works out of the box
  • Use Case: Development, testing, single-server deployments
# Events stored in SQLite table
# Automatic cleanup after 7 days (default TTL)

PostgreSQL

  • Memory Data: PostgreSQL JSONB columns for flexible storage
  • Event History: Dedicated memory_events table with indexing
  • Scalability: Supports distributed Agentfield servers sharing same database
  • Performance: Optimized queries with automatic indexing on scope, scope_id, and key
  • Use Case: Production, multi-server deployments

PostgreSQL configuration and connection details are managed in Agentfield server settings. See Deployment Guide for database setup.

Event TTL and Cleanup

Events are automatically cleaned up after a retention period to prevent unbounded growth.

# Default event retention: 7 days
# Events older than 7 days are automatically deleted

# Query recent events (within retention period)
events = await app.memory.events.history(
    since=datetime.now() - timedelta(days=6),
    limit=1000
)

Event TTL is configured at the Agentfield server level. See Deployment Guide for production configuration options.

Transport Mechanisms

Agentfield supports two real-time transport mechanisms for event delivery.

Prop

Type

# WebSocket (default) - automatic in Python SDK
@app.memory.on_change("customer_*")
async def handle_event(event):
    # Uses WebSocket connection automatically
    pass

# For transport details, see:
# - WebSocket documentation: /docs/websockets
# - SSE documentation: /docs/server-sent-events

The Python SDK automatically manages WebSocket connections with reconnection logic. See WebSockets and Server-Sent Events for transport-specific details.

Memory Retention Coming Soon

Automatic memory cleanup policies based on scope and age.

Planned Features:

  • Configurable retention periods per scope (workflow: 1 day, session: 30 days, etc.)
  • Automatic cleanup of expired memory entries
  • Retention policy configuration in agent config
  • Manual retention overrides per memory key
# Future API (not yet implemented)
app = Agent(
    node_id="my-agent",
    memory_config={
        "retention": {
            "workflow": "1d",    # 1 day
            "session": "30d",    # 30 days
            "actor": "90d",      # 90 days
            "global": "never"    # Never expire
        }
    }
)

Advanced

Direct MemoryEventClient Usage

For advanced use cases, you can access the underlying MemoryEventClient directly.

from agentfield.memory_events import MemoryEventClient
from agentfield.execution_context import ExecutionContext

# Create event client manually
event_client = MemoryEventClient(
    base_url="http://localhost:8080",
    execution_context=ExecutionContext(
        workflow_id="wf-123",
        session_id="session-456"
    )
)

# Connect to event stream
await event_client.connect(
    patterns=["customer_*"],
    scope="workflow",
    scope_id="wf-123"
)

# Programmatic subscription
def my_handler(event):
    print(f"Event: {event.key}")

subscription = event_client.subscribe(
    patterns=["order_*"],
    callback=my_handler,
    scope="session",
    scope_id="session-456"
)

# Unsubscribe
subscription.unsubscribe()

# Close connection
await event_client.close()

Reconnection Behavior

The event client automatically reconnects when connections drop.

# Automatic reconnection with exponential backoff
# - Max attempts: 5
# - Delays: 1s, 2s, 4s, 8s, 16s
# - Subscriptions preserved across reconnections

# Connection lifecycle
event_client = MemoryEventClient(base_url, context)
await event_client.connect()  # Initial connection

# If connection drops:
# 1. Client detects disconnection
# 2. Waits with exponential backoff
# 3. Reconnects automatically
# 4. Resubscribes to all patterns
# 5. Resumes event delivery

Pattern Matcher

Custom pattern matching for advanced filtering.

from agentfield.memory_events import PatternMatcher

# Check if key matches pattern
matches = PatternMatcher.matches_pattern("customer_*", "customer_123")
# → True

matches = PatternMatcher.matches_pattern("user_*.preferences", "user_456.preferences")
# → True

# Use in custom logic
def should_process_event(event, patterns):
    for pattern in patterns:
        if PatternMatcher.matches_pattern(pattern, event.key):
            return True
    return False

Best Practices

Event Handler Performance

Keep event handlers fast to avoid blocking event delivery.

# ✅ Good - async and non-blocking
@app.memory.on_change("customer_*")
async def handle_customer_update(event):
    # Quick processing
    await send_notification(event.data)

# ❌ Bad - blocking operations
@app.memory.on_change("customer_*")
async def handle_customer_update(event):
    # Slow synchronous operation blocks event loop
    time.sleep(5)  # Don't do this!
    process_data(event.data)

Pattern Specificity

Use specific patterns to reduce unnecessary event processing.

# ✅ Good - specific pattern
@app.memory.on_change("customer_*.payment_method")
async def handle_payment_changes(event):
    await update_billing(event.data)

# ❌ Bad - overly broad pattern
@app.memory.on_change("*")
async def handle_all_changes(event):
    # Receives ALL memory changes - very inefficient
    if "payment" in event.key:
        await update_billing(event.data)

Error Handling

Handle errors gracefully to prevent event handler failures.

@app.memory.on_change("order_*")
async def handle_order_event(event):
    try:
        await process_order(event.data)
    except Exception as e:
        # Log error but don't crash
        print(f"Error processing order event: {e}")

        # Optionally store failed event for retry
        await app.memory.set(f"failed_event_{event.id}", {
            "event": event.to_dict(),
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        })

Scope Selection

Choose appropriate scopes for event subscriptions.

# Workflow scope - for task coordination
@app.memory.on_change("task_*")
async def handle_workflow_tasks(event):
    # Only receives events in current workflow
    pass

# Session scope - for user-specific events
session_mem = app.memory.session(user_id)
@session_mem.on_change("preferences.*")
async def handle_user_preferences(event):
    # Only receives events for this user's session
    pass

# Global scope - for system-wide events
@app.memory.global_scope.on_change("feature_flags")
async def handle_feature_flags(event):
    # Receives global configuration changes
    pass