Karafka

Use OpenTelemetry to instrument your Karafka application and send telemetry data to Last9. You can either route telemetry through an OpenTelemetry Collector or export it directly from the application to Last9.

Note: Use this integration if you run Karafka outside of Ruby on Rails. If you run Karafka inside a Ruby on Rails application, use the Ruby on Rails integration instead.

Install OpenTelemetry Packages

Add the following gems to your Gemfile and run bundle install.

gem 'opentelemetry-sdk'
gem 'opentelemetry-exporter-otlp'
gem 'opentelemetry-instrumentation-all'

Set the Environment Variables

OTEL_SERVICE_NAME=<your-app-name>
OTEL_EXPORTER_OTLP_ENDPOINT={{ .Logs.WriteURL }}
OTEL_EXPORTER_OTLP_HEADERS="Authorization={{ .Logs.AuthValue }}"
OTEL_TRACES_EXPORTER=otlp
OTEL_RESOURCE_ATTRIBUTES="deployment.environment=local"
OTEL_LOG_LEVEL=error

Note: If you are running an OpenTelemetry Collector, set OTEL_EXPORTER_OTLP_ENDPOINT to the collector URL instead.
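A missing variable is easy to overlook, so a small boot-time check can help. The following is a minimal sketch, not part of the Last9 or OpenTelemetry APIs: the helper name missing_otel_env and the list of variables treated as required are assumptions for illustration, and the list assumes direct export to Last9 (a collector setup may not need the headers variable).

```ruby
# Hypothetical list of variables this sketch treats as required.
REQUIRED_OTEL_ENV = %w[
  OTEL_SERVICE_NAME
  OTEL_EXPORTER_OTLP_ENDPOINT
  OTEL_EXPORTER_OTLP_HEADERS
].freeze

# Hypothetical helper: returns the names of required variables that are
# unset or blank in the given environment (defaults to the process ENV).
def missing_otel_env(env = ENV)
  REQUIRED_OTEL_ENV.select { |name| env[name].to_s.strip.empty? }
end

# Warn on stderr at boot instead of failing, so the app can still start.
missing = missing_otel_env
warn "Missing OpenTelemetry settings: #{missing.join(', ')}" unless missing.empty?
```

Calling the helper before the SDK is configured keeps the warning close to the cause.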

Instrument the Karafka application

Create a file lib/otel_setup.rb and add the following code to instrument your application:

require 'opentelemetry/sdk'
require 'opentelemetry/exporter/otlp'
require 'opentelemetry/instrumentation/all'

class OtelSetup
  def initialize
    # The exporter reads its endpoint and headers from the OTEL_* environment variables
    @otel_exporter = OpenTelemetry::Exporter::OTLP::Exporter.new
    @processor = OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(@otel_exporter)
  end

  def process
    OpenTelemetry::SDK.configure do |c|
      # Exporter and processor configuration
      c.add_span_processor(@processor) # Created in initialize

      # Resource configuration
      c.resource = OpenTelemetry::SDK::Resources::Resource.create({
        OpenTelemetry::SemanticConventions::Resource::DEPLOYMENT_ENVIRONMENT => "staging" # Change to your deployment environment
      })

      c.use_all # Enables all available instrumentation
    end
  end
end

Load this file as early as possible in your application setup, immediately after Bundler is set up or eager loading is done.

ENV['KARAFKA_ENV'] ||= 'development'
Bundler.require(:default, ENV['KARAFKA_ENV'])

# Zeitwerk custom loader for loading the app components before the whole
# Karafka framework configuration
APP_LOADER = Zeitwerk::Loader.new

%w[
  lib
  app/consumers
].each(&APP_LOADER.method(:push_dir))

APP_LOADER.setup
APP_LOADER.eager_load

# Set up OpenTelemetry instrumentation
OtelSetup.new.process

# Rest of the code

This code snippet configures the OpenTelemetry SDK to use the OTLP exporter and enables instrumentation for the Karafka application.

Tracking Producer Errors

Karafka’s producer (WaterDrop) emits an error.occurred event for delivery failures and other producer-side errors. These errors are raised inside rdkafka’s polling thread, so they have no parent OpenTelemetry context and are not picked up by the default auto-instrumentation. Subscribe to the event and create a root span with record_exception to send the full error (class, message, and stacktrace) to Last9 — it appears in the Exceptions dashboard alongside the rest of the service’s errors.

Karafka.producer.monitor.subscribe("error.occurred") do |event|
  begin
    error = event[:error]
    type = event[:type]
    producer_id = event[:producer_id]
    topic = event[:topic]
    partition = event[:partition]

    # Start a root span: there is no active context on the polling thread
    OpenTelemetry.tracer_provider.tracer("karafka.producer").in_span(
      "kafka.producer.error",
      kind: :internal
    ) do |span|
      span.status = OpenTelemetry::Trace::Status.error(error.message)
      span.set_attribute("messaging.system", "kafka")
      span.set_attribute("messaging.operation", "publish")
      span.set_attribute("kafka.error.type", type.to_s)
      span.set_attribute("kafka.producer_id", producer_id.to_s) if producer_id
      span.set_attribute("kafka.topic", topic.to_s) if topic
      span.set_attribute("kafka.partition", partition.to_i) if partition
      span.record_exception(error)
    end
  rescue StandardError => e
    # Use Karafka's logger: Rails is not loaded in a standalone Karafka app
    Karafka.logger.error("Kafka producer OTel error: #{e.message}")
  end
end

A few things worth knowing about this pattern:

  • Root span, no parent: rdkafka fires error.occurred from its own polling thread, so there is no active OpenTelemetry context to attach to. Creating a fresh top-level span is the correct approach.
  • kind: :internal: at the point the subscriber runs, the produce attempt has already failed. The span represents observing the error, not an active publish operation, so :internal is more accurate than :producer.
  • record_exception(error): serializes the exception class, message, and full backtrace as a span event. This is what populates Last9’s Exceptions dashboard — the equivalent of an error-tracker notify call.
  • Defensive rescue: keeps any instrumentation error from disrupting the producer thread. The producer should never crash because of telemetry.
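The attribute handling in the subscriber can also be pulled into a small pure function, which makes it easy to test without a Kafka broker. This is a minimal sketch: the helper name producer_error_attributes is hypothetical, and it assumes the payload values are available as a plain Hash with the same keys the subscriber reads.

```ruby
# Hypothetical helper: builds span attributes from an error payload Hash.
# Keys mirror those read in the subscriber; absent keys are skipped.
def producer_error_attributes(payload)
  attrs = {
    "messaging.system" => "kafka",
    "messaging.operation" => "publish",
    "kafka.error.type" => payload[:type].to_s
  }
  attrs["kafka.producer_id"] = payload[:producer_id].to_s if payload[:producer_id]
  attrs["kafka.topic"] = payload[:topic].to_s if payload[:topic]
  # Partition 0 is truthy in Ruby, so it is kept.
  attrs["kafka.partition"] = payload[:partition].to_i if payload[:partition]
  attrs
end
```

The resulting Hash could then be passed to in_span through its attributes: keyword rather than setting each attribute individually.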

If you also want a metric for alerting on error spikes (counts per minute or topic), emit a counter increment alongside the span — spans give you per-error context, while a counter is the right primitive for “did this exceed N in the last 5 minutes?”
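One way to structure the counter increment is sketched below. The class names ProducerErrorCounter and InMemoryCounter are hypothetical. The wrapper accepts any object that responds to add(value, attributes:), which is assumed here to match the counter instrument in the OpenTelemetry Ruby metrics API; verify that against the metrics gem version you install. The in-memory class is only a stand-in so the sketch runs without extra gems.

```ruby
# Hypothetical wrapper around a counter-like object that responds to
# add(value, attributes:).
class ProducerErrorCounter
  def initialize(counter, logger: nil)
    @counter = counter
    @logger = logger
  end

  # Increments the counter once per error. Returns true on success and
  # false on failure; it never raises into the calling thread.
  def record(type:, topic: nil)
    attributes = { "kafka.error.type" => type.to_s }
    attributes["kafka.topic"] = topic.to_s if topic
    @counter.add(1, attributes: attributes)
    true
  rescue StandardError => e
    @logger&.error("Kafka producer error counter failed: #{e.message}")
    false
  end
end

# In-memory stand-in for a real counter instrument (assumption: used here
# only to keep the sketch self-contained).
class InMemoryCounter
  attr_reader :totals

  def initialize
    @totals = Hash.new(0)
    @mutex = Mutex.new
  end

  def add(value, attributes: {})
    @mutex.synchronize { @totals[attributes] += value }
  end
end

# Example values; in practice, call record from the error.occurred subscriber.
errors = ProducerErrorCounter.new(InMemoryCounter.new)
errors.record(type: "librdkafka.dispatch_error", topic: "orders")
```

Keeping the attribute set small (error type and topic) limits the number of distinct time series the counter produces.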

Run the Application

bundle exec karafka s

The application will start sending traces to Last9.

A sample standalone Karafka application with OpenTelemetry is available here.


Troubleshooting

If you have any questions, please get in touch with us on Discord or by email.