Your Claude agent works perfectly in testing. Then it hits production and something silently breaks — a tool call returns garbage, a multi-step chain loops, costs spike 10x overnight, and you have no idea why. This is the exact problem that agent observability logging solves, and it’s the difference between a production-grade system and a demo that sometimes works. This article walks through how to instrument Claude agents with structured logging, distributed tracing, failure analysis, and cost tracking — everything you need to actually understand what your agents are doing when you’re not watching.
Why Standard Logging Fails for Agents
Traditional application logging is request-scoped: one request, one response, done. Agents don’t work that way. A single user query might trigger 12 tool calls, 3 LLM completions, 2 retrieval steps, and a conditional branch that spawns a sub-agent. If you’re just printing to stdout or dumping to a flat log file, you lose all the causality information that makes debugging possible.
The specific failure modes I’ve run into in production:
- Silent tool failures — Claude gets a malformed response from a tool, infers something wrong from it, and continues down a bad path. No exception raised.
- Token cost spikes — a retrieval step that usually returns 500 tokens starts returning 5,000 because of a schema change upstream. Your costs double before you notice.
- Reasoning drift — the agent’s chain-of-thought in step 3 contradicts what it concluded in step 1, and there’s no easy way to trace the context window that caused it.
- Latency outliers — P99 latency is 40 seconds but median is 4. Something is occasionally hitting a retry loop but you can’t reproduce it locally.
None of these are catchable with print(response.text). You need structured, correlated, span-aware logging — the same thing distributed systems engineers have been doing with services for years, adapted for LLM agent execution.
The Core Data Model: Runs, Spans, and Events
Before writing any code, establish the data model. I use a three-level hierarchy that maps cleanly onto how agents actually execute:
- Run — the top-level trace for a single agent invocation. Has a unique run_id, input, output, total duration, total cost, and terminal status.
- Span — a single unit of work within a run: one LLM call, one tool execution, one retrieval. Spans have parent IDs so you can reconstruct the execution tree.
- Event — a discrete thing that happened within a span: a decision point, a warning, a retry, a context truncation.
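To make the hierarchy concrete, here is a sketch of one reconstructed run as plain data. Field names here are illustrative, not a fixed schema:

```python
# Illustrative trace for a single agent invocation.
trace = {
    "run_id": "run-123",
    "status": "ok",
    "total_cost_usd": 0.0042,
    "spans": [
        {
            "span_id": "span-1",
            "parent_span_id": None,   # root span represents the run itself
            "name": "llm_initial_call",
            "events": [
                # discrete things that happened inside this span
                {"type": "retry", "detail": "overloaded_error, attempt 2"},
            ],
        },
        {
            "span_id": "span-2",
            "parent_span_id": "span-1",  # child of the root: execution tree
            "name": "tool_execution",
            "events": [],
        },
    ],
}
```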
This maps directly to the OpenTelemetry model, which is intentional — if you ever want to ship these traces to Jaeger, Honeycomb, or Datadog, the translation is trivial.
Building a Lightweight Tracer for Claude Agents
Here’s a self-contained tracer you can drop into any Claude-based agent. It writes structured JSON to a local store (SQLite by default, swappable for any backend) and exposes a context manager interface that makes instrumentation non-intrusive.
import uuid
import time
import json
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass, field, asdict
from typing import Optional, Any
@dataclass
class Span:
span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
run_id: str = ""
parent_span_id: Optional[str] = None
name: str = ""
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
status: str = "running" # running | ok | error
input_tokens: int = 0
output_tokens: int = 0
cost_usd: float = 0.0
model: str = ""
metadata: dict = field(default_factory=dict)
error: Optional[str] = None
class AgentTracer:
# Per-token pricing at time of writing; verify current rates on Anthropic's pricing page
COST_PER_TOKEN = {
"claude-haiku-4-5": {"input": 0.00000080, "output": 0.000004},
"claude-sonnet-4-5": {"input": 0.000003, "output": 0.000015},
"claude-opus-4-5": {"input": 0.000015, "output": 0.000075},
}
def __init__(self, db_path: str = "agent_traces.db"):
self.db_path = db_path
self._current_run_id: Optional[str] = None
self._current_span_id: Optional[str] = None
self._init_db()
def _init_db(self):
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS spans (
span_id TEXT PRIMARY KEY,
run_id TEXT,
parent_span_id TEXT,
name TEXT,
start_time REAL,
end_time REAL,
status TEXT,
input_tokens INTEGER,
output_tokens INTEGER,
cost_usd REAL,
model TEXT,
metadata TEXT,
error TEXT
)
""")
conn.commit()
conn.close()
def start_run(self, run_name: str = "agent_run") -> str:
run_id = str(uuid.uuid4())
self._current_run_id = run_id
# Root span represents the whole run
root = Span(run_id=run_id, name=run_name)
self._current_span_id = root.span_id
self._write_span(root)
return run_id
def finish_run(self, status: str = "ok", error: Optional[str] = None):
self._update_span(self._current_span_id, status=status, error=error)
@contextmanager
    def span(self, name: str, metadata: Optional[dict] = None):
"""Context manager for a single unit of work."""
s = Span(
run_id=self._current_run_id,
parent_span_id=self._current_span_id,
name=name,
metadata=metadata or {}
)
parent_id = self._current_span_id
self._current_span_id = s.span_id
self._write_span(s)
try:
yield s
s.status = "ok"
except Exception as e:
s.status = "error"
s.error = str(e)
raise
finally:
s.end_time = time.time()
self._current_span_id = parent_id # restore parent
self._write_span(s) # overwrite with final state
def record_llm_call(self, span: Span, model: str, usage):
"""Call this after every Anthropic API response."""
span.model = model
span.input_tokens = usage.input_tokens
span.output_tokens = usage.output_tokens
rates = self.COST_PER_TOKEN.get(model, {})
span.cost_usd = (
usage.input_tokens * rates.get("input", 0) +
usage.output_tokens * rates.get("output", 0)
)
def _write_span(self, span: Span):
conn = sqlite3.connect(self.db_path)
conn.execute("""
INSERT OR REPLACE INTO spans VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
span.span_id, span.run_id, span.parent_span_id, span.name,
span.start_time, span.end_time, span.status,
span.input_tokens, span.output_tokens, span.cost_usd,
span.model, json.dumps(span.metadata), span.error
))
conn.commit()
conn.close()
    def _update_span(self, span_id: str, status: str = "ok", error: Optional[str] = None):
        # Used by finish_run to mark the root span terminal
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "UPDATE spans SET status = ?, error = ?, end_time = ? WHERE span_id = ?",
            (status, error, time.time(), span_id)
        )
        conn.commit()
        conn.close()
The key design choice here is the _current_span_id stack — it automatically tracks parent-child relationships so you don’t have to pass span references through every function. Works fine for single-threaded agents; for async/concurrent execution you’d replace this with a contextvars.ContextVar.
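As a sketch of that swap (names here are hypothetical, not part of the tracer above), a ContextVar carries the current span id per task, so concurrent runs keep independent parent links:

```python
from contextvars import ContextVar
from typing import Optional

# Hypothetical replacement for AgentTracer._current_span_id: each asyncio
# task sees its own value, so concurrent runs can't clobber each other.
current_span_id: ContextVar[Optional[str]] = ContextVar("current_span_id", default=None)

def enter_span(span_id: str):
    """Record a new current span; returns a token that restores the parent."""
    return current_span_id.set(span_id)

def exit_span(token) -> None:
    """Restore whatever span was current before enter_span()."""
    current_span_id.reset(token)
```

The tracer's span() context manager would call enter_span() on entry and exit_span() in its finally block, instead of assigning self._current_span_id directly.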
Instrumenting a Real Claude Agent
Here’s how the tracer integrates with an actual Anthropic API call and tool execution loop:
import anthropic
client = anthropic.Anthropic()
tracer = AgentTracer()
def run_agent(user_query: str):
run_id = tracer.start_run("research_agent")
try:
with tracer.span("llm_initial_call", metadata={"query": user_query}) as s:
response = client.messages.create(
model="claude-haiku-4-5",
max_tokens=1024,
tools=tools_definition,
messages=[{"role": "user", "content": user_query}]
)
tracer.record_llm_call(s, "claude-haiku-4-5", response.usage)
# Agentic loop
while response.stop_reason == "tool_use":
tool_use_block = next(
b for b in response.content if b.type == "tool_use"
)
with tracer.span("tool_execution", metadata={
"tool_name": tool_use_block.name,
"tool_input": tool_use_block.input
}) as tool_span:
tool_result = execute_tool(
tool_use_block.name,
tool_use_block.input
)
tool_span.metadata["result_length"] = len(str(tool_result))
with tracer.span("llm_continuation") as s:
messages = build_messages_with_tool_result(
user_query, response, tool_use_block, tool_result
)
response = client.messages.create(
model="claude-haiku-4-5",
max_tokens=1024,
tools=tools_definition,
messages=messages
)
tracer.record_llm_call(s, "claude-haiku-4-5", response.usage)
tracer.finish_run(status="ok")
return response.content[0].text
except Exception as e:
tracer.finish_run(status="error", error=str(e))
raise
This gives you per-span token counts, per-span cost attribution, and a full execution tree for every run — queryable from SQLite in seconds.
Querying Your Traces for Failure Analysis
Finding Failed Runs
import sqlite3
import json
def get_failed_runs(db_path: str = "agent_traces.db"):
conn = sqlite3.connect(db_path)
rows = conn.execute("""
SELECT run_id, name, error, start_time
FROM spans
WHERE parent_span_id IS NULL -- root spans only
AND status = 'error'
ORDER BY start_time DESC
LIMIT 50
""").fetchall()
conn.close()
return rows
def reconstruct_run(run_id: str, db_path: str = "agent_traces.db"):
"""Return the full execution tree for a single run."""
conn = sqlite3.connect(db_path)
spans = conn.execute("""
SELECT * FROM spans WHERE run_id = ?
ORDER BY start_time ASC
""", (run_id,)).fetchall()
conn.close()
return spans
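The flat rows from reconstruct_run can be folded into a readable tree via parent_span_id. A minimal sketch, assuming each row has been converted to a dict with span_id, parent_span_id, name, and start_time keys:

```python
from collections import defaultdict

def build_tree(spans: list[dict]) -> list[str]:
    """Render flat span rows as an indented execution tree.

    Assumes each span dict carries 'span_id', 'parent_span_id',
    'name', and 'start_time', matching the tracer's schema.
    """
    children = defaultdict(list)
    roots = []
    for s in spans:
        if s["parent_span_id"] is None:
            roots.append(s)
        else:
            children[s["parent_span_id"]].append(s)

    def render(span: dict, depth: int = 0) -> list[str]:
        lines = ["  " * depth + span["name"]]
        # Children in execution order, indented one level deeper
        for child in sorted(children[span["span_id"]], key=lambda c: c["start_time"]):
            lines.extend(render(child, depth + 1))
        return lines

    return [line for root in roots for line in render(root)]
```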
Tracking Cost by Tool
def cost_by_tool(db_path: str = "agent_traces.db"):
conn = sqlite3.connect(db_path)
# Attribute LLM costs to the tool call that preceded them
rows = conn.execute("""
SELECT
json_extract(metadata, '$.tool_name') as tool,
COUNT(*) as calls,
SUM(cost_usd) as total_cost,
AVG(end_time - start_time) as avg_latency_s
FROM spans
WHERE name = 'tool_execution'
GROUP BY tool
ORDER BY total_cost DESC
""").fetchall()
conn.close()
for row in rows:
print(f"{row[0]}: {row[1]} calls, ${row[2]:.4f} total, {row[3]:.2f}s avg")
This query has saved me multiple times. One project had a web scraping tool that was silently fetching full HTML pages instead of excerpts — it showed up immediately as an outlier in per-tool cost.
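The same table answers the latency-outlier question from earlier. A sketch against the same schema; slowest_spans is a hypothetical helper, not part of the tracer:

```python
import sqlite3

def slowest_spans(db_path: str = "agent_traces.db", limit: int = 10):
    """Return the slowest completed spans, a starting point for chasing P99 outliers."""
    conn = sqlite3.connect(db_path)
    rows = conn.execute("""
        SELECT name, run_id, end_time - start_time AS duration_s
        FROM spans
        WHERE end_time IS NOT NULL   -- skip spans still running
        ORDER BY duration_s DESC
        LIMIT ?
    """, (limit,)).fetchall()
    conn.close()
    return rows
```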
Integrating with Production Observability Stacks
SQLite is fine for solo projects and early production. Once you have multiple agent instances or need alerting, you’ll want to forward spans to a real backend.
OpenTelemetry Export
The tracer above maps directly to OTEL spans. Replace _write_span with an OTEL exporter and you get Jaeger/Honeycomb/Datadog support for free:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
provider = TracerProvider()
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
trace.set_tracer_provider(provider)
otel_tracer = trace.get_tracer("claude-agent")
Honeycomb has a free tier that handles ~20M events/month — more than enough for most solo or small-team deployments. At that scale you’re looking at $0 for the observability layer itself.
Managed Options Worth Considering
- Langfuse — open-source, self-hostable, purpose-built for LLM tracing. Has a Python SDK that integrates with Anthropic calls in about 10 lines. Good choice if you want a UI out of the box.
- Helicone — proxy-based, so zero code changes required. You just route your Anthropic calls through their endpoint. Costs around $20/month for teams. The tradeoff: you’re adding a hop to every API call, which matters if you care about P99 latency.
- Weights & Biases Weave — excellent if your team is already using W&B for model training. Overkill if you’re just running inference agents.
I’d use Langfuse for most production deployments — it’s free to self-host, the UI is genuinely good, and it doesn’t add latency to your API path. Helicone is the right call when you need to instrument a codebase you don’t control and can’t modify.
Alerting on Agent Failures in Production
Logging without alerting is archaeology. You want to know about failures while they’re happening, not the next morning. A minimal alerting setup I run as a lightweight background process:
import time
import sqlite3
import requests  # for the Slack webhook
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
def monitor_failure_rate(db_path: str, window_minutes: int = 5, threshold: float = 0.2):
"""Alert if error rate in the last N minutes exceeds threshold."""
conn = sqlite3.connect(db_path)
cutoff = time.time() - (window_minutes * 60)
row = conn.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors
FROM spans
WHERE parent_span_id IS NULL
AND start_time > ?
""", (cutoff,)).fetchone()
conn.close()
total, errors = row
if total > 0 and (errors / total) > threshold:
requests.post(SLACK_WEBHOOK, json={
"text": f"⚠️ Agent error rate: {errors}/{total} in last {window_minutes}m"
})
Run this every 2 minutes via a cron job or a simple asyncio loop alongside your agent process.
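The asyncio variant is a few lines. A sketch that takes the check as a callable so it stays decoupled from the tracer (monitor_loop is my name, not an existing API):

```python
import asyncio
from typing import Callable

async def monitor_loop(check: Callable[[], None], interval_s: float = 120) -> None:
    """Run a health check forever alongside the agent process."""
    while True:
        check()  # e.g. lambda: monitor_failure_rate("agent_traces.db")
        await asyncio.sleep(interval_s)
```

Launch it with asyncio.create_task(monitor_loop(...)) next to your agent's event loop, or stick with the cron job if your agent is synchronous.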
What to Do When You Find a Failed Run
Once you have traces, the debugging workflow becomes deterministic:
1. Query failed runs, sort by recency.
2. Pull the full span tree for the run and identify which span has status = 'error'.
3. Check the metadata on the failing span — what was the tool input, what did retrieval return, how many tokens was the context?
4. Replay the run locally with the same inputs using your trace data. Most agent bugs are deterministic once you have the exact context.
5. Check the LLM continuation span that immediately preceded the failure — the model’s reasoning at that point usually explains the bad tool call.
Step 5 is the one most people miss. The error shows up in the tool execution span, but the cause is in the prior LLM reasoning. You need both sides of the trace to find the root cause.
Bottom Line: Who Should Build This and How
Solo founder running a single agent in production: Start with the SQLite tracer above plus Langfuse self-hosted. Total setup time is about 2 hours and the ongoing cost is essentially zero. You’ll catch the vast majority of production issues with just the failure rate monitor and per-tool cost query.
Small team with multiple agents: Add OpenTelemetry export to Honeycomb or a self-hosted Jaeger. The cross-agent visibility is worth the extra setup. Budget $0–50/month depending on event volume.
Enterprise / compliance-sensitive: Langfuse self-hosted on your own infrastructure keeps all trace data internal. Add a proper alerting pipeline (PagerDuty or similar) and make sure your spans capture enough metadata for audit trails without logging PII into the tool inputs.
Agent observability logging isn’t optional infrastructure — it’s the foundation that makes everything else debuggable. The agents that cause the most pain in production are the ones running invisibly. Every hour you spend on instrumentation upfront will save you five hours of blind debugging later.
Editorial note: API pricing, model capabilities, and tool features change frequently — always verify current details on the vendor’s website before building in production. Code examples are tested at time of writing; pin your dependency versions to avoid breaking changes. Some links in this article may be affiliate links — we may earn a commission if you sign up, at no extra cost to you.