Pipelines Docs is in beta — content is actively being added.
Agents / Reference agent examples

Strands Agents SDK

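A FastAPI wrapper around a Strands agent that uses the OTEL exporter drain pattern: spans from each run are captured by an in-memory OpenTelemetry exporter, then drained into the message transcript returned to the platform.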

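The fastest implementation path uses pipelines.odyssey, which ships proxy_tool, dump_tools_schema, and forward_agent_result_events. The example below implements the proxy call, schema dump, and trace drain by hand so each step is visible.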

Register the agent

Register the agent in external_http mode, then dump its tool schema with:

uv run python <<'EOF'
import json
from app import _build_agent

# Stub proxy URL/token: building the agent only registers the tools, and no
# tool is invoked here, so the stub endpoint is never contacted.
agent = _build_agent("http://stub", "stub")

# Strands exposes each tool's spec as a dict (ToolSpec TypedDict), not an
# object with attributes; the JSON Schema is nested under inputSchema["json"].
schema = []
for t in agent.tool_registry.registry.values():
    spec = t.tool_spec
    schema.append({
        "name": spec["name"],
        "description": spec["description"],
        "parameters": spec["inputSchema"]["json"],
    })

print(json.dumps(schema, indent=2))
EOF

requirements.txt

fastapi>=0.115
uvicorn[standard]>=0.32
httpx>=0.27
strands-agents>=0.4
opentelemetry-sdk>=1.27

app.py

import asyncio
import json
from typing import Any

import httpx
from fastapi import FastAPI, Header, HTTPException, Request
from opentelemetry import trace
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter, SpanExportResult
from strands import Agent, tool

# ASGI application exposing POST /dispatch; uvicorn (listed in
# requirements.txt) can serve it, e.g. `uvicorn app:app`.
app = FastAPI()


class _MemorySpanExporter(SpanExporter):
    """Span exporter that buffers finished spans in a plain Python list.

    The span processor hands spans over via ``export``; callers read and
    reset ``self.spans`` directly.
    """

    def __init__(self) -> None:
        # Finished spans, in the order the processor exported them.
        self.spans: list[ReadableSpan] = []

    def export(self, spans) -> SpanExportResult:
        """Append ``spans`` to the buffer and report success to the processor."""
        self.spans.extend(spans)
        # The SpanExporter contract expects a SpanExportResult, not a bare int.
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        """Drop every buffered span."""
        self.spans.clear()


# Process-wide tracing setup. Every span ended in this process goes through
# SimpleSpanProcessor, which passes it straight to the single in-memory
# exporter; _run_loop clears `_exporter.spans` before a run and drains it after.
# NOTE(review): OpenTelemetry only lets the global tracer provider be set once
# per process; if another component sets it first, this exporter will not
# receive Strands' spans — confirm in your deployment.
_exporter = _MemorySpanExporter()
_provider = TracerProvider()
_provider.add_span_processor(SimpleSpanProcessor(_exporter))
trace.set_tracer_provider(_provider)


def _proxy_call(proxy_url: str, run_token: str, tool_name: str, args: dict) -> Any:
    """Forward one tool call to the odyssey proxy and return its payload.

    Args:
        proxy_url: Base URL of the proxy; a trailing slash is tolerated.
        run_token: Per-run token, sent as a bearer ``Authorization`` header.
        tool_name: Tool name, appended to the ``/tools/`` path.
        args: JSON body for the call.

    Returns:
        The ``"response"`` field of the proxy's JSON reply.

    Raises:
        httpx.HTTPStatusError: If the proxy answers with a 4xx/5xx status.
        httpx.HTTPError: On transport failures, including the 120 s timeout.
        KeyError: If the reply has no ``"response"`` field.
    """
    base = proxy_url.rstrip("/")
    auth = {"Authorization": f"Bearer {run_token}"}
    with httpx.Client(timeout=120.0) as client:
        reply = client.post(f"{base}/tools/{tool_name}", json=args, headers=auth)
    reply.raise_for_status()
    return reply.json()["response"]


def _build_agent(proxy_url: str, run_token: str) -> Agent:
    """Create the ``orders-triage`` Strands agent for a single run.

    Both tools close over ``proxy_url`` and ``run_token`` and do no work
    locally: every call is forwarded to the proxy through ``_proxy_call``.
    Strands' ``@tool`` derives each tool's name, description and input schema
    from the function name, docstring and type hints, so the inner function
    names and docstrings are part of the tool contract — edit them with care.
    """

    def forward(name: str, order_id: str) -> Any:
        # Shared body of both tools: proxy the call with the order id as args.
        return _proxy_call(proxy_url, run_token, name, {"order_id": order_id})

    @tool
    def get_order(order_id: str) -> dict:
        """Look up an order by id."""
        return forward("get_order", order_id)

    @tool
    def refund_order(order_id: str) -> dict:
        """Refund an order."""
        return forward("refund_order", order_id)

    instructions = "You triage refund requests. Look up orders before deciding."
    return Agent(
        name="orders-triage",
        model="claude-sonnet-4-5",
        system_prompt=instructions,
        tools=[get_order, refund_order],
    )


def _drain_trace(spans: list[ReadableSpan]) -> tuple[list[dict], dict]:
    """Rebuild a chat transcript and token totals from exported spans.

    Spans are walked in ``start_time`` order and classified by their
    ``strands.span.kind`` attribute, falling back to the span name:

    * ``agent.llm`` becomes an assistant message (completion text, plus
      thinking and tool calls decoded from JSON attributes when present), and
      its input/output token counts are added to the totals.
    * ``agent.tool`` becomes a tool message carrying the call id and the
      ``strands.tool.result_json`` attribute as-is (not decoded).
    * Any other span is skipped.

    The ``strands.*`` keys are illustrative; remap them to whatever your
    Strands release actually emits.

    Returns:
        ``(messages, metadata)``, where ``metadata`` holds
        ``total_input_tokens`` and ``total_output_tokens``.

    Raises:
        json.JSONDecodeError: If a thinking/tool-calls attribute is not valid JSON.
    """

    def decode(attributes: dict, key: str) -> Any:
        # JSON-decode an optional attribute; an absent key maps to None.
        if key not in attributes:
            return None
        return json.loads(attributes[key])

    transcript: list[dict] = []
    usage = {"total_input_tokens": 0, "total_output_tokens": 0}

    ordered = sorted(spans, key=lambda item: item.start_time)
    for span in ordered:
        attrs = dict(span.attributes or {})
        kind = attrs.get("strands.span.kind") or span.name

        if kind == "agent.tool":
            transcript.append({
                "role": "tool",
                "tool_call_id": attrs.get("strands.tool.call_id"),
                "content": attrs.get("strands.tool.result_json"),
            })
            continue
        if kind != "agent.llm":
            continue

        transcript.append({
            "role": "assistant",
            "content": attrs.get("strands.completion.text"),
            "thinking": decode(attrs, "strands.thinking.json"),
            "tool_calls": decode(attrs, "strands.tool_calls.json"),
        })
        usage["total_input_tokens"] += int(attrs.get("strands.usage.input_tokens", 0))
        usage["total_output_tokens"] += int(attrs.get("strands.usage.output_tokens", 0))

    return transcript, usage


# Serialize the model run + span drain: the in-memory exporter is
# process-global, so two runs draining `_exporter.spans` at once would
# interleave. The lock scopes one run's spans at a time; dispatch holds it
# around the worker-thread call. For real concurrency, isolate spans per run
# instead of widening this lock (for example, give each run its own trace and
# filter drained spans by trace id — TODO confirm against your Strands release).
_run_lock = asyncio.Lock()


def _run_loop(proxy_url: str, run_token: str, user_instruction: str) -> dict:
    """Run one synchronous Strands invocation and package the result.

    Executed on a worker thread by ``dispatch`` while it holds ``_run_lock``.
    The shared in-memory exporter is cleared before the run and drained after
    it, so the drained spans belong to this run only as long as that lock
    serializes callers.

    Returns:
        A dict with ``final_response`` (the agent's final text), ``messages``
        (the user turn followed by the transcript rebuilt from spans) and
        ``metadata`` (token totals plus the model id).
    """
    _exporter.spans.clear()
    agent = _build_agent(proxy_url, run_token)
    result = agent(user_instruction)

    # Snapshot this run's spans, then release them so they are not kept alive
    # until the next dispatch.
    spans = list(_exporter.spans)
    _exporter.spans.clear()

    messages, metadata = _drain_trace(spans)
    metadata["model"] = "claude-sonnet-4-5"

    # AgentResult.message is the raw Message dict (role + content blocks) and
    # is always truthy, so it must not be used as the response text;
    # str(AgentResult) joins the final message's text blocks.
    final_response = str(result).strip()
    return {
        "final_response": final_response,
        "messages": [{"role": "user", "content": user_instruction}, *messages],
        "metadata": metadata,
    }


@app.post("/dispatch")
async def dispatch(
    request: Request,
    x_pipelines_run_token: str | None = Header(default=None),
    x_pipelines_odyssey_proxy_url: str | None = Header(default=None),
):
    """Handle one platform dispatch: run the agent and return its transcript.

    The proxy URL comes from the JSON body's ``odyssey_proxy_url`` when set,
    otherwise from the ``X-Pipelines-Odyssey-Proxy-Url`` header. The run
    token comes only from the ``X-Pipelines-Run-Token`` header.

    Raises:
        HTTPException: 400 if the body is not a JSON object, the proxy URL or
            run token is missing, or ``input`` / ``input.user_instruction``
            has the wrong type.
    """
    try:
        payload = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError (both ValueError),
        # which would otherwise surface as a 500.
        raise HTTPException(400, "request body must be valid JSON") from None
    if not isinstance(payload, dict):
        raise HTTPException(400, "request body must be a JSON object")

    proxy_url = payload.get("odyssey_proxy_url") or x_pipelines_odyssey_proxy_url
    run_token = x_pipelines_run_token
    if not proxy_url or not run_token or not isinstance(proxy_url, str):
        raise HTTPException(400, "missing proxy URL or run token")

    task_input = payload.get("input") or {}
    if not isinstance(task_input, dict):
        raise HTTPException(400, "input must be a JSON object")
    user_instruction = task_input.get("user_instruction") or ""
    if not isinstance(user_instruction, str):
        raise HTTPException(400, "input.user_instruction must be a string")

    # Offload Strands' blocking call to a worker thread so it never freezes
    # the event loop; the lock keeps the shared exporter's spans per-run.
    # NOTE(review): if this await is cancelled (e.g. on shutdown), the lock is
    # released but the worker thread cannot be stopped and may keep exporting
    # spans into the shared buffer while the next run drains it.
    async with _run_lock:
        return await asyncio.to_thread(_run_loop, proxy_url, run_token, user_instruction)

Strands OTEL attribute names can change across releases. All strands.* keys read in _drain_trace (for example strands.span.kind and strands.tool_calls.json) are illustrative. Run one dispatch with a debug exporter, inspect the attributes Strands actually emits, and adjust the key mapping in _drain_trace to match.

Live trace forwarding

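To forward trace events after the run, call forward_agent_result_events on the AgentResult once the agent call returns (for example, on result in _run_loop):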

from pipelines.odyssey.adapters.strands import forward_agent_result_events
forward_agent_result_events(result)

For per-turn streaming, attach safe_post_trace_event to a Strands CallbackHandler, following the same pattern as the built-in OTEL exporter. See Trace events.

Customizations

  • Span attribute mapping: print span.attributes during local testing and adjust the key mapping in _drain_trace to match.
  • Per-task context: set it on the task's current_state and read it from payload["input"]["input"]; behavior_instructions stay on the platform.