How to merge small segments in offline tables
In this recipe we'll learn how to merge small segments into larger ones. Merging reduces the number of segments that Pinot has to manage, which saves disk space and improves query performance. This is done using the Minion merge rollup task (opens in a new tab).
| Pinot Version | 1.0.0 |
|---|---|
| Code | startreedata/pinot-recipes/merge-small-segments |
You can only merge segments in offline tables that have a time column.
You can also merge segments in real-time tables. For more information, see the merge segments in real-time tables guide.
Prerequisites
To follow the code examples in this guide, you must install Docker (opens in a new tab) locally and download the recipes.
Navigate to recipe
- If you haven't already, download the recipes.
- In a terminal, navigate to the recipe by running the following command:
cd pinot-recipes/recipes/merge-small-segments
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml (opens in a new tab) file on GitHub.
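Before moving on, it's worth checking that the Pinot Controller is ready to accept requests. Its health endpoint returns OK once it's up:

curl http://localhost:9000/health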
Dataset
We're going to import a couple of CSVs that contain results from the Australian Open tennis tournament that was held in January 2022. The contents of the files are shown below:
round,winner,loser,score,matchTime
"R128","Miomir Kecmanovic","Salvatore Caruso","6-4 6-2 6-1","2022-01-17 11:00:00"
"R128","Tommy Paul","Mikhail Kukushkin","6-3 6-4 6-2","2022-01-17 11:10:00"
"R128","Oscar Otte","Chun Hsin Tseng","6-4 6-3 6-2","2022-01-17 12:00:00"
"R128","Lorenzo Sonego","Sam Querrey","7-5 6-3 6-3","2022-01-17 13:14:00"
"R128","Gael Monfils","Federico Coria","6-1 6-1 6-3","2022-01-17 15:32:00"
input/matches0.csv
round,winner,loser,score,matchTime
"R128","Sebastian Korda","Cameron Norrie","6-3 6-0 6-4","2022-01-17 11:10:00"
"R128","Corentin Moutet","Lucas Pouille","3-6 6-3 6-4 6-3","2022-01-17 11:03:00"
"R128","Tallon Griekspoor","Fabio Fognini","6-1 6-4 6-4","2022-01-17 13:08:00"
"R128","Pablo Carreno Busta","Tomas Martin Etcheverry","6-1 6-2 7-6(2)","2022-01-17 18:14:00"
"R128","Carlos Alcaraz","Alejandro Tabilo","6-2 6-2 6-3","2022-01-17 16:51:00"
input/matches1.csv
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
  "schemaName": "matches",
  "dimensionFieldSpecs": [
    {
      "name": "round",
      "dataType": "STRING"
    },
    {
      "name": "winner",
      "dataType": "STRING"
    },
    {
      "name": "loser",
      "dataType": "STRING"
    },
    {
      "name": "score",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "matchTime",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
config/schema.json
We'll also have the following table config:
{
  "tableName": "matches",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "matchTime",
    "schemaName": "matches",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1day.mergeType": "concat",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "5m"
      }
    }
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": []
  },
  "metadata": {}
}
config/table.json
Our table must specify segmentsConfig.timeColumnName, otherwise the merge process won't merge any segments.
The main thing that we're interested in is the MergeRollupTask, which is extracted below:
"task": {
"taskTypeConfigsMap": {
"MergeRollupTask": {
"1day.mergeType": "concat",
"1day.bucketTimePeriod": "1d",
"1day.bufferTimePeriod": "5m"
}
}
}
This configuration buckets records from the same 1-day period into the same segment. Only records whose timestamp is more than 5 minutes old are processed; the buffer gives late-arriving data a chance to land before its bucket is merged.
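If it helps to see the arithmetic, here's a minimal shell sketch of how the bucket and buffer periods interact. The variable names are ours, not Pinot's; the record timestamp comes from matches0.csv:

# Eligibility check and bucket assignment for the 1day merge level
now_ms=$(($(date +%s) * 1000))
buffer_ms=$((5 * 60 * 1000))        # 1day.bufferTimePeriod = 5m
bucket_ms=$((24 * 60 * 60 * 1000))  # 1day.bucketTimePeriod = 1d

record_ms=1642417200000             # 2022-01-17 11:00:00 UTC, from matches0.csv
if [ "${record_ms}" -lt $((now_ms - buffer_ms)) ]; then
  # Records are grouped by the 1-day window that their timestamp falls into
  bucket_start_ms=$(( (record_ms / bucket_ms) * bucket_ms ))
  echo "eligible; 1-day bucket starts at ${bucket_start_ms}"
fi

For our data this prints a bucket start of 1642377600000 (midnight on 2022-01-17), which matches the watermark that shows up in the task logs later in this recipe.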
You can create the table and schema by running the following command:
docker run \
--network mergesegments \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-mergesegments" \
-exec
Add an -arm64 suffix to the image tag (apachepinot/pinot:1.0.0-arm64) if you're using a Mac M1/M2.
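To confirm that the table was created, you can ask the Pinot Controller for its config:

curl -X GET "http://localhost:9000/tables/matches" \
  -H "accept: application/json" 2>/dev/null | jq '.OFFLINE.tableName'

This should return "matches_OFFLINE".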
Import Data
Now let's import those CSV files into Pinot, using the following ingestion spec:
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/input'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/data'
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'matches'
pinotClusterSpecs:
  - controllerURI: '${pinotController}'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
config/job-spec.yml
You can launch the import job by running the following command:
docker run \
--network mergesegments \
-v $PWD/config:/config \
-v $PWD/data:/data \
-v $PWD/input:/input \
apachepinot/pinot:1.0.0 LaunchDataIngestionJob \
-jobSpecFile /config/job-spec.yml \
-values pinotController=http://pinot-controller-mergesegments:9000
Once this job has run, we can list the created segments by running the following command:
curl -X GET "http://localhost:9000/segments/matches?type=OFFLINE" \
-H "accept: application/json" 2>/dev/null | jq '.'
Output
[
  {
    "OFFLINE": [
      "matches_OFFLINE_1642417200000_1642433520000_0",
      "matches_OFFLINE_1642417380000_1642443240000_1"
    ]
  }
]
Let's wrap this command in a function so that we can use it again later:
show_segments () {
  table=${1:-"matches"}
  table_type=${2:-"OFFLINE"}
  curl -X GET "http://localhost:9000/segments/${table}?type=${table_type}" \
    -H "accept: application/json" 2>/dev/null | jq '.'
}
We could then call the function like this:
show_segments
We can check the contents of these segments by writing the following function:
segments_breakdown () {
  table=${1:-"matches"}
  table_type=${2:-"OFFLINE"}
  # The response has the shape [{"OFFLINE": ["segment1", "segment2", ...]}]
  segments=$(curl -X GET "http://localhost:9000/segments/${table}?type=${table_type}" 2>/dev/null | jq -r '.[][][]')
  for segment in ${segments}; do
    # Print one CSV line per segment: name,docs,startTime,endTime
    metadata=$(curl -X GET "http://localhost:9000/segments/${table}/${segment}/metadata" \
      -H "accept: application/json" 2>/dev/null)
    docs=$(echo "${metadata}" | jq '."segment.total.docs" | tonumber')
    startTime=$(echo "${metadata}" | jq '."segment.start.time" | tonumber')
    endTime=$(echo "${metadata}" | jq '."segment.end.time" | tonumber')
    echo "${segment},${docs},${startTime},${endTime}"
  done
}
We can call it like this:
segments_breakdown
Output
matches_OFFLINE_1642417200000_1642433520000_0,5,1642417200000,1642433520000
matches_OFFLINE_1642417380000_1642443240000_1,5,1642417380000,1642443240000
Merge segments
Now we're going to merge these segments using the Minion merge rollup task (opens in a new tab).
The configuration that we defined in the matches table buckets records from the same 1-day period into the same segment. Since our events all happened on the same day, we'd expect all of the records to be merged into a single segment.
We can trigger the merge rollup task by running the following:
tableName="matches_OFFLINE"
curl -X POST "http://localhost:9000/tasks/schedule?taskType=MergeRollupTask&tableName=${tableName}" \
-H "accept: application/json" 2>/dev/null | jq '.'
Output
{
  "MergeRollupTask": "Task_MergeRollupTask_1646909665695"
}
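The response contains the name of the generated task. If you'd rather poll its state than grep the logs, the controller exposes a task state endpoint; the task name below is the one returned above:

taskName="Task_MergeRollupTask_1646909665695"
curl -X GET "http://localhost:9000/tasks/task/${taskName}/state" \
  -H "accept: application/json" 2>/dev/null | jq '.'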
We can then check the Pinot Controller logs to see that it's been triggered:
docker exec -it pinot-controller-mergesegments grep -ri --color "\[MergeRollup" logs/pinot-all.log
Output
2022/03/10 10:51:37.383 INFO [TaskGeneratorRegistry] [main] Initialized TaskGeneratorRegistry with 4 task generators: [MergeRollupTask, RealtimeToOfflineSegmentsTask, ConvertToRawIndexTask, SegmentGenerationAndPushTask] in 716ms
2022/03/10 10:54:25.628 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Start generating task configs for table: matches_OFFLINE for task: MergeRollupTask
2022/03/10 10:54:25.666 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Creating the gauge metric for tracking the merge/roll-up task delay for table: matches_OFFLINE and mergeLevel: 1day.(watermarkMs=1642377600000, bufferTimeMs=86400000, bucketTimeMs=86400000, taskDelayInNumTimeBuckets=0)
2022/03/10 10:54:25.667 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Update watermark for table: matches_OFFLINE, mergeLevel: 1day from: 1642377600000 to: 1642377600000
2022/03/10 10:54:25.695 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Finished generating task configs for table: matches_OFFLINE for task: MergeRollupTask, numTasks: 1
And we can check the Pinot Minion logs to see if the job has run:
docker exec -it pinot-minion-mergesegments grep -ri --color "INFO \[MergeRollup" logs/pinot-all.log
Output
2022/03/10 10:54:37.768 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Starting task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=1d, segmentNamePrefix=merged_1day_1646909665668_0_matches, authToken=null, downloadURL=http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417200000_1642433520000_0,http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417380000_1642443240000_1, mergeType=concat, mergeLevel=1day, segmentName=matches_OFFLINE_1642417200000_1642433520000_0,matches_OFFLINE_1642417380000_1642443240000_1, tableName=matches_OFFLINE, uploadURL=http://192.168.96.3:9000/segments, enableReplaceSegments=true, roundBucketTimePeriod=null}
2022/03/10 10:54:38.149 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Finished task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=1d, segmentNamePrefix=merged_1day_1646909665668_0_matches, authToken=null, downloadURL=http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417200000_1642433520000_0,http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417380000_1642443240000_1, mergeType=concat, mergeLevel=1day, segmentName=matches_OFFLINE_1642417200000_1642433520000_0,matches_OFFLINE_1642417380000_1642443240000_1, tableName=matches_OFFLINE, uploadURL=http://192.168.96.3:9000/segments, enableReplaceSegments=true, roundBucketTimePeriod=null}. Total time: 381ms
Let's now check the list of segments again:
show_segments
Output
[
  {
    "OFFLINE": [
      "matches_OFFLINE_1642417200000_1642433520000_0",
      "matches_OFFLINE_1642417380000_1642443240000_1",
      "merged_1day_1646909665668_0_matches_1642417200000_1642443240000_0"
    ]
  }
]
We can see the new segment, but the initial segments are still there as well. The Pinot broker knows to use the new segment when it processes queries, so this isn't a problem. We can confirm this by running the following query on the Pinot UI (opens in a new tab):
select $segmentName, *
from matches
$segmentName | loser | matchTime | round | score | winner |
---|---|---|---|---|---|
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Salvatore Caruso | 2022-01-17 11:00:00.0 | R128 | 6-4 6-2 6-1 | Miomir Kecmanovic |
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Mikhail Kukushkin | 2022-01-17 11:10:00.0 | R128 | 6-3 6-4 6-2 | Tommy Paul |
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Chun Hsin Tseng | 2022-01-17 12:00:00.0 | R128 | 6-4 6-3 6-2 | Oscar Otte |
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Sam Querrey | 2022-01-17 13:14:00.0 | R128 | 7-5 6-3 6-3 | Lorenzo Sonego |
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Federico Coria | 2022-01-17 15:32:00.0 | R128 | 6-1 6-1 6-3 | Gael Monfils |
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Cameron Norrie | 2022-01-17 11:10:00.0 | R128 | 6-3 6-0 6-4 | Sebastian Korda |
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Lucas Pouille | 2022-01-17 11:03:00.0 | R128 | 3-6 6-3 6-4 6-3 | Corentin Moutet |
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Fabio Fognini | 2022-01-17 13:08:00.0 | R128 | 6-1 6-4 6-4 | Tallon Griekspoor |
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Tomas Martin Etcheverry | 2022-01-17 18:14:00.0 | R128 | 6-1 6-2 7-6(2) | Pablo Carreno Busta |
merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Alejandro Tabilo | 2022-01-17 16:51:00.0 | R128 | 6-2 6-2 6-3 | Carlos Alcaraz |
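We can also check the record count over the broker's REST API, assuming the broker's query port (8099) is mapped to localhost in docker-compose.yml:

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select count(*) from matches"}' 2>/dev/null | jq '.resultTable.rows'

This should return [[10]]: all ten records, now served from the merged segment.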
Also, Pinot's retention manager will take care of removing the old segments the next time that it runs.
We'll see the following messages in the Pinot Controller's logs when the retention manager has run:
docker exec -it pinot-controller-mergesegments grep -ri --color "\[SegmentDeletionManager\|\[RetentionManager" logs/pinot-all.log
Output
2022/03/14 13:11:18.006 INFO [RetentionManager] [main] Starting RetentionManager with runFrequencyInSeconds: 21600, deletedSegmentsRetentionInDays: 7
2022/03/14 13:15:08.024 INFO [RetentionManager] [pool-7-thread-4] Start managing retention for table: matches_OFFLINE
2022/03/14 13:15:08.032 WARN [RetentionManager] [pool-7-thread-4] Invalid retention time: null null for table: matches_OFFLINE, skip
2022/03/14 13:15:08.034 INFO [RetentionManager] [pool-7-thread-4] Start cleaning up segment lineage for table: matches_OFFLINE
2022/03/14 13:15:08.043 INFO [RetentionManager] [pool-7-thread-4] Finished cleaning up segment lineage for table: matches_OFFLINE, deleted segments: matches_OFFLINE_1642417200000_1642433520000_0,matches_OFFLINE_1642417380000_1642443240000_1 in 9ms
2022/03/14 13:15:08.043 INFO [RetentionManager] [pool-7-thread-4] Segment lineage metadata clean-up is successfully processed for table: matches_OFFLINE
2022/03/14 13:15:08.043 INFO [RetentionManager] [pool-7-thread-4] Removing aged (more than 7 days) deleted segments for all tables
2022/03/14 13:15:10.053 INFO [SegmentDeletionManager] [PinotHelixResourceManagerExecutorService] Moved segment matches_OFFLINE_1642417200000_1642433520000_0 from file:/data/matches/matches_OFFLINE_1642417200000_1642433520000_0 to file:/data/Deleted_Segments/matches/matches_OFFLINE_1642417200000_1642433520000_0
2022/03/14 13:15:10.053 INFO [SegmentDeletionManager] [PinotHelixResourceManagerExecutorService] Moved segment matches_OFFLINE_1642417380000_1642443240000_1 from file:/data/matches/matches_OFFLINE_1642417380000_1642443240000_1 to file:/data/Deleted_Segments/matches/matches_OFFLINE_1642417380000_1642443240000_1
2022/03/14 13:15:10.053 INFO [SegmentDeletionManager] [PinotHelixResourceManagerExecutorService] Deleted 2 segments from table matches_OFFLINE:[matches_OFFLINE_1642417200000_1642433520000_0, matches_OFFLINE_1642417380000_1642443240000_1]
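The retention manager runs on a schedule (every 21,600 seconds in the log above). If you don't want to wait, recent Pinot versions let you trigger a periodic task on demand; treat this endpoint as version dependent:

curl -X GET "http://localhost:9000/periodictask/run?taskname=RetentionManager&tableName=matches_OFFLINE" \
  -H "accept: application/json" 2>/dev/null | jq '.'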
We can then check the contents of that segment by running the following:
segments_breakdown
Output
merged_1day_1646909665668_0_matches_1642417200000_1642443240000_0,10,1642417200000,1642443240000
We now have a single segment that contains all the records from the two CSV files that we ingested.
Automatic scheduling
We can also automatically schedule the merge task by adding the following configuration to the Pinot Controller:
controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=3600
Controller configuration
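Once the controller has been restarted with this configuration, you can check that the cron scheduler has picked up the task. Recent Pinot versions expose scheduler information over REST; again, treat the exact endpoint as version dependent:

curl -X GET "http://localhost:9000/tasks/scheduler/information" \
  -H "accept: application/json" 2>/dev/null | jq '.'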