Force Commit

How to force commit consuming segments

In this recipe we'll learn how to use the forceCommit API to immediately commit consuming segments. This API is typically used after we've made stream-compatible changes to our table config, such as changing the segment flush threshold parameters.

Prerequisites

You will need to install Docker to follow the code examples in this guide.

Navigate to recipe

  1. If you haven't already, download the recipes (a sample clone command is shown just after this list).
  2. In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/force-commit
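If you don't have the recipes locally yet, one way to get them is to clone the repository. The URL below is where the Pinot recipes usually live and is an assumption on our part; adjust it if your copy comes from somewhere else:

git clone https://github.com/startreedata/pinot-recipes.git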

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
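If you want to confirm that all of the containers came up, a quick status check should do it (this is a standard Docker Compose command, not something specific to this recipe):

docker-compose ps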

Data generator

This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:

python datagen.py 2>/dev/null | head -n1 | jq

Output is shown below:

{
  "ts": 1680171022742,
  "uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
  "count": 260
}

Kafka ingestion

We're going to ingest this data into an Apache Kafka topic using the kcat command line tool. We'll also use jq to turn each event into a key and payload, joined by the ø character, which kcat then splits on via the -K flag:

python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
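Before counting offsets, you could also eyeball a few of the messages with kcat in consumer mode. The flags below are standard kcat options (-C to consume, -o beginning to start from the first offset, -c 3 to stop after three messages, -Kø to split key and payload on the same delimiter we produced with):

kcat -C -b localhost:9092 -t events -o beginning -c 3 -Kø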

We can check that Kafka has some data by running the following command:

docker exec -it kafka-forcecommit kafka-run-class.sh \
  kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events

We'll see something like the following:

events:0:19960902

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
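If you'd rather register the schema directly against the controller's REST API (instead of, or before, the AddTable command used below), something like this should work, assuming the controller is exposed on localhost:9000 as it is elsewhere in this recipe:

curl -X POST "http://localhost:9000/schemas" \
  -H "Content-Type: application/json" \
  -d @config/schema.json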

Now for the table config:

{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "realtime.segment.flush.threshold.rows":"2500000",
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-forcecommit:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.time":"1h"
      }
    },
    "ingestionConfig": {
       
      },
    "tenants": {},
    "metadata": {}
}

config/table.json

The realtime.segment.flush.threshold.rows setting in this config means that a new segment will be committed after every 2.5 million rows.
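If you want to double-check that value before creating the table, a quick jq lookup against the local config file works (assuming the file lives at config/table.json, as labelled above):

jq '.tableIndexConfig.streamConfigs["realtime.segment.flush.threshold.rows"]' config/table.json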

We'll create the table by running the following:

docker run \
   --network forcecommit \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-forcecommit" \
    -exec
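Once the table exists, you can confirm that the threshold was picked up by asking the controller for the table config. The jq path below mirrors the structure of the config file; localhost:9000 is assumed, as elsewhere in this recipe:

curl -s "http://localhost:9000/tables/events" | \
  jq '.REALTIME.tableIndexConfig.streamConfigs["realtime.segment.flush.threshold.rows"]'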

Querying by segment

Once that's been created, we can head over to the Pinot UI at http://localhost:9000 and run some queries.

Pinot has several built-in virtual columns inside every schema that can be used for debugging purposes:

Column Name   | Column Type | Data Type | Description
$hostName     | Dimension   | STRING    | Name of the server hosting the data
$segmentName  | Dimension   | STRING    | Name of the segment containing the record
$docId        | Dimension   | INT       | Document id of the record within the segment

The one that's useful for us is $segmentName, which we can use like this to count the number of records in each segment:

select $segmentName, ToDateTime(max(ts), 'YYYY-MM-dd HH:mm:ss') as maxTs, count(*)
from events
group by $segmentName
order by maxTs desc
limit 100
$segmentName                  | maxTs               | count(*)
events__0__1__20230602T1305Z  | 2023-06-02 13:04:34 | 1423028
events__0__0__20230602T1304Z  | 2023-06-02 13:04:27 | 2500000

Query Results
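You can also run the same kind of query from the command line. The controller exposes a /sql endpoint that forwards queries to the broker; we're assuming the default controller port of 9000 here, the same one used for the forceCommit call later:

curl -s -X POST "http://localhost:9000/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select $segmentName, count(*) from events group by $segmentName limit 100"}' | jq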

Now, let's say that we decide we want to reduce the segment threshold to 1 million rows. We can do that with the following table config:

{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "realtime.segment.flush.threshold.rows":"1000000",
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-forcecommit:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.time":"1h"
      }
    },
    "ingestionConfig": {
       
      },
    "tenants": {},
    "metadata": {}
}

config/table-newthreshold.json

Let's update the table config:

docker run \
   --network forcecommit \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table-newthreshold.json \
     -controllerHost "pinot-controller-forcecommit" \
    -exec -update
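Before forcing a commit, it can be useful to see which segment is currently consuming. The controller has a consuming-segments endpoint for this (again assuming localhost:9000):

curl -s "http://localhost:9000/tables/events/consumingSegmentsInfo" | jq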

We would now have to wait until the current consuming segment reaches 2.5 million rows before this config takes effect. Alternatively, we can apply it immediately by calling the forceCommit API:

curl -X POST "http://localhost:9000/tables/events/forceCommit" -H "accept: application/json"
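While the commit is happening, you can watch the segment list grow by polling the controller (GET /segments/{tableName} is a standard controller endpoint; localhost:9000 is assumed as before):

curl -s "http://localhost:9000/segments/events" | jq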

After a few seconds the consuming segment will be committed, and from then on segments will be committed every 1 million rows. If we run the query again, we'll see this output:

$segmentName                  | maxTs               | count(*)
events__0__5__20230602T1305Z  | 2023-06-02 13:04:55 | 1000000
events__0__4__20230602T1305Z  | 2023-06-02 13:04:50 | 1000000
events__0__3__20230602T1305Z  | 2023-06-02 13:04:45 | 1000000
events__0__2__20230602T1305Z  | 2023-06-02 13:04:39 | 1000000
events__0__1__20230602T1305Z  | 2023-06-02 13:04:34 | 1423028
events__0__0__20230602T1304Z  | 2023-06-02 13:04:27 | 2500000

Query Results
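When you're finished with the recipe, you can tear everything down with the same Docker Compose setup you used to launch it:

docker-compose down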