
Partial Upserts in Pinot

In this recipe we'll learn how to perform partial upserts on a real-time Pinot table.

To get a better understanding of upserts and how they work, you can visit the previous recipe on Full Upserts.

Pre-requisites

You will need to install Docker locally to follow the code examples in this guide.

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/upserts-partial

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.

Create meetup_rsvp Kafka topic

This recipe explores capturing RSVPs for meetup events. A meetup can be hosted by multiple groups at multiple venues.

Let's create the meetup_rsvp topic in Kafka to record the RSVPs.

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --bootstrap-server kafka:9092 --topic meetup_rsvp

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

config/meetup_rsvp_schema.json
{
"schemaName": "meetup_rsvp",
"primaryKeyColumns": [
"event_id"
],
"dimensionFieldSpecs": [
{
"name": "event_id",
"dataType": "INT"
},
{
"name": "venue_name",
"dataType": "STRING",
"singleValueField": false
},
{
"dataType": "STRING",
"name": "group_name",
"singleValueField": false
}
],
"metricFieldSpecs": [
{
"name": "rsvp_count",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [
{
"name": "mtime",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}

Note that the event_id column is designated as the primary key. A primary key is mandatory for upserts in Pinot:

"primaryKeyColumns": [
"event_id"
]
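If you edit the schema, a quick local sanity check can catch a missing or misspelled primary key before you submit it. The sketch below is a hypothetical helper, not part of Pinot or this recipe. It loads a trimmed copy of the schema above and confirms that every entry in primaryKeyColumns is declared as a field.

```python
import json

# Trimmed copy of config/meetup_rsvp_schema.json (only the parts this check reads).
SCHEMA = json.loads("""
{
  "schemaName": "meetup_rsvp",
  "primaryKeyColumns": ["event_id"],
  "dimensionFieldSpecs": [
    {"name": "event_id", "dataType": "INT"},
    {"name": "venue_name", "dataType": "STRING", "singleValueField": false},
    {"name": "group_name", "dataType": "STRING", "singleValueField": false}
  ],
  "metricFieldSpecs": [{"name": "rsvp_count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [{"name": "mtime", "dataType": "LONG"}]
}
""")


def declared_columns(schema: dict) -> set:
    """Collect column names from every field-spec list in the schema."""
    columns = set()
    for section in ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs"):
        columns.update(spec["name"] for spec in schema.get(section, []))
    return columns


def check_primary_key(schema: dict) -> list:
    """Return the primary key columns, or raise if they are missing or undeclared."""
    primary_key = schema.get("primaryKeyColumns") or []
    if not primary_key:
        raise ValueError("upserts require a non-empty primaryKeyColumns list")
    missing = [c for c in primary_key if c not in declared_columns(schema)]
    if missing:
        raise ValueError(f"primary key columns are not declared as fields: {missing}")
    return primary_key


print(check_primary_key(SCHEMA))  # ['event_id']
```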

We'll also have the following table config:

config/meetup_rsvp_table.json
{
"tableName": "meetup_rsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetup_rsvp",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
"stream.kafka.topic.name": "meetup_rsvp",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
"stream.kafka.broker.list": "kafka:9093",
"realtime.segment.flush.threshold.size": 30,
"realtime.segment.flush.threshold.rows": 30
},
"nullHandlingEnabled": true
},
"metadata": {
"customConfigs": {}
},
"routing": {
"instanceSelectorType": "strictReplicaGroup"
},
"upsertConfig": {
"mode": "PARTIAL",
"partialUpsertStrategies": {
"rsvp_count": "INCREMENT",
"group_name": "UNION",
"venue_name": "APPEND"
}
}
}

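In this table configuration, the upsert mode is set to PARTIAL, and we define a merge strategy for three columns: rsvp_count, group_name, and venue_name. Instead of replacing the whole row when a record with an existing primary key arrives, Pinot combines the new values with the stored ones column by column, according to each column's strategy: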

"upsertConfig": {
"mode": "PARTIAL",
"partialUpsertStrategies": {
"rsvp_count": "INCREMENT",
"group_name": "UNION",
"venue_name": "APPEND"
}
}

Tip

When using the APPEND or UNION strategy, make sure the corresponding column is multi-valued by setting the following in its field spec:

"singleValueField": false
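The tip above is easy to overlook as a config grows. As a hedged illustration, here is a hypothetical pre-flight check (not a Pinot tool) that inspects a trimmed copy of the table config and flags settings this recipe relies on: PARTIAL mode, strictReplicaGroup routing, nullHandlingEnabled, and multi-valued columns for APPEND and UNION. The multi-value column set is hard-coded from the schema's "singleValueField": false entries.

```python
import json

# Trimmed copy of the table config: only the keys this check inspects.
TABLE = json.loads("""
{
  "tableName": "meetup_rsvp",
  "tableIndexConfig": {"nullHandlingEnabled": true},
  "routing": {"instanceSelectorType": "strictReplicaGroup"},
  "upsertConfig": {
    "mode": "PARTIAL",
    "partialUpsertStrategies": {
      "rsvp_count": "INCREMENT",
      "group_name": "UNION",
      "venue_name": "APPEND"
    }
  }
}
""")

# Columns declared with "singleValueField": false in meetup_rsvp_schema.json.
MULTI_VALUE_COLUMNS = {"venue_name", "group_name"}


def check_partial_upsert(table: dict, multi_value_columns: set) -> list:
    """Return human-readable problems; an empty list means none were found."""
    problems = []
    upsert = table.get("upsertConfig", {})
    if upsert.get("mode") != "PARTIAL":
        problems.append("upsertConfig.mode should be PARTIAL")
    if table.get("routing", {}).get("instanceSelectorType") != "strictReplicaGroup":
        problems.append("routing.instanceSelectorType should be strictReplicaGroup")
    if not table.get("tableIndexConfig", {}).get("nullHandlingEnabled", False):
        problems.append("tableIndexConfig.nullHandlingEnabled should be true")
    for column, strategy in upsert.get("partialUpsertStrategies", {}).items():
        # APPEND and UNION add items to a multi-valued column.
        if strategy in ("APPEND", "UNION") and column not in multi_value_columns:
            problems.append(f"{column} uses {strategy} but is single-valued")
    return problems


print(check_partial_upsert(TABLE, MULTI_VALUE_COLUMNS))  # []
```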

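When an RSVP event arrives with an event_id that is already in the table, Pinot merges it into the existing row: rsvp_count is incremented by the new event's value (INCREMENT), the group name is added to group_name only if it isn't already present (UNION), and the venue is appended to venue_name even if it is already listed (APPEND).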
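To make the merge rules concrete, here is a simplified, hypothetical model of how the three strategies combine records that share an event_id. It is not Pinot's implementation. It assumes records arrive in mtime order, treats a scalar venue_name or group_name as a one-element multi-value, and lets columns without a strategy (such as mtime) take the newer value. The lists keep insertion order only for readability; don't rely on element order in Pinot.

```python
from typing import Optional

# Strategies from upsertConfig.partialUpsertStrategies in the table config.
STRATEGIES = {"rsvp_count": "INCREMENT", "group_name": "UNION", "venue_name": "APPEND"}
MULTI_VALUE_COLUMNS = {"group_name", "venue_name"}


def normalize(record: dict) -> dict:
    """Wrap scalar values of multi-value columns in a list."""
    row = dict(record)
    for column in MULTI_VALUE_COLUMNS:
        if column in row and not isinstance(row[column], list):
            row[column] = [row[column]]
    return row


def merge(previous: Optional[dict], incoming: dict) -> dict:
    """Merge an incoming record into the current row for the same primary key."""
    incoming = normalize(incoming)
    if previous is None:
        return incoming  # the first record for this key is stored as-is
    merged = dict(incoming)  # assumption: columns without a strategy take the new value
    for column, strategy in STRATEGIES.items():
        old, new = previous[column], incoming[column]
        if strategy == "INCREMENT":
            merged[column] = old + new
        elif strategy == "APPEND":
            merged[column] = old + new  # keeps duplicates
        elif strategy == "UNION":
            merged[column] = old + [v for v in new if v not in old]  # skips values already present
        else:
            raise ValueError(f"unsupported strategy: {strategy}")
    return merged


# The three RSVPs for event_id 1 from the producer step below.
events = [
    {"event_id": 1, "venue_name": "Myanmar", "group_name": "B", "rsvp_count": 1, "mtime": "1632930567"},
    {"event_id": 1, "venue_name": "Hungary", "group_name": "A", "rsvp_count": 1, "mtime": "1643574332"},
    {"event_id": 1, "venue_name": "China", "group_name": "B", "rsvp_count": 1, "mtime": "1645779637"},
]

row = None
for event in events:
    row = merge(row, event)

print(row["rsvp_count"], row["group_name"], row["venue_name"])
# 3 ['B', 'A'] ['Myanmar', 'Hungary', 'China']
```

Group B appears once because UNION skips it the second time, while all three venues are kept because APPEND never deduplicates.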

You can create the schema and table by running the following command:

docker exec -it pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \
-tableConfigFile /config/meetup_rsvp_table.json \
-schemaFile /config/meetup_rsvp_schema.json -exec

Produce some RSVP events

We'll simulate a few RSVPs by publishing the following events to the meetup_rsvp Kafka topic. Start the Kafka console producer, then paste the events at its prompt, one event per line:

docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server kafka:9092 --topic meetup_rsvp

{"event_id":3,"venue_name":"Indonesia","group_name":"C","rsvp_count":1,"mtime":"1635140709"}
{"event_id":3,"venue_name":"China","group_name":"C","rsvp_count":1,"mtime":"1646067689"}
{"event_id":2,"venue_name":"France","group_name":"C","rsvp_count":1,"mtime":"1616646138"}
{"event_id":1,"venue_name":"Myanmar","group_name":"B","rsvp_count":1,"mtime":"1632930567"}
{"event_id":1,"venue_name":"Hungary","group_name":"A","rsvp_count":1,"mtime":"1643574332"}
{"event_id":1,"venue_name":"China","group_name":"B","rsvp_count":1,"mtime":"1645779637"}

Querying

Once that's completed, navigate to localhost:9000/#/query and click on the meetup_rsvp table or copy/paste the following query:

select event_id, sum(rsvp_count) as total_rsvp
from meetup_rsvp
group by event_id
order by total_rsvp desc
limit 10
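Each incoming rsvp_count is INCREMENTed into the single row kept per event_id, and every event above carries rsvp_count 1, so the query should effectively count the RSVPs per event. The hypothetical sketch below replays the six events offline to show the result you can expect, assuming all of them are consumed and merged as described. It models only the rsvp_count column.

```python
import json

# The six events published to the meetup_rsvp topic, one JSON document per line.
EVENTS = """\
{"event_id":3,"venue_name":"Indonesia","group_name":"C","rsvp_count":1,"mtime":"1635140709"}
{"event_id":3,"venue_name":"China","group_name":"C","rsvp_count":1,"mtime":"1646067689"}
{"event_id":2,"venue_name":"France","group_name":"C","rsvp_count":1,"mtime":"1616646138"}
{"event_id":1,"venue_name":"Myanmar","group_name":"B","rsvp_count":1,"mtime":"1632930567"}
{"event_id":1,"venue_name":"Hungary","group_name":"A","rsvp_count":1,"mtime":"1643574332"}
{"event_id":1,"venue_name":"China","group_name":"B","rsvp_count":1,"mtime":"1645779637"}
"""


def expected_totals(lines: str, limit: int = 10) -> list:
    """Mimic the GROUP BY event_id / ORDER BY total_rsvp DESC / LIMIT query."""
    running = {}
    for line in lines.splitlines():
        if not line.strip():
            continue
        event = json.loads(line)
        # INCREMENT: the merged row holds the running sum for its primary key.
        running[event["event_id"]] = running.get(event["event_id"], 0) + event["rsvp_count"]
    # One row per primary key, so sum(rsvp_count) per event_id equals that running sum.
    return sorted(running.items(), key=lambda kv: kv[1], reverse=True)[:limit]


for event_id, total_rsvp in expected_totals(EVENTS):
    print(event_id, total_rsvp)
# 1 3
# 3 2
# 2 1
```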