How to get the time boundary for a hybrid table
Hybrid tables consist of real-time and offline tables with the same name. When querying these tables, the Pinot broker uses the time boundary to determine which records to read from the offline table and which to read from the real-time table.
To learn how to get the time boundary for a hybrid table, complete the tutorial below, starting with Prerequisites.
| Pinot Version | 1.0.0 |
| Code | startreedata/pinot-recipes/time-boundary-hybrid-table |
For more information on time boundaries, see Concepts: Time Boundary.
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download recipes.
Navigate to recipe
- If you haven't already, download recipes.
- In a terminal, go to the recipe directory by running the following command:
cd pinot-recipes/recipes/time-boundary-hybrid-table
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and ZooKeeper. You can find the docker-compose.yml file on GitHub.
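Once the containers are running, you can check that the Pinot Controller is up by calling its health endpoint. This assumes the Controller's port is mapped to localhost:9000, as is typical in these recipes' Docker Compose files:

curl http://localhost:9000/health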
Pinot Schema and Tables
Now let's create a Pinot schema, as well as real-time and offline tables. We'll load data into both tables in the Data Ingestion section below, but first we need to configure them.
Schema
Our schema is going to capture some simple events, and looks like this:
{
"schemaName": "events",
"dimensionFieldSpecs": [
{
"name": "uuid",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "count",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [{
"name": "ts",
"dataType": "TIMESTAMP",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}
config/schema.json
Offline Table
The offline table config is defined below:
{
"tableName": "events",
"tableType": "OFFLINE",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "HOURLY"
}
},
"tableIndexConfig": {
"loadMode": "MMAP"
},
"tenants": {},
"metadata": {}
}
config/table-offline.json
Real-Time Table
And the real-time table is defined below:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-rt:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
}
},
"tenants": {},
"metadata": {}
}
config/table-realtime.json
You can create the schema and offline/real-time tables by running the following command:
docker run \
--network rt \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-realtimeTableConfigFile /config/table-realtime.json \
-offlineTableConfigFile /config/table-offline.json \
-controllerHost "pinot-controller-rt" \
-exec
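Once the command completes, you can confirm that the schema and both tables exist by querying the Controller's REST API (again assuming the Controller is mapped to localhost:9000):

curl http://localhost:9000/schemas 2>/dev/null | jq '.'
curl http://localhost:9000/tables 2>/dev/null | jq '.'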
Data Ingestion
Let's import some data into our tables.
Offline Table
We'll start by ingesting the following file of JSON documents:
{"ts": "1646993042165", "uuid": "5befc828ccf8457889c7104f1b0c9143", "count": 23}
{"ts": "1646993042172", "uuid": "56c76d09c9c54e12a15ff42500e153e9", "count": 124}
{"ts": "1646993042178", "uuid": "58fb1b4390f14e748c13056f02900c87", "count": 200}
{"ts": "1646993042182", "uuid": "f15ab0daf0344a5497ff1743fd38cc1d", "count": 638}
{"ts": "1646993042183", "uuid": "b561e1dde0dc4b3fba294198937b71f7", "count": 177}
{"ts": "1646996614169", "uuid": "5befc828ccf8457889c7104f1b0c9143", "count": 724}
{"ts": "1646996614173", "uuid": "56c76d09c9c54e12a15ff42500e153e9", "count": 623}
{"ts": "1646996614177", "uuid": "58fb1b4390f14e748c13056f02900c87", "count": 313}
{"ts": "1646996614181", "uuid": "f15ab0daf0344a5497ff1743fd38cc1d", "count": 836}
{"ts": "1646996614184", "uuid": "b561e1dde0dc4b3fba294198937b71f7", "count": 175}
input/events.json
All ten records are from 11th March 2022: the first five have timestamps just after 10:04 UTC, and the next five just after 11:03 UTC, roughly an hour later. You can run the ingestion job that imports this file with the following command:
docker run \
--network rt \
-v $PWD/config:/config \
-v $PWD/input:/input \
apachepinot/pinot:1.0.0 LaunchDataIngestionJob \
-jobSpecFile /config/job-spec.yml
You can find the ingestion job spec in the pinot-recipes/time-boundary-hybrid-table recipe's GitHub repository.
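If you want to verify the import, one way is to count the records in the offline table through the Broker's SQL endpoint; the events_OFFLINE suffix restricts the query to the offline part of the hybrid table:

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT COUNT(*) FROM events_OFFLINE"}' 2>/dev/null | jq '.resultTable'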
Real-Time Table
Now let's ingest some data into the real-time table via the events Kafka topic:
python datagen.py |
kcat -P -b localhost:9092 -t events
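To confirm that events are reaching the topic, you can consume a few messages back with kcat (already used above); the -c 5 flag exits after five messages:

kcat -C -b localhost:9092 -t events -c 5 -e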
Compute time boundary
Now we can retrieve the time boundary by calling the debug HTTP API on the Pinot Broker, as shown below:
tableName="events"
curl "http://localhost:8099/debug/timeBoundary/${tableName}" \
-H "accept: application/json" 2>/dev/null | jq '.'
{
"timeColumn": "ts",
"timeValue": "1646993014184"
}
Let's convert this timestamp to a more friendly DateTime string using the following Python script:
import datetime

ts = 1646993014184  # the timeValue returned by the broker
datetime.datetime.utcfromtimestamp(ts / 1000.0).isoformat()
Output
'2022-03-11T10:03:34.184000'
So any records with a timestamp less than or equal to 10:03:34 on 11th March 2022 will be served by the offline table, and anything after that will be served by the real-time table.
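To see the boundary in action, you can query each physical table explicitly with the same predicates the Broker adds implicitly (ts <= boundary for offline, ts > boundary for real-time). The SQL below is illustrative rather than the Broker's internal form:

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT COUNT(*) FROM events_OFFLINE WHERE ts <= 1646993014184"}' \
  2>/dev/null | jq '.resultTable.rows'

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT COUNT(*) FROM events_REALTIME WHERE ts > 1646993014184"}' \
  2>/dev/null | jq '.resultTable.rows'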
Finally, let's understand how this time boundary was determined. The time boundary is computed based on the value of ingestionConfig.batchIngestionConfig.segmentIngestionFrequency in the offline table config. Our offline table has the following configuration:
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "HOURLY"
}
}
Since the ingestion frequency is HOURLY, the time boundary will be one hour before the maximum end time across the offline table's segments.
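We can check this with some quick arithmetic: the latest timestamp in our offline data is 1646996614184 (11:03:34 UTC), and subtracting one hour gives exactly the boundary the Broker reported:

echo $((1646996614184 - 60*60*1000))
# 1646993014184 -> 2022-03-11T10:03:34.184 UTC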
For more on time boundaries, including how they're computed and used by the Pinot Broker, see Concepts: Time Boundary.