Merge small segments

How to merge small segments in offline tables

In this recipe we'll learn how to merge small segments into larger ones. Having fewer, larger segments reduces per-segment overhead, which can improve both disk usage and query performance. This is done using the Minion merge rollup task.

⚠️

You can only merge segments in offline tables that have a time column.

💡

You can also merge segments in real-time tables. For more information, see the merge segments in real-time tables guide.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download recipes.

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/merge-small-segments

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml file on GitHub.
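
The services can take a little while to become ready after the containers start, so it can help to wait for the Controller before running the later commands. The sketch below is one way to do that and is not part of the recipe: wait_until is a hypothetical helper name, and the usage line assumes that the Controller answers on a /health endpoint on port 9000, the port used by the curl commands later in this guide.

```shell
# Retry a command once per second until it succeeds or the attempts run out.
# Usage: wait_until <max attempts> <command> [args...]
wait_until () {
  attempts=$1
  shift
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # Run the command quietly; stop as soon as it exits successfully
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    i=$(( i + 1 ))
    sleep 1
  done
  return 1
}

# Example (assumed endpoint): wait up to 60 seconds for the Controller
# wait_until 60 curl -sf "http://localhost:9000/health"
```

The helper returns a non-zero exit code if the command never succeeds, so it can be chained with && in front of the commands that follow.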

Dataset

We're going to import a couple of CSVs that contain results from the Australian Open tennis tournament that was held in January 2022. The contents of the files are shown below:

round,winner,loser,score,matchTime
"R128","Miomir Kecmanovic","Salvatore Caruso","6-4 6-2 6-1","2022-01-17 11:00:00"
"R128","Tommy Paul","Mikhail Kukushkin","6-3 6-4 6-2","2022-01-17 11:10:00"
"R128","Oscar Otte","Chun Hsin Tseng","6-4 6-3 6-2","2022-01-17 12:00:00"
"R128","Lorenzo Sonego","Sam Querrey","7-5 6-3 6-3","2022-01-17 13:14:00"
"R128","Gael Monfils","Federico Coria","6-1 6-1 6-3","2022-01-17 15:32:00"

input/matches0.csv

round,winner,loser,score,matchTime
"R128","Sebastian Korda","Cameron Norrie","6-3 6-0 6-4","2022-01-17 11:10:00"
"R128","Corentin Moutet","Lucas Pouille","3-6 6-3 6-4 6-3","2022-01-17 11:03:00"
"R128","Tallon Griekspoor","Fabio Fognini","6-1 6-4 6-4","2022-01-17 13:08:00"
"R128","Pablo Carreno Busta","Tomas Martin Etcheverry","6-1 6-2 7-6(2)","2022-01-17 18:14:00"
"R128","Carlos Alcaraz","Alejandro Tabilo","6-2 6-2 6-3","2022-01-17 16:51:00"

input/matches1.csv

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "matches",
  "dimensionFieldSpecs": [{
      "name": "round",
      "dataType": "STRING"
    },
    {
      "name": "winner",
      "dataType": "STRING"
    },
    {
      "name": "loser",
      "dataType": "STRING"
    },
    {
      "name": "score",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "matchTime",
    "dataType": "TIMESTAMP",
    "format": "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}

config/schema.json

We'll also have the following table config:

{
  "tableName": "matches",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "matchTime",
    "schemaName": "matches",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "1day.mergeType": "concat",
        "1day.bucketTimePeriod": "1d",
        "1day.bufferTimePeriod": "5m"
      }
    }
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": []
  },
  "metadata": {}
}

config/table.json

🚫

Our table must specify segmentsConfig.timeColumnName, otherwise the merge process won't merge any segments.

The main thing that we're interested in is the MergeRollupTask, which is extracted below:

"task": {
  "taskTypeConfigsMap": {
    "MergeRollupTask": {
      "1day.mergeType": "concat",
      "1day.bucketTimePeriod": "1d",
      "1day.bufferTimePeriod": "5m"
    }
  }
}

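This configuration defines a merge level named 1day. It will bucket records from the same 1 day period into the same segment, and the concat merge type combines those records without aggregating them. It will only process records with a timestamp from more than 5 minutes ago.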
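
To make the bucketing concrete, here is a minimal sketch of the arithmetic, assuming that buckets are aligned to multiples of the bucket period since the epoch and that a bucket becomes eligible once it ends at least one buffer period before the current time. It is a simplified model of the description above, not Pinot's scheduling code, and bucket_start and bucket_is_mergeable are hypothetical helper names.

```shell
# Start of the bucket that contains a timestamp (all values in milliseconds).
bucket_start () {
  ts=$1
  bucket_ms=${2:-86400000}   # 1d, as in 1day.bucketTimePeriod
  echo $(( ts - ts % bucket_ms ))
}

# Succeeds when the bucket containing the timestamp ended at or before
# "now minus buffer" (simplified assumption about eligibility).
bucket_is_mergeable () {
  ts=$1
  now=$2
  bucket_ms=${3:-86400000}   # 1d bucket
  buffer_ms=${4:-300000}     # 5m, as in 1day.bufferTimePeriod
  start=$(bucket_start "$ts" "$bucket_ms")
  [ $(( start + bucket_ms )) -le $(( now - buffer_ms )) ]
}

# Earliest and latest match times from the dataset fall in the same bucket
bucket_start 1642417200000   # prints 1642377600000
bucket_start 1642443240000   # prints 1642377600000
```

Both timestamps map to 1642377600000 (2022-01-17 00:00:00 UTC), so under these assumptions all ten records belong to one bucket.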

You can create the table and schema by running the following command:

docker run \
   --network mergesegments \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-mergesegments" \
     -exec

If you use an image tag that ends in -arm64 (for Mac M1/M2 machines), remove that suffix when you're running on a different architecture.

Import Data

Now let's import those CSV files into Pinot, using the following ingestion spec:

executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/input'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/data'
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'matches'
pinotClusterSpecs:
  # Filled in from the -values argument passed to LaunchDataIngestionJob
  - controllerURI: '${pinotController}'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000

config/job-spec.yml

You can run the following command to run the import:

docker run \
   --network mergesegments \
   -v $PWD/config:/config \
   -v $PWD/data:/data \
   -v $PWD/input:/input \
   apachepinot/pinot:1.0.0 LaunchDataIngestionJob \
    -jobSpecFile /config/job-spec.yml \
    -values pinotController=http://pinot-controller-mergesegments:9000

Once this job has run, we can list the created segments by running the following command:

curl -X GET "http://localhost:9000/segments/matches?type=OFFLINE" \
  -H "accept: application/json" 2>/dev/null | jq '.' 

Output

[
  {
    "OFFLINE": [
      "matches_OFFLINE_1642417200000_1642433520000_0",
      "matches_OFFLINE_1642417380000_1642443240000_1"
    ]
  }
]

Let's wrap this command in a function so that we can use it again later:

show_segments () {
  # Fall back to the matches OFFLINE table when no arguments are given
  table=${1:-matches}
  table_type=${2:-OFFLINE}
  curl -X GET "http://localhost:9000/segments/${table}?type=${table_type}" \
    -H "accept: application/json" 2>/dev/null | jq '.'
}

We could then call the function like this:

show_segments

We can check the contents of these segments by writing the following function:

segments_breakdown () {
  # Fall back to the matches OFFLINE table when no arguments are given
  table=${1:-matches}
  table_type=${2:-OFFLINE}
  # Flatten the response into a JSON array of segment names
  segments=$(curl -X GET "http://localhost:9000/segments/${table}?type=${table_type}" 2>/dev/null | jq '.[][]')

  # Print name, document count, start time, and end time for each segment
  for segment in $(echo "$segments" | jq -r '.[]'); do
    metadata=$(curl -X GET "http://localhost:9000/segments/${table}/${segment}/metadata" \
      -H "accept: application/json" 2>/dev/null)
    docs=$(echo "$metadata" | jq '."segment.total.docs" | tonumber')
    startTime=$(echo "$metadata" | jq '."segment.start.time" | tonumber')
    endTime=$(echo "$metadata" | jq '."segment.end.time" | tonumber')
    echo "$segment,$docs,$startTime,$endTime"
  done
}

We can call it like this:

segments_breakdown

Output

matches_OFFLINE_1642417200000_1642433520000_0,5,1642417200000,1642433520000
matches_OFFLINE_1642417380000_1642443240000_1,5,1642417380000,1642443240000

Merge segments

Now we're going to merge these segments using the Minion merge rollup task.

The configuration that we defined in the matches table is going to bucket records from a 1 day period into the same bucket. Since our events all happened on the same day, we would expect all records to be merged into a single segment.

We can run the merge rollup task by running the following:

tableName="matches_OFFLINE"
curl -X POST "http://localhost:9000/tasks/schedule?taskType=MergeRollupTask&tableName=${tableName}" \
  -H "accept: application/json" 2>/dev/null | jq '.'

Output

{
  "MergeRollupTask": "Task_MergeRollupTask_1646909665695"
}
 

We can then check the Pinot Controller logs to see that it's been triggered:

docker exec -it pinot-controller-mergesegments grep -ri --color "\[MergeRollup" logs/pinot-all.log

Output

2022/03/10 10:51:37.383 INFO [TaskGeneratorRegistry] [main] Initialized TaskGeneratorRegistry with 4 task generators: [MergeRollupTask, RealtimeToOfflineSegmentsTask, ConvertToRawIndexTask, SegmentGenerationAndPushTask] in 716ms
2022/03/10 10:54:25.628 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Start generating task configs for table: matches_OFFLINE for task: MergeRollupTask
2022/03/10 10:54:25.666 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Creating the gauge metric for tracking the merge/roll-up task delay for table: matches_OFFLINE and mergeLevel: 1day.(watermarkMs=1642377600000, bufferTimeMs=86400000, bucketTimeMs=86400000, taskDelayInNumTimeBuckets=0)
2022/03/10 10:54:25.667 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Update watermark for table: matches_OFFLINE, mergeLevel: 1day from: 1642377600000 to: 1642377600000
2022/03/10 10:54:25.695 INFO [MergeRollupTaskGenerator] [grizzly-http-server-25] Finished generating task configs for table: matches_OFFLINE for task: MergeRollupTask, numTasks: 1

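And we can check the Pinot Minion logs to see if the job has run. The container name below assumes that the Minion follows the same naming as the Controller, so check docker ps if yours differs:

docker exec -it pinot-minion-mergesegments grep -ri --color "INFO \[MergeRollup" logs/pinot-all.log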

Output

2022/03/10 10:54:37.768 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Starting task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=1d, segmentNamePrefix=merged_1day_1646909665668_0_matches, authToken=null, downloadURL=http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417200000_1642433520000_0,http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417380000_1642443240000_1, mergeType=concat, mergeLevel=1day, segmentName=matches_OFFLINE_1642417200000_1642433520000_0,matches_OFFLINE_1642417380000_1642443240000_1, tableName=matches_OFFLINE, uploadURL=http://192.168.96.3:9000/segments, enableReplaceSegments=true, roundBucketTimePeriod=null}
2022/03/10 10:54:38.149 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Finished task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=1d, segmentNamePrefix=merged_1day_1646909665668_0_matches, authToken=null, downloadURL=http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417200000_1642433520000_0,http://192.168.96.3:9000/segments/matches/matches_OFFLINE_1642417380000_1642443240000_1, mergeType=concat, mergeLevel=1day, segmentName=matches_OFFLINE_1642417200000_1642433520000_0,matches_OFFLINE_1642417380000_1642443240000_1, tableName=matches_OFFLINE, uploadURL=http://192.168.96.3:9000/segments, enableReplaceSegments=true, roundBucketTimePeriod=null}. Total time: 381ms

Let's now check the list of segments again:

show_segments

Output

[
  {
    "OFFLINE": [
      "matches_OFFLINE_1642417200000_1642433520000_0",
      "matches_OFFLINE_1642417380000_1642443240000_1",
      "merged_1day_1646909665668_0_matches_1642417200000_1642443240000_0"
    ]
  }
]

We can see the new segment, but the initial segments are still there as well. This isn't a problem: Pinot records in its segment lineage that the merged segment replaces the original ones, so the Pinot broker only uses the new segment when it processes queries. We can confirm this by running the following query on the Pinot UI:

select $segmentName, *
from matches

| $segmentName | loser | matchTime | round | score | winner |
| --- | --- | --- | --- | --- | --- |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Salvatore Caruso | 2022-01-17 11:00:00.0 | R128 | 6-4 6-2 6-1 | Miomir Kecmanovic |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Mikhail Kukushkin | 2022-01-17 11:10:00.0 | R128 | 6-3 6-4 6-2 | Tommy Paul |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Chun Hsin Tseng | 2022-01-17 12:00:00.0 | R128 | 6-4 6-3 6-2 | Oscar Otte |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Sam Querrey | 2022-01-17 13:14:00.0 | R128 | 7-5 6-3 6-3 | Lorenzo Sonego |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Federico Coria | 2022-01-17 15:32:00.0 | R128 | 6-1 6-1 6-3 | Gael Monfils |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Cameron Norrie | 2022-01-17 11:10:00.0 | R128 | 6-3 6-0 6-4 | Sebastian Korda |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Lucas Pouille | 2022-01-17 11:03:00.0 | R128 | 3-6 6-3 6-4 6-3 | Corentin Moutet |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Fabio Fognini | 2022-01-17 13:08:00.0 | R128 | 6-1 6-4 6-4 | Tallon Griekspoor |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Tomas Martin Etcheverry | 2022-01-17 18:14:00.0 | R128 | 6-1 6-2 7-6(2) | Pablo Carreno Busta |
| merged_1day_1680183805991_0_matches_1642417200000_1642443240000_0 | Alejandro Tabilo | 2022-01-17 16:51:00.0 | R128 | 6-2 6-2 6-3 | Carlos Alcaraz |

Also, Pinot's retention manager will take care of removing the old segments the next time that it runs.

We'll see the following messages in the Pinot Controller's logs when the retention manager has run:

docker exec -it pinot-controller-mergesegments grep -ri --color "\[SegmentDeletionManager\|\[RetentionManager" logs/pinot-all.log

Output

2022/03/14 13:11:18.006 INFO [RetentionManager] [main] Starting RetentionManager with runFrequencyInSeconds: 21600, deletedSegmentsRetentionInDays: 7
2022/03/14 13:15:08.024 INFO [RetentionManager] [pool-7-thread-4] Start managing retention for table: matches_OFFLINE
2022/03/14 13:15:08.032 WARN [RetentionManager] [pool-7-thread-4] Invalid retention time: null null for table: matches_OFFLINE, skip
2022/03/14 13:15:08.034 INFO [RetentionManager] [pool-7-thread-4] Start cleaning up segment lineage for table: matches_OFFLINE
2022/03/14 13:15:08.043 INFO [RetentionManager] [pool-7-thread-4] Finished cleaning up segment lineage for table: matches_OFFLINE, deleted segments: matches_OFFLINE_1642417200000_1642433520000_0,matches_OFFLINE_1642417380000_1642443240000_1 in 9ms
2022/03/14 13:15:08.043 INFO [RetentionManager] [pool-7-thread-4] Segment lineage metadata clean-up is successfully processed for table: matches_OFFLINE
2022/03/14 13:15:08.043 INFO [RetentionManager] [pool-7-thread-4] Removing aged (more than 7 days) deleted segments for all tables
2022/03/14 13:15:10.053 INFO [SegmentDeletionManager] [PinotHelixResourceManagerExecutorService] Moved segment matches_OFFLINE_1642417200000_1642433520000_0 from file:/data/matches/matches_OFFLINE_1642417200000_1642433520000_0 to file:/data/Deleted_Segments/matches/matches_OFFLINE_1642417200000_1642433520000_0
2022/03/14 13:15:10.053 INFO [SegmentDeletionManager] [PinotHelixResourceManagerExecutorService] Moved segment matches_OFFLINE_1642417380000_1642443240000_1 from file:/data/matches/matches_OFFLINE_1642417380000_1642443240000_1 to file:/data/Deleted_Segments/matches/matches_OFFLINE_1642417380000_1642443240000_1
2022/03/14 13:15:10.053 INFO [SegmentDeletionManager] [PinotHelixResourceManagerExecutorService] Deleted 2 segments from table matches_OFFLINE:[matches_OFFLINE_1642417200000_1642433520000_0, matches_OFFLINE_1642417380000_1642443240000_1]

Once the retention manager has removed the original segments, we can check the contents of the remaining segment by running the following:

segments_breakdown

Output

merged_1day_1646909665668_0_matches_1642417200000_1642443240000_0,10,1642417200000,1642443240000

We now have a single segment that contains all the records from the two CSV files that we ingested.
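
One way to double check that no records were lost is to total the document counts that segments_breakdown prints. The sketch below sums the second comma-separated field of whatever it reads from standard input. total_docs is a hypothetical helper name, and the sample lines are copied from the outputs shown earlier in this guide.

```shell
# Sum the document counts (second CSV field) read from standard input.
total_docs () {
  awk -F',' '{ total += $2 } END { print total + 0 }'
}

# Before the merge: two segments with 5 documents each
printf '%s\n' \
  "matches_OFFLINE_1642417200000_1642433520000_0,5,1642417200000,1642433520000" \
  "matches_OFFLINE_1642417380000_1642443240000_1,5,1642417380000,1642443240000" \
  | total_docs   # prints 10

# After the merge and cleanup: one segment with 10 documents
printf '%s\n' \
  "merged_1day_1646909665668_0_matches_1642417200000_1642443240000_0,10,1642417200000,1642443240000" \
  | total_docs   # prints 10
```

Against a running cluster the same check would be segments_breakdown | total_docs. While the original segments are still listed alongside the merged one, that total counts every record twice, so it is only meaningful before the merge or after the cleanup.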

Automatic scheduling

We can also automatically schedule the merge task by adding the following configuration to the Pinot Controller:

controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=1h

Controller configuration