Partial Upserts in Pinot

To learn how to perform partial upserts on a real-time table, watch the following video, or complete the tutorial below, starting with Prerequisites.

To get a better understanding of upserts and how they work, see the Full Upserts documentation.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download recipes.

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/upserts-partial

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.

Create meetup_rsvp Kafka topic

This recipe explores capturing RSVPs for meetup events. A meetup can be hosted by multiple groups, at multiple venues.

Let's create the meetup_rsvp topic in Kafka to record the RSVPs.

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --bootstrap-server kafka:9092 --topic meetup_rsvp

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
    "schemaName": "meetup_rsvp",
    "primaryKeyColumns": [
        "event_id"
    ],
    "dimensionFieldSpecs": [
        {
            "name": "event_id",
            "dataType": "INT"
        },
        {
            "name": "venue_name",
            "dataType": "STRING",
            "singleValueField": false
        },
        {
            "name": "group_name",
            "dataType": "STRING",
            "singleValueField": false
        }
    ],
    "metricFieldSpecs": [
        {
            "name": "rsvp_count",
            "dataType": "INT"
        }
    ],
    "dateTimeFieldSpecs": [
        {
            "name": "mtime",
            "dataType": "LONG",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS"
        }
    ]
}

config/meetup_rsvp_schema.json

Note that the event_id column is designated as the primary key. Declaring a primary key is mandatory for upserts in Pinot.

"primaryKeyColumns": [
        "event_id"
]

We'll also have the following table config:

{
    "tableName": "meetup_rsvp",
    "tableType": "REALTIME",
    "segmentsConfig": {
        "timeColumnName": "mtime",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "1",
        "segmentPushType": "APPEND",
        "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
        "schemaName": "meetup_rsvp",
        "replicasPerPartition": "1"
    },
    "tenants": {},
    "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowLevel",
            "stream.kafka.topic.name": "meetup_rsvp",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
            "stream.kafka.broker.list": "kafka:9093",
            "realtime.segment.flush.threshold.size": 30,
            "realtime.segment.flush.threshold.rows": 30
        },
        "nullHandlingEnabled": true
    },
    "metadata": {
        "customConfigs": {}
    },
    "routing": {
        "instanceSelectorType": "strictReplicaGroup"
    },
    "upsertConfig": {
        "mode": "PARTIAL",
        "partialUpsertStrategies": {
            "rsvp_count": "INCREMENT",
            "group_name": "UNION",
            "venue_name": "APPEND"
        }
    }
}

config/meetup_rsvp_table.json

In this table configuration, the upsert mode is set to PARTIAL, and a merge strategy is defined for three columns: rsvp_count, group_name, and venue_name.

"upsertConfig": {
      "mode": "PARTIAL",
      "partialUpsertStrategies": {
          "rsvp_count": "INCREMENT",
          "group_name": "UNION",
          "venue_name": "APPEND"
      }
  }
Note: when using the APPEND strategy, make sure that the corresponding column can accept multiple values by specifying the following config:

"singleValueField": false
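
The multi-value requirement above can be checked before the table is created. The following is a minimal sketch, not part of the recipe: the helper name single_value_conflicts is hypothetical, the dictionaries are trimmed copies of the schema and table config shown earlier, and it assumes that UNION needs a multi-value column in the same way as APPEND (the recipe's schema declares group_name as multi-value, which suggests this).

```python
from typing import Any, Dict, List, Sequence

# Trimmed copies of the recipe's schema and table config (relevant keys only).
schema: Dict[str, Any] = {
    "dimensionFieldSpecs": [
        {"name": "event_id", "dataType": "INT"},
        {"name": "venue_name", "dataType": "STRING", "singleValueField": False},
        {"name": "group_name", "dataType": "STRING", "singleValueField": False},
    ]
}
table_config: Dict[str, Any] = {
    "upsertConfig": {
        "mode": "PARTIAL",
        "partialUpsertStrategies": {
            "rsvp_count": "INCREMENT",
            "group_name": "UNION",
            "venue_name": "APPEND",
        },
    }
}


def single_value_conflicts(
    schema: Dict[str, Any],
    table_config: Dict[str, Any],
    # Assumption: these are the strategies that need a multi-value column.
    multi_value_strategies: Sequence[str] = ("APPEND", "UNION"),
) -> List[str]:
    """Return columns whose strategy needs multiple values but whose
    schema entry does not set "singleValueField": false."""
    multi_value_columns = {
        spec["name"]
        for spec in schema.get("dimensionFieldSpecs", [])
        if spec.get("singleValueField", True) is False
    }
    strategies = table_config.get("upsertConfig", {}).get(
        "partialUpsertStrategies", {}
    )
    return sorted(
        column
        for column, strategy in strategies.items()
        if strategy in multi_value_strategies and column not in multi_value_columns
    )


# An empty list means every APPEND/UNION column is declared multi-value.
print(single_value_conflicts(schema, table_config))
```

If the "singleValueField": false line were removed from venue_name, the helper would return that column name, which makes the mismatch visible before Pinot rejects or mishandles the table.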

When several RSVP events are ingested with the same event_id value, their rsvp_count values are added together. The name of the group is added to the group_name column if it is not already there, and the venue is appended to the venue_name column.
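
To make the three strategies concrete, here is a small in-memory sketch of the merge behavior described above. It is an illustration only, not Pinot's implementation: the function names are hypothetical, it assumes events arrive in mtime order for each key, and it assumes columns without a configured strategy (here mtime) are simply overwritten by the newest event. The events are the same six records published later in this recipe.

```python
from typing import Any, Dict, List


def increment(old: int, new: int) -> int:
    # INCREMENT: add the incoming value to the stored value.
    return old + new


def union(old: List[str], new: List[str]) -> List[str]:
    # UNION: add incoming values only if they are not already present.
    return old + [value for value in new if value not in old]


def append(old: List[str], new: List[str]) -> List[str]:
    # APPEND: always add incoming values, duplicates included.
    return old + new


STRATEGIES = {"rsvp_count": increment, "group_name": union, "venue_name": append}
MULTI_VALUE_COLUMNS = ("group_name", "venue_name")


def upsert(table: Dict[int, Dict[str, Any]], event: Dict[str, Any]) -> None:
    """Merge one event into the row that shares its event_id."""
    row = dict(event)
    for column in MULTI_VALUE_COLUMNS:
        row[column] = [row[column]]  # multi-value columns hold lists
    previous = table.get(row["event_id"])
    if previous is not None:
        for column, merge in STRATEGIES.items():
            row[column] = merge(previous[column], row[column])
    # Columns without a strategy keep the incoming value (assumed overwrite).
    table[row["event_id"]] = row


events = [
    {"event_id": 3, "venue_name": "Indonesia", "group_name": "C", "rsvp_count": 1, "mtime": "1635140709"},
    {"event_id": 3, "venue_name": "China", "group_name": "C", "rsvp_count": 1, "mtime": "1646067689"},
    {"event_id": 2, "venue_name": "France", "group_name": "C", "rsvp_count": 1, "mtime": "1616646138"},
    {"event_id": 1, "venue_name": "Myanmar", "group_name": "B", "rsvp_count": 1, "mtime": "1632930567"},
    {"event_id": 1, "venue_name": "Hungary", "group_name": "A", "rsvp_count": 1, "mtime": "1643574332"},
    {"event_id": 1, "venue_name": "China", "group_name": "B", "rsvp_count": 1, "mtime": "1645779637"},
]

table: Dict[int, Dict[str, Any]] = {}
for event in events:
    upsert(table, event)

for event_id in sorted(table):
    print(event_id, table[event_id])
```

In this sketch, event_id 1 ends up with an rsvp_count of 3, three venues, and two groups (B is recorded only once). The order of values inside a multi-value column in Pinot itself may differ from the list order used here.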

You can create the table and schema by running the following command:

docker run \
   --network upserts \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
  -tableConfigFile /config/meetup_rsvp_table.json \
  -controllerHost "pinot-controller" \
  -schemaFile /config/meetup_rsvp_schema.json \
  -exec

Produce some RSVP events

We will simulate a few RSVPs by publishing the following events to the meetup_rsvp Kafka topic.

echo -e '
{"event_id":3,"venue_name":"Indonesia","group_name":"C","rsvp_count":1,"mtime":"1635140709"}
{"event_id":3,"venue_name":"China","group_name":"C","rsvp_count":1,"mtime":"1646067689"}
{"event_id":2,"venue_name":"France","group_name":"C","rsvp_count":1,"mtime":"1616646138"}
{"event_id":1,"venue_name":"Myanmar","group_name":"B","rsvp_count":1,"mtime":"1632930567"}
{"event_id":1,"venue_name":"Hungary","group_name":"A","rsvp_count":1,"mtime":"1643574332"}
{"event_id":1,"venue_name":"China","group_name":"B","rsvp_count":1,"mtime":"1645779637"}
' | kcat -P -b localhost:9092 -t meetup_rsvp

Querying

Once that's completed, navigate to localhost:9000/#/query and click on the meetup_rsvp table or copy/paste the following query:

select event_id, sum(rsvp_count) as total_rsvp
from meetup_rsvp 
group by event_id
order by total_rsvp desc
limit 10
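
The same query can also be sent from a script instead of the query console. The sketch below uses only the Python standard library and rests on assumptions that are not stated in the recipe: that the controller at localhost:9000 accepts SQL as a JSON POST on its /sql path, and that the response carries a resultTable object with dataSchema.columnNames and rows. The function names are hypothetical. Adjust the URL if your setup exposes the query endpoint elsewhere.

```python
import json
import urllib.request
from typing import Any, List, Tuple

# Assumption: the controller from this recipe accepts SQL on this path.
CONTROLLER_SQL_URL = "http://localhost:9000/sql"

QUERY = """
select event_id, sum(rsvp_count) as total_rsvp
from meetup_rsvp
group by event_id
order by total_rsvp desc
limit 10
"""


def build_request(sql: str, url: str = CONTROLLER_SQL_URL) -> urllib.request.Request:
    """Wrap the SQL text in the JSON body that the endpoint is assumed to expect."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_query(sql: str) -> Tuple[List[str], List[List[Any]]]:
    """Send the query and return (column names, rows). Needs a running cluster."""
    with urllib.request.urlopen(build_request(sql), timeout=10) as response:
        payload = json.load(response)
    result = payload["resultTable"]
    return result["dataSchema"]["columnNames"], result["rows"]


# Usage, once the cluster is up and the events have been ingested:
# columns, rows = run_query(QUERY)
# for row in rows:
#     print(dict(zip(columns, row)))
```

Keeping build_request separate from run_query makes it possible to inspect the outgoing request without a running cluster.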