Pipelines Docs is in beta — content is actively being added.
AgentsReference agent examples

Anthropic Claude SDK

A FastAPI wrapper around the Anthropic Python SDK using a manual tool-use loop and thinking-block forwarding.

The fastest implementation path uses pipelines.odyssey run_anthropic_loop, which implements this pattern in one line.

Register the agent

Use external_http mode with one tools_schema entry per tool. TOOLS below already matches platform format. Dump and paste into Import JSON:

uv run python -c 'import json; from app import TOOLS; print(json.dumps(TOOLS, indent=2))'

Do not include registration-only keys in the tool list passed to messages.create. The tools_schema shape supports keys such as default_execution_mode, passthrough_binding, and ledger fields, but Anthropic messages.create rejects unknown tool keys with HTTP 400. The TOOLS list below is reusable because it contains only name, description, and input_schema. If passthrough or ledger settings are required, maintain two projections: full entries for Import JSON and registration, and stripped entries for messages.create.

requirements.txt

fastapi>=0.115
uvicorn[standard]>=0.32
httpx>=0.27
anthropic>=0.39

app.py

import asyncio
import json
from functools import lru_cache
from typing import Any

import httpx
from anthropic import Anthropic
from fastapi import FastAPI, Header, HTTPException, Request

app = FastAPI()


@lru_cache(maxsize=1)
def get_client() -> Anthropic:
    # Lazy so the server still boots (and answers the ping probe) when
    # ANTHROPIC_API_KEY isn't set yet. `Anthropic()` raises at
    # construction time without a key, so building it at import would
    # crash uvicorn before Test connection can ever succeed.
    return Anthropic()


MODEL = "claude-sonnet-4-5"  # dash-delimited
MAX_TURNS = 10
SYSTEM_PROMPT = (
    "You triage refund requests. Look up orders before deciding, and "
    "never invent an order_id."
)

TOOLS = [
    {
        "name": "get_order",
        "description": "Look up an order by id.",
        "input_schema": {
            "type": "object",
            "properties": {"order_id": {"type": "string"}},
            "required": ["order_id"],
        },
    },
    {
        "name": "refund_order",
        "description": "Refund an order.",
        "input_schema": {
            "type": "object",
            "properties": {"order_id": {"type": "string"}},
            "required": ["order_id"],
        },
    },
]


def _proxy_call(proxy_url: str, run_token: str, tool_name: str, args: dict) -> Any:
    url = f"{proxy_url.rstrip('/')}/tools/{tool_name}"
    with httpx.Client(timeout=120.0) as c:
        r = c.post(url, json=args, headers={"Authorization": f"Bearer {run_token}"})
    r.raise_for_status()
    return r.json()["response"]


def _record_assistant_turn(messages: list[dict], response) -> tuple[list[dict], list[dict]]:
    history_content: list[dict] = []
    trace_thinking: list[dict] = []
    trace_text: list[dict] = []
    tool_uses: list[dict] = []

    for block in response.content:
        if block.type == "thinking":
            trace_thinking.append({"type": "text", "text": block.thinking})
            history_content.append({"type": "thinking", "thinking": block.thinking, "signature": block.signature})
        elif block.type == "text":
            trace_text.append({"type": "text", "text": block.text})
            history_content.append({"type": "text", "text": block.text})
        elif block.type == "tool_use":
            tool_uses.append({"id": block.id, "name": block.name, "arguments": block.input})
            history_content.append({"type": "tool_use", "id": block.id, "name": block.name, "input": block.input})

    messages.append({"role": "assistant", "content": history_content})
    return tool_uses, [{
        "role": "assistant",
        "content": "\n".join(b["text"] for b in trace_text) or None,
        "thinking": trace_thinking or None,
        "tool_calls": tool_uses or None,
    }]


