Elastic Logstash

Forward logs from Logstash to Last9 using OpenTelemetry Collector with comprehensive filtering, transformation, and multi-input support

Overview

Elastic Logstash is a server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite “stash.” This integration enables forwarding processed logs from Logstash to Last9 through OpenTelemetry Collector, providing comprehensive log analysis capabilities while maintaining your existing Logstash processing pipelines.
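
At a high level, the data flow is:

Logstash pipeline (inputs -> filters -> tcp output, json_lines)
        |  TCP :2255
        v
OpenTelemetry Collector (tcplog receiver -> processors -> otlp exporter)
        |  OTLP (gRPC, gzip)
        v
Last9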

Prerequisites

  • Logstash 7.x or 8.x installed
  • OpenTelemetry Collector (contrib distribution)
  • Last9 account with OTLP endpoint configured
  • Network connectivity between Logstash and OpenTelemetry Collector

Installation

  1. Install OpenTelemetry Collector

    Install the OpenTelemetry Collector contrib distribution, which includes the tcplog and filelog receivers used below:

    # Download and install RPM package
    wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.125.0/otelcol-contrib_0.125.0_linux_amd64.rpm
    sudo rpm -ivh otelcol-contrib_0.125.0_linux_amd64.rpm
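
    Verify the installation before moving on:

    # Check installed version
    otelcol-contrib --version
    # List bundled components (tcplog and filelog should be present)
    otelcol-contrib components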
  2. Configure OpenTelemetry Collector

    Create collector configuration (/etc/otel-contrib-collector/config.yaml):

    receivers:
      # TCP receiver for Logstash logs
      tcplog/logstash:
        listen_address: "0.0.0.0:2255"
        encoding: utf-8
        resource:
          service.name: "logstash-forwarder"
      # Host metrics collection
      hostmetrics:
        collection_interval: 60s
        scrapers:
          cpu:
            metrics:
              system.cpu.logical.count:
                enabled: true
          memory:
            metrics:
              system.memory.utilization:
                enabled: true
              system.memory.limit:
                enabled: true
          load:
          disk:
          filesystem:
            metrics:
              system.filesystem.utilization:
                enabled: true
          network:
          paging:
          processes:
          process:
            mute_process_user_error: true
            mute_process_io_error: true
            mute_process_exe_error: true
            metrics:
              process.cpu.utilization:
                enabled: true
              process.memory.utilization:
                enabled: true
              process.threads:
                enabled: true
              process.paging.faults:
                enabled: true
      # File receiver for Logstash's own logs (alternative)
      filelog/logstash:
        include:
          - /var/log/logstash/*.log
          - /usr/share/logstash/logs/*.log
        start_at: end
        operators:
          - type: regex_parser
            regex: '^\[(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2},\d{3})\]\[(?P<log_level>\w+)\s*\]\[(?P<logger>[^\]]+)\]\s*(?P<message>.*)'
            timestamp:
              parse_from: attributes.timestamp
              layout: "%Y-%m-%dT%H:%M:%S,%L"
            severity:
              parse_from: attributes.log_level
    processors:
      batch:
        timeout: 5s
        send_batch_size: 10000
        send_batch_max_size: 10000
      memory_limiter:
        # check_interval is required; the limiter samples memory usage at this interval
        check_interval: 1s
        limit_mib: 512
      resourcedetection/system:
        detectors: ["system"]
        system:
          hostname_sources: ["os"]
      resourcedetection/cloud:
        detectors: ["aws", "gcp", "azure"]
      transform/logstash:
        log_statements:
          - context: log
            statements:
              # Parse JSON logs from Logstash
              - merge_maps(attributes, ParseJSON(body), "insert") where IsMatch(body, "^\\{.*\\}$")
              # Use Logstash's @timestamp (ISO8601 with millisecond precision) as the log time
              - set(time, Time(attributes["@timestamp"], "%Y-%m-%dT%H:%M:%S.%LZ")) where attributes["@timestamp"] != nil
              - delete_key(attributes, "@timestamp")
              # Set log level from Logstash fields
              - set(severity_text, attributes["level"]) where attributes["level"] != nil
              - set(severity_text, attributes["log_level"]) where attributes["log_level"] != nil
              # Promote the message field to the log body
              - set(body, attributes["message"]) where attributes["message"] != nil and attributes["message"] != ""
              # Add Logstash-specific attributes
              - set(attributes["logstash.version"], attributes["version"]) where attributes["version"] != nil
              - set(attributes["logstash.pipeline"], attributes["pipeline"]) where attributes["pipeline"] != nil
              - set(attributes["logstash.input"], attributes["input"]) where attributes["input"] != nil
              # Clean up temporary attributes
              - delete_key(attributes, "version")
              - delete_key(attributes, "message")
    exporters:
      otlp/last9:
        endpoint: ${LAST9_OTLP_ENDPOINT}
        headers:
          Authorization: ${LAST9_OTLP_AUTH_HEADER}
        compression: gzip
        retry_on_failure:
          enabled: true
          initial_interval: 5s
          max_interval: 30s
      # Debug exporter: swap into the logs pipeline when troubleshooting
      debug:
        verbosity: basic
        sampling_initial: 5
        sampling_thereafter: 1000
      # Expose the collector's own metrics for Prometheus scraping
      prometheus:
        endpoint: "0.0.0.0:8889"
        namespace: "logstash_integration"
    service:
      extensions: [health_check]
      pipelines:
        logs:
          receivers: [tcplog/logstash, filelog/logstash]
          processors:
            [
              memory_limiter,
              resourcedetection/system,
              resourcedetection/cloud,
              transform/logstash,
              batch,
            ]
          exporters: [otlp/last9]
        metrics:
          receivers: [hostmetrics]
          processors:
            [
              memory_limiter,
              resourcedetection/system,
              resourcedetection/cloud,
              batch,
            ]
          exporters: [otlp/last9]
      telemetry:
        logs:
          level: info
        metrics:
          address: 0.0.0.0:8888
    extensions:
      health_check:
        endpoint: 0.0.0.0:13133
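
    Before starting the service, ask the collector to validate the configuration file; this catches indentation and unknown-field errors early:

    # Validate collector configuration
    otelcol-contrib validate --config /etc/otel-contrib-collector/config.yaml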

    Start the OpenTelemetry Collector:

    # Start collector service
    sudo systemctl enable otelcol-contrib
    sudo systemctl start otelcol-contrib
    # Check status
    sudo systemctl status otelcol-contrib
    # View logs
    sudo journalctl -u otelcol-contrib -f
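
    With the collector running, you can send a hand-written JSON line to the tcplog receiver to confirm delivery into Last9 before involving Logstash (the field values here are arbitrary):

    # Emit a single test event to the tcplog receiver
    echo '{"message":"collector smoke test","level":"INFO"}' | nc localhost 2255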
  3. Configure Logstash Pipeline

    Create or update your Logstash pipeline configuration:

    # /etc/logstash/conf.d/logstash.conf
    input {
      # Your existing inputs
      beats {
        port => 5044
      }
      file {
        path => "/var/log/apache2/access.log"
        start_position => "beginning"
        type => "apache-access"
      }
    }
    filter {
      # Your existing filters
      if [type] == "apache-access" {
        grok {
          match => {
            "message" => "%{COMBINEDAPACHELOG}"
          }
        }
        date {
          match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
        }
      }
      # Add metadata for Last9
      mutate {
        add_field => {
          "[@metadata][pipeline]" => "logstash-to-last9"
          "environment" => "production"
          "logstash_version" => "${LOGSTASH_VERSION}"
        }
      }
    }
    output {
      # Forward to the OpenTelemetry Collector
      tcp {
        codec => json_lines
        host => "localhost"
        port => 2255
        reconnect_interval => 10
      }
      # Optional: keep existing outputs
      # elasticsearch {
      #   hosts => ["elasticsearch:9200"]
      # }
    }
  4. Start Logstash

    Start Logstash with your configuration:

    # Test configuration
    sudo /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/logstash.conf
    # Start Logstash
    sudo systemctl enable logstash
    sudo systemctl start logstash
    # Check status
    sudo systemctl status logstash
    # Monitor logs
    sudo tail -f /var/log/logstash/logstash-plain.log
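
    Once Logstash is running, its monitoring API (port 9600 by default) reports per-plugin event counts; a rising "out" counter on the tcp output confirms events are leaving the pipeline:

    # Check pipeline throughput (jq is optional, for readability)
    curl -s http://localhost:9600/_node/stats/pipelines | jq '.pipelines'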

Advanced Configuration

Performance Tuning

Optimize Logstash for high-throughput log processing:

# High-performance Logstash configuration
input {
  beats {
    port => 5044
    # Increase input worker threads
    threads => 4
    # Keep idle beats connections open longer
    client_inactivity_timeout => 60
  }
}
filter {
  # Optimize grok patterns
  grok {
    match => { "message" => "%{COMMONAPACHELOG}" }
    # Keep only named captures for better performance
    named_captures_only => true
    # Load custom patterns from this directory
    patterns_dir => ["/etc/logstash/patterns"]
  }
  # Batch mutations in a single mutate block
  mutate {
    add_field => {
      "processed_at" => "%{@timestamp}"
      "pipeline_id" => "high-throughput"
    }
    # Remove unnecessary fields early
    remove_field => ["agent", "ecs", "host"]
  }
}
output {
  tcp {
    codec => json_lines
    host => "localhost"
    port => 2255
    # Reconnect quickly after a dropped connection
    reconnect_interval => 1
  }
}
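
Throughput is also governed by pipeline settings in logstash.yml; the values below are illustrative starting points for a 4-core host, so tune them against your own traffic:

# Append pipeline tuning to logstash.yml
sudo tee -a /etc/logstash/logstash.yml <<'EOF'
pipeline.workers: 4
pipeline.batch.size: 250
pipeline.batch.delay: 50
EOF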

Configure Logstash JVM settings (/etc/logstash/jvm.options):

# Heap size (adjust based on available memory)
-Xms2g
-Xmx2g
# Garbage collection settings
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m
# Faster startup; trades away peak JIT throughput, so consider removing
# these two lines for long-running, high-throughput pipelines
-XX:+TieredCompilation
-XX:TieredStopAtLevel=1
# GC logging (JDK 9+ unified logging; the removed -XX:+PrintGCDetails and
# -XX:+PrintGCTimeStamps flags prevent the bundled JDK from starting)
-Xlog:gc*:file=/var/log/logstash/gc.log:time,uptime:filecount=8,filesize=64m

Security Configuration

Secure log forwarding with TLS:

# OpenTelemetry Collector with TLS
receivers:
  tcplog/logstash_secure:
    listen_address: "0.0.0.0:2256"
    tls:
      cert_file: /etc/ssl/certs/otel-collector.crt
      key_file: /etc/ssl/private/otel-collector.key
      ca_file: /etc/ssl/certs/ca.crt
      client_ca_file: /etc/ssl/certs/ca.crt
    resource:
      service.name: "secure-logstash-forwarder"
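
For a quick test environment you can generate a throwaway CA plus server and client certificates with openssl (production deployments should use certificates from your own CA; the subject names and file names below just match the examples in this section):

# Throwaway CA
openssl req -x509 -newkey rsa:2048 -nodes -days 365 -keyout ca.key -out ca.crt -subj "/CN=test-ca"
# Server certificate for the collector, signed by the CA
openssl req -newkey rsa:2048 -nodes -keyout otel-collector.key -out server.csr -subj "/CN=secure-collector.internal"
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -days 365 -out otel-collector.crt
# Client certificate for Logstash, signed by the same CA
openssl req -newkey rsa:2048 -nodes -keyout client.key -out client.csr -subj "/CN=logstash"
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -days 365 -out client.crt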

Logstash TLS output configuration:

output {
  tcp {
    codec => json_lines
    host => "secure-collector.internal"
    port => 2256
    # Enable TLS (recent tcp output plugin versions rename these options to
    # ssl_enabled, ssl_certificate, ssl_certificate_authorities, and
    # ssl_verification_mode)
    ssl_enable => true
    ssl_cert => "/etc/logstash/ssl/client.crt"
    ssl_key => "/etc/logstash/ssl/client.key"
    ssl_cacert => "/etc/logstash/ssl/ca.crt"
    ssl_verify => true
  }
}
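
Before restarting Logstash, you can confirm the TLS handshake (including the client-certificate requirement) from the Logstash host:

# Verify the collector's TLS listener accepts our client certificate
openssl s_client -connect secure-collector.internal:2256 \
  -CAfile /etc/logstash/ssl/ca.crt \
  -cert /etc/logstash/ssl/client.crt \
  -key /etc/logstash/ssl/client.key </dev/null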

Log Enrichment and Filtering

Enrich and filter events before they are forwarded:

filter {
  # IP geolocation
  geoip {
    source => "clientip"
    target => "geoip"
    database => "/etc/logstash/geoip/GeoLite2-City.mmdb"
  }
  # User agent parsing
  useragent {
    source => "useragent"
    target => "ua"
  }
  # Log level normalization
  if [log_level] {
    translate {
      field => "log_level"
      destination => "normalized_level"
      dictionary => {
        "TRACE" => "trace"
        "DEBUG" => "debug"
        "INFO" => "info"
        "INFORMATION" => "info"
        "WARN" => "warn"
        "WARNING" => "warn"
        "ERROR" => "error"
        "FATAL" => "fatal"
        "CRITICAL" => "fatal"
      }
      fallback => "unknown"
    }
  }
  # Sensitive data filtering
  mutate {
    # Remove sensitive fields
    remove_field => ["password", "secret", "token", "api_key"]
    # Mask credit card numbers
    gsub => [
      "message", "\d{4}-\d{4}-\d{4}-\d{4}", "****-****-****-****"
    ]
  }
  # Add tags based on content
  if [message] =~ /ERROR|Exception|error/ {
    mutate {
      add_tag => ["error", "needs_attention"]
    }
  }
  if [response] and [response] >= 400 {
    mutate {
      add_tag => ["http_error"]
    }
  }
}
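
To exercise these filters without touching live traffic, run a throwaway pipeline that reads one sample event from stdin and prints the enriched result (paste the filter block above between the input and output; the sample field values are arbitrary):

echo '{"clientip":"203.0.113.7","log_level":"WARNING","message":"login failed"}' | \
  sudo /usr/share/logstash/bin/logstash -e '
    input { stdin { codec => json_lines } }
    output { stdout { codec => rubydebug } }'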

Troubleshooting

Common Issues

Logstash not connecting to OpenTelemetry Collector:

# Check network connectivity
telnet localhost 2255
# Verify collector is listening
sudo netstat -tlnp | grep :2255
# Check collector logs
sudo journalctl -u otelcol-contrib -f

Log parsing failures:

# Check Logstash logs for grok failures
grep "_grokparsefailure" /var/log/logstash/logstash-plain.log
# Test grok patterns
sudo /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/logstash.conf

High memory usage:

# Monitor Logstash JVM garbage collection
jstat -gc $(pgrep -fn logstash) 5s
# Check the dead letter queue
ls -la /var/log/logstash/dead_letter_queue/
# Reduce pipeline.batch.size and pipeline.workers in logstash.yml
# (batch sizing lives in logstash.yml, not logstash.conf)

Missing logs in Last9:

# Verify collector receiving logs
curl http://localhost:8888/metrics | grep tcplog
# Check Last9 endpoint connectivity
curl -v $LAST9_OTLP_ENDPOINT
# Enable debug output in collector
# Set debug exporter in service.pipelines.logs.exporters
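
If the receiver counters rise but nothing reaches Last9, compare the exporter's sent and failed counters in the collector's self-metrics (exact metric names can vary slightly between collector versions):

# Accepted by the receiver vs. sent/failed by the exporter
curl -s http://localhost:8888/metrics | grep -E 'otelcol_(receiver_accepted|exporter_sent|exporter_send_failed)_log_records'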