Full Upserts in Pinot
In this recipe we'll learn how to use full upserts on a real-time Pinot table.
- Pinot Version: 0.9.3
- Code: startreedata/pinot-recipes/full-upserts
The need for upserts
Upsert is a term used to describe inserting a record into a database if it does not already exist, or updating it if it does.
Practically, events arriving in a stream can have updated attributes at a later point in time. For example, an e-commerce order can transition between multiple states of a state machine, including CREATED, PROCESSING, IN_TRANSIT, and DELIVERED.
A CDC stream capturing an orders table may emit change events containing different values for the order status.
But, from an analytics perspective, you may only be interested in the most up-to-date version and state of each order. For example, consider writing a SQL query to retrieve orders that took more than two days to deliver. To enable that, we need to merge all change events belonging to a particular order into its latest state.
Apache Pinot supports that by enabling upserts on a real-time table. Let's find out how.
Understanding upserts in Pinot
The upsert feature was introduced in the Pinot 0.6.0 release, allowing you to "upsert" events while ingesting from a stream.
By default, Pinot allows querying all events ingested from a Kafka topic by a particular primary key (a dimension). Revisiting our e-commerce example above, that kind of query will return all the state changes for all orders. In some cases, we need to get back only the most up-to-date version and state of each order.
Pinot is an immutable datastore, which means that there is no genuine concept of upsert as you stream data into it from Kafka. For the upsert implementation, it’s essential to understand that an individual record is not updated via a write; instead, updates are appended to a log and a pointer maintains the most recent version of a record.
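To make this concrete, here's a sketch of how the same query behaves with and without upserts, using the orders table we'll create later in this recipe (order_id 5 is just an illustrative value):
-- Without upserts, Pinot returns one row per change event,
-- i.e. every state transition for order 5:
select order_id, order_status, ts
from orders
where order_id = 5
-- With upserts enabled on the table, the identical query returns
-- only the most recent version of order 5, so no manual
-- deduplication is needed.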
Pinot upserts work in two modes:
- Full upserts
- Partial upserts
We will focus on full upserts in this recipe.
Prerequisites
You will need to install Docker locally to follow the code examples in this guide.
Download Recipe
First, clone the GitHub repository to your local machine and navigate to this recipe:
git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/upserts-full
If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.
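Once the containers are running, you can do a quick sanity check before moving on (optional; assumes you ran the command from the recipe directory):
docker-compose ps
All five containers should be in the Up state, and the Pinot UI should be reachable at localhost:9000.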
The use case
This recipe explores capturing change events from an e-commerce orders table. For simplicity, we will simulate change events as if they were coming from a CDC system like Debezium.
Change events to the orders, including new orders and updates, are streamed to Kafka. Pinot ingests the change events into a real-time table called 'orders'. When an order is updated, say the status has been changed from OPEN to CANCELLED, that change will immediately be reflected in Pinot, allowing us to query the orders table consistently.
Solution overview
First, create the orders topic in Kafka.
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --bootstrap-server kafka:9092 --topic orders
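You can optionally confirm that the topic was created by listing the topics on the broker, using the same container and scripts as above:
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --list --bootstrap-server kafka:9092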
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "orders",
"primaryKeyColumns": [
"order_id"
],
"dimensionFieldSpecs": [{
"name": "order_id",
"dataType": "INT"
},
{
"name": "customer_id",
"dataType": "INT"
},
{
"name": "order_status",
"dataType": "STRING"
}
],
"metricFieldSpecs": [{
"name": "amount",
"dataType": "FLOAT"
}],
"dateTimeFieldSpecs": [{
"name": "ts",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}
Note that the order_id column is designated as the primary key, which is mandatory for upserts in Pinot.
"primaryKeyColumns": [
"order_id"
]
We'll also have the following table config:
{
"tableName": "orders",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"timeType": "MILLISECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "orders",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
"stream.kafka.topic.name": "orders",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
"stream.kafka.broker.list": "kafka:9093",
"realtime.segment.flush.threshold.size": 30,
"realtime.segment.flush.threshold.rows": 30
}
},
"metadata": {
"customConfigs": {}
},
"routing": {
"instanceSelectorType": "strictReplicaGroup"
},
"upsertConfig": {
"mode": "FULL"
}
}
In this table configuration, we enable FULL upserts using the following configuration block.
When two records with the same primary key exist in Pinot, a FULL upsert overwrites all columns of the older record, i.e. the one with the earlier timestamp, with the values from the newer record.
"upsertConfig": {
"mode": "FULL"
}
Conversely, a PARTIAL upsert allows you to pick which columns should be overwritten.
We explore that in the partial upserts recipe.
Upserts require that all segments of the same partition are served from the same server to ensure data consistency across segments.
The table must therefore use strictReplicaGroup as the routing strategy, as shown below:
"routing": {
"instanceSelectorType": "strictReplicaGroup"
}
You can create the table and schema by running the following command:
docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable \
-tableConfigFile /config/orders_table.json \
-schemaFile /config/orders_schema.json \
-exec
You should see a message similar to the following if everything is working correctly:
2022/02/28 10:08:40.333 INFO [AddTableCommand] [main] Executing command: AddTable -tableConfigFile /config/orders_table.json -schemaFile /config/orders_schema.json -controllerProtocol http -controllerHost 172.31.0.3 -controllerPort 9000 -user null -password [hidden] -exec
2022/02/28 10:08:40.747 INFO [AddTableCommand] [main] {"status":"Table orders_REALTIME succesfully added"}
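You can also double-check that the table was created via the controller's REST API (an optional check, assuming the controller is exposed on localhost:9000 as in this recipe):
curl "http://localhost:9000/tables"
The response should include orders in the list of tables.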
Produce some order events
We will simulate a few new orders by publishing the following events to the orders Kafka topic.
Run the following command:
docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server kafka:9092 --topic orders
{"order_id":1,"customer_id":104,"order_status":"IN_TRANSIT","amount":29.35,"ts":"1632467063"}
{"order_id":2,"customer_id":105,"order_status":"COMPLETED","amount":3.24,"ts":"1618931459"}
{"order_id":3,"customer_id":103,"order_status":"OPEN","amount":9.77,"ts":"1626484196"}
{"order_id":4,"customer_id":104,"order_status":"COMPLETED","amount":90.35,"ts":"1623066325"}
{"order_id":5,"customer_id":105,"order_status":"OPEN","amount":55.52,"ts":"1635543905"}
Querying
Once that's completed, navigate to localhost:9000/#/query and click on the orders table, or copy/paste the following query:
select *
from orders
limit 10
You will see an output containing five orders.
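The result should look roughly like the following, with one row per order (row and column ordering may differ):
order_id | customer_id | order_status | amount | ts
1 | 104 | IN_TRANSIT | 29.35 | 1632467063
2 | 105 | COMPLETED | 3.24 | 1618931459
3 | 103 | OPEN | 9.77 | 1626484196
4 | 104 | COMPLETED | 90.35 | 1623066325
5 | 105 | OPEN | 55.52 | 1635543905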
Next, go back to the Kafka producer and publish the following event, which mimics a cancellation of the order with ID 5.
docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server kafka:9092 --topic orders
{"order_id":5,"customer_id":105,"order_status":"CANCELLED","amount":55.52,"ts":"1635543948"}
Run the following query to see that the order_status is now set to the CANCELLED state.
select *
from orders
where order_id=5
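Because this is a full upsert, the query should return a single row for order 5, with all columns taken from the cancellation event:
order_id | customer_id | order_status | amount | ts
5 | 105 | CANCELLED | 55.52 | 1635543948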