Partial Upserts in Pinot
To learn how to perform partial upserts on a real-time table, watch the following video, or complete the tutorial below, starting with Prerequisites.
To get a better understanding of upserts and how they work, see the Full Upserts documentation.
Pinot Version | 0.9.3 |
Code | startreedata/pinot-recipes/partial-upserts |
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download recipes.
Navigate to recipe
- If you haven't already, download recipes.
- In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/upserts-partial
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.
Create meetup_rsvp Kafka topic
This recipe explores capturing RSVPs for meetup events. A meetup can be hosted by multiple groups, at multiple venues.
Let's create the meetup_rsvp topic in Kafka to record the RSVPs.
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --bootstrap-server kafka:9092 --topic meetup_rsvp
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "meetup_rsvp",
"primaryKeyColumns": [
"event_id"
],
"dimensionFieldSpecs": [
{
"name": "event_id",
"dataType": "INT"
},
{
"name": "venue_name",
"dataType": "STRING",
"singleValueField": false
},
{
"dataType": "STRING",
"name": "group_name",
"singleValueField": false
}
],
"metricFieldSpecs": [
{
"name": "rsvp_count",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [
{
"name": "mtime",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
config/meetup_rsvp_schema.json
Note that the event_id column is designated as the primary key, which is mandatory for upserts in Pinot.
"primaryKeyColumns": [
"event_id"
]
We'll also have the following table config:
{
"tableName": "meetup_rsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetup_rsvp",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
"stream.kafka.topic.name": "meetup_rsvp",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
"stream.kafka.broker.list": "kafka:9093",
"realtime.segment.flush.threshold.size": 30,
"realtime.segment.flush.threshold.rows": 30
},
"nullHandlingEnabled": true
},
"metadata": {
"customConfigs": {}
},
"routing": {
"instanceSelectorType": "strictReplicaGroup"
},
"upsertConfig": {
"mode": "PARTIAL",
"partialUpsertStrategies": {
"rsvp_count": "INCREMENT",
"group_name": "UNION",
"venue_name": "APPEND"
}
}
}
config/meetup_rsvp_table.json
In this table configuration, partial upsert strategies are defined for three columns: rsvp_count, group_name, and venue_name. Hence, the mode is set to PARTIAL.
"upsertConfig": {
"mode": "PARTIAL",
"partialUpsertStrategies": {
"rsvp_count": "INCREMENT",
"group_name": "UNION",
"venue_name": "APPEND"
}
}
When using the APPEND strategy, make sure that the corresponding column can accept multiple values by specifying the following config:
"singleValueField": false
When an RSVP event is ingested with an event_id that already exists in the table, its rsvp_count is added to the stored value.
The name of the group is added to the group_name column if it is not already present.
Also, the venue is appended to the venue_name column.
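Before creating the table, it can help to confirm the multi-value requirement described above for APPEND columns. The following is a minimal Python sketch, not part of the recipe: the function name misconfigured_columns is hypothetical, and the schema and table config are trimmed copies of the ones shown in this guide. It only checks APPEND because that is the only strategy this guide states the requirement for.

```python
# Strategies that need a multi-value column. This guide only states the
# requirement for APPEND; widen the set if your Pinot version needs it.
MULTI_VALUE_STRATEGIES = {"APPEND"}

FIELD_SPEC_KEYS = ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs")


def misconfigured_columns(schema, table_config):
    """Return columns whose strategy needs multiple values but whose field is single-valued."""
    multi_valued = {
        field["name"]
        for key in FIELD_SPEC_KEYS
        for field in schema.get(key, [])
        # A field counts as multi-valued only when singleValueField is explicitly false.
        if field.get("singleValueField", True) is False
    }
    strategies = table_config.get("upsertConfig", {}).get("partialUpsertStrategies", {})
    return sorted(
        column
        for column, strategy in strategies.items()
        if strategy in MULTI_VALUE_STRATEGIES and column not in multi_valued
    )


# Trimmed copies of the schema and table config from this recipe.
schema = {
    "schemaName": "meetup_rsvp",
    "primaryKeyColumns": ["event_id"],
    "dimensionFieldSpecs": [
        {"name": "event_id", "dataType": "INT"},
        {"name": "venue_name", "dataType": "STRING", "singleValueField": False},
        {"name": "group_name", "dataType": "STRING", "singleValueField": False},
    ],
    "metricFieldSpecs": [{"name": "rsvp_count", "dataType": "INT"}],
}
table_config = {
    "upsertConfig": {
        "mode": "PARTIAL",
        "partialUpsertStrategies": {
            "rsvp_count": "INCREMENT",
            "group_name": "UNION",
            "venue_name": "APPEND",
        },
    }
}

# An empty list means every APPEND column is declared multi-valued.
print(misconfigured_columns(schema, table_config))
```

To run the same check against the real files, the two dictionaries could be loaded with json.load from config/meetup_rsvp_schema.json and config/meetup_rsvp_table.json instead of being written inline.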
You can create the table and schema by running the following command:
docker run \
--network upserts \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-tableConfigFile /config/meetup_rsvp_table.json \
-controllerHost "pinot-controller" \
-schemaFile /config/meetup_rsvp_schema.json \
-exec
Produce some RSVP events
We will simulate a few RSVPs by publishing the following events to the meetup_rsvp Kafka topic.
echo -e '
{"event_id":3,"venue_name":"Indonesia","group_name":"C","rsvp_count":1,"mtime":"1635140709"}
{"event_id":3,"venue_name":"China","group_name":"C","rsvp_count":1,"mtime":"1646067689"}
{"event_id":2,"venue_name":"France","group_name":"C","rsvp_count":1,"mtime":"1616646138"}
{"event_id":1,"venue_name":"Myanmar","group_name":"B","rsvp_count":1,"mtime":"1632930567"}
{"event_id":1,"venue_name":"Hungary","group_name":"A","rsvp_count":1,"mtime":"1643574332"}
{"event_id":1,"venue_name":"China","group_name":"B","rsvp_count":1,"mtime":"1645779637"}
' | kcat -P -b localhost:9092 -t meetup_rsvp
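To build intuition for what the table should hold after these six events, the sketch below replays them in Python using the three strategies from the table config. It is an illustration of the merge semantics only, not Pinot's implementation. Several things are assumptions: events are applied in the order listed, columns without a strategy (such as mtime) simply take the latest value, and UNION keeps first-seen order, which Pinot may not guarantee. The helper names to_list, merge, and replay are hypothetical.

```python
import json

# Strategies copied from the upsertConfig in this recipe.
STRATEGIES = {"rsvp_count": "INCREMENT", "group_name": "UNION", "venue_name": "APPEND"}
# Columns declared with "singleValueField": false in the schema.
MULTI_VALUE_COLUMNS = ("group_name", "venue_name")

EVENTS = """
{"event_id":3,"venue_name":"Indonesia","group_name":"C","rsvp_count":1,"mtime":"1635140709"}
{"event_id":3,"venue_name":"China","group_name":"C","rsvp_count":1,"mtime":"1646067689"}
{"event_id":2,"venue_name":"France","group_name":"C","rsvp_count":1,"mtime":"1616646138"}
{"event_id":1,"venue_name":"Myanmar","group_name":"B","rsvp_count":1,"mtime":"1632930567"}
{"event_id":1,"venue_name":"Hungary","group_name":"A","rsvp_count":1,"mtime":"1643574332"}
{"event_id":1,"venue_name":"China","group_name":"B","rsvp_count":1,"mtime":"1645779637"}
"""


def to_list(value):
    """Wrap a scalar in a list so multi-value columns are handled uniformly."""
    return list(value) if isinstance(value, list) else [value]


def merge(previous, incoming):
    """Combine the stored row with a new event that has the same primary key."""
    merged = dict(previous)
    for column, value in incoming.items():
        strategy = STRATEGIES.get(column, "OVERWRITE")  # assumption: latest value wins
        if strategy == "INCREMENT":
            merged[column] = previous[column] + value
        elif strategy == "APPEND":
            merged[column] = previous[column] + to_list(value)
        elif strategy == "UNION":
            new_items = [v for v in to_list(value) if v not in previous[column]]
            merged[column] = previous[column] + new_items
        else:
            merged[column] = value
    return merged


def replay(lines):
    """Apply events in order, keeping one merged row per event_id."""
    table = {}
    for line in lines:
        if not line.strip():
            continue
        event = json.loads(line)
        for column in MULTI_VALUE_COLUMNS:
            event[column] = to_list(event[column])
        key = event["event_id"]
        table[key] = merge(table[key], event) if key in table else event
    return table


table = replay(EVENTS.splitlines())
for event_id in sorted(table):
    print(table[event_id])
```

In this simulation, event 1 ends with an rsvp_count of 3, event 3 with 2, and event 2 with 1, which gives a baseline to compare against the query results from the real table.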
Querying
Once that's completed, navigate to localhost:9000/#/query and click on the meetup_rsvp table or copy/paste the following query:
select event_id, sum(rsvp_count) as total_rsvp
from meetup_rsvp
group by event_id
order by total_rsvp desc
limit 10
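The same query can also be sent programmatically instead of through the query console. The sketch below uses only the Python standard library and posts the SQL to the Pinot broker's /query/sql endpoint. The broker address http://localhost:8099 is an assumption (it depends on the ports exposed in docker-compose.yml), and the helper names build_request, rows_as_dicts, and run_query are hypothetical.

```python
import json
import urllib.request

# Assumption: the broker is reachable on its default port from the host machine.
BROKER_URL = "http://localhost:8099"

QUERY = """
select event_id, sum(rsvp_count) as total_rsvp
from meetup_rsvp
group by event_id
order by total_rsvp desc
limit 10
"""


def build_request(sql, broker_url=BROKER_URL):
    """Create a POST request carrying the SQL as a JSON body."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        f"{broker_url}/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def rows_as_dicts(response):
    """Pair each result row with the column names from the response schema."""
    result = response["resultTable"]
    columns = result["dataSchema"]["columnNames"]
    return [dict(zip(columns, row)) for row in result["rows"]]


def run_query(sql, broker_url=BROKER_URL):
    """Send the query and return the rows; requires a running cluster."""
    with urllib.request.urlopen(build_request(sql, broker_url), timeout=10) as reply:
        return rows_as_dicts(json.load(reply))
```

With the cluster running, calling run_query(QUERY) should return one dictionary per event_id, each with the keys event_id and total_rsvp.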