Sanic

Monitor async Sanic applications with OpenTelemetry instrumentation for comprehensive API performance tracking

Instrument your async Sanic application with OpenTelemetry to send comprehensive telemetry data to Last9. This integration provides automatic instrumentation for HTTP requests, async database operations, and custom tracing for high-performance async web applications.

Prerequisites

  • Python 3.8 or higher (async/await support; also required by the pinned OpenTelemetry 1.27.0 packages)
  • Sanic 21.0 or higher
  • Last9 account with OTLP endpoint configured

Installation

Install the required OpenTelemetry packages for Sanic instrumentation:

pip install \
  "sanic>=21.0" \
  opentelemetry-api==1.27.0 \
  opentelemetry-sdk==1.27.0 \
  opentelemetry-exporter-otlp-proto-grpc==1.27.0 \
  opentelemetry-instrumentation-aiohttp-client==0.48b0

Note the quotes around sanic>=21.0 — without them the shell interprets >= as a redirect.
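The instrumentation module set up below also tries to enable database and Redis auto-instrumentation when the matching packages are importable. If your application uses asyncpg, psycopg2, Redis, or SQLAlchemy, install the corresponding instrumentation packages too (the 0.48b0 contrib release pairs with SDK 1.27.0):

```shell
pip install \
  opentelemetry-instrumentation-asyncpg==0.48b0 \
  opentelemetry-instrumentation-psycopg2==0.48b0 \
  opentelemetry-instrumentation-redis==0.48b0 \
  opentelemetry-instrumentation-sqlalchemy==0.48b0
```

Install only the ones you need; the module silently skips any that are absent.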

Configuration

  1. Set Environment Variables

    Configure the required environment variables for Last9 OTLP integration:

    export OTEL_SERVICE_NAME="your-sanic-service"
    export OTEL_EXPORTER_OTLP_ENDPOINT="$last9_otlp_endpoint"
    export OTEL_EXPORTER_OTLP_HEADERS="Authorization=$last9_otlp_auth_header"
    export OTEL_TRACES_SAMPLER="always_on"
    export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=production"
    export OTEL_LOG_LEVEL="error"
    export OTEL_METRICS_EXPORTER="none"
    export OTEL_LOGS_EXPORTER="none"
  2. Create Instrumentation Module

    Create instrumentation.py for OpenTelemetry setup:

    """OpenTelemetry instrumentation for Sanic application"""
    import os
    import logging
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.sdk.trace.sampling import (
    TraceIdRatioBased,
    ParentBased,
    ALWAYS_ON,
    ALWAYS_OFF
    )
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.resources import Resource, SERVICE_NAME
    logger = logging.getLogger(__name__)
    def _setup_logging():
    """Configure logging based on OTEL_LOG_LEVEL"""
    log_level = os.getenv("OTEL_LOG_LEVEL", "error").upper()
    level_map = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    }
    logging.basicConfig(
    level=level_map.get(log_level, logging.ERROR),
    format='[%(asctime)s] [%(name)s] %(levelname)s: %(message)s'
    )
    def _get_sampler():
    """Get sampler based on OTEL_TRACES_SAMPLER environment variable"""
    sampler_name = os.getenv("OTEL_TRACES_SAMPLER", "always_on")
    if sampler_name == "always_on":
    return ParentBased(root=ALWAYS_ON)
    elif sampler_name == "always_off":
    return ParentBased(root=ALWAYS_OFF)
    elif sampler_name == "traceidratio":
    ratio = float(os.getenv("OTEL_TRACES_SAMPLER_ARG", "0.1"))
    return ParentBased(root=TraceIdRatioBased(ratio))
    else:
    return ParentBased(root=ALWAYS_ON)
    def _parse_resource_attributes():
    """Parse OTEL_RESOURCE_ATTRIBUTES environment variable"""
    resource_attrs = os.getenv("OTEL_RESOURCE_ATTRIBUTES", "")
    attrs = {}
    if resource_attrs:
    for attr in resource_attrs.split(","):
    if "=" in attr:
    key, value = attr.split("=", 1)
    attrs[key.strip()] = value.strip()
    return attrs
    def init_telemetry():
    """
    Initialize OpenTelemetry tracing.
    Must be called in each worker process for Sanic.
    """
    _setup_logging()
    service_name = os.getenv("OTEL_SERVICE_NAME", "sanic-app")
    endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
    auth_header = os.getenv("OTEL_EXPORTER_OTLP_HEADERS")
    if not endpoint:
    logger.error("OTEL_EXPORTER_OTLP_ENDPOINT not set")
    return
    logger.info(f"Initializing OpenTelemetry for service: {service_name}")
    # Parse resource attributes
    resource_attrs = _parse_resource_attributes()
    resource_attrs[SERVICE_NAME] = service_name
    resource_attrs["service.version"] = os.getenv("SERVICE_VERSION", "1.0.0")
    resource = Resource(attributes=resource_attrs)
    sampler = _get_sampler()
    provider = TracerProvider(resource=resource, sampler=sampler)
    # Parse authorization header
    headers = {}
    if auth_header:
    for header in auth_header.split(","):
    if "=" in header:
    key, value = header.split("=", 1)
    headers[key.strip()] = value.strip()
    exporter = OTLPSpanExporter(endpoint=endpoint, headers=headers)
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    _instrument_libraries()
    logger.info("OpenTelemetry tracing initialized successfully")
    def _instrument_libraries():
    """Automatically instrument HTTP clients and databases"""
    # HTTP clients
    try:
    from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
    AioHttpClientInstrumentor().instrument()
    logger.debug("AioHTTP client instrumentation enabled")
    except ImportError:
    logger.debug("AioHTTP client instrumentation not available")
    # Database instrumentation
    try:
    from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
    AsyncPGInstrumentor().instrument()
    logger.debug("AsyncPG instrumentation enabled")
    except ImportError:
    pass
    try:
    from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
    Psycopg2Instrumentor().instrument()
    logger.debug("Psycopg2 instrumentation enabled")
    except ImportError:
    pass
    try:
    from opentelemetry.instrumentation.redis import RedisInstrumentor
    RedisInstrumentor().instrument()
    logger.debug("Redis instrumentation enabled")
    except ImportError:
    pass
    try:
    from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
    SQLAlchemyInstrumentor().instrument()
    logger.debug("SQLAlchemy instrumentation enabled")
    except ImportError:
    pass
  3. Create OpenTelemetry Middleware

    Create otel_middleware.py:

    """OpenTelemetry middleware for Sanic"""
    import time
    from opentelemetry import trace, context
    from opentelemetry.propagate import extract
    from opentelemetry.trace import SpanKind, Status, StatusCode
    def get_tracer():
    return trace.get_tracer(__name__)
    async def otel_request_middleware(request):
    """Create SERVER span for incoming request"""
    tracer = get_tracer()
    # Extract trace context from incoming headers for distributed tracing
    ctx = extract(request.headers)
    # Create SERVER span
    span = tracer.start_span(
    f"{request.method} {request.path}",
    context=ctx,
    kind=SpanKind.SERVER
    )
    # Set HTTP semantic convention attributes
    span.set_attribute("http.method", request.method)
    span.set_attribute("http.url", str(request.url))
    span.set_attribute("http.target", request.path)
    span.set_attribute("http.scheme", request.scheme)
    span.set_attribute("http.host", request.host)
    span.set_attribute("http.user_agent", request.headers.get("user-agent", ""))
    span.set_attribute("net.peer.ip", request.remote_addr or request.ip)
    # Record request start time for duration calculation
    request.ctx.start_time = time.time()
    # Attach context
    token = context.attach(ctx)
    ctx_with_span = trace.set_span_in_context(span, ctx)
    token_span = context.attach(ctx_with_span)
    # Store for cleanup
    request.ctx.otel_span = span
    request.ctx.otel_token = token
    request.ctx.otel_token_span = token_span
    async def otel_response_middleware(request, response):
    """Finalize span after response"""
    if not hasattr(request.ctx, 'otel_span'):
    return
    span = request.ctx.otel_span
    # Calculate request duration
    if hasattr(request.ctx, 'start_time'):
    duration = time.time() - request.ctx.start_time
    span.set_attribute("http.request_duration_ms", round(duration * 1000, 2))
    if response:
    span.set_attribute("http.status_code", response.status)
    span.set_attribute("http.response_size", len(response.body) if hasattr(response, 'body') and response.body else 0)
    # Set span status based on HTTP status code
    if response.status >= 400:
    if response.status < 500:
    span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status}"))
    else:
    span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status}: Server Error"))
    span.end()
    # Cleanup context
    if hasattr(request.ctx, 'otel_token_span'):
    context.detach(request.ctx.otel_token_span)
    if hasattr(request.ctx, 'otel_token'):
    context.detach(request.ctx.otel_token)
  4. Add Exception Handler

    Create exception_handler.py for comprehensive error tracking:

    """Exception handler for OpenTelemetry"""
    import traceback
    from opentelemetry.trace import Status, StatusCode
    async def otel_exception_handler(request, exception):
    """Capture exceptions in OpenTelemetry spans with full stack traces"""
    if hasattr(request.ctx, 'otel_span'):
    span = request.ctx.otel_span
    # Record the exception with full stack trace
    span.record_exception(exception)
    span.set_status(Status(StatusCode.ERROR, str(exception)))
    # Add exception details as attributes
    span.set_attribute("exception.type", type(exception).__name__)
    span.set_attribute("exception.message", str(exception))
    span.set_attribute("exception.stacktrace", traceback.format_exc())
    # Add HTTP-specific error attributes
    span.set_attribute("http.status_code", getattr(exception, 'status_code', 500))
    # Re-raise to let Sanic handle it normally
    raise exception
  5. Integrate with Sanic Application

    Update your main application file (e.g., app.py):

    import asyncio

    from sanic import Sanic, response
    from sanic.exceptions import NotFound
    from sanic.log import logger

    from instrumentation import init_telemetry
    from otel_middleware import otel_request_middleware, otel_response_middleware
    from exception_handler import otel_exception_handler

    app = Sanic("sanic-api")


    # CRITICAL: Initialize OpenTelemetry in each worker process
    @app.before_server_start
    async def setup_telemetry(app, loop):
        """Initialize OpenTelemetry when the Sanic worker starts"""
        init_telemetry()
        logger.info("OpenTelemetry initialized for Sanic worker")


    # Register OpenTelemetry middleware
    app.middleware("request")(otel_request_middleware)
    app.middleware("response")(otel_response_middleware)

    # Register exception handler for comprehensive error tracking
    app.exception(Exception)(otel_exception_handler)


    # Sample routes
    @app.route("/")
    async def index(request):
        return response.json({
            "message": "Hello from instrumented Sanic!",
            "service": "sanic-api"
        })


    @app.route("/health")
    async def health(request):
        return response.json({
            "status": "healthy",
            "service": "sanic-api"
        })


    @app.route("/users/<user_id:int>")
    async def get_user(request, user_id):
        # Simulate business logic with custom tracing
        from opentelemetry import trace
        tracer = trace.get_tracer(__name__)
        with tracer.start_as_current_span("get_user_business_logic") as span:
            span.set_attribute("user.id", user_id)
            # Simulate an async user lookup
            await asyncio.sleep(0.05)
            if user_id == 404:
                span.set_attribute("user.found", False)
                raise NotFound("User not found")
            span.set_attribute("user.found", True)
        return response.json({
            "id": user_id,
            "name": f"User {user_id}",
            "email": f"user{user_id}@example.com"
        })


    if __name__ == "__main__":
        app.run(host="0.0.0.0", port=8000, debug=False)
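Once the app is running, you can verify distributed tracing end to end: send a request carrying a W3C traceparent header, and the request middleware's extract(request.headers) call will parent the server span on the caller's trace. The header layout is version-traceid-spanid-flags; a stdlib-only sketch of how it breaks down (the hex values are illustrative):

```python
# Minimal illustration of the W3C traceparent header the middleware extracts.
# Format: version-traceid-spanid-flags, all lowercase hex.
def parse_traceparent(header: str) -> dict:
    version, trace_id, span_id, flags = header.split("-")
    assert len(trace_id) == 32 and len(span_id) == 16
    return {
        "version": version,
        "trace_id": trace_id,           # 128-bit trace ID
        "span_id": span_id,             # 64-bit parent span ID
        "sampled": int(flags, 16) & 0x01 == 1,  # sampled flag (bit 0)
    }

parsed = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(parsed["trace_id"])  # 4bf92f3577b34da6a3ce929d0e0e4736
print(parsed["sampled"])   # True
```

For example, curl -H 'traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01' http://localhost:8000/ should produce a server span in Last9 whose trace ID matches the header.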

Database Integration Examples

Async PostgreSQL with asyncpg

import asyncpg
from opentelemetry import trace

@app.before_server_start
async def setup_database(app, loop):
    # Initialize OpenTelemetry FIRST
    init_telemetry()
    # Create database pool (automatically instrumented)
    app.ctx.db = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )

@app.route("/users")
async def list_users(request):
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("fetch_users_from_db") as span:
        async with request.app.ctx.db.acquire() as conn:
            # Database queries are automatically traced
            users = await conn.fetch("SELECT id, name, email FROM users ORDER BY id")
            span.set_attribute("db.rows_returned", len(users))
    return response.json([dict(user) for user in users])

@app.route("/users", methods=["POST"])
async def create_user(request):
    user_data = request.json
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("create_user_in_db") as span:
        span.set_attribute("user.name", user_data.get("name", ""))
        span.set_attribute("user.email", user_data.get("email", ""))
        async with request.app.ctx.db.acquire() as conn:
            user_id = await conn.fetchval(
                "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
                user_data["name"],
                user_data["email"]
            )
            span.set_attribute("user.id", user_id)
    return response.json({
        "id": user_id,
        "name": user_data["name"],
        "email": user_data["email"]
    }, status=201)

Redis Caching

import json

import redis.asyncio as aioredis  # redis-py's asyncio client; the standalone aioredis package is deprecated and was merged into redis-py
from opentelemetry import trace

@app.before_server_start
async def setup_redis(app, loop):
    init_telemetry()
    # from_url() is synchronous and returns a client backed by a connection pool
    app.ctx.redis = aioredis.from_url(
        "redis://localhost",
        encoding="utf-8",
        decode_responses=True
    )

@app.route("/cache/<key>")
async def get_cached(request, key):
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("cache_lookup") as span:
        span.set_attribute("cache.key", key)
        # Redis operations are automatically traced
        value = await request.app.ctx.redis.get(key)
        span.set_attribute("cache.hit", value is not None)
    if value:
        return response.json({"key": key, "value": json.loads(value)})
    return response.json({"error": "Key not found"}, status=404)

@app.route("/cache/<key>", methods=["PUT"])
async def set_cached(request, key):
    data = request.json
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("cache_set") as span:
        span.set_attribute("cache.key", key)
        span.set_attribute("cache.ttl", data.get("ttl", 3600))
        await request.app.ctx.redis.setex(
            key,
            data.get("ttl", 3600),
            json.dumps(data["value"])
        )
    return response.json({"message": "Cached successfully"})

Production Deployment

Docker Configuration

# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Set OpenTelemetry environment variables
ENV OTEL_SERVICE_NAME=sanic-docker-app
ENV OTEL_EXPORTER_OTLP_ENDPOINT=$last9_otlp_endpoint
ENV OTEL_EXPORTER_OTLP_HEADERS="Authorization=$last9_otlp_auth_header"
ENV OTEL_RESOURCE_ATTRIBUTES="deployment.environment=docker,service.version=1.0.0"
EXPOSE 8000
# Run Sanic with proper worker configuration
CMD ["python", "-m", "sanic", "app.app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Gunicorn with Sanic

For production deployments with Gunicorn:

# gunicorn_config.py
import multiprocessing
from instrumentation import init_telemetry
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "sanic.worker.GunicornWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 50
preload_app = False # Important for Sanic
def post_fork(server, worker):
    """Initialize OpenTelemetry in each worker process"""
    init_telemetry()
    server.log.info(f"OpenTelemetry initialized in worker {worker.pid}")

Start with:

gunicorn -c gunicorn_config.py app:app

Kubernetes Deployment

# kubernetes-sanic.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sanic-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: sanic-app
  template:
    metadata:
      labels:
        app: sanic-app
    spec:
      containers:
        - name: sanic-app
          image: your-registry/sanic-app:latest
          ports:
            - containerPort: 8000
          env:
            - name: OTEL_SERVICE_NAME
              value: "sanic-k8s-app"
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              valueFrom:
                secretKeyRef:
                  name: last9-credentials
                  key: endpoint
            - name: OTEL_EXPORTER_OTLP_HEADERS
              valueFrom:
                secretKeyRef:
                  name: last9-credentials
                  key: auth-header
            - name: OTEL_RESOURCE_ATTRIBUTES
              value: "deployment.environment=kubernetes,service.version=1.0.0"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: sanic-app-service
spec:
  selector:
    app: sanic-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer

Troubleshooting

Common Issues

  1. No traces appearing:

    • Verify init_telemetry() is called in worker processes
    • Check environment variables are correctly set
    • Enable debug logging: export OTEL_LOG_LEVEL=debug
  2. Worker process issues:

    • Ensure OpenTelemetry is initialized in each worker
    • Use @app.before_server_start for initialization
    • Avoid initializing in main process when using multiple workers
  3. Database spans missing:

    • Initialize telemetry before creating database connections
    • Ensure database instrumentation packages are installed
    • Check that connections are created after init_telemetry()
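Before digging deeper, it helps to confirm the required variables are actually present in the worker's environment. A minimal stdlib-only sketch (the variable names match the configuration step above):

```python
import os

# The three variables the OTLP exporter cannot work without
REQUIRED_VARS = [
    "OTEL_SERVICE_NAME",
    "OTEL_EXPORTER_OTLP_ENDPOINT",
    "OTEL_EXPORTER_OTLP_HEADERS",
]

def missing_otel_vars(env=os.environ) -> list:
    """Return the required OTEL_* variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]

# Example: with only the service name set, the other two are reported missing
print(missing_otel_vars({"OTEL_SERVICE_NAME": "your-sanic-service"}))
# ['OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_HEADERS']
```

Call it (or print its result) inside the before_server_start listener so the check runs in the same process the exporter does.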

Debug Mode

Enable detailed logging:

import logging
logging.getLogger("opentelemetry").setLevel(logging.DEBUG)
logging.getLogger("instrumentation").setLevel(logging.DEBUG)

Or via environment:

export OTEL_LOG_LEVEL=debug

Performance Optimization

Sampling Configuration

# Use ratio-based sampling in production
export OTEL_TRACES_SAMPLER="traceidratio"
export OTEL_TRACES_SAMPLER_ARG="0.1" # Sample 10% of traces
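The traceidratio sampler makes its keep/drop decision deterministically from the trace ID itself, so every service in a call chain reaches the same decision and sampled traces stay complete. A simplified stdlib sketch of that decision (it mirrors the SDK's bound comparison on the lower 64 bits, but is an illustration, not the SDK implementation):

```python
import random

TRACE_ID_LIMIT = (1 << 64) - 1  # the sampler considers the lower 64 bits of the 128-bit trace ID

def should_sample(trace_id: int, ratio: float) -> bool:
    """Deterministic ratio decision: the same trace ID gets the same answer everywhere."""
    bound = round(ratio * (TRACE_ID_LIMIT + 1))
    return (trace_id & TRACE_ID_LIMIT) < bound

# Roughly 10% of random trace IDs fall under the bound
random.seed(7)
ids = [random.getrandbits(128) for _ in range(10_000)]
kept = sum(should_sample(t, 0.1) for t in ids)
print(f"sampled {kept} of 10000")
```

Because the decision is a pure function of the trace ID, raising or lowering OTEL_TRACES_SAMPLER_ARG changes only the fraction kept, never which half of a trace survives.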

Async Best Practices

# Use connection pooling for databases
app.ctx.db = await asyncpg.create_pool(
    dsn="postgresql://...",
    min_size=5,
    max_size=20,
    command_timeout=60
)

# Configure the Redis connection pool (from_url is synchronous in redis-py's asyncio client)
app.ctx.redis = aioredis.from_url(
    "redis://localhost",
    max_connections=20
)

Monitoring Capabilities

This integration automatically captures:

  • HTTP Requests: All async route handlers and middleware
  • Database Operations: Async PostgreSQL, Redis, SQLAlchemy queries
  • HTTP Client Calls: Outbound aiohttp requests
  • Custom Business Logic: Through manual instrumentation
  • Exception Tracking: Detailed async exception handling
  • Request/Response Metrics: Duration, status codes, payload sizes

Your Sanic application will now provide comprehensive async telemetry data to Last9, enabling detailed performance monitoring and debugging of high-performance Python async web applications.