How to merge small segments in real-time tables
To reduce disk usage and improve query performance, we recommend merging segments in real-time Pinot tables using the Minion merge rollup task.
To learn how to merge segments in real-time Pinot tables, watch the following video, or complete the tutorial below, starting with Prerequisites.
If your use case supports aggregating data, you may also want to roll up segments in real-time tables. For more information, check out the following video.
| Pinot Version | 1.0.0 |
|---|---|
| Code | startreedata/pinot-recipes/merge-small-segments-realtime |
You can also merge segments in offline tables. For more information, see the merge small segments in offline tables guide.
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download the recipes.
Navigate to recipe
- If you haven't already, download the recipes.
- In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/merge-small-segments-realtime
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml file on GitHub.
Controller configuration
The Pinot controller is launched with the following custom configuration:
controller.access.protocols.http.port=9000
controller.zk.str=zookeeper-mergesegments:2181
controller.helix.cluster.name=PinotCluster
controller.host=pinot-controller-mergesegments
controller.port=9000
controller.data.dir=/data
controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=5m
controller-conf.conf
This configuration is needed to enable Pinot's task scheduler, which we'll use to automatically merge segments.
We've configured the task scheduler to run every 5 minutes, but you'd set that to an hour or more in a production system.
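The controller-level frequencyPeriod applies to all task types. If you need a different cadence for one table, Pinot's task framework also accepts a per-table Quartz cron schedule inside taskTypeConfigsMap. A minimal sketch (the cron expression here is illustrative, not part of this recipe):

"task": {
  "taskTypeConfigsMap": {
    "MergeRollupTask": {
      "schedule": "0 */10 * * * ?"
    }
  }
}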
Dataset
The following data generator produces data in bursts depending on the minute of the hour:
import datetime as dt
import uuid
import random
import json
import time

while True:
    now = dt.datetime.now()
    ts = int(now.timestamp() * 1000)  # epoch timestamp in milliseconds
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(json.dumps({"ts": ts, "uuid": id, "count": count}))
    # Sleep longer on even minutes (~100 messages/sec) and barely at all
    # on odd minutes (~20,000 messages/sec) to create bursty traffic
    sleep_time = 0.01 if now.minute % 2 == 0 else 0.00005
    time.sleep(sleep_time)
datagen.py
If the minute is divisible by 2, the script sleeps for 0.01 seconds between messages, generating around 1 / 0.01 = 100 messages per second; if it isn't, it sleeps for only 0.00005 seconds, generating around 20,000 messages per second.
An example of a message produced by this script is shown below:
{
"ts": 1680260357275,
"uuid": "0d4c5cc7-cdc7-42d0-924d-27babc18ad94",
"count": 126
}
We can write that data into Kafka by running the following command:
python datagen.py 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
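The jq filter turns each JSON record into a keyed message: it prepends the record's uuid, joined to the JSON payload by the ø separator, and kcat's -Kø flag splits on that character so the uuid becomes the Kafka message key. For the example message above, the line handed to kcat would look like this:

0d4c5cc7-cdc7-42d0-924d-27babc18ad94ø{"ts":1680260357275,"uuid":"0d4c5cc7-cdc7-42d0-924d-27babc18ad94","count":126}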
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "events",
"dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
"metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
"dateTimeFieldSpecs": [
{
"name": "ts",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
config/schema.json
We'll also have the following table config:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-mergesegments:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.rows":"1000000",
"realtime.segment.flush.threshold.time":"1m"
}
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "DAILY"
},
"transformConfigs": []
},
"tenants": {},
"metadata": {},
"task": {
"taskTypeConfigsMap": {
"MergeRollupTask": {
"5m_2m.mergeType": "concat",
"5m_2m.bucketTimePeriod": "5m",
"5m_2m.bufferTimePeriod": "2m"
}
}
}
}
config/table.json
There are a few bits of config that we're interested in.
First up is the segment threshold, which is defined like so:
"realtime.segment.flush.threshold.rows":"1000000",
"realtime.segment.flush.threshold.time":"1m"
This means that a segment will be committed once it contains one million records or after one minute has elapsed, whichever comes first. For more on configuring the segment threshold, see the segment threshold guide.
Next is the batchIngestionConfig:
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "DAILY"
}
This shouldn't be strictly necessary because it's a config that's usually used for offline tables, but the merge/roll-up logic in Pinot 0.12.1 and earlier relied on it being there. That reliance has since been fixed, so it should no longer be required in the 1.0.0 release used by this recipe; we include it for compatibility with older versions.
We're also interested in the MergeRollupTask, which is extracted below:
"task": {
"taskTypeConfigsMap": {
"MergeRollupTask": {
"5m_2m.mergeType": "concat",
"5m_2m.bucketTimePeriod": "5m",
"5m_2m.bufferTimePeriod": "2m"
}
}
}
This configuration buckets records from the same 5-minute period and only processes records whose timestamps are more than 2 minutes old.
We're intentionally using very small values for bucketTimePeriod and bufferTimePeriod for the purposes of this example; you'll want to use larger values in a production system.
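To make the bucketing concrete, here's a minimal Python sketch (illustrative only, not Pinot code) that computes which 5-minute bucket a timestamp falls into and whether that bucket is old enough to merge given the 2-minute buffer:

BUCKET_MS = 5 * 60 * 1000   # 5m bucketTimePeriod
BUFFER_MS = 2 * 60 * 1000   # 2m bufferTimePeriod

def bucket_bounds(ts_ms):
    """Return the [start, end) bounds of the 5-minute bucket containing ts_ms."""
    start = ts_ms - (ts_ms % BUCKET_MS)
    return start, start + BUCKET_MS

def is_mergeable(bucket_end_ms, now_ms):
    """A bucket is only eligible for merging once the buffer period has elapsed."""
    return bucket_end_ms <= now_ms - BUFFER_MS

start, end = bucket_bounds(1680261555000)   # a timestamp inside 11:15-11:20 UTC
print(start, end)                           # 1680261300000 1680261600000
print(is_mergeable(end, end + 60_000))      # False: only 1 minute past bucket end
print(is_mergeable(end, end + 180_000))     # True: 3 minutes > 2-minute buffer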
You can create the table and schema by running the following command:
docker run \
--network mergesegments \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-mergesegments" \
-exec
Add the -arm64 suffix to the image name (apachepinot/pinot:1.0.0-arm64) if you're using a Mac M1/M2.
Viewing segments
We can navigate to the Pinot UI and run the following query, which uses Pinot's built-in $segmentName virtual column, to see the segments that have been created and the number of records that they contain:
select $segmentName, count(*),
ToDateTime(min(ts), 'YYYY-MM-dd HH:mm:ss') AS minDate,
ToDateTime(max(ts), 'YYYY-MM-dd HH:mm:ss') AS maxDate
from events
group by $segmentName
order by max(ts) DESC
limit 50
We'll see something like the following output:
| segmentName | count(*) | minDate | maxDate |
|---|---|---|---|
| events__0__6__20230331T1122Z | 15137 | 2023-03-31 11:21:58 | 2023-03-31 11:22:14 |
| events__0__5__20230331T1120Z | 778623 | 2023-03-31 11:20:54 | 2023-03-31 11:21:58 |
| events__0__4__20230331T1119Z | 110868 | 2023-03-31 11:19:51 | 2023-03-31 11:20:54 |
| events__0__3__20230331T1118Z | 691993 | 2023-03-31 11:18:46 | 2023-03-31 11:19:51 |
| events__0__2__20230331T1117Z | 182076 | 2023-03-31 11:17:46 | 2023-03-31 11:18:46 |
| events__0__1__20230331T1116Z | 602990 | 2023-03-31 11:16:40 | 2023-03-31 11:17:46 |
| events__0__0__20230331T1115Z | 118298 | 2023-03-31 11:15:51 | 2023-03-31 11:16:40 |
Query Results
Merge segments
The job to merge segments runs every 5 minutes, so if we wait a little while it will eventually start.
If you want to manually trigger the merge segments job, see the merge segments section of the merge small segments guide.
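Alternatively, the controller exposes a REST endpoint for scheduling tasks on demand. Assuming the default ports from this recipe's Docker Compose setup, a call like the following should work (check the Swagger UI at http://localhost:9000/help if the parameters differ in your Pinot version):

curl -X POST \
  "http://localhost:9000/tasks/schedule?taskType=MergeRollupTask&tableName=events_REALTIME"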
We can check the Pinot Controller logs to see that it's been triggered:
docker exec -it pinot-controller-mergesegments \
grep -ri --color "\[MergeRollupTaskGenerator" logs/pinot-all.log
Output
2023/03/31 11:19:15.651 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Start generating task configs for table: events_REALTIME for task: MergeRollupTask
2023/03/31 11:19:15.658 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Creating the gauge metric for tracking the merge/roll-up task delay for table: events_REALTIME and mergeLevel: 5m_2m.(watermarkMs=1680261300000, bufferTimeMs=120000, bucketTimeMs=300000, taskDelayInNumTimeBuckets=0)
2023/03/31 11:19:15.658 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Bucket with start: 1680261300000 and end: 1680261600000 (table : events_REALTIME, mergeLevel : 5m_2m) cannot be merged yet
2023/03/31 11:19:15.663 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Finished generating task configs for table: events_REALTIME for task: MergeRollupTask, numTasks: 0
2023/03/31 11:24:15.672 INFO [MergeRollupTaskGenerator] [pool-10-thread-3] Start generating task configs for table: events_REALTIME for task: MergeRollupTask
2023/03/31 11:24:15.682 INFO [MergeRollupTaskGenerator] [pool-10-thread-3] Update watermark for table: events_REALTIME, mergeLevel: 5m_2m from: 1680261300000 to: 1680261300000
2023/03/31 11:24:15.696 INFO [MergeRollupTaskGenerator] [pool-10-thread-3] Finished generating task configs for table: events_REALTIME for task: MergeRollupTask, numTasks: 1
And we can check the Pinot Minion logs to see if the job has run:
docker exec -it pinot-minion-mergesegments \
grep -ri --color "INFO \[MergeRollup" logs/pinot-all.log
Output
2023/03/31 11:24:16.718 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Starting task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=5m, segmentNamePrefix=merged_5m_2m_1680261855683_0_events, authToken=null, downloadURL=http://pinot-controller-mergesegments:9000/segments/events/events__0__0__20230331T1115Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__1__20230331T1116Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__2__20230331T1117Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__3__20230331T1118Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__4__20230331T1119Z, mergeType=concat, mergeLevel=5m_2m, 5m_2m.bufferTimePeriod=2m, 5m_2m.bucketTimePeriod=5m, push.mode=TAR, segmentName=events__0__0__20230331T1115Z,events__0__1__20230331T1116Z,events__0__2__20230331T1117Z,events__0__3__20230331T1118Z,events__0__4__20230331T1119Z, tableName=events_REALTIME, uploadURL=http://pinot-controller-mergesegments:9000/segments, enableReplaceSegments=true, push.controllerUri=http://pinot-controller-mergesegments:9000, roundBucketTimePeriod=null, 5m_2m.mergeType=concat, TASK_ID=Task_MergeRollupTask_cd467be0-472d-44a9-99d3-6e6fcdf69d54_1680261855713_0, overwriteOutput=false}
2023/03/31 11:24:22.497 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Finished task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=5m, segmentNamePrefix=merged_5m_2m_1680261855683_0_events, authToken=null, downloadURL=http://pinot-controller-mergesegments:9000/segments/events/events__0__0__20230331T1115Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__1__20230331T1116Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__2__20230331T1117Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__3__20230331T1118Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__4__20230331T1119Z, mergeType=concat, mergeLevel=5m_2m, 5m_2m.bufferTimePeriod=2m, 5m_2m.bucketTimePeriod=5m, push.mode=TAR, segmentName=events__0__0__20230331T1115Z,events__0__1__20230331T1116Z,events__0__2__20230331T1117Z,events__0__3__20230331T1118Z,events__0__4__20230331T1119Z, tableName=events_REALTIME, uploadURL=http://pinot-controller-mergesegments:9000/segments, enableReplaceSegments=true, push.controllerUri=http://pinot-controller-mergesegments:9000, roundBucketTimePeriod=null, 5m_2m.mergeType=concat, TASK_ID=Task_MergeRollupTask_cd467be0-472d-44a9-99d3-6e6fcdf69d54_1680261855713_0, overwriteOutput=false}. Total time: 5779ms
Let's now run the segments query again:
| segmentName | count(*) | minDate | maxDate |
|---|---|---|---|
| events__0__8__20230331T1124Z | 109890 | 2023-03-31 11:23:59 | 2023-03-31 11:25:08 |
| events__0__7__20230331T1123Z | 743572 | 2023-03-31 11:23:04 | 2023-03-31 11:23:59 |
| events__0__6__20230331T1122Z | 72745 | 2023-03-31 11:21:58 | 2023-03-31 11:23:04 |
| events__0__5__20230331T1120Z | 778623 | 2023-03-31 11:20:54 | 2023-03-31 11:21:58 |
| merged_5m_2m_1680261855683_0_events_1680261600000_1680261654662_1 | 4567 | 2023-03-31 11:20:00 | 2023-03-31 11:20:54 |
| merged_5m_2m_1680261855683_0_events_1680261351086_1680261599999_0 | 1701658 | 2023-03-31 11:15:51 | 2023-03-31 11:19:59 |
We can see that the merge task has combined segments 0-4 into two larger segments. The records spanned a 5-minute bucket boundary at 11:20:00, so one merged segment holds everything from 11:15:51 to 11:19:59 and the other holds the start of the 11:20 to 11:25 window.
If we wait a few more minutes, the next run will merge the rest of the data for the 11:20 to 11:25 window, as well as some of the next window. We can see the output of the next few runs below:
| segmentName | count(*) | minDate | maxDate |
|---|---|---|---|
| events__0__18__20230331T1134Z | 124044 | 2023-03-31 11:34:34 | 2023-03-31 11:35:09 |
| events__0__17__20230331T1133Z | 343966 | 2023-03-31 11:33:33 | 2023-03-31 11:34:34 |
| events__0__16__20230331T1132Z | 443483 | 2023-03-31 11:32:26 | 2023-03-31 11:33:33 |
| events__0__15__20230331T1131Z | 439137 | 2023-03-31 11:31:26 | 2023-03-31 11:32:26 |
| events__0__14__20230331T1130Z | 355319 | 2023-03-31 11:30:19 | 2023-03-31 11:31:26 |
| merged_5m_2m_1680262455984_0_events_1680262200000_1680262219170_1 | 1591 | 2023-03-31 11:30:00 | 2023-03-31 11:30:19 |
| merged_5m_2m_1680262455984_0_events_1680261900010_1680262199999_0 | 2368619 | 2023-03-31 11:25:00 | 2023-03-31 11:29:59 |
| merged_5m_2m_1680262155835_0_events_1680261600000_1680261899994_0 | 1604525 | 2023-03-31 11:20:00 | 2023-03-31 11:24:59 |
| merged_5m_2m_1680261855683_0_events_1680261351086_1680261599999_0 | 1701658 | 2023-03-31 11:15:51 | 2023-03-31 11:19:59 |
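To double-check that the original small segments have been replaced rather than left alongside the merged ones, you can also list the table's segments through the controller REST API. A hedged example, assuming the recipe's default controller port (see the Swagger UI at http://localhost:9000/help for the exact parameters in your version):

curl "http://localhost:9000/segments/events?type=REALTIME"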