Apr 15th, ‘23/7 min read

High Cardinality? No Problem! Stream Aggregation FTW

Managing high cardinality in time series data is tough but crucial. Learn how Levitate’s streaming aggregations can help tackle it efficiently.

High Cardinality? No Problem! Stream Aggregation FTW

I want to observe the latency of requests in my application. I use a Prometheus instrumentation library to emit the rightful metrics for this. I visit the Prometheus page and duly instrument my handlers, and right away, the metrics start flowing.

# HELP http_request_duration_seconds Duration of all HTTP requests
# TYPE http_request_duration_seconds histogram
http_request_duration_seconds_bucket{code="200",handler="login",method="get",le="+Inf"} 5
http_request_duration_seconds_count{code="200",handler="login",method="get"} 5

# HELP http_requests_total Count of all HTTP requests
# TYPE http_requests_total counter
http_requests_total{code="200",method="get"} 5

The metric is available across the following labels

┌────────────┬──────────────┐
│ Labels     │  Sample      │
├────────────┼──────────────┤
│ code       │          200 │
│ method     │       /login │
│ handler    │         POST │
│ le         │         14.5 │
└────────────┴──────────────┘

These are good enough to answer a handful of questions:

  • How many requests errored out in the last x minutes
  • Average latency of /login requests
  • P95 of /login requests that did not result in an error
However, this data model is limited; How do we answer if the requests fail only across one availability zone or one set of Pods?

Suppose my instrumented application running on 2 EC2 instances emits the same metric and is pushed to a Time series database (TSDB henceforth) like Thanos and Mimir. Is there any difference between these 2-time series?

http_request_duration_seconds_count{code="200",handler="login",method="get"} 5

http_request_duration_seconds_count{code="200",handler="login",method="get"} 10

So they are duplicates. When duplicate metrics are encountered, a TSDB may choose to do the following:

  • Drop one of them — the one with the highest or lowest value.
  • Aggregate them — through sum, min, max, etc.

Dropping them is easy. But what kind of aggregate is the right one? The answer — it depends.

One cannot simply assume that the two metrics should be summed. While summing is valid, it may not be for other metrics like cpu_free. Summing cpu_free would result in incorrect data.

Also, these must be handled as duplicates as the database cannot tell if the metric line is resubmitted in case of crash replay. The only way to deal with this is to introduce another distinct label. For example, say we are running these on virtual machines. We will add the hostname as another label.

The label can be added:

  • Either via Instrumentation
  • Or at scrape time

So the new metric will look like this:

http_request_duration_seconds_count{hostname="10.0.0.1", code="200",handler="login",method="get"} 5

http_request_duration_seconds_count{hostname="10.0.0.2", code="200",handler="login",method="get"} 10
This is how Cardinality grows. For better/granular answers, we are forced to emit finer metrics, and they result in an explosive number of TimeSeries.
┌────────────┬──────────────┐
│ Labels     │  Cardinality │
├────────────┼──────────────┤
│ code       │           10 │
│ method     │            4 │
│ handler    │           15 │
│ le         │           60 │
└────────────┴──────────────┘
  • There are 10 possible status codes
  • 4 methods (POST, GET, DELETE, PUT), and
  • 15 handlers.
  • Latency is tracked in 60 buckets ranging from milliseconds to hours.

This means there are 36,000 (10 * 4 * 15 * 60) unique combinations, also known as 36,000 TimeSeries, for the http_request_duration_seconds_count metric.

┌────────────┬──────────────┐
│ Labels     │  Cardinality │
├────────────┼──────────────┤
│ code       │           10 │
│ method     │            4 │
│ handler    │           15 │
│ le         │           60 │
│ hostname   │          100 │
└────────────┴──────────────┘

If I add a hostname to the label and have 100 hosts, the number becomes 3,600,000, or 3 million. This illustrates how quickly cardinality can grow — it multiplies rapidly!

How Cardinality grows Source: Grafana
Source: Grafana

Many providers and client libraries address this issue by dropping extra labels. However, as seen in the example above, having a high cardinality is often necessary to obtain meaningful answers.

Attempting to drop the labels results in the same problem of duplication that we began with. If I remove the hostname from the metric above, we are back to the problem of the database seeing the two records as duplicates!!

There is another answer to this.


Streaming Aggregations

Ordinarily, as data comes in, it’s written as it is to the store. Once it’s queried, the querier fetches the data, partitions it by time and groupers (if specified), and runs a mathematical operator to return the results. This is done every time the query is sent.

Regular Query flow example for a query: sum by (type) (metric{}[3m])

Normal query flow
Normal query flow

Scanning large and massive samples can be slow and resource-consuming. It can slow down queries and overload the system, which can only handle a limited number of parallel queries.

Pre-Aggregated Data

Ways to expedite and reduce the load is

  • Drop the labels and reduce the cardinality and resolution
  • Keep increasing the resources for computation and aggregation
  • Use materialized views like constructs to rely on pre-aggregation

When you enable Streaming Aggregates, the most expensive queries are combined ahead of time.

Here’s the query when Pre-Aggregation is enabled.

