@app.reasoner

Define AI-powered functions with automatic workflow tracking

@app.reasoner

Define AI-powered functions with automatic workflow tracking

Decorator that transforms Python functions into AI-powered reasoners with automatic REST API endpoints, workflow tracking, and execution context management.

Basic Example

from agentfield import Agent

app = Agent(node_id="support-agent")

@app.reasoner
async def analyze_ticket(ticket_text: str) -> str:
    """Analyze support ticket and suggest resolution."""
    return await app.ai(
        system="You are a support ticket analyzer.",
        user=f"Analyze this ticket: {ticket_text}"
    )
curl -X POST http://localhost:8080/api/v1/execute/support-agent.analyze_ticket \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "ticket_text": "Login button not working on mobile app"
    }
  }'
{
  "execution_id": "exec-abc123",
  "workflow_id": "wf-def456",
  "status": "completed",
  "result": "This appears to be a mobile-specific UI issue...",
  "duration_ms": 1250
}
{
  "nodes": [
    {
      "id": "support-agent.analyze_ticket",
      "type": "reasoner",
      "status": "completed"
    }
  ],
  "edges": []
}

Decorator Parameters

Prop

Type

Workflow tracking is always enabled for @app.reasoner in the Python SDK. There is no flag to disable it.

Common Patterns

Structured Output with Pydantic

Return type-safe, validated data using Pydantic models.

from pydantic import BaseModel

class TicketAnalysis(BaseModel):
    category: str
    priority: str
    estimated_resolution_time: int
    suggested_actions: list[str]

@app.reasoner
async def analyze_ticket_structured(ticket_text: str) -> TicketAnalysis:
    """Analyze ticket with structured output."""
    return await app.ai(
        system="Analyze support tickets systematically.",
        user=ticket_text,
        schema=TicketAnalysis  # Enforces structured output
    )

The schema parameter in app.ai() automatically instructs the LLM to format responses according to your Pydantic model. The SDK validates and returns a typed object.

Custom Paths and Tags

Organize reasoners with custom endpoints and metadata.

@app.reasoner(
    path="/tickets/analyze/v2",
    tags=["support", "ai", "v2"],
  description="Advanced ticket analysis with ML"
)
async def analyze_ticket_v2(ticket_text: str, context: dict) -> dict:
  """Enhanced analysis with additional context."""
  return await app.ai(
    user=f"Ticket: {ticket_text}\nContext: {context}"
  )

API endpoint becomes: /api/v1/execute/support-agent/tickets/analyze/v2

Workflow Tracking Behavior

Workflow tracking, DAG building, and execution context propagation are always on for reasoners. There is no decorator flag to turn this off.

Controlling Verifiable Credential Generation

Override the agent-level VC policy for specific reasoners.

# Force VC generation even if agent has it disabled
@app.reasoner(vc_enabled=True)
async def audit_critical_decision(data: dict) -> dict:
    """Critical decision that must have verifiable credentials."""
    return await app.ai(
        system="Make compliance-critical decision",
        user=str(data)
    )

# Disable VC for high-frequency, low-risk operations
@app.reasoner(vc_enabled=False)
async def quick_categorization(text: str) -> str:
    """Fast categorization without VC overhead."""
    return await app.ai(
        system="Categorize: tech, business, or other",
        user=text
    )

# Inherit from agent-level setting (default)
@app.reasoner(vc_enabled=None)
async def standard_analysis(data: dict) -> dict:
    """Uses agent's default VC policy."""
    return await app.ai(user=str(data))

VC generation hierarchy: reasoner decorator → agent node → platform default (enabled). Use vc_enabled=True for compliance-critical operations, False for high-frequency tasks, and None (default) to inherit the agent's policy.

Execution Context Access

Access workflow metadata and execution details.

from agentfield.execution_context import ExecutionContext

@app.reasoner
async def context_aware_analysis(
    ticket_text: str,
    execution_context: ExecutionContext = None
) -> dict:
    """Analysis with execution context awareness."""

    # Access workflow metadata
    workflow_id = execution_context.workflow_id if execution_context else None
    parent_id = execution_context.parent_execution_id if execution_context else None

    analysis = await app.ai(
        user=f"Analyze: {ticket_text}"
    )

    return {
        "analysis": analysis,
        "workflow_id": workflow_id,
        "parent_execution_id": parent_id
    }

The execution_context parameter is automatically injected when the reasoner is called via the Agentfield server. It's None for direct Python function calls.

Router-Based Organization

Group related reasoners with AgentRouter for better namespace management.

from agentfield.router import AgentRouter

app = Agent(node_id="support-agent")
tickets = AgentRouter(prefix="Tickets/Analysis")

@tickets.reasoner(tags=["priority"])
async def classify_priority(ticket_text: str) -> str:
    """Classify ticket priority."""
    return await app.ai(
        system="Classify as: low, medium, high, critical",
        user=ticket_text
    )

@tickets.reasoner(tags=["routing"])
async def route_to_team(ticket_text: str, priority: str) -> str:
    """Route ticket to appropriate team."""
    return await app.ai(
        user=f"Route this {priority} priority ticket: {ticket_text}"
    )

app.include_router(tickets)

API endpoints:

  • /api/v1/execute/support-agent.tickets_analysis_classify_priority
  • /api/v1/execute/support-agent.tickets_analysis_route_to_team

Cross-Agent Reasoner Calls

Call reasoners from other agents while maintaining workflow context.

