In our previous blog, we talked about how LangChain and LangGraph help structure your agent’s behavior. But structure isn’t the same as visibility.
This one’s about fixing that.
Not with more logs. Not with generic dashboards. You need to see what your agent did, step by step, tool by tool, so you can understand how a simple query turned into a long, expensive run.
This blog walks you through how to monitor LangChain and LangGraph apps in a way that’s useful and easy to get started with.
The Instrumentation Problem
Your APM dashboard looks clean: CPU, memory, and latency are all within range. But users are getting incorrect answers, and your OpenAI usage has spiked.
This isn’t an infrastructure issue. It’s execution logic you can’t see.
A single request might trigger a LangChain retrieval chain, generate a prompt, call an LLM, and use tools for additional lookups. With LangGraph, that request could follow a state machine with branches, loops, or parallel paths.
To a monitoring tool, this looks like a single HTTP call. It doesn’t show which tool was used, how many tokens were consumed, or where time was spent. To debug and optimize, you need visibility into the steps your agent takes.
Add Basic Tracing with a Custom Callback
Before jumping into full tracing, here’s a minimal callback you can use to confirm that LangChain execution can be instrumented:
This setup works with your existing LangChain app: no config changes, no rewrites. Just drop in a callback and start seeing what’s happening.
from langchain.callbacks.base import BaseCallbackHandler
class QuickTracer(BaseCallbackHandler):
def on_chain_start(self, serialized, inputs, **kwargs):
print(f"Starting chain: {serialized.get('name', 'chain')}")
def on_chain_end(self, outputs, **kwargs):
print("Chain completed.")
chain = YourChain() # Replace with your actual chain
chain.run("test", callbacks=[QuickTracer()])
This prints messages when a chain starts and ends. It’s a quick way to verify that your instrumentation points are active. You won’t see timing or token data yet, but it confirms that the callback interface is in place.
What’s worth recording
LangChain apps are built by composing LLMs, retrievers, tools, memory, and chains. That flexibility comes at the cost of visibility.
When something breaks or when usage gets expensive, you’ll want to answer questions like:
- Which chains ran, and in what order?
- How long did each step take?
- How did inputs and outputs change between components?
- Where were tokens used?
LangChain provides a callback interface to track these events. You can use that to plug in OpenTelemetry spans for structured observability.
Let’s start by instrumenting chain executions. This gives you a high-level view of which chains ran, how long they took, and what data flowed through them.
Add Chain-Level Instrumentation with OpenTelemetry
This example builds a custom callback handler that creates a span for each chain run, tracks key metadata, and captures timing:
from langchain.callbacks.base import BaseCallbackHandler
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
import time, hashlib
class ChainInstrumentationHandler(BaseCallbackHandler):
def __init__(self, tracer):
self.tracer = tracer
self.spans = {}
self.chain_state = {}
def on_chain_start(self, serialized, inputs, **kwargs):
run_id = kwargs.get("run_id")
parent_run_id = kwargs.get("parent_run_id")
chain_name = serialized.get("name", "unknown_chain")
span_name = f"langchain.chain.{chain_name}"
span = self.tracer.start_span(span_name)
span.set_attributes({
"chain.name": chain_name,
"chain.type": serialized.get("_type", "unknown"),
"chain.run_id": str(run_id),
"chain.parent_run_id": str(parent_run_id) if parent_run_id else None,
"chain.input_count": len(inputs) if isinstance(inputs, dict) else 1,
"chain.input_hash": hashlib.md5(str(inputs).encode()).hexdigest()[:8]
})
self.spans[run_id] = span
self.chain_state[run_id] = {
"start_time": time.time(),
"inputs": inputs,
"component_calls": 0
}
def on_chain_end(self, outputs, **kwargs):
run_id = kwargs.get("run_id")
span = self.spans.pop(run_id, None)
state = self.chain_state.pop(run_id, None)
if span and state:
duration = time.time() - state["start_time"]
span.set_attributes({
"chain.duration_ms": int(duration * 1000),
"chain.component_calls": state["component_calls"],
"chain.output_count": len(outputs) if isinstance(outputs, dict) else 1,
"chain.status": "success"
})
span.set_status(Status(StatusCode.OK))
span.end()
def on_chain_error(self, error, **kwargs):
run_id = kwargs.get("run_id")
span = self.spans.pop(run_id, None)
self.chain_state.pop(run_id, None)
if span:
span.record_exception(error)
span.set_attributes({
"chain.status": "error",
"chain.error_type": type(error).__name__
})
span.set_status(Status(StatusCode.ERROR, str(error)))
span.end()
This setup tracks:
- Chain identity: name, type, input fingerprint
- Execution timing: duration in milliseconds
- Call structure: how many internal components were involved
- Failure state: error type and status, if applicable
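To try the handler out, you need a tracer to hand it. Here’s a minimal sketch, assuming you configure the OTLP export later in this post and just want console output for now; YourChain is a placeholder for whatever chain you already run:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Local-only tracer setup; swap the console exporter for the OTLP
# exporter shown in the export section below.
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)
handler = ChainInstrumentationHandler(tracer)

chain = YourChain()  # Replace with your actual chain
chain.run("test question", callbacks=[handler])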
Before: A user asks a simple question. The agent ends up calling three tools, generating long prompts, and pushing a massive memory context into the next LLM call. It works, but costs spike, and latency jumps.
After: With trace spans, you can see that pattern immediately: which chain ran, which tool took the longest, how many tokens were used, and where memory growth started to slow things down. Because the callback adds only one span per chain run, the same approach carries from development to production traffic with little overhead.
The result is a structured view of how your LangChain application executes, one you can send to Last9, Grafana, or any OpenTelemetry-compatible backend.
How LangGraph Works Differently
LangGraph applications aren’t built from straight-line chains. They run as state machines. That means:
- Nodes are revisited
- Execution paths can branch or loop
- Behavior depends on runtime conditions
If your observability is built for LangChain’s linear flows, it won’t hold up here. You need to capture how the state moves through the graph, step by step, node by node.
Track the Full Graph Execution
The first thing you’ll want is a root span that tracks an entire graph run. This gives you a place to store high-level metadata like:
- Which graph ran
- When it started
- What the initial state looked like
Here’s one way to do that:
import uuid
from typing import Dict, Any, List
from opentelemetry import trace
from datetime import datetime
class GraphExecutionTracer:
def __init__(self, tracer):
self.tracer = tracer
self.execution_spans = {}
self.state_history = {}
self.node_visit_counts = {}
def start_execution(self, graph_name: str, initial_state: Dict[str, Any]) -> str:
execution_id = str(uuid.uuid4())
root_span = self.tracer.start_span(f"langgraph.execution.{graph_name}")
root_span.set_attributes({
"langgraph.graph.name": graph_name,
"langgraph.execution.id": execution_id,
"langgraph.execution.start_time": datetime.now().isoformat(),
"langgraph.state.initial_keys": list(initial_state.keys()),
"langgraph.state.initial_size": len(str(initial_state))
})
self.execution_spans[execution_id] = root_span
self.state_history[execution_id] = []
self.node_visit_counts[execution_id] = {}
return execution_id
This span gives you one trace per run. You can later attach all node spans and state transitions under it.
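The class above only opens the root span. A matching end_execution method, a sketch rather than part of any LangGraph API, can close it once the run finishes and record a few summary attributes:
def end_execution(self, execution_id: str, final_state: Dict[str, Any]) -> None:
    root_span = self.execution_spans.pop(execution_id, None)
    if root_span is None:
        return
    visits = self.node_visit_counts.pop(execution_id, {})
    self.state_history.pop(execution_id, None)
    root_span.set_attributes({
        "langgraph.execution.end_time": datetime.now().isoformat(),
        "langgraph.state.final_keys": list(final_state.keys()),
        "langgraph.state.final_size": len(str(final_state)),
        "langgraph.execution.total_node_visits": sum(visits.values()),
        "langgraph.execution.unique_nodes_visited": len(visits)
    })
    root_span.end()
Call it after the graph run returns so every trace is complete and closed.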
Capture What Happens Inside Each Node
As the graph executes, it moves through different nodes, sometimes looping back to the same one multiple times. To debug that behavior, you’ll need:
- Timestamps for each node visit
- State before and after the node runs
- A count of how often each node is hit
Here’s a method that traces those details:
def trace_node_execution(self, execution_id: str, node_name: str,
state_before: Dict[str, Any],
state_after: Dict[str, Any],
duration_ms: int) -> None:
self.node_visit_counts[execution_id].setdefault(node_name, 0)
self.node_visit_counts[execution_id][node_name] += 1
node_span = self.tracer.start_span(f"langgraph.node.{node_name}")
state_changes = self._calculate_state_changes(state_before, state_after)
node_span.set_attributes({
"langgraph.node.name": node_name,
"langgraph.node.execution_id": execution_id,
"langgraph.node.visit_count": self.node_visit_counts[execution_id][node_name],
"langgraph.node.duration_ms": duration_ms,
"langgraph.state.before_size": len(str(state_before)),
"langgraph.state.after_size": len(str(state_after)),
"langgraph.state.changes_count": len(state_changes),
"langgraph.state.keys_added": state_changes.get("added", []),
"langgraph.state.keys_modified": state_changes.get("modified", []),
"langgraph.state.keys_removed": state_changes.get("removed", [])
})
self.state_history[execution_id].append({
"node": node_name,
"timestamp": datetime.now().isoformat(),
"state_before": state_before,
"state_after": state_after,
"visit_count": self.node_visit_counts[execution_id][node_name]
})
node_span.end()
Each span gives you a snapshot of how the state evolved. The visit_count field is especially helpful when you want to spot loops or high-frequency nodes.
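trace_node_execution also leans on a _calculate_state_changes helper that isn’t shown. Here’s a minimal sketch, assuming a shallow key-by-key comparison is enough for your state objects:
def _calculate_state_changes(self, state_before: Dict[str, Any],
                             state_after: Dict[str, Any]) -> Dict[str, Any]:
    before_keys = set(state_before.keys())
    after_keys = set(state_after.keys())
    return {
        # Keys that only exist after the node ran
        "added": sorted(after_keys - before_keys),
        # Keys present in both states whose values changed
        "modified": sorted(
            key for key in before_keys & after_keys
            if state_before[key] != state_after[key]
        ),
        # Keys the node dropped from the state
        "removed": sorted(before_keys - after_keys)
    }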
Understand Conditional Branches
Not every node has one clear next step. LangGraph allows multiple outgoing edges based on runtime decisions.
To trace these decisions, you can create a span for each conditional transition:
def trace_conditional_edge(self, execution_id: str, from_node: str,
to_node: str, condition_result: Any,
available_options: List[str]) -> None:
edge_span = self.tracer.start_span("langgraph.conditional_edge")
edge_span.set_attributes({
"langgraph.edge.from_node": from_node,
"langgraph.edge.to_node": to_node,
"langgraph.edge.execution_id": execution_id,
"langgraph.edge.condition_result": str(condition_result),
"langgraph.edge.available_options": available_options,
"langgraph.edge.chosen_option": to_node,
"langgraph.edge.options_count": len(available_options),
"langgraph.edge.branching": len(available_options) > 1
})
edge_span.end()
This gives you visibility into decision points: which node was chosen, what options were available, and why one path was taken over another.
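If your graph uses a routing function to pick the next node, that function is a natural place to call this from. The router below is hypothetical, as are its node names and the execution_id stored in state; graph_tracer is the GraphExecutionTracer instance from earlier:
def route_after_retrieval(state: Dict[str, Any]) -> str:
    options = ["generate_answer", "retry_retrieval"]
    # Hypothetical condition: retry when retrieval came back empty
    chosen = "generate_answer" if state.get("documents") else "retry_retrieval"
    graph_tracer.trace_conditional_edge(
        execution_id=state["execution_id"],
        from_node="retrieve",
        to_node=chosen,
        condition_result=bool(state.get("documents")),
        available_options=options
    )
    return chosen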
Detect Repeated States and Loops
LangGraph doesn’t stop you from looping forever. A good first step is detecting patterns that look like infinite loops.
Here’s a loop detection class you can wire into the tracer:
import hashlib

class LoopDetectionHandler:
def __init__(self, max_visits_per_node: int = 50, max_total_iterations: int = 200):
self.max_visits_per_node = max_visits_per_node
self.max_total_iterations = max_total_iterations
self.execution_stats = {}
def check_for_loops(self, execution_id: str, node_name: str, current_state: Dict[str, Any]) -> Dict[str, Any]:
if execution_id not in self.execution_stats:
self.execution_stats[execution_id] = {
"total_iterations": 0,
"node_visits": {},
"state_signatures": {}
}
stats = self.execution_stats[execution_id]
stats["total_iterations"] += 1
stats["node_visits"].setdefault(node_name, 0)
stats["node_visits"][node_name] += 1
state_signature = self._create_state_signature(current_state)
seen_states = stats["state_signatures"].setdefault(node_name, [])
state_cycle_detected = state_signature in seen_states
seen_states.append(state_signature)
return {
"excessive_node_visits": stats["node_visits"][node_name] > self.max_visits_per_node,
"excessive_total_iterations": stats["total_iterations"] > self.max_total_iterations,
"state_cycle_detected": state_cycle_detected,
"loop_risk_score": self._calculate_loop_risk(stats, node_name)
}
def _create_state_signature(self, state: Dict[str, Any]) -> str:
sorted_items = sorted(state.items())
return hashlib.md5(str(sorted_items).encode()).hexdigest()
def _calculate_loop_risk(self, stats: Dict[str, Any], current_node: str) -> float:
total_iterations = stats["total_iterations"]
node_visits = stats["node_visits"].get(current_node, 0)
unique_nodes = len(stats["node_visits"])
iteration_risk = min(total_iterations / self.max_total_iterations, 1.0)
node_visit_risk = min(node_visits / self.max_visits_per_node, 1.0)
node_diversity_risk = 1.0 - min(unique_nodes / 10, 1.0)
return (
iteration_risk * 0.4 +
node_visit_risk * 0.5 +
node_diversity_risk * 0.1
)
This doesn’t prevent loops, but it gives you an early warning. You can log these risk signals or surface them in your tracing backend when they cross thresholds.
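One way to wire it in, sketched here with a placeholder node_fn and the tracer from earlier, is to run the check before each node executes and attach the signals to a span:
loop_detector = LoopDetectionHandler()

def run_node_with_loop_check(execution_id: str, node_name: str,
                             state: Dict[str, Any], node_fn):
    signals = loop_detector.check_for_loops(execution_id, node_name, state)
    if signals["excessive_total_iterations"] or signals["excessive_node_visits"]:
        # Fail fast instead of letting a runaway graph burn tokens
        raise RuntimeError(f"Loop guard tripped at node '{node_name}'")
    span = tracer.start_span(f"langgraph.loop_check.{node_name}")
    span.set_attributes({
        "langgraph.loop.risk_score": signals["loop_risk_score"],
        "langgraph.loop.state_cycle_detected": signals["state_cycle_detected"]
    })
    span.end()
    return node_fn(state)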
When You Have More Than One Agent
LangGraph supports multi-agent systems where different agents run independently but still interact. This adds a new layer of complexity: message passing, shared state, tool contention, and coordination issues that aren't visible with standard tracing.
To monitor these setups properly, you need to trace how agents communicate and what resources they use.
What Happens When Agents Communicate
If one agent calls a tool and another agent reacts to the result, you need visibility into that interaction. Who sent the message? What kind of message was it? How often are these agents talking?
This method tracks agent-to-agent communication and stores basic metrics around interaction patterns:
from datetime import datetime
from typing import Any
class MultiAgentTracer:
def __init__(self, tracer):
self.tracer = tracer
self.agent_interactions = {}
self.shared_resources = {}
def trace_agent_communication(self, from_agent: str, to_agent: str,
message_type: str, message_content: Any) -> None:
communication_span = self.tracer.start_span("langgraph.agent.communication")
communication_span.set_attributes({
"langgraph.agent.from": from_agent,
"langgraph.agent.to": to_agent,
"langgraph.agent.message_type": message_type,
"langgraph.agent.message_size": len(str(message_content)),
"langgraph.agent.timestamp": datetime.now().isoformat()
})
interaction_key = f"{from_agent}->{to_agent}"
self.agent_interactions.setdefault(interaction_key, 0)
self.agent_interactions[interaction_key] += 1
communication_span.set_attribute(
"langgraph.agent.interaction_count",
self.agent_interactions[interaction_key]
)
communication_span.end()
This gives you a timeline of agent conversations and helps surface coordination issues like message loops, delays, or redundant exchanges.
How Shared Resources Can Create Bottlenecks
When agents share memory, vector databases, or tools, conflicts can show up as latency spikes or unexpected outputs. You’ll want to monitor not just who accessed a resource, but how frequently, and whether multiple agents are involved.
Here’s how you can track that:
def trace_resource_access(self, agent_name: str, resource_name: str,
access_type: str, duration_ms: int) -> None:
resource_span = self.tracer.start_span("langgraph.resource.access")
resource_span.set_attributes({
"langgraph.resource.name": resource_name,
"langgraph.resource.agent": agent_name,
"langgraph.resource.access_type": access_type,
"langgraph.resource.duration_ms": duration_ms
})
self.shared_resources.setdefault(resource_name, {"accesses": 0, "agents": set()})
self.shared_resources[resource_name]["accesses"] += 1
self.shared_resources[resource_name]["agents"].add(agent_name)
resource_span.set_attributes({
"langgraph.resource.total_accesses": self.shared_resources[resource_name]["accesses"],
"langgraph.resource.agent_count": len(self.shared_resources[resource_name]["agents"])
})
resource_span.end()
This helps you track which agents are using the same resource, how often, and whether access patterns are balanced or skewed.
What to Track Beyond Execution and Communication
Once you're capturing execution flows, node transitions, and agent interactions, the next step is understanding whether the system is producing the right answers and doing so consistently.
This means adding instrumentation around semantic validation and performance expectations.
Check for Output Quality, Not Just Errors
LangGraph runs don’t always fail loudly. A chain can return a perfectly formatted response that’s logically incorrect or irrelevant. That’s where semantic validation comes in.
You can register a validation function per node to assess output quality:
class SemanticMonitor:
def __init__(self, tracer):
self.tracer = tracer
self.semantic_validators = {}
def register_validator(self, node_name: str, validator_func):
self.semantic_validators[node_name] = validator_func
def validate_node_output(self, execution_id: str, node_name: str,
output: Any, context: Dict[str, Any]) -> Dict[str, Any]:
if node_name not in self.semantic_validators:
return {"valid": True, "confidence": 1.0}
validator = self.semantic_validators[node_name]
try:
result = validator(output, context)
semantic_span = self.tracer.start_span("langgraph.semantic.validation")
semantic_span.set_attributes({
"langgraph.semantic.node": node_name,
"langgraph.semantic.execution_id": execution_id,
"langgraph.semantic.valid": result.get("valid", False),
"langgraph.semantic.confidence": result.get("confidence", 0.0),
"langgraph.semantic.issues": result.get("issues", [])
})
semantic_span.end()
return result
except Exception as e:
error_span = self.tracer.start_span("langgraph.semantic.error")
error_span.record_exception(e)
error_span.set_attributes({
"langgraph.semantic.node": node_name,
"langgraph.semantic.execution_id": execution_id,
"langgraph.semantic.error": str(e)
})
error_span.end()
return {"valid": False, "confidence": 0.0, "error": str(e)}
This gives you structured trace data on whether a node’s output was meaningful, not just syntactically correct. You can build confidence scores, track recurring failure cases, or flag specific nodes where validation regularly fails.
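A validator is just a callable that takes the node’s output and some context and returns that result dict. Here’s a hypothetical one for an answer-generation node, checking length and overlap with retrieved source titles, registered against the tracer-backed monitor from above:
def answer_validator(output: Any, context: Dict[str, Any]) -> Dict[str, Any]:
    issues = []
    answer = str(output)
    if len(answer) < 40:
        issues.append("answer_too_short")
    source_titles = context.get("source_titles", [])
    if source_titles and not any(
        title.lower() in answer.lower() for title in source_titles
    ):
        issues.append("no_source_overlap")
    return {
        "valid": not issues,
        "confidence": max(0.0, 1.0 - 0.4 * len(issues)),
        "issues": issues
    }

semantic_monitor = SemanticMonitor(tracer)
semantic_monitor.register_validator("generate_answer", answer_validator)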
Track Performance Against a Moving Baseline
Once your system is stable, raw latency isn’t always enough. You need to know when performance drifts, when a retrieval step is 3× slower than usual, or when an LLM call suddenly becomes a bottleneck.
You can track that using dynamic baselines:
class PerformanceBaseline:
def __init__(self, tracer):
self.tracer = tracer
self.baselines = {}
def record_performance(self, operation_type: str, duration_ms: int,
context: Dict[str, Any]) -> None:
if operation_type not in self.baselines:
self.baselines[operation_type] = {
"measurements": [],
"p50": 0,
"p95": 0,
"p99": 0,
"mean": 0
}
baseline = self.baselines[operation_type]
baseline["measurements"].append(duration_ms)
if len(baseline["measurements"]) > 1000:
baseline["measurements"] = baseline["measurements"][-1000:]
measurements = sorted(baseline["measurements"])
count = len(measurements)
if count > 0:
baseline["p50"] = measurements[int(count * 0.5)]
baseline["p95"] = measurements[int(count * 0.95)]
baseline["p99"] = measurements[int(count * 0.99)]
baseline["mean"] = sum(measurements) / count
performance_ratio = duration_ms / baseline["mean"] if baseline["mean"] > 0 else 1.0
perf_span = self.tracer.start_span("langgraph.performance.baseline")
perf_span.set_attributes({
"langgraph.performance.operation": operation_type,
"langgraph.performance.duration_ms": duration_ms,
"langgraph.performance.baseline_mean": baseline["mean"],
"langgraph.performance.baseline_p95": baseline["p95"],
"langgraph.performance.ratio_to_mean": performance_ratio,
"langgraph.performance.degraded": performance_ratio > 2.0
})
perf_span.end()
This gives you an early warning when a familiar operation starts behaving abnormally, even if it still completes successfully. You can later use this data to tune timeouts, identify regressions, or surface hidden dependencies.
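Feeding the baseline is just a matter of timing the operations you care about. For example, around a retrieval step; the retriever and its invoke call are placeholders for whatever component you actually use:
import time

baseline = PerformanceBaseline(tracer)

start = time.time()
documents = retriever.invoke("How do I rotate the API key?")  # placeholder retriever
duration_ms = int((time.time() - start) * 1000)

baseline.record_performance(
    operation_type="retrieval",
    duration_ms=duration_ms,
    context={"documents_returned": len(documents)}
)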
Export Your Traces and Metrics
With spans in place, you can now export your traces to a backend that supports OpenTelemetry and understands LLM execution patterns.
LLM traces are different; they're high-cardinality, token-heavy, and need long-term storage for cost analysis. Last9 is designed specifically for this type of telemetry data.
Here's a 2-minute setup guide to get started:
Configure the Tracer and Meter
The Last9Integration class sets up both tracing and metrics using OTLP over gRPC. This setup assumes you’ve already generated your API key and endpoint details.
import os
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource
These imports bring in the two exporters you need:
- Trace exporter: sends spans from LangChain and LangGraph to Last9
- Metric exporter: sends counters, gauges, and histograms at a regular interval
The class below does three things:
- Sets up the OTLP exporter for traces and metrics
- Defines basic service-level metadata
- Creates a few custom metrics useful for LLM agents
class Last9Integration:
def __init__(self):
self.setup_tracing()
self.setup_metrics()
self.tracer = trace.get_tracer(__name__)
self.meter = metrics.get_meter(__name__)
self.setup_custom_metrics()
def setup_tracing(self):
resource = Resource.create({
"service.name": "langchain-langgraph-app",
"service.version": "1.0.0",
"deployment.environment": os.getenv("ENVIRONMENT", "production"),
"application.type": "llm-agent"
})
trace.set_tracer_provider(TracerProvider(resource=resource))
otlp_exporter = OTLPSpanExporter(
endpoint=os.getenv("OTLP_ENDPOINT", "https://otlp.last9.io:443"),
headers={"Authorization": f"Bearer {os.getenv('LAST9_API_KEY')}"}
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
def setup_metrics(self):
metric_exporter = OTLPMetricExporter(
endpoint=os.getenv("OTLP_ENDPOINT", "https://otlp.last9.io:443"),
headers={"Authorization": f"Bearer {os.getenv('LAST9_API_KEY')}"}
)
metric_reader = PeriodicExportingMetricReader(
metric_exporter,
export_interval_millis=30000 # 30s export interval
)
metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))
def setup_custom_metrics(self):
self.token_counter = self.meter.create_counter(
"langchain_tokens_total",
description="Total tokens consumed by LangChain operations"
)
self.execution_duration = self.meter.create_histogram(
"langgraph_execution_duration_seconds",
description="Duration of LangGraph executions"
)
self.node_visits = self.meter.create_counter(
"langgraph_node_visits_total",
description="Total visits to LangGraph nodes"
)
self.loop_risk_gauge = self.meter.create_gauge(
"langgraph_loop_risk_score",
description="Current loop risk score for LangGraph executions"
)
self.semantic_validation_counter = self.meter.create_counter(
"langgraph_semantic_validations_total",
description="Total semantic validations performed"
)
This setup gives you standard observability plus domain-specific metrics: token use, graph execution time, node visit counts, semantic validation frequency, and loop risk levels.
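Recording a data point is then a single call on the relevant instrument. For example, after an LLM step you might do something like this; the numbers and attribute values are placeholders for whatever your callbacks report:
integration = Last9Integration()

# Tokens per model and operation, so cost breakdowns stay queryable
integration.token_counter.add(
    1250,
    attributes={"model": "gpt-4o-mini", "operation": "qa_chain"}
)

# Duration of one full graph run, in seconds
integration.execution_duration.record(
    4.2,
    attributes={"graph": "support_agent", "status": "success"}
)

# One visit to a node, so hot or looping nodes show up in dashboards
integration.node_visits.add(1, attributes={"node": "retrieve"})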
Create Custom Spans for Composite Operations
For more complex execution patterns, like coordination across agents or conditional state updates, you can define your own spans and attach relevant context.
from datetime import datetime
def create_comprehensive_span(self, operation_type: str, context: Dict[str, Any]) -> trace.Span:
span = self.tracer.start_span(f"langgraph.{operation_type}")
span.set_attributes({
"langgraph.operation": operation_type,
"langgraph.timestamp": datetime.now().isoformat(),
"langgraph.context_size": len(str(context))
})
if operation_type == "multi_agent_coordination":
span.set_attributes({
"langgraph.agents_involved": context.get("agents", []),
"langgraph.coordination_type": context.get("coordination_type", "unknown"),
"langgraph.shared_resources": context.get("shared_resources", [])
})
elif operation_type == "conditional_routing":
span.set_attributes({
"langgraph.condition_type": context.get("condition_type", "unknown"),
"langgraph.available_paths": context.get("available_paths", []),
"langgraph.chosen_path": context.get("chosen_path", "unknown")
})
elif operation_type == "state_mutation":
span.set_attributes({
"langgraph.state_keys_changed": context.get("keys_changed", []),
"langgraph.state_size_before": context.get("size_before", 0),
"langgraph.state_size_after": context.get("size_after", 0)
})
return span
You can use this pattern to trace composite workflows across LangChain and LangGraph. For example, wrap the execution of an entire multi-agent plan or a conditional branch in one span and attach relevant metadata.
Not Every Environment Needs the Same Level of Detail
In development, you might want to capture everything: full state dumps, intermediate values, and semantic checks. But in production, that level of detail adds overhead quickly. It helps to define clear levels of instrumentation so you can switch configurations based on the environment.
Here’s one example:
class InstrumentationLevel:
DEBUG = {
"capture_full_state": True,
"capture_intermediate_outputs": True,
"detailed_token_tracking": True,
"semantic_validation": True,
"performance_baselines": True
}
PRODUCTION = {
"capture_full_state": False,
"capture_intermediate_outputs": False,
"detailed_token_tracking": True,
"semantic_validation": True,
"performance_baselines": True
}
MINIMAL = {
"capture_full_state": False,
"capture_intermediate_outputs": False,
"detailed_token_tracking": False,
"semantic_validation": False,
"performance_baselines": False
}
These configs can be passed into your instrumentation code to enable or skip certain behaviors, like skipping semantic validation in staging, or turning off detailed token tracking outside of debug environments.
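A small helper can pick the level at startup, assuming the same ENVIRONMENT variable used in the exporter setup above:
import os

def get_instrumentation_config() -> dict:
    env = os.getenv("ENVIRONMENT", "production").lower()
    if env in ("dev", "development", "local"):
        return InstrumentationLevel.DEBUG
    if env in ("staging", "production"):
        return InstrumentationLevel.PRODUCTION
    return InstrumentationLevel.MINIMAL

config = get_instrumentation_config()

# Example gate inside your tracing code: only attach full state dumps
# when the active level allows it.
# if config["capture_full_state"]:
#     span.set_attribute("langgraph.state.full", str(state_after))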
When to Record a Trace and When to Skip It
Tracing everything isn’t scalable. But if you only trace a fixed percentage of traffic, you’ll miss the patterns that matter: errors, slow responses, or early signs of degradation.
This sampler keeps trace volume low under normal load but increases coverage when there’s a spike in latency or errors:
import random
from typing import Any, Dict
class AdaptiveSampler:
def __init__(self):
self.error_rate_threshold = 0.05 # 5%
self.latency_threshold = 5000 # 5 seconds
self.recent_errors = []
self.recent_latencies = []
def should_sample(self, context: Dict[str, Any]) -> bool:
if context.get("has_error", False):
return True
if context.get("duration_ms", 0) > self.latency_threshold:
return True
if self._calculate_error_rate() > self.error_rate_threshold:
return True
return random.random() < 0.1 # 10% baseline
def _calculate_error_rate(self) -> float:
if not self.recent_errors:
return 0.0
recent_window = self.recent_errors[-100:]
return sum(recent_window) / len(recent_window)
You can use this as a simple conditional check before starting a span or recording metrics. If should_sample() returns False, skip the instrumentation for that request. If it returns True, record the full trace and context.
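In practice the check wraps your existing instrumentation; the request_context values below are made up for illustration:
sampler = AdaptiveSampler()

request_context = {"has_error": False, "duration_ms": 7200}
if sampler.should_sample(request_context):
    span = tracer.start_span("langchain.request")
    # ...run the chain with the callback handlers shown earlier...
    span.end()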
Before you ship this
You don’t need to instrument everything on day one. Start by tracing chains, measuring execution time, and recording where token usage is spiking. Expand from there.
The patterns in this guide work with any OpenTelemetry backend, but Last9 is built for use cases like this. You get:
- Long-term storage for high-cardinality metrics, including token-level counters
- Trace stitching across async tasks, tools, and agents
- Rule-based dashboards that automatically organize LLM traces by model, cost, or tool usage
- Live debugging with real-time views of trace spans and execution timelines
- No sampling pressure—metrics are streamed, not aggregated at scrape time
If you're already exporting OpenTelemetry data, you can send it to Last9 in minutes. Once set up, you'll be able to filter, inspect, and debug complex LangChain and LangGraph runs without digging through logs.
And if you want to get more out of your traces and metrics, book some time with us - we’ll walk you through it!