Query flow when pre-aggregated
Query flow when pre-aggregated

So in the previous case where every unique host sends the same metric, I can create multiple streaming aggregations that answer the following questions:

  • Total sum of errors without saving the hostname as cardinality
  • Buckets of request Latency, without the headache of hostname in cardinality
  • Number of hosts where error_ratio > 1%
sum without (instance, pod) (grpc_client_handling_seconds_bucket{})

This is what the query would have looked like. It would have to scan through myriads of time series blocks. But when this data is pre-aggregated, there are benefits:

  • Economical. Less Computation while retrieving the blocks
  • Fetched faster. No more slow Dashboards
  • Reduced Maintenance. The lesser the number of servers, the smaller the possibility of Murphy’s law

Impact on Cardinality

Let’s re-evaluate what the numbers would look like to answer the same set of questions.

This table provides the necessary labels to serve each query.

┌─────────────────────────────────────────┬────────────────────────────────────┐
│                Query                    │             Labels Needed          |  
├─────────────────────────────────────────┼────────────────────────────── ─────┤
│ Total errors in last 5 minutes          │  code, hostname                    |  
│ Average latency of /login requests      │  le, handler, hostname             |  
│ P95 of successful POST /login requests  │  le, handler, hostname, code, method 
└─────────────────────────────────────────┴──────────────────────────── ───────┘

Here is a comparison of the required cardinality in the database before and after streaming aggregation (without hostname).

┌─────────────────────────────────────────┬─────────────┬─────────┐
│                Query                    │   Before    │  After  │
├─────────────────────────────────────────┼─────────────┼─────────┤
│ Total errors in last 5 minutes          │      1,000  │      10 │
│ Average latency of /login requests      │     90,000  │     900 │
│ P95 of successful POST /login requests  │  3,600,000  │  36,000 │
└─────────────────────────────────────────┴─────────────┴─────────┘
💡
Explanation: The Average latency of /login requests query requires le, handler, and hostname. However, since this query doesn't care about code or method, the time series required to serve this query is limited to 15 handlers with 60 buckets each, resulting in 900 Time Series. However, since these come from approximately 100 hosts, the number of time series rises to 90,000. This results in 90,000 samples saved and queried every single time. But when these time series are saved as pre-aggregated without the hostname, the number drops down to 900.

The use cases of Streaming Aggregation are not just limited to saving storage space. Once cardinality is limited:

  • It can expedite query performance.
  • It can reduce the resources required for the same set of queries.

Sometimes, pre-defined aggregations may not be enough and the resulting output table may contain millions of rows. In such cases, one can define streaming aggregate rules for mission-specific queries that are routinely needed. For example, finding pods/hosts where 5xx > 100 can be achieved using a regular PromQL query.

sum(increase(http_request_duration_seconds_count{code=~"5.*"}[1m])) \\
    by (hostname) > 100

The current pipeline can be transformed into a more efficient streaming aggregation pipeline that saves a comprehensive list of rogue hostnames where errors may originate over time and saves that as a separate metric. By doing so, the database can avoid scanning millions of time series before reducing the data set to match the query, saving significant processing time and resources. Additionally, this new metric, which is fast to query, can be used to flag real-time anomalies, thereby facilitating proactive monitoring of the system and improving its overall reliability.


Aggregating over time

There are two choices of Windowing when Aggregating:

  • Tumbling Window
  • Sliding Window

Tumbling Windows represent the stream’s consistent and separate time intervals. First, we group all metrics into buckets of n minutes, which we specify using [n], and then group them by metric name + labels (or mentioned as properties).

This is an illustration for 1 minute recorded over a 3-minute interval. For instance, the samples with timestamp values [0:00:00–0:01:00) are in the first period. Samples with timestamp values [0:01:00–0:02:00) are in the second period. And so on.

https://cdn-images-1.medium.com/max/1600/1*1jEX6pZl4CQcHwO_rkYOpw.png

Achieving Complex Pipelines Like Histograms

Effectively, a Histogram requires three metrics.

  • <metric_name>_bucket
  • <metric_name>_sum
  • <metric_name>_count

So, when dealing with Histograms, you would apply the aggregation across three metrics instead of one.


With Levitate, we recently launched PromQL-based Streaming Aggregation. Levitate absorbs all complexities of the windowing, grouping, and aggregating at Scale.

Learn more about differences between Streaming Aggregations vs. Recording Rules.

We understand your High Cardinality needs and offer a way to deal with them rather than avoid them. Head onto our Streaming Aggregations section to learn more.

🚿 Streaming Aggregation
This document explains how to setup streaming aggregation pipelines in Levitate.

Streaming aggregation has been a long-standing concept in the data processing world, providing new observability capabilities that may have otherwise been avoided due to concerns about performance or resource requirements to store the necessary data. This powerful concept can significantly boost SLO queries, enhancing customer experiences. Furthermore, it can effectively monitor system performance in real-time and detect anomalous components of the infrastructure without requiring large amounts of resources behind the query and storage layers. Achieving such efficiency is worth striving for.

Newsletter

Stay updated on the latest from Last9.

Handcrafted Related Posts