@app.reasoner
async def comprehensive_analysis(ticket_text: str) -> dict:
    """Multi-agent ticket analysis."""

    # Call sentiment analyzer on different agent
    sentiment = await app.call(
        "sentiment-agent.analyze_sentiment",
        text=ticket_text
    )

    # Call priority classifier on same agent
    priority = await app.call(
        "support-agent.classify_priority",
        ticket_text=ticket_text
    )

    # Combine results with AI
    final_analysis = await app.ai(
        user=f"Combine: sentiment={sentiment}, priority={priority}",
        schema=TicketAnalysis
    )

    return final_analysis

Each app.call() creates a child execution context, building a complete workflow DAG that shows the entire multi-agent orchestration.

Auto-Generated Features

Every @app.reasoner() decorated function automatically gets:

REST API Endpoint: Accessible at /api/v1/execute/{agent_node_id}.{reasoner_name}

Workflow Tracking: Creates execution contexts, builds DAGs, tracks parent-child relationships

Pydantic Validation: Automatic conversion and validation of function arguments (FastAPI-like behavior)

DID-Based Identity: Each execution gets a unique decentralized identifier for audit trails

Execution Metadata: Automatic tracking of duration, timestamps, status, and results

API Calling Patterns

Synchronous Execution

For real-time responses (< 30 seconds).

curl -X POST http://localhost:8080/api/v1/execute/support-agent.analyze_ticket \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "ticket_text": "Cannot reset password"
    }
  }'

Asynchronous Execution

For long-running reasoners with webhook callbacks.

curl -X POST http://localhost:8080/api/v1/execute/async/support-agent.deep_analysis \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "ticket_text": "Complex multi-part issue..."
    },
    "webhook": {
      "url": "https://your-app.com/agentfield/callback",
      "secret": "your-webhook-secret"
    }
  }'

Response:

{
  "execution_id": "exec-abc123",
  "workflow_id": "wf-def456",
  "status": "queued",
  "webhook_registered": true
}

Server-Sent Events

Real-time workflow updates via SSE.

curl -N -H "Accept: text/event-stream" \
  http://localhost:8080/api/v1/workflows/wf-def456/events

Event stream:

event: workflow.started
data: {"workflow_id": "wf-def456", "status": "running"}

event: execution.completed
data: {"execution_id": "exec-abc123", "status": "completed", "result": {...}}

Automatic Pydantic Conversion

Reasoners automatically convert function arguments to Pydantic models when type hints are provided.

from pydantic import BaseModel, Field

class TicketInput(BaseModel):
    text: str = Field(..., min_length=10)
    category: str = Field(..., pattern="^(bug|feature|support)$")
    priority: int = Field(default=3, ge=1, le=5)

@app.reasoner
async def validate_and_analyze(ticket: TicketInput) -> dict:
    """Automatic validation via Pydantic."""
    # ticket is already validated TicketInput instance
    return await app.ai(
        user=f"Analyze {ticket.category} ticket: {ticket.text}"
    )

API call with validation:

curl -X POST http://localhost:8080/api/v1/execute/support-agent.validate_and_analyze \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "ticket": {
        "text": "Login button broken",
        "category": "bug",
        "priority": 4
      }
    }
  }'

Invalid input returns validation error:

{
  "error": "Pydantic validation failed",
  "details": {
    "category": ["Input should be 'bug', 'feature', or 'support'"]
  }
}

Memory Integration

Access shared memory within reasoners for context persistence.

@app.reasoner
async def analyze_with_history(ticket_text: str, user_id: str) -> dict:
    """Analyze ticket with user history context."""

    # Get user's previous tickets from memory
    history = await app.memory.get(f"user_{user_id}_history", default=[])

    # Analyze with context
    analysis = await app.ai(
        system="Consider user's ticket history",
        user=f"Current: {ticket_text}\nHistory: {history}"
    )

    # Update history
    history.append({"ticket": ticket_text, "analysis": analysis})
    await app.memory.set(f"user_{user_id}_history", history)

    return analysis

Memory is automatically scoped to workflow/session/actor/global contexts. See app.memory for details.

Error Handling

Reasoners automatically track errors in workflow DAGs.

@app.reasoner
async def safe_analysis(ticket_text: str) -> dict:
    """Analysis with error handling."""
    try:
        result = await app.ai(user=ticket_text)
        return {"status": "success", "result": result}
    except Exception as e:
        # Error is automatically tracked in workflow DAG
        return {"status": "error", "message": str(e)}

Failed executions appear in workflow DAG:

{
  "execution_id": "exec-abc123",
  "status": "failed",
  "error": "LLM API rate limit exceeded",
  "duration_ms": 500
}

Performance Considerations

Direct Function Calls vs app.call():

# Same agent - use direct import (fastest)
from reasoners.priority import classify_priority

@app.reasoner
async def quick_route(ticket_text: str) -> str:
    priority = classify_priority(ticket_text)  # Direct call, no network overhead
    return await app.ai(user=f"Route {priority} ticket: {ticket_text}")

# Different agent - use app.call() (enables workflow tracking)
@app.reasoner
async def cross_agent_route(ticket_text: str) -> str:
    priority = await app.call(
        "priority-agent.classify_priority",  # Network call, full DAG tracking
        ticket_text=ticket_text
    )
    return await app.ai(user=f"Route {priority} ticket: {ticket_text}")

Workflow Tracking Overhead:

  • Enabled: ~5-10ms per reasoner call for DAG updates
  • Disabled: No overhead, but no DAG visualization or context propagation