Elastic Logstash
Forward logs from Logstash to Last9 using OpenTelemetry Collector with comprehensive filtering, transformation, and multi-input support
Forward logs from Elastic Logstash to Last9 using OpenTelemetry Collector for centralized log analysis and monitoring.
Overview
Elastic Logstash is a server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite “stash.” This integration enables forwarding processed logs from Logstash to Last9 through OpenTelemetry Collector, providing comprehensive log analysis capabilities while maintaining your existing Logstash processing pipelines.
Prerequisites
- Logstash 7.x or 8.x installed
- OpenTelemetry Collector (contrib distribution)
- Last9 account with OTLP endpoint configured
- Network connectivity between Logstash and OpenTelemetry Collector
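These can be sanity-checked from the shell before starting (paths assume default package installs; replace `<collector-host>` with your collector's address):

```bash
# Confirm Logstash and collector versions
/usr/share/logstash/bin/logstash --version
otelcol-contrib --version

# Confirm the Logstash host can reach the collector's TCP port (2255 is used below)
nc -zv <collector-host> 2255
```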
Installation
Install OpenTelemetry Collector
Install the OpenTelemetry Collector contrib distribution, which includes the tcplog receiver:
```bash
# Download and install RPM package
wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.125.0/otelcol-contrib_0.125.0_linux_amd64.rpm
sudo rpm -ivh otelcol-contrib_0.125.0_linux_amd64.rpm
```

```bash
# Download and install DEB package
wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.125.0/otelcol-contrib_0.125.0_linux_amd64.deb
sudo dpkg -i otelcol-contrib_0.125.0_linux_amd64.deb
```

Or run the collector with Docker Compose:

```yaml
# docker-compose.yaml
version: "3.8"
services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    container_name: logstash-collector
    volumes:
      - ./otel-config.yaml:/etc/otel-collector/config.yaml
      - ./storage:/storage
    environment:
      - LAST9_OTLP_ENDPOINT=$last9_otlp_endpoint
      - LAST9_OTLP_AUTH_HEADER=$last9_otlp_auth_header
    ports:
      - "2255:2255"   # Logstash TCP input
      - "8888:8888"   # Collector metrics
      - "13133:13133" # Health check
    restart: unless-stopped
```
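Either install path can be sanity-checked before moving on (binary name and container name as above):

```bash
# Package install: confirm the binary is available
otelcol-contrib --version

# Docker install: start the stack and watch the logs
docker compose up -d
docker compose logs -f otel-collector
```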
Configure OpenTelemetry Collector
Create the collector configuration (`/etc/otel-contrib-collector/config.yaml`):

```yaml
receivers:
  # TCP receiver for Logstash logs
  tcplog/logstash:
    listen_address: "0.0.0.0:2255"
    encoding: utf-8
    resource:
      service.name: "logstash-forwarder"

  # Host metrics collection
  hostmetrics:
    collection_interval: 60s
    scrapers:
      cpu:
        metrics:
          system.cpu.logical.count:
            enabled: true
      memory:
        metrics:
          system.memory.utilization:
            enabled: true
          system.memory.limit:
            enabled: true
      load:
      disk:
      filesystem:
        metrics:
          system.filesystem.utilization:
            enabled: true
      network:
      paging:
      processes:
      process:
        mute_process_user_error: true
        mute_process_io_error: true
        mute_process_exe_error: true
        metrics:
          process.cpu.utilization:
            enabled: true
          process.memory.utilization:
            enabled: true
          process.threads:
            enabled: true
          process.paging.faults:
            enabled: true

  # File receiver for Logstash logs (alternative)
  filelog/logstash:
    include:
      - /var/log/logstash/*.log
      - /usr/share/logstash/logs/*.log
    start_at: end
    operators:
      - type: regex_parser
        regex: '^\[(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2},\d{3})\]\[(?P<log_level>\w+)\s*\]\[(?P<logger>[^\]]+)\]\s*(?P<message>.*)'
        timestamp:
          parse_from: attributes.timestamp
          layout: "%Y-%m-%dT%H:%M:%S,%L"
        severity:
          parse_from: attributes.log_level

processors:
  batch:
    timeout: 5s
    send_batch_size: 10000
    send_batch_max_size: 10000

  memory_limiter:
    limit_mib: 512

  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]

  resourcedetection/cloud:
    detectors: ["aws", "gcp", "azure"]

  transform/logstash:
    log_statements:
      - context: log
        statements:
          # Parse JSON logs from Logstash
          - merge_maps(attributes, ParseJSON(body), "insert") where IsMatch(body, "^\\{.*\\}$")
          # Extract timestamp if present in JSON
          - set(time_unix_nano, UnixMilli(Int(attributes["@timestamp"]))) where attributes["@timestamp"] != nil
          - delete_key(attributes, "@timestamp")
          # Set log level from Logstash fields
          - set(severity_text, attributes["level"]) where attributes["level"] != nil
          - set(severity_text, attributes["log_level"]) where attributes["log_level"] != nil
          # Extract message field
          - set(body, attributes["message"]) where attributes["message"] != nil and attributes["message"] != ""
          # Add Logstash-specific attributes
          - set(attributes["logstash.version"], attributes["version"]) where attributes["version"] != nil
          - set(attributes["logstash.pipeline"], attributes["pipeline"]) where attributes["pipeline"] != nil
          - set(attributes["logstash.input"], attributes["input"]) where attributes["input"] != nil
          # Clean up temporary attributes
          - delete_key(attributes, "version")
          - delete_key(attributes, "message")

exporters:
  otlp/last9:
    endpoint: ${LAST9_OTLP_ENDPOINT}
    headers:
      Authorization: ${LAST9_OTLP_AUTH_HEADER}
    compression: gzip
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s

  debug:
    verbosity: basic
    sampling_initial: 5
    sampling_thereafter: 1000

  prometheus:
    endpoint: "0.0.0.0:8889"
    namespace: "logstash_integration"

service:
  extensions: [health_check]
  pipelines:
    logs:
      receivers: [tcplog/logstash, filelog/logstash]
      processors:
        [
          memory_limiter,
          resourcedetection/system,
          resourcedetection/cloud,
          transform/logstash,
          batch,
        ]
      exporters: [otlp/last9]
    metrics:
      receivers: [hostmetrics]
      processors:
        [
          memory_limiter,
          resourcedetection/system,
          resourcedetection/cloud,
          batch,
        ]
      exporters: [otlp/last9]
  telemetry:
    logs:
      level: info
    metrics:
      address: 0.0.0.0:8888

extensions:
  health_check:
    endpoint: 0.0.0.0:13133
```
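Recent collector releases include a `validate` subcommand, which is a quick way to catch YAML and component errors before starting the service:

```bash
# Fail fast on configuration errors (path as above)
otelcol-contrib validate --config /etc/otel-contrib-collector/config.yaml
```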
Start the OpenTelemetry Collector:

```bash
# Start collector service
sudo systemctl enable otelcol-contrib
sudo systemctl start otelcol-contrib

# Check status
sudo systemctl status otelcol-contrib

# View logs
sudo journalctl -u otelcol-contrib -f
```
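With the collector running, you can confirm the health check and TCP receiver respond before touching Logstash. The JSON fields in the test line are illustrative, not required:

```bash
# Health check extension (port 13133 as configured above)
curl -s http://localhost:13133/

# Send a hand-rolled JSON log line to the tcplog receiver
echo '{"message":"smoke test","level":"INFO"}' | nc -w1 localhost 2255
```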
Configure Logstash Pipeline
Create or update your Logstash pipeline configuration:
```
# logstash.conf
input {
  # Your existing inputs
  beats {
    port => 5044
  }

  file {
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    type => "apache-access"
  }
}

filter {
  # Your existing filters
  if [type] == "apache-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  }

  # Add metadata for Last9
  mutate {
    add_field => {
      "[@metadata][pipeline]" => "logstash-to-last9"
      "environment" => "production"
      "logstash_version" => "${LOGSTASH_VERSION}"
    }
  }
}

output {
  # Forward to OpenTelemetry Collector
  tcp {
    codec => json_lines
    host => "localhost"
    port => 2255
    reconnect_interval => 10
  }

  # Optional: Keep existing outputs
  # elasticsearch {
  #   hosts => ["elasticsearch:9200"]
  # }
}
```

An enhanced pipeline with multiple inputs and error handling:

```
input {
  beats {
    port => 5044
    add_field => { "input_type" => "beats" }
  }

  file {
    path => ["/var/log/syslog", "/var/log/auth.log"]
    start_position => "beginning"
    type => "syslog"
    add_field => { "input_type" => "file" }
  }

  http {
    port => 8080
    add_field => { "input_type" => "http" }
  }
}

filter {
  # Add unique event ID
  uuid {
    target => "event_id"
  }

  # Parse different log types
  if [type] == "syslog" {
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{IPORHOST:syslog_server} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"
      }
    }
  }

  # Add common fields
  mutate {
    add_field => {
      "[@metadata][target]" => "last9"
      "logstash_host" => "%{host}"
      "pipeline_stage" => "processed"
      "log_source" => "logstash"
    }
    # Convert timestamps to ISO format
    convert => { "@timestamp" => "string" }
  }

  # Error handling
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => {
        "parse_error" => "grok_failure"
        "original_message" => "%{message}"
      }
      add_tag => ["logstash_parse_error"]
    }
  }
}

output {
  # Primary output to Last9 via OpenTelemetry Collector
  tcp {
    codec => json_lines
    host => "localhost"
    port => 2255
    # Reconnect on connection failure
    reconnect_interval => 10
  }

  # Dead letter queue for failed messages
  if "logstash_parse_error" in [tags] {
    file {
      path => "/var/log/logstash/dead-letter-queue/failed-%{+YYYY.MM.dd}.log"
      codec => json_lines
    }
  }

  # Debug output (optional)
  if [@metadata][debug] {
    stdout {
      codec => json_lines
    }
  }
}
```

A multi-environment configuration that routes to different collector endpoints:

```
input {
  beats {
    port => 5044
    add_field => {
      "environment" => "${ENVIRONMENT:dev}"
      "datacenter" => "${DATACENTER:local}"
    }
  }
}

filter {
  # Environment-specific processing
  if [environment] == "production" {
    # More comprehensive parsing in production
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:log_level} %{GREEDYDATA:log_message}"
      }
    }

    # Add production metadata
    mutate {
      add_field => {
        "priority" => "high"
        "retention_days" => "90"
      }
    }
  } else {
    # Simpler processing for dev/staging
    mutate {
      add_field => {
        "priority" => "low"
        "retention_days" => "7"
      }
    }
  }
}

output {
  # Environment-specific OpenTelemetry Collector endpoints
  if [environment] == "production" {
    tcp {
      codec => json_lines
      host => "prod-otel-collector.internal"
      port => 2255
    }
  } else {
    tcp {
      codec => json_lines
      host => "dev-otel-collector.internal"
      port => 2255
    }
  }
}
```
Start Logstash
Start Logstash with your configuration:
```bash
# Test configuration
sudo /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/logstash.conf

# Start Logstash
sudo systemctl enable logstash
sudo systemctl start logstash

# Check status
sudo systemctl status logstash

# Monitor logs
sudo tail -f /var/log/logstash/logstash-plain.log
```
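To confirm events are flowing end to end, compare Logstash's pipeline counters with the collector's receiver counters. Ports assume the default monitoring APIs; exact metric names vary by collector version:

```bash
# Events emitted by Logstash pipelines (monitoring API on 9600)
curl -s http://localhost:9600/_node/stats/pipelines

# Log records accepted by the collector (Prometheus endpoint on 8888)
curl -s http://localhost:8888/metrics | grep -i accepted_log_records
```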
Advanced Configuration
Performance Tuning
Optimize Logstash for high-throughput log processing:
```
# High-performance Logstash configuration
input {
  beats {
    port => 5044
    # Increase worker threads
    threads => 4
    # Enable batching
    client_inactivity_timeout => 60
  }
}

filter {
  # Optimize grok patterns
  grok {
    match => { "message" => "%{COMMONAPACHELOG}" }
    # Use named captures for better performance
    named_captures_only => true
    # Enable pattern compilation caching
    patterns_files_glob => "/etc/logstash/patterns/*"
  }

  # Batch mutations
  mutate {
    add_field => {
      "processed_at" => "%{@timestamp}"
      "pipeline_id" => "high-throughput"
    }
    # Remove unnecessary fields early
    remove_field => ["agent", "ecs", "host"]
  }
}

output {
  tcp {
    codec => json_lines
    host => "localhost"
    port => 2255
    # Optimize TCP settings
    reconnect_interval => 1
    workers => 2
  }
}
```
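Plugin options only go part of the way; the larger throughput levers live in `logstash.yml`. A sketch with illustrative values that should be sized to your hardware:

```bash
# Append pipeline-level tuning to logstash.yml (values are illustrative)
sudo tee -a /etc/logstash/logstash.yml > /dev/null <<'EOF'
pipeline.workers: 4        # typically the number of CPU cores
pipeline.batch.size: 1000  # events per worker per batch
pipeline.batch.delay: 50   # ms to wait while filling a batch
EOF
sudo systemctl restart logstash
```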
Configure Logstash JVM settings (`/etc/logstash/jvm.options`):

```
# Heap size (adjust based on available memory)
-Xms2g
-Xmx2g

# Garbage collection settings
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m

# JIT optimization
-XX:+TieredCompilation
-XX:TieredStopAtLevel=1

# GC logging (JDK 9+; replaces the removed -XX:+PrintGCDetails/-XX:+PrintGCTimeStamps flags)
-Xlog:gc*
```
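After a restart, Logstash's monitoring API reports heap and GC behavior of the running JVM, which confirms the settings took effect:

```bash
# Inspect heap usage and GC counters (monitoring API on 9600)
curl -s http://localhost:9600/_node/stats/jvm
```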
Security Configuration
Secure log forwarding with TLS:
```yaml
# OpenTelemetry Collector with TLS
receivers:
  tcplog/logstash_secure:
    listen_address: "0.0.0.0:2256"
    tls:
      cert_file: /etc/ssl/certs/otel-collector.crt
      key_file: /etc/ssl/private/otel-collector.key
      ca_file: /etc/ssl/certs/ca.crt
      client_ca_file: /etc/ssl/certs/ca.crt
    resource:
      service.name: "secure-logstash-forwarder"
```

Logstash TLS output configuration:
```
output {
  tcp {
    codec => json_lines
    host => "secure-collector.internal"
    port => 2256

    # Enable TLS
    ssl_enable => true
    ssl_cert => "/etc/logstash/ssl/client.crt"
    ssl_key => "/etc/logstash/ssl/client.key"
    ssl_certificate_authorities => ["/etc/logstash/ssl/ca.crt"]
    ssl_verify => true
  }
}
```
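The handshake can be checked from the Logstash host with `openssl s_client`; hostname and certificate paths follow the example above:

```bash
# Expect "Verify return code: 0 (ok)" when the chain is valid
openssl s_client -connect secure-collector.internal:2256 \
  -CAfile /etc/logstash/ssl/ca.crt \
  -cert /etc/logstash/ssl/client.crt \
  -key /etc/logstash/ssl/client.key </dev/null
```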
Log Enrichment and Filtering
Add comprehensive log processing:
```
filter {
  # IP geolocation
  geoip {
    source => "clientip"
    target => "geoip"
    database => "/etc/logstash/geoip/GeoLite2-City.mmdb"
  }

  # User agent parsing
  useragent {
    source => "useragent"
    target => "ua"
  }

  # Log level normalization
  if [log_level] {
    translate {
      field => "log_level"
      destination => "normalized_level"
      dictionary => {
        "TRACE" => "trace"
        "DEBUG" => "debug"
        "INFO" => "info"
        "INFORMATION" => "info"
        "WARN" => "warn"
        "WARNING" => "warn"
        "ERROR" => "error"
        "FATAL" => "fatal"
        "CRITICAL" => "fatal"
      }
      fallback => "unknown"
    }
  }

  # Sensitive data filtering
  mutate {
    # Remove sensitive fields
    remove_field => ["password", "secret", "token", "api_key"]
    # Mask credit card numbers
    gsub => [ "message", "\d{4}-\d{4}-\d{4}-\d{4}", "****-****-****-****" ]
  }

  # Add tags based on content
  if [message] =~ /ERROR|Exception|error/ {
    mutate {
      add_tag => ["error", "needs_attention"]
    }
  }

  if [response] and [response] >= 400 {
    mutate {
      add_tag => ["http_error"]
    }
  }
}
```
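Filters such as the credit-card masking above can be smoke-tested with a one-shot pipeline using `-e` (Logstash defaults to stdin input and stdout output when none are specified):

```bash
# Should print the event with the card number masked
echo 'payment failed for card 4111-1111-1111-1111' | \
  sudo /usr/share/logstash/bin/logstash -e '
    filter { mutate { gsub => [ "message", "\d{4}-\d{4}-\d{4}-\d{4}", "****-****-****-****" ] } }
    output { stdout { codec => json_lines } }'
```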
Troubleshooting
Common Issues
Logstash not connecting to OpenTelemetry Collector:
```bash
# Check network connectivity
telnet localhost 2255

# Verify collector is listening
sudo netstat -tlnp | grep :2255

# Check collector logs
sudo journalctl -u otelcol-contrib -f
```

Log parsing failures:
```bash
# Check Logstash logs for grok failures
grep "_grokparsefailure" /var/log/logstash/logstash-plain.log

# Test grok patterns
sudo /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/logstash.conf
```

High memory usage:
```bash
# Monitor Logstash JVM
jstat -gc $(pgrep -f logstash) 5s

# Check dead letter queue
ls -la /var/log/logstash/dead_letter_queue/

# Reduce memory pressure: in logstash.yml, lower pipeline.batch.size
```

Missing logs in Last9:
```bash
# Verify collector is receiving logs
curl http://localhost:8888/metrics | grep tcplog

# Check Last9 endpoint connectivity
curl -v $last9_otlp_endpoint

# Enable debug output in the collector:
# add the debug exporter to service.pipelines.logs.exporters
```