Off-heap upsert

Off-Heap upserts

Pinot provides native support of upserts (opens in a new tab) during real-time ingestion, for cases where records need modifications or deletion.

Under the hood, Pinot servers need to keep track of some upsert metadata in order to tell for a newly ingested record whether its primary key has appeared before and which segment holds it right now, so that the server can mask the old record and make the new one visible to the queries. The upsert metadata contains all the primary keys in the table. As the number of primary keys grows, the metadata gets large too.

The open source implementation of upserts keeps the metadata on-heap. The on-heap upsert uses an in-memory map that's kept in JVM heap to store the metadata, the heap usage quickly increases as the number of primary keys increases in the table. When it takes over a lot of heap space, it starts to affect the performance of query execution and data ingestion. Besides, as the metadata is only kept in memory, it's gone when the server restarts. The server has to restore the metadata for all existing primary keys when starting up, in order to resume the real-time ingestion correctly. And this step can take a long time for large tables, as the server has to scan a lot of existing docs and check their primary keys to bootstrap the upsert metadata.

So in StarTree, we have added off-heap upserts. With off-heap upserts, the upsert metadata is managed outside the JVM heap space and backed by disk space. So it can scale easily on each server while using a fraction of the memory of on-heap upserts, and perform upserts on large datasets at a fraction of the cost. Besides, we have added support to prebuild upsert metadata so that servers can import them directly to restore the upsert metadata, reducing the time when to load large upsert tables.

Although we use disk backed storage system to keep upsert metadata for off-heap upserts, the recent reads are mostly handled in memory with cache on top of the disk storage; and the latest writes are appended to the memory buffers to be flushed to disk later in big batches. Those help minimize the slowdown on the data ingestion. The query path remains unchanged because it continues to access the same set of bitmaps as used in on-heap upserts when to identify the visible records, so you should still get similar p99 latencies. In fact, latencies may improve with reduced pressure on the JVM heap space.

Usage

The off-heap upsert is enabled by default in new StarTree releases, and both FULL and PARTIAL upserts are supported.

We have used RocksDB to implement the off-heap upsert. Different storage systems can be supported if needed. When using the off-heap upsert, the server creates one RocksDB store to be shared by all the upsert tables for better efficiency.

There is no need to add those configs to enable the off-heap upsert, as it's enabled by default. But one may find the following configs useful when need to customize the feature for certain use cases for more performance or efficiency.

The server or cluster configs below are used to initialize the server-level RocksDB store. One can customize where to store the upsert metadata.

By default, the store is deleted and restored upon each server restart to make it simple to ensure data consistency between the upsert metadata and segments assigned to the server, because the segment assignments can change across server restarts and the upsert metadata from uncommitted segments must be cleaned up upon restarts. The preloading feature and the UpsertSnapshotCreationTask minion task explained below make it very fast for servers to restore the upsert metadata consistently.

The write buffer configs and many other configs about the block/row cache sizes can help fine tune the performance of the RocksDB store. The config names are kept consistent with those available for RocksDB (opens in a new tab). The default value are supposed to work well for most cases.

{
   "pinot.server.kvStoreFactory.class.rocksdb": "ai.startree.pinot.upsert.rocksdb.metastore.rocksdb.RocksDBStore",
   "pinot.server.kvStoreFactory.rocksdb.datadir": "/home/pinot/data/index/metadata/upsert",
   "pinot.server.kvStoreFactory.rocksdb.upsert.delete.on.exit": "true",
   "pinot.server.kvStoreFactory.rocksdb.db.db.write.buffer.size": "5368709120",
   "pinot.server.kvStoreFactory.rocksdb.columnfamily.write.buffer.size": "104857600"
   ...
}

The following upsert configs are set inside table config (opens in a new tab). They are enabled by default today, but can be customized if needed. The snapshot and preload features are for servers to recover upsert metadata quickly upon restarts.

Each table partition creates its own ColumnFamily in the shared RocksDB store. To customize the table's ColumnFamily add the following RocksDB configs in the metadataManagerConfigs section. The config names are kept consistent with those available for RocksDB (opens in a new tab)

  "upsertConfig" : {
        "enableSnapshot": true,
        "enablePreload": true,
        "metadataManagerClass": "ai.startree.pinot.upsert.rocksdb.RocksDBTableUpsertMetadataManager",
        "metadataManagerConfigs": {
            "rocksdb.blockcache.size_bytes": "2147483648"
            ...
        }
    }

Enable async removal of upsert metadata

