Async Execution
Queue long-running AI tasks with webhook callbacks and status polling
Async Execution
Queue long-running AI tasks with webhook callbacks and status polling
Queue long-running AI tasks and receive immediate confirmation. Agentfield returns an execution_id instantly while processing continues in the background. Get notified via webhook when complete, or poll for status updates.
Why Async Matters for Autonomous Software
AI reasoning, multi-step workflows, and LLM calls create unpredictable latency—from seconds to minutes, or even hours to days for complex nested workflows. Synchronous execution blocks your application during this time. Async execution lets you queue the work, return immediately to users, and handle completion asynchronously. This pattern is essential for realistic autonomous software where AI agents coordinate complex tasks.
Architecture: How It Works Under the Hood
Agentfield's control plane handles async execution natively, supporting arbitrarily long-running executions without timeout limits. This is critical for nested agent workflows where Agent A calls Agent B, which calls Agent C—all running asynchronously.
The Flow:
-
Client → Control Plane: Client POSTs to
/api/v1/execute/async/{target}. Control plane creates execution record (status:running), forwards request to agent with headers (X-Run-ID,X-Execution-ID), and returns202 Acceptedimmediately. -
Control Plane → Agent: Agent receives request with
X-Execution-IDheader, returns202 Acceptedimmediately, and starts reasoner in background. Execution record stays inrunningstate. -
Agent Processing: Reasoner runs as long as needed (hours/days). Nested
app.call()operations work seamlessly. All state tracked in control plane. -
Agent → Control Plane: When finished, agent POSTs to
POST /api/v1/executions/{execution_id}/statuswith status (succeeded/failed), result payload, duration, and optional progress updates. -
Control Plane Callback: Status handler updates execution record, writes result to storage, emits SSE events, updates workflow state, and triggers registered webhooks.
-
Clients Poll/SSE/Webhooks: Original caller polls
/api/v1/executions/{id}, listens to SSE, or receives webhook. No "context deadline exceeded" even for extremely long runs.
SDK Coverage: Python and Go SDKs implement this automatically—no configuration needed. Agents inherit async semantics as soon as they receive control plane headers.
HTTP API
Queue Execution
Submit a long-running task and receive an execution ID immediately:
curl -X POST http://localhost:8080/api/v1/execute/async/research-agent.deep_analysis \
-H "Content-Type: application/json" \
-d '{
"input": {
"topic": "autonomous software trends",
"depth": "comprehensive"
},
"webhook": {
"url": "https://app.example.com/webhooks/agentfield",
"secret": "your-webhook-secret"
}
}'Response (202 Accepted):
{
"execution_id": "exec_abc123",
"workflow_id": "wf_xyz789",
"run_id": "run_def456",
"status": "queued",
"target": "research-agent.deep_analysis",
"type": "reasoner",
"created_at": "2024-01-15T10:30:45Z",
"webhook_registered": true,
"webhook_error": null
}The execution processes in the background while your application remains responsive. Use the execution_id to track progress or wait for webhook notification. If webhook validation fails (for example, invalid URL scheme), Agentfield still returns 202 Accepted but sets webhook_registered to false and includes a message in webhook_error so you can surface the issue to users.
Check Status
Poll for execution status when webhooks aren't available:
curl http://localhost:8080/api/v1/executions/exec_abc123Response (200 OK):
{
"execution_id": "exec_abc123",
"run_id": "run_def456",
"status": "running",
"started_at": "2024-01-15T10:30:50Z",
"completed_at": null,
"duration_ms": null,
"result": null,
"error": null,
"webhook_registered": true
}Status Values:
queued- Accepted and waiting to startpending- Scheduled but not yet dispatched (legacy name in older responses)running- Currently executingsucceeded- Completed successfullyfailed- Execution failed (inspect theerrorfield)cancelled- Execution was cancelledtimeout- Execution exceeded time limit
Batch Status Check
Monitor multiple concurrent AI agents efficiently:
curl -X POST http://localhost:8080/api/v1/executions/batch-status \
-H "Content-Type: application/json" \
-d '{
"execution_ids": ["exec_abc123", "exec_def456", "exec_ghi789"]
}'Response (200 OK):
{
"exec_abc123": {
"execution_id": "exec_abc123",
"run_id": "run_def456",
"status": "succeeded",
"result": {
"summary": "Comprehensive analysis of 23 sources...",
"sources": 23
},
"error": null,
"started_at": "2024-01-15T10:30:45Z",
"completed_at": "2024-01-15T10:33:15Z",
"duration_ms": 150000,
"webhook_registered": true
},
"exec_def456": {
"execution_id": "exec_def456",
"run_id": "run_def456",
"status": "running",
"result": null,
"error": null,
"started_at": "2024-01-15T10:32:10Z",
"completed_at": null,
"duration_ms": null,
"webhook_registered": false
}
}Webhook Callbacks
When execution completes, Agentfield POSTs to your webhook URL with the result:
{
"event": "execution.completed",
"execution_id": "exec_abc123",
"workflow_id": "wf_xyz789",
"status": "succeeded",
"target": "research-agent.deep_analysis",
"type": "reasoner",
"duration_ms": 127450,
"result": {
"summary": "Comprehensive analysis of 23 sources...",
"sources": 23
},
"timestamp": "2024-01-15T10:33:15Z"
}Webhook Configuration
Configure webhooks when submitting async executions:
{
"input": {
"topic": "market analysis"
},
"webhook": {
"url": "https://app.example.com/webhooks/agentfield",
"secret": "your-webhook-secret",
"headers": {
"X-Custom-ID": "research-123",
"X-User-ID": "user_456"
}
}
}Webhook Security
Agentfield signs webhook payloads with HMAC-SHA256 whenever you supply a webhook secret. Verify the signature to ensure authenticity:
import hmac
import hashlib
def verify_webhook(request_body: bytes, signature: str, secret: str) -> bool:
"""Verify Agentfield webhook signature"""
if not signature.startswith("sha256="):
return False
expected = hmac.new(
secret.encode(),
request_body,
hashlib.sha256
).hexdigest()
# Signature format: "sha256=<hex_digest>"
received = signature.replace("sha256=", "")
return hmac.compare_digest(expected, received)
# In your webhook handler
@app.post("/webhooks/agentfield")
async def handle_agentfield_webhook(request: Request):
body = await request.body()
signature = request.headers.get("X-Agentfield-Signature", "")
if not verify_webhook(body, signature, "your-webhook-secret"):
raise HTTPException(status_code=401, detail="Invalid signature")
payload = await request.json()
# Process completion
if payload["event"] == "execution.completed":
execution_id = payload["execution_id"]
result = payload["result"]
# Update your application state
await notify_user(execution_id, result)
return {"status": "received"}Webhook Headers:
X-Agentfield-Signature:sha256=<hex_signature>(present only when a webhook secret is provided)- Custom headers from
webhook.headersconfiguration
Rust Example
Using the reqwest crate for async HTTP requests:
use reqwest;
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
// Submit async execution
let response = client
.post("http://localhost:8080/api/v1/execute/async/research-agent.deep_analysis")
.header("Content-Type", "application/json")
.header("X-Session-ID", "session_123")
.json(&json!({
"input": {
"topic": "autonomous software trends",
"depth": "comprehensive"
},
"webhook": {
"url": "https://app.example.com/webhooks/agentfield",
"secret": "your-webhook-secret"
}
}))
.send()
.await?;
let result: serde_json::Value = response.json().await?;
let execution_id = result["execution_id"].as_str().unwrap();
println!("Queued: {}", execution_id);
// Poll for status
loop {
let status_response = client
.get(format!("http://localhost:8080/api/v1/executions/{}", execution_id))
.send()
.await?;
let status: serde_json::Value = status_response.json().await?;
match status["status"].as_str().unwrap() {
"succeeded" => {
println!("Result: {:?}", status["result"]);
break;
}
"failed" | "cancelled" | "timeout" => {
eprintln!("Error: {:?}", status["error"]);
break;
}
_ => {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
}
}
Ok(())
}Autonomous Software Patterns
Multi-Agent Coordination
Coordinate multiple AI agents asynchronously for complex workflows using REST:
# Agent A: Sentiment analysis (fast, 2-3 seconds)
curl -X POST http://localhost:8080/api/v1/execute/async/sentiment-agent.analyze \
-H "Content-Type: application/json" \
-H "X-Workflow-ID: wf_customer_support_001" \
-d '{
"input": {
"messages": ["This is frustrating!", "Third time calling"]
}
}' | jq -r '.execution_id' > sentiment_exec.txt
# Agent B: Deep research (slow, 2-5 minutes)
curl -X POST http://localhost:8080/api/v1/execute/async/research-agent.comprehensive_analysis \
-H "Content-Type: application/json" \
-H "X-Workflow-ID: wf_customer_support_001" \
-d '{
"input": {
"customer_id": "cust_123",
"context": "escalation"
}
}' | jq -r '.execution_id' > research_exec.txt
# Agent C: Report generation (medium, 30-60 seconds)
curl -X POST http://localhost:8080/api/v1/execute/async/report-agent.generate \
-H "Content-Type: application/json" \
-H "X-Workflow-ID: wf_customer_support_001" \
-d '{
"input": {
"sentiment_id": "'$(cat sentiment_exec.txt)'",
"research_id": "'$(cat research_exec.txt)'"
}
}'
# All agents work in parallel - use webhooks or poll for completionWebhook-First Architecture
Design autonomous systems around webhook notifications:
# Submit long-running AI task with webhook
curl -X POST http://localhost:8080/api/v1/execute/async/research-agent.deep_analysis \
-H "Content-Type: application/json" \
-H "X-Session-ID: session_user_456" \
-H "X-Actor-ID: user_456" \
-d '{
"input": {
"topic": "competitive landscape",
"depth": "comprehensive"
},
"webhook": {
"url": "https://app.example.com/webhooks/agentfield",
"secret": "your-webhook-secret",
"headers": {
"X-User-ID": "user_456"
}
}
}'
# Response (202 Accepted):
# {
# "execution_id": "exec_abc123",
# "run_id": "run_xyz789",
# "workflow_id": "run_xyz789",
# "status": "queued",
# "webhook_registered": true
# }
# Return immediately to user - webhook will notify when completeWebhook Handler (Python example):
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib
app = FastAPI()
@app.post("/webhooks/agentfield")
async def handle_completion(request: Request):
body = await request.body()
signature = request.headers.get("X-Agentfield-Signature", "")
# Verify signature
expected = hmac.new(
b"your-webhook-secret",
body,
hashlib.sha256
).hexdigest()
if not signature.startswith("sha256="):
raise HTTPException(status_code=401)
if not hmac.compare_digest(f"sha256={expected}", signature):
raise HTTPException(status_code=401)
payload = await request.json()
if payload["event"] == "execution.completed":
# Update database
await db.update_analysis(
execution_id=payload["execution_id"],
result=payload["result"],
status="succeeded"
)
# Notify user (look up the recipient using your application data)
await notify_user_for_run(
run_id=payload["workflow_id"],
execution_id=payload["execution_id"],
result_url=f"/results/{payload['execution_id']}"
)
return {"status": "received"}Handling Backpressure
Agentfield returns 429 Too Many Requests when the execution queue is full. Handle with retry logic:
#!/bin/bash
# Function to submit with retry
submit_with_retry() {
local target=$1
local input=$2
local max_retries=3
local attempt=0
while [ $attempt -lt $max_retries ]; do
response=$(curl -s -w "\n%{http_code}" -X POST \
"http://localhost:8080/api/v1/execute/async/${target}" \
-H "Content-Type: application/json" \
-d "${input}")
http_code=$(echo "$response" | tail -n1)
body=$(echo "$response" | head -n-1)
if [ "$http_code" = "202" ]; then
echo "$body" | jq -r '.execution_id'
return 0
elif [ "$http_code" = "429" ]; then
wait_time=$((2 ** attempt))
echo "Queue full, retrying in ${wait_time}s..." >&2
sleep $wait_time
attempt=$((attempt + 1))
else
echo "Error: HTTP $http_code" >&2
return 1
fi
done
echo "Failed after $max_retries attempts" >&2
return 1
}
# Usage
execution_id=$(submit_with_retry \
"research-agent.deep_analysis" \
'{"input": {"topic": "market analysis"}}')
echo "Queued: $execution_id"Rust Example with Retry:
use reqwest;
use serde_json::json;
use tokio::time::{sleep, Duration};
async fn submit_with_retry(
client: &reqwest::Client,
target: &str,
input: serde_json::Value,
max_retries: u32,
) -> Result<String, Box<dyn std::error::Error>> {
for attempt in 0..max_retries {
let response = client
.post(format!("http://localhost:8080/api/v1/execute/async/{}", target))
.json(&json!({"input": input}))
.send()
.await?;
match response.status().as_u16() {
202 => {
let result: serde_json::Value = response.json().await?;
return Ok(result["execution_id"].as_str().unwrap().to_string());
}
429 if attempt < max_retries - 1 => {
let wait_time = 2_u64.pow(attempt);
eprintln!("Queue full, retrying in {}s...", wait_time);
sleep(Duration::from_secs(wait_time)).await;
}
code => {
return Err(format!("HTTP error: {}", code).into());
}
}
}
Err("Failed after max retries".into())
}JavaScript/TypeScript
Use the HTTP API directly from JavaScript applications:
// Submit async execution
const response = await fetch(
"http://localhost:8080/api/v1/execute/async/research-agent.deep_analysis",
{
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Session-ID": "session_123",
},
body: JSON.stringify({
input: {
topic: "autonomous software trends",
depth: "comprehensive",
},
webhook: {
url: "https://app.example.com/webhooks/agentfield",
secret: "your-webhook-secret",
},
}),
}
);
const { execution_id, webhook_registered, webhook_error } = await response.json();
console.log("Queued:", execution_id);
console.log("Webhook registered:", webhook_registered);
if (!webhook_registered && webhook_error) {
console.warn("Webhook registration error:", webhook_error);
}
// Poll for status
async function pollStatus(executionId) {
const response = await fetch(
`http://localhost:8080/api/v1/executions/${executionId}`
);
const status = await response.json();
if (status.status === "succeeded") {
console.log("Result:", status.result);
return status.result;
}
if (["failed", "cancelled", "timeout"].includes(status.status)) {
throw new Error(status.error ?? "Execution did not complete");
}
// Poll again after delay
await new Promise((resolve) => setTimeout(resolve, 5000));
return pollStatus(executionId);
}
// Wait for completion
const result = await pollStatus(execution_id);Related
- REST API - Complete HTTP endpoint reference
- app.call - Cross-agent communication (uses async internally)
- Webhooks - Webhook configuration and patterns
- Server-Sent Events - Real-time execution updates via SSE