How to force commit consuming segments
In this recipe we'll learn how to use the forceCommit API to immediately commit consuming segments.
This API is typically used after we've made stream-compatible changes to our table config, such as changing the realtime.segment.flush.threshold parameters.
Pinot Version | 1.0.0 |
---|---|
Code | startreedata/pinot-recipes/force-commit |
Prerequisites
You will need to install Docker to follow the code examples in this guide.
Navigate to recipe
- If you haven't already, download the recipes from the startreedata/pinot-recipes repository on GitHub.
- In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/force-commit
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
Data generator
This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:
python datagen.py 2>/dev/null | head -n1 | jq
Output is shown below:
{
"ts": 1680171022742,
"uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
"count": 260
}
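If you're curious what that generator is doing, the sketch below is a minimal approximation (not the actual datagen.py) that emits events with the same three fields; the --sleep flag mirrors the one used later in this recipe.

import argparse
import json
import random
import time
import uuid

# Minimal approximation of the recipe's data generator (not the real datagen.py):
# emits one JSON event per line with a millisecond timestamp, a UUID, and a count.
parser = argparse.ArgumentParser()
parser.add_argument("--sleep", type=float, default=0.1,
                    help="seconds to pause between events")
args = parser.parse_args()

while True:
    event = {
        "ts": int(time.time() * 1000),     # epoch milliseconds, matching the TIMESTAMP column
        "uuid": str(uuid.uuid4()),         # later used as the Kafka message key
        "count": random.randint(0, 1000),  # arbitrary metric value
    }
    print(json.dumps(event), flush=True)
    time.sleep(args.sleep)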
Kafka ingestion
We're going to ingest this data into an Apache Kafka topic using the kcat command line tool.
We'll also use jq to get the data into the key:payload structure that Kafka expects:
python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
We can check that Kafka has some data by running the following command:
docker exec -it kafka-forcecommit kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic events
We'll see something like the following:
events:0:19960902
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "events",
"dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
"metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
"dateTimeFieldSpecs": [
{
"name": "ts",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
Now for the table config:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"realtime.segment.flush.threshold.rows":"2500000",
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-forcecommit:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.time":"1h"
}
},
"ingestionConfig": {
},
"tenants": {},
"metadata": {}
}
config/table.json
The realtime.segment.flush.threshold.rows setting indicates that we're going to create a new segment after every 2.5 million rows.
We'll create the table by running the following:
docker run \
--network forcecommit \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-forcecommit" \
-exec
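If you want to double-check that the table was registered, you can ask the controller for the config back. The sketch below assumes the controller's REST API is exposed on localhost:9000 (as in the docker-compose file) and that the response groups configs by table type; only standard-library modules are used.

import json
import urllib.request

# Fetch the table config back from the controller to confirm the REALTIME table exists.
with urllib.request.urlopen("http://localhost:9000/tables/events") as response:
    config = json.loads(response.read())

# Print just the segmentsConfig section of the REALTIME table config.
print(json.dumps(config.get("REALTIME", {}).get("segmentsConfig", {}), indent=2))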
Querying by segment
Once that's been created, we can head over to the Pinot UI and run some queries.
Pinot has several built-in virtual columns inside every schema that can be used for debugging purposes:
Column Name | Column Type | Data Type | Description |
---|---|---|---|
$hostName | Dimension | STRING | Name of the server hosting the data |
$segmentName | Dimension | STRING | Name of the segment containing the record |
$docId | Dimension | INT | Document id of the record within the segment |
The one that's useful for us is $segmentName, which we can use like this to count the number of records in each segment:
select $segmentName, ToDateTime(max(ts), 'YYYY-MM-dd HH:mm:ss') as maxTs, count(*)
from events
group by $segmentName
order by maxTs desc
limit 100
$segmentName | maxTs | count(*) |
---|---|---|
events__0__1__20230602T1305Z | 2023-06-02 13:04:34 | 1423028 |
events__0__0__20230602T1304Z | 2023-06-02 13:04:27 | 2500000 |
Query Results
Now, let's say that we decide we want to reduce the segment threshold to 1 million rows. We can do that with the following table config:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"realtime.segment.flush.threshold.rows":"1000000",
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-forcecommit:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.time":"1h"
}
},
"ingestionConfig": {
},
"tenants": {},
"metadata": {}
}
config/table-newthreshold.json
Let's update the table config:
docker run \
--network forcecommit \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table-newthreshold.json \
-controllerHost "pinot-controller-forcecommit" \
-exec -update
We now have to wait until the current consuming segment reaches 2.5 million rows before this config will be used.
Or... we can have it take effect immediately by calling the forceCommit API:
curl -X POST "http://localhost:9000/tables/events/forceCommit" -H "accept: application/json"
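If you'd rather script the call and then watch the segment list change, a minimal Python sketch like the one below should work. It assumes the controller is reachable on localhost:9000 and uses the controller's GET /segments/{tableName} endpoint to list the table's segments; only standard-library modules are used.

import json
import time
import urllib.request

CONTROLLER = "http://localhost:9000"
TABLE = "events"

# Ask the controller to commit the currently consuming segment(s).
request = urllib.request.Request(
    f"{CONTROLLER}/tables/{TABLE}/forceCommit",
    method="POST",
    headers={"accept": "application/json"},
)
print(urllib.request.urlopen(request).read().decode())

# Poll the segment list a few times to watch new segments appear
# once the force commit has taken effect.
for _ in range(5):
    with urllib.request.urlopen(f"{CONTROLLER}/segments/{TABLE}") as response:
        print(json.dumps(json.loads(response.read()), indent=2))
    time.sleep(5)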
After a few seconds the consuming segment will be committed, and from then on new segments will be created every 1 million rows. If we run the query again, we'll see the following output:
$segmentName | maxTs | count(*) |
---|---|---|
events__0__5__20230602T1305Z | 2023-06-02 13:04:55 | 1000000 |
events__0__4__20230602T1305Z | 2023-06-02 13:04:50 | 1000000 |
events__0__3__20230602T1305Z | 2023-06-02 13:04:45 | 1000000 |
events__0__2__20230602T1305Z | 2023-06-02 13:04:39 | 1000000 |
events__0__1__20230602T1305Z | 2023-06-02 13:04:34 | 1423028 |
events__0__0__20230602T1304Z | 2023-06-02 13:04:27 | 2500000 |
Query Results