This is enabled by default in new StarTree releases, so you can skip this section unless there is a need to customize its behavior.

When a table has a lot of primary keys, its upsert metadata in RocksDB can be huge. Cleaning them up can take a long time. As this cleanup is done on the HelixTaskExecutor threads by default, those threads may be occupied for a long time, blocking the Helix state transitions from the other tables, which can potentially block the real-time data ingestion. So this async removal feature was added to move cleanup of upsert metadata to another thread pool, releasing the HelixTaskExecutor threads as soon as possible.

This feature can be enabled in the metadataManagerConfigs section and customized with the configs listed below.

    "upsertConfig" : {
        "enableSnapshot": true,
        "enablePreload": true,
        "metadataManagerClass": "ai.startree.pinot.upsert.rocksdb.RocksDBTableUpsertMetadataManager",
        "metadataManagerConfigs": {
          "rocksdb.asyncremoval.enable": "true",
          "rocksdb.asyncremoval.threads": "1",
          "rocksdb.asyncremoval.interval_in_seconds": "3600"
        }
    }

Use UpsertSnapshotCreation minion task

The open source implementation of the preloading feature uses the validDocIds snapshots (opens in a new tab) kept on server local disk to identify valid docs and write their upsert metadata into the RocksDB store. As preloading is write-only, it can finish pretty fast in most cases.

But for very large upsert tables, like those with hundreds of millions or billions of primary keys per server, this can still take a long time to finish. Besides, there are cases when servers may lose their local disks and have to download raw segments from the deep store. The raw segments don't have validDocIds snapshots, so servers have to load them with cpu intensive read/check/write operations.

To handle those issues, we have built a minion task called UpsertSnapshotCreationTask to prebuild the upsert metadata for table partitions on the minion workers and upload the metadata to the deep store. The servers can simply download and import the prebuilt metadata into RocksDB when loading upsert tables. The prebuilt upsert metadata contains certain upsert configs and segment information for consistency check before importing to ensure data correctness.

The minion task runs periodically to keep updating the prebuilt upsert metadata incrementally. To enable the minion task, add the following task configs. Please make sure enableSnapshot and enablePreload are set to true in the tableConfig for this minion task to execute. More about how to operate the minion tasks can be found in the Pinot docs (opens in a new tab).

   "task": {
      "taskTypeConfigsMap": {
        "UpsertSnapshotCreationTask": {
         "schedule": "0 0 12 * * ?"
        }
      }
    }

As tasks complete, those Restful APIs can be used to inspect those prebuilt upsert metadata.

/upsertSnapshots/{tableNameWithType}/names
/upsertSnapshots/{tableNameWithType}/{snapshotName}/metadata

For servers to take use of the prebuilt upsert metadata, enable the following config. Internally, this was enabled by default.

    "upsertConfig": {
      ...
      "metadataManagerConfigs": {
        "rocksdb.preload.use_prebuilt_snapshot": "true"
      }
    }

The UpsertSnapshotCreation task needs to know the number of table partitions in order to properly schedule tasks. If your tableIndexConfig has the segmentPartitionConfig field and defines a single partition column, then UpsertSnapshotCreation task will read the number of partitions as set in the numPartitions field below.

     "tableIndexConfig": {
        "segmentPartitionConfig": {
        "columnPartitionMap": {
          "<partition-colum>": {
            "numPartitions": <num-partitions>,
            ...
          }
        }
      }
    }

However, if your tableIndexConfig does not have the segmentPartitionConfig field or it has two or more partition columns specified there, you must explicitly tell the UpsertSnapshotCreation task how many partitions are in the table by using the num_partition_overwrite field in the upsertConfig section like below.

    "upsertConfig": {
      ...
      "metadataManagerConfigs": {
        "rocksdb.preload.use_prebuilt_snapshot": "true",
        "rocksdb.preload.num_partition_overwrite": "<num-partitions>"
      }

Metadata TTL and deletedKeys TTL

The TTL configs as mentioned in Pinot docs (opens in a new tab) can be used for off-heap upsert too. The upsert metadata is cleaned up by the async removal feature as said above, to not block the start of new consuming segments. Besides, the UpsertSnapshotCreation minion task is also extended to support TTL configs, so that the stale upsert metadata is not included in the upsert snapshots.

Enable off-heap upsert for an existing table

The off-heap upsert is enabled by default in new StarTree releases. But if you do have tables still using the on-heap upsert, then you may need to adjust the upsert configs accordingly and restart the servers to transfer the existing upsert metadata to the new backend.