Server-Sent Events

Real-time streaming of workflow and execution updates

Agentfield streams workflow events, execution updates, memory changes, and workflow notes over HTTP using Server-Sent Events (SSE). Clients can use these streams to track progress live as AI agents process multi-step workflows.

Workflow Run Events

Stream all events for a workflow run, including historical events and real-time updates.

curl -N "http://localhost:8080/api/v1/workflows/runs/{run_id}/events/stream"

SSE Stream Format:

event: workflow_run_event
data: {"event_id":1,"run_id":"run_abc123","sequence":1,"event_type":"execution_started","status":"running","emitted_at":"2024-01-15T10:30:45Z"}

event: workflow_run_event
data: {"event_id":2,"run_id":"run_abc123","sequence":2,"event_type":"execution_completed","status":"succeeded","payload":{"duration_ms":1247},"emitted_at":"2024-01-15T10:30:47Z"}

: keep-alive

event: workflow_run_event
data: {"event_id":3,"run_id":"run_abc123","sequence":3,"event_type":"workflow_completed","status":"succeeded","emitted_at":"2024-01-15T10:30:48Z"}
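
The stream format above can be decoded with a small parser. The following is a minimal sketch rather than an official client: parse_sse is a hypothetical helper name, and it assumes every data field carries a single JSON object, as in the sample events.

```python
import json
from typing import Iterator, List, Tuple


def parse_sse(text: str) -> Iterator[Tuple[str, dict]]:
    """Yield (event_name, payload) pairs from raw SSE text."""
    event_name = "message"  # SSE default when no "event:" field is sent
    data_lines: List[str] = []
    # The extra "" guarantees the last event is flushed even without a trailing blank line.
    for line in text.splitlines() + [""]:
        if line.startswith(":"):
            continue  # comment line, e.g. ": keep-alive"
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = "message", []
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # Per the SSE format, one leading space after the colon is dropped.
            data_lines.append(value[1:] if value.startswith(" ") else value)


sample = (
    "event: workflow_run_event\n"
    'data: {"event_id":1,"sequence":1,"event_type":"execution_started"}\n'
    "\n"
    ": keep-alive\n"
    "\n"
    "event: workflow_run_event\n"
    'data: {"event_id":3,"sequence":3,"event_type":"workflow_completed"}\n'
)

for name, payload in parse_sse(sample):
    print(name, payload["sequence"], payload["event_type"])
```

The keep-alive comment produces no event, so only the two workflow_run_event entries are printed.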

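Query Parameters:

  • after_seq (number) - Sequence number to resume after; see Resuming from Last Event

Event Payload Structure (fields shown in the sample events above):

  • event_id (number) - Identifier of the event
  • run_id (string) - ID of the workflow run
  • sequence (number) - Ordering number of the event within the stream
  • event_type (string) - For example execution_started, execution_completed, workflow_completed
  • status (string) - For example running, succeeded
  • payload (object) - Event-specific data such as duration_ms; not present on every event
  • emitted_at (string) - ISO 8601 UTC timestamp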

Use Case: Multi-Agent Workflow Monitoring

Track complex AI workflows where multiple agents coordinate:

import { useEffect, useState } from 'react';

// React component tracking multi-agent workflow
const WorkflowMonitor = ({ runId }) => {
  const [events, setEvents] = useState([]);
  const [status, setStatus] = useState('connecting');

  useEffect(() => {
    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/workflows/runs/${runId}/events/stream`
    );

    eventSource.addEventListener('workflow_run_event', (e) => {
      const event = JSON.parse(e.data);
      setEvents(prev => [...prev, event]);

      // Update UI based on event type
      if (event.event_type === 'workflow_completed') {
        setStatus('completed');
        eventSource.close();
      } else if (event.event_type === 'execution_started') {
        setStatus(`Processing: ${event.payload?.target || 'unknown'}`);
      }
    });

    eventSource.onerror = () => {
      setStatus('disconnected');
      eventSource.close();
    };

    return () => eventSource.close();
  }, [runId]);

  return (
    <div>
      <h3>Workflow Status: {status}</h3>
      <ul>
        {events.map(e => (
          <li key={e.sequence}>
            [{e.sequence}] {e.event_type} - {e.status}
          </li>
        ))}
      </ul>
    </div>
  );
};

Workflow Execution Events

Stream events for a specific execution within a workflow.

curl -N "http://localhost:8080/api/v1/workflows/executions/{execution_id}/events/stream"

SSE Stream Format:

event: workflow_execution_event
data: {"event_id":1,"execution_id":"exec_abc123","workflow_id":"wf_xyz789","sequence":1,"event_type":"execution_started","status":"running","emitted_at":"2024-01-15T10:30:45Z"}

event: workflow_execution_event
data: {"event_id":2,"execution_id":"exec_abc123","sequence":2,"event_type":"execution_completed","status":"succeeded","payload":{"duration_ms":1247},"emitted_at":"2024-01-15T10:30:47Z"}
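
For typed handling of these events, the JSON can be mapped onto a small data class. This is only a sketch: ExecutionEvent and TERMINAL_EVENT_TYPES are hypothetical names, and which fields are optional is an assumption drawn from the two sample events above (the second one omits workflow_id, the first omits payload).

```python
import json
from dataclasses import dataclass, field, fields
from typing import Any, Dict, Optional

# Assumption: these two event types end an execution stream.
TERMINAL_EVENT_TYPES = {"execution_completed", "execution_failed"}


@dataclass
class ExecutionEvent:
    event_id: int
    execution_id: str
    sequence: int
    event_type: str
    status: Optional[str] = None
    workflow_id: Optional[str] = None
    payload: Dict[str, Any] = field(default_factory=dict)
    emitted_at: Optional[str] = None

    @classmethod
    def from_json(cls, raw: str) -> "ExecutionEvent":
        """Build an event from one SSE data value, ignoring unknown keys."""
        obj = json.loads(raw)
        known = {f.name: obj[f.name] for f in fields(cls) if f.name in obj}
        return cls(**known)

    @property
    def is_terminal(self) -> bool:
        return self.event_type in TERMINAL_EVENT_TYPES


raw = (
    '{"event_id":2,"execution_id":"exec_abc123","sequence":2,'
    '"event_type":"execution_completed","status":"succeeded",'
    '"payload":{"duration_ms":1247},"emitted_at":"2024-01-15T10:30:47Z"}'
)
event = ExecutionEvent.from_json(raw)
print(event.sequence, event.event_type, event.is_terminal)
```

Ignoring unknown keys keeps the parser tolerant if the server adds fields later.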

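Event Payload Structure (fields shown in the sample events above):

  • event_id (number) - Identifier of the event
  • execution_id (string) - ID of the execution
  • workflow_id (string) - ID of the parent workflow; not present on every event
  • sequence (number) - Ordering number of the event within the stream
  • event_type (string) - For example execution_started, execution_completed
  • status (string) - For example running, succeeded
  • payload (object) - Event-specific data such as duration_ms; not present on every event
  • emitted_at (string) - ISO 8601 UTC timestamp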

Use Case: Execution Debugging

Debug autonomous agents by streaming execution details:

import asyncio
import aiohttp
import json

async def debug_agent_execution(execution_id: str):
    """Debug AI agent execution with detailed event streaming"""
    url = f"http://localhost:8080/api/v1/workflows/executions/{execution_id}/events/stream"

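    # Disable aiohttp's default 5-minute total timeout so long-lived streams are not cut off
    timeout = aiohttp.ClientTimeout(total=None)
    async with aiohttp.ClientSession(timeout=timeout) as session: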
        async with session.get(url) as response:
            async for line in response.content:
                line = line.decode('utf-8').strip()

                if line.startswith('event: '):
                    event_name = line[7:]
                elif line.startswith('data: '):
                    event = json.loads(line[6:])

                    print(f"[{event['sequence']}] {event['event_type']}")
                    print(f"  Status: {event.get('status')}")
                    if event.get('payload'):
                        print(f"  Payload: {event['payload']}")

                    if event['event_type'] == 'execution_completed':
                        print("✅ Execution completed")
                        break
                    elif event['event_type'] == 'execution_failed':
                        print(f"❌ Execution failed: {event.get('status_reason')}")
                        break

# Debug a failing autonomous workflow
asyncio.run(debug_agent_execution("exec_abc123"))

Workflow Notes Events

Stream real-time notes added to workflows via app.note(). Useful for tracking AI agent decisions and progress.

curl -N "http://localhost:8080/api/ui/v1/workflows/{workflow_id}/notes/events"

SSE Stream Format:

data: {"type":"connected","workflow_id":"wf_xyz789","message":"Workflow node notes stream connected","timestamp":"2024-01-15T10:30:45Z"}

data: {"type":"heartbeat","timestamp":"2024-01-15T10:31:15Z"}

data: {"type":"workflow_note_added","workflow_id":"wf_xyz789","execution_id":"exec_abc123","note":{"message":"Analyzing customer sentiment","tags":["sentiment","analysis"],"timestamp":"2024-01-15T10:31:20Z"}}

Event Types:

  • connected - Initial connection confirmation
  • heartbeat - Keep-alive message (every 30 seconds)
  • workflow_note_added - New note added to workflow
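
The three event types can be handled with a simple dispatch on the type field. The sketch below assumes the payload shapes shown in the sample stream; describe_note_event is a hypothetical helper, not part of Agentfield.

```python
import json
from typing import Optional


def describe_note_event(raw: str) -> Optional[str]:
    """Turn one notes-stream data value into a log line, or None for heartbeats."""
    event = json.loads(raw)
    kind = event.get("type")
    if kind == "connected":
        return f"connected to {event.get('workflow_id')}"
    if kind == "heartbeat":
        return None  # keep-alive only, nothing to show
    if kind == "workflow_note_added":
        note = event.get("note", {})
        tags = ", ".join(note.get("tags", []))
        suffix = f" [{tags}]" if tags else ""
        return f"{event.get('execution_id')}: {note.get('message')}{suffix}"
    return f"unhandled event type: {kind}"


note_event = (
    '{"type":"workflow_note_added","workflow_id":"wf_xyz789",'
    '"execution_id":"exec_abc123","note":{"message":"Analyzing customer sentiment",'
    '"tags":["sentiment","analysis"],"timestamp":"2024-01-15T10:31:20Z"}}'
)
print(describe_note_event(note_event))
```

Returning a fallback string for unknown types, instead of raising, keeps a monitor running if new event types appear.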

Use Case: Real-Time AI Decision Tracking

Monitor AI agent reasoning and decisions in real-time:

import { useEffect, useState } from 'react';

// Track AI agent notes for observability
const NotesMonitor = ({ workflowId }) => {
  const [notes, setNotes] = useState([]);

  useEffect(() => {
    const eventSource = new EventSource(
      `http://localhost:8080/api/ui/v1/workflows/${workflowId}/notes/events`
    );

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);

      if (event.type === 'workflow_note_added') {
        setNotes(prev => [...prev, event.note]);

        // Log AI decisions
        console.log(`🤖 ${event.note.message}`);
        if (event.note.tags) {
          console.log(`   Tags: ${event.note.tags.join(', ')}`);
        }
      }
    };

    return () => eventSource.close();
  }, [workflowId]);

  return (
    <div>
      <h3>AI Agent Notes</h3>
      {notes.map((note, i) => (
        <div key={i}>
          <p>{note.message}</p>
          <small>{note.timestamp}</small>
        </div>
      ))}
    </div>
  );
};

Memory Change Events

Stream real-time memory updates across distributed agents. Track when agents read/write shared state.

curl -N "http://localhost:8080/api/v1/memory/events/sse?scope=session&scope_id=session_123"

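Query Parameters:

  • scope (string) - Memory scope to watch, for example session
  • scope_id (string) - Identifier within that scope, for example session_123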

SSE Stream Format:

event: message
data: {"id":"evt_123","type":"memory_change","timestamp":"2024-01-15T10:30:45Z","scope":"session","scope_id":"session_123","key":"user_preferences","action":"set","data":{"theme":"dark","language":"en"},"metadata":{"agent_id":"support-agent","workflow_id":"wf_xyz789"}}

event: message
data: {"id":"evt_124","type":"memory_change","timestamp":"2024-01-15T10:30:47Z","scope":"session","scope_id":"session_123","key":"conversation_context","action":"set","data":{"last_topic":"billing","sentiment":"frustrated"},"metadata":{"agent_id":"support-agent","workflow_id":"wf_xyz789"}}
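
One way to consume these events is to replay them into a local mirror of the shared state. The following is a sketch under assumptions: apply_memory_event is a hypothetical helper, and it treats set and delete as the only actions, since those are the two that appear in this document.

```python
from typing import Any, Dict


def apply_memory_event(state: Dict[str, Any], event: Dict[str, Any]) -> Dict[str, Any]:
    """Apply one memory_change event to a local key/value mirror, in place."""
    if event.get("type") != "memory_change":
        return state  # ignore anything that is not a memory change
    key = event["key"]
    action = event.get("action")
    if action == "set":
        state[key] = event.get("data")
    elif action == "delete":
        state.pop(key, None)  # deleting a key we never saw is not an error
    return state


mirror: Dict[str, Any] = {}
apply_memory_event(mirror, {
    "type": "memory_change", "key": "user_preferences", "action": "set",
    "data": {"theme": "dark", "language": "en"},
})
apply_memory_event(mirror, {
    "type": "memory_change", "key": "conversation_context", "action": "set",
    "data": {"last_topic": "billing", "sentiment": "frustrated"},
})
apply_memory_event(mirror, {
    "type": "memory_change", "key": "conversation_context", "action": "delete",
})
print(mirror)
```

The mirror only reflects changes seen since the stream connected, so it is not a substitute for reading the current value from the memory API.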

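Event Payload Structure (fields shown in the sample events above):

  • id (string) - Identifier of the event
  • type (string) - memory_change
  • timestamp (string) - ISO 8601 UTC timestamp
  • scope (string) - Memory scope, for example session
  • scope_id (string) - Identifier within that scope
  • key (string) - Memory key that changed
  • action (string) - For example set or delete
  • data (object) - New value for the key
  • metadata (object) - Context such as agent_id and workflow_id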

Use Case: Cross-Agent State Synchronization

Monitor shared state changes across distributed AI agents:

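import asyncio
import json

import aiohttp

async def monitor_agent_memory(scope: str, scope_id: str):
    """Monitor memory changes for debugging and observability"""
    url = "http://localhost:8080/api/v1/memory/events/sse"
    params = {"scope": scope, "scope_id": scope_id}  # aiohttp URL-encodes these values

    # Disable aiohttp's default 5-minute total timeout for long-lived streams
    timeout = aiohttp.ClientTimeout(total=None)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url, params=params) as response: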
            async for line in response.content:
                line = line.decode('utf-8').strip()

                if line.startswith('data: '):
                    event = json.loads(line[6:])

                    if event['type'] == 'memory_change':
                        action = event['action']
                        key = event['key']
                        # metadata may be missing, so fall back to an empty dict
                        agent = event.get('metadata', {}).get('agent_id', 'unknown')

                        if action == 'set':
                            print(f"💾 {agent} set {key} = {event['data']}")
                        elif action == 'delete':
                            print(f"🗑️  {agent} deleted {key}")

# Monitor session memory for autonomous customer service
asyncio.run(monitor_agent_memory("session", "session_user_123"))

Execution Events (UI)

Stream all execution events across the system. Useful for building real-time dashboards.

curl -N "http://localhost:8080/api/ui/v1/executions/events"

SSE Stream Format:

data: {"type":"connected","message":"Execution events stream connected","timestamp":"2024-01-15T10:30:45Z"}

data: {"type":"heartbeat","timestamp":"2024-01-15T10:31:15Z"}

data: {"type":"execution_started","execution_id":"exec_abc123","workflow_id":"wf_xyz789","agent_node_id":"support-agent","reasoner_id":"analyze_sentiment","timestamp":"2024-01-15T10:31:20Z"}

data: {"type":"execution_completed","execution_id":"exec_abc123","status":"succeeded","duration_ms":1247,"timestamp":"2024-01-15T10:31:21Z"}
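
Counters for a dashboard can be derived from this stream with a pure update function. The sketch below assumes only the event shapes in the sample above; update_stats is a hypothetical name, and any completion whose status is not succeeded is counted as failed.

```python
from typing import Any, Dict


def update_stats(stats: Dict[str, int], event: Dict[str, Any]) -> Dict[str, int]:
    """Return new counters after applying one execution event."""
    updated = dict(stats)  # copy so the caller's dict is left untouched
    kind = event.get("type")
    if kind == "execution_started":
        updated["total"] += 1
        updated["running"] += 1
    elif kind == "execution_completed":
        # Clamp at zero: we may see a completion for an execution that
        # started before this client connected.
        updated["running"] = max(0, updated["running"] - 1)
        bucket = "succeeded" if event.get("status") == "succeeded" else "failed"
        updated[bucket] += 1
    return updated


stats = {"total": 0, "running": 0, "succeeded": 0, "failed": 0}
for ev in [
    {"type": "connected"},
    {"type": "execution_started", "execution_id": "exec_abc123"},
    {"type": "execution_completed", "execution_id": "exec_abc123", "status": "succeeded"},
]:
    stats = update_stats(stats, ev)
print(stats)
```

Keeping the update pure makes it easy to unit test and to reuse from any UI layer.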

Use Case: System-Wide Monitoring Dashboard

Build real-time dashboards showing all agent activity:

import { useEffect, useState } from 'react';

// Real-time execution monitoring
const ExecutionDashboard = () => {
  const [executions, setExecutions] = useState([]);
  const [stats, setStats] = useState({ total: 0, running: 0, succeeded: 0, failed: 0 });

  useEffect(() => {
    const eventSource = new EventSource(
      'http://localhost:8080/api/ui/v1/executions/events'
    );

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);

      if (event.type === 'execution_started') {
        setExecutions(prev => [...prev, { ...event, status: 'running' }]);
        setStats(prev => ({ ...prev, total: prev.total + 1, running: prev.running + 1 }));
      } else if (event.type === 'execution_completed') {
        setExecutions(prev =>
          prev.map(ex =>
            ex.execution_id === event.execution_id
              ? { ...ex, status: event.status, duration_ms: event.duration_ms }
              : ex
          )
        );

        setStats(prev => ({
          ...prev,
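          // Clamp at zero: a completion may arrive for an execution started before we connected
          running: Math.max(0, prev.running - 1),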
          [event.status === 'succeeded' ? 'succeeded' : 'failed']:
            prev[event.status === 'succeeded' ? 'succeeded' : 'failed'] + 1
        }));
      }
    };

    return () => eventSource.close();
  }, []);

  return (
    <div>
      <h3>System Stats</h3>
      <p>Total: {stats.total} | Running: {stats.running} | Succeeded: {stats.succeeded} | Failed: {stats.failed}</p>

      <h3>Recent Executions</h3>
      <ul>
        {executions.slice(-10).map(ex => (
          <li key={ex.execution_id}>
            {ex.agent_node_id}.{ex.reasoner_id} - {ex.status}
            {ex.duration_ms != null && ` (${ex.duration_ms}ms)`}
          </li>
        ))}
      </ul>
    </div>
  );
};

Resuming from Last Event

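To resume a stream after a disconnect, pass the sequence number of the last event you processed as the after_seq query parameter. This continues from where you left off and avoids duplicate events: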

// Resume SSE connection after disconnect
let lastSequence = 0;

function connectWithResume(runId) {
  const url = lastSequence > 0
    ? `http://localhost:8080/api/v1/workflows/runs/${runId}/events/stream?after_seq=${lastSequence}`
    : `http://localhost:8080/api/v1/workflows/runs/${runId}/events/stream`;

  const eventSource = new EventSource(url);

  eventSource.addEventListener('workflow_run_event', (e) => {
    const event = JSON.parse(e.data);
    lastSequence = event.sequence;  // Track latest sequence

    processEvent(event);
  });

  eventSource.onerror = () => {
    eventSource.close();
    // Reconnect after delay, resuming from last sequence
    setTimeout(() => connectWithResume(runId), 5000);
  };
}
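
The same resume logic can be sketched in Python. build_stream_url and is_new are hypothetical helpers; the client-side sequence check is an extra safeguard added here, on the assumption that sequence numbers only increase within a run.

```python
from urllib.parse import urlencode


def build_stream_url(base_url: str, run_id: str, last_sequence: int = 0) -> str:
    """Build the run events URL, adding after_seq only when resuming."""
    url = f"{base_url}/api/v1/workflows/runs/{run_id}/events/stream"
    if last_sequence > 0:
        url += "?" + urlencode({"after_seq": last_sequence})
    return url


def is_new(event: dict, last_sequence: int) -> bool:
    """Client-side guard against replayed events after a reconnect."""
    return event.get("sequence", 0) > last_sequence


print(build_stream_url("http://localhost:8080", "run_abc123"))
print(build_stream_url("http://localhost:8080", "run_abc123", last_sequence=2))
```

A client would store the sequence of each processed event and pass it to build_stream_url on the next connection attempt.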

Keep-Alive Mechanism

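Agentfield sends an SSE comment line (: keep-alive followed by a blank line) every 30 seconds to prevent connection timeouts. Browser EventSource clients ignore comment lines automatically; hand-written parsers must skip them: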

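import json

import aiohttp

async def stream_with_keepalive(run_id: str):
    """Handle SSE with keep-alive messages"""
    url = f"http://localhost:8080/api/v1/workflows/runs/{run_id}/events/stream"
    event_type = "message"  # SSE default until an "event:" line is seen

    # Disable aiohttp's default 5-minute total timeout for long-lived streams
    timeout = aiohttp.ClientTimeout(total=None)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as response: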
            async for line in response.content:
                line = line.decode('utf-8').strip()

                # Ignore keep-alive comments
                if line.startswith(':'):
                    continue

                if line.startswith('event: '):
                    event_type = line[7:]
                elif line.startswith('data: '):
                    event_data = json.loads(line[6:])
                    print(f"Event: {event_type} - {event_data}")
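
Keep-alives can also serve as a liveness signal: if nothing arrives for several intervals, the connection is probably dead. The watchdog below is a sketch, not Agentfield behavior. KeepAliveWatchdog is a hypothetical class, and the tolerance of two missed intervals is an arbitrary assumption.

```python
import time
from typing import Callable


class KeepAliveWatchdog:
    """Flags a stream as stalled when no line has arrived for too long."""

    def __init__(
        self,
        interval_s: float = 30.0,   # keep-alive period stated above
        missed_allowed: int = 2,    # assumption: tolerate two missed intervals
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._limit_s = interval_s * missed_allowed
        self._clock = clock
        self._last_seen = clock()

    def touch(self) -> None:
        """Call for every received line, including ': keep-alive' comments."""
        self._last_seen = self._clock()

    def is_stalled(self) -> bool:
        return self._clock() - self._last_seen > self._limit_s


watchdog = KeepAliveWatchdog()
watchdog.touch()
print(watchdog.is_stalled())
```

In a streaming loop, call touch() before skipping keep-alive lines, and reconnect when is_stalled() returns True.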

Connection Management

Reconnection Strategy with Exponential Backoff:

class SSEConnection {
  constructor(url, onEvent) {
    this.url = url;
    this.onEvent = onEvent;
    this.lastSequence = 0;
    this.reconnectDelay = 1000;
    this.maxReconnectDelay = 30000;
  }

  connect() {
    const url = this.lastSequence > 0
      ? `${this.url}?after_seq=${this.lastSequence}`
      : this.url;

    this.eventSource = new EventSource(url);

    this.eventSource.addEventListener('workflow_run_event', (e) => {
      const event = JSON.parse(e.data);
      this.lastSequence = event.sequence;
      this.reconnectDelay = 1000;  // Reset delay on success
      this.onEvent(event);
    });

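    this.eventSource.onerror = () => {
      this.eventSource.close();
      console.log(`Reconnecting in ${this.reconnectDelay}ms...`);

      // Keep the timer handle so close() can cancel a pending reconnect
      this.reconnectTimer = setTimeout(() => this.connect(), this.reconnectDelay);

      // Exponential backoff
      this.reconnectDelay = Math.min(
        this.reconnectDelay * 2,
        this.maxReconnectDelay
      );
    };
  }

  close() {
    // Cancel any scheduled reconnect, then close the active stream
    clearTimeout(this.reconnectTimer);
    if (this.eventSource) {
      this.eventSource.close();
    }
  }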
}

// Usage
const connection = new SSEConnection(
  'http://localhost:8080/api/v1/workflows/runs/run_abc123/events/stream',
  (event) => console.log('Event:', event)
);

connection.connect();
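
The delay schedule used by the class above can be written as a standalone generator, which makes the doubling and the 30-second cap easy to verify. This is a sketch: backoff_delays and jittered are hypothetical names, and the jitter step is an addition that the JavaScript class does not perform.

```python
import random
from itertools import islice
from typing import Callable, Iterator


def backoff_delays(initial_ms: int = 1000, max_ms: int = 30000, factor: int = 2) -> Iterator[int]:
    """Yield reconnect delays: initial, then multiplied by factor up to max_ms."""
    delay = initial_ms
    while True:
        yield delay
        delay = min(delay * factor, max_ms)


def jittered(delay_ms: int, rng: Callable[[], float] = random.random) -> int:
    """Optional: pick a delay between 50% and 100% of delay_ms so that many
    clients do not reconnect at the same instant."""
    return int(delay_ms * (0.5 + rng() / 2))


print(list(islice(backoff_delays(), 7)))
# [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

As in the class above, a client would restart the generator after the first successfully received event so the next outage begins at the initial delay again.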