Full Upserts in Pinot

In this recipe we'll learn how to use full upserts on a real-time Pinot table.

The need for upserts

Upsert is a term used to describe inserting a record into a database if it does not already exist, or updating it if it does.

In practice, events arriving in a stream can have their attributes updated at a later point in time. For example, an e-commerce order can transition between multiple states of a state machine, including CREATED, PROCESSING, IN_TRANSIT, and DELIVERED. A CDC stream capturing an orders table may therefore emit several change events containing different values for the order status.

But from an analytics perspective, you may only be interested in the most up-to-date version and state of each order. For example, consider writing a SQL query to retrieve orders that took more than two days to be delivered. To enable that, we need to merge all change events belonging to a particular order into its latest state.
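As a sketch, such a query might look like the one below. It assumes hypothetical created_ts and delivered_ts epoch-millisecond columns that are not part of this recipe's schema, with 172800000 being two days in milliseconds; the query only makes sense if each order is represented by a single row holding its latest values:

select order_id
from orders
where order_status = 'DELIVERED'
  and delivered_ts - created_ts > 172800000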

Apache Pinot supports that by enabling upserts on a real-time table. Let's find out how.

Understanding upserts in Pinot

The upsert feature was introduced in the Pinot 0.6.0 release, allowing events to be "upserted" while ingesting from a stream.

By default, Pinot lets you query all events ingested from a Kafka topic for a particular primary key (a dimension). Revisiting our e-commerce example above, that kind of query returns every state change for an order. In some cases, we instead want to get back only the most up-to-date version and state of each order.

Pinot is an immutable datastore, which means there is no genuine concept of an upsert as you stream data into it from Kafka. For the upsert implementation, it's essential to understand that an individual record is not updated in place; instead, updates are appended to a log and a pointer maintains the most recent version of each record.

Pinot upserts work in two modes:

  1. Full upserts
  2. Partial upserts

We will focus on full upserts in this recipe.

Prerequisites

You will need to install Docker locally to follow the code examples in this guide.

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/upserts-full

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.

The use case

This recipe explores capturing change events from an e-commerce orders table. For simplicity, we will simulate change events as if they were coming from a CDC system like Debezium.

Change events to the orders table, including new orders and updates, are streamed to Kafka. Pinot ingests the change events into a real-time table called orders. When an order is updated, say its status changes from OPEN to CANCELLED, that change will immediately be reflected in Pinot, allowing us to query the orders table consistently.

Solution overview

First, create the orders topic in Kafka.

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --bootstrap-server kafka:9092 --topic orders

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

config/orders_schema.json
{
  "schemaName": "orders",
  "primaryKeyColumns": [
    "order_id"
  ],
  "dimensionFieldSpecs": [
    {
      "name": "order_id",
      "dataType": "INT"
    },
    {
      "name": "customer_id",
      "dataType": "INT"
    },
    {
      "name": "order_status",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "amount",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Note that the order_id column is designated as the primary key, which is mandatory for upserts in Pinot.

"primaryKeyColumns": [
"order_id"
]

We'll also have the following table config:

config/orders_table.json
{
  "tableName": "orders",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "1",
    "segmentPushType": "APPEND",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
    "schemaName": "orders",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowLevel",
      "stream.kafka.topic.name": "orders",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
      "stream.kafka.broker.list": "kafka:9093",
      "realtime.segment.flush.threshold.size": 30,
      "realtime.segment.flush.threshold.rows": 30
    }
  },
  "metadata": {
    "customConfigs": {}
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "upsertConfig": {
    "mode": "FULL"
  }
}

In this table configuration, we enable FULL upserts using the following configuration block. When two records with the same primary key exist in Pinot, a FULL upsert overwrites all columns of the record with the older timestamp.

"upsertConfig": {
"mode": "FULL"
}

Conversely, a PARTIAL upsert lets you pick which columns should be overwritten, as sketched below. We explore that in the partial upserts recipe.
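For reference, a partial upsert configuration might look something like the following sketch, where the mode is set to PARTIAL and partialUpsertStrategies maps each column to a merge strategy (the column strategies shown here are purely illustrative):

"upsertConfig": {
  "mode": "PARTIAL",
  "partialUpsertStrategies": {
    "order_status": "OVERWRITE",
    "amount": "OVERWRITE"
  }
}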

Upserts require that all segments of the same partition be served from the same server to ensure data consistency across segments. The table therefore must use strictReplicaGroup as its routing strategy, as shown below:

"routing": {
"instanceSelectorType": "strictReplicaGroup"
}

You can create the table and schema by running the following command:

docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable \
-tableConfigFile /config/orders_table.json \
-schemaFile /config/orders_schema.json \
-exec

You should see a message similar to the following if everything is working correctly:

2022/02/28 10:08:40.333 INFO [AddTableCommand] [main] Executing command: AddTable -tableConfigFile /config/orders_table.json -schemaFile /config/orders_schema.json -controllerProtocol http -controllerHost 172.31.0.3 -controllerPort 9000 -user null -password [hidden] -exec
2022/02/28 10:08:40.747 INFO [AddTableCommand] [main] {"status":"Table orders_REALTIME succesfully added"}

Produce some order events

We will simulate a few new orders by publishing events to the orders Kafka topic. Run the following command, then paste in the events below:

docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server kafka:9092 --topic orders

{"order_id":1,"customer_id":104,"order_status":"IN_TRANSIT","amount":29.35,"ts":"1632467063"}
{"order_id":2,"customer_id":105,"order_status":"COMPLETED","amount":3.24,"ts":"1618931459"}
{"order_id":3,"customer_id":103,"order_status":"OPEN","amount":9.77,"ts":"1626484196"}
{"order_id":4,"customer_id":104,"order_status":"COMPLETED","amount":90.35,"ts":"1623066325"}
{"order_id":5,"customer_id":105,"order_status":"OPEN","amount":55.52,"ts":"1635543905"}

Querying

Once that's completed, navigate to localhost:9000/#/query and click on the orders table or copy/paste the following query:

select * 
from orders
limit 10

You will see an output containing five orders.

Next, go back to the Kafka producer and publish the following event, which mimics a cancellation of the order with ID 5.

docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server kafka:9092 --topic orders

{"order_id":5,"customer_id":105,"order_status":"CANCELLED","amount":55.52,"ts":"1635543948"}

Run the following query to see that the order_status of that order is now set to CANCELLED:

select * 
from orders
where order_id=5
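
Because the older version of the record is still kept in the segment (recall that updates are appended rather than written in place), you can also inspect all versions of an order by disabling the upsert view at query time. Depending on your Pinot version, this can be done with the skipUpsert query option, for example:

select * 
from orders
where order_id=5
option(skipUpsert=true)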