Async Execution & Webhooks
Long-running tasks with reliable callback delivery
Production-grade async execution with cryptographic webhook verification
AI calls don't take a predictable amount of time. Sentiment analysis might finish in 200ms. Document processing might take 10 minutes. Report generation could run for an hour.
You can't block HTTP requests for 10 minutes. You can't tie up worker threads waiting for AI. Traditional request-response patterns break.
Production systems need async execution: queue the work, return immediately, get notified when it's done. Agentfield provides this as infrastructure, not as something you build yourself.
How Agentfield's Async Architecture Works
Unlike traditional frameworks that require external job queues or time out after a few minutes, Agentfield's control plane natively supports arbitrarily long-running executions, from seconds to hours or even days. This is critical for production autonomous software where agents coordinate complex, nested workflows.
The Key Differentiator
Most agent frameworks require you to build external job queues and timeout handling. They break on long-running tasks or nested agent-to-agent calls.
Agentfield's control plane handles async execution natively with no timeout limits. Agent A can call Agent B, which calls Agent C—all async, all tracked, running for hours or days if needed.
This is a critical production feature that most systems lack, forcing developers to build complex external orchestration.
The Control Plane Flow
Here's what happens under the hood when you execute an agent asynchronously:
Client → Control Plane
The client sends POST /api/v1/execute/async/{target}. The control plane:
- Creates an execution record (status: running)
- Forwards the request to the agent with special headers (X-Run-ID, X-Execution-ID)
- Returns 202 Accepted immediately with the execution ID
- Frees the client, so network timeouts no longer limit how long the execution can run
Control Plane → Agent
The agent receives the request with the X-Execution-ID header:
- It returns 202 Accepted immediately (so it doesn't block the control plane)
- It starts the reasoner in the background
- The execution record stays in the running state
- Any registered webhook remains pending
Agent Processing
The reasoner runs as long as needed:
- Hours or days for complex workflows
- Nested agent calls via app.call() work seamlessly
- Multi-step reasoning chains execute without timeouts
- All execution state is tracked in the control plane
Agent → Control Plane (Status Update)
Once finished (or on error), the agent POSTs to:
POST /api/v1/executions/{execution_id}/status
The request carries:
- Status (running | succeeded | failed)
- Optional result payload
- Error message (if failed)
- Duration and completion timestamp
- Optional progress updates (for intermediate states)
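The following sketch shows what such a callback could look like from the agent's side. The endpoint path comes from above. The JSON keys (status, result, error, duration_ms, completed_at) are assumptions, because the page lists what is sent but not the exact schema. The sketch builds the request with the standard library but does not send it. In practice the SDK does this for you.

```python
import json
import urllib.request
from datetime import datetime, timezone
from typing import Optional


def build_status_update(
    base_url: str,
    execution_id: str,
    status: str,
    duration_ms: int,
    result: Optional[dict] = None,
    error: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) an agent -> control plane status callback.

    The payload keys are illustrative assumptions, not the documented schema.
    """
    if status not in {"running", "succeeded", "failed"}:
        raise ValueError(f"unexpected status: {status}")
    payload = {"status": status, "duration_ms": duration_ms}
    if result is not None:
        payload["result"] = result
    if status == "failed":
        payload["error"] = error or "unknown error"
    if status != "running":
        # Only terminal updates carry a completion timestamp
        payload["completed_at"] = datetime.now(timezone.utc).isoformat()
    return urllib.request.Request(
        url=f"{base_url}/api/v1/executions/{execution_id}/status",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_status_update(
    "http://af-server", "exec-abc123", "succeeded", 305000, result={"risks_identified": 3}
)
# urllib.request.urlopen(req, timeout=10) would actually send it
```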
Control Plane Callback Handling
The status handler:
- Updates the execution record
- Writes result payload to storage
- Marks terminal states (succeeded, failed)
- Emits SSE/stream events for real-time updates
- Updates workflow execution state
- Triggers any registered webhooks
For intermediate updates (status: running + progress), the record stays active but clients receive real-time updates.
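The handler's rules can be modeled in memory. A running update keeps the record open and only records progress. The first terminal update (succeeded or failed) closes the record and fires webhooks exactly once. The names ExecutionStore and apply_update are hypothetical. The real control plane writes to storage and emits SSE events instead of calling a Python callback. This sketch also assumes that callbacks arriving after a terminal state are ignored.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

TERMINAL = {"succeeded", "failed"}


@dataclass
class ExecutionRecord:
    execution_id: str
    status: str = "running"
    result: Optional[dict] = None
    error: Optional[str] = None
    progress: list = field(default_factory=list)


class ExecutionStore:
    """Hypothetical in-memory model of the status handler's rules."""

    def __init__(self, on_terminal: Callable[[ExecutionRecord], None]):
        self.records: dict[str, ExecutionRecord] = {}
        self.on_terminal = on_terminal  # e.g. webhook dispatch

    def create(self, execution_id: str) -> ExecutionRecord:
        rec = ExecutionRecord(execution_id)
        self.records[execution_id] = rec
        return rec

    def apply_update(self, execution_id, status, result=None, error=None, progress=None):
        rec = self.records[execution_id]
        if rec.status in TERMINAL:
            return rec  # record is final; ignore late or duplicate callbacks
        if status == "running":
            # Intermediate update: record stays active, progress is streamed
            if progress is not None:
                rec.progress.append(progress)
            return rec
        if status not in TERMINAL:
            raise ValueError(f"unknown status: {status}")
        rec.status, rec.result, rec.error = status, result, error
        self.on_terminal(rec)  # fires once, on the terminal transition
        return rec
```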
Clients Poll/SSE/Webhooks
The original caller (or UI):
- Polls GET /api/v1/executions/{id} for status
- Listens to SSE events for real-time updates
- Receives webhook notifications on completion
- Never hits "context deadline exceeded", even on extremely long runs
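An SSE client reads the text/event-stream wire format. Each event is a set of field lines such as event: and data:, and a blank line ends it. The sketch below parses that format from an iterable of lines, so it runs without a server. This page does not give the stream URL or the event names Agentfield emits, so the sample event name is hypothetical.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event, data) pairs from Server-Sent Events text lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the buffered event
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment / keep-alive line
        else:
            name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]
            if name == "event":
                event = value
            elif name == "data":
                data.append(value)
            # id: and retry: fields are ignored in this sketch


stream = [
    ": keep-alive\n",
    "event: execution_updated\n",  # hypothetical event name
    'data: {"execution_id": "exec-abc123", "status": "running"}\n',
    "\n",
]
for name, payload in parse_sse(stream):
    print(name, json.loads(payload)["status"])  # execution_updated running
```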
Why This Matters for Nested Workflows
Traditional frameworks break when Agent A calls Agent B, which calls Agent C:
# Agent A (traditional framework - breaks after 30s)
@app.reasoner()
async def orchestrate_research(topic: str):
    # This times out if agent B takes > 30 seconds
    analysis = await call_agent_b(topic)  # ❌ Timeout!
    return analysis

# Agent B calls Agent C
@app.reasoner()
async def deep_analysis(topic: str):
    # This also times out if agent C is slow
    data = await call_agent_c(topic)  # ❌ Timeout!
    return process(data)
With Agentfield's async architecture:
# Agent A (Agentfield - no timeout limits)
@app.reasoner()
async def orchestrate_research(topic: str):
    # Runs for hours if needed, no timeout
    analysis = await app.call("agent-b.deep_analysis", topic=topic)  # ✅
    return analysis

# Agent B calls Agent C
@app.reasoner()
async def deep_analysis(topic: str):
    # Also runs for hours; nested calls work seamlessly
    data = await app.call("agent-c.gather_data", topic=topic)  # ✅
    return process(data)
The control plane tracks the entire workflow DAG, handles all async coordination, and ensures no execution is lost, even if it runs for days.
SDK Coverage
Both Python and Go SDKs implement this pattern automatically:
- No new configuration needed
- Any agent built with these SDKs inherits async semantics
- As soon as the agent receives control plane headers, it handles async execution
- Developers write normal code—the SDK handles the complexity
# Python SDK - async execution is automatic
from agentfield import Agent

app = Agent("research-agent")

@app.reasoner()
async def long_running_analysis(data: dict) -> dict:
    """
    This can run for hours or days when executed async.
    The SDK handles all control plane communication automatically.
    """
    # Your business logic here
    # Nested app.call() works seamlessly
    result = await app.call("other-agent.process", data=data)
    return result

# When called via /execute/async, this runs in the background
# and reports status back to the control plane automatically
What You'd Otherwise Build
Traditional Async
What you build:
- Message queue setup (RabbitMQ, SQS)
- Worker pool management
- Retry logic with exponential backoff
- Webhook delivery system
- Queue monitoring and metrics
- Dead letter queues
- Backpressure controls
Then you write business logic.
Agentfield Async
What you write:
curl -X POST .../execute/async/agent.function \
  -d '{"input": {...}, "webhook": {"url": "..."}}'
Agentfield provides:
- ✓ Durable queue (PostgreSQL)
- ✓ Automatic retries
- ✓ Webhook delivery with retries
- ✓ Backpressure controls
- ✓ Queue depth limits
- ✓ Fair scheduling
- ✓ Complete observability
The Timeout Problem
Here's what breaks in production:
Scenario: User uploads a 50-page contract. Your system needs to:
- Extract text from PDF (30 seconds)
- Analyze legal compliance (2 minutes)
- Identify financial terms (1 minute)
- Generate executive summary (1 minute)
- Create detailed report (30 seconds)
Total time: ~5 minutes
Traditional approach fails:
- API gateway times out at 30 seconds
- Frontend connection drops
- Worker threads are blocked
- You can't scale (each request holds a thread for 5 minutes)
- Retries create duplicate work
Agentfield's approach: Queue the work, return immediately, deliver results via webhook when done.
How Async Execution Works
Agentfield provides two patterns: webhooks (push) and polling (pull). Choose based on your infrastructure.
Pattern 1: Webhooks (Recommended)
Queue work and get notified when it completes:
# Queue async execution with webhook
curl -X POST http://af-server/api/v1/execute/async/document-processor.analyze_contract \
-H "Content-Type: application/json" \
-d '{
"input": {
"contract_id": "contract-12345",
"document_url": "https://storage.example.com/contract.pdf"
},
"webhook": {
"url": "https://your-app.com/api/agentfield/callback",
"secret": "your-webhook-secret-key",
"headers": {
"X-Custom-Auth": "Bearer your-api-token"
}
}
}'
Immediate response (202 Accepted):
{
"execution_id": "exec-abc123",
"run_id": "wf-def456",
"workflow_id": "wf-def456",
"status": "queued",
"target": "document-processor.analyze_contract",
"type": "reasoner",
"created_at": "2024-07-08T18:20:00Z",
"webhook_registered": true,
"webhook_error": null
}
Your application continues immediately. No blocking. No timeouts.
If the webhook registration is rejected (for example, because the URL has an unsupported scheme), the response is still 202 Accepted, but webhook_registered is false and webhook_error contains the validation message. You can then fall back to polling or prompt the user to correct their configuration.
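Handling that fallback can be a small branch on the 202 body. The sketch below assumes the response has already been decoded into a dict with the fields shown above. The "webhook"/"poll" return values are hypothetical labels for your own code paths, and the sample error message is made up.

```python
import logging

log = logging.getLogger(__name__)


def delivery_mode(accepted: dict) -> str:
    """Return "webhook" if Agentfield will push the result, else "poll"."""
    if accepted.get("webhook_registered"):
        return "webhook"
    if accepted.get("webhook_error"):
        # Registration was rejected: record why, then fall back to polling
        log.warning(
            "webhook rejected for %s: %s",
            accepted.get("execution_id"),
            accepted["webhook_error"],
        )
    return "poll"


accepted = {
    "execution_id": "exec-abc123",
    "status": "queued",
    "webhook_registered": False,
    "webhook_error": "unsupported URL scheme",  # hypothetical message
}
mode = delivery_mode(accepted)  # "poll"
```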
Python SDK example:
from agentfield.client import AgentFieldClient
from agentfield.types import WebhookConfig
client = AgentFieldClient("https://af-server")
execution_id = await client.execute_async(
target="document-processor.analyze_contract",
input_data={"contract_id": "contract-12345"},
webhook=WebhookConfig(
url="https://your-app.com/api/agentfield/callback",
secret="your-webhook-secret-key",
headers={"X-Custom-Auth": "Bearer your-api-token"},
),
)
Five minutes later, Agentfield POSTs to your webhook:
{
"event": "execution.completed",
"execution_id": "exec-abc123",
"workflow_id": "wf-def456",
"status": "completed",
"target": "document-processor.analyze_contract",
"type": "reasoner",
"duration_ms": 305000,
"result": {
"compliance_status": "approved",
"financial_terms": {
"total_value": 500000,
"payment_schedule": "quarterly"
},
"risks_identified": 3,
"report_url": "https://storage.example.com/reports/contract-12345.pdf"
},
"timestamp": "2024-07-08T18:25:05Z"
}
Pattern 2: Polling (Alternative)
If webhooks aren't suitable (firewall restrictions, local development), poll for status:
# Queue without webhook
curl -X POST http://af-server/api/v1/execute/async/document-processor.analyze_contract \
-H "Content-Type: application/json" \
-d '{
"input": {
"contract_id": "contract-12345",
"document_url": "https://storage.example.com/contract.pdf"
}
}'
Poll for status:
curl http://af-server/api/v1/execute/status/exec-abc123
Response:
{
"execution_id": "exec-abc123",
"workflow_id": "wf-def456",
"status": "completed",
"target": "document-processor.analyze_contract",
"result": {
/* execution result */
},
"duration_ms": 305000,
"created_at": "2024-07-08T18:20:00Z",
"completed_at": "2024-07-08T18:25:05Z"
}
Status values:
- queued - Waiting for worker
- running - Currently executing
- completed - Finished successfully
- failed - Execution failed
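A polling client repeats the status request until it sees completed or failed. This sketch injects the fetch function so it runs without a server. In practice that function would GET /api/v1/execute/status/{execution_id} and decode the JSON. The interval doubling and the max_wait cap are choices made for this sketch, not Agentfield settings.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"completed", "failed"}


def poll_until_done(
    fetch_status: Callable[[], dict],
    interval: float = 2.0,
    max_interval: float = 30.0,
    max_wait: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the status is terminal, doubling the wait between polls."""
    waited = 0.0
    while True:
        body = fetch_status()
        if body["status"] in TERMINAL_STATUSES:
            return body
        if waited >= max_wait:
            raise TimeoutError(f"still {body['status']} after {waited:.0f}s")
        sleep(interval)
        waited += interval
        interval = min(interval * 2, max_interval)
```

Backing off keeps a long-running execution from generating thousands of status requests. A fixed 2-second interval is also fine for short jobs.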
What Agentfield Handles Automatically
When you queue async execution, Agentfield provides production infrastructure:
Durable Queue Management
Agentfield uses PostgreSQL with FOR UPDATE SKIP LOCKED for lease-based processing:
- Work is persisted before execution starts
- Workers lease executions atomically (no duplicate processing)
- Failed workers release leases automatically
- Queue depth limits prevent resource exhaustion
If a worker crashes mid-execution, the lease expires and another worker picks it up. No lost work.
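The lease pattern can be sketched in a few lines. CLAIM_SQL shows the usual PostgreSQL shape of a FOR UPDATE SKIP LOCKED claim. Its table and column names are hypothetical, not Agentfield's schema, and the placeholder is psycopg-style. The in-memory LeaseQueue mimics the same semantics in a single process, so you can run the crash-and-reclaim behavior directly.

```python
from dataclasses import dataclass
from typing import Optional

# Claim one queued (or lease-expired) row, skipping rows other workers have locked.
CLAIM_SQL = """
UPDATE executions
SET status = 'running',
    leased_by = %(worker)s,
    lease_expires_at = now() + interval '30 seconds'
WHERE id = (
    SELECT id FROM executions
    WHERE status = 'queued'
       OR (status = 'running' AND lease_expires_at < now())
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id;
"""


@dataclass
class Job:
    job_id: str
    leased_by: Optional[str] = None
    lease_expires_at: float = 0.0
    done: bool = False


class LeaseQueue:
    """In-memory model of lease semantics (no real locking)."""

    def __init__(self, lease_seconds: float = 30.0):
        self.jobs: list[Job] = []
        self.lease_seconds = lease_seconds

    def enqueue(self, job_id: str) -> None:
        self.jobs.append(Job(job_id))  # persisted before any work starts

    def claim(self, worker: str, now: float) -> Optional[str]:
        for job in self.jobs:
            free = job.leased_by is None or job.lease_expires_at <= now
            if not job.done and free:
                job.leased_by = worker
                job.lease_expires_at = now + self.lease_seconds
                return job.job_id
        return None

    def complete(self, job_id: str, worker: str) -> bool:
        # Only the current lease holder may complete; a crashed worker's late call fails
        for job in self.jobs:
            if job.job_id == job_id and job.leased_by == worker and not job.done:
                job.done = True
                return True
        return False
```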
Automatic Retries
Transient failures (network issues, temporary service unavailability) are retried automatically:
- Exponential backoff between retries
- Configurable retry limits
- Detailed error tracking in execution notes
Your code doesn't handle retries. Agentfield does.
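The policy described above (exponential backoff, a retry limit, an error trail) follows a common pattern. The sketch below is a generic version, not Agentfield's internal code. Treating ConnectionError and TimeoutError as the transient errors is an assumption you would tune.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

TRANSIENT = (ConnectionError, TimeoutError)  # assumption: what counts as transient


def retry_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    notes: Optional[list] = None,
) -> T:
    """Call fn, retrying transient errors after base, 2*base, 4*base, ... seconds."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TRANSIENT as exc:
            if notes is not None:
                notes.append(f"attempt {attempt} failed: {exc!r}")  # error trail
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("unreachable")
```

Non-transient errors (for example a ValueError from bad input) propagate immediately, because retrying them would only repeat the failure.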
Webhook Delivery with Retries
Agentfield retries failed webhook deliveries:
Retry schedule:
- Immediate
- 1 second later
- 2 seconds later
- 4 seconds later
- 8 seconds later
- 16 seconds later
Total: Up to 6 attempts over ~31 seconds
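The listed schedule is plain doubling. After the immediate attempt, the wait before retry k is 2^(k-1) seconds. This quick check reproduces the numbers above and ignores the time each request itself takes:

```python
def webhook_retry_delays(attempts: int = 6, base: float = 1.0) -> list[float]:
    """Delay before each attempt: 0 for the first, then base * 2**(k-1)."""
    return [0.0] + [base * 2 ** (k - 1) for k in range(1, attempts)]


delays = webhook_retry_delays()
print(delays)       # [0.0, 1.0, 2.0, 4.0, 8.0, 16.0]
print(sum(delays))  # 31.0 seconds from first to last attempt
```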
Delivery tracking: Agentfield records delivery status in execution notes, for example:
✅ Webhook delivery succeeded on attempt 1
or
❌ Webhook delivery failed after 6 attempts: connection timeout
Backpressure Controls
Agentfield prevents queue overload:
- Queue depth limits per agent
- Waiter caps (max queued executions)
- Fair scheduling (prevents one tenant from monopolizing workers)
Your system stays stable under load.
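Queue depth limits and fair scheduling combine naturally. New work is rejected when the queue is full, and jobs are dequeued round-robin across tenants so one busy tenant cannot starve the rest. This is a single-process sketch. The FairQueue class, the QueueFull exception, and the round-robin policy are illustrative assumptions, not Agentfield's scheduler.

```python
from collections import OrderedDict, deque
from typing import Optional


class QueueFull(Exception):
    """Raised when the queue depth limit is reached (backpressure)."""


class FairQueue:
    def __init__(self, max_depth: int):
        self.max_depth = max_depth
        self.by_tenant = OrderedDict()  # tenant -> deque of jobs, in service order
        self.depth = 0

    def submit(self, tenant: str, job: str) -> None:
        if self.depth >= self.max_depth:
            raise QueueFull(f"depth limit {self.max_depth} reached")
        self.by_tenant.setdefault(tenant, deque()).append(job)
        self.depth += 1

    def next_job(self) -> Optional[str]:
        """Take one job from the tenant at the head, then rotate that tenant to the back."""
        if not self.by_tenant:
            return None
        tenant, jobs = next(iter(self.by_tenant.items()))
        job = jobs.popleft()
        self.depth -= 1
        if jobs:
            self.by_tenant.move_to_end(tenant)
        else:
            del self.by_tenant[tenant]
        return job
```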
Cryptographic Webhook Verification
Agentfield signs webhook payloads using HMAC-SHA256 (GitHub-style):
Signature header:
X-Agentfield-Signature: sha256=a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3
Verify in your application:
Python (FastAPI):
import hashlib
import hmac
import json

from fastapi import FastAPI, HTTPException, Request

app = FastAPI()
WEBHOOK_SECRET = "your-webhook-secret-key"  # load from a secret store in production

def verify_webhook_signature(body: bytes, signature: str) -> bool:
    """Verify Agentfield webhook signature."""
    # Compute expected signature over the raw request body
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        body,
        hashlib.sha256,
    ).hexdigest()

    # Extract signature from header (format: "sha256=<hex>")
    received = signature.split("=", 1)[1] if "=" in signature else signature

    # Constant-time comparison (prevents timing attacks)
    return hmac.compare_digest(expected, received)

@app.post("/api/agentfield/callback")
async def handle_agentfield_webhook(request: Request):
    """Handle Agentfield webhook with signature verification."""
    body = await request.body()
    signature = request.headers.get("X-Agentfield-Signature")
    if not signature or not verify_webhook_signature(body, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Signature verified: parse the exact bytes that were checked
    data = json.loads(body)
    if data["event"] == "execution.completed":
        # Handle successful completion (your own function)
        await process_completion(data)
    elif data["event"] == "execution.failed":
        # Handle failure (your own function)
        await process_failure(data)
    return {"status": "processed"}
Node.js (Express):
const crypto = require('crypto');
const express = require('express');

const app = express();
const WEBHOOK_SECRET = 'your-webhook-secret-key';

function verifyWebhookSignature(body, signature) {
  // Compute expected signature over the raw body (a Buffer)
  const expected = crypto
    .createHmac('sha256', WEBHOOK_SECRET)
    .update(body)
    .digest('hex');

  // Extract signature from header (format: "sha256=<hex>")
  const received = signature.includes('=')
    ? signature.slice(signature.indexOf('=') + 1)
    : signature;

  const a = Buffer.from(expected);
  const b = Buffer.from(received);
  // timingSafeEqual throws if the lengths differ, so check first
  if (a.length !== b.length) return false;

  // Constant-time comparison
  return crypto.timingSafeEqual(a, b);
}

// Express handler: express.raw keeps req.body as the unparsed Buffer
app.post('/api/agentfield/callback',
  express.raw({ type: 'application/json' }),
  (req, res) => {
    const signature = req.headers['x-agentfield-signature'];
    if (!signature || !verifyWebhookSignature(req.body, signature)) {
      return res.status(401).json({ error: 'Invalid signature' });
    }

    // Signature verified, process webhook
    const data = JSON.parse(req.body.toString('utf8'));
    if (data.event === 'execution.completed') {
      processCompletion(data); // your own function
    } else if (data.event === 'execution.failed') {
      processFailure(data); // your own function
    }

    res.json({ status: 'processed' });
  }
);
Go (net/http):
package main
import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"io"
	"log"
	"net/http"
	"strings"
)

const webhookSecret = "your-webhook-secret-key"

func verifyWebhookSignature(body []byte, signature string) bool {
	// Compute expected signature over the raw body
	h := hmac.New(sha256.New, []byte(webhookSecret))
	h.Write(body)
	expected := hex.EncodeToString(h.Sum(nil))

	// Strip the "sha256=" prefix if present
	received := strings.TrimPrefix(signature, "sha256=")

	// Constant-time comparison
	return hmac.Equal([]byte(expected), []byte(received))
}

func handleAgentfieldWebhook(w http.ResponseWriter, r *http.Request) {
	// Read the raw body (needed for the HMAC)
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Failed to read body", http.StatusBadRequest)
		return
	}

	// Verify signature (a missing header is rejected)
	signature := r.Header.Get("X-Agentfield-Signature")
	if signature == "" || !verifyWebhookSignature(body, signature) {
		http.Error(w, "Invalid signature", http.StatusUnauthorized)
		return
	}

	// Parse webhook data
	var data map[string]interface{}
	if err := json.Unmarshal(body, &data); err != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}

	// Process webhook (checked type assertion avoids a panic on bad payloads)
	event, _ := data["event"].(string)
	switch event {
	case "execution.completed":
		processCompletion(data)
	case "execution.failed":
		processFailure(data)
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{"status": "processed"})
}

// Placeholders for your own application logic.
func processCompletion(data map[string]interface{}) {}
func processFailure(data map[string]interface{})    {}

func main() {
	http.HandleFunc("/api/agentfield/callback", handleAgentfieldWebhook)
	log.Fatal(http.ListenAndServe(":8080", nil)) // port chosen for this example
}
Ruby (Sinatra):
require 'sinatra'
require 'openssl'
require 'json'

WEBHOOK_SECRET = 'your-webhook-secret-key'

def verify_webhook_signature(body, signature)
  # Compute expected signature over the raw body
  expected = OpenSSL::HMAC.hexdigest('SHA256', WEBHOOK_SECRET, body)

  # Extract signature from header (format: "sha256=<hex>")
  received = signature.include?('=') ? signature.split('=', 2)[1] : signature

  # Constant-time comparison (returns false on length mismatch)
  Rack::Utils.secure_compare(expected, received)
end

post '/api/agentfield/callback' do
  # Read raw body
  request.body.rewind
  body = request.body.read

  # Rack exposes the X-Agentfield-Signature header as HTTP_X_AGENTFIELD_SIGNATURE
  signature = request.env['HTTP_X_AGENTFIELD_SIGNATURE']
  halt 401, 'Invalid signature' if signature.nil? || !verify_webhook_signature(body, signature)

  # Parse webhook data
  data = JSON.parse(body)

  # Process webhook (process_completion/process_failure are your own methods)
  case data['event']
  when 'execution.completed'
    process_completion(data)
  when 'execution.failed'
    process_failure(data)
  end

  content_type :json
  { status: 'processed' }.to_json
end
Why signature verification matters:
- Prevents webhook spoofing
- Ensures payloads haven't been tampered with
- Required for compliance and security audits
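To exercise a handler locally, you can produce the same header yourself. This sketch signs a payload in the sha256=<hex> format described above and checks it with a constant-time comparison. The secret and payload are placeholders.

```python
import hashlib
import hmac
import json


def sign(body: bytes, secret: str) -> str:
    """Build a header value: 'sha256=' + hex HMAC-SHA256 of the raw body."""
    return "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def verify(body: bytes, header: str, secret: str) -> bool:
    return hmac.compare_digest(sign(body, secret), header)


secret = "your-webhook-secret-key"
body = json.dumps({"event": "execution.completed", "execution_id": "exec-abc123"}).encode()
header = sign(body, secret)

print(verify(body, header, secret))          # True
print(verify(body + b" ", header, secret))   # False: body changed
print(verify(body, header, "wrong-secret"))  # False: wrong secret
```

Send exactly these bytes with X-Agentfield-Signature set to header. If anything re-serializes the JSON between signing and sending, the bytes change and verification fails. That is also why every handler above hashes the raw body rather than parsed data.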
Production Integration Patterns
Event-Driven Workflows
Chain multiple agents together using webhooks:
from fastapi import FastAPI, Request, HTTPException
import httpx

# WEBHOOK_SECRET and verify_webhook_signature() are the ones from the
# signature-verification example above; notify_user, log_error and
# notify_admin stand for your own application helpers.
app = FastAPI()
AGENTFIELD_URL = "http://af-server"
CALLBACK_URL = "https://your-app.com/api/agentfield/callback"

@app.post("/api/agentfield/callback")
async def handle_agentfield_webhook(request: Request):
    """
    Webhook handler that chains multi-agent workflows.
    Each completion triggers the next step.
    """
    body = await request.body()
    signature = request.headers.get("X-Agentfield-Signature")
    # Verify signature (a missing header is rejected too)
    if not signature or not verify_webhook_signature(body, signature):
        raise HTTPException(status_code=401)
    data = await request.json()
    if data["event"] == "execution.completed":
        target = data["target"]
        # Step 1 complete: start step 2
        if target == "document-processor.extract_text":
            await queue_async_execution(
                "legal-analyzer.check_compliance",
                {"text": data["result"]["extracted_text"]},
            )
        # Step 2 complete: start step 3
        elif target == "legal-analyzer.check_compliance":
            await queue_async_execution(
                "report-generator.create_summary",
                {"compliance_data": data["result"]},
            )
        # Step 3 complete: notify user
        elif target == "report-generator.create_summary":
            await notify_user(
                user_id=data["result"]["user_id"],
                report_url=data["result"]["report_url"],
            )
    elif data["event"] == "execution.failed":
        # Handle failure at any step
        await log_error(data.get("error_message"))
        await notify_admin(data)
    return {"status": "processed"}

async def queue_async_execution(target: str, input_data: dict):
    """Queue next step in workflow."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{AGENTFIELD_URL}/api/v1/execute/async/{target}",
            json={
                "input": input_data,
                "webhook": {"url": CALLBACK_URL, "secret": WEBHOOK_SECRET},
            },
        )
        response.raise_for_status()  # expect 202 Accepted
Background Job Processing
Integrate with existing job queues:
from celery import Celery
import httpx

celery_app = Celery('tasks')

# CALLBACK_URL, WEBHOOK_SECRET and db are placeholders for your own
# configuration and persistence layer.

@celery_app.task(bind=True)
def process_user_data(self, user_id: int):
    """
    Celery task that queues Agentfield execution.
    Agentfield handles the heavy AI work asynchronously.
    """
    # Queue async Agentfield execution
    response = httpx.post(
        "http://af-server/api/v1/execute/async/analytics.deep_analysis",
        json={
            "input": {"user_id": user_id},
            "webhook": {
                "url": f"{CALLBACK_URL}/agentfield/analytics-complete",
                "secret": WEBHOOK_SECRET,
            },
        },
    )
    response.raise_for_status()  # expect 202 Accepted
    execution_id = response.json()["execution_id"]
    # Store execution_id for tracking (self.request.id is this Celery task's ID)
    db.jobs.update(
        job_id=self.request.id,
        agentfield_execution_id=execution_id,
        status="queued",
    )
    return {"execution_id": execution_id}
Frontend Integration
Build responsive UIs that don't block on AI:
// React component with async execution
import { useEffect, useRef, useState } from "react";

function DocumentAnalyzer({ documentId }) {
  const [executionId, setExecutionId] = useState(null);
  const [status, setStatus] = useState("idle");
  const [result, setResult] = useState(null);
  const intervalRef = useRef(null);

  // Stop polling if the component unmounts mid-execution
  useEffect(() => () => clearInterval(intervalRef.current), []);

  const startAnalysis = async () => {
    setResult(null);
    // Queue async execution via your own backend route
    const response = await fetch("/api/agentfield/analyze", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ document_id: documentId }),
    });
    if (!response.ok) {
      setStatus("failed");
      return;
    }
    const data = await response.json();
    setExecutionId(data.execution_id);
    setStatus("queued");
    // Poll for status (or use SSE for real-time updates)
    pollStatus(data.execution_id);
  };

  const pollStatus = (execId) => {
    clearInterval(intervalRef.current);
    intervalRef.current = setInterval(async () => {
      try {
        // Assumes this path is proxied to the control plane
        const response = await fetch(`/api/v1/execute/status/${execId}`);
        if (!response.ok) return; // try again on the next tick
        const data = await response.json();
        setStatus(data.status);
        if (data.status === "completed") {
          setResult(data.result);
          clearInterval(intervalRef.current);
        } else if (data.status === "failed") {
          clearInterval(intervalRef.current);
        }
      } catch {
        // Network hiccup: try again on the next tick
      }
    }, 2000); // Poll every 2 seconds
  };

  // Allow retrying after completion or failure
  const busy = status === "queued" || status === "running";

  return (
    <div>
      <button onClick={startAnalysis} disabled={busy}>
        Analyze Document
      </button>
      {status === "queued" && <p>Queued for processing...</p>}
      {status === "running" && <p>Analysis in progress...</p>}
      {status === "completed" && (
        <div>
          <h3>Analysis Complete</h3>
          <pre>{JSON.stringify(result, null, 2)}</pre>
        </div>
      )}
      {status === "failed" && <p>Analysis failed. Please try again.</p>}
    </div>
  );
}
Best Practices
Always Verify Signatures
Never trust webhook payloads without signature verification. Use constant-time comparison to prevent timing attacks.
Handle Idempotency
Webhooks may be delivered multiple times (retries). Design handlers to be idempotent, using execution_id as the deduplication key.
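Here is a minimal sketch of execution_id deduplication. It keeps processed IDs in memory. A real deployment would use a shared store, such as a database unique constraint, so duplicates are caught across processes and restarts. The Deduplicator class and its handle_once method are hypothetical names for this sketch.

```python
import threading
from typing import Callable


class Deduplicator:
    def __init__(self):
        self._seen: set[str] = set()
        self._lock = threading.Lock()

    def handle_once(self, payload: dict, process: Callable[[dict], None]) -> bool:
        """Run process(payload) only for the first delivery of an execution_id."""
        key = payload["execution_id"]
        with self._lock:
            if key in self._seen:
                return False  # a retry of an already-handled delivery
            self._seen.add(key)
        try:
            process(payload)
        except Exception:
            with self._lock:
                self._seen.discard(key)  # let a later retry process it
            raise
        return True
```

Releasing the key on failure matters. Otherwise a crash during processing would make every later retry look like a duplicate.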
Respond Quickly
Webhook handlers should respond within 5 seconds. Queue heavy processing for later to avoid delivery timeouts.
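One way to keep the handler fast is to verify, enqueue, and return, leaving the slow part to a background worker. This self-contained asyncio sketch uses no web framework, and slow_process stands in for your own work. An in-memory queue loses pending items if the process dies, so durable work belongs in a persistent queue.

```python
import asyncio


async def slow_process(payload: dict, done: list) -> None:
    await asyncio.sleep(0.05)  # stands in for heavy processing
    done.append(payload["execution_id"])


async def worker(queue: asyncio.Queue, done: list) -> None:
    while True:
        payload = await queue.get()
        try:
            await slow_process(payload, done)
        finally:
            queue.task_done()


async def handle_webhook(queue: asyncio.Queue, payload: dict) -> dict:
    """Acknowledge immediately; the worker does the heavy lifting later."""
    queue.put_nowait(payload)
    return {"status": "accepted"}


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    done: list = []
    task = asyncio.create_task(worker(queue, done))
    reply = await handle_webhook(queue, {"execution_id": "exec-abc123"})
    assert reply == {"status": "accepted"} and done == []  # replied before work ran
    await queue.join()  # in a server, the worker simply keeps running
    task.cancel()
    return done


print(asyncio.run(main()))  # ['exec-abc123']
```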
Log Everything
Log all webhook deliveries with execution_id, timestamp, and status for debugging and audit trails.
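Structured logs with one line per delivery make these fields searchable. This sketch uses the standard logging module with JSON output. The field names other than execution_id are choices of this sketch.

```python
import json
import logging
import time

logger = logging.getLogger("agentfield.webhooks")


def log_delivery(payload: dict, outcome: str, signature_ok: bool) -> str:
    """Emit one JSON log line per webhook delivery and return it."""
    record = {
        "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "execution_id": payload.get("execution_id"),
        "event": payload.get("event"),
        "status": payload.get("status"),
        "signature_ok": signature_ok,
        "outcome": outcome,
    }
    line = json.dumps(record, sort_keys=True)
    logger.info(line)
    return line


logging.basicConfig(level=logging.INFO, format="%(message)s")
log_delivery(
    {"execution_id": "exec-abc123", "event": "execution.completed", "status": "completed"},
    outcome="processed",
    signature_ok=True,
)
```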
What This Enables
For Long-Running Tasks:
- Process large datasets without timeout issues
- Generate reports that take minutes or hours
- Run complex AI analysis without blocking requests
- Handle batch processing efficiently
For Event-Driven Systems:
- Chain multi-agent workflows automatically
- Trigger downstream processes on completion
- Build reactive, scalable architectures
- Integrate with existing job queues
For Production Reliability:
- Automatic retries handle transient failures
- Durable queue prevents lost work
- Cryptographic verification prevents spoofing
- Complete audit trail for compliance
For User Experience:
- Responsive frontends that don't block
- Progress tracking for long operations
- Graceful handling of failures
- Real-time status updates
Next Steps
You now understand how Agentfield handles async execution. Next steps:
- Cross-Agent Communication - Chain agents in async workflows
- Shared Memory - Share state across async executions
- Production Features - Complete infrastructure overview
Or start building with the Quick Start Guide.