Agent Class

Core agent initialization and configuration

The Agent class is the foundation of the Agentfield Python SDK. It inherits from FastAPI to provide HTTP endpoints while integrating with the Agentfield ecosystem for distributed AI workflows, cross-agent communication, and persistent memory.

Constructor

from agentfield import Agent, AIConfig, MemoryConfig
from agentfield.async_config import AsyncConfig

app = Agent(
    node_id="my_agent",
    agentfield_server="http://localhost:8080",
    version="1.0.0",
    ai_config=AIConfig(model="gpt-4o"),
    memory_config=MemoryConfig(
        auto_inject=["user_context"],
        memory_retention="persistent",
        cache_results=True
    ),
    async_config=AsyncConfig(enable_async_execution=True),
    dev_mode=False,
    callback_url=None,
    auto_register=True
)

Parameters

Prop

Type

Basic Usage

Minimal Agent

from agentfield import Agent

app = Agent(node_id="simple_agent")

@app.reasoner
async def process(text: str) -> str:
    return await app.ai(user=text)

if __name__ == "__main__":
    app.serve(port=8001)

Production Agent

from agentfield import Agent, AIConfig, MemoryConfig
from agentfield.async_config import AsyncConfig

app = Agent(
    node_id="production_agent",
    agentfield_server="https://agentfield.company.com",
    version="2.1.0",
    ai_config=AIConfig(
        model="gpt-4o",
        temperature=0.7,
        max_tokens=2000,
        timeout=30
    ),
    memory_config=MemoryConfig(
        auto_inject=["user_context", "session_data"],
        memory_retention="persistent",
        cache_results=True
    ),
    async_config=AsyncConfig(
        enable_async_execution=True,
        max_execution_timeout=3600
    ),
    dev_mode=False
)

Key Properties

app.memory

Access the memory interface for the current execution context. Provides persistent and session-based storage automatically scoped to workflow, agent, and user context.

@app.reasoner
async def analyze_with_context(message: str, user_id: str) -> dict:
    # Get user history from memory
    history = await app.memory.get(f"user_{user_id}_history", default=[])

    # Analyze with context
    result = await app.ai(
        system=f"Previous interactions: {history[-5:]}",
        user=message
    )

    # Update history
    history.append({"message": message, "result": result})
    await app.memory.set(f"user_{user_id}_history", history)

    return result

Memory Operations:

  • await app.memory.get(key, default=None) - Retrieve values with hierarchical fallback
  • await app.memory.set(key, value) - Store values in the active scope
  • await app.memory.delete(key) - Remove values from the active scope
  • await app.memory.exists(key) - Check if a key exists in any scope
  • app.memory.session(session_id) / actor(actor_id) / workflow(workflow_id) - Scoped clients with async helpers including list_keys()
  • app.memory.global_scope - Global client (e.g., await app.memory.global_scope.list_keys())

See app.memory for complete documentation.

app.reasoners

List of registered reasoner metadata. Each reasoner includes:

{
    "id": "analyze_sentiment",
    "input_schema": {...},
    "output_schema": {...},
    "memory_config": {...}
}

app.skills

List of registered skill metadata. Each skill includes:

{
    "id": "get_user_profile",
    "input_schema": {...},
    "tags": ["database", "user"]
}

Core Methods

app.ai()

Primary interface for LLM interactions. Supports text, structured output, multimodal inputs, and streaming.

# Simple text generation
response = await app.ai(
    system="You are a helpful assistant",
    user="Explain quantum computing"
)

# Structured output with Pydantic
from pydantic import BaseModel

class Analysis(BaseModel):
    sentiment: str
    confidence: float

result = await app.ai(
    user="I love this product!",
    schema=Analysis
)

See app.ai() for complete documentation.

app.call()

Execute reasoners and skills on other agents while maintaining workflow context.

@app.reasoner
async def orchestrate(ticket_text: str) -> dict:
    # Call sentiment analyzer on different agent
    sentiment = await app.call(
        "sentiment_agent.analyze",
        text=ticket_text
    )

    # Call priority classifier
    priority = await app.call(
        "priority_agent.classify",
        ticket_text=ticket_text
    )

    return {"sentiment": sentiment, "priority": priority}

See app.call() for complete documentation.

app.note()

Add debugging notes to the current execution for tracking and monitoring.

@app.reasoner
async def complex_analysis(data: str) -> dict:
    app.note("Starting analysis", tags=["debug"])

    result = await app.ai(user=data)

    app.note(f"Analysis completed: {len(result)} items", tags=["info"])

    return result

See app.note() for complete documentation.

Lifecycle Methods

app.serve()

Start the agent server with automatic Agentfield registration and heartbeat management.

app.serve(
    port=8001,
    host="0.0.0.0",
    dev=False,
    heartbeat_interval=2,
    auto_port=False
)

Parameters:

Prop

Type

Example:

if __name__ == "__main__":
    app.serve(
        port=8001,
        dev=True,
        auto_port=True
    )

app.handle_serverless()

Universal serverless handler for AWS Lambda, Google Cloud Functions, and other serverless platforms.

app = Agent(node_id="serverless_agent", auto_register=False)

