
Upserts and the real-time to offline job

In this guide we'll learn how to use the combination of Pinot's upsert and real-time to offline job features.

Let's quickly recap these features:

  • Upsert is usually used when we are capturing events that contain a state transition. This could be the location of a driver or the status of an order. When querying these events, we want to return the latest state, grouped by a primary key.

  • The real-time to offline job is used to move segments from a real-time table to an offline table.
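To make the upsert behaviour concrete, here is a minimal Python sketch (not Pinot's implementation) of what a FULL-mode upsert returns at query time: for each primary key, only the record with the largest comparison value survives. It assumes the comparison column is the time column `ts`, which is Pinot's default when no comparison column is configured. The helper name `latest_by_key` is hypothetical.

```python
from typing import Any, Dict, Iterable, List

Record = Dict[str, Any]


def latest_by_key(records: Iterable[Record], key: str, comparison: str) -> List[Record]:
    """Keep only the record with the largest comparison value for each primary key."""
    latest: Dict[Any, Record] = {}
    for record in records:
        current = latest.get(record[key])
        # In this sketch a later arrival wins ties on the comparison value.
        if current is None or int(record[comparison]) >= int(current[comparison]):
            latest[record[key]] = record
    return sorted(latest.values(), key=lambda r: r[key])


events = [
    {"order_id": 1, "order_status": "OPEN", "ts": "1632463351000"},
    {"order_id": 1, "order_status": "IN_TRANSIT", "ts": "1632463361000"},
    {"order_id": 1, "order_status": "COMPLETED", "ts": "1632463391000"},
    {"order_id": 4, "order_status": "OPEN", "ts": "1632467068000"},
    {"order_id": 4, "order_status": "CANCELLED", "ts": "1632467070000"},
]

for row in latest_by_key(events, key="order_id", comparison="ts"):
    print(row["order_id"], row["order_status"])
```

Five state-transition events collapse into one row per order, which is the view the real-time table presents while upsert is enabled.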

Warning

The main thing to keep in mind when combining these features is that upsert functionality only applies to real-time tables.

As soon as those segments are moved to an offline table, the upsert logic is no longer applied at query time. We will need to backfill the offline segments created by the real-time to offline job to achieve upsert-like queries.

Prerequisites

You will need to install Docker locally to follow the code examples in this guide.

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/upserts-real-time-offline-job

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Pinot Minion, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.

Pinot Schema and Tables

Now let's create a Pinot Schema, as well as real-time and offline tables. Pinot is going to take care of populating data into the offline table, but it still expects us to configure the table.

Schema

Our schema is going to capture some order events, and looks like this:

config/orders_schema.json
{
  "schemaName": "orders",
  "primaryKeyColumns": [
    "order_id"
  ],
  "dimensionFieldSpecs": [
    {
      "name": "order_id",
      "dataType": "INT"
    },
    {
      "name": "customer_id",
      "dataType": "INT"
    },
    {
      "name": "order_status",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "amount",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

You can create the schema by running the following command:

docker exec -it pinot-controller bin/pinot-admin.sh AddSchema   \
-schemaFile /config/orders_schema.json \
-exec
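Upserts hinge on `primaryKeyColumns`: the schema names `order_id` as the primary key, and that column is also declared as a dimension field. The Python sketch below is a hypothetical local sanity check (it is not part of Pinot or `pinot-admin.sh`) that makes this relationship explicit by listing any primary key columns that are not declared in one of the field-spec sections.

```python
import json

# Mirrors config/orders_schema.json from this recipe.
schema = json.loads("""
{
  "schemaName": "orders",
  "primaryKeyColumns": ["order_id"],
  "dimensionFieldSpecs": [
    {"name": "order_id", "dataType": "INT"},
    {"name": "customer_id", "dataType": "INT"},
    {"name": "order_status", "dataType": "STRING"}
  ],
  "metricFieldSpecs": [{"name": "amount", "dataType": "FLOAT"}],
  "dateTimeFieldSpecs": [
    {"name": "ts", "dataType": "LONG",
     "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS"}
  ]
}
""")


def declared_columns(schema: dict) -> set:
    """Collect column names from every field-spec section of a Pinot schema."""
    sections = ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs")
    return {spec["name"] for section in sections for spec in schema.get(section, [])}


def missing_primary_keys(schema: dict) -> list:
    """Hypothetical check: primary key columns that are not declared as fields."""
    columns = declared_columns(schema)
    return [col for col in schema.get("primaryKeyColumns", []) if col not in columns]


print(missing_primary_keys(schema))  # → []
```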

Offline Table

The offline table config is defined below:

config/orders_offline_table.json
{
  "tableName": "orders",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "orders",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "HOURLY"
    }
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "tenants": {},
  "metadata": {}
}

You can create the table by running the following command:

docker exec -it pinot-controller bin/pinot-admin.sh AddTable   \
-tableConfigFile /config/orders_offline_table.json \
-exec

Real-Time Table

And the real-time table is defined below:

config/orders_table.json
{
  "tableName": "orders",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "timeType": "MILLISECONDS",
    "segmentPushType": "APPEND",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
    "schemaName": "orders",
    "replicasPerPartition": "1"
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "2h",
        "bufferTimePeriod": "1m"
      }
    }
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowLevel",
      "stream.kafka.topic.name": "orders",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
      "stream.kafka.broker.list": "kafka:9093",
      "realtime.segment.flush.threshold.rows": 5
    }
  },
  "metadata": {
    "customConfigs": {}
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "upsertConfig": {
    "mode": "FULL"
  }
}

Caution

The realtime.segment.flush.threshold.rows config is intentionally set to an extremely small value (5) so that a segment is committed after only 5 records have been ingested. In a production system this value should be set much higher, as described in the configuring segment threshold guide.
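A quick bit of arithmetic shows what this threshold means for the 14 order events we produce later in this guide. The sketch assumes the orders topic has a single partition and that a segment commits exactly when it reaches the row threshold; the helper `segment_layout` is hypothetical. This matters because the real-time to offline job only picks up completed segments, not the one that is still consuming.

```python
FLUSH_THRESHOLD_ROWS = 5  # realtime.segment.flush.threshold.rows in orders_table.json
MESSAGES_PRODUCED = 14    # order events produced in the Ingesting Data step


def segment_layout(total_rows: int, threshold: int) -> tuple:
    """Return (completed segments, rows left in the consuming segment) for one partition."""
    return divmod(total_rows, threshold)


completed, consuming_rows = segment_layout(MESSAGES_PRODUCED, FLUSH_THRESHOLD_ROWS)
print(f"{completed} completed segments, {consuming_rows} rows still consuming")
# → 2 completed segments, 4 rows still consuming
```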

Create the table by running the following command:

curl -X POST http://localhost:9000/tables?validationTypesToSkip=All \
--data @config/orders_table.json

You must pass in validationTypesToSkip=All when calling this API endpoint.

Ingesting Data

Let's ingest data into the orders Kafka topic by running the following command and then pasting in the messages below:

docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server kafka:9092 --topic orders

{"order_id":1,"customer_id":104,"order_status":"OPEN","amount":29.35,"ts":"1632463351000"}
{"order_id":1,"customer_id":104,"order_status":"IN_TRANSIT","amount":29.35,"ts":"1632463361000"}
{"order_id":1,"customer_id":104,"order_status":"COMPLETED","amount":29.35,"ts":"1632463391000"}
{"order_id":2,"customer_id":105,"order_status":"COMPLETED","amount":3.24,"ts":"1632467065000"}
{"order_id":3,"customer_id":103,"order_status":"OPEN","amount":9.77,"ts":"1632467066000"}
{"order_id":4,"customer_id":104,"order_status":"OPEN","amount":55.52,"ts":"1632467068000"}
{"order_id":4,"customer_id":104,"order_status":"CANCELLED","amount":55.52,"ts":"1632467070000"}
{"order_id":5,"customer_id":105,"order_status":"OPEN","amount":12.22,"ts":"1632667070000"}
{"order_id":5,"customer_id":105,"order_status":"IN_TRANSIT","amount":12.22,"ts":"1632667170000"}
{"order_id":5,"customer_id":105,"order_status":"COMPLETED","amount":12.22,"ts":"1632677270000"}
{"order_id":6,"customer_id":106,"order_status":"OPEN","amount":13.94,"ts":"1632677270400"}
{"order_id":7,"customer_id":107,"order_status":"OPEN","amount":20.32,"ts":"1632677270403"}
{"order_id":8,"customer_id":108,"order_status":"OPEN","amount":45.11,"ts":"1632677270508"}
{"order_id":9,"customer_id":109,"order_status":"OPEN","amount":129.22,"ts":"1632677270699"}

Data will make its way into the real-time table. Let's query the orders table:

select * 
from orders
order by order_id
limit 10
amount   customer_id   order_id   order_status   ts
29.35    104           1          COMPLETED      1632463391000
3.24     105           2          COMPLETED      1632467065000
9.77     103           3          OPEN           1632467066000
55.52    104           4          CANCELLED      1632467070000
12.22    105           5          COMPLETED      1632677270000
13.94    106           6          OPEN           1632677270400
20.32    107           7          OPEN           1632677270403
45.11    108           8          OPEN           1632677270508
129.22   109           9          OPEN           1632677270699


As expected, we have the latest event for each order.

Scheduling the RT2OFF Job

The Real-Time to Offline Job can be scheduled automatically via the real-time table config or manually via the REST API. We can trigger it manually by running the following command:

table="orders_REALTIME"
curl -X POST \
"http://localhost:9000/tasks/schedule?taskType=RealtimeToOfflineSegmentsTask&tableName=${table}" \
-H "accept: application/json" 2>/dev/null | jq '.'
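If you prefer to script this step rather than use curl, the same request can be built with Python's standard library. This sketch only constructs the POST request to the `/tasks/schedule` endpoint used above; it assumes the controller is reachable at `http://localhost:9000`, and the helper name `schedule_task_request` is hypothetical.

```python
from urllib.parse import urlencode
from urllib.request import Request

CONTROLLER = "http://localhost:9000"  # controller address used throughout this recipe


def schedule_task_request(table: str, task_type: str = "RealtimeToOfflineSegmentsTask") -> Request:
    """Build (but do not send) the POST request that schedules a minion task."""
    query = urlencode({"taskType": task_type, "tableName": table})
    return Request(
        f"{CONTROLLER}/tasks/schedule?{query}",
        method="POST",
        headers={"accept": "application/json"},
    )


req = schedule_task_request("orders_REALTIME")
print(req.get_method(), req.full_url)
```

Passing `req` to `urllib.request.urlopen` would send it; keeping construction separate from sending makes the URL easy to inspect before anything hits the cluster.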

You can see which records are available in the offline segment by running the following query:

select * 
from orders_OFFLINE
amount   customer_id   order_id   order_status   ts
29.35    104           1          OPEN           1632463351000
29.35    104           1          IN_TRANSIT     1632463361000
29.35    104           1          COMPLETED      1632463391000
3.24     105           2          COMPLETED      1632467065000
9.77     103           3          OPEN           1632467066000
55.52    104           4          OPEN           1632467068000
55.52    104           4          CANCELLED      1632467070000
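Why exactly these seven records? The sketch below reproduces the selection under stated assumptions: the task's window starts at the earliest event time in the completed real-time segments, rounded down to a multiple of bucketTimePeriod (2h), no earlier run has advanced the window, and the window ends one bucket later (exclusive). Because our data is from 2021, the window also ends well before "now minus bufferTimePeriod (1m)", so the task is allowed to run.

```python
BUCKET_MS = 2 * 60 * 60 * 1000  # bucketTimePeriod "2h" in milliseconds

# ts values (milliseconds) of the 14 events produced earlier.
event_ts = [
    1632463351000, 1632463361000, 1632463391000, 1632467065000,
    1632467066000, 1632467068000, 1632467070000, 1632667070000,
    1632667170000, 1632677270000, 1632677270400, 1632677270403,
    1632677270508, 1632677270699,
]

window_start = (min(event_ts) // BUCKET_MS) * BUCKET_MS  # round down to the bucket
window_end = window_start + BUCKET_MS                    # exclusive upper bound

moved = [ts for ts in event_ts if window_start <= ts < window_end]
print(window_start, window_end, len(moved))
# → 1632463200000 1632470400000 7
```

Every event for orders 1 to 4 falls inside the first two-hour bucket, while order 5's first event is roughly two days later, so it stays in the real-time table.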


We have multiple records for order_id=1 and order_id=4, one per state transition, which we'll need to sort out.

The time boundary, which determines whether a record is read from the offline table or the real-time table, is 1632463470000. Records with a timestamp less than or equal to this value come from the offline table, and records with a timestamp greater than this value come from the real-time table.
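The value 1632463470000 can be reproduced with simple arithmetic. This sketch assumes Pinot derives the boundary from the offline segment's end time (its maximum ts, 1632467070000) minus one hour, because the offline table's segmentIngestionFrequency is HOURLY; the result matches the boundary quoted above. It then picks out the offline records that fall at or before the boundary.

```python
HOUR_MS = 60 * 60 * 1000

# End time (max ts) of the offline segment created by the real-time to offline job.
offline_max_end_ms = 1632467070000

# Assumption: with segmentIngestionFrequency HOURLY, the boundary is one hour before the end time.
time_boundary = offline_max_end_ms - HOUR_MS
print(time_boundary)  # → 1632463470000

offline_ts = [1632463351000, 1632463361000, 1632463391000, 1632467065000,
              1632467066000, 1632467068000, 1632467070000]
served_from_offline = [ts for ts in offline_ts if ts <= time_boundary]
print(len(served_from_offline))  # → 3
```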

You can see which records will be returned from our newly created offline segment by running the following query:

select * 
from orders_OFFLINE
WHERE ts <= 1632463470000
amount   customer_id   order_id   order_status   ts
29.35    104           1          OPEN           1632463351000
29.35    104           1          IN_TRANSIT     1632463361000
29.35    104           1          COMPLETED      1632463391000


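This means that although we need to fix the records for both order_id=1 and order_id=4, queries against the orders table will only be affected for order_id=1 until the time boundary moves forward. All of order_id=4's events have timestamps after the time boundary, so they are still served from the real-time table, where upsert applies.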

You can see which records are returned from the orders table by running the following query:

select * 
from orders
where order_id = 1 or order_id = 4
amount   customer_id   order_id   order_status   ts
29.35    104           1          OPEN           1632463351000
29.35    104           1          IN_TRANSIT     1632463361000
29.35    104           1          COMPLETED      1632463391000
55.52    104           4          CANCELLED      1632467070000


Replacing offline segment

Let's now backfill the offline segment using data/orders.json, which contains only the most recent event for each order:

data/orders.json
{"order_id":1,"customer_id":104,"order_status":"COMPLETED","amount":29.35,"ts":"1632463391000"}
{"order_id":2,"customer_id":105,"order_status":"COMPLETED","amount":3.24,"ts":"1632467065000"}
{"order_id":3,"customer_id":103,"order_status":"OPEN","amount":9.77,"ts":"1632467066000"}
{"order_id":4,"customer_id":104,"order_status":"CANCELLED","amount":55.52,"ts":"1632467070000"}
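One way to produce a file like this is to take the rows returned by the orders_OFFLINE query, keep the latest event per order_id, and write one compact JSON document per line. This Python sketch is an illustration, not necessarily how the recipe's file was created; `latest_per_order` is a hypothetical helper, and the records are copied from the query results above.

```python
import json

# Offline segment contents as returned by `select * from orders_OFFLINE`.
offline_records = [
    {"order_id": 1, "customer_id": 104, "order_status": "OPEN", "amount": 29.35, "ts": "1632463351000"},
    {"order_id": 1, "customer_id": 104, "order_status": "IN_TRANSIT", "amount": 29.35, "ts": "1632463361000"},
    {"order_id": 1, "customer_id": 104, "order_status": "COMPLETED", "amount": 29.35, "ts": "1632463391000"},
    {"order_id": 2, "customer_id": 105, "order_status": "COMPLETED", "amount": 3.24, "ts": "1632467065000"},
    {"order_id": 3, "customer_id": 103, "order_status": "OPEN", "amount": 9.77, "ts": "1632467066000"},
    {"order_id": 4, "customer_id": 104, "order_status": "OPEN", "amount": 55.52, "ts": "1632467068000"},
    {"order_id": 4, "customer_id": 104, "order_status": "CANCELLED", "amount": 55.52, "ts": "1632467070000"},
]


def latest_per_order(records):
    """Keep the record with the highest ts for each order_id, ordered by order_id."""
    latest = {}
    for record in records:
        current = latest.get(record["order_id"])
        if current is None or int(record["ts"]) >= int(current["ts"]):
            latest[record["order_id"]] = record
    return [latest[k] for k in sorted(latest)]


# One compact JSON document per line, matching the layout of data/orders.json.
lines = [json.dumps(r, separators=(",", ":")) for r in latest_per_order(offline_records)]
print("\n".join(lines))
```

Writing `"\n".join(lines)` to data/orders.json (for example with `pathlib.Path.write_text`) would give the input file for the ingestion job.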

We'll ingest this file using the following ingestion spec:

config/job-spec.yml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentNameGeneratorSpec:
  type: fixed
  configs:
    segment.name: ${segmentName}
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/orders.json'
outputDirURI: '/opt/pinot/data/orders/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'json'
  className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
tableSpec:
  tableName: 'orders'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

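Now let's run the job to replace the offline segment. The job spec uses a fixed segment name, so we pass in the name of the existing segment, orders_1632463351000_1632467070000_0; pushing a segment with the same name as an existing one replaces it: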

docker exec -it pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
-jobSpecFile /config/job-spec.yml \
-values segmentName='orders_1632463351000_1632467070000_0'
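The fixed name matters because the backfilled data has a different minimum timestamp than the original segment. The Python sketch below assumes the `<table>_<min ts>_<max ts>_<sequence>` pattern visible in this recipe's segment name (the helper `derived_name` is hypothetical) and uses `string.Template` purely as an analogy for how `${segmentName}` in the job spec is filled from `-values`. A data-derived name would not match the existing segment, so the push would add a second segment alongside the original instead of replacing it.

```python
from string import Template

# Timestamps in the segment created by the real-time to offline job (7 events).
original_ts = [1632463351000, 1632463361000, 1632463391000, 1632467065000,
               1632467066000, 1632467068000, 1632467070000]
# Timestamps in data/orders.json (latest event per order only).
backfill_ts = [1632463391000, 1632467065000, 1632467066000, 1632467070000]


def derived_name(table, timestamps, sequence=0):
    """Hypothetical helper following the <table>_<min ts>_<max ts>_<seq> pattern."""
    return f"{table}_{min(timestamps)}_{max(timestamps)}_{sequence}"


existing = derived_name("orders", original_ts)
naive = derived_name("orders", backfill_ts)

# Analogy for the job spec's ${segmentName} placeholder being filled from -values.
fixed = Template("${segmentName}").substitute(segmentName=existing)

print(existing)
print(naive)
print(fixed == existing, naive == existing)
```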

Once we've done this, we can go back and query the orders table:

select * 
from orders
where order_id = 1 or order_id = 4
amount   customer_id   order_id   order_status   ts
29.35    104           1          COMPLETED      1632463391000
55.52    104           4          CANCELLED      1632467070000


We can also confirm that order_id=4 has been fixed by querying the offline table:

select * 
from orders_OFFLINE
amount   customer_id   order_id   order_status   ts
29.35    104           1          COMPLETED      1632463391000
3.24     105           2          COMPLETED      1632467065000
9.77     103           3          OPEN           1632467066000
55.52    104           4          CANCELLED      1632467070000