def _run_loop(proxy_url: str, run_token: str, user_instruction: str) -> dict:
    """Synchronous multi-turn loop. Runs on a worker thread (see dispatch)."""
    history: list[dict] = [{"role": "user", "content": user_instruction}]
    trace: list[dict] = [{"role": "user", "content": user_instruction}]

    final_text = ""
    input_tokens = 0
    output_tokens = 0

    for _ in range(MAX_TURNS):
        response = get_client().messages.create(
            model=MODEL,
            max_tokens=2048,
            system=SYSTEM_PROMPT,
            tools=TOOLS,
            messages=history,
            thinking={"type": "enabled", "budget_tokens": 1024},
        )
        input_tokens += response.usage.input_tokens
        output_tokens += response.usage.output_tokens

        tool_uses, trace_blocks = _record_assistant_turn(history, response)
        trace.extend(trace_blocks)

        if response.stop_reason != "tool_use":
            final_text = "\n".join(b.text for b in response.content if b.type == "text")
            break

        tool_results: list[dict] = []
        for tu in tool_uses:
            try:
                result = _proxy_call(proxy_url, run_token, tu["name"], tu["arguments"])
                tool_results.append({"type": "tool_result", "tool_use_id": tu["id"], "content": json.dumps(result)})
                trace.append({"role": "tool", "tool_call_id": tu["id"], "content": json.dumps(result)})
            except httpx.HTTPError as exc:
                tool_results.append({"type": "tool_result", "tool_use_id": tu["id"], "is_error": True, "content": str(exc)})
                trace.append({"role": "tool", "tool_call_id": tu["id"], "content": str(exc)})
        history.append({"role": "user", "content": tool_results})

    return {
        "final_response": final_text or "(no final response)",
        "messages": trace,
        "metadata": {
            "model": MODEL,
            "total_input_tokens": input_tokens,
            "total_output_tokens": output_tokens,
        },
    }


@app.post("/dispatch")
async def dispatch(
    request: Request,
    x_pipelines_run_token: str | None = Header(default=None),
    x_pipelines_odyssey_proxy_url: str | None = Header(default=None),
):
    payload = await request.json()
    proxy_url = payload.get("odyssey_proxy_url") or x_pipelines_odyssey_proxy_url
    run_token = x_pipelines_run_token
    if not proxy_url or not run_token:
        raise HTTPException(400, "missing proxy URL or run token")

    user_instruction = (payload.get("input") or {}).get("user_instruction") or ""

    # Offload the blocking Anthropic + httpx loop to a worker thread so a
    # single multi-turn run doesn't freeze the event loop and serialize
    # every other concurrent dispatch (concurrency_cap is load-bearing).
    return await asyncio.to_thread(_run_loop, proxy_url, run_token, user_instruction)

Live trace forwarding

Stream thinking and assistant_message events during the loop so trajectory updates appear during execution:

import httpx

def _post_trace(proxy_url, run_token, event_type, payload):
    try:
        with httpx.Client(timeout=15.0) as c:
            c.post(
                f"{proxy_url.rstrip('/')}/traces/{event_type}",
                json=payload,
                headers={"Authorization": f"Bearer {run_token}"},
            )
    except httpx.HTTPError:
        pass

# Once at the start of the run:
_post_trace(proxy_url, run_token, "system_prompt", {"text": SYSTEM_PROMPT, "model": MODEL})

# Inside the loop, per block:
for block in response.content:
    if block.type == "thinking":
        _post_trace(proxy_url, run_token, "thinking", {"text": block.thinking})
    elif block.type == "text":
        _post_trace(proxy_url, run_token, "assistant_message", {"text": block.text})

Customizations

  • Model and thinking budget: Anthropic model IDs are dash-delimited, for example claude-sonnet-4-5, not claude-sonnet-4.5.
  • System prompt: pass through messages.create(system=...), not as a role=system message.
  • Per-task context: set on task current_state and read from payload["input"]["input"]. behavior_instructions remains platform-side.