Sanic
Monitor async Sanic applications with OpenTelemetry instrumentation for comprehensive API performance tracking
Instrument your async Sanic application with OpenTelemetry to send telemetry data to Last9. The integration automatically instruments HTTP requests, outbound HTTP client calls, and async database operations, and supports custom tracing for your own business logic.
Prerequisites
- Python 3.8 or higher (required by the pinned OpenTelemetry 1.27.0 packages)
- Sanic 21.0 or higher
- Last9 account with OTLP endpoint configured
Installation
Install the required OpenTelemetry packages for Sanic instrumentation:
```bash
# Core Sanic and OpenTelemetry
pip install "sanic>=21.0" opentelemetry-api==1.27.0 opentelemetry-sdk==1.27.0 \
    opentelemetry-exporter-otlp-proto-grpc==1.27.0

# HTTP client instrumentation
pip install opentelemetry-instrumentation-aiohttp-client==0.48b0

# Database instrumentation (install what you use)
pip install opentelemetry-instrumentation-asyncpg==0.48b0      # Async PostgreSQL
pip install opentelemetry-instrumentation-psycopg2==0.48b0     # PostgreSQL
pip install opentelemetry-instrumentation-redis==0.48b0        # Redis
pip install opentelemetry-instrumentation-sqlalchemy==0.48b0   # SQLAlchemy
```

Add to your requirements.txt:
```text
sanic>=21.0
opentelemetry-api==1.27.0
opentelemetry-sdk==1.27.0
opentelemetry-exporter-otlp-proto-grpc==1.27.0
opentelemetry-instrumentation-aiohttp-client==0.48b0
opentelemetry-instrumentation-asyncpg==0.48b0
opentelemetry-instrumentation-redis==0.48b0
opentelemetry-instrumentation-sqlalchemy==0.48b0
```
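Optionally, you can confirm that the pinned packages import cleanly before wiring anything into the app. A throwaway check (the `quick_check.py` name is only illustrative):

```python
# quick_check.py - optional sanity check that the required packages are importable
import importlib

for name in (
    "sanic",
    "opentelemetry.sdk",
    "opentelemetry.exporter.otlp.proto.grpc.trace_exporter",
    "opentelemetry.instrumentation.aiohttp_client",
):
    try:
        importlib.import_module(name)
        print(f"OK   {name}")
    except ImportError as exc:
        print(f"FAIL {name}: {exc}")
```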
Configuration

1. Set Environment Variables

Configure the required environment variables for the Last9 OTLP integration:

```bash
export OTEL_SERVICE_NAME="your-sanic-service"
export OTEL_EXPORTER_OTLP_ENDPOINT="$last9_otlp_endpoint"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=$last9_otlp_auth_header"
export OTEL_TRACES_SAMPLER="always_on"
export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=production"
export OTEL_LOG_LEVEL="error"
export OTEL_METRICS_EXPORTER="none"
export OTEL_LOGS_EXPORTER="none"
```
2. Create Instrumentation Module

Create `instrumentation.py` for the OpenTelemetry setup:

```python
"""OpenTelemetry instrumentation for Sanic application"""
import os
import logging

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import (
    TraceIdRatioBased,
    ParentBased,
    ALWAYS_ON,
    ALWAYS_OFF,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME

logger = logging.getLogger(__name__)


def _setup_logging():
    """Configure logging based on OTEL_LOG_LEVEL"""
    log_level = os.getenv("OTEL_LOG_LEVEL", "error").upper()
    level_map = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
    }
    logging.basicConfig(
        level=level_map.get(log_level, logging.ERROR),
        format='[%(asctime)s] [%(name)s] %(levelname)s: %(message)s'
    )


def _get_sampler():
    """Get sampler based on OTEL_TRACES_SAMPLER environment variable"""
    sampler_name = os.getenv("OTEL_TRACES_SAMPLER", "always_on")
    if sampler_name == "always_on":
        return ParentBased(root=ALWAYS_ON)
    elif sampler_name == "always_off":
        return ParentBased(root=ALWAYS_OFF)
    elif sampler_name == "traceidratio":
        ratio = float(os.getenv("OTEL_TRACES_SAMPLER_ARG", "0.1"))
        return ParentBased(root=TraceIdRatioBased(ratio))
    else:
        return ParentBased(root=ALWAYS_ON)


def _parse_resource_attributes():
    """Parse OTEL_RESOURCE_ATTRIBUTES environment variable"""
    resource_attrs = os.getenv("OTEL_RESOURCE_ATTRIBUTES", "")
    attrs = {}
    if resource_attrs:
        for attr in resource_attrs.split(","):
            if "=" in attr:
                key, value = attr.split("=", 1)
                attrs[key.strip()] = value.strip()
    return attrs


def init_telemetry():
    """Initialize OpenTelemetry tracing.

    Must be called in each worker process for Sanic.
    """
    _setup_logging()

    service_name = os.getenv("OTEL_SERVICE_NAME", "sanic-app")
    endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
    auth_header = os.getenv("OTEL_EXPORTER_OTLP_HEADERS")

    if not endpoint:
        logger.error("OTEL_EXPORTER_OTLP_ENDPOINT not set")
        return

    logger.info(f"Initializing OpenTelemetry for service: {service_name}")

    # Parse resource attributes
    resource_attrs = _parse_resource_attributes()
    resource_attrs[SERVICE_NAME] = service_name
    resource_attrs["service.version"] = os.getenv("SERVICE_VERSION", "1.0.0")
    resource = Resource(attributes=resource_attrs)

    sampler = _get_sampler()
    provider = TracerProvider(resource=resource, sampler=sampler)

    # Parse authorization header
    headers = {}
    if auth_header:
        for header in auth_header.split(","):
            if "=" in header:
                key, value = header.split("=", 1)
                headers[key.strip()] = value.strip()

    exporter = OTLPSpanExporter(endpoint=endpoint, headers=headers)
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)

    _instrument_libraries()
    logger.info("OpenTelemetry tracing initialized successfully")


def _instrument_libraries():
    """Automatically instrument HTTP clients and databases"""
    # HTTP clients
    try:
        from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
        AioHttpClientInstrumentor().instrument()
        logger.debug("AioHTTP client instrumentation enabled")
    except ImportError:
        logger.debug("AioHTTP client instrumentation not available")

    # Database instrumentation
    try:
        from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
        AsyncPGInstrumentor().instrument()
        logger.debug("AsyncPG instrumentation enabled")
    except ImportError:
        pass

    try:
        from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
        Psycopg2Instrumentor().instrument()
        logger.debug("Psycopg2 instrumentation enabled")
    except ImportError:
        pass

    try:
        from opentelemetry.instrumentation.redis import RedisInstrumentor
        RedisInstrumentor().instrument()
        logger.debug("Redis instrumentation enabled")
    except ImportError:
        pass

    try:
        from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
        SQLAlchemyInstrumentor().instrument()
        logger.debug("SQLAlchemy instrumentation enabled")
    except ImportError:
        pass
```
3. Create OpenTelemetry Middleware

Create `otel_middleware.py`:

```python
"""OpenTelemetry middleware for Sanic"""
import time

from opentelemetry import trace, context
from opentelemetry.propagate import extract
from opentelemetry.trace import SpanKind, Status, StatusCode


def get_tracer():
    return trace.get_tracer(__name__)


async def otel_request_middleware(request):
    """Create SERVER span for incoming request"""
    tracer = get_tracer()

    # Extract trace context from incoming headers for distributed tracing
    ctx = extract(request.headers)

    # Create SERVER span
    span = tracer.start_span(
        f"{request.method} {request.path}",
        context=ctx,
        kind=SpanKind.SERVER
    )

    # Set HTTP semantic convention attributes
    span.set_attribute("http.method", request.method)
    span.set_attribute("http.url", str(request.url))
    span.set_attribute("http.target", request.path)
    span.set_attribute("http.scheme", request.scheme)
    span.set_attribute("http.host", request.host)
    span.set_attribute("http.user_agent", request.headers.get("user-agent", ""))
    span.set_attribute("net.peer.ip", request.remote_addr or request.ip)

    # Record request start time for duration calculation
    request.ctx.start_time = time.time()

    # Attach context
    token = context.attach(ctx)
    ctx_with_span = trace.set_span_in_context(span, ctx)
    token_span = context.attach(ctx_with_span)

    # Store for cleanup
    request.ctx.otel_span = span
    request.ctx.otel_token = token
    request.ctx.otel_token_span = token_span


async def otel_response_middleware(request, response):
    """Finalize span after response"""
    if not hasattr(request.ctx, 'otel_span'):
        return

    span = request.ctx.otel_span

    # Calculate request duration
    if hasattr(request.ctx, 'start_time'):
        duration = time.time() - request.ctx.start_time
        span.set_attribute("http.request_duration_ms", round(duration * 1000, 2))

    if response:
        span.set_attribute("http.status_code", response.status)
        span.set_attribute(
            "http.response_size",
            len(response.body) if hasattr(response, 'body') and response.body else 0
        )

        # Set span status based on HTTP status code
        if response.status >= 400:
            if response.status < 500:
                span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status}"))
            else:
                span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status}: Server Error"))

    span.end()

    # Clean up context
    if hasattr(request.ctx, 'otel_token_span'):
        context.detach(request.ctx.otel_token_span)
    if hasattr(request.ctx, 'otel_token'):
        context.detach(request.ctx.otel_token)
```
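Because the request middleware attaches the SERVER span to the active context, route handlers can enrich that span with `trace.get_current_span()` instead of creating a new one. A minimal sketch (the `/orders/<order_id>` route is illustrative, not part of the integration):

```python
from opentelemetry import trace
from sanic import response


@app.route("/orders/<order_id>")
async def get_order(request, order_id):
    # This is the SERVER span created by otel_request_middleware
    span = trace.get_current_span()
    span.set_attribute("order.id", order_id)
    return response.json({"order_id": order_id})
```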
4. Add Exception Handler

Create `exception_handler.py` for comprehensive error tracking:

```python
"""Exception handler for OpenTelemetry"""
import traceback

from opentelemetry.trace import Status, StatusCode


async def otel_exception_handler(request, exception):
    """Capture exceptions in OpenTelemetry spans with full stack traces"""
    if hasattr(request.ctx, 'otel_span'):
        span = request.ctx.otel_span

        # Record the exception with full stack trace
        span.record_exception(exception)
        span.set_status(Status(StatusCode.ERROR, str(exception)))

        # Add exception details as attributes
        span.set_attribute("exception.type", type(exception).__name__)
        span.set_attribute("exception.message", str(exception))
        span.set_attribute("exception.stacktrace", traceback.format_exc())

        # Add HTTP-specific error attributes
        span.set_attribute("http.status_code", getattr(exception, 'status_code', 500))

    # Re-raise to let Sanic handle it normally
    raise exception
```
5. Integrate with Sanic Application

Update your main application file (e.g., `app.py`):

```python
import asyncio

from sanic import Sanic, response
from sanic.exceptions import NotFound
from sanic.log import logger

from instrumentation import init_telemetry
from otel_middleware import otel_request_middleware, otel_response_middleware
from exception_handler import otel_exception_handler

app = Sanic("sanic-api")


# CRITICAL: Initialize OpenTelemetry in each worker process
@app.before_server_start
async def setup_telemetry(app, loop):
    """Initialize OpenTelemetry when the Sanic worker starts"""
    init_telemetry()
    logger.info("OpenTelemetry initialized for Sanic worker")


# Register OpenTelemetry middleware
app.middleware("request")(otel_request_middleware)
app.middleware("response")(otel_response_middleware)

# Register exception handler for comprehensive error tracking
app.exception(Exception)(otel_exception_handler)


# Sample routes
@app.route("/")
async def index(request):
    return response.json({
        "message": "Hello from instrumented Sanic!",
        "service": "sanic-api"
    })


@app.route("/health")
async def health(request):
    return response.json({
        "status": "healthy",
        "service": "sanic-api"
    })


@app.route("/users/<user_id:int>")
async def get_user(request, user_id):
    # Simulate business logic with custom tracing
    from opentelemetry import trace
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("get_user_business_logic") as span:
        span.set_attribute("user.id", user_id)

        # Simulate an async operation
        await asyncio.sleep(0.05)

        if user_id == 404:
            span.set_attribute("user.found", False)
            raise NotFound("User not found")

        span.set_attribute("user.found", True)

        return response.json({
            "id": user_id,
            "name": f"User {user_id}",
            "email": f"user{user_id}@example.com"
        })


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=False)
```
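Because spans are exported in batches, the last batch may be dropped if a worker exits before it is flushed. A minimal sketch for flushing on shutdown, assuming the SDK `TracerProvider` configured in `instrumentation.py` is the global provider (add it to the same `app.py`):

```python
from opentelemetry import trace


@app.after_server_stop
async def shutdown_telemetry(app, loop):
    """Flush and shut down the tracer provider when a worker stops."""
    provider = trace.get_tracer_provider()
    # The SDK TracerProvider exposes shutdown(); the no-op default provider does not
    if hasattr(provider, "shutdown"):
        provider.shutdown()
```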
Database Integration Examples
Async PostgreSQL with asyncpg
```python
import asyncpg
from opentelemetry import trace


@app.before_server_start
async def setup_database(app, loop):
    # Initialize OpenTelemetry FIRST
    init_telemetry()

    # Create database pool (automatically instrumented)
    app.ctx.db = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )


@app.route("/users")
async def list_users(request):
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("fetch_users_from_db") as span:
        async with request.app.ctx.db.acquire() as conn:
            # Database queries are automatically traced
            users = await conn.fetch("SELECT id, name, email FROM users ORDER BY id")

        span.set_attribute("db.rows_returned", len(users))

    return response.json([dict(user) for user in users])


@app.route("/users", methods=["POST"])
async def create_user(request):
    user_data = request.json
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("create_user_in_db") as span:
        span.set_attribute("user.name", user_data.get("name", ""))
        span.set_attribute("user.email", user_data.get("email", ""))

        async with request.app.ctx.db.acquire() as conn:
            user_id = await conn.fetchval(
                "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
                user_data["name"], user_data["email"]
            )

        span.set_attribute("user.id", user_id)

    return response.json({
        "id": user_id,
        "name": user_data["name"],
        "email": user_data["email"]
    }, status=201)
```
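To group several statements under a single logical operation, you can wrap the pool usage in one custom span; each statement still produces its own child span via the asyncpg instrumentation. A sketch extending the example above (the `/transfer` route and `accounts` table are illustrative):

```python
@app.route("/transfer", methods=["POST"])
async def transfer(request):
    data = request.json
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("transfer_funds") as span:
        span.set_attribute("transfer.amount", data["amount"])

        async with request.app.ctx.db.acquire() as conn:
            async with conn.transaction():
                # Each statement below appears as a child span of "transfer_funds"
                await conn.execute(
                    "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                    data["amount"], data["from_id"],
                )
                await conn.execute(
                    "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                    data["amount"], data["to_id"],
                )

    return response.json({"status": "transferred"})
```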
Redis Caching

```python
import json

# redis-py's asyncio API (successor to the standalone aioredis package);
# this is what opentelemetry-instrumentation-redis instruments
from redis import asyncio as aioredis


@app.before_server_start
async def setup_redis(app, loop):
    init_telemetry()
    app.ctx.redis = aioredis.from_url(
        "redis://localhost",
        encoding="utf-8",
        decode_responses=True
    )


@app.route("/cache/<key>")
async def get_cached(request, key):
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("cache_lookup") as span:
        span.set_attribute("cache.key", key)

        # Redis operations are automatically traced
        value = await request.app.ctx.redis.get(key)

        span.set_attribute("cache.hit", value is not None)

        if value:
            return response.json({"key": key, "value": json.loads(value)})
        else:
            return response.json({"error": "Key not found"}, status=404)


@app.route("/cache/<key>", methods=["PUT"])
async def set_cached(request, key):
    data = request.json
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("cache_set") as span:
        span.set_attribute("cache.key", key)
        span.set_attribute("cache.ttl", data.get("ttl", 3600))

        await request.app.ctx.redis.setex(
            key,
            data.get("ttl", 3600),
            json.dumps(data["value"])
        )

    return response.json({"message": "Cached successfully"})
```
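Outbound HTTP Calls with aiohttp

Once `AioHttpClientInstrumentor` is enabled (see `instrumentation.py` above), outbound aiohttp requests produce CLIENT spans automatically and propagate the trace context to downstream services. A minimal sketch extending the same app (the `/proxy` route and target URL are illustrative):

```python
import aiohttp


@app.route("/proxy")
async def proxy(request):
    # The aiohttp call below is traced as a child of the current SERVER span
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as resp:
            data = await resp.json()
    return response.json(data)
```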
Production Deployment

Docker Configuration

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Set OpenTelemetry environment variables
ENV OTEL_SERVICE_NAME=sanic-docker-app
ENV OTEL_EXPORTER_OTLP_ENDPOINT=$last9_otlp_endpoint
ENV OTEL_EXPORTER_OTLP_HEADERS="Authorization=$last9_otlp_auth_header"
ENV OTEL_RESOURCE_ATTRIBUTES="deployment.environment=docker,service.version=1.0.0"

EXPOSE 8000

# Run Sanic with proper worker configuration
CMD ["python", "-m", "sanic", "app.app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
```

Gunicorn with Sanic
For production deployments with Gunicorn (note: newer Sanic releases removed the bundled `GunicornWorker`, so check that your Sanic version still ships `sanic.worker.GunicornWorker` before using this approach; otherwise prefer Sanic's built-in `--workers` option):

```python
# gunicorn_config.py
import multiprocessing

from instrumentation import init_telemetry

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "sanic.worker.GunicornWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 50
preload_app = False  # Important for Sanic


def post_fork(server, worker):
    """Initialize OpenTelemetry in each worker process"""
    init_telemetry()
    server.log.info(f"OpenTelemetry initialized in worker {worker.pid}")
```

Start with:

```bash
gunicorn -c gunicorn_config.py app:app
```

Kubernetes Deployment
```yaml
# kubernetes-sanic.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sanic-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: sanic-app
  template:
    metadata:
      labels:
        app: sanic-app
    spec:
      containers:
        - name: sanic-app
          image: your-registry/sanic-app:latest
          ports:
            - containerPort: 8000
          env:
            - name: OTEL_SERVICE_NAME
              value: "sanic-k8s-app"
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              valueFrom:
                secretKeyRef:
                  name: last9-credentials
                  key: endpoint
            - name: OTEL_EXPORTER_OTLP_HEADERS
              valueFrom:
                secretKeyRef:
                  name: last9-credentials
                  key: auth-header
            - name: OTEL_RESOURCE_ATTRIBUTES
              value: "deployment.environment=kubernetes,service.version=1.0.0"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: sanic-app-service
spec:
  selector:
    app: sanic-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
```

Troubleshooting
Common Issues
1. No traces appearing:
   - Verify `init_telemetry()` is called in the worker processes
   - Check that the environment variables are set correctly
   - Enable debug logging: `export OTEL_LOG_LEVEL=debug`
   - Verify network connectivity to the Last9 OTLP endpoint (a quick console-exporter check is sketched after this list)

2. Worker process issues:
   - Ensure OpenTelemetry is initialized in each worker
   - Use `@app.before_server_start` for initialization
   - Avoid initializing in the main process when running multiple workers

3. Database spans missing:
   - Initialize telemetry before creating database connections
   - Ensure the database instrumentation packages are installed
   - Check that connections are created after `init_telemetry()`
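If spans are not reaching Last9, a quick way to separate "spans are not being created" from "spans are not being exported" is to temporarily swap in a console exporter. A minimal sketch, run as a standalone script (it bypasses the OTLP setup entirely):

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

# Print spans to stdout instead of sending them to Last9
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("sanity-check")
with tracer.start_as_current_span("test-span"):
    print("If a JSON span is printed below, span creation works locally.")
```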
Debug Mode
Enable detailed logging:
```python
import logging

logging.getLogger("opentelemetry").setLevel(logging.DEBUG)
logging.getLogger("instrumentation").setLevel(logging.DEBUG)
```

Or via environment:

```bash
export OTEL_LOG_LEVEL=debug
```

Performance Optimization
Sampling Configuration
```bash
# Use ratio-based sampling in production
export OTEL_TRACES_SAMPLER="traceidratio"
export OTEL_TRACES_SAMPLER_ARG="0.1"  # Sample 10% of traces
```
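Beyond sampling, high-throughput services can also tune the batch span processor configured in `instrumentation.py`. The values below are illustrative, not recommendations; `provider` and `exporter` refer to the objects created in `init_telemetry()`:

```python
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Replaces the plain BatchSpanProcessor(exporter) call in instrumentation.py
provider.add_span_processor(
    BatchSpanProcessor(
        exporter,
        max_queue_size=4096,          # buffer more spans before dropping
        schedule_delay_millis=2000,   # export every 2 seconds
        max_export_batch_size=1024,   # send larger batches per export call
    )
)
```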
Async Best Practices

```python
# Use connection pooling for databases
app.ctx.db = await asyncpg.create_pool(
    dsn="postgresql://...",
    min_size=5,
    max_size=20,
    command_timeout=60
)

# Configure the Redis connection pool
app.ctx.redis = aioredis.from_url(
    "redis://localhost",
    max_connections=20
)
```

Monitoring Capabilities
This integration automatically captures:
- HTTP Requests: All async route handlers and middleware
- Database Operations: Async PostgreSQL, Redis, SQLAlchemy queries
- HTTP Client Calls: Outbound aiohttp requests
- Custom Business Logic: Through manual instrumentation (see the decorator sketch after this list)
- Exception Tracking: Detailed async exception handling
- Request/Response Metrics: Duration, status codes, payload sizes
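For recurring business-logic functions, a small decorator keeps manual instrumentation out of the route handlers. A sketch (the `traced` helper and `calculate_invoice` function are hypothetical, not part of the integration above):

```python
import functools

from opentelemetry import trace


def traced(span_name):
    """Wrap an async function in a named span."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tracer = trace.get_tracer(__name__)
            with tracer.start_as_current_span(span_name) as span:
                span.set_attribute("code.function", func.__name__)
                return await func(*args, **kwargs)
        return wrapper
    return decorator


@traced("calculate_invoice")
async def calculate_invoice(order_id):
    # Placeholder business logic
    return {"order_id": order_id, "total": 42.0}
```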
Your Sanic application will now provide comprehensive async telemetry data to Last9, enabling detailed performance monitoring and debugging of high-performance Python async web applications.