Upserts

In real-time OLAP databases, we typically use append only data structures for fast data ingestion. But how do we deal with updating records to say, capture the latest status of an order, or the most recent location of a delivery vehicle? Pinot handles this for real-time data ingestion with its upsert functionality.

💡

If you want to learn how to configure upserts, see the full upserts or partial upserts developer guides.

So how does it work?

Events are still ingested into the store regardless of whether they are new records or updates of existing ones, as shown in the diagram below:

All records are still ingestedAll records are still ingested

But Pinot also populates an in-memory dictionary to keep track of the latest docId for each primary key, as shown in the diagram below:

Pinot's in-memory upserts dictionaryPinot's in-memory upserts dictionary

That dictionary is then used to populate a segment's validDocIds, which is used when querying the data.

Something to keep in mind is that upserts work on an individual partition basis only, so you need to make sure that the partitions in your streaming data platform are keyed by the primary key.

Partition data by primary keyPartition data by primary key

Otherwise, upserts won't work and you'll see duplicate data.

Upsert capabilities

Delete

By default, upsert lets you update records but not delete them. To enable the delete functionality, set the "deleteRecordColumn" in the upsert configuration.

The column name you set in the "deleteRecordColumn" can be used to mark the records for deletion by setting the value to true.

For example:

{ 
    "upsertConfig": {  
        ... 
        "deleteRecordColumn": <delete_column_name>
    } 
}
// In the schema
{
    ...
    {
      "name": "<delete_column_name>",
      "dataType": "BOOLEAN"
    },
    ...
}
// In the record
{
    ...,
    "<delete_column_name>": true,
    ...
}

TTL

Upserts also support time to live (TTL). This means metadata for primary keys not updated withing a specific duration (based on the comparison column value) is removed to save memory.

You can enable TTL by setting an appropriate metadataTTL: <value> where the value is in the same unit as the comparisonColumn so for example, if your comparison column is timestamp in milliseconds, the value should also be in milliseconds.

Limitations:

  • Due to the caveat mentioned above regarding units, TTL is not supported on non-numeric comparison columns.
  • Snapshots should be enabled for TTL.
  • Upserts with delete is not supported with TTL.
  • Multiple comparison columns are not supported with TTL.

Snapshot

Upsert metadata is not persisted across restarts. This means Pinot has to rebuild the metadata from scratch every time the server restarts, and requires a full scan of the segment, which can be time-consuming if you have a lot of rows.

To avoid this, you can enable the snapshot feature. This persists a file called validDocIdSnapshot that contains which rows are valid (not updated yet) versus which rows are old. Pinot can then use this information to rebuild the metadata much faster by only scanning valid rows.

To enable snapshots, use the "enableSnapshot" : true config in the upsert configuration.

Preload

This is the most recent optimization added to improve the restart times of the servers. Preload extends the snapshot feature to use an important quality of the valid doc id snapshots, so no two segments can have the same primary key marked as valid in the snapshot. This lets the system create the upsert metadata faster by reducing the number of operations involved in comparing the values for each primary key.

You can enable this feature using enablePreload: true in the table configuration.

Compaction

With upserts, we can end up with a lot of segments that contain only a few valid rows, and take up space on the disk.

Compaction lets you merge sparse segments into dense segments with all valid rows, reducing the disk usage immensely andmaking queries faster due to smaller indices that need to be loaded.

To compact segments on upserts, complete the following steps:

  • Ensure task scheduling is enabled and a minion is available.
  • Add the following to your table configuration. These configurations (except schedule) determine which segments to compact.
"task": {
  "taskTypeConfigsMap": {
    "UpsertCompactionTask": {
      "schedule": "0 */5 * ? * *",
      "bufferTimePeriod": "2d",
      "invalidRecordsThresholdPercent": "30",
      "invalidRecordsThresholdCount": "100000"
    }
  }
}

bufferTimePeriod: the time duration for which the segments won't be considered for compaction. For example, 2d means the segments created in the last 2 days won't be compacted. This is done because the recent segments may still be going through a lot of changes and are still not sparse enough.

invalidRecordsThresholdPercent (Optional): Only segments that invalid records / valid records greater than equal to this threshold are considered for compaction. This helps saving resources on unnecessarily running compaction for dense segments.

invalidRecordsThresholdCount (Optional): Same as above but considers the count of invalid records instead of percentage threshold.

Drop out of order records

Upserts in Pinot don't rely on the order of the incoming event to decide the priority. Instead, Pinot relies on the comparisonColumn value where a higher value indicates that the record should have higher priority.

This leads to an interesting issue where sometimes we can receive records with lower comparison values at later point in time. They are simply ignored in upsert metadata and never appear in query results, but are still stored in segments. To avoid this you can set dropOutOfOrderRecord: true in the upsert configuration, so Pinot does not process these records.

Limitations

There are a few limitations of upserts that you should be aware of:

  • Upserts only work on real-time tables. They are not supported on offline tables or hybrid tables.
  • StarTree indices are not supported on upsert tables.
  • The input data in the stream must be partitioned by the primary key. Otherwise, you'll see duplicate data. In case you have multiple primary keys, you can partition by any one of them.
  • You cannot change the partitions of the input stream once you have started ingesting data. If you do, you will see duplicate data. You should start with as many partitions as you expect to have in the future for an upsert tables.

Off-heap upsert

Because on-heap upserts use an in-memory map on each server to store metadata for every unique key in the table, in heap memory usage quickly increase as the number of primary keys increase in the table. Also, persisting the state Another limitation of keeping the upsert state in memory is that the system must recreate the state every time servers restart.

Another limitation of keeping this state in memory is that we have to recreate it every time the servers need a restart.

You can read more about Off-heap upsert in the Manage Data - Offheap Upsert section.