Kafka
Instrument Kafka producers and consumers in Go with the Last9 Go Agent for automatic distributed tracing and metrics
Use the Last9 Go Agent to instrument Kafka producers and consumers with automatic distributed tracing and metrics. Trace context is propagated from producer to consumer automatically: the span for each consumed message has the producer’s span as its parent.
The integration uses IBM Sarama (formerly Shopify Sarama), the recommended Kafka client for Go.
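To make the propagation idea concrete, here is a minimal, standard-library-only sketch of how a producer's span identifiers can travel in a message header and be read back on the consumer side. It assumes the W3C Trace Context `traceparent` format; whether the Last9 Go Agent uses exactly this header is an assumption, and the `Header`, `SpanContext`, `inject`, and `extract` names are hypothetical stand-ins, not part of Sarama or the agent.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Header mirrors the key/value shape of a Kafka record header.
// It is a stand-in for this sketch, not a Sarama or Last9 type.
type Header struct {
	Key   string
	Value string
}

// SpanContext holds the identifiers a W3C traceparent value carries.
type SpanContext struct {
	TraceID string // 32 lowercase hex characters
	SpanID  string // 16 lowercase hex characters
	Sampled bool
}

// inject appends a traceparent header describing the producer's span.
func inject(headers []Header, sc SpanContext) []Header {
	flags := "00"
	if sc.Sampled {
		flags = "01"
	}
	value := fmt.Sprintf("00-%s-%s-%s", sc.TraceID, sc.SpanID, flags)
	return append(headers, Header{Key: "traceparent", Value: value})
}

// extract returns the span context found in the headers, if a valid one exists.
func extract(headers []Header) (SpanContext, bool) {
	for _, h := range headers {
		if h.Key != "traceparent" {
			continue
		}
		parts := strings.Split(h.Value, "-")
		if len(parts) != 4 || parts[0] != "00" ||
			len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
			return SpanContext{}, false
		}
		flags, err := strconv.ParseUint(parts[3], 16, 8)
		if err != nil {
			return SpanContext{}, false
		}
		// The lowest bit of the flags byte is the "sampled" flag.
		return SpanContext{TraceID: parts[1], SpanID: parts[2], Sampled: flags&1 == 1}, true
	}
	return SpanContext{}, false
}

func main() {
	producerSpan := SpanContext{
		TraceID: "4bf92f3577b34da6a3ce929d0e0e4736",
		SpanID:  "00f067aa0ba902b7",
		Sampled: true,
	}
	headers := inject(nil, producerSpan)

	// On the consumer side, the extracted context would become the parent of the consumer span.
	parent, ok := extract(headers)
	fmt.Println(ok, parent.TraceID, parent.SpanID)
}
```

With the agent, none of this needs to be written by hand; the sketch only illustrates why the consumer span can join the producer's trace.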
Prerequisites
- Go 1.22 or higher
- IBM Sarama (github.com/IBM/sarama)
- Last9 account with OTLP credentials
Installation
- Install the Last9 Go Agent

      go get github.com/last9/go-agent

- Set Environment Variables

      export OTEL_SERVICE_NAME="your-service"
      export OTEL_EXPORTER_OTLP_ENDPOINT="$last9_otlp_endpoint"
      export OTEL_EXPORTER_OTLP_HEADERS="Authorization=$last9_otlp_auth_header"
      export OTEL_TRACES_SAMPLER="always_on"
      export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=production"

- Instrument your producer

      package main

      import (
          "context"
          "log"

          "github.com/IBM/sarama"
          "github.com/last9/go-agent"
          kafkaagent "github.com/last9/go-agent/integrations/kafka"
      )

      func main() {
          if err := agent.Start(); err != nil {
              log.Fatalf("failed to start agent: %v", err)
          }
          defer agent.Shutdown()

          producer, err := kafkaagent.NewSyncProducer(kafkaagent.ProducerConfig{
              Brokers: []string{"localhost:9092"},
          })
          if err != nil {
              log.Fatal(err)
          }
          defer producer.Close()

          ctx := context.Background()
          partition, offset, err := producer.SendMessage(ctx, &sarama.ProducerMessage{
              Topic: "orders",
              Value: sarama.StringEncoder(`{"order_id": "abc-123"}`),
          })
          if err != nil {
              log.Fatal(err)
          }
          log.Printf("sent to partition %d at offset %d", partition, offset)
      }

- Instrument your consumer

  Implement the sarama.ConsumerGroupHandler interface, then wrap it with kafkaagent.WrapConsumerGroupHandler:

      package main

      import (
          "context"
          "log"

          "github.com/IBM/sarama"
          "github.com/last9/go-agent"
          kafkaagent "github.com/last9/go-agent/integrations/kafka"
      )

      type OrderHandler struct{}

      func (h *OrderHandler) Setup(session sarama.ConsumerGroupSession) error   { return nil }
      func (h *OrderHandler) Cleanup(session sarama.ConsumerGroupSession) error { return nil }

      func (h *OrderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
          for msg := range claim.Messages() {
              // msg context carries the producer's trace as parent span
              ctx := session.Context()
              log.Printf("received: topic=%s partition=%d offset=%d value=%s",
                  msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
              processOrder(ctx, msg)
              session.MarkMessage(msg, "")
          }
          return nil
      }

      func main() {
          if err := agent.Start(); err != nil {
              log.Fatalf("failed to start agent: %v", err)
          }
          defer agent.Shutdown()

          consumer, err := kafkaagent.NewConsumerGroup(kafkaagent.ConsumerConfig{
              Brokers: []string{"localhost:9092"},
              GroupID: "order-processors",
          })
          if err != nil {
              log.Fatal(err)
          }
          defer consumer.Close()

          // Wrap your handler; trace context from producers is propagated automatically
          handler := kafkaagent.WrapConsumerGroupHandler(&OrderHandler{})

          ctx := context.Background()
          for {
              if err := consumer.Consume(ctx, []string{"orders"}, handler); err != nil {
                  log.Printf("consumer error: %v", err)
              }
          }
      }

      func processOrder(ctx context.Context, msg *sarama.ConsumerMessage) {
          // ctx has the producer's trace as parent, so child spans connect automatically
      }
Custom Sarama Configuration
Pass your own *sarama.Config to override defaults:
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true
    cfg.Net.TLS.Enable = true
    cfg.Version = sarama.V2_6_0_0

    producer, err := kafkaagent.NewSyncProducer(kafkaagent.ProducerConfig{
        Brokers: []string{"kafka.example.com:9092"},
        Config:  cfg,
    })

What Gets Traced Automatically
| Signal | What’s captured |
|---|---|
| Traces | Each SendMessage call: topic, partition, offset, message key |
| Traces | Each consumed message: topic, partition, offset, consumer group |
| Traces | Distributed context propagation from producer span to consumer span |
| Metrics | messaging.kafka.messages.sent — messages produced |
| Metrics | messaging.kafka.messages.received — messages consumed |
| Metrics | messaging.kafka.messages.errors — producer errors |
| Metrics | messaging.kafka.receive.errors — consumer errors |
| Metrics | messaging.kafka.send.duration — producer send latency histogram |
| Metrics | messaging.kafka.process.duration — consumer processing time histogram |
| Metrics | messaging.kafka.message.size — message size distribution |
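As a mental model for the producer-side metrics in the table, the sketch below times a send call and updates a success counter, an error counter, and a list of observed durations. It is not the agent's implementation: `sendStats` and `observe` are hypothetical names, the in-memory fields only loosely correspond to the metric names in the comments, and counting a message as "sent" only on success is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errBroker is a sample failure used by the demo below.
var errBroker = errors.New("broker unavailable")

// sendStats is a hypothetical in-memory stand-in for the producer metrics.
type sendStats struct {
	Sent      int             // compare: messaging.kafka.messages.sent
	Errors    int             // compare: messaging.kafka.messages.errors
	Durations []time.Duration // compare: messaging.kafka.send.duration
}

// observe times one send call and updates the counters based on its result.
func (s *sendStats) observe(send func() error) error {
	start := time.Now()
	err := send()
	s.Durations = append(s.Durations, time.Since(start))
	if err != nil {
		s.Errors++
		return err
	}
	s.Sent++
	return nil
}

func main() {
	stats := &sendStats{}

	// One successful send and one failed send.
	_ = stats.observe(func() error { return nil })
	_ = stats.observe(func() error { return errBroker })

	fmt.Printf("sent=%d errors=%d observations=%d\n",
		stats.Sent, stats.Errors, len(stats.Durations))
}
```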
View Traces and Metrics
After running your application, navigate to Trace Explorer and Metrics Explorer in Last9 to view your telemetry data.
Troubleshooting
If you have any questions, please get in touch with us on Discord or by email.