How to get the time boundary for a hybrid table
Hybrid tables consist of real-time and offline tables with the same name. When querying these tables, the Pinot Broker uses the time boundary to work out which records to read from the offline table and which to read from the real-time table.
| Pinot Version | 0.9.3 |
| Code | startreedata/pinot-recipes/time-boundary-hybrid-table |
For background reading on time boundaries, see Concepts: Time Boundary
Prerequisites
You will need to install Docker locally to follow the code examples in this guide.
Download Recipe
First, clone the GitHub repository to your local machine and navigate to this recipe:
git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/time-boundary-hybrid-table
If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
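Once the containers are running, you can check that the cluster is ready by calling the Pinot Controller's health endpoint. This assumes the Controller's port 9000 is mapped to localhost, which you can confirm in the docker-compose.yml file:

curl http://localhost:9000/health
# Should print OK once the Controller is up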
Pinot Schema and Tables
Now let's create a Pinot schema, as well as real-time and offline tables that share it. The two tables have the same name, and we'll ingest data into each of them separately, so both need their own table config.
Schema
Our schema is going to capture some simple events, and looks like this:
{
"schemaName": "events",
"dimensionFieldSpecs": [
{
"name": "uuid",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "count",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [{
"name": "ts",
"dataType": "TIMESTAMP",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}
You can create the schema by running the following command:
docker exec -it pinot-controller-rt bin/pinot-admin.sh AddSchema \
-schemaFile /config/schema.json \
-exec
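To confirm that the schema was created, you can call the Controller's REST API (again assuming that port 9000 is mapped to localhost):

curl "http://localhost:9000/schemas/events" \
  -H "accept: application/json" 2>/dev/null | jq '.'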
Offline Table
The offline table config is defined below:
{
"tableName": "events",
"tableType": "OFFLINE",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "HOURLY"
}
},
"tableIndexConfig": {
"loadMode": "MMAP"
},
"tenants": {},
"metadata": {}
}
You can create the table by running the following command:
docker exec -it pinot-controller-rt bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table-offline.json \
-exec
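You can confirm that the table exists by asking the Controller for its config:

curl "http://localhost:9000/tables/events" \
  -H "accept: application/json" 2>/dev/null | jq '.'

Once we've added the real-time table below, this endpoint will return both the OFFLINE and REALTIME configs.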
Real-Time Table
And the real-time table is defined below:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-rt:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
}
},
"tenants": {},
"metadata": {}
}
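You can create this table with the same AddTable command. The file name below is an assumption that mirrors the offline example, so check the recipe's config directory for the exact name:

docker exec -it pinot-controller-rt bin/pinot-admin.sh AddTable \
  -tableConfigFile /config/table-realtime.json \
  -exec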
Data Ingestion
Let's import some data into our tables.
Offline Table
We'll start by ingesting the following file of JSON documents:
{"ts": "1646993042165", "uuid": "5befc828ccf8457889c7104f1b0c9143", "count": 23}
{"ts": "1646993042172", "uuid": "56c76d09c9c54e12a15ff42500e153e9", "count": 124}
{"ts": "1646993042178", "uuid": "58fb1b4390f14e748c13056f02900c87", "count": 200}
{"ts": "1646993042182", "uuid": "f15ab0daf0344a5497ff1743fd38cc1d", "count": 638}
{"ts": "1646993042183", "uuid": "b561e1dde0dc4b3fba294198937b71f7", "count": 177}
{"ts": "1646996614169", "uuid": "5befc828ccf8457889c7104f1b0c9143", "count": 724}
{"ts": "1646996614173", "uuid": "56c76d09c9c54e12a15ff42500e153e9", "count": 623}
{"ts": "1646996614177", "uuid": "58fb1b4390f14e748c13056f02900c87", "count": 313}
{"ts": "1646996614181", "uuid": "f15ab0daf0344a5497ff1743fd38cc1d", "count": 836}
{"ts": "1646996614184", "uuid": "b561e1dde0dc4b3fba294198937b71f7", "count": 175}
All ten records have timestamps from 11th March 2022: the first five from just after 10:04am (UTC), and the next five from just after 11:03am.
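If you want to double-check those dates, GNU date can convert the epoch timestamps for us (dropping the milliseconds first):

date -u -d @1646993042 +"%Y-%m-%dT%H:%M:%SZ" # first batch  -> 2022-03-11T10:04:02Z
date -u -d @1646996614 +"%Y-%m-%dT%H:%M:%SZ" # second batch -> 2022-03-11T11:03:34Z

Now run the following ingestion job to load these records into the offline table: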
docker exec -it pinot-controller-rt bin/pinot-admin.sh LaunchDataIngestionJob \
-jobSpecFile /config/job-spec.yml
You can find the ingestion job spec in the pinot-recipes/time-boundary-hybrid-table recipe's GitHub repository.
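Once the job completes, you can sanity-check the import by counting the records in the offline table via the Broker's query endpoint (assuming the Broker's port 8099 is mapped to localhost, as in the time boundary call later in this guide):

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT COUNT(*) FROM events_OFFLINE"}' 2>/dev/null | jq '.resultTable'
# The count should be 10, one per document in the file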
Real-Time Table
Now let's ingest some data into the real-time table via the events Kafka topic:
while true; do
  ts=$(date +%s%N | cut -b1-13)                           # current epoch time in milliseconds
  uuid=$(cat /proc/sys/kernel/random/uuid | sed 's/-//g')  # random ID with the dashes removed
  count=$((RANDOM % 1000))                                 # random count between 0 and 999
  echo "{\"ts\": \"${ts}\", \"uuid\": \"${uuid}\", \"count\": ${count}}"
done |
docker exec -i kafka-rt /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic events
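If you want to check that events are actually reaching Kafka, you can run a console consumer from another terminal, using the same kafka-rt container as the producer command above:

docker exec -it kafka-rt /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic events \
  --from-beginning \
  --max-messages 5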
Compute Time Boundary
Now we can retrieve the time boundary by calling the HTTP API on the Pinot Broker, as shown below:
tableName="events"
curl "http://localhost:8099/debug/timeBoundary/${tableName}" \
-H "accept: application/json" 2>/dev/null | jq '.'
{
"timeColumn": "ts",
"timeValue": "1646993014184"
}
Let's convert this timestamp to a more friendly DateTime string using the following Python script:
import datetime
ts = 1646993014184  # the timeValue returned by the Broker
datetime.datetime.utcfromtimestamp(ts / 1000.0).isoformat()
'2022-03-11T10:03:34.184000'
So any records with a timestamp of 10:03:34 on 11th March 2022 (UTC) or earlier will be served by the offline table. Anything after that time will be served by the real-time table.
And finally, let's understand how this time boundary was determined.
The time boundary is computed based on the value of ingestionConfig.batchIngestionConfig.segmentIngestionFrequency in the offline table config, which in our case was:
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "HOURLY"
}
}
Because our ingestion frequency is HOURLY, the time boundary is set to one hour before the maximum end time across the offline table's segments. For any other ingestion frequency, such as DAILY, the offset is one day instead.
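We can check this against the data that we imported. The maximum value of ts in the offline table is 1646996614184 (the last record in our JSON file), and subtracting one hour gives exactly the timeValue that the Broker returned:

echo $((1646996614184 - 60 * 60 * 1000))
# 1646993014184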
For more on time boundaries, including how they're computed and used by the Pinot Broker, see Concepts: Time Boundary