Merge real-time segments

How to merge small segments in real-time tables

To reduce disk usage and improve query performance, we recommend merging small segments in real-time Pinot tables using the Minion merge rollup task.

To learn how to merge segments in real-time Pinot tables, watch the following video, or complete the tutorial below, starting with Prerequisites.

If your use case supports aggregating data, you may also want to roll up segments in real-time tables. For more information, check out the following video.

💡

You can also merge segments in offline tables. For more information, see the merge small segments in offline tables guide.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download the recipes.

Navigate to recipe

  1. If you haven't already, download the recipes.
  2. In a terminal, navigate to the recipe by running the following command:
cd pinot-recipes/recipes/merge-small-segments-realtime

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and ZooKeeper. You can find the docker-compose.yml file on GitHub.

Controller configuration

The Pinot controller is launched with the following custom configuration:

controller.access.protocols.http.port=9000
controller.zk.str=zookeeper-mergesegments:2181
controller.helix.cluster.name=PinotCluster
controller.host=pinot-controller-mergesegments
controller.port=9000
controller.data.dir=/data
 
controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=5m

controller-conf.conf

This configuration is needed to enable Pinot's task scheduler, which we'll use to automatically merge segments.

🚫

We've configured the task scheduler to run every 5 minutes, but you'd set that to an hour or more in a production system.

Dataset

The following data generator produces messages in bursts, depending on the minute of the hour:

import datetime as dt
import uuid
import random
import json
import time
 
while True:
    now = dt.datetime.now()
    ts = int(now.timestamp() * 1000)  # current time as epoch milliseconds
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(
        json.dumps({"ts": ts, "uuid": id, "count": count})
    )
 
    # Alternate the sleep time (and therefore the message rate)
    # depending on whether the current minute is even or odd
    sleep_time = 0.01 if now.minute % 2 == 0 else 0.00005
    time.sleep(sleep_time)

datagen.py

Depending on whether the minute is even or odd, the script alternates between generating roughly 20,000 messages per second and roughly 100 messages per second, so some minutes produce far more data than others.

An example of a message produced by this script is shown below:

{
  "ts": 1680260357275,
  "uuid": "0d4c5cc7-cdc7-42d0-924d-27babc18ad94",
  "count": 126
}

We can write that data into Kafka by running the following command:

python datagen.py  2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
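
To sanity check that messages are reaching the topic, you can consume a few of them with kcat. This is an optional check, assuming the broker is reachable on localhost:9092 as in the producer command above:

kcat -C -b localhost:9092 -t events -c 5 -f 'key: %k, value: %s\n'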

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

config/schema.json

We'll also have the following table config:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-mergesegments:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows":"1000000",
      "realtime.segment.flush.threshold.time":"1m"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": []
  },
  "tenants": {},
  "metadata": {},
  
  "task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "5m_2m.mergeType": "concat",
        "5m_2m.bucketTimePeriod": "5m",
        "5m_2m.bufferTimePeriod": "2m"
      }
    }
  }
  
}

config/table.json

There are a few bits of config that we're interested in.

First up is segment threshold, which is defined like so:

"realtime.segment.flush.threshold.rows":"1000000",
"realtime.segment.flush.threshold.time":"1m"

This means that a segment will be committed once it contains one million records or after one minute has elapsed, whichever comes first. Because our data generator alternates between high and low message rates, some of these segments end up much smaller than others, and those small segments are what the merge task will clean up. For more on configuring the segment threshold, see the segment threshold guide.

Next is the batchIngestionConfig:

"batchIngestionConfig": {
  "segmentIngestionType": "APPEND",
  "segmentIngestionFrequency": "DAILY"
}
⚠️

This shouldn't be strictly necessary because it's a config that's usually used for offline tables, but the merge/roll-up logic in the current version (0.12.1) relies on it being there. The reliance on this config existing has already been fixed in the main branch and therefore this config won't be required when 0.13.0 is released.

We're also interested in the MergeRollupTask, which is extracted below:

"task": {
  "taskTypeConfigsMap": {
    "MergeRollupTask": {
      "5m_2m.mergeType": "concat",
      "5m_2m.bucketTimePeriod": "5m",
      "5m_2m.bufferTimePeriod": "2m"
    }
  }
}

This configuration buckets records into 5-minute windows and only processes records whose timestamps are more than 2 minutes old.

🚫

We are intentionally using very small values for the bucketTimePeriod and bufferTimePeriod for the purposes of this example. You'll want to use larger values for production systems.
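
To make the interaction between these two settings concrete, here's a small sketch in Python (an illustration only, not Pinot's actual implementation) that computes the 5-minute bucket a record's timestamp falls into and checks whether that bucket is already outside the 2-minute buffer, and therefore eligible for merging:

BUCKET_MS = 5 * 60 * 1000  # 5m bucketTimePeriod
BUFFER_MS = 2 * 60 * 1000  # 2m bufferTimePeriod
 
def bucket_bounds(ts_ms: int) -> tuple[int, int]:
    """Return the [start, end) of the 5-minute bucket containing ts_ms."""
    start = ts_ms - (ts_ms % BUCKET_MS)
    return start, start + BUCKET_MS
 
def is_mergeable(ts_ms: int, now_ms: int) -> bool:
    """A bucket is only processed once its end is older than the buffer period."""
    _, end = bucket_bounds(ts_ms)
    return end <= now_ms - BUFFER_MS

For example, a record written at 11:21 falls into the 11:20-11:25 bucket, which only becomes eligible for merging after 11:27. We'll see this behaviour in the controller logs later: at 11:19 the 11:15-11:20 bucket "cannot be merged yet", but by 11:24 it gets picked up.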

You can create the table and schema by running the following command:

docker run \
   --network mergesegments \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-mergesegments" \
     -exec

If you're using a Mac M1/M2, add the -arm64 suffix to the image name (apachepinot/pinot:1.0.0-arm64).
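
Once the command completes, you can confirm that the table exists by asking the controller's REST API for its config. This assumes the controller's port 9000 is mapped to localhost, as it is in the Docker Compose file:

curl -X GET "http://localhost:9000/tables/events" -H "accept: application/json"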

Viewing segments

We can navigate to the Pinot UI and run the following query to see the segments that have been created and the number of records that they contain:

select $segmentName, count(*), 
       ToDateTime(min(ts), 'YYYY-MM-dd HH:mm:ss') AS minDate, 
       ToDateTime(max(ts), 'YYYY-MM-dd HH:mm:ss') AS maxDate
from events 
group by $segmentName
order by max(ts) DESC
limit 50

We'll see something like the following output:

| segmentName | count(*) | minDate | maxDate |
|---|---|---|---|
| events__0__6__20230331T1122Z | 15137 | 2023-03-31 11:21:58 | 2023-03-31 11:22:14 |
| events__0__5__20230331T1120Z | 778623 | 2023-03-31 11:20:54 | 2023-03-31 11:21:58 |
| events__0__4__20230331T1119Z | 110868 | 2023-03-31 11:19:51 | 2023-03-31 11:20:54 |
| events__0__3__20230331T1118Z | 691993 | 2023-03-31 11:18:46 | 2023-03-31 11:19:51 |
| events__0__2__20230331T1117Z | 182076 | 2023-03-31 11:17:46 | 2023-03-31 11:18:46 |
| events__0__1__20230331T1116Z | 602990 | 2023-03-31 11:16:40 | 2023-03-31 11:17:46 |
| events__0__0__20230331T1115Z | 118298 | 2023-03-31 11:15:51 | 2023-03-31 11:16:40 |

Query Results
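
If you prefer the command line, you can also list the segments for the table via the controller's REST API (again assuming port 9000 is mapped to localhost):

curl -X GET "http://localhost:9000/segments/events" -H "accept: application/json"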

Merge segments

The job to merge segments runs every 5 minutes, so if we wait a little while it will eventually start.

💡

If you want to manually trigger the merge segments job, see the merge segments section of the merge small segments guide.
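
One way to trigger it by hand is to call the controller's task scheduling endpoint. This endpoint exists in recent Pinot releases; check your version's Swagger UI if it returns an error:

curl -X POST \
  "http://localhost:9000/tasks/schedule?taskType=MergeRollupTask&tableName=events_REALTIME" \
  -H "accept: application/json"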

We can check the Pinot Controller logs to see that it's been triggered:

docker exec -it pinot-controller-mergesegments \
  grep -ri --color "\[MergeRollupTaskGenerator" logs/pinot-all.log

Output

2023/03/31 11:19:15.651 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Start generating task configs for table: events_REALTIME for task: MergeRollupTask
2023/03/31 11:19:15.658 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Creating the gauge metric for tracking the merge/roll-up task delay for table: events_REALTIME and mergeLevel: 5m_2m.(watermarkMs=1680261300000, bufferTimeMs=120000, bucketTimeMs=300000, taskDelayInNumTimeBuckets=0)
2023/03/31 11:19:15.658 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Bucket with start: 1680261300000 and end: 1680261600000 (table : events_REALTIME, mergeLevel : 5m_2m) cannot be merged yet
2023/03/31 11:19:15.663 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Finished generating task configs for table: events_REALTIME for task: MergeRollupTask, numTasks: 0
2023/03/31 11:24:15.672 INFO [MergeRollupTaskGenerator] [pool-10-thread-3] Start generating task configs for table: events_REALTIME for task: MergeRollupTask
2023/03/31 11:24:15.682 INFO [MergeRollupTaskGenerator] [pool-10-thread-3] Update watermark for table: events_REALTIME, mergeLevel: 5m_2m from: 1680261300000 to: 1680261300000
2023/03/31 11:24:15.696 INFO [MergeRollupTaskGenerator] [pool-10-thread-3] Finished generating task configs for table: events_REALTIME for task: MergeRollupTask, numTasks: 1

And we can check the Pinot Minion logs to see if the job has run:

docker exec -it pinot-minion-mergesegments \
  grep -ri --color "INFO \[MergeRollup" logs/pinot-all.log

Output

2023/03/31 11:24:16.718 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Starting task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=5m, segmentNamePrefix=merged_5m_2m_1680261855683_0_events, authToken=null, downloadURL=http://pinot-controller-mergesegments:9000/segments/events/events__0__0__20230331T1115Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__1__20230331T1116Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__2__20230331T1117Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__3__20230331T1118Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__4__20230331T1119Z, mergeType=concat, mergeLevel=5m_2m, 5m_2m.bufferTimePeriod=2m, 5m_2m.bucketTimePeriod=5m, push.mode=TAR, segmentName=events__0__0__20230331T1115Z,events__0__1__20230331T1116Z,events__0__2__20230331T1117Z,events__0__3__20230331T1118Z,events__0__4__20230331T1119Z, tableName=events_REALTIME, uploadURL=http://pinot-controller-mergesegments:9000/segments, enableReplaceSegments=true, push.controllerUri=http://pinot-controller-mergesegments:9000, roundBucketTimePeriod=null, 5m_2m.mergeType=concat, TASK_ID=Task_MergeRollupTask_cd467be0-472d-44a9-99d3-6e6fcdf69d54_1680261855713_0, overwriteOutput=false}
2023/03/31 11:24:22.497 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Finished task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=5m, segmentNamePrefix=merged_5m_2m_1680261855683_0_events, authToken=null, downloadURL=http://pinot-controller-mergesegments:9000/segments/events/events__0__0__20230331T1115Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__1__20230331T1116Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__2__20230331T1117Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__3__20230331T1118Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__4__20230331T1119Z, mergeType=concat, mergeLevel=5m_2m, 5m_2m.bufferTimePeriod=2m, 5m_2m.bucketTimePeriod=5m, push.mode=TAR, segmentName=events__0__0__20230331T1115Z,events__0__1__20230331T1116Z,events__0__2__20230331T1117Z,events__0__3__20230331T1118Z,events__0__4__20230331T1119Z, tableName=events_REALTIME, uploadURL=http://pinot-controller-mergesegments:9000/segments, enableReplaceSegments=true, push.controllerUri=http://pinot-controller-mergesegments:9000, roundBucketTimePeriod=null, 5m_2m.mergeType=concat, TASK_ID=Task_MergeRollupTask_cd467be0-472d-44a9-99d3-6e6fcdf69d54_1680261855713_0, overwriteOutput=false}. Total time: 5779ms
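
Instead of grepping the logs, you can also ask the controller for the state of all MergeRollupTask instances. This endpoint is present in recent Pinot releases; the controller's Swagger UI lists the task endpoints available in your version:

curl -X GET "http://localhost:9000/tasks/MergeRollupTask/taskstates" -H "accept: application/json"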

Let's now run the segments query again:

| segmentName | count(*) | minDate | maxDate |
|---|---|---|---|
| events__0__8__20230331T1124Z | 109890 | 2023-03-31 11:23:59 | 2023-03-31 11:25:08 |
| events__0__7__20230331T1123Z | 743572 | 2023-03-31 11:23:04 | 2023-03-31 11:23:59 |
| events__0__6__20230331T1122Z | 72745 | 2023-03-31 11:21:58 | 2023-03-31 11:23:04 |
| events__0__5__20230331T1120Z | 778623 | 2023-03-31 11:20:54 | 2023-03-31 11:21:58 |
| merged_5m_2m_1680261855683_0_events_1680261600000_1680261654662_1 | 4567 | 2023-03-31 11:20:00 | 2023-03-31 11:20:54 |
| merged_5m_2m_1680261855683_0_events_1680261351086_1680261599999_0 | 1701658 | 2023-03-31 11:15:51 | 2023-03-31 11:19:59 |

We can see that the merge job has combined segments 0-4 into two new segments aligned to 5-minute windows: one containing all the data from the 11:15-11:20 window and one containing the start of the 11:20-11:25 window.

If we wait a few more minutes, the next run will merge the rest of the data for the 11:20 to 11:25 window, as well as some of the next window. We can see the output of the next few runs below:

| segmentName | count(*) | minDate | maxDate |
|---|---|---|---|
| events__0__18__20230331T1134Z | 124044 | 2023-03-31 11:34:34 | 2023-03-31 11:35:09 |
| events__0__17__20230331T1133Z | 343966 | 2023-03-31 11:33:33 | 2023-03-31 11:34:34 |
| events__0__16__20230331T1132Z | 443483 | 2023-03-31 11:32:26 | 2023-03-31 11:33:33 |
| events__0__15__20230331T1131Z | 439137 | 2023-03-31 11:31:26 | 2023-03-31 11:32:26 |
| events__0__14__20230331T1130Z | 355319 | 2023-03-31 11:30:19 | 2023-03-31 11:31:26 |
| merged_5m_2m_1680262455984_0_events_1680262200000_1680262219170_1 | 1591 | 2023-03-31 11:30:00 | 2023-03-31 11:30:19 |
| merged_5m_2m_1680262455984_0_events_1680261900010_1680262199999_0 | 2368619 | 2023-03-31 11:25:00 | 2023-03-31 11:29:59 |
| merged_5m_2m_1680262155835_0_events_1680261600000_1680261899994_0 | 1604525 | 2023-03-31 11:20:00 | 2023-03-31 11:24:59 |
| merged_5m_2m_1680261855683_0_events_1680261351086_1680261599999_0 | 1701658 | 2023-03-31 11:15:51 | 2023-03-31 11:19:59 |