SQL Connector

SQL Ingestion

Overview

The SQL connector ingests data from a SQL table (e.g. in Snowflake or BigQuery) into Pinot. Ingestion runs on the Pinot Minion framework on a scheduled basis and follows a pull model: each run queries the source for data within a given time period. Currently, Snowflake and BigQuery are the supported sources.

A templatized SQL query must be provided that contains a time filter using the BETWEEN operator, like the following:

SELECT col1, col2 
FROM myTable 
WHERE timeColumn BETWEEN $START AND $END

The filter BETWEEN $START AND $END must exist somewhere in the query. Joins in the templatized SQL are not supported at the moment.
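
To illustrate how the $START and $END placeholders might be filled in for one time period, here is a minimal Python sketch. The helper name render_query and the strict substring check are assumptions made for illustration; the connector's actual substitution logic is not described here.

```python
from string import Template

# The filter the documentation requires somewhere in the query template.
REQUIRED_FILTER = "BETWEEN $START AND $END"


def render_query(query_template: str, start: str, end: str) -> str:
    """Substitute $START and $END into a templatized query (hypothetical helper)."""
    if REQUIRED_FILTER not in query_template:
        raise ValueError("query template must contain 'BETWEEN $START AND $END'")
    # string.Template uses the same $NAME placeholder syntax as the query template.
    return Template(query_template).substitute(START=start, END=end)


query = render_query(
    "SELECT col1, col2 FROM myTable WHERE timeColumn BETWEEN $START AND $END",
    "1644321600000",
    "1644408000000",
)
print(query)
```

The check above is deliberately simple: it only accepts the filter written with single spaces, which may be stricter than what the connector accepts.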

The connector continually ingests data from a user-provided start time up to the current time. On each run, it attempts to process the time periods that have elapsed since the last execution.
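
The pull model described above can be pictured as a watermark that advances with each run. The sketch below is an assumption about how such tracking could work, not the connector's implementation; next_window and its parameters are hypothetical names, and all times are epoch milliseconds.

```python
from typing import Optional, Tuple


def next_window(start_time: int, watermark: Optional[int], now: int) -> Optional[Tuple[int, int]]:
    """Return the (start, end) range still to ingest, or None if nothing is pending.

    start_time: user-provided start time
    watermark:  end of the last ingested range, or None on the first run
    now:        current time at execution
    """
    window_start = start_time if watermark is None else max(start_time, watermark)
    if window_start >= now:
        return None
    return (window_start, now)


START = 1644321600000

# First run: no watermark yet, so ingestion begins at the start time.
first = next_window(START, None, START + 3_600_000)
# Second run, one hour later: ingestion resumes where the first run ended.
second = next_window(START, first[1], START + 7_200_000)
print(first, second)
```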

Prerequisites

  • Data is assumed to be immutable after ingestion, because the connector relies on watermark and segment time boundary metadata to ingest each time period only once.
  • SQL ingestion can only be enabled for offline Pinot tables.
  • The SQL source table must have a time column, and the user must know the format of that column (millisecondsSinceEpoch, secondsSinceEpoch, hoursSinceEpoch, or an ISO format, e.g. yyyy-MM-dd).
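
As a quick way to see how the same instant looks in each of the time column formats listed above, the following Python sketch converts a timezone-aware datetime. The function format_time is hypothetical, and it handles only the yyyy-MM-dd pattern as a stand-in for ISO formats.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def format_time(dt: datetime, time_format: str) -> str:
    """Render a timezone-aware datetime in one of the documented formats."""
    elapsed = dt - EPOCH
    if time_format == "millisecondsSinceEpoch":
        return str(elapsed // timedelta(milliseconds=1))
    if time_format == "secondsSinceEpoch":
        return str(elapsed // timedelta(seconds=1))
    if time_format == "hoursSinceEpoch":
        return str(elapsed // timedelta(hours=1))
    if time_format == "yyyy-MM-dd":
        # Only this one ISO-style pattern is mapped in this sketch.
        return dt.strftime("%Y-%m-%d")
    raise ValueError(f"unsupported time format in this sketch: {time_format}")


moment = datetime(2022, 2, 8, 12, 0, tzinfo=timezone.utc)
for fmt in ("millisecondsSinceEpoch", "secondsSinceEpoch", "hoursSinceEpoch", "yyyy-MM-dd"):
    print(fmt, format_time(moment, fmt))
```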

Pinot Table Configuration

The offline table task configs can be manually set with the following connection and ingestion parameters.

Snowflake connection parameters

All of the following properties are required to establish a connection with Snowflake.

Property Name | Required | Description
sql.snowflake.user | Yes | Snowflake username
sql.snowflake.password | Yes | Snowflake password
sql.snowflake.account | Yes | Snowflake account name
sql.snowflake.db | Yes | Snowflake database
sql.snowflake.schema | Yes | Snowflake schema (collection of tables/views)

BigQuery connection parameters

All of the following properties are required to establish a connection with BigQuery.

Property Name | Required | Description
sql.bigquery.projectid | Yes | BigQuery project ID

In addition, a Google service account key must be exposed through the environment variable GOOGLE_APPLICATION_CREDENTIALS. For instructions on how to create a service account key, refer to the Google Cloud documentation.
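
Before enabling BigQuery ingestion, it can help to confirm that the environment variable points at a usable key file. The Python sketch below is one possible pre-flight check; check_credentials_env is a hypothetical helper, and it assumes the check runs in the same environment as the process that performs the ingestion.

```python
import json
import os


def check_credentials_env(env=os.environ) -> str:
    """Return the key file path if GOOGLE_APPLICATION_CREDENTIALS looks usable."""
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    if not os.path.isfile(path):
        raise RuntimeError(f"key file not found: {path}")
    try:
        with open(path) as key_file:
            key = json.load(key_file)
    except (OSError, ValueError) as err:
        raise RuntimeError(f"key file is not readable JSON: {path}") from err
    # Service account key files carry "type": "service_account".
    if not isinstance(key, dict) or key.get("type") != "service_account":
        raise RuntimeError("key file is not a service account key")
    return path


try:
    print("Using key file:", check_credentials_env())
except RuntimeError as err:
    print("Credential check failed:", err)
```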

Ingestion parameters

These ingestion parameters apply to all SQL connectors.

Property Name | Required | Description
sql.queryTemplate | Yes | Templatized query used to pull data from the SQL source into Pinot.
sql.timeColumnFormat | Yes | The format of the time column in the templatized query. This is needed to understand how to break the query into batches.
sql.timeColumnName | Yes | The time column name provided in the templatized query.
sql.startTime | Yes | Marks the beginning time of the source table data to ingest. This is used to identify time buckets. Must be in the format of the source table's time column.
sql.endTime | No | Provide if a one time bootstrap job is desired. Continuous ingestion up to the current time will be disabled if provided.
sql.bucketTimePeriod | No | Overrides the segment time period, for example when the segments created are too small or too large. This parameter determines the time buckets for the Pinot segments relative to the sql.startTime value; all segments are bucketed by this length. E.g. a 5 day bucket period is expressed as "5d" and a 1 hour bucket period as "1h". Valid period units are d (days), h (hours), m (minutes), and s (seconds).
sql.delayTimeLength | No | A time value that determines the latest time for which data is ingested from the source: $END = CURRENT_TIME - DELAY. If omitted, defaults to 1. E.g. a 5 day delay from the current time of execution is expressed as "5d" and a 1 hour delay as "1h". Valid delay units are d (days), h (hours), m (minutes), and s (seconds).
schedule | No | CRON expression for when the job is routinely triggered. Scheduling support is natively built into Minion jobs (see PinotTaskManager). If not provided, the ingestion task can be triggered manually via the Pinot task API.
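
To make the interaction between sql.startTime, sql.bucketTimePeriod, and sql.delayTimeLength more concrete, here is a hedged Python sketch. The functions parse_period and time_buckets are hypothetical; the sketch assumes epoch-millisecond times, single-unit periods such as "5d", and that only complete buckets are emitted, none of which is confirmed by the connector documentation.

```python
import re
from datetime import timedelta

_UNITS = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds"}
_ONE_MS = timedelta(milliseconds=1)


def parse_period(period: str) -> timedelta:
    """Parse a period such as "5d" or "1h" into a timedelta."""
    match = re.fullmatch(r"(\d+)([dhms])", period)
    if match is None:
        raise ValueError(f"invalid period: {period!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})


def time_buckets(start_ms: int, now_ms: int, bucket_period: str, delay_period: str):
    """Split [start, now - delay] into consecutive buckets anchored at start."""
    bucket_ms = parse_period(bucket_period) // _ONE_MS
    if bucket_ms <= 0:
        raise ValueError("bucket period must be positive")
    # $END = CURRENT_TIME - DELAY
    end_ms = now_ms - parse_period(delay_period) // _ONE_MS
    buckets = []
    lower = start_ms
    while lower + bucket_ms <= end_ms:
        buckets.append((lower, lower + bucket_ms))
        lower += bucket_ms
    return buckets


start = 1644321600000
now = start + 216_000_000  # 2.5 days after the start time
print(time_buckets(start, now, "1d", "0d"))  # two complete 1-day buckets
print(time_buckets(start, now, "1d", "1d"))  # the delay leaves one complete bucket
```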

Below is a sample task configuration for Snowflake ingestion:

"task": {
      "taskTypeConfigsMap": {
        "SqlConnectorBatchPushTask": {
          "sql.snowflake.user": "startree",
          "sql.startTime": "1644321600000",
          "sql.timeColumnName": "timeColumn",
          "sql.snowflake.password": "verysecretpassword",
          "sql.snowflake.account": "myAccount",
          "sql.delayTimeLength": "0d",
          "schedule": "0 25 * ? * * *",
          "sql.timeColumnFormat": "millisecondsSinceEpoch",
          "sql.bucketTimePeriod": "1d",
          "sql.snowflake.db": "STARTREE_EXAMPLE",
          "sql.queryTemplate": "SELECT * FROM myTable WHERE timeColumn BETWEEN $START AND $END",
          "sql.className": "ai.startree.connectors.sql.snowflake.SnowflakeConnector",
          "sql.snowflake.schema": "PUBLIC"
        }
      }
    }
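
A configuration like the one above can be checked against the required properties listed in the parameter tables before it is applied. The following Python sketch is illustrative only; missing_properties is a hypothetical helper, and the source argument ("snowflake" or "bigquery") is an assumption of this sketch rather than a connector setting.

```python
COMMON_REQUIRED = (
    "sql.queryTemplate",
    "sql.timeColumnFormat",
    "sql.timeColumnName",
    "sql.startTime",
)
SOURCE_REQUIRED = {
    "snowflake": (
        "sql.snowflake.user",
        "sql.snowflake.password",
        "sql.snowflake.account",
        "sql.snowflake.db",
        "sql.snowflake.schema",
    ),
    "bigquery": ("sql.bigquery.projectid",),
}


def missing_properties(task_config: dict, source: str) -> list:
    """List required properties that are absent or empty in a task config."""
    required = COMMON_REQUIRED + SOURCE_REQUIRED[source]
    return [name for name in required if not task_config.get(name)]


sample = {
    "sql.snowflake.user": "startree",
    "sql.startTime": "1644321600000",
    "sql.timeColumnName": "timeColumn",
    "sql.snowflake.password": "verysecretpassword",
    "sql.snowflake.account": "myAccount",
    "sql.timeColumnFormat": "millisecondsSinceEpoch",
    "sql.snowflake.db": "STARTREE_EXAMPLE",
    "sql.queryTemplate": "SELECT * FROM myTable WHERE timeColumn BETWEEN $START AND $END",
    "sql.snowflake.schema": "PUBLIC",
}
print(missing_properties(sample, "snowflake"))
```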