
Off-Heap Deduplication in StarTree Pinot

Apache Pinot natively supports deduplication (dedup) during real-time ingestion to handle scenarios where duplicate primary keys must be invalidated. Common use cases include invalidating duplicate user logins (e.g., by IP/UserID) and preventing duplicate orders in shipment tracking systems. Internally, Pinot servers keep dedup metadata that tracks which segment currently holds each primary key, so that duplicate records can be dropped during ingestion. In the open-source implementation, this dedup metadata is stored on-heap, which leads to scalability issues as the data grows.
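
Dedup, like upsert, is keyed on the primary key columns declared in the table schema. As a rough illustration only (the schema and column names below are hypothetical), a login-events schema could declare its key like this:

{
    "schemaName": "userLogins",
    "dimensionFieldSpecs": [
        {"name": "userId", "dataType": "STRING"},
        {"name": "ipAddress", "dataType": "STRING"}
    ],
    "dateTimeFieldSpecs": [
        {"name": "timeCol", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS"}
    ],
    "primaryKeyColumns": ["userId", "ipAddress"]
}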

Challenges with Open Source Dedup Implementation

  1. Decline in query performance due to increasing memory overhead.
  2. Ingestion delays caused by heap exhaustion and garbage collection.
  3. Prolonged server restart times for large tables as metadata is rebuilt.

StarTree’s Off-Heap Dedup

StarTree introduces off-heap deduplication using RocksDB to manage dedup metadata on disk. This approach reduces heap usage and improves scalability. Key functionalities include:

  1. In-memory caching: Recent reads are served from memory, minimizing disk access.
  2. Batched writes: The latest writes are buffered in memory and flushed to disk in large batches, reducing write latency.
  3. Improved server restart times: Metadata snapshots allow faster bootstrap of dedup tables, reducing the preload time.

Benefits of StarTree Off-Heap Dedup

  1. Reduced JVM heap pressure, enabling better scalability for large tables.
  2. Improved query latency by reducing contention on heap memory.
  3. Minimized ingestion delays through efficient memory and disk usage.
  4. Streamlined server restarts with prebuilt metadata snapshots.

Configuration for Off-Heap Deduplication

Enable off-heap deduplication by setting the enablePreload and metadataManagerClass parameters in the dedupConfig. These parameters are enabled by default starting from January 2025.

{
    "dedupConfig": {
        "dedupEnabled": true,
        "hashFunction": "NONE",
        "enablePreload": true,
        "metadataManagerClass": "ai.startree.pinot.dedup.rocksdb.RocksDBTableDedupMetadataManager",
        "metadataManagerConfigs": {
             ...
        },
        "dedupTimeColumn": "timeCol"
    }
}
 

Async Removal of Dedup Metadata

StarTree supports asynchronous removal of dedup metadata for deleted segments or expired keys based on Metadata TTL. This ensures consistent cleanup without impacting ingestion or query performance. These configs are enabled by default in StarTree, but the cleanup frequency and key thresholds can be adjusted as shown:

{
    "dedupConfig": {
        "metadataManagerClass": "ai.startree.pinot.dedup.rocksdb.RocksDBTableDedupMetadataManager",
        "metadataManagerConfigs": {
            "rocksdb.asyncremoval.enable": "true",
            "rocksdb.asyncremoval.threads": "1",
            "rocksdb.asyncremoval.interval_in_seconds": 3600,
            "rocksdb.asyncremoval.time_threshold_in_seconds": 86400,
            "rocksdb.asyncremoval.key_threshold": 1000000
        }
    }
}

Metadata TTL

With metadataTTL in Open Source Pinot, metadata is removed only when the next consuming segment starts, which can take minutes or even hours depending on the segment commit frequency. In StarTree, the dedup metadata is instead cleaned up by the async removal process described above. This is particularly useful for scenarios where duplicates are unlikely after a certain time.

{
    "dedupConfig": {
        "metadataTTL": 86400
    }
}
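
Since the TTL is applied based on the dedup time column, metadataTTL is typically used together with the dedupTimeColumn from the earlier example. A minimal sketch (with illustrative values) could look like:

{
    "dedupConfig": {
        "dedupEnabled": true,
        "metadataTTL": 86400,
        "dedupTimeColumn": "timeCol"
    }
}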

RocksDB Customization

When using off-heap dedup, the server creates a single RocksDB store shared by all the dedup tables for better efficiency. RocksDB stores data in SST (Sorted String Table) files on disk. Two configurations are often worth tuning: the write buffer size at the DB level and at the ColumnFamily level.

  1. DB-level buffer size: Maximum size of all memtables combined across column families (default: 5GB).
  2. ColumnFamily buffer size: Maximum size of each memtable before it is flushed to disk (default: 100MB).

Default cluster settings:

{
   "pinot.server.kvStoreFactory.class.rocksdb": "ai.startree.pinot.common.rocksdb.metastore.RocksDBStore",
   "pinot.server.kvStoreFactory.rocksdb.datadir": "/home/pinot/data/index/metadata/common",
   "pinot.server.kvStoreFactory.rocksdb.common.delete.on.exit": "true",
   "pinot.server.kvStoreFactory.rocksdb.db.db.write.buffer.size": "5368709120",
   "pinot.server.kvStoreFactory.rocksdb.columnfamily.write.buffer.size": "104857600"
   ...
}

Each table partition creates its own ColumnFamily in the shared RocksDB store. To customize a table's ColumnFamily, add the following RocksDB configs in the metadataManagerConfigs section. The config names are kept consistent with those available in RocksDB.

  "dedupConfig" : {
        "enablePreload": true,
        "metadataManagerClass": "ai.startree.pinot.dedup.rocksdb.RocksDBTableDedupMetadataManager",
        "metadataManagerConfigs": {
            "rocksdb.blockcache.size_bytes": "1073741824",
            "rocksdb.rowcache.size_bytes": "104857600"
            ...
        }
    }

Dedup Snapshot Creation

For large tables with millions or billions of primary keys, server restart times can be significantly reduced by using prebuilt dedup metadata snapshots during preload. These snapshots are created by the minion using the DedupSnapshotCreationTask, persisted in the deep store, and later imported by servers during restart. This mechanism minimizes restart delays by eliminating the need to rebuild metadata with a read and a write for each record.

This minion task runs periodically to keep updating the prebuilt dedup metadata incrementally. To enable the minion task, add the following task configs. Refer to the Pinot docs to understand how to schedule a minion task.

   "task": {
      "taskTypeConfigsMap": {
        "DedupSnapshotCreationTask": {
         "schedule": "0 0 12 * * ?"
        }
      }
    }

Partition Configuration

The DedupSnapshotCreationTask requires partition information to schedule tasks effectively. If the table has segmentPartitionConfig with a single partition column, the task uses the numPartitions field:

     "tableIndexConfig": {
        "segmentPartitionConfig": {
        "columnPartitionMap": {
          "<partition-colum>": {
            "numPartitions": <num-partitions>,
            ...
          }
        }
      }
    }

For tables with multiple partition columns or missing segmentPartitionConfig, explicitly specify the partitions:

    "dedupConfig": {
      ...
      "metadataManagerConfigs": {
        "rocksdb.preload.num_partition_overwrite": "<num-partitions>"
      }
    }

As tasks complete, the following REST APIs can be used to inspect the prebuilt dedup metadata.

/dedupSnapshots/{tableNameWithType}/names
/dedupSnapshots/{tableNameWithType}/{snapshotName}/metadata
/dedupSnapshots/{tableNameWithType}/latest
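
For example, assuming these endpoints are reachable on the standard Pinot REST interface at localhost:9000 and the table is named orders_REALTIME (both placeholder values), the latest prebuilt snapshot could be inspected with:

curl -X GET "http://localhost:9000/dedupSnapshots/orders_REALTIME/latest"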

Note that the PREBUILT_SNAPSHOT_CHECK health check can be used to identify dedup tables in a cluster whose snapshot generation task is disabled.

Frequently Asked Questions

Can we enable deduplication on existing real-time tables?

Enabling dedupConfig on existing real-time tables is not advised. It will not eliminate duplicate records already present in existing segments; only newly ingested data will start to get deduplicated after dedup is enabled. If you still wish to enable it on an existing table, add the dedupConfig and restart the servers, but be aware that duplicates may remain in existing segments until those segments are deleted from the table, for example due to data retention.

Is it possible to convert an upsert table to a dedup table?

It is possible to convert an upsert table to a dedup table, but it is recommended to first run the upsert compaction task to remove invalid docs from existing segments, so that those invalid docs do not become duplicates when dedup is enabled. As noted above, a server restart is needed to convert an upsert table to a dedup table.
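
As a rough sketch, the open-source upsert compaction task can be scheduled through the table's task configs, similar to the snapshot task above; the schedule and thresholds below are illustrative values, and the exact config keys should be confirmed against the Pinot docs:

   "task": {
      "taskTypeConfigsMap": {
        "UpsertCompactionTask": {
          "schedule": "0 0 12 * * ?",
          "bufferTimePeriod": "7d",
          "invalidRecordsThresholdPercent": "30"
        }
      }
    }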

Currently, it's not allowed to enable both upsert and dedup on one table.