
Instrument LangChain and LangGraph Apps with OpenTelemetry

Understand how to trace, monitor, and debug LangChain and LangGraph apps using OpenTelemetry, down to chains, tools, tokens, and state flows.

Jul 7th, ‘25

In our previous blog, we talked about how LangChain and LangGraph help structure your agent’s behavior. But structure isn’t the same as visibility.

This one’s about fixing that.

Not with more logs. Not with generic dashboards. You need to see what your agent did, step by step, tool by tool, so you can understand how a simple query turned into a long, expensive run.

This blog walks you through how to monitor LangChain and LangGraph apps in a way that’s useful and easy to get started with.

The Instrumentation Problem

Your APM dashboard looks clean: CPU, memory, and latency are all within range. But users are getting incorrect answers, and your OpenAI usage has spiked.

This isn’t an infrastructure issue. It’s execution logic you can’t see.

A single request might trigger a LangChain retrieval chain, generate a prompt, call an LLM, and use tools for additional lookups. With LangGraph, that request could follow a state machine with branches, loops, or parallel paths.

To a monitoring tool, this looks like a single HTTP call. It doesn’t show which tool was used, how many tokens were consumed, or where time was spent. To debug and optimize, you need visibility into the steps your agent takes.

Add Basic Tracing with a Custom Callback

Before jumping into full tracing, here's a minimal callback you can use to confirm that LangChain execution can be instrumented. It works with your existing LangChain app, no config changes, no rewrites: just drop in a callback and start seeing what's happening.

from langchain.callbacks.base import BaseCallbackHandler

class QuickTracer(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        print(f"Starting chain: {serialized.get('name', 'chain')}")

    def on_chain_end(self, outputs, **kwargs):
        print("Chain completed.")

chain = YourChain()  # Replace with your actual chain
chain.run("test", callbacks=[QuickTracer()])

This prints messages when a chain starts and ends. It’s a quick way to verify that your instrumentation points are active. You won’t see timing or token data yet, but it confirms that the callback interface is in place.

What’s worth recording

LangChain apps are built by composing LLMs, retrievers, tools, memory, and chains. That flexibility comes at the cost of visibility.

When something breaks or when usage gets expensive, you’ll want to answer questions like:

  • Which chains ran, and in what order?
  • How long did each step take?
  • How did inputs and outputs change between components?
  • Where were tokens used?

LangChain provides a callback interface to track these events. You can use that to plug in OpenTelemetry spans for structured observability.

Let’s start by instrumenting chain executions. This gives you a high-level view of which chains ran, how long they took, and what data flowed through them.

Add Chain-Level Instrumentation with OpenTelemetry

This example builds a custom callback handler that creates a span for each chain run, tracks key metadata, and captures timing:

from langchain.callbacks.base import BaseCallbackHandler
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
import time, hashlib

class ChainInstrumentationHandler(BaseCallbackHandler):
    def __init__(self, tracer):
        self.tracer = tracer
        self.spans = {}
        self.chain_state = {}

    def on_chain_start(self, serialized, inputs, **kwargs):
        run_id = kwargs.get("run_id")
        parent_run_id = kwargs.get("parent_run_id")
        # serialized can be None for some runnables, so guard the lookup
        chain_name = (serialized or {}).get("name", "unknown_chain")
        span_name = f"langchain.chain.{chain_name}"

        span = self.tracer.start_span(span_name)
        attributes = {
            "chain.name": chain_name,
            "chain.type": (serialized or {}).get("_type", "unknown"),
            "chain.run_id": str(run_id),
            "chain.input_count": len(inputs) if isinstance(inputs, dict) else 1,
            # Hash inputs so identical requests can be grouped without storing raw content
            "chain.input_hash": hashlib.md5(str(inputs).encode()).hexdigest()[:8]
        }
        if parent_run_id:
            # OpenTelemetry drops None attribute values, so only set this when present
            attributes["chain.parent_run_id"] = str(parent_run_id)
        span.set_attributes(attributes)

        self.spans[run_id] = span
        self.chain_state[run_id] = {
            "start_time": time.time(),
            "inputs": inputs,
            "component_calls": 0
        }

    def on_chain_end(self, outputs, **kwargs):
        run_id = kwargs.get("run_id")
        span = self.spans.pop(run_id, None)
        state = self.chain_state.pop(run_id, None)

        if span and state:
            duration = time.time() - state["start_time"]
            span.set_attributes({
                "chain.duration_ms": int(duration * 1000),
                "chain.component_calls": state["component_calls"],
                "chain.output_count": len(outputs) if isinstance(outputs, dict) else 1,
                "chain.status": "success"
            })
            span.set_status(Status(StatusCode.OK))
            span.end()

    def on_chain_error(self, error, **kwargs):
        run_id = kwargs.get("run_id")
        span = self.spans.pop(run_id, None)
        self.chain_state.pop(run_id, None)

        if span:
            span.record_exception(error)
            span.set_attributes({
                "chain.status": "error",
                "chain.error_type": type(error).__name__
            })
            span.set_status(Status(StatusCode.ERROR, str(error)))
            span.end()

This setup tracks:

  • Chain identity: name, type, input fingerprint
  • Execution timing: duration in milliseconds
  • Call structure: how many internal components were involved
  • Failure state: error type and status, if applicable

Before: A user asks a simple question. The agent ends up calling three tools, generating long prompts, and pushing a massive memory context into the next LLM call. It works, but costs spike, and latency jumps.

After: With trace spans, you can see that pattern immediately: which chain ran, which tool took the longest, how many tokens were used, and where memory growth started to slow things down. The same handlers work in development and production; span creation is cheap, and batched export (covered later in this post) keeps the overhead off the request path.

The result is a structured view of how your LangChain application executes, one you can send to Last9, Grafana, or any OpenTelemetry-compatible backend.
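Here's a minimal sketch of wiring the handler into an existing chain. It assumes you've already built a chain object (the build_qa_chain() call is a placeholder) and uses a console exporter so you can see spans locally before pointing at a real backend:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Local-only setup: print spans to stdout while you verify the handler fires
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("langchain-instrumentation")
handler = ChainInstrumentationHandler(tracer)

chain = build_qa_chain()  # placeholder: construct your own chain here
chain.run("What changed in the last deploy?", callbacks=[handler])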

💡
If you're just getting started, this LangChain observability guide walks through adding basic tracing and metrics in under 10 minutes.

How LangGraph Works Differently

LangGraph applications aren’t built from straight-line chains. They run as state machines. That means:

  • Nodes are revisited
  • Execution paths can branch or loop
  • Behavior depends on runtime conditions

If your observability is built for LangChain’s linear flows, it won’t hold up here. You need to capture how the state moves through the graph, step by step, node by node.

Track the Full Graph Execution

The first thing you’ll want is a root span that tracks an entire graph run. This gives you a place to store high-level metadata like:

  • Which graph ran
  • When it started
  • What the initial state looked like

Here’s one way to do that:

import uuid
from typing import Dict, Any
from opentelemetry import trace
from datetime import datetime

class GraphExecutionTracer:
    def __init__(self, tracer):
        self.tracer = tracer
        self.execution_spans = {}
        self.state_history = {}
        self.node_visit_counts = {}

    def start_execution(self, graph_name: str, initial_state: Dict[str, Any]) -> str:
        execution_id = str(uuid.uuid4())
        root_span = self.tracer.start_span(f"langgraph.execution.{graph_name}")
        root_span.set_attributes({
            "langgraph.graph.name": graph_name,
            "langgraph.execution.id": execution_id,
            "langgraph.execution.start_time": datetime.now().isoformat(),
            "langgraph.state.initial_keys": list(initial_state.keys()),
            "langgraph.state.initial_size": len(str(initial_state))
        })

        self.execution_spans[execution_id] = root_span
        self.state_history[execution_id] = []
        self.node_visit_counts[execution_id] = {}

        return execution_id

This span gives you one trace per run. You can later attach all node spans and state transitions under it.
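The class above only opens the root span. You'll also want a matching method to close it when the run completes; here's a minimal sketch (the method name and the final_state argument are assumptions, not part of any LangGraph API):

    # Continues GraphExecutionTracer from above
    def end_execution(self, execution_id: str, final_state: Dict[str, Any],
                      success: bool = True) -> None:
        root_span = self.execution_spans.pop(execution_id, None)
        if root_span is None:
            return

        root_span.set_attributes({
            "langgraph.state.final_keys": list(final_state.keys()),
            "langgraph.state.final_size": len(str(final_state)),
            "langgraph.execution.total_node_visits": sum(
                self.node_visit_counts.get(execution_id, {}).values()
            ),
            "langgraph.execution.success": success
        })
        root_span.end()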

Capture What Happens Inside Each Node

As the graph executes, it moves through different nodes, sometimes looping back to the same one multiple times. To debug that behavior, you’ll need:

  • Timestamps for each node visit
  • State before and after the node runs
  • A count of how often each node is hit

Here’s a method that traces those details:

    # Continues GraphExecutionTracer from above
    def trace_node_execution(self, execution_id: str, node_name: str,
                             state_before: Dict[str, Any],
                             state_after: Dict[str, Any],
                             duration_ms: int) -> None:
        self.node_visit_counts[execution_id].setdefault(node_name, 0)
        self.node_visit_counts[execution_id][node_name] += 1

        node_span = self.tracer.start_span(f"langgraph.node.{node_name}")
        state_changes = self._calculate_state_changes(state_before, state_after)

        node_span.set_attributes({
            "langgraph.node.name": node_name,
            "langgraph.node.execution_id": execution_id,
            "langgraph.node.visit_count": self.node_visit_counts[execution_id][node_name],
            "langgraph.node.duration_ms": duration_ms,
            "langgraph.state.before_size": len(str(state_before)),
            "langgraph.state.after_size": len(str(state_after)),
            # Count individual key changes, not the number of change categories
            "langgraph.state.changes_count": sum(len(v) for v in state_changes.values()),
            "langgraph.state.keys_added": state_changes.get("added", []),
            "langgraph.state.keys_modified": state_changes.get("modified", []),
            "langgraph.state.keys_removed": state_changes.get("removed", [])
        })

        self.state_history[execution_id].append({
            "node": node_name,
            "timestamp": datetime.now().isoformat(),
            "state_before": state_before,
            "state_after": state_after,
            "visit_count": self.node_visit_counts[execution_id][node_name]
        })

        node_span.end()

Each span gives you a snapshot of how the state evolved. The visit_count field is especially helpful when you want to spot loops or high-frequency nodes.
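One detail the snippet glosses over: _calculate_state_changes isn't defined anywhere in this post. A minimal version could diff the top-level state keys, something like this sketch (nested or deep changes would need more work):

    # Helper assumed by trace_node_execution; compares only top-level keys
    def _calculate_state_changes(self, state_before: Dict[str, Any],
                                 state_after: Dict[str, Any]) -> Dict[str, list]:
        before_keys = set(state_before.keys())
        after_keys = set(state_after.keys())

        return {
            "added": sorted(after_keys - before_keys),
            "modified": sorted(
                k for k in before_keys & after_keys
                if state_before[k] != state_after[k]
            ),
            "removed": sorted(before_keys - after_keys),
        }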

Understand Conditional Branches

Not every node has one clear next step. LangGraph allows multiple outgoing edges based on runtime decisions.

To trace these decisions, you can create a span for each conditional transition:

    # Also part of GraphExecutionTracer; add List to the typing import above
    def trace_conditional_edge(self, execution_id: str, from_node: str,
                               to_node: str, condition_result: Any,
                               available_options: List[str]) -> None:
        edge_span = self.tracer.start_span("langgraph.conditional_edge")
        edge_span.set_attributes({
            "langgraph.edge.from_node": from_node,
            "langgraph.edge.to_node": to_node,
            "langgraph.edge.execution_id": execution_id,
            "langgraph.edge.condition_result": str(condition_result),
            "langgraph.edge.available_options": available_options,
            "langgraph.edge.chosen_option": to_node,
            "langgraph.edge.options_count": len(available_options),
            "langgraph.edge.branching": len(available_options) > 1
        })

        edge_span.end()

This gives you visibility into decision points: which node was chosen, what options were available, and why one path was taken over another.
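In LangGraph, a conditional edge usually comes from a router function that inspects the state and returns the name of the next node. One way to use the method above is to wrap that router; the node names and routing logic below are hypothetical, and graph_tracer is the GraphExecutionTracer instance from earlier:

def route_after_retrieval(state: Dict[str, Any]) -> str:
    # Hypothetical routing logic: re-query when retrieval came back empty
    return "rewrite_query" if not state.get("documents") else "generate_answer"

def traced_router(state: Dict[str, Any]) -> str:
    options = ["rewrite_query", "generate_answer"]
    chosen = route_after_retrieval(state)
    graph_tracer.trace_conditional_edge(
        execution_id=state.get("execution_id", "unknown"),
        from_node="retrieve",
        to_node=chosen,
        condition_result=chosen,
        available_options=options,
    )
    return chosen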

Detect Repeated States and Loops

LangGraph doesn’t stop you from looping forever. A good first step is detecting patterns that look like infinite loops.

Here’s a loop detection class you can wire into the tracer:

class LoopDetectionHandler:
    def __init__(self, max_visits_per_node: int = 50, max_total_iterations: int = 200):
        self.max_visits_per_node = max_visits_per_node
        self.max_total_iterations = max_total_iterations
        self.execution_stats = {}

    def check_for_loops(self, execution_id: str, node_name: str, current_state: Dict[str, Any]) -> Dict[str, Any]:
        if execution_id not in self.execution_stats:
            self.execution_stats[execution_id] = {
                "total_iterations": 0,
                "node_visits": {},
                "state_signatures": {}
            }

        stats = self.execution_stats[execution_id]
        stats["total_iterations"] += 1

        stats["node_visits"].setdefault(node_name, 0)
        stats["node_visits"][node_name] += 1

        state_signature = self._create_state_signature(current_state)
        seen_states = stats["state_signatures"].setdefault(node_name, [])
        state_cycle_detected = state_signature in seen_states
        seen_states.append(state_signature)

        return {
            "excessive_node_visits": stats["node_visits"][node_name] > self.max_visits_per_node,
            "excessive_total_iterations": stats["total_iterations"] > self.max_total_iterations,
            "state_cycle_detected": state_cycle_detected,
            "loop_risk_score": self._calculate_loop_risk(stats, node_name)
        }

    def _create_state_signature(self, state: Dict[str, Any]) -> str:
        sorted_items = sorted(state.items())
        return hashlib.md5(str(sorted_items).encode()).hexdigest()

    def _calculate_loop_risk(self, stats: Dict[str, Any], current_node: str) -> float:
        total_iterations = stats["total_iterations"]
        node_visits = stats["node_visits"].get(current_node, 0)
        unique_nodes = len(stats["node_visits"])

        iteration_risk = min(total_iterations / self.max_total_iterations, 1.0)
        node_visit_risk = min(node_visits / self.max_visits_per_node, 1.0)
        node_diversity_risk = 1.0 - min(unique_nodes / 10, 1.0)

        return (
            iteration_risk * 0.4 +
            node_visit_risk * 0.5 +
            node_diversity_risk * 0.1
        )

This doesn’t prevent loops, but it gives you an early warning. You can log these risk signals or surface them in your tracing backend when they cross thresholds.
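One way to surface these signals is to run the check alongside your node tracing and attach the results to a span. The wiring below is a sketch; it assumes you already have the OpenTelemetry tracer and a LoopDetectionHandler instance on hand:

def check_and_record_loop_risk(tracer, loop_detector, execution_id: str,
                               node_name: str, state: Dict[str, Any]) -> None:
    signals = loop_detector.check_for_loops(execution_id, node_name, state)

    with tracer.start_as_current_span("langgraph.loop_check") as span:
        span.set_attributes({
            "langgraph.loop.node": node_name,
            "langgraph.loop.risk_score": signals["loop_risk_score"],
            "langgraph.loop.state_cycle_detected": signals["state_cycle_detected"],
        })
        # Flag high-risk runs so you can alert on this attribute in your backend
        if signals["loop_risk_score"] > 0.8 or signals["state_cycle_detected"]:
            span.set_attribute("langgraph.loop.warning", True)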

💡
Now you can debug LangChain agents in production, right from your IDE. With AI and Last9 MCP, bring in the logs, metrics, and traces you need to figure out why something broke, without digging through dashboards.

When You Have More Than One Agent

LangGraph supports multi-agent systems where different agents run independently but still interact. This adds a new layer of complexity: message passing, shared state, tool contention, and coordination issues that aren't visible with standard tracing.

To monitor these setups properly, you need to trace how agents communicate and what resources they use.

What Happens When Agents Communicate

If one agent calls a tool and another agent reacts to the result, you need visibility into that interaction. Who sent the message? What kind of message was it? How often are these agents talking?

This method tracks agent-to-agent communication and stores basic metrics around interaction patterns:

from datetime import datetime

class MultiAgentTracer:
    def __init__(self, tracer):
        self.tracer = tracer
        self.agent_interactions = {}
        self.shared_resources = {}

    def trace_agent_communication(self, from_agent: str, to_agent: str,
                                   message_type: str, message_content: Any) -> None:

        communication_span = self.tracer.start_span("langgraph.agent.communication")
        communication_span.set_attributes({
            "langgraph.agent.from": from_agent,
            "langgraph.agent.to": to_agent,
            "langgraph.agent.message_type": message_type,
            "langgraph.agent.message_size": len(str(message_content)),
            "langgraph.agent.timestamp": datetime.now().isoformat()
        })

        interaction_key = f"{from_agent}->{to_agent}"
        self.agent_interactions.setdefault(interaction_key, 0)
        self.agent_interactions[interaction_key] += 1

        communication_span.set_attribute(
            "langgraph.agent.interaction_count",
            self.agent_interactions[interaction_key]
        )

        communication_span.end()

This gives you a timeline of agent conversations and helps surface coordination issues like message loops, delays, or redundant exchanges.

How Shared Resources Can Create Bottlenecks

When agents share memory, vector databases, or tools, conflicts can show up as latency spikes or unexpected outputs. You’ll want to monitor not just who accessed a resource, but how frequently, and whether multiple agents are involved.

Here’s how you can track that:

    # Continues MultiAgentTracer from above
    def trace_resource_access(self, agent_name: str, resource_name: str,
                              access_type: str, duration_ms: int) -> None:
        resource_span = self.tracer.start_span("langgraph.resource.access")
        resource_span.set_attributes({
            "langgraph.resource.name": resource_name,
            "langgraph.resource.agent": agent_name,
            "langgraph.resource.access_type": access_type,
            "langgraph.resource.duration_ms": duration_ms
        })

        self.shared_resources.setdefault(resource_name, {"accesses": 0, "agents": set()})
        self.shared_resources[resource_name]["accesses"] += 1
        self.shared_resources[resource_name]["agents"].add(agent_name)

        resource_span.set_attributes({
            "langgraph.resource.total_accesses": self.shared_resources[resource_name]["accesses"],
            "langgraph.resource.agent_count": len(self.shared_resources[resource_name]["agents"])
        })

        resource_span.end()

This helps you track which agents are using the same resource, how often, and whether access patterns are balanced or skewed.

What to Track Beyond Execution and Communication

Once you're capturing execution flows, node transitions, and agent interactions, the next step is understanding whether the system is producing the right answers and doing so consistently.

This means adding instrumentation around semantic validation and performance expectations.

Check for Output Quality, Not Just Errors

LangGraph runs don’t always fail loudly. A chain can return a perfectly formatted response that’s logically incorrect or irrelevant. That’s where semantic validation comes in.

You can register a validation function per node to assess output quality:

class SemanticMonitor:
    def __init__(self, tracer):
        self.tracer = tracer
        self.semantic_validators = {}

    def register_validator(self, node_name: str, validator_func):
        self.semantic_validators[node_name] = validator_func

    def validate_node_output(self, execution_id: str, node_name: str,
                             output: Any, context: Dict[str, Any]) -> Dict[str, Any]:

        if node_name not in self.semantic_validators:
            return {"valid": True, "confidence": 1.0}

        validator = self.semantic_validators[node_name]

        try:
            result = validator(output, context)

            semantic_span = self.tracer.start_span("langgraph.semantic.validation")
            semantic_span.set_attributes({
                "langgraph.semantic.node": node_name,
                "langgraph.semantic.execution_id": execution_id,
                "langgraph.semantic.valid": result.get("valid", False),
                "langgraph.semantic.confidence": result.get("confidence", 0.0),
                "langgraph.semantic.issues": result.get("issues", [])
            })
            semantic_span.end()

            return result

        except Exception as e:
            error_span = self.tracer.start_span("langgraph.semantic.error")
            error_span.record_exception(e)
            error_span.set_attributes({
                "langgraph.semantic.node": node_name,
                "langgraph.semantic.execution_id": execution_id,
                "langgraph.semantic.error": str(e)
            })
            error_span.end()

            return {"valid": False, "confidence": 0.0, "error": str(e)}

This gives you structured trace data on whether a node’s output was meaningful, not just syntactically correct. You can build confidence scores, track recurring failure cases, or flag specific nodes where validation regularly fails.
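A validator is just a function that takes the node's output and some context and returns a dict with valid, confidence, and optionally issues. Here's a deliberately simple sketch for a hypothetical generate_answer node; the checks and the document_terms context key are assumptions you'd replace with your own criteria:

def answer_validator(output: Any, context: Dict[str, Any]) -> Dict[str, Any]:
    issues = []
    answer = str(output).strip()

    if not answer:
        issues.append("empty_answer")
    if len(answer) > 4000:
        issues.append("answer_too_long")
    # Crude grounding check: at least one retrieved term should appear in the answer
    doc_terms = context.get("document_terms", [])
    if doc_terms and not any(term.lower() in answer.lower() for term in doc_terms):
        issues.append("possibly_ungrounded")

    return {
        "valid": not issues,
        "confidence": max(0.0, 1.0 - 0.3 * len(issues)),
        "issues": issues,
    }

semantic_monitor = SemanticMonitor(tracer)
semantic_monitor.register_validator("generate_answer", answer_validator)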

Track Performance Against a Moving Baseline

Once your system is stable, raw latency isn’t always enough. You need to know when performance drifts, when a retrieval step is 3× slower than usual, or when an LLM call suddenly becomes a bottleneck.

You can track that using dynamic baselines:

class PerformanceBaseline:
    def __init__(self, tracer):
        self.tracer = tracer
        self.baselines = {}

    def record_performance(self, operation_type: str, duration_ms: int,
                           context: Dict[str, Any]) -> None:

        if operation_type not in self.baselines:
            self.baselines[operation_type] = {
                "measurements": [],
                "p50": 0,
                "p95": 0,
                "p99": 0,
                "mean": 0
            }

        baseline = self.baselines[operation_type]
        baseline["measurements"].append(duration_ms)

        if len(baseline["measurements"]) > 1000:
            baseline["measurements"] = baseline["measurements"][-1000:]

        measurements = sorted(baseline["measurements"])
        count = len(measurements)

        if count > 0:
            baseline["p50"] = measurements[int(count * 0.5)]
            baseline["p95"] = measurements[int(count * 0.95)]
            baseline["p99"] = measurements[int(count * 0.99)]
            baseline["mean"] = sum(measurements) / count

        performance_ratio = duration_ms / baseline["mean"] if baseline["mean"] > 0 else 1.0

        perf_span = self.tracer.start_span("langgraph.performance.baseline")
        perf_span.set_attributes({
            "langgraph.performance.operation": operation_type,
            "langgraph.performance.duration_ms": duration_ms,
            "langgraph.performance.baseline_mean": baseline["mean"],
            "langgraph.performance.baseline_p95": baseline["p95"],
            "langgraph.performance.ratio_to_mean": performance_ratio,
            "langgraph.performance.degraded": performance_ratio > 2.0
        })
        perf_span.end()

This gives you an early warning when a familiar operation starts behaving abnormally, even if it still completes successfully. You can later use this data to tune timeouts, identify regressions, or surface hidden dependencies.
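Feeding the baseline is just a matter of timing an operation and calling record_performance. A sketch, assuming baseline is a PerformanceBaseline instance and retriever is your own LangChain retriever:

import time

def timed_retrieval(baseline: PerformanceBaseline, retriever, query: str):
    start = time.time()
    try:
        return retriever.get_relevant_documents(query)
    finally:
        # Record the timing whether the retrieval succeeded or failed
        baseline.record_performance(
            operation_type="retrieval",
            duration_ms=int((time.time() - start) * 1000),
            context={"query_length": len(query)},
        )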

💡
If you're new to OpenTelemetry, here's a guide that covers the basics and shows how to get started.

Export Your Traces and Metrics

With spans in place, you can now export your traces to a backend that supports OpenTelemetry and understands LLM execution patterns.

LLM traces are different; they're high-cardinality, token-heavy, and need long-term storage for cost analysis. Last9 is designed specifically for this type of telemetry data.

Here's a 2-minute setup guide to get started:

Configure the Tracer and Meter

The Last9Integration class sets up both tracing and metrics using OTLP over gRPC. This setup assumes you’ve already generated your API key and endpoint details.

import os
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource

These imports cover the two exporters you'll need:

  • Trace exporter: sends spans from LangChain and LangGraph to Last9
  • Metric exporter: sends counters, gauges, and histograms at a regular interval

The class below does three things:

  1. Sets up the OTLP exporter for traces and metrics
  2. Defines basic service-level metadata
  3. Creates a few custom metrics useful for LLM agents

class Last9Integration:
    def __init__(self):
        self.setup_tracing()
        self.setup_metrics()
        self.tracer = trace.get_tracer(__name__)
        self.meter = metrics.get_meter(__name__)
        self.setup_custom_metrics()

    def setup_tracing(self):
        resource = Resource.create({
            "service.name": "langchain-langgraph-app",
            "service.version": "1.0.0",
            "deployment.environment": os.getenv("ENVIRONMENT", "production"),
            "application.type": "llm-agent"
        })

        trace.set_tracer_provider(TracerProvider(resource=resource))

        otlp_exporter = OTLPSpanExporter(
            endpoint=os.getenv("OTLP_ENDPOINT", "https://otlp.last9.io:443"),
            headers={"Authorization": f"Bearer {os.getenv('LAST9_API_KEY')}"}
        )

        span_processor = BatchSpanProcessor(otlp_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)

    def setup_metrics(self):
        metric_exporter = OTLPMetricExporter(
            endpoint=os.getenv("OTLP_ENDPOINT", "https://otlp.last9.io:443"),
            headers={"Authorization": f"Bearer {os.getenv('LAST9_API_KEY')}"}
        )

        metric_reader = PeriodicExportingMetricReader(
            metric_exporter,
            export_interval_millis=30000  # 30s export interval
        )

        metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))

    def setup_custom_metrics(self):
        self.token_counter = self.meter.create_counter(
            "langchain_tokens_total",
            description="Total tokens consumed by LangChain operations"
        )

        self.execution_duration = self.meter.create_histogram(
            "langgraph_execution_duration_seconds",
            description="Duration of LangGraph executions"
        )

        self.node_visits = self.meter.create_counter(
            "langgraph_node_visits_total",
            description="Total visits to LangGraph nodes"
        )

        self.loop_risk_gauge = self.meter.create_gauge(
            "langgraph_loop_risk_score",
            description="Current loop risk score for LangGraph executions"
        )

        self.semantic_validation_counter = self.meter.create_counter(
            "langgraph_semantic_validations_total",
            description="Total semantic validations performed"
        )

This setup gives you standard observability plus domain-specific metrics: token use, graph execution time, node visit counts, semantic validation frequency, and loop risk levels.
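To populate these metrics, record them from your callbacks. The sketch below counts tokens in on_llm_end; where token usage lives in the response varies by provider, so treat the llm_output["token_usage"] lookup (the OpenAI-style field) as an assumption:

from langchain.callbacks.base import BaseCallbackHandler

class TokenMetricsHandler(BaseCallbackHandler):
    def __init__(self, integration: Last9Integration):
        self.integration = integration

    def on_llm_end(self, response, **kwargs):
        llm_output = response.llm_output or {}
        usage = llm_output.get("token_usage", {})
        total_tokens = usage.get("total_tokens", 0)

        if total_tokens:
            # Counter defined in setup_custom_metrics above
            self.integration.token_counter.add(
                total_tokens,
                attributes={"model": llm_output.get("model_name", "unknown")},
            )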

Create Custom Spans for Composite Operations

For more complex execution patterns, like coordination across agents or conditional state updates, you can define your own spans and attach relevant context. The method below extends the Last9Integration class:

from datetime import datetime

def create_comprehensive_span(self, operation_type: str, context: Dict[str, Any]) -> trace.Span:
    span = self.tracer.start_span(f"langgraph.{operation_type}")

    span.set_attributes({
        "langgraph.operation": operation_type,
        "langgraph.timestamp": datetime.now().isoformat(),
        "langgraph.context_size": len(str(context))
    })

    if operation_type == "multi_agent_coordination":
        span.set_attributes({
            "langgraph.agents_involved": context.get("agents", []),
            "langgraph.coordination_type": context.get("coordination_type", "unknown"),
            "langgraph.shared_resources": context.get("shared_resources", [])
        })

    elif operation_type == "conditional_routing":
        span.set_attributes({
            "langgraph.condition_type": context.get("condition_type", "unknown"),
            "langgraph.available_paths": context.get("available_paths", []),
            "langgraph.chosen_path": context.get("chosen_path", "unknown")
        })

    elif operation_type == "state_mutation":
        span.set_attributes({
            "langgraph.state_keys_changed": context.get("keys_changed", []),
            "langgraph.state_size_before": context.get("size_before", 0),
            "langgraph.state_size_after": context.get("size_after", 0)
        })

    return span

You can use this pattern to trace composite workflows across LangChain and LangGraph. For example, wrap the execution of an entire multi-agent plan or a conditional branch in one span and attach relevant metadata.
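Since create_comprehensive_span returns a raw span rather than a context manager, end it yourself or hand it to opentelemetry.trace.use_span so child spans nest underneath it. A usage sketch, with the agent names and the plan.execute() call being hypothetical:

from opentelemetry import trace

def run_multi_agent_plan(integration: Last9Integration, plan):
    span = integration.create_comprehensive_span(
        "multi_agent_coordination",
        {
            "agents": ["researcher", "writer"],
            "coordination_type": "sequential",
            "shared_resources": ["vector_store"],
        },
    )
    # use_span nests spans created inside the plan under this one and ends it on exit
    with trace.use_span(span, end_on_exit=True):
        return plan.execute()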

Not Every Environment Needs the Same Level of Detail

In development, you might want to capture everything: full state dumps, intermediate values, and semantic checks. But in production, that level of detail adds overhead quickly. It helps to define clear levels of instrumentation so you can switch configurations based on the environment.

Here’s one example:

class InstrumentationLevel:
    DEBUG = {
        "capture_full_state": True,
        "capture_intermediate_outputs": True,
        "detailed_token_tracking": True,
        "semantic_validation": True,
        "performance_baselines": True
    }

    PRODUCTION = {
        "capture_full_state": False,
        "capture_intermediate_outputs": False,
        "detailed_token_tracking": True,
        "semantic_validation": True,
        "performance_baselines": True
    }

    MINIMAL = {
        "capture_full_state": False,
        "capture_intermediate_outputs": False,
        "detailed_token_tracking": False,
        "semantic_validation": False,
        "performance_baselines": False
    }

These configs can be passed into your instrumentation code to enable or skip certain behaviors, like skipping semantic validation in staging, or turning off detailed token tracking outside of debug environments.
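One way to apply these levels is to pick a config from the deployment environment and gate the expensive steps on its flags. The helper below is a sketch that reuses the ENVIRONMENT variable from the exporter setup:

import os

def get_instrumentation_config() -> dict:
    # Map the deployment environment onto one of the levels above
    env = os.getenv("ENVIRONMENT", "production").lower()
    if env in ("dev", "development", "debug"):
        return InstrumentationLevel.DEBUG
    if env == "minimal":
        return InstrumentationLevel.MINIMAL
    return InstrumentationLevel.PRODUCTION

def maybe_validate(config: dict, monitor: SemanticMonitor, execution_id: str,
                   node_name: str, output, context: dict) -> dict:
    # Skip semantic validation entirely when the level disables it
    if not config["semantic_validation"]:
        return {"valid": True, "confidence": 1.0, "skipped": True}
    return monitor.validate_node_output(execution_id, node_name, output, context)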

When to Record a Trace and When to Skip It

Tracing everything isn’t scalable. But if you only trace a fixed percentage of traffic, you’ll miss the patterns that matter: errors, slow responses, or early signs of degradation.

This sampler keeps trace volume low under normal load but increases coverage when there’s a spike in latency or errors:

import random

class AdaptiveSampler:
    def __init__(self):
        self.error_rate_threshold = 0.05  # 5%
        self.latency_threshold = 5000    # 5 seconds
        self.recent_errors = []
        self.recent_latencies = []

    def record_outcome(self, had_error: bool, duration_ms: int) -> None:
        # Feed each completed request back in so the error-rate check has data
        self.recent_errors.append(1 if had_error else 0)
        self.recent_latencies.append(duration_ms)
        # Keep a sliding window of recent requests
        self.recent_errors = self.recent_errors[-1000:]
        self.recent_latencies = self.recent_latencies[-1000:]

    def should_sample(self, context: Dict[str, Any]) -> bool:
        if context.get("has_error", False):
            return True

        if context.get("duration_ms", 0) > self.latency_threshold:
            return True

        if self._calculate_error_rate() > self.error_rate_threshold:
            return True

        return random.random() < 0.1  # 10% baseline

    def _calculate_error_rate(self) -> float:
        if not self.recent_errors:
            return 0.0

        recent_window = self.recent_errors[-100:]
        return sum(recent_window) / len(recent_window)

You can use this as a simple conditional check before starting a span or recording metrics. If should_sample() returns False, skip the instrumentation for that request. If it returns True, record the full trace and context.
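In practice that becomes a guard around your instrumentation. The sketch below times a compiled graph's invoke call, feeds the outcome back into the sampler, and only emits a summary span for runs the sampler flags as interesting; the graph object and span attributes are assumptions:

import time
from typing import Any, Dict

def sampled_graph_run(sampler: AdaptiveSampler, tracer, graph,
                      initial_state: Dict[str, Any]) -> Any:
    start = time.time()
    has_error = False
    try:
        return graph.invoke(initial_state)
    except Exception:
        has_error = True
        raise
    finally:
        duration_ms = int((time.time() - start) * 1000)
        sampler.record_outcome(has_error, duration_ms)

        context = {"has_error": has_error, "duration_ms": duration_ms}
        if sampler.should_sample(context):
            # Only record a summary span for runs worth keeping
            with tracer.start_as_current_span("langgraph.sampled_run") as span:
                span.set_attributes({
                    "langgraph.run.duration_ms": duration_ms,
                    "langgraph.run.has_error": has_error,
                })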

Before you ship this

You don’t need to instrument everything on day one. Start by tracing chains, measuring execution time, and recording where token usage is spiking. Expand from there.

The patterns in this guide work with any OpenTelemetry backend, but Last9 is built for use cases like this. You get:

  • Long-term storage for high-cardinality metrics, including token-level counters
  • Trace stitching across async tasks, tools, and agents
  • Rule-based dashboards that automatically organize LLM traces by model, cost, or tool usage
  • Live debugging with real-time views of trace spans and execution timelines
  • No sampling pressure—metrics are streamed, not aggregated at scrape time

If you're already exporting OpenTelemetry data, you can send it to Last9 in minutes. Once set up, you'll be able to filter, inspect, and debug complex LangChain and LangGraph runs without digging through logs.

And if you want to get more out of your traces and metrics, book some time with us, and we’ll walk you through it!

Authors
Anjali Udasi

Helping to make the tech a little less intimidating. I love breaking down complex concepts into easy-to-understand terms.
