Delta Lake Connector
Overview
The Delta Lake connector ingests data from a Delta Lake table hosted on Amazon S3 or Google Cloud Storage (GCS) into Pinot. Ingestion is a periodic, pull-based process executed by the Pinot Minion framework. You specify the schedule for the ingestion task; on each trigger, Minion checks whether the Delta Lake table has changed since the last execution and, if it has, ingests the new data.
StarTree's Delta Lake connector supports both the older Delta Standalone API and the newer Delta Kernel API. With the Delta Kernel API, the connector also supports Deletion Vectors.
Prerequisites
- Delta Lake ingestion can only be enabled for offline Pinot tables.
- This connector supports Delta tables hosted on Amazon S3 and Google Cloud Storage (GCS).
- This connector supports only sync mode, in which each file in Delta Lake maps to a single segment in Pinot. Size the files appropriately to avoid creating an excessive number of segments. See the official documentation on how to tune the size of files in your Delta table.
Limitations
- Support for ADLS is not yet available.
- Support for Delta column mapping is not yet available.
- Delta tasks do not utilize checkpoints. Consequently, no table entry is created at the ZK path PROPERTYSTORE→MINION_TASK_METADATA. This limitation applies to both Delta Standalone mode and Delta Kernel mode. Tasks will fail if not completed within one day, as Delta does not track progress. Subsequent triggers will generate new tasks to resume processing.
Pinot Table Configuration
The offline table task configs can be manually set with the following connection and ingestion parameters.
Connection parameters
Use the following properties to establish a connection to Delta tables. The Required column indicates which properties are mandatory.
Property Name | Required | Description |
---|---|---|
delta.ingestion.table.fs | Yes | Must be set to "S3" or "GCS" |
delta.ingestion.table.uri | Yes | The source location of the Delta Lake table. For Amazon S3, the URI should have the form s3a://<bucket-name>/path/to/delta-table. For GCS, the URI should have the form gs://<bucket-name>/path/to/delta-table. |
delta.ingestion.useDeltaKernel | No | When set to "true", the Delta Lake connector uses the Delta Kernel API. Defaults to "false" when not provided. It should be set to "true" when the Delta table is hosted on GCS. |
tableMaxNumTasks | No | This is used to determine how to partition ingestion jobs across Minion workers |
schedule | Yes | The schedule for execution, in the Quartz schedule format. |
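The constraints in the table above can be checked before a table config is submitted. The following is a minimal sketch, not part of the connector: the function name `check_delta_task_config` is hypothetical, and the rules it applies are only those stated in the table (required properties, the allowed `delta.ingestion.table.fs` values, the URI prefix per file system, and the Delta Kernel recommendation for GCS).

```python
def check_delta_task_config(cfg: dict) -> list[str]:
    """Return a list of problems found in a DeltaTableIngestionTask config.

    Hypothetical helper for illustration; it only encodes the rules
    listed in the connection parameters table.
    """
    problems = []

    # Properties marked "Yes" in the Required column.
    for key in ("delta.ingestion.table.fs", "delta.ingestion.table.uri", "schedule"):
        if not cfg.get(key):
            problems.append(f"missing required property: {key}")

    fs = cfg.get("delta.ingestion.table.fs")
    uri = cfg.get("delta.ingestion.table.uri", "")
    expected_prefix = {"S3": "s3a://", "GCS": "gs://"}

    if fs is not None and fs not in expected_prefix:
        problems.append('delta.ingestion.table.fs must be "S3" or "GCS"')
    elif fs in expected_prefix and uri and not uri.startswith(expected_prefix[fs]):
        problems.append(f"URI for {fs} should start with {expected_prefix[fs]}")

    # The Delta Kernel API should be enabled for tables hosted on GCS.
    if fs == "GCS" and cfg.get("delta.ingestion.useDeltaKernel", "false") != "true":
        problems.append('delta.ingestion.useDeltaKernel should be "true" for GCS')

    return problems
```

An empty list means the config passed these basic checks; it does not guarantee that the credentials or the schedule expression are valid.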
Connecting Delta Tables hosted on S3 via basic Authentication
Property Name | Required | Description |
---|---|---|
delta.ingestion.table.s3.region | Yes | The AWS region that houses the Delta Lake. |
delta.ingestion.table.s3.accessKey | No | The AWS access key, used for access-key-based authentication. You will also need the AWS secret key below. |
delta.ingestion.table.s3.secretKey | No | The AWS Secret Key. |
Connecting Delta Tables hosted on S3 via IAM Authentication
If you are using IAM to manage access to your S3 bucket, use the following parameters:
Property Name | Required | Description |
---|---|---|
delta.ingestion.table.s3.region | Yes | The AWS region that houses the Delta Lake. |
delta.ingestion.table.s3.role.arn | Yes | "arn:aws:iam::REPLACE_WITH_AWS_ACCOUNT_ID_WHERE_DATA_RESIDES:role/DataManagerDeltaLakeS3IAMRole" |
fs.s3a.aws.credentials.provider | Yes | Must be "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider" on StarTree Pinot versions 1.2.0* and "ai.startree.connectors.auth.AssumedRoleCredentialsProvider" on StarTree Pinot versions 1.3.0*. |
fs.s3a.assumed.role.sts.endpoint | Yes | The STS endpoint for your account (e.g., https://sts.us-west-2.amazonaws.com ) |
fs.s3a.assumed.role.sts.endpoint.region | Yes | The region for your account (e.g., us-west-2) |
fs.s3a.assumed.role.arn | Yes | The ARN of the role that will be assumed to get access ("arn:aws:iam::REPLACE_WITH_AWS_ACCOUNT_ID_WHERE_DATA_RESIDES:role/DataManagerDeltaLakeS3IAMRole") |
fs.s3a.assumed.role.credentials.provider | Yes | Must be com.amazonaws.auth.InstanceProfileCredentialsProvider. Used to obtain a set of temporary security credentials to access an ARN that is in an account outside of the StarTree cluster. |
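To show how the IAM properties above fit together, the following sketch assembles a task config and prints it as JSON. The property names come from the tables in this section; the account ID, bucket path, region, schedule, and tableMaxNumTasks value are placeholder assumptions, and the credentials provider shown is the one listed for StarTree Pinot versions 1.3.0*.

```python
import json

# Placeholder values (assumptions for illustration only).
ACCOUNT_ID = "123456789012"
REGION = "us-west-2"
ROLE_ARN = f"arn:aws:iam::{ACCOUNT_ID}:role/DataManagerDeltaLakeS3IAMRole"

iam_task_config = {
    "delta.ingestion.table.fs": "S3",
    "delta.ingestion.table.uri": "s3a://bucket-name/path/to/delta-table",
    "delta.ingestion.table.s3.region": REGION,
    "delta.ingestion.table.s3.role.arn": ROLE_ARN,
    # Provider class listed for 1.3.0*; the table lists a different class for 1.2.0*.
    "fs.s3a.aws.credentials.provider":
        "ai.startree.connectors.auth.AssumedRoleCredentialsProvider",
    "fs.s3a.assumed.role.sts.endpoint": f"https://sts.{REGION}.amazonaws.com",
    "fs.s3a.assumed.role.sts.endpoint.region": REGION,
    "fs.s3a.assumed.role.arn": ROLE_ARN,
    "fs.s3a.assumed.role.credentials.provider":
        "com.amazonaws.auth.InstanceProfileCredentialsProvider",
    "schedule": "0 20 * ? * * *",
    "tableMaxNumTasks": "1000",
}

# Wrap the properties in the same structure used by the table config's task section.
task_section = {
    "task": {"taskTypeConfigsMap": {"DeltaTableIngestionTask": iam_task_config}}
}
print(json.dumps(task_section, indent=2))
```

Generating the fragment from a dictionary keeps the role ARN and region consistent across the properties that repeat them.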
Connecting Delta Tables hosted on GCS
Property Name | Required | Description |
---|---|---|
delta.ingestion.table.gcs.projectId | Yes | The GCP project ID associated with the bucket that stores the Delta Lake data. |
delta.ingestion.table.gcs.keyContentBase64 | No | The Google Cloud service account key in Base64-encoded format, used for authentication when accessing Delta Lake data stored in GCS. delta.ingestion.table.gcs.keyFile can be used instead of this parameter. See the Google documentation on how to create a service account key. |
delta.ingestion.table.gcs.keyFile | No | The path to the Google Cloud service account key file used for authentication when accessing Delta Lake data stored in a GCS bucket. The file must be available locally on all Minions. delta.ingestion.table.gcs.keyContentBase64 can be used instead of this parameter. |
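A value for delta.ingestion.table.gcs.keyContentBase64 can be produced from a downloaded service account key file. The sketch below assumes the property expects the standard Base64 encoding of the entire JSON key file; the function name `encode_key_file` and the file path in the usage note are hypothetical.

```python
import base64
from pathlib import Path


def encode_key_file(path: str) -> str:
    """Return the standard Base64 encoding of a key file as ASCII text."""
    raw = Path(path).read_bytes()
    return base64.b64encode(raw).decode("ascii")
```

For example, `encode_key_file("service-account.json")` returns a string that could be pasted into the task config. Treat the result as a secret, since it decodes back to the full key.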
Example Task Configuration with S3
"task": {
  "taskTypeConfigsMap": {
    "DeltaTableIngestionTask": {
      "delta.ingestion.table.fs": "S3",
      "delta.ingestion.useDeltaKernel": "true",
      "delta.ingestion.table.uri": "s3a://bucket-name/path/to/delta-table",
      "delta.ingestion.table.s3.accessKey": "myAccessKey",
      "delta.ingestion.table.s3.secretKey": "mySecretKey",
      "delta.ingestion.table.s3.region": "us-west-2",
      "schedule": "0 20 * ? * * *",
      "tableMaxNumTasks": "1000"
    }
  }
}
Example Task Configuration with GCS
"task": {
  "taskTypeConfigsMap": {
    "DeltaTableIngestionTask": {
      "delta.ingestion.table.fs": "GCS",
      "delta.ingestion.useDeltaKernel": "true",
      "delta.ingestion.table.uri": "gs://bucket-name/path/to/delta-table",
      "delta.ingestion.table.gcs.projectId": "project_id_123",
      "delta.ingestion.table.gcs.keyContentBase64": "hraUc5dzBCQVFFRkFBU0NCS2d3Z2dTa=",
      "schedule": "0 20 * ? * * *",
      "tableMaxNumTasks": "1000"
    }
  }
}