Kafka

Instrument Kafka producers and consumers in Go with the Last9 Go Agent for automatic distributed tracing and metrics

The agent propagates trace context from producer to consumer automatically: every consumed message carries its producer’s span as the parent, so a single trace covers both sides of the topic.
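
To make the propagation idea concrete, here is a minimal sketch of how a trace parent can travel in a message header. It assumes the W3C Trace Context `traceparent` format, which is the default propagator in OpenTelemetry; the exact headers the Last9 agent writes are not stated on this page. The `header` type and the `inject` and `extract` helpers are hypothetical names for illustration, not part of the agent or Sarama.

```go
package main

import (
	"fmt"
	"regexp"
)

// header is a stand-in for a Kafka record header (key/value pair).
// It is a hypothetical type for this sketch, not the Sarama type.
type header struct {
	Key   string
	Value []byte
}

// traceparentRe matches a W3C traceparent value:
// version-traceid-parentid-flags, all lowercase hex.
var traceparentRe = regexp.MustCompile(
	`^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$`)

// inject appends a traceparent header describing the producer span.
func inject(headers []header, traceID, spanID string, sampled bool) []header {
	flags := "00"
	if sampled {
		flags = "01"
	}
	value := fmt.Sprintf("00-%s-%s-%s", traceID, spanID, flags)
	return append(headers, header{Key: "traceparent", Value: []byte(value)})
}

// extract returns the trace ID and parent span ID carried by the headers.
// ok is false when the header is missing or malformed.
func extract(headers []header) (traceID, parentSpanID string, ok bool) {
	for _, h := range headers {
		if h.Key != "traceparent" {
			continue
		}
		m := traceparentRe.FindStringSubmatch(string(h.Value))
		if m == nil {
			return "", "", false
		}
		return m[2], m[3], true
	}
	return "", "", false
}

func main() {
	// Producer side: attach the current span's identity to the message.
	hs := inject(nil, "4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", true)

	// Consumer side: recover the parent so the new span joins the same trace.
	traceID, parent, ok := extract(hs)
	fmt.Println(traceID, parent, ok)
}
```

With the agent in place none of this is written by hand; the sketch only shows why a consumer span can name the producer span as its parent.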

The integration uses IBM Sarama (formerly Shopify Sarama), the recommended Kafka client for Go.

Prerequisites

  • Go 1.22 or higher
  • IBM Sarama (github.com/IBM/sarama)
  • Last9 account with OTLP credentials

Installation

  1. Install the Last9 Go Agent

    go get github.com/last9/go-agent

  2. Set environment variables

    export OTEL_SERVICE_NAME="your-service"
    export OTEL_EXPORTER_OTLP_ENDPOINT="$last9_otlp_endpoint"
    export OTEL_EXPORTER_OTLP_HEADERS="Authorization=$last9_otlp_auth_header"
    export OTEL_TRACES_SAMPLER="always_on"
    export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=production"

  3. Instrument your producer

    package main

    import (
        "context"
        "log"

        "github.com/IBM/sarama"
        agent "github.com/last9/go-agent"
        kafkaagent "github.com/last9/go-agent/integrations/kafka"
    )

    func main() {
        if err := agent.Start(); err != nil {
            log.Fatalf("failed to start agent: %v", err)
        }
        defer agent.Shutdown()

        producer, err := kafkaagent.NewSyncProducer(kafkaagent.ProducerConfig{
            Brokers: []string{"localhost:9092"},
        })
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        ctx := context.Background()
        partition, offset, err := producer.SendMessage(ctx, &sarama.ProducerMessage{
            Topic: "orders",
            Value: sarama.StringEncoder(`{"order_id": "abc-123"}`),
        })
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("sent to partition %d at offset %d", partition, offset)
    }

  4. Instrument your consumer

    Implement the sarama.ConsumerGroupHandler interface, then wrap it with kafkaagent.WrapConsumerGroupHandler:

    package main

    import (
        "context"
        "log"

        "github.com/IBM/sarama"
        agent "github.com/last9/go-agent"
        kafkaagent "github.com/last9/go-agent/integrations/kafka"
    )

    type OrderHandler struct{}

    func (h *OrderHandler) Setup(session sarama.ConsumerGroupSession) error   { return nil }
    func (h *OrderHandler) Cleanup(session sarama.ConsumerGroupSession) error { return nil }

    func (h *OrderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for msg := range claim.Messages() {
            // msg context carries the producer's trace as parent span
            ctx := session.Context()
            log.Printf("received: topic=%s partition=%d offset=%d value=%s",
                msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
            processOrder(ctx, msg)
            session.MarkMessage(msg, "")
        }
        return nil
    }

    func main() {
        if err := agent.Start(); err != nil {
            log.Fatalf("failed to start agent: %v", err)
        }
        defer agent.Shutdown()

        consumer, err := kafkaagent.NewConsumerGroup(kafkaagent.ConsumerConfig{
            Brokers: []string{"localhost:9092"},
            GroupID: "order-processors",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()

        // Wrap your handler; trace context from producers is propagated automatically
        handler := kafkaagent.WrapConsumerGroupHandler(&OrderHandler{})

        ctx := context.Background()
        for {
            if err := consumer.Consume(ctx, []string{"orders"}, handler); err != nil {
                log.Printf("consumer error: %v", err)
            }
        }
    }

    func processOrder(ctx context.Context, msg *sarama.ConsumerMessage) {
        // ctx has the producer's trace as parent; child spans connect automatically
    }

Custom Sarama Configuration

Pass your own *sarama.Config to override defaults:

cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true
cfg.Net.TLS.Enable = true
cfg.Version = sarama.V2_6_0_0

producer, err := kafkaagent.NewSyncProducer(kafkaagent.ProducerConfig{
    Brokers: []string{"kafka.example.com:9092"},
    Config:  cfg,
})
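
When TLS is enabled and the brokers use a private certificate authority, Sarama also accepts a standard *tls.Config through cfg.Net.TLS.Config. The following is a minimal sketch of building one from a CA bundle using only the standard library; `newTLSConfig` is a hypothetical helper, and the choice of TLS 1.2 as the minimum version is an assumption, not a requirement stated on this page.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newTLSConfig returns a *tls.Config that trusts only the CA
// certificates found in caPEM (hypothetical helper for this sketch).
func newTLSConfig(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid CA certificates found in PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12, // assumed floor; adjust to your policy
	}, nil
}

func main() {
	// In a real service the bytes would come from a CA bundle on disk.
	// Invalid input is used here to show the failure path.
	_, err := newTLSConfig([]byte("not a certificate"))
	fmt.Println("error:", err)

	// With a valid bundle, the result would be set next to the Enable flag:
	//   cfg.Net.TLS.Enable = true
	//   cfg.Net.TLS.Config = tlsCfg
}
```

Failing fast on an unreadable bundle keeps a misconfigured producer from starting and then failing later on its first broker handshake.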

What Gets Traced Automatically

Signal   What’s captured
Traces   Each SendMessage call: topic, partition, offset, message key
Traces   Each consumed message: topic, partition, offset, consumer group
Traces   Distributed context propagation from producer span to consumer span
Metrics  messaging.kafka.messages.sent: messages produced
Metrics  messaging.kafka.messages.received: messages consumed
Metrics  messaging.kafka.messages.errors: producer errors
Metrics  messaging.kafka.receive.errors: consumer errors
Metrics  messaging.kafka.send.duration: producer send latency histogram
Metrics  messaging.kafka.process.duration: consumer processing time histogram
Metrics  messaging.kafka.message.size: message size distribution

View Traces and Metrics

After running your application, navigate to Trace Explorer and Metrics Explorer in Last9 to view your telemetry data.


Troubleshooting

If you have any questions, please get in touch with us on Discord or by email.