Configuring the segment threshold

In this guide we'll learn how to configure the segment threshold for real-time tables.

What is the segment threshold?

The segment threshold determines when a segment is committed in real-time tables.

When data is first ingested from a streaming provider like Kafka, it gets stored in a consuming segment. The data in the consuming segment is stored on the disk of the server(s) that are processing a particular partition from the streaming provider.

However, it's not until a segment is committed that the segment is written to the deep store. The segment threshold decides when that should happen.

Why should we care about it?

We care about the segment threshold because we want to make sure that our segments are a reasonable size.

  • Queries are processed at the segment level, so if segments are too small, query latencies may increase because of the per-segment overhead (threads spawned, metadata processing, and so on).
  • If segments are too big, servers may run out of memory. And if a server is restarted, the consuming segment has to start consuming from its first row again, which causes lag between Pinot and the streaming provider.

Configuration parameters

The segment threshold is configured using the following parameters:

Property                                      | Description
realtime.segment.flush.threshold.rows         | Row count flush threshold.
realtime.segment.flush.threshold.time         | Maximum time a consuming segment is kept open before it is flushed.
realtime.segment.flush.threshold.segment.size | The desired size of a completed segment.

There are two main approaches for configuring these parameters:

Desired row threshold

We can define a manual row threshold by specifying a value for realtime.segment.flush.threshold.rows.

Pinot will complete/flush segments as soon as the consuming segment contains the specified number of rows. This will generally result in each segment having the same number of rows.

However, if the time threshold defined by realtime.segment.flush.threshold.time is reached, a segment will be completed even if the row count flush threshold has not yet been reached.

tip

If realtime.segment.flush.threshold.rows is set to a value greater than 0, realtime.segment.flush.threshold.segment.size is ignored.
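
For example, a row-based threshold could be configured with a streamConfigs excerpt like the one below. The 100,000-row and 6h values are purely illustrative (pick values that suit your ingestion rate), and the Kafka connection settings are omitted:

"streamConfigs": {
  "realtime.segment.flush.threshold.rows": "100000",
  "realtime.segment.flush.threshold.time": "6h"
}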

Desired segment size

Alternatively we can set realtime.segment.flush.threshold.rows to 0, in which case Pinot will instead attempt to make sure that every segment has the desired size defined by realtime.segment.flush.threshold.segment.size.

When configuring the segment threshold this way, the minimum number of rows in a segment is 10,000.

The first segment for a new partition will have 100,000 rows. For subsequent segments Pinot will slowly adjust the number of rows to get closer to the desired segment size. This means that the first few segments might differ in size, but over time the segment size will approach the desired size.

info

The algorithm used in this approach is described in more detail in the Auto-tuning Pinot real-time consumption blog post.
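
In streamConfigs, the size-based approach looks like the excerpt below. This is exactly what the worked example later in this guide uses: a row threshold of 0, a 1 hour time threshold, and a 5M desired segment size.

"streamConfigs": {
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.time": "1h",
  "realtime.segment.flush.threshold.segment.size": "5M"
}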

A worked example

Let's see how this works with a worked example.

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/configuring-segment-threshold

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml file on GitHub.

Pinot Schema and Table

Let's create a Pinot Schema and Table.

The schema is defined below:

config/schema.json
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

And the table config below:

config/table.json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-segment:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "1h",
      "realtime.segment.flush.threshold.segment.size": "5M"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}

Pinot will try to ensure that each completed segment is about 5 MB in size. It will complete a segment after 1 hour even if it hasn't yet reached that size.

Create the table and schema by running the following command:

docker exec -it pinot-controller-segment bin/pinot-admin.sh AddTable   \
-tableConfigFile /config/table.json \
-schemaFile /config/schema.json \
-exec
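
If you want to confirm that the table was created, you can optionally ask the Pinot Controller to list its tables:

curl -X GET "http://localhost:9000/tables" \
  -H "accept: application/json"

You should see events in the returned list.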

Ingesting Data

Next, we're going to ingest some data into Kafka:

while true; do
  # current timestamp in epoch milliseconds
  ts=`date +%s%N | cut -b1-13`;
  # random UUID with the dashes stripped out
  uuid=`cat /proc/sys/kernel/random/uuid | sed 's/[-]//g'`
  # random count between 0 and 999
  count=$(( RANDOM % 1000 ))
  echo "{\"ts\": \"${ts}\", \"uuid\": \"${uuid}\", \"count\": $count}"
done | docker exec -i kafka-segment /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic events
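
If you want to check that events are reaching Kafka before Pinot consumes them, you can optionally read a few messages back from the topic in another terminal, using the console consumer that ships in the kafka-segment container:

docker exec -i kafka-segment /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic events \
  --from-beginning \
  --max-messages 5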

Inspecting segment sizes

After the ingestion script has run for a while we can inspect the size of the segments that have been completed.

You can return the size of each completed segment by running the following command:

curl -X GET "http://localhost:9000/tables/events/size?detailed=true" \
-H "accept: application/json" 2>/dev/null |
jq -r '.realtimeSegments .segments | .[] | .serverInfo | .[]' |
jq -sc 'sort_by(.segmentName) | .[]'
Output
{"segmentName":"events__0__0__20220301T1457Z","diskSizeInBytes":5144515}
{"segmentName":"events__0__1__20220301T1504Z","diskSizeInBytes":5242744}
{"segmentName":"events__0__2__20220301T1511Z","diskSizeInBytes":5242744}
{"segmentName":"events__0__3__20220301T1518Z","diskSizeInBytes":5242744}
{"segmentName":"events__0__4__20220301T1525Z","diskSizeInBytes":5242744}
{"segmentName":"events__0__5__20220301T1532Z","diskSizeInBytes":5242744}
{"segmentName":"events__0__6__20220301T1539Z","diskSizeInBytes":5242797}
{"segmentName":"events__0__7__20220301T1546Z","diskSizeInBytes":5242797}

We can see that all the segments have a size of just over 5 million bytes, which is close to 5 MB. The first segment came in at about 4.9 MB, the next five at 4.9998 MB, and the last two at 4.9999 MB. Presumably adding an extra row would take us further away from 5 MB.
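
If you want to do that conversion yourself, a quick one-liner (assuming awk is available, and treating 1 MB as 1,048,576 bytes, which is how the sizes above line up) turns the reported byte counts into megabytes:

awk 'BEGIN { printf "%.5f MB\n", 5242744 / 1048576 }'
Output
4.99987 MB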

Now let's see how many rows are contained in each segment. You can do that for one of the segments by running the following command:

curl -X GET "http://localhost:9000/segments/events/events__0__1__20220301T1504Z/metadata" \
-H "accept: application/json" 2>/dev/null
Output
{
"segment.crc": "1090576205",
"segment.creation.time": "1646147045318",
"segment.end.time": "1646147463422",
"segment.flush.threshold.size": "101912",
"segment.index.version": "v3",
"segment.name": "events__0__1__20220301T1504Z",
"segment.realtime.download.url": "http://192.168.0.3:9000/segments/events/events__0__1__20220301T1504Z",
"segment.realtime.endOffset": "201912",
"segment.realtime.numReplicas": "1",
"segment.realtime.startOffset": "100000",
"segment.realtime.status": "DONE",
"segment.start.time": "1646147042226",
"segment.table.name": "events",
"segment.time.unit": "MILLISECONDS",
"segment.total.docs": "101912",
"segment.type": "REALTIME"
}

The segment.flush.threshold.size property indicates that this segment contains 101,912 rows.
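
As a quick cross-check, the row count matches the Kafka offset range that this segment consumed (segment.realtime.endOffset minus segment.realtime.startOffset), since in this recipe each Kafka message produces exactly one row:

echo $(( 201912 - 100000 ))
Output
101912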

We can check how many rows are stored in all segments by running the following script:

segments=`curl -X GET "http://localhost:9000/tables/events/size?detailed=true" \
  -H "accept: application/json" 2>/dev/null |
  jq -r '.realtimeSegments .segments | .[] | .serverInfo | .[]' |
  jq -sc 'sort_by(.segmentName) | .[]'`

for segment in ${segments}; do
  segmentName=`echo $segment | jq -r '.segmentName'`
  rows=`curl -X GET "http://localhost:9000/segments/events/${segmentName}/metadata" \
    -H "accept: application/json" 2>/dev/null |
    jq '."segment.flush.threshold.size" | tonumber'`
  echo $segment | jq --arg rows $rows -c '. + {"rows": $rows}'
done
Output
{"segmentName":"events__0__0__20220301T1457Z","diskSizeInBytes":5144515,"rows":"100000"}
{"segmentName":"events__0__1__20220301T1504Z","diskSizeInBytes":5242744,"rows":"101912"}
{"segmentName":"events__0__2__20220301T1511Z","diskSizeInBytes":5242744,"rows":"101912"}
{"segmentName":"events__0__3__20220301T1518Z","diskSizeInBytes":5242744,"rows":"101912"}
{"segmentName":"events__0__4__20220301T1525Z","diskSizeInBytes":5242744,"rows":"101912"}
{"segmentName":"events__0__5__20220301T1532Z","diskSizeInBytes":5242744,"rows":"101912"}
{"segmentName":"events__0__6__20220301T1539Z","diskSizeInBytes":5242797,"rows":"101913"}
{"segmentName":"events__0__7__20220301T1546Z","diskSizeInBytes":5242797,"rows":"101913"}

It looks like Pinot has settled on roughly 101,912 to 101,913 rows per segment. The row count would drop if we ingested much larger strings into the uuid column, but for this data it looks fairly stable.
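
As a rough back-of-envelope check (the real sizing algorithm, described in the blog post linked earlier, is more sophisticated than this), scaling the first segment's 100,000 rows by the ratio of the 5 MB target to the 5,144,515 bytes that segment actually occupied lands very close to the value Pinot converged on:

echo $(( 100000 * 5 * 1024 * 1024 / 5144515 ))
Output
101911

That's within a couple of rows of the 101,912 to 101,913 rows per segment that the later segments settled on.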