Karafka
Use OpenTelemetry to instrument your Karafka application and send telemetry data to Last9. You can either run an OpenTelemetry Collector or send the telemetry directly from the application to Last9.
Note: Use this integration if you run Karafka outside of Ruby on Rails. If Karafka runs inside a Ruby on Rails application, use the Ruby on Rails integration instead.
Install OpenTelemetry Packages
Add the following gems to your `Gemfile`, then run `bundle install`:

```ruby
gem 'opentelemetry-sdk'
gem 'opentelemetry-exporter-otlp'
gem 'opentelemetry-instrumentation-all'
```

Set the Environment Variables
```shell
OTEL_SERVICE_NAME=<your-app-name>
OTEL_EXPORTER_OTLP_ENDPOINT={{ .Logs.WriteURL }}
OTEL_EXPORTER_OTLP_HEADERS="Authorization={{ .Logs.AuthValue }}"
OTEL_TRACES_EXPORTER=otlp
OTEL_RESOURCE_ATTRIBUTES="deployment.environment=local"
OTEL_LOG_LEVEL=error
```

Note: If you are running an OpenTelemetry Collector, set `OTEL_EXPORTER_OTLP_ENDPOINT` to the collector's URL instead.
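A missing or mistyped variable tends to show up only later, as failed exports. If you would rather fail fast at boot, a minimal sketch like the one below can run before the SDK is configured. `OtelEnvCheck` is a hypothetical helper, not part of any OpenTelemetry gem, and the set of variables it treats as required is an assumption drawn from the list above.

```ruby
# Hypothetical boot-time check (not part of any OpenTelemetry gem):
# fail fast if the OTEL_* variables this guide relies on are missing.
module OtelEnvCheck
  # Assumed minimum set; adjust to your deployment.
  REQUIRED_OTEL_VARS = %w[
    OTEL_SERVICE_NAME
    OTEL_EXPORTER_OTLP_ENDPOINT
    OTEL_EXPORTER_OTLP_HEADERS
  ].freeze

  # Returns the names of required variables that are unset or blank.
  def self.missing(env = ENV)
    REQUIRED_OTEL_VARS.select { |name| env[name].to_s.strip.empty? }
  end

  # Parses OTEL_EXPORTER_OTLP_HEADERS ("k1=v1,k2=v2") into a Hash.
  # Each pair is split on the first "=" only, so values ending in "="
  # padding survive. Values are NOT percent-decoded.
  def self.headers(env = ENV)
    env.fetch("OTEL_EXPORTER_OTLP_HEADERS", "").split(",").each_with_object({}) do |pair, acc|
      key, value = pair.split("=", 2)
      next if key.nil? || value.nil?

      acc[key.strip] = value.strip
    end
  end

  # Raises ArgumentError describing the first problem found; returns true otherwise.
  def self.verify!(env = ENV)
    missing_vars = missing(env)
    unless missing_vars.empty?
      raise ArgumentError, "Missing OpenTelemetry env vars: #{missing_vars.join(', ')}"
    end
    unless headers(env).key?("Authorization")
      raise ArgumentError, "OTEL_EXPORTER_OTLP_HEADERS has no Authorization header"
    end

    true
  end
end
```

For example, calling `OtelEnvCheck.verify!` right before `OtelSetup.new.process` raises an `ArgumentError` that names whatever is missing, instead of letting the process start and drop traces.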
Instrument the Karafka application
Create a file `lib/otel_setup.rb` and add the following code to instrument your application:
```ruby
require 'opentelemetry/sdk'
require 'opentelemetry/exporter/otlp'
require 'opentelemetry/instrumentation/all'

class OtelSetup
  def initialize
    @otel_exporter = OpenTelemetry::Exporter::OTLP::Exporter.new
    @processor = OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(@otel_exporter)
  end

  def process
    OpenTelemetry::SDK.configure do |c|
      # Exporter and Processor configuration
      c.add_span_processor(@processor) # Created above this SDK.configure block

      # Resource configuration
      c.resource = OpenTelemetry::SDK::Resources::Resource.create({
        OpenTelemetry::SemanticConventions::Resource::DEPLOYMENT_ENVIRONMENT => "staging" # Change to your deployment environment
      })

      c.use_all # enables all instrumentation!
    end
  end
end
```

Load this file and call it as early as possible in your application's boot file (`karafka.rb` by default), right after Bundler has required your gems and your application code has been eager-loaded:
```ruby
ENV['KARAFKA_ENV'] ||= 'development'
Bundler.require(:default, ENV['KARAFKA_ENV'])

# Zeitwerk custom loader for loading the app components before the whole
# Karafka framework configuration
APP_LOADER = Zeitwerk::Loader.new

# lib/ is included so Zeitwerk can load OtelSetup from lib/otel_setup.rb
%w[
  lib
  app/consumers
].each(&APP_LOADER.method(:push_dir))

APP_LOADER.setup
APP_LOADER.eager_load

# Setup OpenTelemetry instrumentation
OtelSetup.new.process

# Rest of the code
```

This configures the OpenTelemetry SDK to use the OTLP exporter and enables all available instrumentation for the Karafka application.
Tracking Producer Errors
Karafka’s producer (WaterDrop) emits an `error.occurred` event for delivery failures and other producer-side errors. These errors are raised inside rdkafka’s polling thread, so they have no parent OpenTelemetry context and are not picked up by the default auto-instrumentation. Subscribe to the event and create a root span with `record_exception` to send the full error (class, message, and stack trace) to Last9. The error then appears in the Exceptions dashboard alongside the rest of the service’s errors.
```ruby
Karafka.producer.monitor.subscribe("error.occurred") do |event|
  begin
    error = event[:error]
    type = event[:type]
    producer_id = event[:producer_id]
    topic = event[:topic]
    partition = event[:partition]

    OpenTelemetry.tracer_provider.tracer("karafka.producer").in_span(
      "kafka.producer.error",
      kind: :internal
    ) do |span|
      span.status = OpenTelemetry::Trace::Status.error(error.message)
      span.set_attribute("messaging.system", "kafka")
      span.set_attribute("messaging.operation", "publish")
      span.set_attribute("kafka.error.type", type.to_s)
      span.set_attribute("kafka.producer_id", producer_id.to_s) if producer_id
      span.set_attribute("kafka.topic", topic.to_s) if topic
      span.set_attribute("kafka.partition", partition.to_i) if partition
      span.record_exception(error)
    end
  rescue StandardError => e
    # Karafka.logger rather than Rails.logger: this setup runs outside Rails
    Karafka.logger.error("Kafka producer OTel error: #{e.message}")
  end
end
```

A few things worth knowing about this pattern:
- Root span, no parent: rdkafka fires `error.occurred` from its own polling thread, so there is no active OpenTelemetry context to attach to. Creating a fresh top-level span is the correct approach.
- `kind: :internal`: at the point the subscriber runs, the produce attempt has already failed. The span represents observing the error, not an active publish operation, so `:internal` is more accurate than `:producer`.
- `record_exception(error)`: serializes the exception class, message, and full backtrace as a span event. This is what populates Last9’s Exceptions dashboard, the equivalent of an error tracker’s `notify` call.
- Defensive `rescue`: keeps any instrumentation error from disrupting the producer thread. The producer should never crash because of telemetry.
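If you want to unit-test the attribute mapping without a running Kafka cluster, one option is to move it into a plain method. The sketch below is hypothetical: `ProducerErrorAttributes.build` is not a Karafka or OpenTelemetry API. It assumes the event payload exposes the same `:type`, `:producer_id`, `:topic`, and `:partition` keys the subscriber above reads, and it mirrors the subscriber's behavior of skipping optional keys that are `nil`.

```ruby
# Hypothetical helper: builds the span attributes used by the
# error.occurred subscriber from a Hash-like event payload.
# Optional keys that are nil are omitted, matching the `if` guards
# in the subscriber.
module ProducerErrorAttributes
  BASE = {
    "messaging.system" => "kafka",
    "messaging.operation" => "publish"
  }.freeze

  def self.build(payload)
    # merge returns a new, unfrozen Hash, so BASE itself is never mutated
    attrs = BASE.merge("kafka.error.type" => payload[:type].to_s)
    attrs["kafka.producer_id"] = payload[:producer_id].to_s if payload[:producer_id]
    attrs["kafka.topic"] = payload[:topic].to_s if payload[:topic]
    attrs["kafka.partition"] = payload[:partition].to_i if payload[:partition]
    attrs
  end
end
```

Inside the subscriber, the resulting Hash could then be passed as the `attributes:` keyword of `in_span`, replacing the individual `set_attribute` calls while keeping `status` and `record_exception` as they are.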
If you also want a metric for alerting on error spikes (for example, error counts per minute or per topic), emit a counter increment alongside the span. Spans give you per-error context, while a counter is the right primitive for “did this exceed N in the last 5 minutes?”
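In practice you would record the increment with an OpenTelemetry metrics counter and let an alert rule in Last9 evaluate the window. Purely to illustrate the question such an alert answers, here is an in-process sketch of a per-topic sliding window. `ErrorWindow` is hypothetical and is not a substitute for exporting a real metric; the 300-second default simply matches the “last 5 minutes” example.

```ruby
# Hypothetical in-process counter: records error timestamps per topic and
# answers "did this topic see more than N errors in the last window?".
# A real deployment would emit an OpenTelemetry counter instead and let
# the backend evaluate the window.
class ErrorWindow
  def initialize(window_seconds: 300, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @window = window_seconds
    @clock = clock # injectable so the window can be tested deterministically
    @events = Hash.new { |hash, topic| hash[topic] = [] }
    @lock = Mutex.new # the subscriber runs on rdkafka's thread, so guard shared state
  end

  # Record one error for the topic at the current clock time.
  def increment(topic)
    @lock.synchronize { @events[topic] << @clock.call }
  end

  # Number of errors for the topic inside the window; evicts older entries.
  # Timestamps are appended in order, so expired ones are always at the front.
  def count(topic)
    @lock.synchronize do
      cutoff = @clock.call - @window
      timestamps = @events[topic]
      timestamps.shift while timestamps.any? && timestamps.first < cutoff
      timestamps.size
    end
  end

  def exceeded?(topic, threshold)
    count(topic) > threshold
  end
end
```

Calling `increment(topic)` from the `error.occurred` subscriber and checking `exceeded?(topic, n)` reproduces the windowed-count semantics locally, which can be handy in tests; the counter exported to Last9 remains the source of truth for alerting.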
Run the Application
```shell
bundle exec karafka s
```

Once it starts, the application sends traces to Last9.
A sample standalone Karafka application instrumented with OpenTelemetry is available here.
Troubleshooting
Please get in touch with us on Discord or Email if you have any questions.