Celery

Instrument Celery task queue applications with OpenTelemetry for distributed tracing and task monitoring

Use OpenTelemetry to instrument your Celery task queue application and send telemetry data to Last9. This integration provides automatic instrumentation for task execution, message brokers, and distributed task monitoring across workers.

Celery is a distributed task queue system that allows you to run background tasks asynchronously. With OpenTelemetry instrumentation, you can track task execution, performance, and failures across your distributed worker infrastructure.

Prerequisites

Before setting up Celery monitoring, ensure you have:

  • Python 3.7 or higher installed
  • Celery application with tasks defined
  • Message broker (Redis, RabbitMQ, or others) configured
  • Last9 account with integration credentials
  • pip package manager available

  1. Install OpenTelemetry Packages

    Install the required OpenTelemetry packages for Celery instrumentation:

    pip install opentelemetry-instrumentation-celery opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp

    You can also add these packages to your requirements.txt file:

    opentelemetry-instrumentation-celery
    opentelemetry-api
    opentelemetry-sdk
    opentelemetry-exporter-otlp
  2. Set Environment Variables

    Configure OpenTelemetry environment variables for your Celery application:

    export OTEL_SERVICE_NAME=<service_name>
    export OTEL_EXPORTER_OTLP_ENDPOINT=$last9_otlp_endpoint
    export OTEL_EXPORTER_OTLP_HEADERS="Authorization=$last9_otlp_auth_header"
    export OTEL_TRACES_EXPORTER=otlp
    export OTEL_TRACES_SAMPLER="always_on"
    export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=production"
    export OTEL_LOG_LEVEL=error

    Replace <service_name> with your Celery service name (e.g., task-processor, email-worker).

  3. Instrument Your Celery Application

    The key to Celery tracing is proper initialization timing. Celery's default prefork pool forks worker processes, and background threads do not survive the fork, so tracing and instrumentation must be initialized after each worker process starts for threading components like BatchSpanProcessor to work correctly.

    Here’s how to properly instrument Celery workers using the worker_process_init signal:

    from opentelemetry.sdk.resources import Resource
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.semconv.resource import ResourceAttributes
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.instrumentation.celery import CeleryInstrumentor
    from celery import Celery
    from celery.signals import worker_process_init
    import os

    @worker_process_init.connect(weak=False)
    def init_celery_tracing(*args, **kwargs):
        """Initialize OpenTelemetry tracing for Celery worker processes."""
        CeleryInstrumentor().instrument()

        resource = Resource(attributes={
            ResourceAttributes.SERVICE_NAME: os.getenv('OTEL_SERVICE_NAME', 'celery-worker'),
            ResourceAttributes.SERVICE_VERSION: "1.0.0",
            ResourceAttributes.DEPLOYMENT_ENVIRONMENT: os.getenv('DEPLOYMENT_ENV', 'production'),
        })

        provider = TracerProvider(resource=resource)
        processor = BatchSpanProcessor(OTLPSpanExporter())
        provider.add_span_processor(processor)
        trace.set_tracer_provider(provider)

    # Configure your Celery app
    app = Celery("tasks", broker="redis://localhost:6379/0")

    # Optional: Configure result backend
    app.conf.result_backend = "redis://localhost:6379/0"

    @app.task
    def process_data(data_id):
        """Example task that processes data."""
        # Your task logic here
        print(f"Processing data ID: {data_id}")

        # Simulate some work
        import time
        time.sleep(1)

        return {"status": "completed", "data_id": data_id}

    @app.task
    def send_email(recipient, subject, body):
        """Example task for sending emails."""
        # Your email sending logic here
        print(f"Sending email to {recipient}: {subject}")
        return {"status": "sent", "recipient": recipient}
  4. Start Your Celery Workers

    Start your Celery workers with the instrumentation enabled:

    # Start a single worker
    celery -A your_app_name worker --loglevel=info
    # Start multiple workers
    celery -A your_app_name worker --loglevel=info --concurrency=4
    # Start a worker consuming a specific queue
    celery -A your_app_name worker --loglevel=info --queues=high_priority
  5. Test Task Execution

    Test your setup by enqueuing some tasks:

    # In a Python shell or separate script
    from your_app_name import app
    # Enqueue tasks
    result1 = app.send_task('tasks.process_data', args=[456])
    result2 = app.send_task('tasks.send_email',
                            args=["test@example.com", "Test", "Testing tracing"])
    print(f"Task IDs: {result1.id}, {result2.id}")

Understanding Celery Tracing

What Gets Traced

When you use OpenTelemetry with Celery, the following operations are automatically traced:

  • Task Execution: Start, duration, and completion of tasks
  • Task Routing: Queue selection and message routing
  • Message Broker Operations: Publishing and consuming messages
  • Task Retries: Retry attempts and failure handling (see the retry sketch after this list)
  • Result Backend: Result storage and retrieval operations
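
Retry spans are easiest to see with a task that declares its own retry policy. The sketch below is illustrative only: the flaky_api_call task and the call_external_api helper are placeholders, not part of the setup steps above.

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

@app.task(bind=True, max_retries=3, default_retry_delay=5)
def flaky_api_call(self, url):
    """Illustrative task: each retry attempt appears as its own task-execution span."""
    try:
        return call_external_api(url)  # placeholder for your own logic
    except ConnectionError as exc:
        # Re-enqueue the task; the retry carries the original trace context forward
        raise self.retry(exc=exc)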

Trace Context Propagation

OpenTelemetry automatically propagates trace context between the following (a producer-side sketch follows the list):

  • Producers and Workers: Tasks maintain trace context across process boundaries
  • Nested Tasks: Child tasks inherit parent trace context
  • Chain Operations: Task chains maintain continuous tracing
  • Callback Tasks: Success and failure callbacks preserve context
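
As a concrete example, a producer that enqueues a task from inside an active span gets that span linked to the worker-side task span automatically. A minimal sketch, assuming the process_data task from step 3 lives in a module named tasks and the producer process has also run CeleryInstrumentor().instrument() with a configured tracer provider:

from opentelemetry import trace

from tasks import process_data  # the task module from step 3 (assumed name)

tracer = trace.get_tracer(__name__)

def handle_request(data_id):
    """Producer side: the enqueued task carries this span's trace context."""
    with tracer.start_as_current_span("handle_request") as span:
        span.set_attribute("data.id", data_id)
        # CeleryInstrumentor injects the active context into the message headers,
        # so the worker-side task span becomes a child of this span.
        process_data.delay(data_id)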

Custom Attributes and Spans

Add custom attributes to enhance observability:

from opentelemetry import trace

@app.task
def enhanced_task(user_id, action_type):
    """Task with custom tracing attributes."""
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("enhanced_task_execution") as span:
        # Add custom attributes
        span.set_attribute("user.id", user_id)
        span.set_attribute("action.type", action_type)
        span.set_attribute("task.priority", "high")

        # Your task logic
        try:
            result = perform_action(user_id, action_type)
            span.set_attribute("task.status", "success")
            span.set_attribute("task.result_count", len(result))
            return result
        except Exception as e:
            span.set_attribute("task.status", "error")
            span.set_attribute("error.message", str(e))
            raise

Advanced Configuration

Sampling Configuration

Control trace sampling for high-volume task processing:

# Production: Sample 10% of traces
export OTEL_TRACES_SAMPLER="traceidratio"
export OTEL_TRACES_SAMPLER_ARG="0.1"
# Development: Sample all traces
export OTEL_TRACES_SAMPLER="always_on"
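
The same ratio-based sampling can also be configured in code when you build the TracerProvider yourself, as in step 3. A minimal sketch, with the 10% ratio and the service name as illustrative values:

from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Sample 10% of new traces; child spans follow their parent's sampling decision
sampler = ParentBased(root=TraceIdRatioBased(0.1))
provider = TracerProvider(
    sampler=sampler,
    resource=Resource(attributes={"service.name": "task-processor"}),
)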

Resource Attributes

Add comprehensive service metadata:

export OTEL_RESOURCE_ATTRIBUTES="service.name=task-processor,service.version=2.1.0,deployment.environment=production,team=backend,service.instance.id=worker-001"

Broker-Specific Configuration

app = Celery("tasks", broker="redis://localhost:6379/0")

app.conf.update(
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
)

Verification

  1. Check Worker Startup

    When starting Celery workers, you should see OpenTelemetry initialization messages in the console output.

  2. Generate Task Load

    Create some test tasks to generate telemetry data:

    # Enqueue various types of tasks
    for i in range(10):
        app.send_task('tasks.process_data', args=[i])
        app.send_task('tasks.send_email', args=[f"user{i}@test.com", f"Subject {i}", "Body"])
  3. Monitor Worker Logs

    Check that workers are processing tasks successfully:

    # Monitor worker output
    tail -f celery_worker.log
    # Or check journald if using systemd
    journalctl -u celery-worker -f
  4. Verify Traces in Last9

    Log into your Last9 account and check that Celery traces are being received in the Traces dashboard.

    Look for:

    • Task execution spans with timing information
    • Message broker operations
    • Task retry attempts and error traces
    • Cross-service trace propagation

Troubleshooting

Common Issues

Tasks Not Being Traced

Ensure the worker process initialization is correct:

# Make sure this is called in worker processes
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    CeleryInstrumentor().instrument()
    # ... rest of initialization

Missing Trace Context

Verify environment variables are set correctly:

env | grep OTEL_
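
If the variables look correct but spans still appear disconnected, you can log the active trace ID from inside a task to confirm that context actually reaches the worker. A minimal sketch (the debug_trace task name is illustrative, and app is your Celery application from step 3):

from opentelemetry import trace

@app.task
def debug_trace():
    """Illustrative task: prints the trace ID of the active span, if any."""
    ctx = trace.get_current_span().get_span_context()
    # trace_id is 0 and is_valid is False when no context was propagated
    print(f"trace_id={format(ctx.trace_id, '032x')} valid={ctx.is_valid}")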

Worker Process Crashes

Check for threading issues with BatchSpanProcessor:

# View detailed worker logs
celery -A your_app_name worker --loglevel=debug

Performance Considerations

  • Sampling: Use appropriate sampling rates for high-volume task processing
  • Batch Processing: BatchSpanProcessor is recommended for performance (see the tuning sketch after this list)
  • Resource Limits: Monitor memory usage in workers with tracing enabled
  • Queue Management: Consider separate queues for traced vs non-traced tasks
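
For the batch-processing and memory points above, the BatchSpanProcessor built in step 3 accepts queue and batch-size arguments. A minimal sketch with explicit values (these are illustrative, not recommendations for your workload):

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

processor = BatchSpanProcessor(
    OTLPSpanExporter(),
    max_queue_size=2048,         # spans buffered in memory before new ones are dropped
    schedule_delay_millis=5000,  # how often the buffered batch is flushed
    max_export_batch_size=512,   # spans sent per export request
)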

Best Practices

  • Service Naming: Use descriptive names that distinguish between producers and workers
  • Environment Segregation: Use different service names per environment
  • Task Categorization: Add meaningful attributes to distinguish task types
  • Error Handling: Implement proper exception handling with span status updates (see the sketch after this list)
  • Monitoring: Set up alerts for task failure rates and execution times
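
For the error-handling point, the span status can be set explicitly alongside the recorded exception. A minimal sketch building on the custom-attributes example above (risky_task and do_work are illustrative placeholders, and app is your Celery application from step 3):

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

@app.task
def risky_task(item_id):
    """Illustrative task: marks its span as errored before re-raising."""
    with tracer.start_as_current_span("risky_task") as span:
        try:
            return do_work(item_id)  # placeholder for your own logic
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            raise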

Need Help?

If you encounter any issues or have questions: