Apache Druid Rollups with ‘first’ and ‘last’ Aggregations

Since Apache Druid 29, first and last aggregations on numerical values can be performed at ingestion time, which expands the use cases for rollups. This article walks through an example and points to additional use cases for Druid rollups.

What is a Rollup?

A rollup is a pre-aggregation done at ingestion time, which can dramatically reduce storage requirements for a Druid datasource (table). The trade-off is sacrificing the ability to query individual rows and losing precise timestamp granularity. Refer to the official documentation on rollup for an in-depth overview.

Rollup is a pre-aggregation, and first and last are aggregation functions. Prior to Apache Druid 29, first and last pre-aggregation only supported string data types. Since version 29, first and last also support numerical values for pre-aggregation, leading to more use cases for rollups and enabling quick data granularity reduction.

Here are some use cases for the first and last aggregation functions:

  • Tracking opening and closing stock market prices.
  • Creating discrete sensor readings for reports, such as hourly temperature readings.
  • Creating session tracking activity for users accessing a website.

Defining a first and last Ingestion Rollup in Apache Druid

To create a first or last pre-aggregation, set the aggregator type to the numerical data type followed by a First or Last suffix, such as doubleLast or longFirst:

{
  "type": "doubleLast",
  "name": "latitude",
  "fieldName": "latitude"
}
{
  "type": "longFirst",
  "name": "speed",
  "fieldName": "speed"
}
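
When many numeric columns need the same treatment, entries like the two above can be generated rather than written by hand. The following is a minimal Python sketch; the helper name first_last_metric is a hypothetical choice for illustration and is not part of Druid or any Druid client library. It only assembles the type, name, and fieldName keys shown above.

```python
import json

# Numeric types that take a First/Last suffix (doubleLast and longFirst appear above;
# floatFirst/floatLast follow the same naming pattern).
NUMERIC_TYPES = {"long", "float", "double"}


def first_last_metric(field, numeric_type, which):
    """Build one metricsSpec entry such as {"type": "doubleLast", ...}.

    Hypothetical helper for illustration only.
    """
    if numeric_type not in NUMERIC_TYPES:
        raise ValueError(f"unsupported numeric type: {numeric_type}")
    if which not in ("First", "Last"):
        raise ValueError("which must be 'First' or 'Last'")
    return {"type": f"{numeric_type}{which}", "name": field, "fieldName": field}


metrics_spec = [
    first_last_metric("latitude", "double", "Last"),
    first_last_metric("speed", "long", "First"),
]
print(json.dumps(metrics_spec, indent=2))
```

The printed list can be pasted into the metricsSpec of an ingestion spec; the output column keeps the name of the input field, as in the examples above.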

Under the Hood

When first and last pre-aggregations are computed, the timestamp of the aggregated value is stored alongside it. This ensures that the correct value can be selected when further aggregation is performed at query time. To see this, query the column without an aggregation function: the result appears as JSON with an lhs element holding the timestamp of the value and an rhs element holding the value itself.

SELECT "latitude" FROM opensky LIMIT 1
{"lhs":1728344664000,"rhs":48.2982}

The actual data type is COMPLEX<serializablePairLongDouble>. Check out the package org.apache.druid.query.aggregation to fully appreciate this feature. Although the value is displayed as JSON, it is not actually JSON; JSON functions, such as json_query(), do not operate on it.
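
Outside of Druid, the displayed text can still be decoded on the client side. The sketch below assumes the client receives the value as the text shown in the query result above, and that lhs is an epoch timestamp in milliseconds; it is an illustration, not a Druid API.

```python
import json
from datetime import datetime, timezone

# The value exactly as displayed by the query above.
displayed = '{"lhs":1728344664000,"rhs":48.2982}'

pair = json.loads(displayed)
# lhs: epoch milliseconds of the stored value; rhs: the stored value itself.
event_time = datetime.fromtimestamp(pair["lhs"] / 1000, tz=timezone.utc)
value = pair["rhs"]

print(event_time.isoformat(), value)
```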

Prior to Apache Druid 29, only string data types were handled this way; numerical values preserved only the value (the rhs), resulting in scenarios where the actual first or last aggregations were ambiguous.
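
To illustrate why the timestamp matters, here is a minimal Python sketch of merging two partial aggregates. It is a conceptual model, not Druid's implementation: TimedValue, merge_last, and merge_first are hypothetical names, and the sample values are made up. With only the value stored, a merge would have no way to decide which of the two rows is truly the last.

```python
from typing import NamedTuple


class TimedValue(NamedTuple):
    timestamp_ms: int  # plays the role of lhs
    value: float       # plays the role of rhs


def merge_last(a, b):
    """Keep whichever partial aggregate carries the later timestamp."""
    return a if a.timestamp_ms >= b.timestamp_ms else b


def merge_first(a, b):
    """Keep whichever partial aggregate carries the earlier timestamp."""
    return a if a.timestamp_ms <= b.timestamp_ms else b


# Two already rolled-up rows for the same aircraft (illustrative values).
row_a = TimedValue(1728344664000, 48.2982)
row_b = TimedValue(1728344600000, 48.3010)

last = merge_last(row_a, row_b)
first = merge_first(row_a, row_b)
print(last.value, first.value)
```

Because each partial result carries its own timestamp, the merge gives the same answer regardless of the order in which rows or segments are combined.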

Use Case: Sub-sampling Historical Data

For geospatial data, real-time information on all data points is vital for current events, so rollups may seem unsuitable. However, over time, keeping all those data points becomes less important. Using Druid rollups with the first or last aggregate provides an elegant way to sub-sample historical data, reducing storage costs.

At ingestion time, set the queryGranularity to none, while also enabling rollups.


  "metricsSpec": [
    {
      "type": "doubleLast",
      "name": "latitude",
      "fieldName": "latitude"
    },
    {
      "type": "doubleLast",
      "name": "longitude",
      "fieldName": "longitude"
    }
    ...
  ],
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "fifteen_minute",
    "queryGranularity": "none",
    "rollup": true,
    "intervals": null
  }

To track aircraft activity, query the datasource with an aggregation. For rows that have not been rolled up, the result is the same as querying without aggregation.

SELECT
       __time,
       id,
       callsign,
       latest("latitude") "latitude",
       latest("longitude") "longitude",
       latest("velocity") "velocity"
  FROM opensky
 GROUP BY 1, 2, 3
 ORDER BY __time ASC

The benefit is realized once the need for every data point diminishes: Druid compaction can then quickly purge the excess data.

Compaction

Compaction is performed by running a compaction task. The primary components are the datasource, time interval, and granularity specification. In this example, we increase segment granularity from 15 minutes to 1 hour and, more importantly, set the query granularity to 15 minutes.

{
  "type" : "compact",
  "dataSource" : "opensky",
  "interval": "2024-10-09T00:00:00.000Z/2024-10-09T02:00:00.000Z",
  "tuningConfig" : {
    "type" : "index_parallel",
    "maxRowsPerSegment" : 500000,
    "maxRowsInMemory" : 25000
  },
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "hour",
    "queryGranularity": "fifteen_minute",
    "rollup": true
  }
}

With this single administration task (no coding necessary), data is reduced once the compaction is completed. Because the last aggregation is performed on all metrics (latitude, longitude, position, velocity, altitude, etc.), the last data point for a given aircraft within each 15-minute interval is the one that remains in this datasource.
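
The effect of the new query granularity can be modeled in a few lines of Python. This is a logical sketch under stated assumptions, not how Druid performs compaction internally: floor_to_bucket, rollup_last, and ms are hypothetical helpers, only one metric (latitude) is tracked, and the sample rows are invented.

```python
from datetime import datetime, timezone

BUCKET_MS = 15 * 60 * 1000  # fifteen_minute query granularity


def floor_to_bucket(timestamp_ms):
    """Truncate an epoch-millisecond timestamp to the start of its 15-minute bucket."""
    return timestamp_ms - (timestamp_ms % BUCKET_MS)


def rollup_last(rows):
    """Group rows by (bucket, id), keep the latest latitude, and sum the counts."""
    rolled = {}
    for ts, aircraft_id, latitude, count in rows:
        key = (floor_to_bucket(ts), aircraft_id)
        kept = rolled.get(key)
        if kept is None:
            rolled[key] = {"ts": ts, "latitude": latitude, "count": count}
        else:
            kept["count"] += count
            if ts >= kept["ts"]:
                kept["ts"], kept["latitude"] = ts, latitude
    return rolled


def ms(iso):
    """Convert an ISO-8601 string (treated as UTC) to epoch milliseconds."""
    return int(datetime.fromisoformat(iso).replace(tzinfo=timezone.utc).timestamp() * 1000)


# Invented sample rows: (timestamp, id, latitude, count).
rows = [
    (ms("2024-10-09T12:31:10"), "151e5b", 56.60, 1),
    (ms("2024-10-09T12:44:50"), "151e5b", 56.69, 1),
    (ms("2024-10-09T12:45:05"), "151e5b", 56.70, 1),
]
rolled = rollup_last(rows)
for (bucket, aircraft_id), agg in sorted(rolled.items()):
    print(bucket, aircraft_id, agg["latitude"], agg["count"])
```

The first two rows fall into the 12:30 bucket and collapse into one row holding the later latitude and a count of 2; the third row starts the 12:45 bucket on its own.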

The same query returns the aggregated data for the compacted historical intervals, with no query changes necessary.

Storage Savings (How Effective Are the Rollups)

It’s always good to verify how well rollups work.

SELECT 
       'opensky'                         "Datasource",
       SUM("count")                      "Logical Count",
       COUNT("count")                    "Physical Count",
       SUM("count")/(COUNT("count")*1.0) "Rollup Factor"
  FROM opensky
 GROUP BY 1

Before Compaction

Before compaction, rollup happens only for rows whose timestamp and dimensions match exactly, which occurs when a source system, such as opensky, delivers duplicate events.

Datasource  Logical Count  Physical Count  Rollup Factor
opensky     7583032        6711892         1.1298

After Compaction

After compaction, timestamps are truncated to the query granularity, so rows with matching dimensions within the same interval are rolled up.

Datasource  Logical Count  Physical Count  Rollup Factor
opensky     7583032        359551          21.0903
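
The rollup factors reported here can be checked with simple arithmetic. The Python sketch below uses the logical and physical counts from the two results (7583032 logical rows; 6711892 physical rows before compaction and 359551 after); rollup_factor is a hypothetical helper name that mirrors the SQL expression.

```python
def rollup_factor(logical_count, physical_count):
    """Logical rows ingested divided by physical rows stored."""
    return logical_count / physical_count


logical = 7583032           # SUM("count"), identical before and after compaction
physical_before = 6711892   # COUNT("count") before compaction
physical_after = 359551     # COUNT("count") after compaction

factor_before = round(rollup_factor(logical, physical_before), 4)
factor_after = round(rollup_factor(logical, physical_after), 4)
print(factor_before, factor_after)

# Share of stored rows removed by the compaction.
print(f"{1 - physical_after / physical_before:.1%}")
```

The logical count is unchanged because the count metric is summed during rollup; only the number of stored rows shrinks.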

Query

The query is written as an aggregation, so its result is the same whether or not the data has been compacted. If the rollup is perfect, no additional aggregation is done at query time, because each group already consists of a single aggregated row. However, if the rollup is imperfect, the remaining aggregation is completed by this query.

SELECT
       __time,
       id,
       callsign,
       latest("latitude") "latitude",
       latest("longitude") "longitude",
       latest("velocity") "velocity",
       sum("count") "count"
  FROM opensky
 GROUP BY 1, 2, 3
 ORDER BY __time ASC
 LIMIT 5

The count metric tracks how many records were “rolled up”.

__time                    id      callsign  latitude  longitude  velocity  count
2024-10-09T12:35:00.000Z  151e5b  PBD526    56.6896   43.7677    27.8      6
2024-10-09T12:35:00.000Z  300990  LSI102    44.6143   33.9275    396.78    91
2024-10-09T12:35:00.000Z  800732  AIC121    51.742    49.6643    244.94    6
2024-10-09T12:40:00.000Z  151de3  SDM6857   57.5982   38.4103    28.03     67
2024-10-09T12:40:00.000Z  800736  AIC113    51.5912   50.1971    244.94    19

Geospatial Data Types

Geospatial data types (geo and spatial) are not supported for first and last aggregations, which is why this example uses latitude and longitude as double metrics.

Takeaways

  • Check out druid-rollup-doubleLast in the GitHub dev-local-demos repository for the example used in this article. It leverages a container-based ecosystem provided at dev-local. This repository also includes demonstrations like druid-late.

  • If you are running on Apple Silicon, check out the druid-m1 project for building an arm64-based image. The project has been updated to use version 30.0.1 of Apache Druid.

  • With first and last support for numerical data, there are more opportunities to leverage rollups within Druid, as illustrated in this article. The inclusion of the timestamp in the internal complex data type ensures the last value is truly the last value (and the first value is the first value).

  • Rollups trade precision for storage, and the inclusion of a given dimension or metric can dramatically reduce rollup effectiveness. Always measure rollup effectiveness, and recheck it whenever the datasource schema or configuration is modified or the data changes.

Reach Out

Please contact us if you would like to discuss online analytic processing or event-streaming.