
How to get the time boundary for a hybrid table

Hybrid tables consist of real-time and offline tables with the same name. When querying these tables, the Pinot Broker uses the time boundary to work out which records to read from the offline table and which to read from the real-time table.
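To make the routing rule concrete, here's a minimal Python sketch of the decision the Broker makes for each record's timestamp. This is illustrative only, not Pinot code; the boundary value used is the one we'll retrieve from the Broker later in this guide:

# Minimal sketch of the Broker's routing rule (illustrative, not Pinot source).
# The offline table serves records at or before the time boundary;
# the real-time table serves everything after it.
TIME_BOUNDARY_MS = 1646993014184  # value we'll retrieve from the Broker later

def serving_table(ts_ms: int) -> str:
    return "events_OFFLINE" if ts_ms <= TIME_BOUNDARY_MS else "events_REALTIME"

print(serving_table(1646993014184))  # events_OFFLINE (exactly on the boundary)
print(serving_table(1646996614184))  # events_REALTIME (after the boundary)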

tip

For background reading on time boundaries, see Concepts: Time Boundary.

Prerequisites

You will need to install Docker locally to follow the code examples in this guide.

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/time-boundary-hybrid-table

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
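Once the containers are up, you can sanity-check that all five are running. This is just a convenience check, not part of the recipe:

docker ps --format '{{.Names}}: {{.Status}}'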

Pinot Schema and Tables

Now let's create a Pinot Schema, as well as real-time and offline tables with the same name. We'll ingest data into both tables ourselves in the Data Ingestion section below, but first we need to configure them.

Schema

Our schema is going to capture some simple events, and looks like this:

config/schema.json
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

You can create the schema by running the following command:

docker exec -it pinot-controller-rt bin/pinot-admin.sh AddSchema \
  -schemaFile /config/schema.json \
  -exec
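If you'd like to confirm that the schema was created, you can ask the Controller's REST API for it. This assumes the Controller's default port 9000 is mapped to localhost:

curl -X GET "http://localhost:9000/schemas/events" -H "accept: application/json"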

Offline Table

The offline table config is defined below:

config/table-offline.json
{
  "tableName": "events",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "HOURLY"
    }
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "tenants": {},
  "metadata": {}
}

You can create the table by running the following command:

docker exec -it pinot-controller-rt bin/pinot-admin.sh AddTable \
  -tableConfigFile /config/table-offline.json \
  -exec

Real-Time Table

And the real-time table is defined below:

config/table-realtime.json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-rt:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "tenants": {},
  "metadata": {}
}
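As with the offline table, you can create it by running the AddTable command, this time pointing at the real-time config file:

docker exec -it pinot-controller-rt bin/pinot-admin.sh AddTable \
  -tableConfigFile /config/table-realtime.json \
  -exec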

Data Ingestion

Let's import some data into our tables.

Offline Table

We'll start by ingesting the following file of JSON documents:

input/events.json
{"ts": "1646993042165", "uuid": "5befc828ccf8457889c7104f1b0c9143", "count": 23}
{"ts": "1646993042172", "uuid": "56c76d09c9c54e12a15ff42500e153e9", "count": 124}
{"ts": "1646993042178", "uuid": "58fb1b4390f14e748c13056f02900c87", "count": 200}
{"ts": "1646993042182", "uuid": "f15ab0daf0344a5497ff1743fd38cc1d", "count": 638}
{"ts": "1646993042183", "uuid": "b561e1dde0dc4b3fba294198937b71f7", "count": 177}
{"ts": "1646996614169", "uuid": "5befc828ccf8457889c7104f1b0c9143", "count": 724}
{"ts": "1646996614173", "uuid": "56c76d09c9c54e12a15ff42500e153e9", "count": 623}
{"ts": "1646996614177", "uuid": "58fb1b4390f14e748c13056f02900c87", "count": 313}
{"ts": "1646996614181", "uuid": "f15ab0daf0344a5497ff1743fd38cc1d", "count": 836}
{"ts": "1646996614184", "uuid": "b561e1dde0dc4b3fba294198937b71f7", "count": 175}
info

All ten records have timestamps from 11th March 2022: the first five from around 10:04 UTC and the next five from around 11:03 UTC, just over an hour later.
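You can double-check those timestamps with a quick snippet in any Python 3 interpreter:

import datetime

# Convert the first and last event timestamps (epoch millis) to UTC date-times
for ts_ms in [1646993042165, 1646996614184]:
    dt = datetime.datetime.fromtimestamp(ts_ms / 1000.0, tz=datetime.timezone.utc)
    print(dt.isoformat())

# 2022-03-11T10:04:02.165000+00:00
# 2022-03-11T11:03:34.184000+00:00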

Run the following command to launch an ingestion job that imports this file into the offline table:

docker exec -it pinot-controller-rt bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml
info

You can find the ingestion job spec in the time-boundary-hybrid-table recipe on GitHub.

Real-Time Table

Now let's ingest some data into the real-time table via the events Kafka topic:

while true; do
  ts=$(date +%s%N | cut -b1-13)
  uuid=$(cat /proc/sys/kernel/random/uuid | sed 's/-//g')
  count=$((RANDOM % 1000))
  echo "{\"ts\": \"${ts}\", \"uuid\": \"${uuid}\", \"count\": ${count}}"
done |
docker exec -i kafka-rt /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic events

Compute time boundary

Now we can retrieve the time boundary by calling the debug API on the Pinot Broker, as shown below:

tableName="events"
curl "http://localhost:8099/debug/timeBoundary/${tableName}" \
  -H "accept: application/json" 2>/dev/null | jq '.'

Output

{
  "timeColumn": "ts",
  "timeValue": "1646993014184"
}

Let's convert this timestamp to a more friendly date-time string using the following Python script:

import datetime
ts = 1646993014184
datetime.datetime.fromtimestamp(ts / 1000.0, tz=datetime.timezone.utc).isoformat()

Output

'2022-03-11T10:03:34.184000+00:00'

So any records with a timestamp of 10:03:34 on 11th March 2022 or earlier will be served by the offline table, and anything after that time will be served by the real-time table.

Finally, let's understand how this time boundary was determined. It is computed based on the value of ingestionConfig.batchIngestionConfig.segmentIngestionFrequency in the offline table config, which in our case was:

"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "HOURLY"
}
}

Because the frequency is HOURLY, the time boundary is set to one hour before the maximum end time across the offline table's segments. (For a DAILY ingestion frequency, it would be one day before.)
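We can check this arithmetic against the data we ingested. A quick sketch, using the largest ts value from input/events.json as the maximum end time:

# The HOURLY rule: time boundary = max end time of offline segments - 1 hour
max_end_time_ms = 1646996614184                 # largest ts in input/events.json
boundary_ms = max_end_time_ms - 60 * 60 * 1000  # one hour earlier
print(boundary_ms)  # 1646993014184, matching the Broker's timeValue above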

tip

For more on time boundaries, including how they're computed and used by the Pinot Broker, see Concepts: Time Boundary.