Sunday, April 5

Your Claude agent works perfectly in testing. Then it hits production and something silently breaks — a tool call returns garbage, a multi-step chain loops, costs spike 10x overnight, and you have no idea why. This is the exact problem that agent observability logging solves, and it’s the difference between a production-grade system and a demo that sometimes works. This article walks through how to instrument Claude agents with structured logging, distributed tracing, failure analysis, and cost tracking — everything you need to actually understand what your agents are doing when you’re not watching.

Why Standard Logging Fails for Agents

Traditional application logging is request-scoped: one request, one response, done. Agents don’t work that way. A single user query might trigger 12 tool calls, 3 LLM completions, 2 retrieval steps, and a conditional branch that spawns a sub-agent. If you’re just printing to stdout or dumping to a flat log file, you lose all the causality information that makes debugging possible.

The specific failure modes I’ve run into in production:

  • Silent tool failures — Claude gets a malformed response from a tool, infers something wrong from it, and continues down a bad path. No exception raised.
  • Token cost spikes — a retrieval step that usually returns 500 tokens starts returning 5,000 because of a schema change upstream. Your costs double before you notice.
  • Reasoning drift — the agent’s chain-of-thought in step 3 contradicts what it concluded in step 1, and there’s no easy way to trace the context window that caused it.
  • Latency outliers — P99 latency is 40 seconds but median is 4. Something is occasionally hitting a retry loop but you can’t reproduce it locally.

None of these are catchable with print(response.text). You need structured, correlated, span-aware logging — the same thing distributed systems engineers have been doing with services for years, adapted for LLM agent execution.
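
As a rough illustration of what "structured and correlated" means in practice, the sketch below emits one JSON line per event and stamps every line with the IDs needed to join it back to its run and span. The helper name log_event and the field names are assumptions for illustration, not part of any library.

```python
import json
import time
import uuid


def log_event(run_id: str, span_id: str, name: str, **fields) -> str:
    """Serialize one event as a JSON line that carries its correlation IDs."""
    record = {
        "ts": time.time(),
        "run_id": run_id,    # ties the line to one agent invocation
        "span_id": span_id,  # ties the line to one unit of work in that run
        "event": name,
        **fields,
    }
    line = json.dumps(record, sort_keys=True)
    print(line)
    return line


run_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
line = log_event(run_id, span_id, "tool_result", tool="search", result_tokens=5000)
```

Because every line carries run_id and span_id, a later query can group all events for one run instead of grepping by timestamp.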

The Core Data Model: Runs, Spans, and Events

Before writing any code, establish the data model. I use a three-level hierarchy that maps cleanly onto how agents actually execute:

  • Run — the top-level trace for a single agent invocation. Has a unique run_id, input, output, total duration, total cost, and terminal status.
  • Span — a single unit of work within a run: one LLM call, one tool execution, one retrieval. Spans have parent IDs so you can reconstruct the execution tree.
  • Event — a discrete thing that happened within a span: a decision point, a warning, a retry, a context truncation.

This maps directly to the OpenTelemetry model, which is intentional — if you ever want to ship these traces to Jaeger, Honeycomb, or Datadog, the translation is trivial.
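
The tracer in the next section stores spans only, so here is a minimal sketch of how events could hang off a span in the same hierarchy. The names SpanNode, Event, and add_event are hypothetical and chosen to avoid clashing with the Span class defined later.

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Event:
    """A discrete occurrence inside a span, e.g. a retry or a truncation."""
    name: str
    timestamp: float = field(default_factory=time.time)
    attributes: dict = field(default_factory=dict)


@dataclass
class SpanNode:
    """One unit of work; parent_span_id links it into the run's tree."""
    name: str
    run_id: str
    parent_span_id: Optional[str] = None
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    events: list = field(default_factory=list)

    def add_event(self, name: str, **attributes) -> Event:
        event = Event(name=name, attributes=attributes)
        self.events.append(event)
        return event


run_id = str(uuid.uuid4())
root = SpanNode(name="research_agent", run_id=run_id)  # the run's root span
llm = SpanNode(name="llm_call", run_id=run_id, parent_span_id=root.span_id)
llm.add_event("retry", attempt=2, reason="rate_limited")
```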

Building a Lightweight Tracer for Claude Agents

Here’s a self-contained tracer you can drop into any Claude-based agent. It writes structured JSON to a local store (SQLite by default, swappable for any backend) and exposes a context manager interface that makes instrumentation non-intrusive.

import uuid
import time
import json
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass, field, asdict
from typing import Optional, Any

@dataclass
class Span:
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    run_id: str = ""
    parent_span_id: Optional[str] = None
    name: str = ""
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    status: str = "running"  # running | ok | error
    input_tokens: int = 0
    output_tokens: int = 0
    cost_usd: float = 0.0
    model: str = ""
    metadata: dict = field(default_factory=dict)
    error: Optional[str] = None

class AgentTracer:
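    # USD per token, recorded at time of writing. These figures may be stale
    # for the model IDs listed; verify against Anthropic's current pricing page.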
    COST_PER_TOKEN = {
        "claude-haiku-4-5": {"input": 0.00000080, "output": 0.000004},
        "claude-sonnet-4-5": {"input": 0.000003,  "output": 0.000015},
        "claude-opus-4-5":   {"input": 0.000015,  "output": 0.000075},
    }

    def __init__(self, db_path: str = "agent_traces.db"):
        self.db_path = db_path
        self._current_run_id: Optional[str] = None
        self._current_span_id: Optional[str] = None
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS spans (
                span_id TEXT PRIMARY KEY,
                run_id TEXT,
                parent_span_id TEXT,
                name TEXT,
                start_time REAL,
                end_time REAL,
                status TEXT,
                input_tokens INTEGER,
                output_tokens INTEGER,
                cost_usd REAL,
                model TEXT,
                metadata TEXT,
                error TEXT
            )
        """)
        conn.commit()
        conn.close()

    def start_run(self, run_name: str = "agent_run") -> str:
        run_id = str(uuid.uuid4())
        self._current_run_id = run_id
        # Root span represents the whole run
        root = Span(run_id=run_id, name=run_name)
        self._current_span_id = root.span_id
        self._write_span(root)
        return run_id

    def finish_run(self, status: str = "ok", error: Optional[str] = None):
        # Close the root span: record terminal status, error, and end time
        self._update_span(
            self._current_span_id, status=status, error=error, end_time=time.time()
        )

    @contextmanager
    def span(self, name: str, metadata: Optional[dict] = None):
        """Context manager for a single unit of work."""
        s = Span(
            run_id=self._current_run_id,
            parent_span_id=self._current_span_id,
            name=name,
            metadata=metadata or {}
        )
        parent_id = self._current_span_id
        self._current_span_id = s.span_id
        self._write_span(s)
        try:
            yield s
            s.status = "ok"
        except Exception as e:
            s.status = "error"
            s.error = str(e)
            raise
        finally:
            s.end_time = time.time()
            self._current_span_id = parent_id  # restore parent
            self._write_span(s)  # overwrite with final state

    def record_llm_call(self, span: Span, model: str, usage):
        """Call this after every Anthropic API response."""
        span.model = model
        span.input_tokens = usage.input_tokens
        span.output_tokens = usage.output_tokens
        rates = self.COST_PER_TOKEN.get(model, {})
        span.cost_usd = (
            usage.input_tokens * rates.get("input", 0) +
            usage.output_tokens * rates.get("output", 0)
        )

    def _write_span(self, span: Span):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT OR REPLACE INTO spans VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
        """, (
            span.span_id, span.run_id, span.parent_span_id, span.name,
            span.start_time, span.end_time, span.status,
            span.input_tokens, span.output_tokens, span.cost_usd,
            span.model, json.dumps(span.metadata), span.error
        ))
        conn.commit()
        conn.close()

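    # Columns that callers may change after a span has been written
    _UPDATABLE = {"status", "error", "end_time"}

    def _update_span(self, span_id: str, **kwargs):
        """Update selected columns of an existing span row in place."""
        fields = {k: v for k, v in kwargs.items() if k in self._UPDATABLE}
        if not fields or span_id is None:
            return
        assignments = ", ".join(f"{col} = ?" for col in fields)
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            f"UPDATE spans SET {assignments} WHERE span_id = ?",
            (*fields.values(), span_id),
        )
        conn.commit()
        conn.close()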

The key design choice here is the _current_span_id pointer: each span saves the previous value on entry and restores it on exit, so it behaves like a stack and tracks parent-child relationships automatically, without passing span references through every function. This works for single-threaded agents; for async or concurrent execution, replace it with a contextvars.ContextVar so that each task sees its own current span.

Instrumenting a Real Claude Agent

Here’s how the tracer integrates with an actual Anthropic API call and tool execution loop:

import anthropic

client = anthropic.Anthropic()
tracer = AgentTracer()

MODEL = "claude-haiku-4-5"
MAX_STEPS = 10  # guard against runaway tool loops

# tools_definition and execute_tool are application-specific and defined elsewhere.

def run_agent(user_query: str):
    tracer.start_run("research_agent")
    # Keep the full conversation so later turns see every earlier tool result
    messages = [{"role": "user", "content": user_query}]

    try:
        with tracer.span("llm_initial_call", metadata={"query": user_query}) as s:
            response = client.messages.create(
                model=MODEL,
                max_tokens=1024,
                tools=tools_definition,
                messages=messages
            )
            tracer.record_llm_call(s, MODEL, response.usage)

        # Agentic loop
        steps = 0
        while response.stop_reason == "tool_use":
            steps += 1
            if steps > MAX_STEPS:
                raise RuntimeError(f"agent exceeded {MAX_STEPS} tool steps")

            messages.append({"role": "assistant", "content": response.content})
            tool_results = []
            # A single response may contain several tool_use blocks;
            # each one needs a matching tool_result.
            for block in response.content:
                if block.type != "tool_use":
                    continue
                with tracer.span("tool_execution", metadata={
                    "tool_name": block.name,
                    "tool_input": block.input
                }) as tool_span:
                    tool_result = execute_tool(block.name, block.input)
                    tool_span.metadata["result_length"] = len(str(tool_result))
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(tool_result)
                })
            messages.append({"role": "user", "content": tool_results})

            with tracer.span("llm_continuation") as s:
                response = client.messages.create(
                    model=MODEL,
                    max_tokens=1024,
                    tools=tools_definition,
                    messages=messages
                )
                tracer.record_llm_call(s, MODEL, response.usage)

        tracer.finish_run(status="ok")
        # The final response may hold several blocks; return only the text
        return "".join(b.text for b in response.content if b.type == "text")

    except Exception as e:
        tracer.finish_run(status="error", error=str(e))
        raise

This gives you per-span token counts, per-span cost attribution, and a full execution tree for every run — queryable from SQLite in seconds.

Querying Your Traces for Failure Analysis

Finding Failed Runs

import sqlite3
import json

def get_failed_runs(db_path: str = "agent_traces.db"):
    conn = sqlite3.connect(db_path)
    rows = conn.execute("""
        SELECT run_id, name, error, start_time
        FROM spans
        WHERE parent_span_id IS NULL  -- root spans only
          AND status = 'error'
        ORDER BY start_time DESC
        LIMIT 50
    """).fetchall()
    conn.close()
    return rows

def reconstruct_run(run_id: str, db_path: str = "agent_traces.db"):
    """Return the full execution tree for a single run."""
    conn = sqlite3.connect(db_path)
    spans = conn.execute("""
        SELECT * FROM spans WHERE run_id = ?
        ORDER BY start_time ASC
    """, (run_id,)).fetchall()
    conn.close()
    return spans
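
reconstruct_run returns a flat, time-ordered list. A sketch of turning those rows into an indented tree via parent_span_id follows; the helper name format_span_tree is hypothetical, and the demo uses a trimmed-down in-memory table rather than the full spans schema.

```python
import sqlite3
from collections import defaultdict


def format_span_tree(conn: sqlite3.Connection, run_id: str) -> list:
    """Return one indented line per span, children nested under their parent."""
    rows = conn.execute(
        "SELECT span_id, parent_span_id, name, status FROM spans "
        "WHERE run_id = ? ORDER BY start_time ASC",
        (run_id,),
    ).fetchall()

    # Group spans by parent so each level can be walked in start-time order
    children = defaultdict(list)
    for span_id, parent_id, name, status in rows:
        children[parent_id].append((span_id, name, status))

    lines = []

    def walk(parent_id, depth):
        for span_id, name, status in children.get(parent_id, []):
            lines.append(f"{'  ' * depth}{name} [{status}]")
            walk(span_id, depth + 1)

    walk(None, 0)  # root spans have no parent
    return lines


# Demo data: a run whose tool call failed
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE spans (span_id TEXT, run_id TEXT, parent_span_id TEXT, "
    "name TEXT, start_time REAL, status TEXT)"
)
conn.executemany("INSERT INTO spans VALUES (?,?,?,?,?,?)", [
    ("a", "r1", None, "research_agent", 1.0, "error"),
    ("b", "r1", "a", "llm_initial_call", 2.0, "ok"),
    ("c", "r1", "a", "tool_execution", 3.0, "error"),
])
tree = format_span_tree(conn, "r1")
print("\n".join(tree))
```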

Tracking Cost by Tool

def cost_by_tool(db_path: str = "agent_traces.db"):
    conn = sqlite3.connect(db_path)
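    # tool_execution spans carry no token cost themselves, so attribute to each
    # tool call the cost of the LLM continuation that consumed its result.
    # If several tools feed one continuation, its cost is counted once per tool.
    rows = conn.execute("""
        SELECT
            json_extract(t.metadata, '$.tool_name') AS tool,
            COUNT(*) AS calls,
            COALESCE(SUM(l.cost_usd), 0) AS total_cost,
            AVG(t.end_time - t.start_time) AS avg_latency_s
        FROM spans t
        LEFT JOIN spans l
          ON l.span_id = (
              SELECT n.span_id FROM spans n
              WHERE n.run_id = t.run_id
                AND n.name = 'llm_continuation'
                AND n.start_time >= t.end_time
              ORDER BY n.start_time ASC
              LIMIT 1
          )
        WHERE t.name = 'tool_execution'
        GROUP BY tool
        ORDER BY total_cost DESC
    """).fetchall()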
    conn.close()
    for row in rows:
        print(f"{row[0]}: {row[1]} calls, ${row[2]:.4f} total, {row[3]:.2f}s avg")

This query has saved me multiple times. One project had a web scraping tool that was silently fetching full HTML pages instead of excerpts — it showed up immediately as an outlier in per-tool cost.
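
A complementary view is cost per run, which surfaces the individual invocations behind a spike. The sketch below assumes the same column names as the spans table above; cost_per_run is a hypothetical helper and the demo rows are made up.

```python
import sqlite3


def cost_per_run(conn: sqlite3.Connection, limit: int = 20) -> list:
    """Return the most expensive runs with their token and cost totals."""
    return conn.execute("""
        SELECT run_id,
               SUM(input_tokens)  AS input_tokens,
               SUM(output_tokens) AS output_tokens,
               SUM(cost_usd)      AS total_cost
        FROM spans
        GROUP BY run_id
        ORDER BY total_cost DESC
        LIMIT ?
    """, (limit,)).fetchall()


# Demo data: run r2 pulls in roughly ten times the input tokens of r1
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE spans (run_id TEXT, name TEXT, input_tokens INTEGER, "
    "output_tokens INTEGER, cost_usd REAL)"
)
conn.executemany("INSERT INTO spans VALUES (?,?,?,?,?)", [
    ("r1", "llm_initial_call", 500, 100, 0.001),
    ("r1", "llm_continuation", 700, 150, 0.002),
    ("r2", "llm_initial_call", 5000, 200, 0.010),
    ("r2", "llm_continuation", 5600, 300, 0.012),
])
rows = cost_per_run(conn)
for run_id, in_tok, out_tok, cost in rows:
    print(f"{run_id}: {in_tok} in / {out_tok} out, ${cost:.4f}")
```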

Integrating with Production Observability Stacks

SQLite is fine for solo projects and early production. Once you have multiple agent instances or need alerting, you’ll want to forward spans to a real backend.

OpenTelemetry Export

The tracer above maps directly to OTEL spans. Replace _write_span with an OTEL exporter and you get Jaeger/Honeycomb/Datadog support for free:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
trace.set_tracer_provider(provider)
otel_tracer = trace.get_tracer("claude-agent")

Honeycomb has a free tier that handles ~20M events/month — more than enough for most solo or small-team deployments. At that scale you’re looking at $0 for the observability layer itself.
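
To make the span mapping concrete, here is a hedged sketch of replaying one stored span row into an OTEL tracer. It relies on start_span(name, attributes=..., start_time=...) and span.end(end_time=...) from the OpenTelemetry Python API, both of which take timestamps in nanoseconds. The attribute keys (agent.run_id and so on) and the helper name are assumptions, and the stub tracer exists only so the sketch runs without the SDK installed; in practice you would pass otel_tracer. Parent-child links are not recreated here, which would require passing an OTEL context per span.

```python
def export_row_to_otel(otel_tracer, row: dict):
    """Replay one stored span row as an OTEL span with explicit timestamps."""
    attributes = {
        "agent.run_id": row["run_id"],
        "agent.model": row["model"],
        "agent.input_tokens": row["input_tokens"],
        "agent.output_tokens": row["output_tokens"],
        "agent.cost_usd": row["cost_usd"],
        "agent.status": row["status"],
    }
    # The tracer above stores seconds since the epoch; OTEL expects nanoseconds
    start_ns = int(row["start_time"] * 1e9)
    end_ns = int(row["end_time"] * 1e9)
    span = otel_tracer.start_span(
        row["name"], attributes=attributes, start_time=start_ns
    )
    span.end(end_time=end_ns)
    return span


class _StubSpan:
    """Stand-in for an OTEL span; records what it was given."""
    def __init__(self, name, attributes, start_time):
        self.name = name
        self.attributes = attributes
        self.start_time = start_time
        self.end_time = None

    def end(self, end_time=None):
        self.end_time = end_time


class _StubTracer:
    """Stand-in for an OTEL tracer, so the sketch runs without the SDK."""
    def start_span(self, name, attributes=None, start_time=None):
        return _StubSpan(name, attributes, start_time)


row = {
    "run_id": "r1", "name": "llm_continuation", "model": "claude-haiku-4-5",
    "input_tokens": 1200, "output_tokens": 80, "cost_usd": 0.0013,
    "status": "ok", "start_time": 100.0, "end_time": 102.5,
}
exported = export_row_to_otel(_StubTracer(), row)
```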

Managed Options Worth Considering

  • Langfuse — open-source, self-hostable, purpose-built for LLM tracing. Has a Python SDK that integrates with Anthropic calls in about 10 lines. Good choice if you want a UI out of the box.
  • Helicone — proxy-based, so zero code changes required. You just route your Anthropic calls through their endpoint. Costs around $20/month for teams. The tradeoff: you’re adding a hop to every API call, which matters if you care about P99 latency.
  • Weights & Biases Weave — excellent if your team is already using W&B for model training. Overkill if you’re just running inference agents.

I’d use Langfuse for most production deployments — it’s free to self-host, the UI is genuinely good, and it doesn’t add latency to your API path. Helicone is the right call when you need to instrument a codebase you don’t control and can’t modify.

Alerting on Agent Failures in Production

Logging without alerting is archaeology. You want to know about failures while they’re happening, not the next morning. A minimal alerting setup I use with a lightweight background process:

import sqlite3
import time
import requests  # third-party; used for the Slack webhook

SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

def monitor_failure_rate(db_path: str, window_minutes: int = 5, threshold: float = 0.2):
    """Alert if error rate in the last N minutes exceeds threshold."""
    conn = sqlite3.connect(db_path)
    cutoff = time.time() - (window_minutes * 60)
    row = conn.execute("""
        SELECT
            COUNT(*) as total,
            SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors
        FROM spans
        WHERE parent_span_id IS NULL
          AND start_time > ?
    """, (cutoff,)).fetchone()
    conn.close()

    total, errors = row
    if total > 0 and (errors / total) > threshold:
        requests.post(SLACK_WEBHOOK, json={
            "text": f"⚠️ Agent error rate: {errors}/{total} in last {window_minutes}m"
        }, timeout=10)  # avoid hanging the monitor if Slack is unreachable

Run this every 2 minutes via a cron job or a simple asyncio loop alongside your agent process.
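
One way to write the asyncio variant is sketched below, assuming Python 3.9+ for asyncio.to_thread. The helper run_periodically and its max_iterations parameter are hypothetical; the latter exists only so the demo terminates. The check runs in a worker thread so blocking SQLite and HTTP calls do not stall the agent's event loop.

```python
import asyncio


async def run_periodically(check, interval_s: float, max_iterations=None) -> int:
    """Call check() every interval_s seconds; a failing check never kills the loop."""
    count = 0
    while max_iterations is None or count < max_iterations:
        try:
            # Run the blocking check off the event loop
            await asyncio.to_thread(check)
        except Exception as exc:
            print(f"monitor check failed: {exc}")
        count += 1
        await asyncio.sleep(interval_s)
    return count


# Demo with a trivial check and a short interval
calls = []
completed = asyncio.run(
    run_periodically(lambda: calls.append(1), interval_s=0.01, max_iterations=3)
)
```

Alongside a real agent, this would be started with something like asyncio.create_task(run_periodically(lambda: monitor_failure_rate("agent_traces.db"), 120)).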

What to Do When You Find a Failed Run

Once you have traces, the debugging workflow becomes deterministic:

  1. Query failed runs, sort by recency.
  2. Pull the full span tree for the run — identify which span has status = 'error'.
  3. Check the metadata on the failing span — what was the tool input, what did retrieval return, how many tokens was the context?
  4. Replay the run locally with the same inputs using your trace data. Most agent bugs are deterministic once you have the exact context.
  5. Check the LLM continuation span that immediately preceded the failure — the model’s reasoning at that point usually explains the bad tool call.

Step 5 is the one most people miss. The error shows up in the tool execution span, but the cause is in the prior LLM reasoning. You need both sides of the trace to find the root cause.
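
Steps 2 and 5 can be sketched as a single lookup: find the first failing child span in a run, then the LLM span that ran just before it. The helper name failure_context is hypothetical; the span names match the instrumentation example above, and the demo table holds only the columns the query needs.

```python
import sqlite3


def failure_context(conn: sqlite3.Connection, run_id: str):
    """Return (failed_span, preceding_llm_span) for a run, or (None, None)."""
    failed = conn.execute("""
        SELECT span_id, name, start_time, error, metadata
        FROM spans
        WHERE run_id = ? AND status = 'error' AND parent_span_id IS NOT NULL
        ORDER BY start_time ASC
        LIMIT 1
    """, (run_id,)).fetchone()
    if failed is None:
        return None, None

    # The most recent LLM call that started before the failing span
    preceding_llm = conn.execute("""
        SELECT span_id, name, input_tokens, output_tokens
        FROM spans
        WHERE run_id = ?
          AND name IN ('llm_initial_call', 'llm_continuation')
          AND start_time < ?
        ORDER BY start_time DESC
        LIMIT 1
    """, (run_id, failed[2])).fetchone()
    return failed, preceding_llm


# Demo data: the tool call fails right after the initial LLM call
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE spans (span_id TEXT, run_id TEXT, parent_span_id TEXT, "
    "name TEXT, start_time REAL, status TEXT, input_tokens INTEGER, "
    "output_tokens INTEGER, error TEXT, metadata TEXT)"
)
conn.executemany("INSERT INTO spans VALUES (?,?,?,?,?,?,?,?,?,?)", [
    ("root", "r1", None, "research_agent", 1.0, "error", 0, 0, "boom", "{}"),
    ("s1", "r1", "root", "llm_initial_call", 2.0, "ok", 1200, 80, None, "{}"),
    ("s2", "r1", "root", "tool_execution", 3.0, "error", 0, 0,
     "KeyError: 'url'", '{"tool_name": "fetch_page"}'),
])
failed, preceding = failure_context(conn, "r1")
```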

Bottom Line: Who Should Build This and How

Solo founder running a single agent in production: Start with the SQLite tracer above plus Langfuse self-hosted. Total setup time is about 2 hours and the ongoing cost is essentially zero. You’ll catch the vast majority of production issues with just the failure rate monitor and per-tool cost query.

Small team with multiple agents: Add OpenTelemetry export to Honeycomb or a self-hosted Jaeger. The cross-agent visibility is worth the extra setup. Budget $0–50/month depending on event volume.

Enterprise / compliance-sensitive: Langfuse self-hosted on your own infrastructure keeps all trace data internal. Add a proper alerting pipeline (PagerDuty or similar) and make sure your spans capture enough metadata for audit trails without logging PII into the tool inputs.

Agent observability logging isn’t optional infrastructure — it’s the foundation that makes everything else debuggable. The agents that cause the most pain in production are the ones running invisibly. Every hour you spend on instrumentation upfront will save you five hours of blind debugging later.

Editorial note: API pricing, model capabilities, and tool features change frequently — always verify current details on the vendor’s website before building in production. Code examples are tested at time of writing; pin your dependency versions to avoid breaking changes. Some links in this article may be affiliate links — we may earn a commission if you sign up, at no extra cost to you.
