Delta Lake Connector

Delta Lake Connector

Overview

The Delta Lake connector allows ingestion from a Delta Lake data platform hosted in S3 into Pinot. The ingestion is done as a periodic pull based system that is executed by the Pinot Minion framework. You specify the period that you want the ingestion task to follow and on each trigger Minion will check if there has been a change to the Delta Lake since the last execution and, if there has, it will ingest the new data.

StarTree's Delta Lake connector supports both the older Standalone Delta Lake API and the new Delta Kernel API. With the Delta Kernel API, the StarTree Delta Lake connector also supports the use of Deletion Vectors (opens in a new tab).

Prerequisites

  • Delta Lake ingestion can only be enabled for offline Pinot tables
  • This connector only supports Delta tables hosted on Amazon S3.
  • This connector supports only sync mode, meaning each file in Delta Lake maps to a single segment in Pinot. Therefore, it is recommended to size the files appropriately to avoid creating an excessive number of segments. Find the official documentation (opens in a new tab) on how to tune the size of files in your Delta table.

Limitations

  • Support for GCS and ADLS is not yet available.
  • Support for Delta column mapping (opens in a new tab) is not yet available.
  • Delta tasks do not utilize checkpoints. Consequently, no table entry is created at the ZK path PROPERTYSTORE→MINION_TASK_METADATA. This limitation applies to both Delta Standalone mode and Delta Kernel mode. Tasks will fail if not completed within one day, as Delta does not track progress. Subsequent triggers will generate new tasks to resume processing.

Pinot Table Configuration

The offline table task configs can be manually set with the following connection and ingestion parameters.

Connection parameters

All the following properties are required to establish connection with Delta Tables.

Property NameRequiredDescription
delta.ingestion.table.fsYesMust be set to "S3"
scheduleYesThis sets the schedule for execution and uses the Quartz schedule format (opens in a new tab) for setting the schedule
delta.ingestion.table.uriYesThe URI to the Delta Lake table.
delta.ingestion.table.s3.regionYesThe AWS region that houses the Delta Lake.
tableMaxNumTasksNoThis is used to determine how to partition ingestion jobs across Minion workers
delta.ingestion.useDeltaKernelNoWhen set to "true" this will make the Delta Lake connector use the Delta Kernel API. When not provided this will default to true and it is recommended that you use kernel mode.

Secret Key S3 Access Control

Property NameRequiredDescription
delta.ingestion.table.s3.accessKeyNoIf you are using Access Key based authorization then this is for the AWS access key. You will also need the AWS Secret Key below.
delta.ingestion.table.s3.secretKeyNoThe AWS Secret Key.

IAM S3 Access Control

If you are using IAM to managed access to your S3 bucket then, use the following parameters:

Property NameRequiredDescription
delta.ingestion.table.s3.role.arnYes"arn:aws:iam::REPLACE_WITH_AWS_ACCOUNT_ID_WHERE_DATA_RESIDES:role/DataManagerDeltaLakeS3IAMRole"
fs.s3a.aws.credentials.providerYesMust be "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider" on StarTree Pinot versions 1.2.0* and "ai.startree.connectors.auth.AssumedRoleCredentialsProvider" on StarTree Pinot versions 1.3.0*.
fs.s3a.assumed.role.sts.endpointYesThe STS endpoint for your account (e.g., https://sts.us-west-2.amazonaws.com)
fs.s3a.assumed.role.sts.endpoint.regionYesThe Region for your account (e.g, us-west-2)
fs.s3a.assumed.role.arnYesThe ARN For the Role that will be assumed to get access ("arn:aws:iam::REPLACE_WITH_AWS_ACCOUNT_ID_WHERE_DATA_RESIDES:role/DataManagerDeltaLakeS3IAMRole")
fs.s3a.assumed.role.credentials.providerYesMust be com.amazonaws.auth.InstanceProfileCredentialsProvider. Used to gain a set of temporary security credentials to access an ARN that is in an account outside of the StarTree cluster.

Example Task Configuration

"task": {
      "taskTypeConfigsMap": {
        "DeltaTableIngestionTask": {
          "delta.ingestion.table.fs": "S3",
          "delta.ingestion.table.s3.accessKey": "myAccessKey",
          "schedule": "0 20 * ? * * *",
          "delta.ingestion.table.uri": "s3a://my.bucket/deltaLake/",
          "delta.ingestion.table.s3.secretKey": "mySecretKey",
          "tableMaxNumTasks": "1000",
          "delta.ingestion.table.s3.region": "us-west-2",
          "delta.ingestion.useDeltaKernel": "true"
        }
      }
    }