Workflow API
Automatic workflow tracking and execution context management
The Workflow API provides automatic execution tracking, DAG (Directed Acyclic Graph) building, and parent-child relationship management for all reasoner and skill calls. Every execution is tracked with metadata, timing information, and hierarchical relationships.
Automatic Workflow Tracking
All reasoners decorated with @app.reasoner automatically participate in workflow tracking. No additional configuration required.
```python
from agentfield import Agent

app = Agent(node_id="workflow_agent")

@app.reasoner
async def parent_task(data: str) -> dict:
    # This creates a root execution node
    result1 = await app.call("agent1.process", data=data)

    # This creates a child execution node
    result2 = await app.call("agent2.analyze", result=result1)

    return {"result1": result1, "result2": result2}
```

Workflow DAG Generated:
```json
{
  "workflow_id": "wf-abc123",
  "nodes": [
    {
      "execution_id": "exec-parent",
      "agent_node_id": "workflow_agent",
      "reasoner_name": "parent_task",
      "status": "completed",
      "duration_ms": 2500
    },
    {
      "execution_id": "exec-child1",
      "agent_node_id": "agent1",
      "reasoner_name": "process",
      "parent_execution_id": "exec-parent",
      "status": "completed",
      "duration_ms": 1200
    },
    {
      "execution_id": "exec-child2",
      "agent_node_id": "agent2",
      "reasoner_name": "analyze",
      "parent_execution_id": "exec-parent",
      "status": "completed",
      "duration_ms": 800
    }
  ],
  "edges": [
    { "from": "exec-parent", "to": "exec-child1" },
    { "from": "exec-parent", "to": "exec-child2" }
  ]
}
```

Workflow tracking is enabled by default for all reasoners and cannot be disabled in the current Python SDK.
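Since the DAG payload is plain JSON, it can be post-processed directly once fetched — for example, to index child executions by parent or to total the tracked durations. A minimal sketch over the sample payload above (trimmed to the fields it uses):

```python
# Sample DAG payload, reduced to the fields this sketch needs.
workflow = {
    "workflow_id": "wf-abc123",
    "nodes": [
        {"execution_id": "exec-parent", "duration_ms": 2500},
        {"execution_id": "exec-child1", "parent_execution_id": "exec-parent", "duration_ms": 1200},
        {"execution_id": "exec-child2", "parent_execution_id": "exec-parent", "duration_ms": 800},
    ],
    "edges": [
        {"from": "exec-parent", "to": "exec-child1"},
        {"from": "exec-parent", "to": "exec-child2"},
    ],
}

# Index child executions by their parent.
children: dict = {}
for edge in workflow["edges"]:
    children.setdefault(edge["from"], []).append(edge["to"])

print(children["exec-parent"])                            # ['exec-child1', 'exec-child2']
print(sum(n["duration_ms"] for n in workflow["nodes"]))   # 4500
```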
Execution Context
The ExecutionContext class carries workflow metadata through the execution chain. It's automatically created and propagated for all reasoner calls.
Context Properties
```python
from agentfield.execution_context import ExecutionContext

@app.reasoner
async def context_aware(
    data: str,
    execution_context: ExecutionContext = None
) -> dict:
    if execution_context:
        return {
            "execution_id": execution_context.execution_id,
            "workflow_id": execution_context.workflow_id,
            "parent_execution_id": execution_context.parent_execution_id,
            "agent_node_id": execution_context.agent_node_id,
            "reasoner_name": execution_context.reasoner_name
        }
    return {"error": "No execution context"}
```

Key Properties:

- `execution_id` - Unique identifier for this execution
- `workflow_id` - Identifier for the entire workflow
- `parent_execution_id` - Parent execution (if this is a child call)
- `agent_node_id` - Agent that owns this execution
- `reasoner_name` - Name of the reasoner being executed
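One common use of these context properties is log correlation: prefixing log lines with the workflow and execution IDs so output from different agents in the same workflow can be grouped. A minimal sketch (the helper and the stand-in context class are illustrative, not part of the SDK):

```python
from dataclasses import dataclass

def correlation_prefix(ctx) -> str:
    # Works with any object exposing workflow_id / execution_id,
    # including the SDK's ExecutionContext.
    return f"[wf={ctx.workflow_id} exec={ctx.execution_id}]"

@dataclass
class FakeContext:
    # Stand-in for ExecutionContext, used here only to demo the helper.
    workflow_id: str
    execution_id: str

print(correlation_prefix(FakeContext("wf-abc123", "exec-parent")))
# [wf=wf-abc123 exec=exec-parent]
```

Inside a reasoner, `correlation_prefix(execution_context)` can be prepended to any log message.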
Parent-Child Relationships
Cross-agent calls automatically create parent-child relationships in the workflow DAG.
```python
@app.reasoner
async def orchestrator(ticket_text: str) -> dict:
    """Parent task that coordinates multiple agents."""
    # Child execution 1
    sentiment = await app.call(
        "sentiment_agent.analyze",
        text=ticket_text
    )

    # Child execution 2
    priority = await app.call(
        "priority_agent.classify",
        ticket_text=ticket_text
    )

    # Child execution 3 (depends on previous results)
    routing = await app.call(
        "routing_agent.route",
        sentiment=sentiment,
        priority=priority
    )

    return {
        "sentiment": sentiment,
        "priority": priority,
        "routing": routing
    }
```

Resulting DAG:
```
orchestrator (parent)
├── sentiment_agent.analyze (child 1)
├── priority_agent.classify (child 2)
└── routing_agent.route (child 3)
```

Workflow Visualization
The Agentfield UI automatically visualizes workflow DAGs showing:
- Execution flow and dependencies
- Timing information for each node
- Success/failure status
- Input/output data for each execution
```python
@app.reasoner
async def sequential_workflow(data: str) -> dict:
    # Step 1
    processed = await app.call(
        "processor.process",
        data=data
    )

    # Step 2 (depends on step 1)
    analyzed = await app.call(
        "analyzer.analyze",
        data=processed
    )

    # Step 3 (depends on step 2)
    final = await app.call(
        "finalizer.finalize",
        data=analyzed
    )

    return final
```

DAG: processor → analyzer → finalizer
```python
import asyncio

@app.reasoner
async def parallel_workflow(data: str) -> dict:
    # Execute in parallel (no dependencies between the calls)
    results = await asyncio.gather(
        app.call("agent1.process", data=data),
        app.call("agent2.process", data=data),
        app.call("agent3.process", data=data)
    )
    return {"results": results}
```

DAG: Three parallel branches from the root
```python
import asyncio

@app.reasoner
async def complex_workflow(data: str) -> dict:
    # Phase 1: Parallel processing
    phase1 = await asyncio.gather(
        app.call("agent1.extract", data=data),
        app.call("agent2.extract", data=data)
    )

    # Phase 2: Combine results
    combined = await app.call(
        "combiner.combine",
        results=phase1
    )

    # Phase 3: Final analysis
    final = await app.call(
        "analyzer.analyze",
        data=combined
    )

    return final
```

DAG: Diamond pattern with a parallel start, a merge, then a final step
Execution Metadata
Every execution automatically tracks:
- Timing: Start time, end time, duration in milliseconds
- Status: queued, running, completed, failed
- Input/Output: Function arguments and return values
- Errors: Exception messages and stack traces (if failed)
- Context: Workflow ID, parent relationships, agent information
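Put together, a completed execution record carries roughly the shape below. Field names beyond those already shown in the DAG payload earlier (`started_at`, `ended_at`, `input`, `output`, `error`) are illustrative stand-ins for the metadata categories above, not a guaranteed schema:

```python
# Illustrative record combining the metadata categories listed above.
record = {
    "execution_id": "exec-xyz789",
    "workflow_id": "wf-abc123",
    "parent_execution_id": None,           # None for a root execution
    "status": "completed",                 # queued | running | completed | failed
    "started_at": "2024-01-01T12:00:00Z",  # timing
    "ended_at": "2024-01-01T12:00:02.5Z",
    "duration_ms": 2500,
    "input": {"data": "..."},              # function arguments
    "output": {"result": "..."},           # return value
    "error": None,                         # message + stack trace when failed
}

is_root = record["parent_execution_id"] is None
print(is_root, record["duration_ms"])  # True 2500
```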
```python
@app.reasoner
async def tracked_execution(data: str) -> dict:
    # Execution metadata automatically captured:
    # - Start timestamp
    # - Input: {"data": "..."}
    # - Agent: workflow_agent
    # - Reasoner: tracked_execution
    result = await app.ai(user=data)

    # Execution metadata automatically captured:
    # - End timestamp
    # - Duration: calculated automatically
    # - Output: result
    # - Status: completed
    return result
```

Workflow Tracking Behavior
All @app.reasoner executions participate in workflow tracking, DAG building, and execution context propagation. There is no decorator flag to disable this behavior in the Python SDK.
Cross-Agent Workflow Coordination
Workflows can span multiple agents, with the Agentfield server coordinating execution and maintaining the complete DAG.
```python
# Agent 1: Data Collector
app1 = Agent(node_id="collector")

@app1.reasoner
async def collect_data(source: str) -> dict:
    # Collect data from the source (fetch_from_source is a placeholder
    # for your own data-fetching logic)
    data = await fetch_from_source(source)

    # Pass to the processor agent
    processed = await app1.call(
        "processor.process",
        data=data
    )
    return processed

# Agent 2: Data Processor
app2 = Agent(node_id="processor")

@app2.reasoner
async def process(data: dict) -> dict:
    # Process the data
    result = await app2.ai(user=f"Process: {data}")

    # Pass to the analyzer agent
    analyzed = await app2.call(
        "analyzer.analyze",
        data=result
    )
    return analyzed

# Agent 3: Data Analyzer
app3 = Agent(node_id="analyzer")

@app3.reasoner
async def analyze(data: dict) -> dict:
    # Final analysis
    return await app3.ai(user=f"Analyze: {data}")
```

Complete Workflow DAG:

```
collector.collect_data (Agent 1)
└── processor.process (Agent 2)
    └── analyzer.analyze (Agent 3)
```

Workflow Querying
Query workflow execution history via the Agentfield REST API:
```bash
# Get workflow details
curl http://localhost:8080/api/v1/workflows/wf-abc123

# Get execution details
curl http://localhost:8080/api/v1/executions/exec-xyz789

# List all executions in a workflow
curl http://localhost:8080/api/v1/workflows/wf-abc123/executions
```

Real-Time Workflow Updates
Subscribe to workflow events via Server-Sent Events (SSE):
```bash
curl -N -H "Accept: text/event-stream" \
  http://localhost:8080/api/v1/workflows/wf-abc123/events
```

Event Stream:

```
event: workflow.started
data: {"workflow_id": "wf-abc123", "status": "running"}

event: execution.started
data: {"execution_id": "exec-1", "reasoner": "process"}

event: execution.completed
data: {"execution_id": "exec-1", "status": "completed", "duration_ms": 1200}

event: workflow.completed
data: {"workflow_id": "wf-abc123", "status": "completed", "total_duration_ms": 2500}
```

Best Practices
1. Use Descriptive Reasoner Names
```python
# Good: Clear, descriptive names
@app.reasoner
async def analyze_customer_sentiment(message: str) -> dict:
    pass

# Avoid: Generic names
@app.reasoner
async def process(data: str) -> dict:
    pass
```

2. Structure Complex Workflows
```python
# Break complex workflows into logical steps
@app.reasoner
async def ticket_analysis_workflow(ticket: dict) -> dict:
    # Step 1: Extract information
    extracted = await app.call(
        "extractor.extract_ticket_info",
        ticket=ticket
    )

    # Step 2: Classify priority
    priority = await app.call(
        "classifier.classify_priority",
        info=extracted
    )

    # Step 3: Route to team
    routing = await app.call(
        "router.route_to_team",
        priority=priority,
        info=extracted
    )

    return {
        "extracted": extracted,
        "priority": priority,
        "routing": routing
    }
```

3. Handle Errors Gracefully
```python
@app.reasoner
async def robust_workflow(data: str) -> dict:
    try:
        result = await app.call("agent1.process", data=data)
        return {"status": "success", "result": result}
    except Exception as e:
        # The error is automatically tracked in the workflow DAG
        return {"status": "error", "message": str(e)}
```

4. Use Parallel Execution When Possible
```python
import asyncio

@app.reasoner
async def parallel_analysis(text: str) -> dict:
    # Execute independent tasks in parallel
    sentiment, entities, summary = await asyncio.gather(
        app.call("sentiment_agent.analyze", text=text),
        app.call("entity_agent.extract", text=text),
        app.call("summary_agent.summarize", text=text)
    )
    return {
        "sentiment": sentiment,
        "entities": entities,
        "summary": summary
    }
```

Related
- Python SDK Overview - Getting started guide
- @app.reasoner - Reasoner decorator with workflow tracking
- app.call() - Cross-agent communication
- Agent Class - Agent initialization
- AgentRouter - Organize reasoners with routers