Anthropic Claude SDK
A FastAPI wrapper around the Anthropic Python SDK using a manual tool-use loop and thinking-block forwarding.
The fastest implementation path uses run_anthropic_loop from pipelines.odyssey, which implements this pattern in one line.
Register the agent
Use external_http mode with one tools_schema entry per tool. TOOLS below already matches platform format. Dump and paste into Import JSON:
uv run python -c 'import json; from app import TOOLS; print(json.dumps(TOOLS, indent=2))'

Do not include registration-only keys in the tool list passed to messages.create. The tools_schema shape supports keys such as default_execution_mode, passthrough_binding, and ledger fields, but Anthropic messages.create rejects unknown tool keys with HTTP 400. The TOOLS list below is reusable because it contains only name, description, and input_schema. If passthrough or ledger settings are required, maintain two projections: full entries for Import JSON and registration, and stripped entries for messages.create.
requirements.txt
fastapi>=0.115
uvicorn[standard]>=0.32
httpx>=0.27
anthropic>=0.39

app.py
import asyncio
import json
from functools import lru_cache
from typing import Any
import httpx
from anthropic import Anthropic
from fastapi import FastAPI, Header, HTTPException, Request
# ASGI application object; the /dispatch route below is registered on it.
app = FastAPI()
@lru_cache(maxsize=1)
def get_client() -> Anthropic:
    """Return the shared Anthropic client, constructing it on first use.

    Construction is deferred instead of happening at import time:
    `Anthropic()` raises when ANTHROPIC_API_KEY is not set, and an
    import-time failure would crash uvicorn before the server could boot
    and answer the ping probe used by Test connection. The
    `lru_cache(maxsize=1)` decorator memoizes the instance, so every later
    call returns the same client.
    """
    client = Anthropic()
    return client
# Anthropic model IDs are dash-delimited ("claude-sonnet-4-5", not "4.5").
MODEL = "claude-sonnet-4-5"

# Upper bound on messages.create round-trips within a single run.
MAX_TURNS = 10

# Passed through messages.create(system=...), not as a role=system message.
SYSTEM_PROMPT = (
    "You triage refund requests. "
    "Look up orders before deciding, and never invent an order_id."
)
# Tool definitions passed verbatim to messages.create(tools=...). Each entry
# carries only name, description, and input_schema so the same list can be
# dumped as JSON for registration and sent to the Messages API unchanged.
TOOLS = [
    # Read-only lookup of a single order.
    {
        "name": "get_order",
        "description": "Look up an order by id.",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {"type": "string"},
            },
            "required": ["order_id"],
        },
    },
    # Issues the refund for a single order.
    {
        "name": "refund_order",
        "description": "Refund an order.",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {"type": "string"},
            },
            "required": ["order_id"],
        },
    },
]
def _proxy_call(proxy_url: str, run_token: str, tool_name: str, args: dict) -> Any:
    """POST one tool invocation to the proxy and return its "response" value.

    Args:
        proxy_url: Base URL of the proxy; trailing slashes are stripped.
        run_token: Sent as the Bearer credential in the Authorization header.
        tool_name: Appended to the URL as ``/tools/<tool_name>``.
        args: Sent as the JSON request body.

    Returns:
        The value stored under "response" in the decoded JSON reply.

    Raises:
        httpx.HTTPStatusError: When the proxy answers with a 4xx/5xx status
            (raised by ``raise_for_status``).
    """
    endpoint = f"{proxy_url.rstrip('/')}/tools/{tool_name}"
    auth_headers = {"Authorization": f"Bearer {run_token}"}
    # A fresh client per call; 120 s allows for slow tool executions.
    with httpx.Client(timeout=120.0) as client:
        reply = client.post(endpoint, json=args, headers=auth_headers)
        reply.raise_for_status()
        decoded = reply.json()
    return decoded["response"]
def _record_assistant_turn(messages: list[dict], response) -> tuple[list[dict], list[dict]]:
    """Append one assistant turn to the history and build its trace entry.

    Every content block of ``response`` is converted to the plain-dict form
    the Messages API accepts on the next request, and the resulting
    assistant message is appended to ``messages`` in place.

    Args:
        messages: Conversation history; mutated by appending one
            ``{"role": "assistant", ...}`` message.
        response: Object whose ``content`` is an iterable of blocks exposing
            ``type`` plus the per-type fields read below.

    Returns:
        A ``(tool_uses, trace_entries)`` tuple. ``tool_uses`` lists the
        requested tool calls as ``{"id", "name", "arguments"}`` dicts.
        ``trace_entries`` is a one-element list holding the trace record for
        this turn; its ``content``, ``thinking`` and ``tool_calls`` fields are
        ``None`` when the turn had no blocks of that kind.
    """
    history_content: list[dict] = []
    thinking_trace: list[dict] = []
    text_parts: list[str] = []
    tool_uses: list[dict] = []

    for block in response.content:
        if block.type == "thinking":
            thinking_trace.append({"type": "text", "text": block.thinking})
            # The signature must travel back with the block unchanged.
            history_content.append(
                {"type": "thinking", "thinking": block.thinking, "signature": block.signature}
            )
        elif block.type == "redacted_thinking":
            # Redacted reasoning is opaque, so there is nothing to show in the
            # trace, but the block still has to be replayed unmodified in the
            # history or the follow-up request after a tool call is rejected.
            history_content.append({"type": "redacted_thinking", "data": block.data})
        elif block.type == "text":
            text_parts.append(block.text)
            history_content.append({"type": "text", "text": block.text})
        elif block.type == "tool_use":
            tool_uses.append({"id": block.id, "name": block.name, "arguments": block.input})
            history_content.append(
                {"type": "tool_use", "id": block.id, "name": block.name, "input": block.input}
            )

    messages.append({"role": "assistant", "content": history_content})

    trace_entry = {
        "role": "assistant",
        "content": "\n".join(text_parts) or None,
        "thinking": thinking_trace or None,
        "tool_calls": tool_uses or None,
    }
    return tool_uses, [trace_entry]
def _run_loop(proxy_url: str, run_token: str, user_instruction: str) -> dict:
    """Synchronous multi-turn loop. Runs on a worker thread (see dispatch).

    Calls ``messages.create`` up to ``MAX_TURNS`` times. After each reply
    the assistant turn is appended to the history. When the model stopped
    for tool use, every requested tool is executed through the proxy and
    the results are sent back as the next user message; any other stop
    reason ends the loop.

    Args:
        proxy_url: Base URL of the tool proxy.
        run_token: Bearer token forwarded on every proxy call.
        user_instruction: Content of the first user message.

    Returns:
        A dict with ``final_response`` (text of the last non-tool-use turn,
        or ``"(no final response)"`` when there is none, e.g. because
        ``MAX_TURNS`` ran out), ``messages`` (the trace), and ``metadata``
        (model name plus summed input/output token counts).
    """
    history: list[dict] = [{"role": "user", "content": user_instruction}]
    trace: list[dict] = [{"role": "user", "content": user_instruction}]
    final_text = ""
    input_tokens = 0
    output_tokens = 0

    for _ in range(MAX_TURNS):
        response = get_client().messages.create(
            model=MODEL,
            max_tokens=2048,
            system=SYSTEM_PROMPT,
            tools=TOOLS,
            messages=history,
            thinking={"type": "enabled", "budget_tokens": 1024},
        )
        input_tokens += response.usage.input_tokens
        output_tokens += response.usage.output_tokens

        tool_uses, trace_blocks = _record_assistant_turn(history, response)
        trace.extend(trace_blocks)

        if response.stop_reason != "tool_use":
            final_text = "\n".join(b.text for b in response.content if b.type == "text")
            break

        tool_results: list[dict] = []
        for tu in tool_uses:
            tool_result: dict = {"type": "tool_result", "tool_use_id": tu["id"]}
            try:
                result = _proxy_call(proxy_url, run_token, tu["name"], tu["arguments"])
            except httpx.HTTPError as exc:
                # Transport failure or non-2xx status: report it to the model
                # as a failed tool call instead of aborting the run.
                tool_result["is_error"] = True
                content = str(exc)
            except (KeyError, TypeError, ValueError) as exc:
                # The proxy answered 2xx but the body was not JSON, was not an
                # object, or lacked "response". Without this clause one bad
                # reply would crash the whole run.
                tool_result["is_error"] = True
                content = f"malformed proxy response: {exc!r}"
            else:
                content = json.dumps(result)
            tool_result["content"] = content
            tool_results.append(tool_result)
            trace.append({"role": "tool", "tool_call_id": tu["id"], "content": content})

        # Tool results go back to the model as a single user message.
        history.append({"role": "user", "content": tool_results})

    return {
        "final_response": final_text or "(no final response)",
        "messages": trace,
        "metadata": {
            "model": MODEL,
            "total_input_tokens": input_tokens,
            "total_output_tokens": output_tokens,
        },
    }
@app.post("/dispatch")
async def dispatch(
request: Request,
x_pipelines_run_token: str | None = Header(default=None),
x_pipelines_odyssey_proxy_url: str | None = Header(default=None),
):
payload = await request.json()
proxy_url = payload.get("odyssey_proxy_url") or x_pipelines_odyssey_proxy_url
run_token = x_pipelines_run_token
if not proxy_url or not run_token:
raise HTTPException(400, "missing proxy URL or run token")
user_instruction = (payload.get("input") or {}).get("user_instruction") or ""
# Offload the blocking Anthropic + httpx loop to a worker thread so a
# single multi-turn run doesn't freeze the event loop and serialize
# every other concurrent dispatch (concurrency_cap is load-bearing).
return await asyncio.to_thread(_run_loop, proxy_url, run_token, user_instruction)Live trace forwarding
Stream thinking and assistant_message events during the loop so trajectory updates appear during execution:
import httpx
def _post_trace(proxy_url, run_token, event_type, payload):
    """Best-effort POST of one trace event to ``<proxy_url>/traces/<event_type>``.

    Args:
        proxy_url: Base URL of the proxy; trailing slashes are stripped.
        run_token: Sent as the Bearer credential in the Authorization header.
        event_type: Last path segment of the trace endpoint.
        payload: Sent as the JSON request body.

    Returns:
        None. The response is not inspected, so a non-2xx status is ignored.
    """
    endpoint = f"{proxy_url.rstrip('/')}/traces/{event_type}"
    auth_headers = {"Authorization": f"Bearer {run_token}"}
    try:
        with httpx.Client(timeout=15.0) as client:
            client.post(endpoint, json=payload, headers=auth_headers)
    except httpx.HTTPError:
        # Deliberately swallowed: a failed trace post must not fail the run.
        return None
# Once at the start of the run:
_post_trace(
    proxy_url,
    run_token,
    "system_prompt",
    {"text": SYSTEM_PROMPT, "model": MODEL},
)

# Inside the loop, per block:
for block in response.content:
    if block.type == "thinking":
        event_type, text = "thinking", block.thinking
    elif block.type == "text":
        event_type, text = "assistant_message", block.text
    else:
        # Other block types are not forwarded by this snippet.
        continue
    _post_trace(proxy_url, run_token, event_type, {"text": text})

Customizations
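- Model and thinking budget: Anthropic model IDs are dash-delimited, for example claude-sonnet-4-5, not claude-sonnet-4.5. The thinking budget is the budget_tokens value in the thinking argument of messages.create; it must be at least 1024 and smaller than max_tokens, so raise max_tokens when you raise the budget.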
- System prompt: pass through messages.create(system=...), not as a role=system message.
- Per-task context: set on task current_state and read from payload["input"]["input"]. behavior_instructions remains platform-side.