
How to get the time boundary for a hybrid table

Hybrid tables consist of real-time and offline tables with the same name. When querying these tables, the Pinot broker uses the time boundary to determine which records to read from the offline table and which to read from the real-time table.

To learn how to get the time boundary for a hybrid table, complete the tutorial below, starting with Prerequisites.

💡

For more information on time boundaries, see Concepts: Time Boundary.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download the recipes repository.

Navigate to recipe

  1. If you haven't already, download the recipes repository.
  2. In a terminal, go to the recipe directory by running the following command:
cd pinot-recipes/recipes/managed-offline-flow

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker compose up

This command runs a single instance each of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.

Pinot Schema and Tables

Now let's create a Pinot schema, as well as real-time and offline tables. Pinot takes care of populating data into the offline table, but it still expects us to configure the table.

Schema

Our schema captures some simple events and looks like this:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "ts",
    "dataType": "TIMESTAMP",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}

config/schema.json
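Before ingesting, it can be handy to sanity-check input documents against this schema. The following validator is a hypothetical helper, not part of the recipe; only the field names and types come from config/schema.json:

```python
def validate_event(event: dict) -> bool:
    """Check an event against the events schema: ts (TIMESTAMP stored as
    epoch milliseconds), uuid (STRING dimension), count (INT metric)."""
    try:
        int(event["ts"])     # epoch milliseconds, possibly encoded as a string
        str(event["uuid"])   # dimension
        int(event["count"])  # metric
    except (KeyError, TypeError, ValueError):
        return False
    return True

print(validate_event(
    {"ts": "1646993042165", "uuid": "5befc828ccf8457889c7104f1b0c9143", "count": 23}
))  # True
```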

Offline Table

The offline table config is defined below:

{
  "tableName": "events",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "HOURLY"
    }
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "tenants": {},
  "metadata": {}
}

config/table-offline.json

Real-Time Table

And the real-time table is defined below:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "events",
    "stream.kafka.broker.list": "kafka-rt:9093",
    "stream.kafka.consumer.type": "lowlevel",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "tenants": {},
  "metadata": {}
}

config/table-realtime.json

You can create the schema and offline/real-time tables by running the following command:

docker run \
   --network rt \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
   -schemaFile /config/schema.json \
   -realtimeTableConfigFile /config/table-realtime.json \
   -offlineTableConfigFile /config/table-offline.json \
   -controllerHost "pinot-controller-rt" \
   -exec

Data Ingestion

Let's import some data into our tables.

Offline Table

We'll start by ingesting the following file of JSON documents:

{"ts": "1646993042165", "uuid": "5befc828ccf8457889c7104f1b0c9143", "count": 23}
{"ts": "1646993042172", "uuid": "56c76d09c9c54e12a15ff42500e153e9", "count": 124}
{"ts": "1646993042178", "uuid": "58fb1b4390f14e748c13056f02900c87", "count": 200}
{"ts": "1646993042182", "uuid": "f15ab0daf0344a5497ff1743fd38cc1d", "count": 638}
{"ts": "1646993042183", "uuid": "b561e1dde0dc4b3fba294198937b71f7", "count": 177}
{"ts": "1646996614169", "uuid": "5befc828ccf8457889c7104f1b0c9143", "count": 724}
{"ts": "1646996614173", "uuid": "56c76d09c9c54e12a15ff42500e153e9", "count": 623}
{"ts": "1646996614177", "uuid": "58fb1b4390f14e748c13056f02900c87", "count": 313}
{"ts": "1646996614181", "uuid": "f15ab0daf0344a5497ff1743fd38cc1d", "count": 836}
{"ts": "1646996614184", "uuid": "b561e1dde0dc4b3fba294198937b71f7", "count": 175}

input/events.json

The first five records have timestamps from just after 10:04 (UTC) on 11th March 2022, and the next five from just after 11:03, roughly an hour later.
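A quick way to check those timestamps is to convert the first record of each batch to a UTC datetime:

```python
from datetime import datetime, timezone

first_batch = 1646993042165   # ts of the first record in input/events.json
second_batch = 1646996614169  # ts of the sixth record

for ms in (first_batch, second_batch):
    # Both batches fall on 11th March 2022, about an hour apart
    print(datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc).isoformat())
```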

Run the following ingestion job to import this file into the offline table:

docker run \
   --network rt \
   -v $PWD/config:/config \
   -v $PWD/input:/input \
   apachepinot/pinot:1.0.0 LaunchDataIngestionJob \
   -jobSpecFile /config/job-spec.yml

Real-Time Table

Now let's ingest some data into the real-time table via the events Kafka topic:

python datagen.py |
kcat -P -b localhost:9092 -t events
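The recipe's datagen.py script isn't reproduced here; a minimal stand-in that emits the same shape of JSON document, one per line, might look like the following. The field names come from the schema; the generation logic itself is an assumption:

```python
import json
import random
import time
import uuid

def generate_event() -> str:
    """Emit one JSON document matching the events schema."""
    return json.dumps({
        "ts": int(time.time() * 1000),   # epoch milliseconds
        "uuid": uuid.uuid4().hex,        # random 32-character hex id
        "count": random.randint(0, 1000),
    })

if __name__ == "__main__":
    # Emit a small batch; pipe stdout to kcat as shown above
    for _ in range(10):
        print(generate_event(), flush=True)
```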

Compute time boundary

Now we can get the time boundary by calling the debug HTTP API on the Pinot Broker, as shown below:

tableName="events"
curl "http://localhost:8099/debug/timeBoundary/${tableName}" \
  -H "accept: application/json" 2>/dev/null | jq '.'
{
  "timeColumn": "ts",
  "timeValue": "1646993014184"
}

Let's convert this timestamp to a more friendly DateTime string using the following Python script:

import datetime
ts = 1646993014184
datetime.datetime.fromtimestamp(ts / 1000.0, tz=datetime.timezone.utc).isoformat()

Output

'2022-03-11T10:03:34.184000+00:00'

So any records with a timestamp at or before 10:03:34 (UTC) on 11th March 2022 will be served by the offline table. Anything after that time will be served by the real-time table.
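The broker's routing rule can be sketched as a simple predicate (illustrative only; this isn't Pinot's actual implementation):

```python
TIME_BOUNDARY_MS = 1646993014184  # value returned by the broker's debug endpoint

def route(record_ts_ms: int) -> str:
    """Which table serves a record with this timestamp?"""
    return "OFFLINE" if record_ts_ms <= TIME_BOUNDARY_MS else "REALTIME"

print(route(TIME_BOUNDARY_MS))      # OFFLINE (at the boundary)
print(route(TIME_BOUNDARY_MS + 1))  # REALTIME (after the boundary)
```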

And finally, let's understand how this time boundary was determined. The time boundary is computed based on the value of ingestionConfig.batchIngestionConfig.segmentIngestionFrequency in the offline table config, which in our case was:

"ingestionConfig": {
  "batchIngestionConfig": {
    "segmentIngestionType": "APPEND",
    "segmentIngestionFrequency": "HOURLY"
  }
}

This means that the time boundary will be an hour before the maximum end time in offline segments.
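Using the values from this recipe, the arithmetic checks out: segmentIngestionFrequency is HOURLY, so one hour is subtracted from the maximum end time across the offline segments (here, the ts of the last record in input/events.json):

```python
from datetime import datetime, timezone

ONE_HOUR_MS = 60 * 60 * 1000
max_end_time_ms = 1646996614184  # ts of the last record in input/events.json
time_boundary_ms = max_end_time_ms - ONE_HOUR_MS

print(time_boundary_ms)  # 1646993014184, the value the broker returned
print(datetime.fromtimestamp(time_boundary_ms / 1000.0, tz=timezone.utc).isoformat())
```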

💡

For more on time boundaries, including how they're computed and used by the Pinot Broker, see Concepts: Time Boundary.