Off-Heap Deduplication in StarTree Pinot

Apache Pinot natively supports deduplication (dedup) during real-time ingestion to handle scenarios where duplicate primary keys must be invalidated. Common use cases include invalidating duplicate user logins (e.g., by IP/UserID) and preventing duplicate orders in shipment tracking systems. Internally, Pinot servers track dedup metadata that records which segment currently holds each primary key, so duplicate records can be dropped during ingestion. In the open-source implementation, this metadata is stored on-heap, leading to scalability issues as the data grows.
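
Dedup keys come from the primaryKeyColumns declared in the table schema. As a minimal sketch (the schema and column names below are hypothetical), a login-events table might define:

{
    "schemaName": "loginEvents",
    "dimensionFieldSpecs": [
        { "name": "userId", "dataType": "STRING" },
        { "name": "ip", "dataType": "STRING" }
    ],
    "dateTimeFieldSpecs": [
        { "name": "timeCol", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }
    ],
    "primaryKeyColumns": ["userId", "ip"]
}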

Challenges with Open Source Dedup Implementation

  1. Decline in query performance due to increasing memory overhead.
  2. Ingestion delays caused by heap exhaustion and garbage collection.
  3. Prolonged server restart times for large tables as metadata is rebuilt.

StarTree’s Off-Heap Dedup

StarTree introduces off-heap deduplication using RocksDB to manage dedup metadata on disk. This approach reduces heap usage and improves scalability. Key functionalities include:

  1. In-memory caching: Recent reads are served from memory, minimizing disk access.
  2. Batched writes: The latest writes are buffered in memory and flushed to disk in large batches, reducing write latency.
  3. Improved server restart times: Metadata snapshots allow faster bootstrap of dedup tables, reducing the preload time.

Benefits of StarTree Off-Heap Dedup

  1. Reduced JVM heap pressure, enabling better scalability for large tables.
  2. Improved query latency by reducing contention on heap memory.
  3. Minimized ingestion delays through efficient memory and disk usage.
  4. Streamlined server restarts with prebuilt metadata snapshots.

Configuration for Off-Heap Deduplication

Enable off-heap deduplication by setting the enablePreload and metadataManagerClass parameters in the dedupConfig. These parameters are enabled by default starting from January 2025.

{
    "dedupConfig": {
        "dedupEnabled": true,
        "hashFunction": "NONE",
        "enablePreload": true,
        "metadataManagerClass": "ai.startree.pinot.dedup.rocksdb.RocksDBTableDedupMetadataManager",
        "metadataManagerConfigs": {
             ...
        },
        "dedupTimeColumn": "timeCol"
    }
}

Async Removal of Dedup Metadata

StarTree supports asynchronous removal of dedup metadata for deleted segments or expired keys based on Metadata TTL. This ensures consistent cleanup without impacting ingestion or query performance. These configs are enabled by default in StarTree, but the cleanup frequency and key thresholds can be changed as shown:

{
    "dedupConfig": {
        "metadataManagerClass": "ai.startree.pinot.dedup.rocksdb.RocksDBTableDedupMetadataManager",
        "metadataManagerConfigs": {
            "rocksdb.asyncremoval.enable": "true",
            "rocksdb.asyncremoval.threads": "1",
            "rocksdb.asyncremoval.interval_in_seconds": 3600,
            "rocksdb.asyncremoval.time_threshold_in_seconds": 86400,
            "rocksdb.asyncremoval.key_threshold": 1000000
        }
    }
}

Metadata TTL

With metadataTTL in open-source Pinot, dedup metadata is removed only when the next consuming segment starts, which can take minutes or even hours depending on the segment commit frequency. In StarTree, the dedup metadata is instead cleaned up by the async removal process described above. This is particularly useful for scenarios where duplicates are unlikely to arrive after a certain time.

{
    "dedupConfig": {
        "metadataTTL": 86400
    }
}
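
The TTL is applied based on the values of the dedup time column (the table's time column by default), so metadataTTL is typically paired with the dedupTimeColumn shown earlier; a sketch combining the fields from the examples above:

{
    "dedupConfig": {
        "dedupEnabled": true,
        "metadataTTL": 86400,
        "dedupTimeColumn": "timeCol"
    }
}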

RocksDB Customization

When using off-heap dedup, the server creates a single RocksDB store that is shared by all dedup tables for better efficiency. RocksDB stores data in SST (Sorted String Table) files on disk. Two useful configurations are the write buffer sizes at the DB level and at the ColumnFamily level:

  1. DB-level buffer size: Maximum size of all memtables combined across column families (default: 5 GB, i.e., 5368709120 bytes).
  2. ColumnFamily buffer size: Maximum size of each memtable before it is flushed to disk (default: 100 MB, i.e., 104857600 bytes).

Default cluster settings:

{
   "pinot.server.kvStoreFactory.class.rocksdb": "ai.startree.pinot.common.rocksdb.metastore.RocksDBStore",
   "pinot.server.kvStoreFactory.rocksdb.datadir": "/home/pinot/data/index/metadata/common",
   "pinot.server.kvStoreFactory.rocksdb.common.delete.on.exit": "true",
   "pinot.server.kvStoreFactory.rocksdb.db.db.write.buffer.size": "5368709120",
   "pinot.server.kvStoreFactory.rocksdb.columnfamily.write.buffer.size": "104857600"
   ...
}

Each table partition creates its own ColumnFamily in the shared RocksDB store. To customize a table's ColumnFamily, add the following RocksDB configs in the metadataManagerConfigs section. The config names are kept consistent with those available in RocksDB.

  "dedupConfig" : {
        "enablePreload": true,
        "metadataManagerClass": "ai.startree.pinot.dedup.rocksdb.RocksDBTableDedupMetadataManager",
        "metadataManagerConfigs": {
            "rocksdb.blockcache.size_bytes": "1073741824",
            "rocksdb.rowcache.size_bytes": "104857600"
            ...
        }
    }
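
In the snippet above, the block cache (RocksDB's cache of uncompressed data blocks) is sized at 1073741824 bytes (1 GiB) and the row cache (which caches individual key-value entries) at 104857600 bytes (100 MiB); these values are illustrative and should be tuned to the table's key cardinality and available memory.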

Dedup Snapshot Creation

For large tables with millions or billions of primary keys, server restart times can be significantly reduced by preloading prebuilt dedup metadata snapshots. These snapshots are created by the minion using the DedupSnapshotCreationTask, persisted in the deep store, and later imported by servers during restart. This mechanism minimizes restart delays by eliminating the need to rebuild the metadata with a read and a write for every record.

This minion task runs periodically to keep the prebuilt dedup metadata updated incrementally. To enable the minion task, add the following task configs. Refer to the Pinot docs to understand how to schedule a minion task.

   "task": {
      "taskTypeConfigsMap": {
        "DedupSnapshotCreationTask": {
         "schedule": "0 0 12 * * ?"
        }
      }
    }

Partition Configuration

The DedupSnapshotCreationTask requires partition information to schedule tasks effectively. If the table has a segmentPartitionConfig with a single partition column, the task uses its numPartitions field:

     "tableIndexConfig": {
        "segmentPartitionConfig": {
        "columnPartitionMap": {
          "<partition-colum>": {
            "numPartitions": <num-partitions>,
            ...
          }
        }
      }
    }

For tables with multiple partition columns or no segmentPartitionConfig, explicitly specify the number of partitions:

    "dedupConfig": {
      ...
      "metadataManagerConfigs": {
        "rocksdb.preload.num_partition_overwrite": "<num-partitions>"
      }
    }

As tasks complete, the following REST APIs can be used to inspect the prebuilt dedup metadata.

/dedupSnapshots/{tableNameWithType}/names
/dedupSnapshots/{tableNameWithType}/{snapshotName}/metadata
/dedupSnapshots/{tableNameWithType}/latest
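
For example, for a hypothetical REALTIME table named myTable, the available snapshots can be listed via /dedupSnapshots/myTable_REALTIME/names.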