Full Upserts in Pinot
In this recipe we'll learn how to use upsert functionality.
Pinot Version | 1.0.0 |
Code | startreedata/pinot-recipes/upserts-full |
Upsert is a term used to describe inserting a record into a database if it does not already exist or update it if it does exist.
Practically, events arriving in a stream can have updated attributes at a later point in time. For example, an e-commerce order can transition between multiple states of a state machine, including CREATED
, PROCESSING
, IN_TRANSIT
, and DELIVERED
. A change data capture (opens in a new tab) (CDC) stream capturing an orders table may emit change events containing different values for the order status.
But, from an analytics perspective, you may only interested in the most up-to-date version and state for each order. For example, consider writing a SQL query to retrieve orders that took more than two days for the delivery. To enable that, we need to merge all change events belonging to a particular order to its latest value.
Apache Pinot supports that by enabling upserts on a real-time table.
Understanding upserts in Pinot
Pinot, by default, allows querying all events ingested from a Kafka topic by a particular primary key (a dimension). Revisiting our e-commerce example above, that kind of a query will return all the state changes for all orders. In some cases, we need to get back the most up-to-date version and state for each order.
Pinot is an immutable datastore, which means that there is no genuine concept of upsert as you stream data into it from Kafka. For the upsert implementation, it’s essential to understand that an individual record is not updated via a write; instead, updates are appended to a log and a pointer maintains the most recent version of a record.
Pinot upserts work in two modes:
- Full upserts
- Partial upserts
We will focus on full upserts in this recipe.
To learn how to use full upserts on a real-time Pinot table, watch the following video, or complete the tutorial below, starting with Prerequites.
Pinot Version | 0.9.3 |
Code | startreedata/pinot-recipes/full-upserts |
Prerequisites
To follow the code examples in this guide, you must install Docker (opens in a new tab) locally and download recipes.
Navigate to recipe
- If you haven't already, download recipes.
- In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/upserts-full
Start Pinot and learn
Spin up a Pinot cluster using Docker Compose:
docker compose \
-f ../pinot-compose.yml \
-f ../kafka-compose.yml up -d
Add table and schema:
docker cp config/orders_schema.json pinot-controller:/opt/pinot/
docker cp config/orders_table.json pinot-controller:/opt/pinot/
docker exec pinot-controller ./bin/pinot-admin.sh AddTable \
-schemaFile /opt/pinot/orders_schema.json \
-tableConfigFile /opt/pinot/orders_table.json \
-exec
Open a tab to import messages into Kafka:
docker exec -it kafka kafka-console-producer.sh --bootstrap-server localhost:9092 --topic orders
Paste the following:
{"order_id":1,"customer_id":104,"order_status":"IN_TRANSIT","amount":29.35,"ts":"1632467063"}
{"order_id":2,"customer_id":105,"order_status":"COMPLETED","amount":3.24,"ts":"1618931459"}
{"order_id":3,"customer_id":103,"order_status":"OPEN","amount":9.77,"ts":"1626484196"}
{"order_id":4,"customer_id":104,"order_status":"COMPLETED","amount":90.35,"ts":"1623066325"}
{"order_id":5,"customer_id":105,"order_status":"OPEN","amount":55.52,"ts":"1635543905"}
Query Pinot:
select *
from orders
limit 10
Go back to the Kafka tab and paste the following:
{"order_id":5,"customer_id":105,"order_status":"CANCELLED","amount":55.52,"ts":"1635543948"}
Run the following query in Pinot to see that the order_status
is for order_id
5 is now set to CANCELLED
.
select *
from orders
where order_id=5