Access Cold Storage Logs via AWS Athena
Learn how to query and analyze your cold storage logs in S3 using AWS Athena's SQL interface
Last9 automatically backs up your logs to a configured S3 bucket via Cold Storage. This doc will show you how to access and query these archived logs using AWS Athena, allowing you to perform powerful SQL-based analysis on your historical data.
Create a database on Athena
CREATE DATABASE last9;
Create a table in the database
CREATE EXTERNAL TABLE last9.logs (
  `timestamp` bigint,
  `traceid` string,
  `spanid` string,
  `traceflags` int,
  `severitytext` string,
  `severitynumber` int,
  `servicename` string,
  `body` string,
  `resourceschemaurl` string,
  `resourceattributes` array<array<string>>,
  `scopeschemaurl` string,
  `scopename` string,
  `scopeversion` string,
  `scopeattributes` array<string>,
  `logattributes` array<array<string>>
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://customer_s3_bucket/snappy-files/'
TBLPROPERTIES ('parquet.compression'='SNAPPY');
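The LOCATION should point at the S3 prefix where the converted Snappy files will be uploaded later in this guide (the --snappy-destination path); adjust the bucket name to match your setup. To confirm the table was created, you can run:
SHOW TABLES IN last9;
DESCRIBE last9.logs;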
Export AWS Profile
Before running the script, ensure your AWS profile is properly configured with appropriate permissions to access both your source and destination S3 buckets, as well as Athena.
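The script picks up credentials through boto3's default credential chain. A minimal sketch, assuming you use a named AWS profile (the profile name and region below are placeholders):
export AWS_PROFILE=your-profile-name
export AWS_DEFAULT_REGION=us-east-1
Any other credential mechanism boto3 supports (environment variables such as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, or an instance role) works as well.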
Move logs to Athena from backup S3 bucket
Save the following Python script as insert_data_into_athena.py.
import argparse
import boto3
import os
import pandas as pd
import tempfile
import lz4.frame
from botocore.exceptions import ClientError


class ParquetProcessor:
    def __init__(self):
        """Initialize the processor using AWS credentials from environment"""
        self.s3_client = boto3.client('s3')
        self.athena_client = boto3.client('athena')
        self.temp_dir = tempfile.mkdtemp()

    def download_from_s3(self, bucket_name, prefix):
        """Download all .parquet.lz4 files from the specified S3 path"""
        downloaded_files = []
        try:
            print(f"Searching in bucket: {bucket_name}")
            print(f"Using prefix: {prefix}")

            paginator = self.s3_client.get_paginator('list_objects_v2')
            for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
                if 'Contents' in page:
                    print("\nObjects found:")
                    for obj in page['Contents']:
                        print(f"Key: {obj['Key']}")

                        if obj['Key'].endswith('.parquet.lz4'):
                            print(f"Found matching file: {obj['Key']}")
                            local_file = os.path.join(self.temp_dir, os.path.basename(obj['Key']))
                            self.s3_client.download_file(bucket_name, obj['Key'], local_file)
                            downloaded_files.append(local_file)
                else:
                    print("No 'Contents' in this page")

            if not downloaded_files:
                print("No .parquet.lz4 files were found")

            return downloaded_files
        except ClientError as e:
            print(f"Error downloading files: {e}")
            return []

    def decompress_lz4(self, file_path):
        """Decompress .parquet.lz4 file to .parquet"""
        try:
            output_file = file_path.replace('.lz4', '')
            print(f"Decompressing {file_path} to {output_file}")

            with open(file_path, 'rb') as compressed:
                compressed_data = compressed.read()

            decompressed_data = lz4.frame.decompress(compressed_data)

            with open(output_file, 'wb') as decompressed:
                decompressed.write(decompressed_data)

            os.remove(file_path)
            print(f"Successfully decompressed to {output_file}")
            return output_file
        except Exception as e:
            print(f"Error decompressing file {file_path}: {e}")
            return None

    def convert_to_snappy(self, file_path):
        """Convert decompressed parquet to Snappy compression"""
        try:
            df = pd.read_parquet(file_path)
            df.to_parquet(file_path, compression='snappy')
            return file_path
        except Exception as e:
            print(f"Error converting file {file_path}: {e}")
            return None

    def upload_to_s3(self, bucket, prefix, file_path):
        """Upload a file to S3"""
        try:
            file_name = os.path.basename(file_path)
            s3_key = os.path.join(prefix.rstrip('/'), file_name)
            print(f"Uploading {file_path} to s3://{bucket}/{s3_key}")
            self.s3_client.upload_file(file_path, bucket, s3_key)
            return True
        except Exception as e:
            print(f"Error uploading file: {e}")
            return False

    def cleanup_local_files(self, snappy_files):
        """Clean up temporary local files"""
        for file in snappy_files:
            try:
                os.remove(file)
            except Exception as e:
                print(f"Error removing file {file}: {e}")
        os.rmdir(self.temp_dir)

    def process_files(self, source_bucket, source_prefix, snappy_destination, athena_results_location=None):
        """Main process to handle the complete workflow"""
        # Download LZ4 files
        lz4_files = self.download_from_s3(source_bucket, source_prefix)
        if not lz4_files:
            print("No .parquet.lz4 files found")
            return

        # Decompress LZ4 files
        decompressed_files = []
        for file in lz4_files:
            decompressed_file = self.decompress_lz4(file)
            if decompressed_file:
                decompressed_files.append(decompressed_file)

        if not decompressed_files:
            print("No files were successfully decompressed")
            return

        # Convert to Snappy
        snappy_files = []
        for file in decompressed_files:
            snappy_file = self.convert_to_snappy(file)
            if snappy_file:
                snappy_files.append(snappy_file)

        if not snappy_files:
            print("No files were successfully converted to Snappy")
            return

        # Upload to snappy destination
        dest_bucket = snappy_destination.split('//')[1].split('/')[0]
        dest_prefix = '/'.join(snappy_destination.split('//')[1].split('/')[1:])

        for file in snappy_files:
            if not self.upload_to_s3(dest_bucket, dest_prefix, file):
                print(f"Failed to upload {file}")
                continue

        # Cleanup local files
        self.cleanup_local_files(snappy_files)
        print("Processing completed successfully")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Process .parquet.lz4 files and upload to S3')

    # S3 and Athena configuration
    parser.add_argument('--source-bucket', required=True,
                        help='Source S3 bucket name where the parquet.lz4 files are stored (where Last9 saves backup files)')
    parser.add_argument('--source-prefix', required=True,
                        help='Source S3 prefix path where parquet.lz4 files are stored')
    parser.add_argument('--snappy-destination', required=True,
                        help='S3 path for converted snappy files')
    parser.add_argument('--athena-results', required=True,
                        help='S3 path for Athena query results')

    args = parser.parse_args()
    processor = ParquetProcessor()
    processor.process_files(
        source_bucket=args.source_bucket,
        source_prefix=args.source_prefix,
        snappy_destination=args.snappy_destination,
        athena_results_location=args.athena_results
    )
The script insert_data_into_athena.py processes the .parquet.lz4 files from the backup bucket and uploads them to a separate S3 location for processing in Athena.
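The script depends on a few Python packages: boto3 for S3 access, pandas together with a Parquet engine such as pyarrow for rewriting the files, and lz4 for decompression. A typical install (pin versions as you see fit):
pip install boto3 pandas pyarrow lz4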
Help Command
Run the following command to see all available options and parameters:
python insert_data_into_athena.py --help
Usage
usage: insert_data_into_athena.py [-h] --source-bucket SOURCE_BUCKET
                                  --source-prefix SOURCE_PREFIX
                                  --snappy-destination SNAPPY_DESTINATION
                                  --athena-results ATHENA_RESULTS

Process .parquet.lz4 files and upload to S3

options:
  -h, --help            show this help message and exit
  --source-bucket SOURCE_BUCKET
                        Source S3 bucket name where the parquet.lz4 files are
                        stored (where Last9 saves backup files)
  --source-prefix SOURCE_PREFIX
                        Source S3 prefix path where parquet.lz4 files are stored
  --snappy-destination SNAPPY_DESTINATION
                        S3 path for converted snappy files
  --athena-results ATHENA_RESULTS
                        S3 path for Athena query results
Example Command
Here’s a sample command that processes files from your backup bucket to prepare them for Athena queries:
python insert_data_into_athena.py \
  --source-bucket last9_backup_bucket \
  --source-prefix "path/to/file/" \
  --snappy-destination "s3://customer_s3_bucket/snappy-files" \
  --athena-results "s3://customer_s3_bucket/athena-results/"
In this example:
- last9_backup_bucket is your source bucket containing the archived logs
- path/to/file/ is the directory path where your .parquet.lz4 files are located
- s3://customer_s3_bucket/snappy-files is where the converted files will be stored
- s3://customer_s3_bucket/athena-results/ is where Athena will store query results
Check the result on Athena
After the data has been uploaded, you can query it using Athena with the following SQL:
SELECT * FROM last9.logs;
This will retrieve all logs from the last9.logs table, allowing you to verify that your data has been successfully uploaded and is accessible through Athena.
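Full table scans can be slow and costly on large archives, so in practice you will usually filter and limit. A hypothetical example, assuming the service name and severity values below match what is actually in your data:
SELECT servicename, severitytext, body
FROM last9.logs
WHERE servicename = 'checkout-service'
  AND severitytext = 'ERROR'
LIMIT 100;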
Troubleshooting
Please get in touch with us on Discord or via email if you have any questions.