Partial Upserts in Pinot
In this recipe we'll learn how to perform partial upserts on a real-time Pinot table.
To get a better understanding of upserts and how they work, you can visit the previous recipe on Full Upserts.
Pinot Version | 0.9.3 |
Code | startreedata/pinot-recipes/partial-upserts |
Pre-requisites
You will need to install Docker locally to follow the code examples in this guide.
Download Recipe
First, clone the GitHub repository to your local machine and navigate to this recipe:
git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/upserts-partial
If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.
Create meetup_rsvp Kafka topic
This recipe explores capturing RSVPs for meetup events. A meetup can be hosted by multiple groups, at multiple venues.
Let's create the meetup_rsvp topic in Kafka to record the RSVPs.
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --bootstrap-server kafka:9092 --topic meetup_rsvp
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "meetup_rsvp",
"primaryKeyColumns": [
"event_id"
],
"dimensionFieldSpecs": [
{
"name": "event_id",
"dataType": "INT"
},
{
"name": "venue_name",
"dataType": "STRING",
"singleValueField": false
},
{
"dataType": "STRING",
"name": "group_name",
"singleValueField": false
}
],
"metricFieldSpecs": [
{
"name": "rsvp_count",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [
{
"name": "mtime",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
Note that the event_id column is designated as the primary key, which is mandatory for upserts in Pinot.
"primaryKeyColumns": [
"event_id"
]
We'll also have the following table config:
{
"tableName": "meetup_rsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetup_rsvp",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
"stream.kafka.topic.name": "meetup_rsvp",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
"stream.kafka.broker.list": "kafka:9093",
"realtime.segment.flush.threshold.size": 30,
"realtime.segment.flush.threshold.rows": 30
},
"nullHandlingEnabled": true
},
"metadata": {
"customConfigs": {}
},
"routing": {
"instanceSelectorType": "strictReplicaGroup"
},
"upsertConfig": {
"mode": "PARTIAL",
"partialUpsertStrategies": {
"rsvp_count": "INCREMENT",
"group_name": "UNION",
"venue_name": "APPEND"
}
}
}
In this table configuration, we define upsert merge strategies for only three columns: rsvp_count, group_name, and venue_name. Hence, the mode is set to PARTIAL.
"upsertConfig": {
"mode": "PARTIAL",
"partialUpsertStrategies": {
"rsvp_count": "INCREMENT",
"group_name": "UNION",
"venue_name": "APPEND"
}
}
When using the APPEND strategy, you must make sure that the corresponding column can accept multiple values by specifying the following config in the schema:
"singleValueField": false
When an RSVP event is ingested with an event_id that already exists in the table, rsvp_count will be incremented by the incoming value.
The name of the group will be added to the group_name column if it is not already there.
Also, the venue will be appended to the venue_name column.
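The multi-value requirement is easy to miss when editing the two files separately. As a minimal sketch, the check below loads trimmed copies of the schema and upsert config from this recipe and reports any column whose strategy needs multiple values but whose schema field is single-valued. The helper name find_single_value_conflicts is hypothetical, and treating UNION the same way as APPEND is an assumption based on group_name also being multi-valued in the schema.

```python
import json

# Trimmed copy of the schema shown earlier in this recipe.
SCHEMA = json.loads("""
{
  "schemaName": "meetup_rsvp",
  "primaryKeyColumns": ["event_id"],
  "dimensionFieldSpecs": [
    {"name": "event_id", "dataType": "INT"},
    {"name": "venue_name", "dataType": "STRING", "singleValueField": false},
    {"name": "group_name", "dataType": "STRING", "singleValueField": false}
  ],
  "metricFieldSpecs": [{"name": "rsvp_count", "dataType": "INT"}]
}
""")

# The upsertConfig section of the table config.
UPSERT_CONFIG = {
    "mode": "PARTIAL",
    "partialUpsertStrategies": {
        "rsvp_count": "INCREMENT",
        "group_name": "UNION",
        "venue_name": "APPEND",
    },
}

# Assumption: these strategies are treated as requiring a multi-value column.
MULTI_VALUE_STRATEGIES = {"APPEND", "UNION"}


def find_single_value_conflicts(schema, upsert_config):
    """Return columns whose strategy needs multiple values but whose
    schema field is missing or single-valued (hypothetical helper)."""
    fields = {}
    for key in ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs"):
        for spec in schema.get(key, []):
            fields[spec["name"]] = spec

    conflicts = []
    strategies = upsert_config.get("partialUpsertStrategies", {})
    for column, strategy in strategies.items():
        if strategy not in MULTI_VALUE_STRATEGIES:
            continue
        spec = fields.get(column)
        # A field with no explicit "singleValueField" is treated as single-valued.
        if spec is None or spec.get("singleValueField", True):
            conflicts.append(column)
    return conflicts


print(find_single_value_conflicts(SCHEMA, UPSERT_CONFIG))
```

An empty list means the schema and the upsert config agree. Running a check like this before AddTable is optional and is not part of the recipe itself.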
You can create the table and schema by running the following command:
docker exec -it pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \
-tableConfigFile /config/meetup_rsvp_table.json \
-schemaFile /config/meetup_rsvp_schema.json -exec
Produce some RSVP events
We will simulate a few RSVPs by publishing the following events to the meetup_rsvp Kafka topic.
docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server kafka:9092 --topic meetup_rsvp
{"event_id":3,"venue_name":"Indonesia","group_name":"C","rsvp_count":1,"mtime":"1635140709"}
{"event_id":3,"venue_name":"China","group_name":"C","rsvp_count":1,"mtime":"1646067689"}
{"event_id":2,"venue_name":"France","group_name":"C","rsvp_count":1,"mtime":"1616646138"}
{"event_id":1,"venue_name":"Myanmar","group_name":"B","rsvp_count":1,"mtime":"1632930567"}
{"event_id":1,"venue_name":"Hungary","group_name":"A","rsvp_count":1,"mtime":"1643574332"}
{"event_id":1,"venue_name":"China","group_name":"B","rsvp_count":1,"mtime":"1645779637"}
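To build an intuition for what the three strategies should do with these six events, here is a rough pure-Python model of the merge. It is only a sketch under stated assumptions, not Pinot's implementation: events are applied in the order listed, columns without a strategy simply take the incoming value, and UNION results are sorted for readability (the ordering Pinot returns may differ). The function names merge and apply_events are hypothetical.

```python
# The six events published above, in the same order.
EVENTS = [
    {"event_id": 3, "venue_name": "Indonesia", "group_name": "C", "rsvp_count": 1, "mtime": "1635140709"},
    {"event_id": 3, "venue_name": "China", "group_name": "C", "rsvp_count": 1, "mtime": "1646067689"},
    {"event_id": 2, "venue_name": "France", "group_name": "C", "rsvp_count": 1, "mtime": "1616646138"},
    {"event_id": 1, "venue_name": "Myanmar", "group_name": "B", "rsvp_count": 1, "mtime": "1632930567"},
    {"event_id": 1, "venue_name": "Hungary", "group_name": "A", "rsvp_count": 1, "mtime": "1643574332"},
    {"event_id": 1, "venue_name": "China", "group_name": "B", "rsvp_count": 1, "mtime": "1645779637"},
]

# Strategies from the table config's partialUpsertStrategies section.
STRATEGIES = {"rsvp_count": "INCREMENT", "group_name": "UNION", "venue_name": "APPEND"}

# Columns declared with "singleValueField": false in the schema.
MULTI_VALUE_COLUMNS = {"group_name", "venue_name"}


def as_list(value):
    """Wrap a scalar in a list; copy an existing list."""
    return list(value) if isinstance(value, list) else [value]


def merge(strategy, old, new):
    """Combine the stored value with the incoming one (simplified model)."""
    if strategy == "INCREMENT":
        return old + new
    if strategy == "APPEND":
        return as_list(old) + as_list(new)
    if strategy == "UNION":
        # Sorted only to make the output deterministic; an assumption.
        return sorted(set(as_list(old)) | set(as_list(new)))
    # Assumption: columns without a strategy take the incoming value.
    return new


def apply_events(events):
    """Fold events into one row per primary key (event_id)."""
    table = {}
    for event in events:
        key = event["event_id"]
        row = table.get(key)
        if row is None:
            # First record for this key is stored as-is.
            table[key] = {
                col: as_list(val) if col in MULTI_VALUE_COLUMNS else val
                for col, val in event.items()
            }
        else:
            table[key] = {
                col: merge(STRATEGIES.get(col), row[col], val)
                for col, val in event.items()
            }
    return table


for event_id, row in sorted(apply_events(EVENTS).items()):
    print(event_id, row["rsvp_count"], row["group_name"], row["venue_name"])
```

Under this simplified model, event 1 ends up with an rsvp_count of 3, event 2 with 1, and event 3 with 2, which gives you something to compare against when you query the table.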
Querying
Once that's completed, navigate to localhost:9000/#/query and click on the meetup_rsvp table, or copy/paste the following query:
select event_id, sum(rsvp_count) as total_rsvp
from meetup_rsvp
group by event_id
order by total_rsvp desc
limit 10
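If you would rather run the query from a script than from the query console, something along these lines may work. It is a sketch that assumes the Pinot Controller started by docker-compose is reachable at localhost:9000 and accepts SQL as a JSON body on POST /sql; check the endpoint against your Pinot version before relying on it. The function names build_query_request and run_query are hypothetical.

```python
import json
import urllib.request

# Assumption: the controller is exposed on localhost:9000 and serves POST /sql.
CONTROLLER_SQL_URL = "http://localhost:9000/sql"

QUERY = """
select event_id, sum(rsvp_count) as total_rsvp
from meetup_rsvp
group by event_id
order by total_rsvp desc
limit 10
""".strip()


def build_query_request(sql, url=CONTROLLER_SQL_URL):
    """Build (but do not send) a POST request carrying the SQL as JSON."""
    payload = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_query(sql, url=CONTROLLER_SQL_URL):
    """Send the query and return the decoded JSON response as a dict."""
    request = build_query_request(sql, url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the cluster running, print(json.dumps(run_query(QUERY), indent=2)) should dump the raw response. The sketch deliberately does not assume anything about the shape of that response.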