Human-in-the-Loop Approvals

Pause agent execution for human review and resume with approval decisions

Some decisions shouldn't be made by an AI alone. A transaction over $50,000. A contract modification. A customer refund that falls outside policy. These are moments where you want a human in the loop, not as a bottleneck, but as a deliberate checkpoint.

Agentfield makes this a first-class primitive. An agent can pause mid-execution, hand off to a human reviewer, and resume exactly where it left off once a decision comes back. The control plane holds the execution in a waiting state. No polling loops in your code. No lost context. No re-running work that already completed.

This example walks through a financial transaction reviewer that auto-approves low-risk transactions and escalates high-risk ones to a human.

The Scenario

A payment processing agent receives incoming transactions. It runs a risk assessment using an LLM, then routes based on the score:

  • Score below 30: Auto-approved, no human needed.
  • Score 30 or above: Flagged for human review. The agent pauses, a reviewer gets a link to the approval dashboard, and the agent resumes once they decide.

Agent receives the transaction

The reasoner is invoked with the transaction details. This is a standard Agentfield execution, nothing special yet.

AI risk assessment runs

The agent calls an LLM with the transaction data and gets back a structured risk score. This happens synchronously within the execution.

Routing decision

Low-risk transactions return immediately. High-risk ones proceed to the pause step.
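
The threshold check itself is just a comparison; a minimal sketch of the routing used in this example:

```python
RISK_THRESHOLD = 30  # scores at or above this cutoff require a human

def needs_human_review(risk_score: int) -> bool:
    """Route a transaction based on its assessed risk score (0-100)."""
    return risk_score >= RISK_THRESHOLD
```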

Execution pauses for human review

The agent calls pause() (Python) or RequestApproval + WaitForApproval (Go/TypeScript). The control plane transitions the execution to waiting. The agent process blocks, but no compute is wasted; it is simply awaiting a future.

Human reviews and decides

A reviewer opens the approval URL, reads the transaction details, and clicks Approve or Reject. The control plane receives the webhook callback and resolves the waiting execution.

Agent resumes and completes

The pause() call returns an ApprovalResult. The agent reads the decision and feedback, then returns its final output.

Full Example

Python

from dataclasses import dataclass
from agentfield import Agent
from agentfield.client import ApprovalResult

app = Agent(node_id="transaction-reviewer")


@dataclass
class RiskAssessment:
    score: int          # 0-100, higher means riskier
    reasoning: str
    flags: list[str]


@dataclass
class TransactionInput:
    transaction_id: str
    amount: float
    merchant: str
    account_id: str


@app.reasoner()
async def review_transaction(
    transaction_id: str,
    amount: float,
    merchant: str,
    account_id: str,
) -> dict:
    """Review a transaction with automatic and human approval paths."""

    # Step 1: AI risk assessment
    risk: RiskAssessment = await app.ai(
        system=(
            "You are a fraud detection system. "
            "Assess the risk of this transaction on a scale of 0-100. "
            "Consider amount, merchant category, and typical patterns. "
            "Return a structured risk assessment."
        ),
        user=f"Transaction: ${amount:.2f} at '{merchant}' for account {account_id}",
        schema=RiskAssessment,
    )

    app.note(
        f"Risk assessment complete: score {risk.score}/100 for ${amount:.2f} at {merchant}",
        tags=["risk-assessment"],
        metadata={"score": risk.score, "flags": risk.flags},
    )

    # Step 2: Auto-approve low-risk transactions
    if risk.score < 30:
        app.note(
            f"Auto-approved: risk score {risk.score} is below threshold",
            tags=["approved", "auto"],
        )
        return {
            "decision": "approved",
            "method": "automatic",
            "risk_score": risk.score,
            "transaction_id": transaction_id,
        }

    # Step 3: High risk — pause for human review
    app.note(
        f"High-risk transaction flagged for human review (score: {risk.score})",
        tags=["approval", "waiting"],
        metadata={"risk_score": risk.score, "reasoning": risk.reasoning},
    )

    result: ApprovalResult = await app.pause(
        approval_request_id=f"txn-{transaction_id}",
        approval_request_url=f"https://dashboard.example.com/review/{transaction_id}",
        expires_in_hours=4,
    )

    # Step 4: Handle the human's decision
    if result.approved:
        app.note(
            "Transaction approved by human reviewer",
            tags=["approved", "human"],
            metadata={"feedback": result.feedback},
        )
        return {
            "decision": "approved",
            "method": "human",
            "risk_score": risk.score,
            "transaction_id": transaction_id,
            "reviewer_feedback": result.feedback,
        }

    if result.changes_requested:
        app.note(
            "Reviewer requested changes before approval",
            tags=["changes-requested"],
        )
        return {
            "decision": "needs_modification",
            "transaction_id": transaction_id,
            "feedback": result.feedback,
        }

    # Rejected or expired
    app.note(
        f"Transaction not approved: {result.decision}",
        tags=["rejected"],
        metadata={"reason": result.feedback},
    )
    return {
        "decision": result.decision,
        "transaction_id": transaction_id,
        "reason": result.feedback,
    }


if __name__ == "__main__":
    app.serve()

Go

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    agentfieldagent "github.com/Agent-Field/agentfield/sdk/go/agent"
    "github.com/Agent-Field/agentfield/sdk/go/client"
)

func main() {
    a, err := agentfieldagent.New(agentfieldagent.Config{
        NodeID:        "transaction-reviewer",
        AgentFieldURL: "http://localhost:8080",
    })
    if err != nil {
        log.Fatal(err)
    }

    a.RegisterSkill("review_transaction", reviewTransaction(a))

    if err := a.Run(context.Background()); err != nil {
        log.Fatal(err)
    }
}

func reviewTransaction(a *agentfieldagent.Agent) agentfieldagent.HandlerFunc {
    return func(ctx context.Context, input map[string]any) (any, error) {
        txnID, _ := input["transaction_id"].(string)
        amount, _ := input["amount"].(float64)
        merchant, _ := input["merchant"].(string)

        execCtx := agentfieldagent.ExecutionContextFrom(ctx)

        // Step 1: AI risk assessment
        // (call your LLM or scoring service here)
        riskScore := assessRisk(amount, merchant)

        a.Note(ctx, fmt.Sprintf(
            "Risk assessment: score %d/100 for $%.2f at %s",
            riskScore, amount, merchant,
        ))

        // Step 2: Auto-approve low-risk transactions
        if riskScore < 30 {
            return map[string]any{
                "decision":       "approved",
                "method":         "automatic",
                "risk_score":     riskScore,
                "transaction_id": txnID,
            }, nil
        }

        // Step 3: Request human approval
        a.Note(ctx, fmt.Sprintf(
            "High-risk transaction flagged for review (score: %d)",
            riskScore,
        ))

        c := a.Client()
        approvalResp, err := c.RequestApproval(ctx, a.NodeID(), execCtx.ExecutionID,
            client.RequestApprovalRequest{
                Title:          fmt.Sprintf("Review transaction $%.2f at %s", amount, merchant),
                Description:    fmt.Sprintf("Risk score: %d/100. Manual review required.", riskScore),
                TemplateType:   "plan-review-v1",
                ExpiresInHours: 4,
                Payload: map[string]interface{}{
                    "transaction_id": txnID,
                    "amount":         amount,
                    "merchant":       merchant,
                    "risk_score":     riskScore,
                },
            },
        )
        if err != nil {
            return nil, fmt.Errorf("request approval: %w", err)
        }

        // The reviewer's link is available at approvalResp.ApprovalRequestURL.
        _ = approvalResp

        // Step 4: Wait for the human's decision
        deadline := time.Now().Add(4 * time.Hour)
        waitCtx, cancel := context.WithDeadline(ctx, deadline)
        defer cancel()

        status, err := c.WaitForApproval(waitCtx, a.NodeID(), execCtx.ExecutionID,
            &client.WaitForApprovalOptions{
                PollInterval: 10 * time.Second,
                MaxInterval:  60 * time.Second,
            },
        )
        if err != nil {
            return nil, fmt.Errorf("wait for approval: %w", err)
        }

        // Step 5: Handle the decision
        switch status.Status {
        case "approved":
            return map[string]any{
                "decision":       "approved",
                "method":         "human",
                "risk_score":     riskScore,
                "transaction_id": txnID,
            }, nil
        case "rejected":
            return map[string]any{
                "decision":       "rejected",
                "transaction_id": txnID,
            }, nil
        default:
            return map[string]any{
                "decision":       status.Status,
                "transaction_id": txnID,
            }, nil
        }
    }
}

// assessRisk is a placeholder for your actual risk scoring logic.
func assessRisk(amount float64, merchant string) int {
    if amount > 10_000 {
        return 75
    }
    return 15
}

TypeScript

import { Agent } from "@agentfield/sdk";
import { ApprovalClient } from "@agentfield/sdk/approval";

const app = new Agent({
  nodeId: "transaction-reviewer",
  agentFieldUrl: process.env.AGENTFIELD_SERVER ?? "http://localhost:8080",
});

const approvalClient = new ApprovalClient({
  baseURL: process.env.AGENTFIELD_SERVER ?? "http://localhost:8080",
  nodeId: "transaction-reviewer",
  apiKey: process.env.AGENTFIELD_API_KEY,
});

interface TransactionInput {
  transactionId: string;
  amount: number;
  merchant: string;
  accountId: string;
}

app.registerReasoner(
  "review_transaction",
  async (ctx, input: TransactionInput) => {
    const { transactionId, amount, merchant } = input;
    const executionId = ctx.executionId;

    // Step 1: AI risk assessment
    // (call your LLM or scoring service here)
    const riskScore = assessRisk(amount, merchant);

    app.note(ctx, `Risk assessment: score ${riskScore}/100 for $${amount} at ${merchant}`);

    // Step 2: Auto-approve low-risk transactions
    if (riskScore < 30) {
      return {
        decision: "approved",
        method: "automatic",
        riskScore,
        transactionId,
      };
    }

    // Step 3: Request human approval
    app.note(ctx, `High-risk transaction flagged for review (score: ${riskScore})`);

    const approvalResp = await approvalClient.requestApproval(executionId, {
      title: `Review transaction $${amount} at ${merchant}`,
      description: `Risk score: ${riskScore}/100. Manual review required.`,
      templateType: "plan-review-v1",
      projectId: ctx.projectId ?? "default",
      expiresInHours: 4,
      payload: {
        transactionId,
        amount,
        merchant,
        riskScore,
      },
    });

    // Step 4: Wait for the human's decision
    const status = await approvalClient.waitForApproval(executionId, {
      pollIntervalMs: 10_000,
      maxIntervalMs: 60_000,
      timeoutMs: 4 * 60 * 60 * 1000, // 4 hours
    });

    // approvalResp.approvalRequestUrl is the link sent to the reviewer

    // Step 5: Handle the decision
    if (status.status === "approved") {
      return {
        decision: "approved",
        method: "human",
        riskScore,
        transactionId,
      };
    }

    return {
      decision: status.status,
      transactionId,
    };
  }
);

// assessRisk is a placeholder for your actual risk scoring logic.
function assessRisk(amount: number, merchant: string): number {
  if (amount > 10_000) return 75;
  return 15;
}

app.serve();

What the Control Plane Does

When pause() is called, the control plane does several things automatically:

Transitions the execution to waiting. The execution record moves from running to waiting. It stays there until the approval resolves or expires. You can see this state in the web UI under the workflow's execution list.
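
The states named on this page suggest a small lifecycle. The sketch below is illustrative only; the transition set is an assumption, not the control plane's exact model:

```python
# Illustrative execution-state transitions (an assumption, not the
# control plane's exact model).
TRANSITIONS = {
    "running": {"waiting", "completed", "failed"},
    "waiting": {"running", "expired"},  # approval resolves, or the request expires
}

def transition(state: str, new_state: str) -> str:
    """Move an execution to new_state, rejecting transitions not in the table."""
    if new_state not in TRANSITIONS.get(state, set()):
        raise ValueError(f"invalid transition: {state} -> {new_state}")
    return new_state
```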

Stores the approval request metadata. The approval_request_id, the review URL, and the expiry time are all persisted. If your agent process restarts, the execution is still waiting on the control plane side.

Accepts the webhook callback. When a reviewer submits their decision, the approval service calls back to your agent's /webhooks/approval endpoint. The control plane routes this to the right execution and resolves the waiting future.
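
One way to picture the resolve step is a future keyed by approval request ID. This is purely illustrative; the names and shapes here are sketches, not the SDK's API:

```python
import asyncio

# Purely illustrative: a waiting execution modeled as a future keyed by
# approval request ID. Names here are sketches, not the SDK's API.
pending: dict[str, asyncio.Future] = {}

async def wait_for_decision(approval_request_id: str) -> dict:
    """Block the execution until the approval webhook resolves its future."""
    fut = asyncio.get_running_loop().create_future()
    pending[approval_request_id] = fut
    return await fut

def on_approval_webhook(payload: dict) -> None:
    """Webhook handler: route the decision to the waiting execution."""
    fut = pending.pop(payload["approval_request_id"], None)
    if fut is not None and not fut.done():
        fut.set_result(payload)
```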

Handles expiry. If no decision arrives before expires_in_hours, the execution transitions to expired and pause() returns an ApprovalResult with decision="expired". Your agent can handle this case explicitly.
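
An explicit expiry branch might look like this sketch, where an expired request is marked for re-submission rather than silently dropped. The "resubmit" action is an assumption for illustration, not SDK behavior:

```python
def resolve_expired(decision: str, transaction_id: str) -> dict:
    """Sketch: treat an expired approval as needing re-submission,
    not as a silent drop. The "action" field is illustrative only."""
    if decision == "expired":
        return {
            "decision": "expired",
            "transaction_id": transaction_id,
            "action": "resubmit_for_review",
        }
    return {"decision": decision, "transaction_id": transaction_id}
```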

The waiting state is visible in the Agentfield web UI. You can see which executions are pending human review, how long they've been waiting, and the review URL for each one.

Crash Recovery

If your agent process crashes while an execution is in the waiting state, the approval request is still live on the control plane. When the agent restarts, you can reconnect to the pending approval using wait_for_resume():

@app.reasoner()
async def review_transaction_with_recovery(
    transaction_id: str,
    amount: float,
    merchant: str,
    execution_id: str | None = None,
) -> dict:
    """Transaction reviewer with crash recovery support."""

    # Check if this execution already has a pending approval
    # (e.g. the agent restarted mid-execution)
    approval_request_id = f"txn-{transaction_id}"

    try:
        existing_status = await app.client.get_approval_status(
            execution_id or app._get_current_execution_context().execution_id
        )
        if existing_status.status == "pending":
            # Approval was already requested before the crash.
            # Just wait for the callback to arrive.
            app.note(
                f"Reconnecting to pending approval for {transaction_id}",
                tags=["recovery", "waiting"],
            )
            result = await app.wait_for_resume(
                approval_request_id=approval_request_id,
                timeout=4 * 3600,
            )
            return _handle_result(result, transaction_id)
    except Exception:
        pass  # No existing approval, proceed normally

    # Normal path: run risk assessment and pause if needed
    risk = await app.ai(
        system="You are a fraud detection system. Assess transaction risk.",
        user=f"Transaction: ${amount:.2f} at {merchant}",
        schema=RiskAssessment,
    )

    if risk.score < 30:
        return {"decision": "approved", "method": "automatic", "risk_score": risk.score}

    result = await app.pause(
        approval_request_id=approval_request_id,
        approval_request_url=f"https://dashboard.example.com/review/{transaction_id}",
        expires_in_hours=4,
    )
    return _handle_result(result, transaction_id)


def _handle_result(result, transaction_id: str) -> dict:
    if result.approved:
        return {"decision": "approved", "method": "human", "transaction_id": transaction_id}
    return {"decision": result.decision, "transaction_id": transaction_id, "reason": result.feedback}

wait_for_resume() does not create a new approval request on the control plane. It only waits for the callback from an approval that was already requested. Use it when you know the execution is already in the waiting state.

ApprovalResult Fields

The ApprovalResult returned by pause() and wait_for_resume() carries everything you need to act on the decision:

Field                Type  Description
-------------------  ----  -----------
decision             str   One of approved, rejected, request_changes, expired, error
feedback             str   Free-text feedback from the reviewer
execution_id         str   The execution that was waiting
approval_request_id  str   The ID passed to pause()
approved             bool  Convenience property: decision == "approved"
changes_requested    bool  Convenience property: decision == "request_changes"
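
The table maps naturally onto a dataclass. The definition below is a simplified stand-in for illustration, not the SDK's actual class:

```python
from dataclasses import dataclass

@dataclass
class ApprovalResult:
    """Simplified stand-in mirroring the fields above (not the SDK class)."""
    decision: str             # approved | rejected | request_changes | expired | error
    feedback: str
    execution_id: str
    approval_request_id: str

    @property
    def approved(self) -> bool:
        return self.decision == "approved"

    @property
    def changes_requested(self) -> bool:
        return self.decision == "request_changes"
```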