
Elasticsearch with Python: A Detailed Guide to Search and Analytics

Learn how to use Elasticsearch with Python for indexing, searching, and analyzing data, complete with code, tips, and integration examples.

Jul 11th, ‘25

If you’re using Python for search, log aggregation, or analytics, you’ve probably worked with Elasticsearch. It’s fast, scalable, and fairly complex once you go beyond the basics.

The official Python client gives you raw access to Elasticsearch’s REST API. But getting it to work the way you want, especially under load, can be tricky.

This blog walks through practical ways to index, query, and monitor Elasticsearch from Python code, without getting lost in the docs.

Elasticsearch and Its Role

Elasticsearch is a distributed search and analytics engine built on Apache Lucene. It stores data as JSON and supports fast, flexible search across massive datasets. For Python developers, it’s a go-to choice when building apps that need full-text search, log analytics, or real-time querying.

At its core, Elasticsearch has three key building blocks:

  • Indices are like databases, logical containers for related documents.
  • Documents are JSON objects, each representing a record inside an index.
  • Clusters are groups of nodes that work together to store and query data. Add more nodes, and you get horizontal scale almost out of the box.

Python apps use Elasticsearch for everything from product search and log aggregation to analytics and full-text search in CMSes.

Why do Python developers reach for Elasticsearch?

  • Simple integration. Elasticsearch exposes a clean REST API that plays well with Python HTTP libraries.
  • Ecosystem fit. Libraries like elasticsearch-py, pandas, and fastapi make it easy to query, transform, and serve data.
  • Fresh results, fast. Near real-time indexing means data is searchable within seconds of being written.
💡
If you’re planning to restructure or migrate your data, here’s how the Elasticsearch Reindex API can help.

Best GUI for Managing Elasticsearch

Managing Elasticsearch clusters from the command line can quickly get tedious, especially when you're handling multiple nodes, indices, or performance issues. GUI tools make this easier by giving you a visual layer for cluster health, query debugging, and index operations.

Kibana is the official GUI that comes bundled with the Elastic Stack. It’s the most comprehensive option:

  • Cluster monitoring: Real-time dashboards show node health, memory usage, and indexing stats.
  • Index management: Create, delete, and update indices through a form-based UI instead of handcrafting JSON.
  • Query testing: The Dev Tools console lets you run queries interactively — helpful for debugging before moving to code.
  • Data visualization: Build charts, dashboards, and alerts on top of your indexed data.

ElasticHQ: Lightweight and Focused

ElasticHQ is a leaner alternative that focuses purely on cluster administration:

  • Cluster overview: Inspect node stats, shard distribution, and storage usage at a glance.
  • Search interface: Run queries and view results in a formatted layout.
  • No bloat: You don’t get visualizations, but it's fast and to the point for ops tasks.

Integrates Cleanly with Python Workflows

Both Kibana and ElasticHQ connect to Elasticsearch via the same REST API your code uses. Auth works the same way, too. That means anything you do through the UI can be replicated in Python scripts, making it easy to go from manual testing to automation.

But once you're working with large-scale telemetry logs, traces, and metrics across distributed systems, these tools hit their limits. That’s where Last9 helps you.

It’s a production-grade observability platform that speaks OpenTelemetry natively and works well with high-cardinality datasets. You can plug it into your existing Elasticsearch setup or use it as a standalone backend for streaming observability data, purpose-built for modern infra.

Now, let's understand how to connect Python to Elasticsearch.

Step-by-Step Process to Connect Python to Elasticsearch

Set Up the Connection

To connect a Python app with Elasticsearch, use the official Python client: elasticsearch-py. It wraps the full Elasticsearch REST API and handles connection pooling, request formatting, and response parsing.

Install it with pip:

pip install elasticsearch

Once installed, you can create a client instance that points to your cluster:

from elasticsearch import Elasticsearch

# Basic connection for local development
es = Elasticsearch('http://localhost:9200')

# Verify connection
try:
    cluster_info = es.info()
    print(f"Connected to cluster: {cluster_info['cluster_name']}")
    print(f"Elasticsearch version: {cluster_info['version']['number']}")
except Exception as e:
    print(f"Connection failed: {e}")

The client supports both sync and async usage, and works with Python 3.x.
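
For async frameworks, the same package ships an AsyncElasticsearch client. Here's a minimal sketch, assuming a local unsecured cluster and the async extra installed (pip install "elasticsearch[async]"):

import asyncio
from elasticsearch import AsyncElasticsearch

async def main():
    es = AsyncElasticsearch('http://localhost:9200')
    try:
        info = await es.info()
        print(f"Connected to cluster: {info['cluster_name']}")
    finally:
        await es.close()  # release the underlying HTTP session

asyncio.run(main())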

Secure the Connection for Production

In production, you’ll want to secure and harden the connection. That includes:

  • Using HTTPS with SSL/TLS
  • Enabling authentication
  • Setting timeouts and retry logic
  • Managing connection pools

Example:

es = Elasticsearch(
    ['https://node1.example.com:9200', 'https://node2.example.com:9200'],
    http_auth=('username', 'password'),
    use_ssl=True,
    verify_certs=True,
    ssl_show_warn=False,
    timeout=30,
    max_retries=3,
    retry_on_timeout=True
)

These parameters help avoid hanging requests, handle transient failures, and make efficient use of connections when running at scale.

Use API Keys for Authentication

API keys are a cleaner alternative to basic auth. They’re more secure and easier to rotate.

To generate an API key (requires admin privileges):

api_key_response = es.security.create_api_key(
    body={
        "name": "python-app-key",
        "role_descriptors": {
            "python-app-role": {
                "cluster": ["monitor"],
                "index": [
                    {
                        "names": ["logs-*", "metrics-*"],
                        "privileges": ["read", "write", "create_index"]
                    }
                ]
            }
        }
    }
)

api_key_id = api_key_response['id']
api_key_secret = api_key_response['api_key']

Then connect using the key:

es = Elasticsearch(
    ['https://elasticsearch.example.com:9200'],
    api_key=(api_key_id, api_key_secret),
    use_ssl=True,
    verify_certs=True
)

try:
    health = es.cluster.health()
    print(f"Cluster health: {health['status']}")
except Exception as e:
    print(f"Authentication failed: {e}")

You can programmatically manage API keys using the security API: revoke, rotate, or scope them as needed. This works well with secret managers and CI/CD pipelines.
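
For example, here's a small sketch that revokes a key during rotation and lists what remains active, reusing api_key_id from above:

# Revoke the old key once a replacement has been issued
es.security.invalidate_api_key(body={'ids': [api_key_id]})

# Confirm which keys remain active for the current user
remaining = es.security.get_api_key(owner=True)
for key in remaining['api_keys']:
    print(f"{key['name']}: invalidated={key['invalidated']}")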

💡
If you're deciding between a document store and a search engine for your project, this MongoDB vs Elasticsearch comparison breaks it down.

How to Use Elasticsearch Indexes in Python

Create and Manage Indexes

Elasticsearch indexes act as containers for documents that share similar characteristics. When you create an index, you can define settings that control how data is stored, analyzed, and searched. These include the number of shards, the number of replicas, refresh intervals, and custom analyzers.

Shards control how data is split and distributed across nodes. More shards allow better parallelism but use more resources. Since the shard count can’t be changed after creation, planning is important.

# Create an index with custom settings
index_settings = {
    'settings': {
        'number_of_shards': 3,
        'number_of_replicas': 1,
        'refresh_interval': '30s',
        'analysis': {
            'analyzer': {
                'custom_analyzer': {
                    'type': 'custom',
                    'tokenizer': 'standard',
                    'filter': ['lowercase', 'stop']
                }
            }
        }
    }
}

es.indices.create(index='articles', body=index_settings)

Once the index is created, you can update its settings with some limitations. Shard count, for example, is fixed. But you can adjust settings like replica count or refresh interval on the fly.

# Update index settings
update_settings = {
    'index': {
        'refresh_interval': '60s',
        'number_of_replicas': 2
    }
}

es.indices.put_settings(index='articles', body=update_settings)

You can also inspect index stats:

# Check index status
index_stats = es.indices.stats(index='articles')
print(f"Document count: {index_stats['indices']['articles']['total']['docs']['count']}")
print(f"Index size: {index_stats['indices']['articles']['total']['store']['size_in_bytes']} bytes")

For time-series data or log streams, index templates help enforce consistency. Templates apply predefined settings and mappings to all indexes that match a naming pattern, making it easier to manage rolling indexes.
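
Here's a sketch of a composable index template (Elasticsearch 7.8+); the pattern, settings, and fields are placeholders to adapt to your own data:

log_template = {
    'index_patterns': ['logs-*'],
    'template': {
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 1
        },
        'mappings': {
            'properties': {
                'timestamp': {'type': 'date'},
                'level': {'type': 'keyword'},
                'message': {'type': 'text'}
            }
        }
    }
}

# Any new index matching logs-* (e.g., logs-2024.01.15) picks up these settings and mappings
es.indices.put_index_template(name='logs-template', body=log_template)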

Define Mappings for Your Data

Mappings define the structure of your documents, which fields exist, what types they use, how they're indexed, and how they're analyzed. While Elasticsearch can infer types automatically, explicit mappings prevent surprises and offer better control.

Field types matter. Use text for full-text search, keyword for exact matches and aggregations, integer for numeric operations, and date for anything temporal.

# Define comprehensive mappings
mappings = {
    'properties': {
        'title': {
            'type': 'text',
            'analyzer': 'standard',
            'fields': {
                'keyword': {
                    'type': 'keyword'
                }
            }
        },
        'content': {
            'type': 'text',
            'analyzer': 'custom_analyzer'
        },
        'publish_date': {
            'type': 'date',
            'format': 'yyyy-MM-dd||epoch_millis'
        },
        'author': {
            'type': 'object',
            'properties': {
                'name': {'type': 'text'},
                'email': {'type': 'keyword'},
                'bio': {'type': 'text'}
            }
        },
        'tags': {
            'type': 'keyword'
        },
        'view_count': {
            'type': 'integer'
        },
        'rating': {
            'type': 'float'
        },
        'metadata': {
            'type': 'object',
            'enabled': False
        }
    }
}

You can apply both the settings and mappings when creating the index:

# Create index with mappings and settings
es.indices.create(
    index='articles',
    body={
        'settings': index_settings['settings'],  # index_settings above already nests its values under 'settings'
        'mappings': mappings
    }
)

Multi-field mappings are especially useful when you want to support different types of queries on the same field. A text field can include a keyword sub-field to enable both full-text search and exact filtering.
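
For instance, with the title mapping above, a match query searches the analyzed text while a term query on title.keyword filters on the exact string:

# Full-text search on the analyzed field
es.search(index='articles', body={'query': {'match': {'title': 'elasticsearch python'}}})

# Exact filtering on the keyword sub-field
es.search(index='articles', body={
    'query': {'term': {'title.keyword': 'Getting Started with Elasticsearch and Python'}}
})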

If your schema evolves, dynamic templates help assign field types automatically based on naming patterns, useful when field names aren't known in advance.
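
As a sketch, this dynamic template maps any new string field whose name ends in _id as a keyword (the pattern is illustrative):

es.indices.put_mapping(
    index='articles',
    body={
        'dynamic_templates': [
            {
                'ids_as_keywords': {
                    'match': '*_id',
                    'match_mapping_type': 'string',
                    'mapping': {'type': 'keyword'}
                }
            }
        ]
    }
)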

💡
For running Elasticsearch in production, our blog on deploying Elasticsearch on Kubernetes covers setup and scaling basics.

Index and Search Documents in Elasticsearch

Index a Single Document

To store data in Elasticsearch, you index a JSON document into a specific index. The document is assigned an ID (manually or auto-generated), then analyzed and stored based on the index’s mappings.

Here’s how you index a single document using Python:

document = {
    'title': 'Getting Started with Elasticsearch and Python',
    'content': 'This guide covers how to index documents, run searches, and optimize performance.',
    'publish_date': '2024-01-15',
    'author': {
        'name': 'John Developer',
        'email': 'john@example.com',
        'bio': 'Senior Python developer with expertise in search technologies'
    },
    'tags': ['elasticsearch', 'python', 'tutorial', 'search'],
    'view_count': 1250,
    'rating': 4.7,
    'metadata': {
        'source': 'blog',
        'category': 'technical'
    }
}

response = es.index(index='articles', id=1, body=document)
print(f"Document indexed: {response['result']}")

By default, text fields like title and content are analyzed: split into terms, normalized, and stored for full-text search.
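
You can inspect exactly what the analyzer produces with the analyze API, which is handy for debugging tokenization:

tokens = es.indices.analyze(
    index='articles',
    body={'analyzer': 'standard', 'text': 'Getting Started with Elasticsearch and Python'}
)
print([t['token'] for t in tokens['tokens']])
# ['getting', 'started', 'with', 'elasticsearch', 'and', 'python']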

Index Documents in Bulk

If you're indexing thousands (or millions) of records, use the bulk helper. It reduces overhead by batching operations into a single request, critical for performance in any ingest-heavy application.

from elasticsearch.helpers import bulk

# Generate and prepare documents
documents = []
for i in range(1000):
    documents.append({
        '_index': 'articles',
        '_id': i,
        '_source': {
            'title': f'Article {i}',
            'content': f'Content for article number {i}',
            'publish_date': '2024-01-15',
            'tags': ['bulk', 'indexing', 'test'],
            'view_count': i * 10,
            'rating': 4.0 + (i % 10) * 0.1
        }
    })

# Bulk indexing
bulk(es, documents, chunk_size=100)
print(f"Indexed {len(documents)} documents")

You can tune chunk_size based on memory and performance. For high-throughput pipelines, this is the preferred method.

Run a Simple Search Query

Elasticsearch supports a flexible query DSL. The most common is a match query, used for full-text search.

search_query = {
    'query': {
        'match': {
            'content': 'elasticsearch python'
        }
    },
    'size': 10
}

response = es.search(index='articles', body=search_query)

print(f"Found {response['hits']['total']['value']} documents")
for hit in response['hits']['hits']:
    print(f"Score: {hit['_score']}, Title: {hit['_source']['title']}")

A match query analyzes the search string and compares it to the analyzed fields. You get relevance scoring (_score) and ranked results out of the box.

Build Complex Queries with Bool

To combine filters, conditions, and relevance tuning, use a bool query. This gives you full control over how documents are matched and scored.

complex_query = {
    'query': {
        'bool': {
            'must': [
                {'match': {'content': 'python'}}
            ],
            'filter': [
                {'term': {'tags': 'tutorial'}},
                {'range': {'rating': {'gte': 4.0}}}
            ],
            'should': [
                {'match': {'title': 'elasticsearch'}}
            ]
        }
    },
    'sort': [
        {'publish_date': {'order': 'desc'}},
        {'rating': {'order': 'desc'}}
    ],
    'highlight': {
        'fields': {
            'content': {},
            'title': {}
        }
    }
}

response = es.search(index='articles', body=complex_query)

  • must is required and scored.
  • filter is required but not scored, faster, ideal for structured conditions.
  • should is optional but boosts relevance if matched.

Sorting and highlighting help build richer UIs, like blog listings or product search results.

Use Aggregations to Analyze Data

Aggregations let you compute stats, group data, and build dashboards, all within Elasticsearch, with no need to pull data into another tool.

agg_query = {
    'size': 0,
    'aggs': {
        'popular_tags': {
            'terms': {
                'field': 'tags',
                'size': 10
            }
        },
        'avg_rating': {
            'avg': {
                'field': 'rating'
            }
        },
        'monthly_posts': {
            'date_histogram': {
                'field': 'publish_date',
                'calendar_interval': 'month'
            }
        }
    }
}

response = es.search(index='articles', body=agg_query)

You can extract results like this:

print(f"Average rating: {response['aggregations']['avg_rating']['value']}")

for bucket in response['aggregations']['popular_tags']['buckets']:
    print(f"Tag: {bucket['key']}, Count: {bucket['doc_count']}")
  • terms: like SQL GROUP BY
  • avg: returns the average value of a field
  • date_histogram: buckets data by time intervals, useful for trend analysis

Limitations of Elasticsearch in Python

Elasticsearch is a powerful search engine, but it comes with constraints you should be aware of when building Python applications.

Document Size Limits

Elasticsearch can accept documents up to 2GB (the hard ceiling for a single request), but the default http.max_content_length limit is 100MB, and large documents hurt performance well before that.

  • Large documents consume more memory during indexing and querying
  • Text analysis and term indexing become slower
  • It's better to split large documents into smaller parts

def split_large_document(large_doc, max_size=10000):
    content = large_doc['content']
    chunks = []

    for i in range(0, len(content), max_size):
        chunks.append({
            'parent_id': large_doc['id'],
            'chunk_index': i // max_size,
            'content': content[i:i + max_size],
            'metadata': large_doc['metadata']
        })
    
    return chunks

# Index the chunks
large_document = {
    'id': 'large_doc_1',
    'content': 'Very large content...' * 1000,
    'metadata': {'type': 'manual', 'source': 'upload'}
}

chunks = split_large_document(large_document)
for chunk in chunks:
    es.index(index='document_chunks', body=chunk)

Mapping Constraints

Elasticsearch has limits around how documents can be structured:

  • Max field name length: 1000 characters
  • Max nested object depth: 20
  • Max number of fields per index (default): 1000

If you're generating dynamic fields or deeply nested JSON, these limits can cause failures. Plan your mappings accordingly.
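
If you genuinely need more fields, the total-fields cap is a dynamic index setting and can be raised, though hitting it is often a sign the data model needs a rethink:

# Raise the default 1,000-field cap for a specific index
es.indices.put_settings(
    index='articles',
    body={'index.mapping.total_fields.limit': 2000}
)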

Memory Usage and High-Cardinality Fields

Elasticsearch loads field data into memory for sorting and aggregations. High-cardinality fields (with many unique values) use more memory.

  • Sorting or aggregating on fields like user_id or email can trigger memory limits
  • Circuit breakers are used to stop requests that could crash the cluster

To avoid issues:

  • Avoid sorting or aggregating on high-cardinality fields
  • Set doc_values: false on fields you never sort or aggregate on (see the sketch after this list)
  • Monitor heap usage regularly
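
For example, a keyword field that's only ever used in filters can drop doc_values to save space (a sketch; session_token is a hypothetical field):

es.indices.create(index='sessions', body={
    'mappings': {
        'properties': {
            # Hypothetical field: used only in term filters, never sorted or aggregated
            'session_token': {'type': 'keyword', 'doc_values': False}
        }
    }
})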
💡
If you're indexing detailed logs or metrics, understanding high cardinality can help avoid performance issues.

Timeout Settings

Long-running queries can hang and block resources. Set timeouts to avoid this.

Global client-level timeout:

es = Elasticsearch(
    'http://localhost:9200',
    timeout=30,
    max_retries=3,
    retry_on_timeout=True
)

Per-query timeout:

query = {
    'query': {'match_all': {}},
    'timeout': '10s'
}
response = es.search(index='articles', body=query)

Set appropriate timeouts based on query complexity.

Indexing Performance and Refresh Intervals

By default, Elasticsearch refreshes the index every second, which makes documents searchable. For bulk operations, you can disable refresh temporarily to improve performance.

# Turn off refresh
es.indices.put_settings(
    index='articles',
    body={'index': {'refresh_interval': '-1'}}
)

# Perform bulk indexing
bulk(es, document_list, chunk_size=1000)

# Restore refresh interval
es.indices.put_settings(
    index='articles',
    body={'index': {'refresh_interval': '30s'}}
)

# Manually refresh if needed
es.indices.refresh(index='articles')

Mapping Conflicts

Field types in Elasticsearch are fixed once assigned. For example, if a field is first indexed as a number, later attempts to index it as a string will fail.

def safe_index_document(index, doc_id, document):
    try:
        return es.index(index=index, id=doc_id, body=document)
    except Exception as e:
        if 'mapper_parsing_exception' in str(e):
            print(f"Mapping conflict for document {doc_id}: {e}")
            return None
        else:
            raise

# Example usage
doc1 = {'price': 29.99}         # Float
doc2 = {'price': 'twenty'}      # String

safe_index_document('products', 1, doc1)
safe_index_document('products', 2, doc2)  # Triggers conflict

Use explicit mappings where possible to avoid type guessing.

Resource Exhaustion

Expensive queries, large aggregations, and wildcard searches can exhaust system resources if not managed carefully.

  • Monitor node heap memory
  • Avoid unbounded wildcard or regex filters
  • Watch for unassigned or relocating shards

def check_cluster_health():
    health = es.cluster.health()
    stats = es.nodes.stats()

    print(f"Cluster status: {health['status']}")

    for node_id, node in stats['nodes'].items():
        heap_used = node['jvm']['mem']['heap_used_percent']
        print(f"Node {node_id} heap usage: {heap_used}%")
        if heap_used > 85:
            print(f"Warning: High heap usage on node {node_id}")

    return health['status'] == 'green'

These limits aren't blockers, but you need to design around them. Define clear mappings, set timeouts, and monitor resource usage early in development to avoid costly surprises later.

How to Use the Elasticsearch Python Client

The official Elasticsearch client for Python, elasticsearch-py, exposes all Elasticsearch APIs using Python objects and functions. It manages connections, serializes requests, parses responses, and handles retries or errors internally.

You can use it for:

  • Indexing, updating, and deleting documents
  • Running search queries
  • Creating and managing indexes
  • Monitoring cluster health and settings

from elasticsearch import Elasticsearch

# Connect to a local cluster
es = Elasticsearch('http://localhost:9200')

# Index a document
doc = {
    'title': 'Elasticsearch Python Integration',
    'content': 'Complete guide to using Elasticsearch with Python',
    'timestamp': '2024-01-15T10:30:00'
}

response = es.index(index='guides', body=doc)
doc_id = response['_id']

# Retrieve the same document
retrieved = es.get(index='guides', id=doc_id)
print(f"Title: {retrieved['_source']['title']}")

# Update it
es.update(
    index='guides',
    id=doc_id,
    body={'doc': {
        'content': 'Updated content with more details',
        'last_modified': '2024-01-16T14:22:00'
    }}
)

# Delete it
es.delete(index='guides', id=doc_id)

Use Bulk Indexing for Large Datasets

When working with thousands of documents, switch to the bulk helper from elasticsearch.helpers. It reduces the number of network calls and improves indexing speed.

from elasticsearch.helpers import bulk

def bulk_index_documents(documents, index_name):
    actions = [
        {'_index': index_name, '_source': doc}
        for doc in documents
    ]
    
    # raise_on_error=False collects failures in the returned list instead of raising
    success, failed = bulk(es, actions, chunk_size=1000, raise_on_error=False)
    print(f"Indexed: {success} documents")
    
    if failed:
        print(f"Failed to index {len(failed)} documents")
        for item in failed:
            print(f"Error: {item}")

# Sample dataset
sample_docs = [{'title': f'Doc {i}', 'content': f'Content {i}'} for i in range(5000)]
bulk_index_documents(sample_docs, 'bulk_test')

Scroll API for Large Search Results

If you need to retrieve more than 10,000 results (default limit), use the Scroll API to paginate through large datasets.

from elasticsearch.helpers import scan

def process_large_dataset(index_name):
    total = 0
    
    for doc in scan(es, index=index_name, query={'query': {'match_all': {}}}, size=1000, scroll='2m'):
        process(doc['_source'])
        total += 1

    print(f"Processed {total} documents")

def process(doc):
    # Replace this with your logic
    pass

process_large_dataset('large_index')

Real-Time Indexing and Search Behavior

Elasticsearch is near real-time; indexed documents become searchable after the next refresh. The default refresh interval is 1s.

To make a document available for immediate search, you can either:

  1. Manually refresh the index
  2. Use the refresh='wait_for' parameter

# Manual refresh example
es.indices.refresh(index='realtime_demo')

# Index with immediate search availability
es.index(
    index='alerts',
    body={
        'alert_type': 'critical',
        'message': 'System failure detected',
        'timestamp': '2024-01-15T10:30:00'
    },
    refresh='wait_for'
)

# Now this will find the document
res = es.search(index='alerts', body={'query': {'term': {'alert_type': 'critical'}}})
print(f"Found: {res['hits']['total']['value']} critical alerts")

Use elasticsearch-dsl for Query Building

The elasticsearch-dsl library is a higher-level abstraction over elasticsearch-py that provides a fluent way to build queries and aggregations. It’s useful for complex queries and readable logic.

from elasticsearch_dsl import Search, Q
from elasticsearch_dsl.connections import connections

# Connect DSL to the cluster
connections.create_connection(hosts=['http://localhost:9200'])

# Build a search query
s = Search(index='articles')
s = s.query('match', title='elasticsearch')
s = s.filter('term', status='published')

# Run query
res = s.execute()
for hit in res:
    print(hit.title)

Construct Complex Queries with DSL

You can compose queries using Python logic instead of nested JSON. This helps when combining filters, ranges, and aggregations.

def build_query(search_terms, tags=None, date_range=None, min_rating=None):
    s = Search(index='articles')
    
    if search_terms:
        s = s.query('multi_match', query=search_terms, fields=['title^2', 'content'])
    
    if tags:
        s = s.filter('terms', tags=tags)
    
    if date_range:
        s = s.filter('range', publish_date=date_range)
    
    if min_rating:
        s = s.filter('range', rating={'gte': min_rating})
    
    s = s.highlight('title', 'content')
    s = s.sort('-publish_date', '-rating')
    
    return s

query = build_query(
    search_terms='python elasticsearch',
    tags=['tutorial'],
    date_range={'gte': '2024-01-01'},
    min_rating=4.0
)

results = query.execute()
for hit in results:
    print(hit.title, hit.meta.score)

Run Aggregations with DSL

DSL also supports all kinds of aggregations with clean syntax:

def analyze_tags():
    s = Search(index='articles')[:0]  # No documents, only aggregations
    
    # Top tags
    s.aggs.bucket('top_tags', 'terms', field='tags', size=10)
    
    # Monthly post trends
    s.aggs.bucket('monthly_posts', 'date_histogram', field='publish_date', calendar_interval='month')
    
    # Avg rating per tag
    tag_agg = s.aggs.bucket('tag_stats', 'terms', field='tags')
    tag_agg.metric('avg_rating', 'avg', field='rating')
    
    res = s.execute()
    
    for bucket in res.aggregations.top_tags.buckets:
        print(f"Tag: {bucket.key}, Count: {bucket.doc_count}")

    for bucket in res.aggregations.tag_stats.buckets:
        print(f"{bucket.key}: Avg rating {bucket.avg_rating.value}")

analyze_tags()
💡
For application-level insights that go beyond logs and metrics, check out how Elastic APM fits into the observability stack.

Integrating with Other Tools

Elasticsearch often acts as the final stop in a data pipeline. It integrates well with other tools like Logstash and Kafka for ingestion, transformation, and real-time streaming.

Sending Data to Logstash

Logstash is typically used to preprocess logs or metrics, parsing, enriching, and forwarding them to Elasticsearch. While Python can index directly into Elasticsearch, you might route data through Logstash to apply filtering rules or convert formats.

Here’s how to send data from Python to a Logstash TCP input:

import json
import socket
from datetime import datetime

def send_to_logstash(data, host='localhost', port=5000):
    """Send a JSON log entry to Logstash over TCP."""
    if 'timestamp' not in data:
        data['timestamp'] = datetime.utcnow().isoformat()

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))
        sock.sendall(json.dumps(data).encode('utf-8') + b'\n')  # sendall ensures the full payload is written
        sock.close()
        print(f"Sent: {data}")
    except Exception as e:
        print(f"Logstash send failed: {e}")

Example:

send_to_logstash({
    'level': 'INFO',
    'message': 'User login successful',
    'user_id': 12345,
    'ip_address': '192.168.1.100'
})

This requires a matching TCP input setup in your Logstash config, often using the tcp input plugin.

Streaming Data from Kafka

For high-throughput pipelines, Apache Kafka is a popular buffer between producers and Elasticsearch. It decouples data ingestion and indexing, making the system more resilient and scalable.

Python can consume Kafka messages using kafka-python and index them directly into Elasticsearch.

from kafka import KafkaConsumer
import json
from datetime import datetime

def kafka_to_elasticsearch():
    consumer = KafkaConsumer(
        'user_events',
        bootstrap_servers=['localhost:9092'],
        group_id='elasticsearch_indexer',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    for msg in consumer:
        try:
            doc = msg.value
            doc['processed_at'] = datetime.utcnow().isoformat()
            doc['kafka_offset'] = msg.offset
            doc['kafka_partition'] = msg.partition

            es.index(index='user_events', body=doc)
        except Exception as e:
            print(f"Kafka processing error: {e}")

To run the pipeline in the background:

import threading

def start_kafka_pipeline():
    thread = threading.Thread(target=kafka_to_elasticsearch)
    thread.daemon = True
    thread.start()
    return thread

# Example
# pipeline = start_kafka_pipeline()

If you're dealing with large volumes, consider batching messages and using the bulk API for better indexing performance.
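
A rough sketch of that batching pattern, reusing the consumer and es client from above (batch size and index name are illustrative):

from elasticsearch.helpers import bulk

def kafka_to_elasticsearch_batched(consumer, batch_size=500):
    """Buffer Kafka messages and flush them to Elasticsearch in bulk."""
    batch = []
    for msg in consumer:
        batch.append({'_index': 'user_events', '_source': msg.value})
        if len(batch) >= batch_size:
            bulk(es, batch)
            batch = []
    if batch:
        bulk(es, batch)  # flush whatever is left when the consumer stops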

💡
Monitoring Kafka pipelines? These Kafka producer metrics help spot throughput issues before they affect Elasticsearch ingest.

Final Thoughts

Elasticsearch works well as a fast, scalable search engine. But managing logs, metrics, and traces across distributed systems can get complicated, especially at scale.

Last9 simplifies that layer. It integrates with your existing OpenTelemetry setup, supports high-cardinality data without memory bloat, and lets you trace requests across services with minimal config.

If you're indexing telemetry into Elasticsearch, you can forward that data to Last9 to:

  • Correlate logs, metrics, and traces automatically using semantic conventions.
  • Query telemetry using structured filters, no need to craft JSON manually.
  • Debug production issues with Last9 MCP, which brings real request context into your IDE.

This makes Last9 a useful layer on top of Elasticsearch, not a replacement, especially for teams working with Python services and OpenTelemetry-based observability.

Get started for free with us today!

FAQs

What is Elasticsearch in Python?
Elasticsearch in Python typically refers to using the elasticsearch-py client library, which allows Python applications to interact with an Elasticsearch cluster, indexing data, running searches, and managing indices programmatically.

What is the best GUI for Elasticsearch?
The most widely used GUI is Kibana, which is officially maintained by Elastic. For OpenSearch (the successor to Open Distro), there's OpenSearch Dashboards. Other community GUIs include ElasticHQ, Cerebro, and Kopf.

How to connect Elasticsearch with Python?
Install the official client using pip:

pip install elasticsearch

Then connect using:

from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])

You can now use es.index(), es.search(), etc., to interact with Elasticsearch.

What is the limit of Elasticsearch in Python?
Limits come from Elasticsearch itself, not the client. Key constraints include:

  • Document size: Max 2GB per document (but much lower in practice).
  • Field limits: Default max 1,000 fields per index.
  • Nested objects: Limited to 20 levels deep.
  • Memory usage: Aggregations on high-cardinality fields can trigger circuit breakers.

The Python client will reflect any errors related to these constraints during requests.

What Is the Elasticsearch Python Client?
The elasticsearch-py client is the official low-level Python client for Elasticsearch. It wraps Elasticsearch’s REST API and handles connection pooling, retries, and serialization. There's also elasticsearch-dsl, a higher-level, Pythonic abstraction built on top of it.

How do I install and set up Elasticsearch with Python?

  1. Install Elasticsearch (locally or via Docker).
  2. Install the Python client:

pip install elasticsearch

  3. Connect and index a document:

from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')
es.index(index='test', body={'message': 'hello world'})

Can I use Elasticsearch with Python for machine learning tasks?
Yes, with limitations. Elasticsearch’s ML features (like anomaly detection, forecasting, classification) are available through X-Pack and require a commercial license. You can configure these ML jobs using Python via the client, but for custom ML models, use Python libraries like scikit-learn or TensorFlow and store results in Elasticsearch.

Can I delete only certain data from within indices?
Yes. Use the delete_by_query API to remove documents matching specific criteria:

es.delete_by_query(index='logs', body={
    'query': {
        'range': {
            'timestamp': {'lt': 'now-30d'}
        }
    }
})

Can we use Python Elasticsearch Client or elasticsearch-dsl library with Open Distro version?
Yes. Both elasticsearch-py and elasticsearch-dsl are compatible with Open Distro and OpenSearch, as long as the APIs you use are supported by the Open Distro version you’re on. Some X-Pack-only features won’t work unless explicitly supported.

How to connect Elasticsearch with Django?
Here’s a basic setup:

  1. Install the client:

pip install elasticsearch

  2. Use it in a Django view or task:

from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')

def index_blog_post(post):
    doc = {
        'title': post.title,
        'content': post.content,
        'published_at': post.published_at.isoformat()
    }
    es.index(index='blog', id=post.id, body=doc)

For full integration (with signals, real-time sync, search views), consider using django-elasticsearch-dsl.

How do I perform a search query in Elasticsearch using Python?
You can use a match query like this:

search_body = {
    'query': {
        'match': {
            'content': 'elasticsearch python'
        }
    }
}
response = es.search(index='articles', body=search_body)

for hit in response['hits']['hits']:
    print(hit['_source']['title'])

For more complex queries, consider using the elasticsearch-dsl library.

Authors
Anjali Udasi

Helping to make the tech a little less intimidating.
