Since Apache Druid 29, first and last aggregations for numerical values can be performed at ingestion, which expands the use cases of rollups. We'll showcase an example and provide insights into additional use cases for Druid rollups.
What is a Rollup?
A rollup is a pre-aggregation done at ingestion time, which can dramatically reduce storage requirements for a Druid datasource (table). The trade-off is sacrificing the ability to query individual rows and losing precise timestamp granularity. Refer to the official documentation on rollup for an in-depth overview.
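As a hypothetical illustration (the values are made up), three raw events for one aircraft arriving within the same minute, ingested with minute query granularity, a doubleLast aggregation on velocity, and a count metric, are stored as a single row:
2024-10-09T12:35:12Z id=151e5b velocity=27.1
2024-10-09T12:35:31Z id=151e5b velocity=27.5
2024-10-09T12:35:55Z id=151e5b velocity=27.8
After rollup:
2024-10-09T12:35:00Z id=151e5b velocity=27.8 count=3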
Rollup is a pre-aggregation, and first and last are aggregation functions. Prior to Apache Druid 29, first and last pre-aggregation only supported string data types. Since version 29, first and last also support numerical values for pre-aggregation, leading to more use cases for rollups and enabling quick data granularity reduction.
Here are some use cases for the first and last aggregation functions:
- Tracking opening and closing stock market prices (see the sketch after this list).
- Capturing discrete sensor readings for reports, such as hourly temperature readings.
- Tracking session activity for users accessing a website.
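For instance, a hypothetical stock-trades datasource could capture daily opening and closing prices with a doubleFirst/doubleLast pair on the same input field (a sketch; the price field and metric names are illustrative):
"metricsSpec": [
  { "type": "doubleFirst", "name": "open", "fieldName": "price" },
  { "type": "doubleLast", "name": "close", "fieldName": "price" }
]
Combined with a day queryGranularity, each stored row then carries the first and last traded price of the day.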
Defining a first and last Ingestion Rollup in Apache Druid
To create a first or last pre-aggregation, use the numerical data type with a First or Last suffix:
{
  "type": "doubleLast",
  "name": "latitude",
  "fieldName": "latitude"
}
{
  "type": "longFirst",
  "name": "speed",
  "fieldName": "speed"
}
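For comparison, the string variants (the only first and last types supported before Druid 29) follow the same pattern and accept an optional maxStringBytes limit:
{
  "type": "stringLast",
  "name": "callsign",
  "fieldName": "callsign",
  "maxStringBytes": 1024
}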
Under the Hood
When pre-aggregations of first and last are computed, the timestamp of the aggregation is also stored. This ensures that the correct value can be used when query-time aggregation is performed. Query the column without the aggregation function to see this: the resulting value appears as JSON with an lhs element for the actual timestamp of the value and an rhs element for the actual value.
SELECT "latitude" FROM opensky LIMIT 1
{"lhs":1728344664000,"rhs":48.2982}
The actual datatype is COMPLEX<serializablePairLongDouble>. Check out the package org.apache.druid.query.aggregation to fully appreciate this feature.
While it is displayed as JSON, it is not actually JSON (JSON functions, such as json_query(), do not operate on it). Prior to Apache Druid 29, only string data types were handled this way; numerical values preserved only the value (the rhs), resulting in scenarios where the actual first or last aggregations were ambiguous.
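To get the plain numeric value back at query time, wrap the column in the matching aggregation function; latest() unwraps the pair and returns only the rhs:
SELECT latest("latitude") "latitude" FROM opensky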
Use Case: Sub-sampling Historical Data
For geospatial data, real-time information on all data points is vital for current events, so rollups may seem unsuitable.
However, over time, keeping all those data points becomes less important.
Using Druid rollups with the first or last aggregate provides an elegant way to sub-sample historical data, reducing storage costs. At ingestion time, set the queryGranularity to none, while also enabling rollups.
"metricsSpec": [
{
"type": "doubleLast",
"name": "latitude",
"fieldName": "latitude"
},
{
"type": "doubleLast",
"name": "longitude",
"fieldName": "longitude"
}
...
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "fifteen_minute",
"queryGranularity": "none",
"rollup": true,
"intervals": null
}
To track aircraft activity, simply query the dataset as an aggregation. For data that has not been rolled up, the query behaves as if it were run against the raw, unaggregated rows.
SELECT
__time,
id,
callsign,
latest("latitude") "latitude",
latest("longitude") "longitude",
latest("velocity") "velocity"
FROM opensky
GROUP BY 1, 2, 3
ORDER BY __time ASC
The benefit is realized once the need for every individual data point diminishes: Druid compaction can then quickly reduce the stored data.
Compaction
Compaction is performed by running a compaction task. The primary components are the datasource, time interval, and granularity specification. In this example, we increase segment granularity from 15 minutes to 1 hour and, more importantly, set the query granularity to 15 minutes.
{
  "type": "compact",
  "dataSource": "opensky",
  "interval": "2024-10-09T00:00:00.000Z/2024-10-09T02:00:00.000Z",
  "tuningConfig": {
    "type": "index_parallel",
    "maxRowsPerSegment": 500000,
    "maxRowsInMemory": 25000
  },
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "hour",
    "queryGranularity": "fifteen_minute",
    "rollup": true
  }
}
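To run it, submit the task JSON to the task endpoint, for example (assuming a local Router on port 8888 and the spec saved as compaction-task.json; adjust both for your deployment):
curl -X POST -H 'Content-Type: application/json' \
  -d @compaction-task.json \
  http://localhost:8888/druid/indexer/v1/task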
With this single administration task (no coding necessary), data is reduced once the compaction completes. Because the last aggregation is applied to all metrics (latitude, longitude, position, velocity, altitude, etc.), the last data point within each query-granularity bucket for a given aircraft is the one that remains in the datasource.
The same query now pulls in the aggregated historical data, with no query changes necessary.
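To verify the new segment layout after compaction, you can inspect the sys.segments system table:
SELECT "start", "end", "size", "num_rows"
FROM sys.segments
WHERE "datasource" = 'opensky'
ORDER BY "start"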
Storage Savings (How Effective Are the Rollups)
It’s always good to verify how well rollups work.
SELECT
'opensky' "Datasource",
SUM("count") "Logical Count",
COUNT("count") "Physical Count",
SUM("count")/(COUNT("count")*1.0) "Rollup Factor"
FROM opensky
GROUP BY 1
Before Compaction
Even before compaction, rollups can occur when rows match exactly, which happens when source systems deliver duplicate events, as opensky does.
| Datasource | Logical Count | Physical Count | Rollup Factor |
| --- | --- | --- | --- |
| opensky | 7583032 | 6711892 | 1.1298 |
After Compaction
With timestamps rounded to the query granularity and matching dimensions, a rollup occurs.
| Datasource | Logical Count | Physical Count | Rollup Factor |
| --- | --- | --- | --- |
| opensky | 7583032 | 359551 | 21.0903 |
Query
The query is written as an aggregation, so it handles both compacted and uncompacted data. If the rollup is perfect, no aggregation beyond returning the single aggregated row is needed; if the rollup is imperfect, the remaining aggregation is completed by the query.
SELECT
__time,
id,
callsign,
latest("latitude") "latitude",
latest("longitude") "longitude",
latest("velocity") "velocity",
sum("count") "count"
FROM opensky
GROUP BY 1, 2, 3
ORDER BY __time ASC
LIMIT 5
The count metric tracks how many records were “rolled up”.
| __time | id | callsign | latitude | longitude | velocity | count |
| --- | --- | --- | --- | --- | --- | --- |
| 2024-10-09T12:35:00.000Z | 151e5b | PBD526 | 56.6896 | 43.7677 | 27.8 | 6 |
| 2024-10-09T12:35:00.000Z | 300990 | LSI102 | 44.6143 | 33.9275 | 396.78 | 91 |
| 2024-10-09T12:35:00.000Z | 800732 | AIC121 | 51.742 | 49.6643 | 244.94 | 6 |
| 2024-10-09T12:40:00.000Z | 151de3 | SDM6857 | 57.5982 | 38.4103 | 28.03 | 67 |
| 2024-10-09T12:40:00.000Z | 800736 | AIC113 | 51.5912 | 50.1971 | 244.94 | 19 |
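The count metric itself comes from a standard count aggregator defined at ingestion (a sketch of the usual pattern; it is queried with sum() because compaction may roll up rows that already carry counts greater than one):
{
  "type": "count",
  "name": "count"
}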
Geospatial Data Types
Geospatial data types (geo and spatial) are not supported, which is why this example uses latitude and longitude as double metrics.
Takeaways
- Check out the druid-rollup-doubleLast example within the GitHub dev-local-demos repository for the example used in this article. It leverages a container-based ecosystem provided at dev-local. This repository also includes demonstrations like druid-late.
- If you are running on Mac Silicon, check out the druid-m1 project for building an arm64-based image. The project has been updated to use version 30.0.1 of Apache Druid.
- With first and last support for numerical data, there are more opportunities to leverage rollups within Druid, as illustrated in this article. The inclusion of the timestamp in the internal complex data type ensures the last value is truly the last value (and the first value is the first value).
- Rollups are a compromise of precision for storage, and the inclusion of a given dimension or metric can dramatically reduce rollup effectiveness. Always measure rollup effectiveness, and recheck whenever the datasource configuration or the data itself changes.
Reach Out
Please contact us if you would like to discuss online analytical processing or event streaming.