def adapter(event: dict) -> dict:
    body = event.get("body")
    if isinstance(body, str):
        import json
        try:
            body = json.loads(body)
        except json.JSONDecodeError:
            body = {}
    return {
        "path": event.get("rawPath") or event.get("path") or "/execute",
        "headers": event.get("headers") or {},
        "target": event.get("target") or event.get("reasoner"),
        "input": (body or {}).get("input") or body or event.get("input", {}),
        "executionContext": event.get("executionContext") or event.get("execution_context"),
    }

@app.reasoner
async def process_event(data: dict) -> dict:
    return await app.ai(user=f"Process: {data}")

# AWS Lambda handler
def lambda_handler(event, context):
    return app.handle_serverless(event, adapter=adapter)

Event Format:

{
    "path": "/discover",  # or "/execute"
    "action": "discover",  # or "execute"
    "reasoner": "process_event",  # for execution
    "input": {"data": "..."},  # for execution
    "execution_context": {...}  # optional
}

Discovery Response:

{
    "node_id": "serverless_agent",
    "version": "1.0.0",
    "deployment_type": "serverless",
    "reasoners": [...],
    "skills": [...]
}

Local CLI Testing

While building your agent, you can test skills and reasoners locally using CLI commands without needing to run the control plane:

# Execute functions directly
python main.py call say_hello --name Alice

# List all available functions
python main.py list

# Interactive shell
python main.py shell

CLI testing runs locally without the control plane - no workflow tracking, DIDs, or observability. Perfect for rapid development and debugging.

See Local CLI Testing for complete documentation.

Deployment Patterns

from agentfield import Agent

app = Agent(
    node_id="dev_agent",
    agentfield_server="http://localhost:8080",
    dev_mode=True
)

@app.reasoner
async def test_reasoner(text: str) -> str:
    return await app.ai(user=text)

if __name__ == "__main__":
    app.serve(
        port=8001,
        dev=True
    )
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

ENV AGENTFIELD_SERVER=http://af-server:8080
ENV AGENT_PORT=8001

CMD ["python", "main.py"]
# main.py
import os
from agentfield import Agent

app = Agent(
    node_id="docker_agent",
    agentfield_server=os.getenv("AGENTFIELD_SERVER"),
    callback_url=os.getenv("AGENT_CALLBACK_URL")
)

if __name__ == "__main__":
    app.serve(
        port=int(os.getenv("AGENT_PORT", 8001)),
        host="0.0.0.0"
    )

Docker Networking Requirement

When the control plane runs in Docker but the agent runs on the host:

export AGENT_CALLBACK_URL=http://host.docker.internal:8001

See Docker Deployment Guide for details.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: agentfield-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agentfield-agent
  template:
    metadata:
      labels:
        app: agentfield-agent
    spec:
      containers:
      - name: agent
        image: your-registry/agentfield-agent:latest
        env:
        - name: AGENTFIELD_SERVER
          value: "http://af-server:8080"
        - name: AGENT_CALLBACK_URL
          value: "http://agentfield-agent:8001"
        ports:
        - containerPort: 8001
# AWS Lambda
from agentfield import Agent

app = Agent(
    node_id="lambda_agent",
    auto_register=False
)

@app.reasoner
async def process(data: dict) -> dict:
    return await app.ai(user=str(data))

def lambda_handler(event, context):
    return app.handle_serverless(event)
# serverless.yml
service: agentfield-agent

provider:
  name: aws
  runtime: python3.11
  environment:
    AGENTFIELD_SERVER: https://agentfield.company.com

functions:
  agent:
    handler: handler.lambda_handler
    timeout: 300
    memorySize: 1024

Configuration Examples

AI Configuration

from agentfield import Agent, AIConfig

app = Agent(
    node_id="ai_agent",
    ai_config=AIConfig(
        model="gpt-4o",
        temperature=0.7,
        max_tokens=2000,
        vision_model="dall-e-3",
        audio_model="tts-1-hd",
        timeout=60,
        retry_attempts=3
    )
)

See Configuration for all options.

Memory Configuration

from agentfield import Agent, MemoryConfig

app = Agent(
    node_id="memory_agent",
    memory_config=MemoryConfig(
        auto_inject=["user_context", "conversation_history"],
        memory_retention="persistent",
        cache_results=True
    )
)

Async Execution

from agentfield import Agent
from agentfield.async_config import AsyncConfig

app = Agent(
    node_id="async_agent",
    async_config=AsyncConfig(
        enable_async_execution=True,
        max_execution_timeout=3600,
        polling_timeout=20,
        enable_batch_polling=True
    )
)

Integration Features

Automatic Workflow Tracking

Every reasoner and skill call automatically builds workflow DAGs showing execution flow, parent-child relationships, and timing information.

@app.reasoner
async def parent_task(data: str) -> dict:
    # This creates a root execution node
    result1 = await app.call("agent1.process", data=data)

    # This creates a child execution node
    result2 = await app.call("agent2.analyze", result=result1)

    return {"result1": result1, "result2": result2}

See Workflow API for details.

Execution Context

Access workflow metadata and execution details within reasoners and skills.

from agentfield.execution_context import ExecutionContext

@app.reasoner
async def context_aware(
    data: str,
    execution_context: ExecutionContext = None
) -> dict:
    if execution_context:
        workflow_id = execution_context.workflow_id
        parent_id = execution_context.parent_execution_id
    else:
        workflow_id = None
        parent_id = None

    return {
        "data": data,
        "workflow_id": workflow_id,
        "parent_id": parent_id
    }