
How to merge small segments in offline tables

In this recipe we'll learn how to merge small segments into larger ones using the Minion merge rollup task. Merging reduces the number of segments that Pinot has to store and query, which saves disk space and improves query performance.

caution

You can only merge segments in offline tables that have a time column.

tip

You can also merge segments in real-time tables. For more information, see the merge segments in real-time tables guide.

Prerequisites

You will need to install Docker locally to follow the code examples in this guide.

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/merge-small-segments

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Pinot Minion, and Zookeeper. You can find the docker-compose.yml file on GitHub.
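
The details live in the repository, but an abridged sketch of the relevant parts looks something like this (the service and network names are inferred from the commands used later in this recipe, so treat it as illustrative rather than a copy of the real file):

services:
  zookeeper:
    image: zookeeper:3.8.0
    container_name: zookeeper-mergesegments
  pinot-controller:
    image: apachepinot/pinot:0.12.0-arm64
    command: "StartController -zkAddress zookeeper-mergesegments:2181"
    container_name: pinot-controller-mergesegments
    ports:
      - "9000:9000"
  # ...pinot-broker, pinot-server, and pinot-minion follow the same pattern

networks:
  default:
    name: mergesegments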

Dataset

We're going to import a couple of CSVs that contain results from the Australian Open tennis tournament that was held in January 2022. The contents of the files are shown below:

input/matches0.csv
round,winner,loser,score,matchTime
"R128","Miomir Kecmanovic","Salvatore Caruso","6-4 6-2 6-1","2022-01-17 11:00:00"
"R128","Tommy Paul","Mikhail Kukushkin","6-3 6-4 6-2","2022-01-17 11:10:00"
"R128","Oscar Otte","Chun Hsin Tseng","6-4 6-3 6-2","2022-01-17 12:00:00"
"R128","Lorenzo Sonego","Sam Querrey","7-5 6-3 6-3","2022-01-17 13:14:00"
"R128","Gael Monfils","Federico Coria","6-1 6-1 6-3","2022-01-17 15:32:00"
input/matches1.csv
round,winner,loser,score,matchTime
"R128","Sebastian Korda","Cameron Norrie","6-3 6-0 6-4","2022-01-17 11:10:00"
"R128","Corentin Moutet","Lucas Pouille","3-6 6-3 6-4 6-3","2022-01-17 11:03:00"
"R128","Tallon Griekspoor","Fabio Fognini","6-1 6-4 6-4","2022-01-17 13:08:00"
"R128","Pablo Carreno Busta","Tomas Martin Etcheverry","6-1 6-2 7-6(2)","2022-01-17 18:14:00"
"R128","Carlos Alcaraz","Alejandro Tabilo","6-2 6-2 6-3","2022-01-17 16:51:00"

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

config/schema.json
{
  "schemaName": "matches",
  "dimensionFieldSpecs": [
    {
      "name": "round",
      "dataType": "STRING"
    },
    {
      "name": "winner",
      "dataType": "STRING"
    },
    {
      "name": "loser",
      "dataType": "STRING"
    },
    {
      "name": "score",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "matchTime",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

We'll also have the following table config:

config/table.json
{
  "tableName": "matches",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "matchTime",
    "schemaName": "matches",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1day.mergeType": "concat",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "5m"
      }
    }
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": []
  },
  "metadata": {}
}

danger

Our table must specify segmentsConfig.timeColumnName, otherwise the merge process won't merge any segments.

The main thing that we're interested in is the MergeRollupTask, which is extracted below:

"task": {
"taskTypeConfigsMap": {
"MergeRollupTask": {
"1day.mergeType": "concat",
"1day.bucketTimePeriod": "1d",
"1day.bufferTimePeriod": "5m"
}
}
}

This configuration buckets records from the same 1-day period into the same segment. Only records whose timestamp is more than 5 minutes in the past will be processed, which leaves a buffer for late-arriving data.
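
The MergeRollupTask also supports multiple merge levels side by side. As a hedged illustration (the 1hour level below is not part of this recipe), we could add an hourly pass in front of the daily one:

"MergeRollupTask": {
  "1hour.mergeType": "concat",
  "1hour.bucketTimePeriod": "1h",
  "1hour.bufferTimePeriod": "5m",
  "1day.mergeType": "concat",
  "1day.bucketTimePeriod": "1d",
  "1day.bufferTimePeriod": "5m"
}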

You can create the table and schema by running the following command:

docker run \
  --network mergesegments \
  -v $PWD/config:/config \
  apachepinot/pinot:0.12.0-arm64 AddTable \
  -schemaFile /config/schema.json \
  -tableConfigFile /config/table.json \
  -controllerHost "pinot-controller-mergesegments" \
  -exec

Remove the -arm64 suffix if you're not using a Mac M1/M2.
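
Alternatively, the schema and table config can be uploaded via the Controller's REST API. A minimal sketch, assuming the Controller is reachable on localhost:9000:

curl -X POST "http://localhost:9000/schemas" \
  -H "Content-Type: application/json" \
  -d @config/schema.json

curl -X POST "http://localhost:9000/tables" \
  -H "Content-Type: application/json" \
  -d @config/table.json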

Import Data

Now let's import those CSV files into Pinot, using the following ingestion spec:

config/job-spec.yml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/input'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/data'
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'matches'
pinotClusterSpecs:
  - controllerURI: '${pinotController}'

You can run the following command to run the import:

docker run \
  --network mergesegments \
  -v $PWD/config:/config \
  -v $PWD/data:/data \
  -v $PWD/input:/input \
  apachepinot/pinot:0.12.0-arm64 LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml \
  -values pinotController=http://pinot-controller-mergesegments:9000

Once this job has run, we can list the created segments by running the following command:

curl -X GET "http://localhost:9000/segments/matches?type=OFFLINE" \
-H "accept: application/json" 2>/dev/null | jq '.'
Output
[
{
"OFFLINE": [
"matches_OFFLINE_1642417200000_1642433520000_0",
"matches_OFFLINE_1642417380000_1642443240000_1"
]
}
]

Let's wrap this command in a function so that we can use it again later:

show_segments () {
  table=${1:-"matches"}
  table_type=${2:-"OFFLINE"}
  curl -X GET "http://localhost:9000/segments/${table}?type=${table_type}" \
    -H "accept: application/json" 2>/dev/null | jq '.'
}

We could then call the function like this:

show_segments

We can check the contents of these segments by writing the following function:

segments_breakdown () {
  table=${1:-"matches"}
  table_type=${2:-"OFFLINE"}

  # List the segment names for this table
  segments=$(curl -X GET "http://localhost:9000/segments/${table}?type=${table_type}" 2>/dev/null | jq -r '.[][]')

  # For each segment, pull its metadata and extract the document count and time range
  for segment in $(echo $segments | jq '.[]'); do
    metadata=`curl -X GET "http://localhost:9000/segments/${table}/$(echo ${segment} | jq -r '.')/metadata" \
      -H "accept: application/json" 2>/dev/null | jq '.'`
    docs=`echo $metadata | jq '."segment.total.docs" | tonumber'`
    startTime=`echo $metadata | jq '."segment.start.time" | tonumber'`
    endTime=`echo $metadata | jq '."segment.end.time" | tonumber'`
    echo "$segment,$docs,$startTime,$endTime"
  done
}

We can call it like this:

segments_breakdown
Output
matches_OFFLINE_1642417200000_1642433520000_0,5,1642417200000,1642433520000
matches_OFFLINE_1642417380000_1642443240000_1,5,1642417380000,1642443240000
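
As a sanity check, we can also confirm the total document count with a query from the Pinot UI's query console, which should return 10 (five rows from each CSV file):

select count(*)
from matches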

Merge segments

Now we're going to merge these segments using the Minion merge rollup task.

The configuration that we defined in the matches table buckets records from the same 1-day period into the same segment. Since our events all happened on the same day, we'd expect all the records to be merged into a single segment.

We can run the merge rollup task by running the following:

tableName="matches_OFFLINE"
curl -X POST "http://localhost:9000/tasks/schedule?taskType=MergeRollupTask&tableName=${tableName}" \
-H "accept: application/json" 2>/dev/null | jq '.'
Output
{
"MergeRollupTask": "Task_MergeRollupTask_1646909665695"
}
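
The response contains the name of the generated task. We can poll its state via the Controller's task API; a quick sketch, substituting the task name returned by your own run:

curl -X GET "http://localhost:9000/tasks/task/Task_MergeRollupTask_1646909665695/state" \
  -H "accept: application/json" 2>/dev/null | jq '.'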

We can then check the Pinot Controller logs to see that it's been triggered:

docker exec -it pinot-controller-mergesegments grep -ri --color "\[MergeRollup" logs/pinot-all.log
Output
2022/03/10 10:51:37.383 INFO [TaskGeneratorRegistry] [main] Initialized TaskGeneratorRegistry with 4 task generators: [MergeRollupTask, RealtimeToOfflineSegmentsTask, ConvertToRawIndexTask, SegmentGenerationAndPushTask] in 716ms
2022/03/10 10:54:25.628 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Start generating task configs for table: matches_OFFLINE for task: MergeRollupTask
2022/03/10 10:54:25.666 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Creating the gauge metric for tracking the merge/roll-up task delay for table: matches_OFFLINE and mergeLevel: 1day.(watermarkMs=1642377600000, bufferTimeMs=86400000, bucketTimeMs=86400000, taskDelayInNumTimeBuckets=0)
2022/03/10 10:54:25.667 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Update watermark for table: matches_OFFLINE, mergeLevel: 1day from: 1642377600000 to: 1642377600000
2022/03/10 10:54:25.695 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Finished generating task configs for table: matches_OFFLINE for task: MergeRollupTask, numTasks: 1

And we can check the Pinot Minion logs to see if the job has run:

docker exec -it pinot-minion-mergesegments grep -ri --color "INFO \[MergeRollup" logs/pinot-all.log
Output
2022/03/10 10:54:37.768 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Starting task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=1d, segmentNamePrefix=merged_1day_1646909665668_0_matches, authToken=null, downloadURL=http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417200000_1642433520000_0,http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417380000_1642443240000_1, mergeType=concat, mergeLevel=1day, segmentName=matches_OFFLINE_1642417200000_1642433520000_0,matches_OFFLINE_1642417380000_1642443240000_1, tableName=matches_OFFLINE, uploadURL=http://192.168.96.3:9000/segments, enableReplaceSegments=true, roundBucketTimePeriod=null}
2022/03/10 10:54:38.149 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Finished task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=1d, segmentNamePrefix=merged_1day_1646909665668_0_matches, authToken=null, downloadURL=http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417200000_1642433520000_0,http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417380000_1642443240000_1, mergeType=concat, mergeLevel=1day, segmentName=matches_OFFLINE_1642417200000_1642433520000_0,matches_OFFLINE_1642417380000_1642443240000_1, tableName=matches_OFFLINE, uploadURL=http://192.168.96.3:9000/segments, enableReplaceSegments=true, roundBucketTimePeriod=null}. Total time: 381ms

Let's now check the list of segments again:

show_segments
Output
[
{
"OFFLINE": [
"matches_OFFLINE_1642417200000_1642433520000_0",
"matches_OFFLINE_1642417380000_1642443240000_1",
"merged_1day_1646909665668_0_matches_1642417200000_1642443240000_0"
]
}
]

We can see the new segment, but the initial segments are still there as well. This isn't a problem: the Pinot Broker knows to route queries to the new segment only. We can confirm this by running the following query in the Pinot UI:

select $segmentName, * 
from matches
| $segmentName | loser | matchTime | round | score | winner |
|---|---|---|---|---|---|
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Salvatore Caruso | 2022-01-17 11:00:00.0 | R128 | 6-4 6-2 6-1 | Miomir Kecmanovic |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Mikhail Kukushkin | 2022-01-17 11:10:00.0 | R128 | 6-3 6-4 6-2 | Tommy Paul |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Chun Hsin Tseng | 2022-01-17 12:00:00.0 | R128 | 6-4 6-3 6-2 | Oscar Otte |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Sam Querrey | 2022-01-17 13:14:00.0 | R128 | 7-5 6-3 6-3 | Lorenzo Sonego |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Federico Coria | 2022-01-17 15:32:00.0 | R128 | 6-1 6-1 6-3 | Gael Monfils |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Cameron Norrie | 2022-01-17 11:10:00.0 | R128 | 6-3 6-0 6-4 | Sebastian Korda |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Lucas Pouille | 2022-01-17 11:03:00.0 | R128 | 3-6 6-3 6-4 6-3 | Corentin Moutet |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Fabio Fognini | 2022-01-17 13:08:00.0 | R128 | 6-1 6-4 6-4 | Tallon Griekspoor |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Tomas Martin Etcheverry | 2022-01-17 18:14:00.0 | R128 | 6-1 6-2 7-6(2) | Pablo Carreno Busta |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Alejandro Tabilo | 2022-01-17 16:51:00.0 | R128 | 6-2 6-2 6-3 | Carlos Alcaraz |

Also, Pinot's retention manager will take care of removing the old segments the next time that it runs.
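
Rather than waiting for the next scheduled run, recent Pinot versions let us trigger the retention manager on demand via the periodic task API. A hedged sketch (check that this endpoint is available in your Pinot version):

curl -X GET "http://localhost:9000/periodictask/run?taskname=RetentionManager&tableName=matches_OFFLINE" \
  -H "accept: application/json" 2>/dev/null | jq '.'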

We'll see the following messages in the Pinot Controller's logs when the retention manager has run:

docker exec -it pinot-controller-mergesegments grep -ri --color "\[SegmentDeletionManager\|\[RetentionManager" logs/pinot-all.log
Output
2022/03/14 13:11:18.006 INFO [RetentionManager] [main] Starting RetentionManager with runFrequencyInSeconds: 21600, deletedSegmentsRetentionInDays: 7
2022/03/14 13:15:08.024 INFO [RetentionManager] [pool-7-thread-4] Start managing retention for table: matches_OFFLINE
2022/03/14 13:15:08.032 WARN [RetentionManager] [pool-7-thread-4] Invalid retention time: null null for table: matches_OFFLINE, skip
2022/03/14 13:15:08.034 INFO [RetentionManager] [pool-7-thread-4] Start cleaning up segment lineage for table: matches_OFFLINE
2022/03/14 13:15:08.043 INFO [RetentionManager] [pool-7-thread-4] Finished cleaning up segment lineage for table: matches_OFFLINE, deleted segments: matches_OFFLINE_1642417200000_1642433520000_0,matches_OFFLINE_1642417380000_1642443240000_1 in 9ms
2022/03/14 13:15:08.043 INFO [RetentionManager] [pool-7-thread-4] Segment lineage metadata clean-up is successfully processed for table: matches_OFFLINE
2022/03/14 13:15:08.043 INFO [RetentionManager] [pool-7-thread-4] Removing aged (more than 7 days) deleted segments for all tables
2022/03/14 13:15:10.053 INFO [SegmentDeletionManager] [PinotHelixResourceManagerExecutorService] Moved segment matches_OFFLINE_1642417200000_1642433520000_0 from file:/data/matches/matches_OFFLINE_1642417200000_1642433520000_0 to file:/data/Deleted_Segments/matches/matches_OFFLINE_1642417200000_1642433520000_0
2022/03/14 13:15:10.053 INFO [SegmentDeletionManager] [PinotHelixResourceManagerExecutorService] Moved segment matches_OFFLINE_1642417380000_1642443240000_1 from file:/data/matches/matches_OFFLINE_1642417380000_1642443240000_1 to file:/data/Deleted_Segments/matches/matches_OFFLINE_1642417380000_1642443240000_1
2022/03/14 13:15:10.053 INFO [SegmentDeletionManager] [PinotHelixResourceManagerExecutorService] Deleted 2 segments from table matches_OFFLINE:[matches_OFFLINE_1642417200000_1642433520000_0, matches_OFFLINE_1642417380000_1642443240000_1]

We can then check the contents of that segment by running the following:

segments_breakdown
Output
merged_1day_1646909665668_0_matches_1642417200000_1642443240000_0,10,1642417200000,1642443240000

We now have a single segment that contains all the records from the two CSV files that we ingested.

Automatic scheduling

We can also automatically schedule the merge task by adding the following configuration to the Pinot Controller:

Controller configuration
controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=3600
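
These properties must be passed to the Pinot Controller at startup. A hypothetical sketch of how that might be wired up in docker-compose.yml, assuming we save the properties as config/controller-conf.conf (the file name and mount path are our own choices for illustration):

pinot-controller:
  image: apachepinot/pinot:0.12.0-arm64
  command: "StartController -zkAddress zookeeper-mergesegments:2181 -configFileName /config/controller-conf.conf"
  container_name: pinot-controller-mergesegments
  volumes:
    - ./config:/config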