Add implementation for exponential histogram merging and percentiles #131220

Status: Open · wants to merge 41 commits into base: main

Commits (41)
43dd073
Initial experiments commit
JonasKunz Jul 8, 2025
da159a9
Clean up generator
JonasKunz Jul 8, 2025
fa4efe0
Clean up merger
JonasKunz Jul 8, 2025
fba967f
More tests, a bit of cleanup
JonasKunz Jul 9, 2025
cef3b11
Stash benchmark changes
JonasKunz Jul 9, 2025
aac9f6d
Merge remote-tracking branch 'elastic/main' into exponentional-histos
JonasKunz Jul 10, 2025
6a2b62f
spotless, checkstyle
JonasKunz Jul 10, 2025
eb955cd
more build fixes
JonasKunz Jul 10, 2025
2eb5fdd
Fix license headers
JonasKunz Jul 10, 2025
fd7064e
spotless round 2
JonasKunz Jul 10, 2025
66b5e2c
Fix tests, implement benchmarks
JonasKunz Jul 10, 2025
2f293d0
Reduce max scale to preserve numeric accuracy
JonasKunz Jul 10, 2025
91193bc
Check for sane scale and indices
JonasKunz Jul 10, 2025
92efdcf
Fix and clean percentile computation
JonasKunz Jul 11, 2025
e6924e9
Add some tests based on TDigestTest
JonasKunz Jul 11, 2025
cab3fdf
Clean up, bug fixes and javadoc
JonasKunz Jul 14, 2025
454a9cc
Remove dead code
JonasKunz Jul 14, 2025
486a8bd
A bit more javadoc
JonasKunz Jul 14, 2025
7c6655b
AI-assisted javadoc and spotless
JonasKunz Jul 15, 2025
a980c0e
Readme bullet points
JonasKunz Jul 15, 2025
a46914a
Add readme
JonasKunz Jul 15, 2025
fefd39b
Add testcase verifying index limits are not exceeded on upscaling
JonasKunz Jul 15, 2025
ac804c7
Replaced upscaling floating point arithmetic with faster and more acc…
JonasKunz Jul 15, 2025
5e1ca08
Readme fixes and clarifications
JonasKunz Jul 17, 2025
c091758
Review fixes
JonasKunz Jul 17, 2025
25be13d
Refactor bucket representation
JonasKunz Jul 17, 2025
6100bc6
Add test case for quantile in zero-bucket
JonasKunz Jul 17, 2025
7c99c81
Add more perecentiles for testing
JonasKunz Jul 17, 2025
b308838
Improved quantile algorithm to only iterate once over the buckets
JonasKunz Jul 17, 2025
311f44b
Fix quantile computation and error bound in tests
JonasKunz Jul 18, 2025
54fd41d
Update randomization in remaining tests
JonasKunz Jul 18, 2025
7dad089
Fix javadoc and benchmarks
JonasKunz Jul 18, 2025
522a72a
Checkstyle
JonasKunz Jul 18, 2025
33b626b
Merge remote-tracking branch 'elastic/main' into exponentional-histos
JonasKunz Jul 18, 2025
28ccef1
Remove DoubleStream usage
JonasKunz Jul 22, 2025
fe311d8
Merge remote-tracking branch 'elastic/main' into exponentional-histos
JonasKunz Jul 22, 2025
89cfb02
Update todos
JonasKunz Jul 22, 2025
39257b8
[CI] Auto commit changes from spotless
Jul 22, 2025
57fd335
Review fixes
JonasKunz Jul 24, 2025
cc13e0f
Merge remote-tracking branch 'origin/exponentional-histos' into expon…
JonasKunz Jul 24, 2025
2513ca2
Merge remote-tracking branch 'elastic/main' into exponentional-histos
JonasKunz Jul 24, 2025
1 change: 1 addition & 0 deletions benchmarks/build.gradle
@@ -49,6 +49,7 @@ dependencies {
api(project(':x-pack:plugin:esql:compute'))
implementation project(path: ':libs:native')
implementation project(path: ':libs:simdvec')
implementation project(path: ':libs:exponential-histogram')
expression(project(path: ':modules:lang-expression', configuration: 'zip'))
painless(project(path: ':modules:lang-painless', configuration: 'zip'))
nativeLib(project(':libs:native'))
99 changes: 99 additions & 0 deletions benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramGenerationBench.java

@@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.benchmark.exponentialhistogram;

import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.profile.StackProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@Threads(1)
@State(Scope.Thread)
public class ExponentialHistogramGenerationBench {

    @Param({ "100", "500", "1000", "5000", "10000", "20000" })
    int bucketCount;

    @Param({ "NORMAL", "GAUSSIAN" })
    String distribution;

    Random random;
    ExponentialHistogramGenerator histoGenerator;

    double[] data = new double[1000000];

    @Setup
    public void setUp() {
        random = ThreadLocalRandom.current();
        histoGenerator = new ExponentialHistogramGenerator(bucketCount);

        Supplier<Double> nextRandom = () -> distribution.equals("GAUSSIAN") ? random.nextGaussian() : random.nextDouble();

        // TODO: why is this here for T-DIGEST?
        for (int i = 0; i < 10000; ++i) {
            histoGenerator.add(nextRandom.get());
        }

        for (int i = 0; i < data.length; ++i) {
            data[i] = nextRandom.get();
        }
    }

    @State(Scope.Thread)
    public static class ThreadState {
        int index = 0;
    }

    @Benchmark
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void add(ThreadState state) {
        if (state.index >= data.length) {
            state.index = 0;
        }
        histoGenerator.add(data[state.index++]);
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder().include(".*" + ExponentialHistogramGenerationBench.class.getSimpleName() + ".*")
            .warmupIterations(5)
            .measurementIterations(5)
            .addProfiler(GCProfiler.class)
            .addProfiler(StackProfiler.class)
            .build();

        new Runner(opt).run();
    }
}
112 changes: 112 additions & 0 deletions benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java

@@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.benchmark.exponentialhistogram;

import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@Threads(1)
@State(Scope.Thread)
public class ExponentialHistogramMergeBench {

    @Param({ "1000", "5000" })
    int bucketCount;

    @Param({ "0.01", "0.1", "0.25", "0.5", "1.0", "2.0" })
    double mergedHistoSizeFactor;

    Random random;
    ExponentialHistogramMerger histoMerger;

    ExponentialHistogram[] toMerge = new ExponentialHistogram[10_000];

    @Setup
    public void setUp() {
        random = ThreadLocalRandom.current();
        histoMerger = new ExponentialHistogramMerger(bucketCount);

        ExponentialHistogramGenerator initial = new ExponentialHistogramGenerator(bucketCount);
        for (int j = 0; j < bucketCount; j++) {
            initial.add(Math.pow(1.001, j));
        }
        ExponentialHistogram initialHisto = initial.get();
        int cnt = getBucketCount(initialHisto);
        if (cnt < bucketCount) {
            throw new IllegalArgumentException("Expected bucket count to be " + bucketCount + ", but was " + cnt);
        }
        histoMerger.add(initialHisto);

        int dataPointSize = (int) Math.round(bucketCount * mergedHistoSizeFactor);

        for (int i = 0; i < toMerge.length; i++) {
            ExponentialHistogramGenerator generator = new ExponentialHistogramGenerator(dataPointSize);

            int bucketIndex = 0;
            for (int j = 0; j < dataPointSize; j++) {
                bucketIndex += 1 + random.nextInt(bucketCount) % (Math.max(1, bucketCount / dataPointSize));
                generator.add(Math.pow(1.001, bucketIndex));
            }
            toMerge[i] = generator.get();
            cnt = getBucketCount(toMerge[i]);
            if (cnt < dataPointSize) {
                throw new IllegalArgumentException("Expected bucket count to be " + dataPointSize + ", but was " + cnt);
            }
        }
    }

    private static int getBucketCount(ExponentialHistogram histo) {
        int cnt = 0;
        for (ExponentialHistogram.BucketIterator it : List.of(histo.negativeBuckets(), histo.positiveBuckets())) {
            while (it.hasNext()) {
                cnt++;
                it.advance();
            }
        }
        return cnt;
    }

    @State(Scope.Thread)
    public static class ThreadState {
        int index = 0;
    }

    @Benchmark
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void add(ThreadState state) {
        if (state.index >= toMerge.length) {
            state.index = 0;
        }
        histoMerger.add(toMerge[state.index++]);
    }
}
5 changes: 5 additions & 0 deletions gradle/verification-metadata.xml
@@ -66,6 +66,11 @@
<sha256 value="3366d2c88fb576e486d830f521184e8f1839f8c15dcd2151a3f6e1f62b0b37a0" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="ch.obermuhlner" name="big-math" version="2.3.2">
<artifact name="big-math-2.3.2.jar">
<sha256 value="693e1bb7c7f5184b448f03c2a2c0c45d07d8e89e4641fdc31ab0a8057027f43d" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="ch.randelshofer" name="fastdoubleparser" version="0.8.0">
<artifact name="fastdoubleparser-0.8.0.jar">
<sha256 value="10fe288fd7a2cdaf5175332b73529f9abf8fd54dcfff317d6967c0c35ffb133b" origin="Generated by Gradle"/>
61 changes: 61 additions & 0 deletions libs/exponential-histogram/README.md
@@ -0,0 +1,61 @@
This library provides an implementation of merging and analysis algorithms for exponential histograms based on the [OpenTelemetry definition](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram). It is designed as a complementary tool to the OpenTelemetry SDK, focusing specifically on efficient histogram merging and accurate percentile estimation.

## Overview

The library implements a sparse storage approach where only populated buckets consume memory and count towards the bucket limit. This differs from the OpenTelemetry implementation, which uses dense storage. While dense storage allows O(1) insertion of individual values, our sparse representation requires O(log m) time, where m is the bucket capacity. In exchange, the sparse representation stores data more compactly and allows a simple merging algorithm whose runtime is linear in the number of populated buckets. The library also provides an array-backed sparse storage, ensuring cache efficiency.

The sparse storage approach offers significant advantages for [distributions with fewer distinct values](#distributions-with-few-distinct-values) than the bucket count, allowing the library to achieve near-exact representation of such distributions. This makes it suitable not only for exponential histograms but also as a universal solution for handling explicit bucket histograms.
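As a concrete illustration of this storage model, here is a minimal, hypothetical sketch; `SparseBuckets` and `countFor` are invented names for illustration, not the library's actual API:

```java
import java.util.Arrays;

// Hypothetical array-backed sparse bucket store: only populated buckets are
// kept, as (index, count) pairs sorted by bucket index.
final class SparseBuckets {
    final long[] indices; // bucket indices at the current scale, ascending
    final long[] counts;  // count of each populated bucket, same order

    SparseBuckets(long[] indices, long[] counts) {
        this.indices = indices;
        this.counts = counts;
    }

    // Inserting a value means locating its bucket among the m populated
    // buckets, hence the O(log m) insertion cost mentioned above.
    long countFor(long bucketIndex) {
        int pos = Arrays.binarySearch(indices, bucketIndex);
        return pos >= 0 ? counts[pos] : 0;
    }
}
```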

## Merging Algorithm

The merging algorithm works similarly to the merge step of merge sort:
we simultaneously walk through the buckets of both histograms in order, merging them on the fly as needed.
If the total number of resulting buckets would exceed the bucket limit, we scale down as needed.

Before we merge the buckets, we need to take care of the special zero-bucket and bring both histograms to the same scale.

For the zero-bucket, we merge the zero threshold from both histograms and collapse any overlapping buckets into the resulting new zero bucket.

To bring both histograms to the same scale, we can make adjustments in both directions: the scale of a histogram can be increased or decreased as needed.

See the [upscaling section](#upscaling) for details on how the upscaling works.
Upscaling helps prevent the precision of a histogram merged from many inputs from being dragged down to the lowest scale of a potentially misconfigured input histogram. For example, if a histogram is recorded with too low a zero threshold, it can end up with a degraded scale when dense histogram storage is used, even if the histogram contains only two points.
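A minimal sketch of that core merge loop, reusing the hypothetical `SparseBuckets` type from the overview and assuming the zero buckets have already been combined and both histograms brought to a common scale:

```java
// Merge two same-scale bucket lists like the merge step of merge sort:
// buckets with equal indices are combined by adding their counts, all
// others are copied through in index order. Runtime is linear in the
// number of populated buckets.
static SparseBuckets merge(SparseBuckets a, SparseBuckets b) {
    long[] idx = new long[a.indices.length + b.indices.length];
    long[] cnt = new long[idx.length];
    int i = 0, j = 0, out = 0;
    while (i < a.indices.length || j < b.indices.length) {
        if (j == b.indices.length || (i < a.indices.length && a.indices[i] < b.indices[j])) {
            idx[out] = a.indices[i];
            cnt[out++] = a.counts[i++];
        } else if (i == a.indices.length || b.indices[j] < a.indices[i]) {
            idx[out] = b.indices[j];
            cnt[out++] = b.counts[j++];
        } else { // same bucket populated in both histograms
            idx[out] = a.indices[i];
            cnt[out++] = a.counts[i++] + b.counts[j++];
        }
    }
    // If `out` exceeded the bucket limit, the real algorithm would scale
    // down (collapse neighboring buckets) until the limit is met.
    return new SparseBuckets(Arrays.copyOf(idx, out), Arrays.copyOf(cnt, out));
}
```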

### Upscaling

In general, we assume that all values in a bucket lie at a single point: the point of least relative error. This is the point `x` in the bucket such that:

```
(x - l) / l = (u - x) / u
```

Where `l` is the lower and `u` the upper bucket boundary. Solving for `x` yields `x = 2lu / (l + u)`, which always lies within the bucket.

This assumption allows us to increase the scale of histograms without increasing the bucket count: each bucket is simply mapped to the bucket at the new scale that contains the point of least relative error of the original bucket.
Review discussion on this point:

> **Reviewer:** Do I understand correctly that (a) upscaling doubles the number of buckets, and (b) half of the buckets will be empty after upscaling?
>
> **Author:** That would be the case for a dense representation. For sparse storage, all values from a bucket still fall into a single bucket after upscaling (by design, due to our assumption that all values lie at a single point, the point of least relative error). So upscaling keeps the number of populated buckets exactly the same.
>
> **Reviewer:** Right, but conceptually it's the same. I wonder if a strategy to evenly split the weights between the two "children" of each bucket would be more accurate.
>
> **Author:** The idea of assuming that all values in a bucket lie at the "point of least relative error" comes from the percentile algorithms of the DDSketch and UDDSketch papers. I think choosing any other distribution for the values within a bucket gives a worse worst-case relative error. It would also make upscaling expensive, if not unfeasible: upscaling by 10, for example, would increase the number of populated buckets by a factor of 2^10 = 1024. While with our iteration approach we wouldn't need to materialize this (so no memory penalty), we would still need to iterate over all of those buckets during a merge, costing CPU time. In addition, you would likely end up with fractional counts for buckets: what if your bucket has a count of three and you want to upscale it? How do you distribute this count among the new buckets?
>
> **Reviewer:** Yeah, makes sense. It'd be nice to have a comparison of percentile values for various distributions between TDigest, ExponentialHistogram, and HDR.

This can introduce a small error, as the original center might be moved slightly. Therefore, we ensure that upscaling happens at most once, to prevent errors from adding up.
The greater the amount of upscaling, the smaller the error: a higher scale means smaller buckets, which in turn gives a better fit around the original point of least relative error.
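For illustration, here is a floating-point sketch of the index mapping described above. It is only a sketch: per the commit history, the actual implementation replaced floating-point arithmetic with a faster and more accurate integer variant, and this version glosses over rounding at exact bucket boundaries:

```java
// Maps a bucket index from `scale` to `scale + scaleIncrease` by computing
// the bucket's point of least relative error and locating it at the higher
// scale. Bucket i at scale s covers (l, u] with l = 2^(i * 2^-s) and
// u = 2^((i+1) * 2^-s).
static long upscaledIndex(long index, int scale, int scaleIncrease) {
    double l = Math.pow(2.0, index * Math.scalb(1.0, -scale));       // lower boundary
    double u = Math.pow(2.0, (index + 1) * Math.scalb(1.0, -scale)); // upper boundary
    double x = 2.0 * l * u / (l + u);                                // point of least relative error
    // index of the bucket containing x at the higher scale:
    // floor(log2(x) * 2^(scale + scaleIncrease))
    return (long) Math.floor(Math.log(x) / Math.log(2.0) * Math.scalb(1.0, scale + scaleIncrease));
}
```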

## Distributions with few distinct values

The sparse storage model requires memory only linear in the number of populated buckets, while dense storage needs to cover the entire range from the smallest to the largest populated bucket.

This offers significant benefits for distributions with few distinct values:
if we have at least as many buckets as there are distinct values to store in the histogram, we can represent the distribution almost exactly.
This is achieved by simply maintaining the scale at the maximum supported value (so that the buckets are smallest).
At the time of writing, the maximum scale is 38, so the ratio between the upper and lower boundary of a bucket is 2^(2^-38), i.e. the relative bucket width is roughly 2.5 * 10^-12.

This is best explained with a concrete example:
if we store, for example, a duration value of 10^15 nanoseconds (roughly 11.5 days), this value is stored in a bucket that guarantees a relative error of at most 2^(2^-38) - 1, which is about 2.5 microseconds in this case.
As long as the number of values we insert stays below the bucket count, we are guaranteed that no downscaling happens: in contrast to dense storage, the scale does not depend on the spread between the smallest and largest bucket index.
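A small usage sketch of this property, based on the `ExponentialHistogramGenerator` API used in the benchmarks above (the accuracy numbers in the comments are the ones derived in this section):

```java
// With at least as many buckets as distinct values, each distinct value
// keeps its own tiny bucket at the maximum scale, so the distribution is
// represented almost exactly regardless of how many values are inserted.
ExponentialHistogramGenerator generator = new ExponentialHistogramGenerator(100);
for (int i = 0; i < 1_000_000; i++) {
    // only two distinct values -> only two populated buckets, no downscaling
    generator.add(1e15); // 10^15 ns, roughly 11.5 days
    generator.add(5.0);  // 5 ns
}
ExponentialHistogram histo = generator.get();
// percentile estimates from `histo` are off by at most ~2.5 microseconds
// for the large value, per the relative error bound above
```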

### Handling Explicit Bucket Histograms

We can make use of this property to convert [explicit bucket histograms](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#histogram) to exponential ones by again assuming that all values in a bucket lie at a single point:
* For each explicit bucket, we take its point of least relative error and add it to the exponential histogram with the corresponding count
* The open-ended lower and upper buckets (those extending to infinity) need special treatment, but they are not useful for percentile estimates anyway

This gives us a great solution for universally dealing with histograms:
when merging exponential histograms generated from explicit ones, the result is exact as long as the number of distinct buckets across the original explicit bucket histograms does not exceed the exponential histogram bucket count. As a result, the computed percentiles are exact up to the error introduced by the original conversion.
In addition, this allows us to compute percentiles on mixed explicit bucket histograms, or even to mix them with exponential ones, by using the exponential histogram algorithms throughout.
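A hedged sketch of this conversion. The `boundaries`/`bucketCounts` layout is an assumption made for illustration, and since `add(double)` is the only insertion API shown in this PR, counts are replayed in a loop here; a real implementation would use whatever bulk-add the library provides:

```java
// Converts an explicit bucket histogram (finite buckets only, positive
// boundaries assumed) into an exponential one by collapsing each bucket
// onto its point of least relative error.
static void convertExplicitBuckets(double[] boundaries, long[] bucketCounts, ExponentialHistogramGenerator out) {
    // bucketCounts[i] holds the count of the finite bucket
    // (boundaries[i], boundaries[i + 1]]; the two open-ended buckets are
    // omitted, matching the "special treatment" note above
    for (int i = 0; i < bucketCounts.length; i++) {
        double l = boundaries[i];
        double u = boundaries[i + 1];
        double x = 2.0 * l * u / (l + u); // point of least relative error
        for (long c = 0; c < bucketCounts[i]; c++) {
            out.add(x);
        }
    }
}
```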
18 changes: 18 additions & 0 deletions libs/exponential-histogram/build.gradle
@@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

// TODO: publish this when ready?
//apply plugin: 'elasticsearch.publish'
apply plugin: 'elasticsearch.build'

dependencies {
    testImplementation(project(":test:framework"))
    testImplementation('ch.obermuhlner:big-math:2.3.2')
    testImplementation('org.apache.commons:commons-math3:3.6.1')
}