
How to automatically schedule the real-time to offline job

In this recipe, we'll learn how to automatically schedule Apache Pinot's Real-Time to Offline job. This job moves data from real-time tables to offline tables. For background on why you would want to do this, see Managed Offline Flow.

Prerequisites

You will need to install Docker locally to follow the code examples in this guide.

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/managed-offline-flow-automatic-scheduling

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Pinot Minion, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.

Controller configuration

We need to provide configuration parameters to the Pinot Controller to have the Real-Time to Offline job (RT2OFF) run automatically. This is done in the following section of the Docker Compose file:

pinot-controller:
  image: apachepinot/pinot:0.9.3
  command: "StartController -zkAddress zookeeper-rt:2181 -config /config/controller-conf.conf"

The configuration is specified in /config/controller-conf.conf, the contents of which are shown below:

/config/controller-conf.conf
controller.access.protocols.http.port=9000
controller.zk.str=zookeeper-rt:2181
controller.helix.cluster.name=PinotCluster
controller.host=pinot-controller-rt
controller.port=9000
controller.data.dir=/data

controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=5m

We're particularly interested in the last two lines:

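  • controller.task.scheduler.enabled=true enables the controller's task scheduler, which runs tasks such as RT2OFF automatically on the cron schedule defined in the table config
  • controller.task.frequencyPeriod=5m sets how often the controller's task manager runs as a periodic task (every 5 minutes)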

Pinot Schema and Tables

Now let's create a Pinot Schema, as well as real-time and offline tables. Pinot will take care of populating the offline table, but we still need to create and configure it ourselves.

Schema

Our schema is going to capture some simple events, and looks like this:

config/schema.json
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "ts",
    "dataType": "TIMESTAMP",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}

You can create the schema by running the following command:

docker exec -it pinot-controller-rt bin/pinot-admin.sh AddSchema   \
-schemaFile /config/schema.json \
-exec

Offline Table

The offline table config is defined below:

config/table-offline.json
{
  "tableName": "events",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "HOURLY"
    }
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "tenants": {},
  "metadata": {}
}

You can create the table by running the following command:

docker exec -it pinot-controller-rt bin/pinot-admin.sh AddTable   \
-tableConfigFile /config/table-offline.json \
-exec

Real-Time Table

And the real-time table is defined below:

config/table-realtime.json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "1"
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bufferTimePeriod": "1m",
        "bucketTimePeriod": "5m",
        "roundBucketTimePeriod": "1m",
        "schedule": "0 * * * * ?",
        "mergeType": "rollup",
        "count.aggregationType": "max",
        "maxNumRecordsPerSegment": "100000"
      }
    }
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-rt:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "10000",
      "realtime.segment.flush.threshold.time": "1h",
      "realtime.segment.flush.threshold.segment.size": "5M"
    }
  },
  "tenants": {},
  "metadata": {}
}

caution

The realtime.segment.flush.threshold.rows config is intentionally set to an extremely small value so that the segment will be committed after 10,000 records have been ingested. In a production system this value should be set much higher, as described in the configuring segment threshold guide.

The main thing that we're interested in is the RealtimeToOfflineSegmentsTask, the key parts of which are extracted below:

"task": {
  "taskTypeConfigsMap": {
    "RealtimeToOfflineSegmentsTask": {
      "bufferTimePeriod": "1m",
      "bucketTimePeriod": "5m",
      "schedule": "0 * * * * ?"
    }
  }
}

This configuration enables the job and defines what should happen when it runs.

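The schedule parameter determines when this task runs. Its value is a Quartz cron expression whose six fields are seconds, minutes, hours, day of month, month and day of week, so 0 * * * * ? fires at second 0 of every minute, i.e. the job runs once a minute.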
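To make the cron expression concrete, the Python sketch below labels the six fields of a Quartz expression and checks whether a given time would fire it. It is a deliberately simplified matcher (only *, ? and plain numbers are supported), not a Quartz implementation; describe and matches are hypothetical helper names.

```python
from datetime import datetime

# Field order of a 6-field Quartz cron expression
# (the optional 7th "year" field is not handled in this sketch).
QUARTZ_FIELDS = ("second", "minute", "hour", "day_of_month", "month", "day_of_week")


def describe(expr: str) -> dict:
    """Map each field of a 6-field Quartz cron expression to its name."""
    parts = expr.split()
    if len(parts) != len(QUARTZ_FIELDS):
        raise ValueError(f"expected 6 fields, got {len(parts)}: {expr!r}")
    return dict(zip(QUARTZ_FIELDS, parts))


def matches(expr: str, when: datetime) -> bool:
    """Return True if `when` is a firing time of `expr`.

    Simplified: only '*', '?' and plain integers are supported, and integers
    are rejected in the day-of-week field (Quartz numbers days 1=SUN..7=SAT).
    """
    fields = describe(expr)
    actual = {
        "second": when.second,
        "minute": when.minute,
        "hour": when.hour,
        "day_of_month": when.day,
        "month": when.month,
    }
    for name, value in fields.items():
        if value in ("*", "?"):
            continue  # '*' = every value, '?' = no specific value
        if name == "day_of_week" or not value.isdigit():
            raise NotImplementedError(f"unsupported {name} value: {value!r}")
        if int(value) != actual[name]:
            return False
    return True


if __name__ == "__main__":
    print(describe("0 * * * * ?"))
    print(matches("0 * * * * ?", datetime(2022, 3, 8, 13, 21, 0)))   # True
    print(matches("0 * * * * ?", datetime(2022, 3, 8, 13, 21, 30)))  # False
```

A once-a-day production schedule such as 0 0 0 * * ? (midnight every day) would only match at 00:00:00.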

warning

The values for bufferTimePeriod, bucketTimePeriod, and schedule are intentionally set to very low values so that it's easier to see how they work.

In a production setup, bufferTimePeriod and bucketTimePeriod would usually be set to 1 day or more, and schedule could be set to run once a day.
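To see how these two periods interact, here is a simplified Python model of the readiness check. It assumes, based on the "is not older than buffer time" log message shown later in this recipe, that a window spans bucketTimePeriod from its start and is only processed once its end is at least bufferTimePeriod in the past. This is our reading of the log output, not Pinot's actual code, and period_to_ms, window_is_ready and to_ms are hypothetical helpers.

```python
import re
from datetime import datetime, timezone

# Multipliers for simple single-unit period strings such as "1m", "5m", "1h", "1d".
_UNIT_MS = {"s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}


def period_to_ms(period: str) -> int:
    """Convert a single-unit period string like '5m' into milliseconds."""
    match = re.fullmatch(r"(\d+)([smhd])", period.strip())
    if match is None:
        raise ValueError(f"unsupported period: {period!r}")
    return int(match.group(1)) * _UNIT_MS[match.group(2)]


def window_is_ready(window_start_ms: int, bucket: str, buffer: str, now_ms: int) -> bool:
    """Simplified model (an assumption): the window covers `bucket` from its
    start and is eligible once its end is at least `buffer` in the past."""
    window_end_ms = window_start_ms + period_to_ms(bucket)
    return window_end_ms <= now_ms - period_to_ms(buffer)


def to_ms(dt: datetime) -> int:
    """Epoch milliseconds for a timezone-aware datetime."""
    return round(dt.timestamp() * 1000)


if __name__ == "__main__":
    start = 1646745600000  # window start taken from the controller log shown later
    now = to_ms(datetime(2022, 3, 8, 13, 24, tzinfo=timezone.utc))
    print(window_is_ready(start, "5m", "1m", now))  # False: window ends 13:25, cutoff is 13:23
```

With the values from that log message (a window from 1646745600000 to 1646745900000, i.e. 13:20 to 13:25 UTC, and a 1m buffer), and assuming the controller logs in UTC, a check at 13:24 returns False; under this model the window only becomes eligible from 13:26 onwards.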

tip

For a breakdown of all the parameters that can be defined for the RealtimeToOfflineSegmentsTask, see the Manually scheduling real-time to offline job guide.

You can create the table by running the following command:

docker exec -it pinot-controller-rt bin/pinot-admin.sh AddTable   \
-tableConfigFile /config/table-realtime.json \
-exec
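Because Pinot populates the offline table of the same name (events) from the real-time table, the two table configs need to agree with each other. The Python sketch below is an optional sanity check you could run before creating the tables; which fields it compares (table name, schema name, time column, presence of the RealtimeToOfflineSegmentsTask) is our own choice, and check_hybrid_configs and load are hypothetical helpers.

```python
import json


def check_hybrid_configs(offline: dict, realtime: dict) -> list:
    """Return a list of mismatches between an OFFLINE and a REALTIME table config."""
    problems = []
    if offline.get("tableType") != "OFFLINE":
        problems.append("offline config does not have tableType OFFLINE")
    if realtime.get("tableType") != "REALTIME":
        problems.append("realtime config does not have tableType REALTIME")
    if offline.get("tableName") != realtime.get("tableName"):
        problems.append("tableName differs between the two configs")
    # Both tables should share the schema and the time column.
    for key in ("schemaName", "timeColumnName"):
        off = offline.get("segmentsConfig", {}).get(key)
        rt = realtime.get("segmentsConfig", {}).get(key)
        if off != rt:
            problems.append(f"segmentsConfig.{key} differs: {off!r} vs {rt!r}")
    tasks = realtime.get("task", {}).get("taskTypeConfigsMap", {})
    if "RealtimeToOfflineSegmentsTask" not in tasks:
        problems.append("realtime config has no RealtimeToOfflineSegmentsTask")
    return problems


def load(path: str) -> dict:
    """Read a JSON table config from disk."""
    with open(path) as f:
        return json.load(f)
```

From the recipe directory, check_hybrid_configs(load("config/table-offline.json"), load("config/table-realtime.json")) should return an empty list for the configs above.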

Ingesting Data

Let's ingest data into the events Kafka topic by running the following:

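while true; do
  # Current time in epoch milliseconds (GNU date; BSD/macOS date doesn't support %N)
  ts=$(date +%s%N | cut -b1-13)
  # Random UUID with the hyphens removed (reads Linux's /proc)
  uuid=$(cat /proc/sys/kernel/random/uuid | sed 's/[-]//g')
  # Random count between 0 and 999
  count=$(( RANDOM % 1000 ))
  echo "{\"ts\": \"${ts}\", \"uuid\": \"${uuid}\", \"count\": ${count}}"
done |
docker exec -i kafka-rt /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic events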
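The loop above relies on GNU date (for %N) and Linux's /proc, so it won't run as-is on macOS. A portable alternative is this Python sketch, which prints events with the same ts, uuid and count fields, one JSON object per line, ready to be piped into the same kafka-console-producer.sh command. The file name gen_events.py and the make_event/generate helpers are hypothetical, and unlike the shell loop it stops after the number of events passed as its only argument.

```python
import json
import random
import sys
import time
import uuid
from typing import Iterator

# Usage (gen_events.py is a hypothetical file name):
#   python3 gen_events.py 100000 | docker exec -i kafka-rt \
#     /opt/kafka/bin/kafka-console-producer.sh \
#     --bootstrap-server localhost:9092 --topic events


def make_event() -> dict:
    """Build one event with the same fields as the shell loop."""
    return {
        # Epoch milliseconds, sent as a string just like the shell loop does.
        "ts": str(time.time_ns() // 1_000_000),
        # Random UUID without hyphens (32 hex characters).
        "uuid": uuid.uuid4().hex,
        # Random integer between 0 and 999, like $RANDOM % 1000.
        "count": random.randint(0, 999),
    }


def generate(n: int) -> Iterator[str]:
    """Yield n events serialized as single-line JSON strings."""
    for _ in range(n):
        yield json.dumps(make_event())


if __name__ == "__main__" and len(sys.argv) > 1:
    for line in generate(int(sys.argv[1])):
        print(line, flush=True)
```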

Data will make its way into the real-time table. We can see how many records have been ingested by running the following query:

SELECT count(*)
FROM events
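If you'd rather run the query from a script, the following Python sketch posts it to the Pinot Broker's /query/sql REST endpoint using only the standard library. It assumes that the broker's default HTTP port (8099) is published on localhost by docker-compose.yml, which this recipe doesn't state, and that the count is returned under resultTable.rows; build_query_request and count_rows are hypothetical helpers.

```python
import json
import urllib.request

# Assumption: the broker's HTTP port is published on localhost:8099.
BROKER_URL = "http://localhost:8099"


def build_query_request(sql: str, broker_url: str = BROKER_URL) -> urllib.request.Request:
    """Build a POST request for the broker's /query/sql endpoint."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        f"{broker_url}/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def count_rows(table: str, broker_url: str = BROKER_URL) -> int:
    """Run SELECT count(*) against `table` and return the single value.

    Assumes the response shape {"resultTable": {"rows": [[<count>]]}}.
    """
    req = build_query_request(f"SELECT count(*) FROM {table}", broker_url)
    with urllib.request.urlopen(req, timeout=10) as resp:
        result = json.load(resp)
    return result["resultTable"]["rows"][0][0]
```

Calling count_rows("events_REALTIME") and count_rows("events_OFFLINE") separately is a convenient way to watch the offline table fill up once the RT2OFF job starts creating segments.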

Checking that the job is running

We can check that the job is running by searching the Pinot Controller logs, as shown below:

docker exec -it pinot-controller-rt grep -ri --color "\[RealtimeToOff" logs/pinot-all.log

We should see output similar to the following:

Output
2022/03/08 13:21:00.009 INFO [RealtimeToOfflineSegmentsTaskGenerator] [DefaultQuartzScheduler_Worker-8] Start generating task configs for table: events_REALTIME for task: RealtimeToOfflineSegmentsTask
2022/03/08 13:21:00.099 INFO [RealtimeToOfflineSegmentsTaskGenerator] [DefaultQuartzScheduler_Worker-8] Finished generating task configs for table: events_REALTIME for task: RealtimeToOfflineSegmentsTask

When the job can't create any new segments for the offline table, it will instead output the following:

Output
2022/03/08 13:24:00.008 INFO [RealtimeToOfflineSegmentsTaskGenerator] [DefaultQuartzScheduler_Worker-1] Start generating task configs for table: events_REALTIME for task: RealtimeToOfflineSegmentsTask
2022/03/08 13:24:00.076 INFO [RealtimeToOfflineSegmentsTaskGenerator] [DefaultQuartzScheduler_Worker-1] Window with start: 1646745600000 and end: 1646745900000 is not older than buffer time: 60000 configured as 1m ago. Skipping task generation: RealtimeToOfflineSegmentsTask

or

Output
2022/03/08 13:30:00.201 INFO [RealtimeToOfflineSegmentsTaskGenerator] [DefaultQuartzScheduler_Worker-1] Start generating task configs for table: events_REALTIME for task: RealtimeToOfflineSegmentsTask
2022/03/08 13:30:00.212 INFO [RealtimeToOfflineSegmentsTaskGenerator] [DefaultQuartzScheduler_Worker-1] No realtime-completed segments found for table: events_REALTIME, skipping task generation: RealtimeToOfflineSegmentsTask

If you see these messages often, it might make sense to reduce the frequency of the RT2OFF job.
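To judge how often these messages appear, a small Python sketch can tally RT2OFF outcomes in a copy of the controller log. The patterns are taken from the log lines shown above; summarize_rt2off.py and the helper names are hypothetical, and messages the patterns don't recognize are counted as "other".

```python
import re
import sys
from collections import Counter

# Patterns based on the RealtimeToOfflineSegmentsTaskGenerator log lines shown above.
PATTERNS = {
    "started": re.compile(r"Start generating task configs"),
    "finished": re.compile(r"Finished generating task configs"),
    "window_not_ready": re.compile(r"is not older than buffer time"),
    "no_completed_segments": re.compile(r"No realtime-completed segments found"),
}


def classify(line: str):
    """Return the outcome name for an RT2OFF generator line, or None for other lines."""
    if "[RealtimeToOfflineSegmentsTaskGenerator]" not in line:
        return None
    for outcome, pattern in PATTERNS.items():
        if pattern.search(line):
            return outcome
    return "other"


def summarize(lines) -> Counter:
    """Count how many times each outcome appears in an iterable of log lines."""
    counts = Counter()
    for line in lines:
        outcome = classify(line)
        if outcome is not None:
            counts[outcome] += 1
    return counts


# Usage: docker exec pinot-controller-rt cat logs/pinot-all.log > pinot-all.log
#        python3 summarize_rt2off.py pinot-all.log
if __name__ == "__main__" and len(sys.argv) > 1:
    with open(sys.argv[1]) as f:
        for outcome, n in sorted(summarize(f).items()):
            print(f"{outcome}: {n}")
```

If the skip outcomes dominate the "finished" count, that is the situation where a less frequent schedule is worth considering.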