SQL Ingestion
Overview
The SQL connector ingests data from a SQL table (e.g. in Snowflake or BigQuery) into Pinot. Ingestion runs on the Pinot Minion framework on a scheduled basis. The connector follows a pull model, ingesting data over a given time period on each run. Currently, Snowflake and BigQuery are the supported sources.
A templatized SQL query must be provided that contains a time filter using the BETWEEN operator, like the following:
SELECT col1, col2
FROM myTable
WHERE timeColumn BETWEEN $START AND $END
The filter BETWEEN $START AND $END must exist somewhere in the query.
Joins in the templatized SQL are not supported at the moment.
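As a rough illustration of how the template is used, the sketch below checks that a query contains the required filter and fills in the two placeholders for one time range. The function name render_query and the plain string replacement are assumptions made for this example; the connector's actual substitution logic is not described here.

```python
import re

# The required filter, tolerating extra whitespace between the tokens.
_FILTER = re.compile(r"BETWEEN\s+\$START\s+AND\s+\$END")


def render_query(template: str, start: str, end: str) -> str:
    """Fill $START and $END in a query template (illustrative only)."""
    if not _FILTER.search(template):
        raise ValueError("query template must contain BETWEEN $START AND $END")
    # Plain replacement keeps any other '$' characters in the query untouched.
    return template.replace("$START", start).replace("$END", end)


query = render_query(
    "SELECT col1, col2 FROM myTable WHERE timeColumn BETWEEN $START AND $END",
    "1644321600000",
    "1644408000000",
)
print(query)
```

Passing the bounds as strings leaves the caller responsible for writing them in the time column's own format.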
The connector continually ingests data from a user-provided start time up to the current time. On each run, it attempts to process the time periods that have elapsed since the last execution.
Prerequisites
- Data is assumed to be immutable after ingestion, since the connector uses watermark and segment time boundary metadata for one-time ingestion.
- SQL ingestion can only be enabled for offline Pinot tables.
- The SQL source table should have a time column, and the user must know the format of the time column (millisecondsSinceEpoch, secondsSinceEpoch, hoursSinceEpoch, or an ISO format, e.g. yyyy-MM-dd).
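To make the listed time formats concrete, the following sketch renders one timestamp in each of them. The helper format_time is hypothetical and not part of the connector, and it handles only the yyyy-MM-dd pattern as a representative of the ISO formats.

```python
from datetime import datetime, timezone


def format_time(ts: datetime, fmt: str) -> str:
    """Render a timestamp in one of the time column formats (illustrative only)."""
    seconds = ts.timestamp()
    if fmt == "millisecondsSinceEpoch":
        return str(int(seconds * 1000))
    if fmt == "secondsSinceEpoch":
        return str(int(seconds))
    if fmt == "hoursSinceEpoch":
        return str(int(seconds // 3600))
    if fmt == "yyyy-MM-dd":
        # Python equivalent of the yyyy-MM-dd pattern.
        return ts.strftime("%Y-%m-%d")
    raise ValueError(f"unsupported format in this sketch: {fmt}")


ts = datetime(2022, 2, 8, 12, 0, tzinfo=timezone.utc)
for fmt in ("millisecondsSinceEpoch", "secondsSinceEpoch", "hoursSinceEpoch", "yyyy-MM-dd"):
    print(fmt, format_time(ts, fmt))
```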
Pinot Table Configuration
The offline table task configs can be manually set with the following connection and ingestion parameters.
Snowflake connection parameters
All of the following properties are required to establish a connection with Snowflake.
Property Name | Required | Description |
---|---|---|
sql.snowflake.user | Yes | Snowflake username |
sql.snowflake.password | Yes | Snowflake password |
sql.snowflake.account | Yes | Snowflake account name |
sql.snowflake.db | Yes | Snowflake database |
sql.snowflake.schema | Yes | Snowflake schema (collection of tables/views) |
BigQuery connection parameters
All of the following properties are required to establish a connection with BigQuery.
Property Name | Required | Description |
---|---|---|
sql.bigquery.projectid | Yes | BigQuery project ID |
In addition, a Google service account key must be exposed through the environment variable GOOGLE_APPLICATION_CREDENTIALS. For instructions on how to create a service account key, refer to the Google Cloud documentation.
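Before scheduling a BigQuery ingestion task, it can help to confirm that the variable is visible to the process and points at a real file. The check below is a minimal sketch; the function name credentials_path is an assumption for this example, and the check does not validate the contents of the key.

```python
import os


def credentials_path(env=os.environ) -> str:
    """Return the key file path, or raise if it is unset or the file is missing."""
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    if not os.path.isfile(path):
        raise RuntimeError(f"service account key not found: {path}")
    return path
```

The variable has to be set in the environment of the process that runs the task, not only in an interactive shell.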
Ingestion parameters
These ingestion parameters apply to all SQL connectors.
Property Name | Required | Description |
---|---|---|
sql.queryTemplate | Yes | Templatized query used to pull data from the SQL source into Pinot. |
sql.timeColumnFormat | Yes | The format of the time column in the templatized query. This is needed to determine how to break the query into batches. |
sql.timeColumnName | Yes | The time column name provided in the templatized query. |
sql.startTime | Yes | The time in the source table from which ingestion begins. This is used to identify time buckets. Should be in the format of the source table's time column. |
sql.endTime | No | Provide if a one-time bootstrap job is desired. Continuous ingestion up to the current time is disabled if this is provided. |
sql.bucketTimePeriod | No | Overrides the segment time period, for example when the generated segments are too small or too large. This parameter determines the time buckets for the Pinot segments relative to the sql.startTime value; all segments are bucketed by this length. E.g. a 5 day bucket period is expressed as "5d" and a 1 hour bucket period as "1h". Valid period units are d (days), h (hours), m (minutes), and s (seconds). |
sql.delayTimeLength | No | Delay subtracted from the current time to determine the latest time value for which data is ingested: $END = CURRENT_TIME - DELAY. If omitted, defaults to 1. E.g. a 5 day delay from the current time of execution is expressed as "5d" and a 1 hour delay as "1h". Valid delay units are d (days), h (hours), m (minutes), and s (seconds). |
schedule | No | CRON expression for when the job will be routinely triggered. Scheduling support is natively built into Minion jobs (see PinotTaskManager). If not provided, the ingestion task can be manually triggered via the Pinot task API. |
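The interaction between sql.startTime, sql.bucketTimePeriod, and sql.delayTimeLength can be sketched as follows. This is an illustration under stated assumptions, not the connector's implementation: the helpers parse_period and buckets are hypothetical, the time column is taken to be millisecondsSinceEpoch, and only complete buckets that end at or before $END are emitted.

```python
from datetime import datetime, timedelta, timezone

_UNITS = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds"}


def parse_period(period: str) -> timedelta:
    """Convert a period string such as '5d' or '1h' into a timedelta."""
    value, unit = period[:-1], period[-1:]
    if unit not in _UNITS or not value.isdigit():
        raise ValueError(f"invalid period: {period!r}")
    return timedelta(**{_UNITS[unit]: int(value)})


def buckets(start_ms: int, now: datetime, bucket: str, delay: str):
    """Yield (start_ms, end_ms) for each complete bucket before $END."""
    step_ms = int(parse_period(bucket).total_seconds() * 1000)
    if step_ms <= 0:
        raise ValueError("bucket period must be positive")
    # $END = CURRENT_TIME - DELAY
    end_ms = int((now - parse_period(delay)).timestamp() * 1000)
    lo = start_ms
    while lo + step_ms <= end_ms:  # assumption: partial buckets are skipped
        yield lo, lo + step_ms
        lo += step_ms


now = datetime(2022, 2, 11, 12, 0, tzinfo=timezone.utc)
for lo, hi in buckets(1644321600000, now, bucket="1d", delay="1d"):
    print(lo, hi)
```

Here a start of 2022-02-08 12:00 UTC with a one-day bucket and a one-day delay gives two buckets when the task runs at 2022-02-11 12:00 UTC. A larger delay holds back the most recent data, which may be useful when the source table receives late-arriving rows.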
Below is a sample task configuration for Snowflake ingestion:
"task": {
  "taskTypeConfigsMap": {
    "SqlConnectorBatchPushTask": {
      "sql.snowflake.user": "startree",
      "sql.startTime": "1644321600000",
      "sql.timeColumnName": "timeColumn",
      "sql.snowflake.password": "verysecretpassword",
      "sql.snowflake.account": "myAccount",
      "sql.delayTimeLength": "0d",
      "schedule": "0 25 * ? * * *",
      "sql.timeColumnFormat": "millisecondsSinceEpoch",
      "sql.bucketTimePeriod": "1d",
      "sql.snowflake.db": "STARTREE_EXAMPLE",
      "sql.queryTemplate": "SELECT * FROM myTable WHERE timeColumn BETWEEN $START AND $END",
      "sql.className": "ai.startree.connectors.sql.snowflake.SnowflakeConnector",
      "sql.snowflake.schema": "PUBLIC"
    }
  }
}
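A task configuration like the one above can be checked against the properties marked as required in the tables before it is applied. The sketch below is a hypothetical client-side helper, not something Pinot provides; sql.className appears in the sample but is not listed in the tables, so it is not checked here.

```python
from typing import Dict, List

# Properties marked "Yes" in the ingestion parameters table.
REQUIRED_COMMON = (
    "sql.queryTemplate",
    "sql.timeColumnFormat",
    "sql.timeColumnName",
    "sql.startTime",
)

# Properties marked "Yes" in the per-source connection tables.
REQUIRED_BY_SOURCE = {
    "snowflake": (
        "sql.snowflake.user",
        "sql.snowflake.password",
        "sql.snowflake.account",
        "sql.snowflake.db",
        "sql.snowflake.schema",
    ),
    "bigquery": ("sql.bigquery.projectid",),
}


def missing_properties(task_config: Dict[str, str], source: str) -> List[str]:
    """Return the required properties that are absent or empty."""
    required = REQUIRED_COMMON + REQUIRED_BY_SOURCE[source]
    return [key for key in required if not task_config.get(key)]
```

An empty list means every required property for that source is present; the check does not validate the values themselves.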