Off-Heap Deduplication in StarTree Pinot
Apache Pinot natively supports deduplication (dedup) during real-time ingestion to handle scenarios where records with duplicate primary keys must be dropped. Common use cases include invalidating duplicate user logins (e.g., by IP/UserID) and preventing duplicate orders in shipment tracking systems. Internally, Pinot servers track dedup metadata that maps each primary key to the segment currently holding it, so that duplicate records can be dropped during ingestion. In the open-source implementation, this metadata is stored on-heap, leading to scalability issues as the data grows.
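For context, here is a minimal sketch of enabling dedup in open-source Pinot, assuming a hypothetical primary key column userID. The schema declares the primary key columns, and the table config enables dedup along with the strict replica-group routing that Pinot's dedup requires:

{
  "primaryKeyColumns": ["userID"]
}

"dedupConfig": {
  "dedupEnabled": true,
  "hashFunction": "NONE"
},
"routing": {
  "instanceSelectorType": "strictReplicaGroup"
}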
Challenges with Open Source Dedup Implementation
- Decline in query performance due to increasing memory overhead.
- Ingestion delays caused by heap exhaustion and garbage collection.
- Prolonged server restart times for large tables as metadata is rebuilt.
StarTree’s Off-Heap Dedup
StarTree introduces off-heap deduplication using RocksDB to manage dedup metadata on disk. This approach reduces heap usage and improves scalability. Key functionalities include:
- In-memory caching: Recent reads are served from memory, minimizing disk access.
- Batched writes: The latest writes are buffered in memory and flushed to disk in large batches, reducing write latency.
- Improved server restart times: Metadata snapshots allow faster bootstrapping of dedup tables, reducing the preload time.
Benefits of StarTree Off-Heap Dedup
- Reduced JVM heap pressure, enabling better scalability for large tables.
- Improved query latency by reducing contention on heap memory.
- Minimized ingestion delays through efficient memory and disk usage.
- Streamlined server restarts with prebuilt metadata snapshots.
Configuration for Off-Heap Deduplication
Enable off-heap deduplication by setting the enablePreload and metadataManagerClass parameters in the dedupConfig. These parameters are enabled by default as of January 2025.
{
  "dedupConfig": {
    "dedupEnabled": true,
    "hashFunction": "NONE",
    "enablePreload": true,
    "metadataManagerClass": "ai.startree.pinot.dedup.rocksdb.RocksDBTableDedupMetadataManager",
    "metadataManagerConfigs": {
      ...
    },
    "dedupTimeColumn": "timeCol"
  }
}
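Note that hashFunction controls whether primary keys are hashed before being stored in the dedup metadata; besides NONE, Pinot also accepts MD5 and MURMUR3, which can reduce the metadata footprint for wide primary keys. For example:

"dedupConfig": {
  "dedupEnabled": true,
  "hashFunction": "MD5"
}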
Async Removal of Dedup Metadata
StarTree supports asynchronous removal of dedup metadata for deleted segments or expired keys (based on the metadata TTL). This ensures consistent cleanup without impacting ingestion or query performance. These configs are enabled by default in StarTree, but the cleanup frequency and key thresholds can be tuned as shown below, where the cleanup runs every hour (3600 seconds) and targets keys older than one day (86400 seconds):
{
  "dedupConfig": {
    "metadataManagerClass": "ai.startree.pinot.dedup.rocksdb.RocksDBTableDedupMetadataManager",
    "metadataManagerConfigs": {
      "rocksdb.asyncremoval.enable": "true",
      "rocksdb.asyncremoval.threads": "1",
      "rocksdb.asyncremoval.interval_in_seconds": 3600,
      "rocksdb.asyncremoval.time_threshold_in_seconds": 86400,
      "rocksdb.asyncremoval.key_threshold": 1000000
    }
  }
}
Metadata TTL
With metadataTTL in open-source Pinot, metadata is removed only when the next consuming segment starts, which can take minutes or even hours depending on the segment commit frequency. In StarTree, the dedup metadata is cleaned up by the async removal process described above. This is particularly useful for scenarios where duplicates are unlikely to arrive after a certain time.
{
  "dedupConfig": {
    "metadataTTL": 86400
  }
}
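The TTL is evaluated against the table's dedup time column, so metadataTTL is typically paired with dedupTimeColumn (shown here with the hypothetical timeCol column from the earlier example):

{
  "dedupConfig": {
    "dedupTimeColumn": "timeCol",
    "metadataTTL": 86400
  }
}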
RocksDB Customization
When using off-heap dedup, the server creates a single RocksDB store shared by all dedup tables for better efficiency. RocksDB stores data in SST (Sorted String Table) files on disk. Some useful configurations are the write buffer sizes at the DB level and at the column family level:
- DB-level buffer size: Maximum size of all memtables combined across column families (default: 5 GB).
- ColumnFamily buffer size: Maximum size of each memtable before it is flushed to disk (default: 100 MB).
Default cluster settings:
{
  "pinot.server.kvStoreFactory.class.rocksdb": "ai.startree.pinot.common.rocksdb.metastore.RocksDBStore",
  "pinot.server.kvStoreFactory.rocksdb.datadir": "/home/pinot/data/index/metadata/common",
  "pinot.server.kvStoreFactory.rocksdb.common.delete.on.exit": "true",
  "pinot.server.kvStoreFactory.rocksdb.db.db.write.buffer.size": "5368709120",
  "pinot.server.kvStoreFactory.rocksdb.columnfamily.write.buffer.size": "104857600",
  ...
}
Each table partition creates its own ColumnFamily in the shared RocksDB store. To customize a table's ColumnFamily, add the relevant RocksDB configs in the metadataManagerConfigs section. The config names are kept consistent with those available for RocksDB; the example below sets a 1 GiB block cache and a 100 MiB row cache:
"dedupConfig" : {
"enablePreload": true,
"metadataManagerClass": "ai.startree.pinot.dedup.rocksdb.RocksDBTableDedupMetadataManager",
"metadataManagerConfigs": {
"rocksdb.blockcache.size_bytes": "1073741824",
"rocksdb.rowcache.size_bytes": "104857600"
...
}
}
Dedup Snapshot Creation
For large tables with millions or billions of primary keys, server restart times can be significantly reduced by using prebuilt dedup metadata snapshots during preload. These snapshots are created by minions via the DedupSnapshotCreationTask, persisted in the deep store, and later imported by servers during restart. This mechanism minimizes restart delays by eliminating the need to rebuild the metadata with a read and a write per record.
This minion task runs periodically to keep updating the prebuilt dedup metadata incrementally. To enable the minion task, add the following task configs; the schedule is a Quartz cron expression (here, daily at 12:00 PM). Refer to the Pinot docs to understand how to schedule a minion task.
"task": {
"taskTypeConfigsMap": {
"DedupSnapshotCreationTask": {
"schedule": "0 0 12 * * ?"
}
}
}
Partition Configuration
The DedupSnapshotCreationTask requires partition information to schedule tasks effectively. If the table has a segmentPartitionConfig with a single partition column, the task uses the numPartitions field:
"tableIndexConfig": {
"segmentPartitionConfig": {
"columnPartitionMap": {
"<partition-colum>": {
"numPartitions": <num-partitions>,
...
}
}
}
}
For tables with multiple partition columns or a missing segmentPartitionConfig, explicitly specify the number of partitions:
"dedupConfig": {
...
"metadataManagerConfigs": {
"rocksdb.preload.num_partition_overwrite": "<num-partitions>"
}
}
As tasks complete, the following REST APIs can be used to inspect the prebuilt dedup metadata:
/dedupSnapshots/{tableNameWithType}/names
/dedupSnapshots/{tableNameWithType}/{snapshotName}/metadata
/dedupSnapshots/{tableNameWithType}/latest
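For example, for a hypothetical real-time table named orders, the available snapshot names can be listed via:
/dedupSnapshots/orders_REALTIME/names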