Agent Node
Core component for creating distributed AI agent nodes in the Agentfield ecosystem
The foundational class for creating AI agent nodes that connect to Agentfield's distributed infrastructure. Inherits from FastAPI to provide HTTP endpoints while integrating with Agentfield's execution gateway, memory system, and workflow tracking.
What It Is
Agent is a FastAPI subclass that transforms Python functions into distributed, API-accessible AI services. When you create an Agent instance, you get:
- Automatic REST API - Every reasoner and skill becomes an HTTP endpoint
- Agentfield Server Integration - Service discovery, routing, and workflow orchestration
- Execution Context - Automatic workflow DAG building and parent-child tracking
- Memory Interface - Scoped persistent storage across distributed agents
- MCP Integration - Dynamic skill generation from Model Context Protocol servers
- Identity System - DID-based agent identity with verifiable credentials
Agentfield routes all app.call() requests through its execution gateway for
proper workflow tracking, even when calling functions on the same agent. This
ensures complete observability and DAG construction.
Basic Example
from agentfield import Agent
from pydantic import BaseModel
# Initialize agent
app = Agent(
node_id="sentiment_analyzer",
agentfield_server="http://localhost:8080"
)
class SentimentResult(BaseModel):
sentiment: str
confidence: float
keywords: list[str]
# Define AI-powered reasoner
@app.reasoner()
async def analyze_sentiment(text: str) -> SentimentResult:
"""Analyze text sentiment using LLM."""
return await app.ai(
system="Analyze sentiment and extract keywords.",
user=text,
schema=SentimentResult
)
# Define deterministic skill
@app.skill(tags=["utility"])
def format_result(sentiment: str, confidence: float) -> str:
"""Format sentiment analysis result."""
return f"Sentiment: {sentiment} ({confidence:.0%} confidence)"
# Start server
if __name__ == "__main__":
app.serve(port=8001)

curl -X POST http://localhost:8080/api/v1/execute/sentiment_analyzer.analyze_sentiment \
-H "Content-Type: application/json" \
-d '{
"input": {
"text": "I love this product! It works amazingly well."
}
}'

Response:

{
"execution_id": "exec-abc123",
"workflow_id": "wf-def456",
"status": "completed",
"result": {
"sentiment": "positive",
"confidence": 0.95,
"keywords": ["love", "product", "amazingly", "well"]
},
"duration_ms": 850,
"timestamp": "2024-07-08T18:20:05Z"
}

The resulting workflow DAG:

{
"workflow_dag": {
"nodes": [
{
"id": "sentiment_analyzer.analyze_sentiment",
"type": "reasoner",
"status": "completed",
"duration_ms": 850
}
],
"edges": []
}
}

Initialization Parameters
Constructor Parameters
# Basic initialization
app = Agent(node_id="customer_support")
# Full configuration
app = Agent(
node_id="my_agent",
agentfield_server="https://agentfield.company.com",
version="2.1.0",
ai_config=AIConfig(model="gpt-4o", temperature=0.7),
memory_config=MemoryConfig(memory_retention="persistent"),
dev_mode=True,
callback_url="http://my-agent.company.com:8001"
)

AIConfig Fields
MemoryConfig Fields
AsyncConfig Fields
Core Methods
app.ai()
Primary interface for LLM interactions with multimodal support.
async def ai(
*args: Any,
system: Optional[str] = None,
user: Optional[str] = None,
schema: Optional[Type[BaseModel]] = None,
model: Optional[str] = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
stream: Optional[bool] = None,
response_format: Optional[str] = None,
context: Optional[Dict] = None,
memory_scope: Optional[List[str]] = None,
**kwargs
) -> Any

# Simple text prompt
response = await app.ai("What is the capital of France?")
# System + user pattern
response = await app.ai(
system="You are a geography expert.",
user="What is the capital of France?"
)

from pydantic import BaseModel
class CapitalInfo(BaseModel):
city: str
country: str
population: int
info = await app.ai(
"Provide details about the capital of France.",
schema=CapitalInfo
)
# Returns: CapitalInfo(city="Paris", country="France", population=2161000)

# Analyze image from URL
analysis = await app.ai(
"Describe this image in detail.",
"https://example.com/image.jpg"
)
# Mix text, images, and audio
response = await app.ai(
"Compare the audio description with the visual content",
"./product_review.wav",
"https://example.com/product.jpg",
"Additional context: Premium product line."
)

# Generate audio with default settings
audio_result = await app.ai_with_audio("Say hello warmly")
audio_result.audio.save("greeting.wav")
# Custom voice and format
audio_result = await app.ai_with_audio(
"Explain quantum computing",
voice="nova",
format="mp3"
)
audio_result.audio.play()

# Generate image
image_result = await app.ai_with_vision("A sunset over mountains")
image_result.images[0].save("sunset.png")
# High-quality image
image_result = await app.ai_with_vision(
"Futuristic cityscape with flying cars",
size="1792x1024",
quality="hd",
style="vivid"
)

Returns: MultimodalResponse object with text, audio, and image access methods.
app.call()
Execute reasoners or skills on other agents via Agentfield's execution gateway.
async def call(
target: str,
*args,
**kwargs
) -> dict

Parameters:
- target - Format: "node_id.function_name" (e.g., "sentiment_analyzer.analyze_sentiment")
- *args - Positional arguments (auto-mapped to function parameters)
- **kwargs - Keyword arguments
@app.reasoner()
async def process_customer_feedback(customer_id: int, feedback: str) -> dict:
# Call sentiment analyzer agent
sentiment = await app.call(
"sentiment_analyzer.analyze_sentiment",
text=feedback
)
# Call risk analyzer with sentiment data
risk = await app.call(
"risk_analyzer.assess_risk",
customer_id=customer_id,
sentiment=sentiment.get("sentiment")
)
return {"sentiment": sentiment, "risk": risk}

All app.call() requests route through the Agentfield server for proper workflow
tracking and DAG construction, even when calling functions on the same agent.
Returns: dict - Always returns JSON/dict objects (no automatic schema conversion)
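Because there is no automatic schema conversion, a caller that wants typed access can validate the returned dict back into a Pydantic model on its own side. A minimal sketch, assuming Pydantic v2's model_validate (the raw dict below stands in for an actual app.call() response):

```python
from pydantic import BaseModel

class SentimentResult(BaseModel):
    sentiment: str
    confidence: float
    keywords: list[str]

# app.call("sentiment_analyzer.analyze_sentiment", ...) returns a plain dict:
raw = {
    "sentiment": "positive",
    "confidence": 0.95,
    "keywords": ["love", "product"],
}

# Validate it back into a typed model on the caller's side
result = SentimentResult.model_validate(raw)
print(result.sentiment, result.confidence)  # positive 0.95
```

Validation also surfaces malformed responses early: a missing or mistyped field raises a ValidationError at the call site instead of propagating downstream.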
app.memory
Access to scoped persistent memory system.
@property
def memory -> Optional[MemoryInterface]

@app.reasoner()
async def process_user_request(user_id: int, request: str) -> dict:
# Store in workflow-scoped memory
await app.memory.set("current_request", request)
# Get from memory with default
user_prefs = await app.memory.get(
f"user_{user_id}_preferences",
default={"theme": "light"}
)
# Check existence
if await app.memory.exists(f"user_{user_id}_history"):
history = await app.memory.get(f"user_{user_id}_history")
return {"processed": True}

Memory Scopes:
- app.memory - Automatic hierarchical scoping (workflow → session → actor → global)
- app.memory.session(session_id) - Session-scoped memory
- app.memory.actor(actor_id) - Actor-scoped memory
- app.memory.global_scope - Global memory
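The hierarchical fallback can be pictured with a toy resolver: a get checks the workflow scope first, then session, actor, and finally global. This is an illustration of the lookup order only, not Agentfield's MemoryInterface implementation:

```python
class ToyScopedMemory:
    """Toy model of hierarchical scope resolution (illustrative only)."""

    SCOPES = ("workflow", "session", "actor", "global")

    def __init__(self):
        self._data = {scope: {} for scope in self.SCOPES}

    def set(self, key, value, scope="workflow"):
        self._data[scope][key] = value

    def get(self, key, default=None):
        # Resolve narrowest scope first, falling back to wider scopes
        for scope in self.SCOPES:
            if key in self._data[scope]:
                return self._data[scope][key]
        return default

mem = ToyScopedMemory()
mem.set("theme", "dark", scope="global")
mem.set("theme", "light", scope="workflow")
print(mem.get("theme"))         # workflow wins: "light"
print(mem.get("locale", "en"))  # nothing set in any scope: default "en"
```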
app.note()
Add execution notes for debugging and tracking.
def note(
message: str,
tags: List[str] = None
) -> None

@app.reasoner()
async def process_data(data: str) -> dict:
app.note("Starting data processing", ["debug", "processing"])
result = await some_processing(data)
app.note(f"Processing completed with {len(result)} items", ["info"])
return result

Notes are fire-and-forget and run asynchronously without blocking execution.
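The fire-and-forget behavior can be approximated with asyncio.create_task: the note is scheduled in the background and the handler continues immediately. A rough sketch of the pattern, where record_note is a hypothetical stand-in for the SDK's delivery call:

```python
import asyncio

async def record_note(message: str, tags: list[str]) -> None:
    # Hypothetical stand-in for shipping the note to the server
    await asyncio.sleep(0.01)  # simulate network latency
    print(f"note: {message} {tags}")

async def handler() -> str:
    # Schedule the note without awaiting it; execution continues immediately
    note_task = asyncio.create_task(record_note("started", ["debug"]))
    result = "done"
    await note_task  # only so this demo exits cleanly
    return result

print(asyncio.run(handler()))
```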
Decorators
@app.reasoner()
Register AI-powered functions that use LLMs for complex reasoning.
def reasoner(
path: Optional[str] = None,
name: Optional[str] = None
) -> Callable

Parameters:
- path - Custom API endpoint path (default: /reasoners/{function_name})
- name - Explicit Agentfield registration ID (default: function name)
from pydantic import BaseModel
class AnalysisResult(BaseModel):
summary: str
key_findings: list[str]
confidence: float
@app.reasoner()
async def analyze_document(document: str, focus: str = "general") -> AnalysisResult:
"""Analyze document with AI."""
return await app.ai(
system=f"Analyze this document focusing on: {focus}",
user=document,
schema=AnalysisResult
)

Auto-Generated Features:
- REST API endpoint: POST /reasoners/analyze_document
- Input/output schema validation
- Execution context injection
- Workflow DAG tracking
- Verifiable credential generation
Learn more about @app.reasoner() →
@app.skill()
Register deterministic functions for business logic and integrations.
def skill(
tags: Optional[List[str]] = None,
path: Optional[str] = None,
name: Optional[str] = None
) -> Callable

Parameters:
- tags - List of tags for organization (e.g., ["database", "user"])
- path - Custom API endpoint path (default: /skills/{function_name})
- name - Explicit Agentfield registration ID (default: function name)
@app.skill(tags=["database", "user"])
def get_user_profile(user_id: int) -> dict:
"""Retrieve user profile from database."""
user = database.get_user(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
return {
"id": user.id,
"name": user.name,
"email": user.email,
"created_at": user.created_at.isoformat()
}

Auto-Generated Features:
- REST API endpoint: POST /skills/get_user_profile
- Input schema validation
- Execution context injection
- Workflow DAG tracking
Learn more about @app.skill() →
@app.memory.on_change()
Subscribe to memory change events for reactive programming.
def on_change(
pattern: Union[str, List[str]]
) -> Callable

# Single pattern
@app.memory.on_change("customer_*")
async def handle_customer_changes(event):
customer_id = event.key.replace("customer_", "")
if event.action == "set":
await notify_customer_team(customer_id, event.data)
# Multiple patterns
@app.memory.on_change(["order_*.status", "order_*.payment"])
async def handle_order_updates(event):
order_id = event.key.split('_')[1].split('.')[0]
await process_order_update(order_id, event)

Learn more about memory events →
Router System
Organize related reasoners and skills with AgentRouter.
from agentfield.router import AgentRouter
app = Agent(node_id="user_agent")
# Create router with prefix
profile = AgentRouter(prefix="Users/Profile-v1")
@profile.reasoner()
async def get_profile(user_id: str) -> dict:
return {"user_id": user_id}
@profile.skill()
def update_preferences(user_id: str, preferences: dict) -> dict:
return {"user_id": user_id, "preferences": preferences}
# Include router (prefix becomes: users_profile_v1_)
app.include_router(profile)
# Registered IDs:
# - users_profile_v1_get_profile
# - users_profile_v1_update_preferences

Prefix Translation:
- "Billing" → billing_
- "Support/Inbox" → support_inbox_
- "API/v2/Users" → api_v2_users_
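Judging by the examples, the translation lowercases the prefix and turns separators into underscores. A sketch of that rule as a plain function (an assumption about the behavior, not the SDK's actual code):

```python
import re

def normalize_prefix(prefix: str) -> str:
    """Slugify a router prefix, e.g. 'API/v2/Users' -> 'api_v2_users_' (assumed rule)."""
    slug = re.sub(r"[^0-9A-Za-z]+", "_", prefix).strip("_").lower()
    return slug + "_"

print(normalize_prefix("Billing"))        # billing_
print(normalize_prefix("Support/Inbox"))  # support_inbox_
print(normalize_prefix("API/v2/Users"))   # api_v2_users_
```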
Learn more about AgentRouter →
Server Lifecycle
serve()
Start the agent server with Agentfield integration.
def serve(
port: Optional[int] = None,
host: str = "0.0.0.0",
dev: bool = False,
heartbeat_interval: int = 2,
auto_port: bool = False,
**kwargs
) -> None

Parameters:
- port - Server port (auto-discovers if None)
- host - Host address (default: "0.0.0.0")
- dev - Enable development mode with auto-reload
- heartbeat_interval - Heartbeat frequency in seconds (default: 2)
- auto_port - Auto-find available port (default: False)
- **kwargs - Additional uvicorn parameters
# Basic server
app.serve()
# Development server
app.serve(
port=8001,
dev=True
)
# Production server
app.serve(
port=8080,
host="0.0.0.0"
)

Server Lifecycle:
- Initialize routes and handlers
- Start MCP servers (if configured)
- Register with Agentfield server
- Start heartbeat loop
- Ready to receive requests
- Graceful shutdown on SIGINT/SIGTERM
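The graceful-shutdown step follows the standard asyncio signal pattern: trap SIGINT/SIGTERM, finish in-flight work, then exit. A minimal stdlib sketch of that pattern (Unix only; independent of Agentfield, which handles this internally):

```python
import asyncio
import os
import signal

async def serve_until_signal() -> str:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Trip the event on SIGINT/SIGTERM instead of dying mid-request
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)
    # Demo only: send ourselves SIGTERM shortly after "startup"
    loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)
    await stop.wait()
    # ...drain in-flight requests, deregister from the server, etc.
    return "shutdown complete"

print(asyncio.run(serve_until_signal()))
```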
Auto-Generated Endpoints:
- POST /reasoners/{name} - Execute reasoner
- POST /skills/{name} - Execute skill
- GET /health - Health check
- GET /mcp/status - MCP server status
- GET /docs - Swagger UI
- GET /redoc - ReDoc documentation
Auto-Generated Features
When you create an Agent instance, Agentfield automatically provides:
REST API Endpoints
Every reasoner and skill becomes a REST endpoint:
# Synchronous execution
POST /api/v1/execute/{node_id}.{function_name}
# Asynchronous execution
POST /api/v1/execute/async/{node_id}.{function_name}
# Execution status
GET /api/v1/execute/status/{execution_id}

Workflow DAG Tracking
All executions are tracked in a Directed Acyclic Graph:
{
"workflow_dag": {
"nodes": [
{
"id": "agent.reasoner_name",
"type": "reasoner",
"status": "completed",
"duration_ms": 850
}
],
"edges": [
{
"from": "parent_agent.parent_reasoner",
"to": "agent.reasoner_name"
}
]
}
}

DID Registration
Agents automatically receive Decentralized Identifiers:
# Access agent DID
agent_did = app.did_manager.get_agent_did()
# Returns: "did:agentfield:agent:sentiment_analyzer"
# DIDs are used for:
# - Agent identity and authentication
# - Verifiable credential generation
# - Audit trails and compliance

MCP Skill Discovery (Beta)
MCP servers are automatically discovered and their tools become skills:
# MCP server tools automatically become callable skills
result = await app.call(
"my_agent.github_api_create_issue",
owner="myorg",
repo="myrepo",
title="Bug report",
body="Description"
)

Execution Context Propagation
Every request automatically includes execution context:
@app.reasoner()
async def my_reasoner(data: str, execution_context: ExecutionContext = None) -> dict:
if execution_context:
print(f"Workflow ID: {execution_context.workflow_id}")
print(f"Session ID: {execution_context.session_id}")
print(f"Parent Execution: {execution_context.parent_execution_id}")
return {"processed": True}

Production Patterns
Multi-Agent Orchestration
from agentfield import Agent
from pydantic import BaseModel
app = Agent(node_id="orchestrator")
class ProcessingResult(BaseModel):
user_id: int
sentiment_score: float
risk_level: str
actions_taken: list[str]
@app.reasoner()
async def process_user_feedback(user_id: int, feedback: str) -> ProcessingResult:
"""Orchestrate multi-agent workflow."""
# Step 1: Analyze sentiment
sentiment = await app.call(
"sentiment_analyzer.analyze_sentiment",
text=feedback
)
# Step 2: Assess risk
risk = await app.call(
"risk_analyzer.assess_user_risk",
user_id=user_id,
sentiment=sentiment.get("sentiment")
)
# Step 3: Generate recommendations
recommendations = await app.call(
"recommendation_engine.generate_recommendations",
user_id=user_id,
sentiment_data=sentiment,
risk_data=risk
)
# Step 4: Take automated actions
actions = []
if risk.get("risk_level") == "high":
notification = await app.call(
"notification_agent.send_alert",
type="high_risk_user",
user_id=user_id
)
actions.append(f"Alert sent: {notification.get('alert_id')}")
return ProcessingResult(
user_id=user_id,
sentiment_score=sentiment.get("confidence"),
risk_level=risk.get("risk_level"),
actions_taken=actions
)

Error Handling
@app.reasoner()
async def resilient_workflow(user_id: int) -> dict:
"""Build resilient workflows with proper error handling."""
results = {"user_id": user_id, "steps_completed": []}
try:
# Critical step - must succeed
user_data = await app.call(
"user_service.get_user_data",
user_id=user_id
)
results["steps_completed"].append("user_data_retrieved")
# Optional enhancement - can fail gracefully
try:
enrichment = await app.call(
"enrichment_service.enrich_user_data",
user_data=user_data
)
results["enriched_data"] = enrichment
results["steps_completed"].append("data_enriched")
except Exception as e:
results["enrichment_error"] = str(e)
# Continue with basic data
# Final processing
final_result = await app.call(
"processor.finalize_processing",
user_data=user_data,
enriched_data=results.get("enriched_data")
)
results["final_result"] = final_result
results["steps_completed"].append("processing_completed")
except Exception as e:
results["error"] = str(e)
results["status"] = "failed"
return results

Parallel Execution
@app.reasoner()
async def parallel_processing(items: list[dict]) -> dict:
"""Process items in parallel for better performance."""
import asyncio
# Process items in parallel
tasks = [
app.call("item_processor.process_item", item=item)
for item in items
]
# Wait for all to complete
results = await asyncio.gather(*tasks, return_exceptions=True)
# Separate successes and failures
successful = [r for r in results if not isinstance(r, Exception)]
failed = [str(r) for r in results if isinstance(r, Exception)]
# Aggregate results
summary = await app.call(
"aggregator.summarize_results",
successful_count=len(successful),
failed_count=len(failed),
results=successful
)
return {
"summary": summary,
"successful_items": len(successful),
"failed_items": len(failed)
}

Related
- app.ai() - LLM Interface
- app.call() - Cross-Agent Communication
- app.memory - Memory System
- @app.reasoner() - AI Functions
- @app.skill() - Deterministic Functions
- AgentRouter - Function Organization
- Execution Context - Workflow Tracking
- Configuration - AIConfig & MemoryConfig
- REST API - HTTP Endpoints
- Async Execution - Long-Running Tasks