Celery
Instrument Celery task queue applications with OpenTelemetry for distributed tracing and task monitoring
Use OpenTelemetry to instrument your Celery task queue application and send telemetry data to Last9. This integration provides automatic instrumentation for task execution, message brokers, and distributed task monitoring across workers.
Celery is a distributed task queue system that allows you to run background tasks asynchronously. With OpenTelemetry instrumentation, you can track task execution, performance, and failures across your distributed worker infrastructure.
Prerequisites
Before setting up Celery monitoring, ensure you have:
- Python 3.7 or higher installed
- Celery application with tasks defined
- Message broker (Redis, RabbitMQ, or others) configured
- Last9 account with integration credentials
- pip package manager available
Install OpenTelemetry Packages
Install the required OpenTelemetry packages for Celery instrumentation:
```bash
pip install opentelemetry-instrumentation-celery opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
```

You can also add these packages to your requirements.txt file:

```
opentelemetry-instrumentation-celery
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp
```
Set Environment Variables
Configure OpenTelemetry environment variables for your Celery application:
```bash
export OTEL_SERVICE_NAME=<service_name>
export OTEL_EXPORTER_OTLP_ENDPOINT=$last9_otlp_endpoint
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=$last9_otlp_auth_header"
export OTEL_TRACES_EXPORTER=otlp
export OTEL_TRACES_SAMPLER="always_on"
export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=production"
export OTEL_LOG_LEVEL=error
```

Replace <service_name> with your Celery service name (e.g., task-processor, email-worker).
Instrument Your Celery Application
The key to Celery tracing is initialization timing. Tracing and instrumentation must be initialized after each worker process starts (i.e., after the fork) so that threaded components like the BatchSpanProcessor work correctly.
Here's how to instrument Celery workers using the worker_process_init signal:

```python
from opentelemetry.sdk.resources import Resource
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from celery import Celery
from celery.signals import worker_process_init
import os


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry tracing for Celery worker processes."""
    CeleryInstrumentor().instrument()

    resource = Resource(attributes={
        ResourceAttributes.SERVICE_NAME: os.getenv('OTEL_SERVICE_NAME', 'celery-worker'),
        ResourceAttributes.SERVICE_VERSION: "1.0.0",
        ResourceAttributes.DEPLOYMENT_ENVIRONMENT: os.getenv('DEPLOYMENT_ENV', 'production'),
    })
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(provider)


# Configure your Celery app
app = Celery("tasks", broker="redis://localhost:6379/0")

# Optional: Configure result backend
app.conf.result_backend = "redis://localhost:6379/0"


@app.task
def process_data(data_id):
    """Example task that processes data."""
    # Your task logic here
    print(f"Processing data ID: {data_id}")
    # Simulate some work
    import time
    time.sleep(1)
    return {"status": "completed", "data_id": data_id}


@app.task
def send_email(recipient, subject, body):
    """Example task for sending emails."""
    # Your email sending logic here
    print(f"Sending email to {recipient}: {subject}")
    return {"status": "sent", "recipient": recipient}
```

For applications that produce Celery tasks (not workers), you can use simpler instrumentation:
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from celery import Celery

# Initialize OpenTelemetry (producers don't fork workers, so module-level setup is fine)
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)
CeleryInstrumentor().instrument()

# Configure Celery app
app = Celery("tasks", broker="redis://localhost:6379/0")


# Example: Enqueue tasks
def enqueue_tasks():
    """Example function to enqueue tasks."""
    result1 = app.send_task('tasks.process_data', args=[123])
    result2 = app.send_task(
        'tasks.send_email',
        args=["user@example.com", "Welcome", "Hello!"],
    )
    return [result1.id, result2.id]
```

For applications that both produce and consume tasks:
```python
from opentelemetry.sdk.resources import Resource
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from celery import Celery
from celery.signals import worker_process_init
import os

# Initialize tracing for the producer/web process
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)
CeleryInstrumentor().instrument()


@worker_process_init.connect(weak=False)
def init_worker_tracing(*args, **kwargs):
    """Initialize tracing specifically for worker processes."""
    # Reinitialize for the worker context (runs after the worker process forks)
    resource = Resource(attributes={
        ResourceAttributes.SERVICE_NAME: os.getenv('OTEL_SERVICE_NAME', 'celery-worker'),
        ResourceAttributes.SERVICE_VERSION: "1.0.0",
        ResourceAttributes.DEPLOYMENT_ENVIRONMENT: os.getenv('DEPLOYMENT_ENV', 'production'),
    })
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(provider)


app = Celery("tasks", broker="redis://localhost:6379/0")


@app.task
def complex_task(param1, param2):
    """Example of a complex task with custom tracing."""
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("complex_task_processing") as span:
        span.set_attribute("task.param1", param1)
        span.set_attribute("task.param2", param2)

        # Your task logic here
        result = param1 + param2

        span.set_attribute("task.result", result)
        return result
```
Start Your Celery Workers
Start your Celery workers with the instrumentation enabled:
```bash
# Start a single worker
celery -A your_app_name worker --loglevel=info

# Start a worker with 4 concurrent child processes
celery -A your_app_name worker --loglevel=info --concurrency=4

# Start a worker that consumes a specific queue
celery -A your_app_name worker --loglevel=info -Q high_priority
```
Test Task Execution
Test your setup by enqueuing some tasks:
```python
# In a Python shell or separate script
from your_app_name import app

# Enqueue tasks
result1 = app.send_task('tasks.process_data', args=[456])
result2 = app.send_task(
    'tasks.send_email',
    args=["test@example.com", "Test", "Testing tracing"],
)

print(f"Task IDs: {result1.id}, {result2.id}")
```
Understanding Celery Tracing
What Gets Traced
When you use OpenTelemetry with Celery, the following operations are automatically traced:
- Task Execution: Start, duration, and completion of tasks
- Task Routing: Queue selection and message routing
- Message Broker Operations: Publishing and consuming messages
- Task Retries: Retry attempts and failure handling
- Result Backend: Result storage and retrieval operations
Trace Context Propagation
OpenTelemetry automatically propagates trace context between:
- Producers and Workers: Tasks maintain trace context across process boundaries
- Nested Tasks: Child tasks inherit parent trace context
- Chain Operations: Task chains maintain continuous tracing (see the sketch after this list)
- Callback Tasks: Success and failure callbacks preserve context
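As an illustration, consider a minimal two-task chain. The fetch_report and summarize tasks below are hypothetical names used only for this sketch; when a producer enqueues the chain, the worker-side task spans join the same trace because the context travels in the message headers.

```python
from celery import Celery, chain

app = Celery("tasks", broker="redis://localhost:6379/0")


@app.task
def fetch_report(report_id):
    # First link in the chain; its span shares the trace started by the producer
    return {"report_id": report_id, "rows": 10}


@app.task
def summarize(report):
    # Receives fetch_report's return value; its span joins the same trace
    return f"report {report['report_id']}: {report['rows']} rows"


# Enqueue the chain from a producer process
workflow = chain(fetch_report.s(123), summarize.s())
async_result = workflow.apply_async()
```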
Custom Attributes and Spans
Add custom attributes to enhance observability:
```python
from opentelemetry import trace


@app.task
def enhanced_task(user_id, action_type):
    """Task with custom tracing attributes."""
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("enhanced_task_execution") as span:
        # Add custom attributes
        span.set_attribute("user.id", user_id)
        span.set_attribute("action.type", action_type)
        span.set_attribute("task.priority", "high")

        # Your task logic
        try:
            result = perform_action(user_id, action_type)
            span.set_attribute("task.status", "success")
            span.set_attribute("task.result_count", len(result))
            return result
        except Exception as e:
            span.set_attribute("task.status", "error")
            span.set_attribute("error.message", str(e))
            raise
```

Advanced Configuration
Sampling Configuration
Control trace sampling for high-volume task processing:
```bash
# Production: Sample 10% of traces
export OTEL_TRACES_SAMPLER="traceidratio"
export OTEL_TRACES_SAMPLER_ARG="0.1"

# Development: Sample all traces
export OTEL_TRACES_SAMPLER="always_on"
```
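If you prefer to configure sampling in code rather than through environment variables, a minimal sketch using the SDK's ParentBased and TraceIdRatioBased samplers looks like this; the sampler would be passed to the TracerProvider created in the worker init shown earlier.

```python
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Sample roughly 10% of root traces; child spans follow their parent's decision
sampler = ParentBased(root=TraceIdRatioBased(0.1))
provider = TracerProvider(sampler=sampler)
```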
Resource Attributes
Add comprehensive service metadata:
```bash
export OTEL_RESOURCE_ATTRIBUTES="service.name=task-processor,service.version=2.1.0,deployment.environment=production,team=backend,service.instance.id=worker-001"
```

Broker-Specific Configuration
Using Redis as the broker:

```python
app = Celery("tasks", broker="redis://localhost:6379/0")
app.conf.update(
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
)
```

Using RabbitMQ as the broker:

```python
app = Celery("tasks", broker="amqp://guest@localhost//")
app.conf.update(
    result_backend="rpc://",
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
)
```

Verification
Check Worker Startup
When starting Celery workers, confirm they start cleanly and that no OpenTelemetry configuration errors appear in the console output.
Generate Task Load
Create some test tasks to generate telemetry data:
```python
# Enqueue various types of tasks
for i in range(10):
    app.send_task('tasks.process_data', args=[i])
    app.send_task('tasks.send_email', args=[f"user{i}@test.com", f"Subject {i}", "Body"])
```
Monitor Worker Logs
Check that workers are processing tasks successfully:
```bash
# Monitor worker output
tail -f celery_worker.log

# Or check journald if using systemd
journalctl -u celery-worker -f
```
Verify Traces in Last9
Log into your Last9 account and check that Celery traces are being received in the Traces dashboard.
Look for:
- Task execution spans with timing information
- Message broker operations
- Task retry attempts and error traces
- Cross-service trace propagation
Troubleshooting
Common Issues
Tasks Not Being Traced
Ensure the worker process initialization is correct:
```python
# Make sure this is called in worker processes
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    CeleryInstrumentor().instrument()
    # ... rest of initialization
```

Missing Trace Context
Verify environment variables are set correctly:
```bash
env | grep OTEL_
```

Worker Process Crashes
Check for threading issues with BatchSpanProcessor:
```bash
# View detailed worker logs
celery -A your_app_name worker --loglevel=debug
```
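If you suspect the BatchSpanProcessor's background export thread is involved, one way to rule it out is to temporarily swap in the synchronous SimpleSpanProcessor inside the worker init. This is a debugging sketch only, not a production setting; provider refers to the TracerProvider created in the worker init shown earlier.

```python
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# Temporary: export each span synchronously so no background thread is involved.
# Revert to BatchSpanProcessor once the crash is isolated.
provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
```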
Performance Considerations
- Sampling: Use appropriate sampling rates for high-volume task processing
- Batch Processing: BatchSpanProcessor is recommended for performance
- Resource Limits: Monitor memory usage in workers with tracing enabled
- Queue Management: Consider separate queues for traced vs non-traced tasks (see the routing sketch after this list)
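For example, a hypothetical task_routes configuration (the task and queue names below are assumptions) can separate high-volume tasks onto their own queue, which can then be served by workers started without the tracing signal handler:

```python
# Route selected tasks to dedicated queues; workers can then be started per
# queue, e.g. `celery -A tasks worker -Q traced` for the traced pool.
app.conf.task_routes = {
    "tasks.process_data": {"queue": "traced"},
    "tasks.bulk_import": {"queue": "untraced"},
}
```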
Best Practices
- Service Naming: Use descriptive names that distinguish between producers and workers
- Environment Segregation: Use different service names per environment
- Task Categorization: Add meaningful attributes to distinguish task types
- Error Handling: Implement proper exception handling with span status updates (a sketch follows this list)
- Monitoring: Set up alerts for task failure rates and execution times
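As a sketch of the error-handling practice, record the exception and set the span status before retrying or re-raising. The task name, TransientError exception, and retry settings below are assumptions for illustration only.

```python
from celery import Celery
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

app = Celery("tasks", broker="redis://localhost:6379/0")
tracer = trace.get_tracer(__name__)


class TransientError(Exception):
    """Placeholder for an error type worth retrying."""


@app.task(bind=True, max_retries=3)
def resilient_task(self, payload):
    with tracer.start_as_current_span("resilient_task") as span:
        try:
            # Your task logic here
            return {"status": "ok", "payload": payload}
        except TransientError as exc:
            # Record the failure on the span, then let Celery retry the task
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            raise self.retry(exc=exc, countdown=5)
```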
Need Help?
If you encounter any issues or have questions:
- Join our Discord community for real-time support
- Contact our support team at support@last9.io