
Access Cold Storage Logs via AWS Athena

Learn how to query and analyze your cold storage logs in S3 using AWS Athena's SQL interface

Last9 automatically backs up your logs to a configured S3 bucket via Cold Storage. This guide shows how to access and query these archived logs using AWS Athena, so you can run SQL-based analysis on your historical data.

Create a database on Athena

CREATE DATABASE last9;

Create a table in the database

CREATE EXTERNAL TABLE last9.logs (
  `timestamp` bigint,
  `traceid` string,
  `spanid` string,
  `traceflags` int,
  `severitytext` string,
  `severitynumber` int,
  `servicename` string,
  `body` string,
  `resourceschemaurl` string,
  `resourceattributes` array<array<string>>,
  `scopeschemaurl` string,
  `scopename` string,
  `scopeversion` string,
  `scopeattributes` array<string>,
  `logattributes` array<array<string>>
)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://customer_s3_bucket/snappy-files/'
TBLPROPERTIES ('parquet.compression'='SNAPPY');
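The LOCATION should point to the same S3 prefix you will later upload the converted Snappy files to (the --snappy-destination used below). To confirm the table was registered, you can optionally run a quick check in the Athena query editor:

SHOW TABLES IN last9;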

Export AWS Profile

Before running the script, make sure your AWS profile is configured with permissions to access both your source and destination S3 buckets, as well as Athena.
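For example, if your credentials are stored in a named AWS CLI profile, you can export it before running the script (the profile name and region below are placeholders) and verify that it resolves to valid credentials:

export AWS_PROFILE=your-profile-name
export AWS_DEFAULT_REGION=us-east-1

# Optional: confirm the profile resolves to valid credentials
aws sts get-caller-identity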

Move logs from the backup S3 bucket to Athena

Save the following Python script as insert_data_into_athena.py.

import argparse
import boto3
import os
import pandas as pd
import tempfile
import lz4.frame
from botocore.exceptions import ClientError


class ParquetProcessor:
    def __init__(self):
        """Initialize the processor using AWS credentials from the environment"""
        self.s3_client = boto3.client('s3')
        # Created for completeness; this script only stages files in S3 for Athena
        self.athena_client = boto3.client('athena')
        self.temp_dir = tempfile.mkdtemp()

    def download_from_s3(self, bucket_name, prefix):
        """Download all .parquet.lz4 files from the specified S3 path"""
        downloaded_files = []
        try:
            print(f"Searching in bucket: {bucket_name}")
            print(f"Using prefix: {prefix}")
            paginator = self.s3_client.get_paginator('list_objects_v2')
            for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
                if 'Contents' in page:
                    print("\nObjects found:")
                    for obj in page['Contents']:
                        print(f"Key: {obj['Key']}")
                        if obj['Key'].endswith('.parquet.lz4'):
                            print(f"Found matching file: {obj['Key']}")
                            local_file = os.path.join(self.temp_dir, os.path.basename(obj['Key']))
                            self.s3_client.download_file(bucket_name, obj['Key'], local_file)
                            downloaded_files.append(local_file)
                else:
                    print("No 'Contents' in this page")
            if not downloaded_files:
                print("No .parquet.lz4 files were found")
            return downloaded_files
        except ClientError as e:
            print(f"Error downloading files: {e}")
            return []

    def decompress_lz4(self, file_path):
        """Decompress a .parquet.lz4 file to .parquet"""
        try:
            output_file = file_path.replace('.lz4', '')
            print(f"Decompressing {file_path} to {output_file}")
            with open(file_path, 'rb') as compressed:
                compressed_data = compressed.read()
            decompressed_data = lz4.frame.decompress(compressed_data)
            with open(output_file, 'wb') as decompressed:
                decompressed.write(decompressed_data)
            os.remove(file_path)
            print(f"Successfully decompressed to {output_file}")
            return output_file
        except Exception as e:
            print(f"Error decompressing file {file_path}: {e}")
            return None

    def convert_to_snappy(self, file_path):
        """Rewrite a decompressed Parquet file with Snappy compression"""
        try:
            df = pd.read_parquet(file_path)
            df.to_parquet(file_path, compression='snappy')
            return file_path
        except Exception as e:
            print(f"Error converting file {file_path}: {e}")
            return None

    def upload_to_s3(self, bucket, prefix, file_path):
        """Upload a file to S3"""
        try:
            file_name = os.path.basename(file_path)
            s3_key = os.path.join(prefix.rstrip('/'), file_name)
            print(f"Uploading {file_path} to s3://{bucket}/{s3_key}")
            self.s3_client.upload_file(file_path, bucket, s3_key)
            return True
        except Exception as e:
            print(f"Error uploading file: {e}")
            return False

    def cleanup_local_files(self, snappy_files):
        """Clean up temporary local files"""
        for file in snappy_files:
            try:
                os.remove(file)
            except Exception as e:
                print(f"Error removing file {file}: {e}")
        os.rmdir(self.temp_dir)

    def process_files(self, source_bucket, source_prefix, snappy_destination, athena_results_location=None):
        """Main process to handle the complete workflow"""
        # athena_results_location is accepted for symmetry with the CLI flag;
        # the files themselves are queried through the external table
        # Download LZ4 files
        lz4_files = self.download_from_s3(source_bucket, source_prefix)
        if not lz4_files:
            print("No .parquet.lz4 files found")
            return
        # Decompress LZ4 files
        decompressed_files = []
        for file in lz4_files:
            decompressed_file = self.decompress_lz4(file)
            if decompressed_file:
                decompressed_files.append(decompressed_file)
        if not decompressed_files:
            print("No files were successfully decompressed")
            return
        # Convert to Snappy
        snappy_files = []
        for file in decompressed_files:
            snappy_file = self.convert_to_snappy(file)
            if snappy_file:
                snappy_files.append(snappy_file)
        if not snappy_files:
            print("No files were successfully converted to Snappy")
            return
        # Upload to the snappy destination
        dest_bucket = snappy_destination.split('//')[1].split('/')[0]
        dest_prefix = '/'.join(snappy_destination.split('//')[1].split('/')[1:])
        for file in snappy_files:
            if not self.upload_to_s3(dest_bucket, dest_prefix, file):
                print(f"Failed to upload {file}")
                continue
        # Clean up local files
        self.cleanup_local_files(snappy_files)
        print("Processing completed successfully")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Process .parquet.lz4 files and upload to S3')
    # S3 and Athena configuration
    parser.add_argument('--source-bucket', required=True,
                        help='Source S3 bucket where Last9 saves the .parquet.lz4 archives')
    parser.add_argument('--source-prefix', required=True,
                        help='Source S3 prefix path where the .parquet.lz4 files are stored')
    parser.add_argument('--snappy-destination', required=True,
                        help='S3 path for the converted Snappy files')
    parser.add_argument('--athena-results', required=True,
                        help='S3 path for Athena query results')
    args = parser.parse_args()
    processor = ParquetProcessor()
    processor.process_files(
        source_bucket=args.source_bucket,
        source_prefix=args.source_prefix,
        snappy_destination=args.snappy_destination,
        athena_results_location=args.athena_results
    )

The insert_data_into_athena.py script downloads the .parquet.lz4 files from the backup bucket, recompresses them as Snappy Parquet, and uploads them to a separate S3 location so they can be queried from Athena.
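The script needs a few Python packages installed. Assuming pandas uses pyarrow as its Parquet engine, an install along these lines should cover the imports:

pip install boto3 pandas pyarrow lz4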

Help Command

Run the following command to see all available options and parameters:

python insert_data_into_athena.py --help

Usage

usage: insert_data_into_athena.py [-h] --source-bucket SOURCE_BUCKET
                                  --source-prefix SOURCE_PREFIX
                                  --snappy-destination SNAPPY_DESTINATION
                                  --athena-results ATHENA_RESULTS

Process .parquet.lz4 files and upload to S3

options:
  -h, --help            show this help message and exit
  --source-bucket SOURCE_BUCKET
                        Source S3 bucket where Last9 saves the .parquet.lz4
                        archives
  --source-prefix SOURCE_PREFIX
                        Source S3 prefix path where the .parquet.lz4 files
                        are stored
  --snappy-destination SNAPPY_DESTINATION
                        S3 path for the converted Snappy files
  --athena-results ATHENA_RESULTS
                        S3 path for Athena query results

Example Command

Here’s a sample command that processes files from your backup bucket to prepare them for Athena queries:

python insert_data_into_athena.py \
--source-bucket last9_backup_bucket \
--source-prefix "path/to/file/" \
--snappy-destination "s3://customer_s3_bucket/snappy-files" \
--athena-results "s3://customer_s3_bucket/athena-results/"

In this example:

  • last9_backup_bucket is your source bucket containing the archived logs
  • path/to/file/ is the directory path where your .parquet.lz4 files are located
  • s3://customer_s3_bucket/snappy-files is where the converted files will be stored
  • s3://customer_s3_bucket/athena-results/ is where Athena will store query results
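After the script completes, you can optionally confirm that the converted files landed in the destination prefix before querying them (this assumes the AWS CLI is installed and uses the same example bucket name):

aws s3 ls s3://customer_s3_bucket/snappy-files/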

Check results on Athena

After the data has been uploaded, you can query it using Athena with the following SQL:

SELECT * FROM last9.logs;

This will retrieve all logs from the last9.logs table, allowing you to verify that your data has been successfully uploaded and is accessible through Athena.
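From here you can write more targeted queries against the columns defined in the table. For example, the following sketch pulls error-level entries for a single service; the severitytext and servicename values are placeholders, so adjust them to match your data:

SELECT servicename, severitytext, body
FROM last9.logs
WHERE severitytext = 'ERROR'
  AND servicename = 'my-service'
LIMIT 100;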

Troubleshooting

Please get in touch with us on Discord or over email if you have any questions.