Agent Class
Core agent initialization and configuration
The Agent class is the foundation of the Agentfield Python SDK. It inherits from FastAPI to provide HTTP endpoints while integrating with the Agentfield ecosystem for distributed AI workflows, cross-agent communication, and persistent memory.
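Because Agent subclasses FastAPI, the standard FastAPI surface remains available alongside the Agentfield decorators. A minimal sketch (the /healthz route and its name are illustrative, not part of the SDK):
from agentfield import Agent

app = Agent(node_id="hybrid_agent")

# Plain FastAPI route registration still works on an Agent instance
@app.get("/healthz")
async def healthz() -> dict:
    return {"status": "ok"}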
Constructor
from agentfield import Agent, AIConfig, MemoryConfig
from agentfield.async_config import AsyncConfig
app = Agent(
node_id="my_agent",
agentfield_server="http://localhost:8080",
version="1.0.0",
ai_config=AIConfig(model="gpt-4o"),
memory_config=MemoryConfig(
auto_inject=["user_context"],
memory_retention="persistent",
cache_results=True
),
async_config=AsyncConfig(enable_async_execution=True),
dev_mode=False,
callback_url=None,
auto_register=True
)

Parameters

- node_id (str, required): Unique identifier for the agent on the Agentfield network.
- agentfield_server (str): URL of the Agentfield control plane (e.g., http://localhost:8080).
- version (str): Version string reported by the agent.
- ai_config (AIConfig): Default LLM settings (model, temperature, etc.).
- memory_config (MemoryConfig): Memory behavior, including auto-injection, retention, and result caching.
- async_config (AsyncConfig): Async execution settings.
- dev_mode (bool): Enable development mode.
- callback_url (str | None): URL the control plane uses to reach this agent; set it when the default address is not routable (see Docker Networking Requirement below).
- auto_register (bool): Register with the control plane on startup; set False for serverless deployments.
Basic Usage
Minimal Agent
from agentfield import Agent
app = Agent(node_id="simple_agent")
@app.reasoner
async def process(text: str) -> str:
return await app.ai(user=text)
if __name__ == "__main__":
    app.serve(port=8001)

Production Agent
from agentfield import Agent, AIConfig, MemoryConfig
from agentfield.async_config import AsyncConfig
app = Agent(
node_id="production_agent",
agentfield_server="https://agentfield.company.com",
version="2.1.0",
ai_config=AIConfig(
model="gpt-4o",
temperature=0.7,
max_tokens=2000,
timeout=30
),
memory_config=MemoryConfig(
auto_inject=["user_context", "session_data"],
memory_retention="persistent",
cache_results=True
),
async_config=AsyncConfig(
enable_async_execution=True,
max_execution_timeout=3600
),
dev_mode=False
)

Key Properties
app.memory
Access the memory interface for the current execution context. Provides persistent and session-based storage automatically scoped to workflow, agent, and user context.
@app.reasoner
async def analyze_with_context(message: str, user_id: str) -> dict:
# Get user history from memory
history = await app.memory.get(f"user_{user_id}_history", default=[])
# Analyze with context
result = await app.ai(
system=f"Previous interactions: {history[-5:]}",
user=message
)
# Update history
history.append({"message": message, "result": result})
await app.memory.set(f"user_{user_id}_history", history)
    return result

Memory Operations:
- await app.memory.get(key, default=None) - Retrieve values with hierarchical fallback
- await app.memory.set(key, value) - Store values in the active scope
- await app.memory.delete(key) - Remove values from the active scope
- await app.memory.exists(key) - Check if a key exists in any scope
- app.memory.session(session_id) / actor(actor_id) / workflow(workflow_id) - Scoped clients with async helpers including list_keys()
- app.memory.global_scope - Global client (e.g., await app.memory.global_scope.list_keys())
See app.memory for complete documentation.
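The scoped clients listed above can also be addressed explicitly when a reasoner needs to read or write outside its active context. A sketch, assuming scoped clients expose the same get helper as the top-level interface (only list_keys() is confirmed above):
@app.reasoner
async def summarize_session(session_id: str) -> dict:
    # Scope operations to one session rather than the active context
    session = app.memory.session(session_id)
    keys = await session.list_keys()
    # get() on a scoped client is an assumption based on the top-level API
    last_message = await session.get("last_message", default=None)
    # Global scope is shared across agents and workflows
    global_keys = await app.memory.global_scope.list_keys()
    return {"session_keys": keys, "last_message": last_message, "global_keys": global_keys}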
app.reasoners
List of registered reasoner metadata. Each reasoner includes:
{
"id": "analyze_sentiment",
"input_schema": {...},
"output_schema": {...},
"memory_config": {...}
}

app.skills
List of registered skill metadata. Each skill includes:
{
"id": "get_user_profile",
"input_schema": {...},
"tags": ["database", "user"]
}

Core Methods
app.ai()
Primary interface for LLM interactions. Supports text, structured output, multimodal inputs, and streaming.
# Simple text generation
response = await app.ai(
system="You are a helpful assistant",
user="Explain quantum computing"
)
# Structured output with Pydantic
from pydantic import BaseModel
class Analysis(BaseModel):
sentiment: str
confidence: float
result = await app.ai(
user="I love this product!",
schema=Analysis
)

See app.ai() for complete documentation.
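The parameters shown above compose: a system prompt can steer a structured extraction. This sketch uses only the system, user, and schema parameters demonstrated in this section:
from pydantic import BaseModel

class TicketTriage(BaseModel):
    category: str
    urgency: str

@app.reasoner
async def triage_ticket(text: str) -> TicketTriage:
    # system steers the model; schema validates the structured result
    return await app.ai(
        system="You are a support triage assistant. Classify tickets.",
        user=text,
        schema=TicketTriage
    )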
app.call()
Execute reasoners and skills on other agents while maintaining workflow context.
@app.reasoner
async def orchestrate(ticket_text: str) -> dict:
# Call sentiment analyzer on different agent
sentiment = await app.call(
"sentiment_agent.analyze",
text=ticket_text
)
# Call priority classifier
priority = await app.call(
"priority_agent.classify",
ticket_text=ticket_text
)
return {"sentiment": sentiment, "priority": priority}See app.call() for complete documentation.
app.note()
Add debugging notes to the current execution for tracking and monitoring.
@app.reasoner
async def complex_analysis(data: str) -> str:
    app.note("Starting analysis", tags=["debug"])
    result = await app.ai(user=data)
    app.note(f"Analysis completed: {len(result)} characters", tags=["info"])
    return result

See app.note() for complete documentation.
Lifecycle Methods
app.serve()
Start the agent server with automatic Agentfield registration and heartbeat management.
app.serve(
port=8001,
host="0.0.0.0",
dev=False,
heartbeat_interval=2,
auto_port=False
)

Parameters:

- port (int): Port to listen on.
- host (str): Bind address; "0.0.0.0" exposes the agent on all interfaces.
- dev (bool): Enable development mode.
- heartbeat_interval (int): Seconds between heartbeats sent to the control plane.
- auto_port (bool): Automatically pick an available port if the requested one is taken.
Example:
if __name__ == "__main__":
app.serve(
port=8001,
dev=True,
auto_port=True
    )

app.handle_serverless()
Universal serverless handler for AWS Lambda, Google Cloud Functions, and other serverless platforms.
app = Agent(node_id="serverless_agent", auto_register=False)
import json

def adapter(event: dict) -> dict:
body = event.get("body")
if isinstance(body, str):
try:
body = json.loads(body)
except json.JSONDecodeError:
body = {}
return {
"path": event.get("rawPath") or event.get("path") or "/execute",
"headers": event.get("headers") or {},
"target": event.get("target") or event.get("reasoner"),
"input": (body or {}).get("input") or body or event.get("input", {}),
"executionContext": event.get("executionContext") or event.get("execution_context"),
}
@app.reasoner
async def process_event(data: dict) -> str:
return await app.ai(user=f"Process: {data}")
# AWS Lambda handler
def lambda_handler(event, context):
    return app.handle_serverless(event, adapter=adapter)

Event Format:
{
"path": "/discover", # or "/execute"
"action": "discover", # or "execute"
"reasoner": "process_event", # for execution
"input": {"data": "..."}, # for execution
"execution_context": {...} # optional
}

Discovery Response:
{
"node_id": "serverless_agent",
"version": "1.0.0",
"deployment_type": "serverless",
"reasoners": [...],
"skills": [...]
}

Local CLI Testing
While building your agent, you can test skills and reasoners locally using CLI commands without needing to run the control plane:
# Execute functions directly
python main.py call say_hello --name Alice
# List all available functions
python main.py list
# Interactive shell
python main.py shell

CLI testing runs locally without the control plane - no workflow tracking, DIDs, or observability. Perfect for rapid development and debugging.
See Local CLI Testing for complete documentation.
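For reference, a minimal main.py that the commands above could exercise. The say_hello skill is illustrative, and this sketch assumes the standard serve() entry point also dispatches the call/list/shell subcommands:
# main.py - minimal agent for local CLI testing (hypothetical example)
from agentfield import Agent

app = Agent(node_id="cli_test_agent")

@app.skill()
async def say_hello(name: str) -> str:
    # Deterministic skill; invoked locally via: python main.py call say_hello --name Alice
    return f"Hello, {name}!"

if __name__ == "__main__":
    app.serve(port=8001)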
Deployment Patterns

Local Development
from agentfield import Agent
app = Agent(
node_id="dev_agent",
agentfield_server="http://localhost:8080",
dev_mode=True
)
@app.reasoner
async def test_reasoner(text: str) -> str:
return await app.ai(user=text)
if __name__ == "__main__":
app.serve(
port=8001,
dev=True
    )

Docker

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
ENV AGENTFIELD_SERVER=http://af-server:8080
ENV AGENT_PORT=8001
CMD ["python", "main.py"]

# main.py
import os
from agentfield import Agent
app = Agent(
node_id="docker_agent",
agentfield_server=os.getenv("AGENTFIELD_SERVER"),
callback_url=os.getenv("AGENT_CALLBACK_URL")
)
if __name__ == "__main__":
app.serve(
port=int(os.getenv("AGENT_PORT", 8001)),
host="0.0.0.0"
    )

Docker Networking Requirement
When the control plane runs in Docker but the agent runs on the host:
export AGENT_CALLBACK_URL=http://host.docker.internal:8001

See Docker Deployment Guide for details.
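For the containerized agent above, a hypothetical docker run invocation wiring the same environment variables (the network and image names are illustrative):
# Run the agent on the same Docker network as the control plane
docker network create af-net
docker run --network af-net --name docker_agent \
  -e AGENTFIELD_SERVER=http://af-server:8080 \
  -e AGENT_CALLBACK_URL=http://docker_agent:8001 \
  -e AGENT_PORT=8001 \
  -p 8001:8001 \
  your-registry/agentfield-agent:latest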
Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
name: agentfield-agent
spec:
replicas: 3
selector:
matchLabels:
app: agentfield-agent
template:
metadata:
labels:
app: agentfield-agent
spec:
containers:
- name: agent
image: your-registry/agentfield-agent:latest
env:
- name: AGENTFIELD_SERVER
value: "http://af-server:8080"
- name: AGENT_CALLBACK_URL
value: "http://agentfield-agent:8001"
ports:
        - containerPort: 8001
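The AGENT_CALLBACK_URL above resolves through a Service named agentfield-agent. A minimal Service manifest that would satisfy it, sketched here as an assumption rather than part of the original deployment:
apiVersion: v1
kind: Service
metadata:
  name: agentfield-agent
spec:
  selector:
    app: agentfield-agent
  ports:
    - port: 8001
      targetPort: 8001

Serverless (AWS Lambda)

# AWS Lambda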
from agentfield import Agent
app = Agent(
node_id="lambda_agent",
auto_register=False
)
@app.reasoner
async def process(data: dict) -> str:
return await app.ai(user=str(data))
def lambda_handler(event, context):
    return app.handle_serverless(event)

# serverless.yml
service: agentfield-agent
provider:
name: aws
runtime: python3.11
environment:
AGENTFIELD_SERVER: https://agentfield.company.com
functions:
agent:
handler: handler.lambda_handler
timeout: 300
    memorySize: 1024

Configuration Examples
AI Configuration
from agentfield import Agent, AIConfig
app = Agent(
node_id="ai_agent",
ai_config=AIConfig(
model="gpt-4o",
temperature=0.7,
max_tokens=2000,
vision_model="dall-e-3",
audio_model="tts-1-hd",
timeout=60,
retry_attempts=3
)
)

See Configuration for all options.
Memory Configuration
from agentfield import Agent, MemoryConfig
app = Agent(
node_id="memory_agent",
memory_config=MemoryConfig(
auto_inject=["user_context", "conversation_history"],
memory_retention="persistent",
cache_results=True
)
)

Async Execution
from agentfield import Agent
from agentfield.async_config import AsyncConfig
app = Agent(
node_id="async_agent",
async_config=AsyncConfig(
enable_async_execution=True,
max_execution_timeout=3600,
polling_timeout=20,
enable_batch_polling=True
)
)

Integration Features
Automatic Workflow Tracking
Every reasoner and skill call automatically builds workflow DAGs showing execution flow, parent-child relationships, and timing information.
@app.reasoner
async def parent_task(data: str) -> dict:
# This creates a root execution node
result1 = await app.call("agent1.process", data=data)
# This creates a child execution node
result2 = await app.call("agent2.analyze", result=result1)
return {"result1": result1, "result2": result2}See Workflow API for details.
Execution Context
Access workflow metadata and execution details within reasoners and skills.
from agentfield.execution_context import ExecutionContext
@app.reasoner
async def context_aware(
data: str,
    execution_context: ExecutionContext | None = None
) -> dict:
if execution_context:
workflow_id = execution_context.workflow_id
parent_id = execution_context.parent_execution_id
else:
workflow_id = None
parent_id = None
return {
"data": data,
"workflow_id": workflow_id,
"parent_id": parent_id
    }

Related
- Python SDK Overview - Getting started guide
- @app.reasoner - Define AI-powered functions
- @app.skill() - Create deterministic skills
- Configuration - AI, Memory, and Async settings
- Workflow API - Execution tracking and DAG building