File Ingestion Task

The FileIngestionTask converts files into segments. These files can come from Amazon S3, Google Cloud Storage (GCS), Azure Data Lake Storage (ADLS), or a local file system; other file systems are supported as well by implementing the PinotFS interface.

Customize the task as needed for various use cases. For example, specify whether new files can be added and whether updates can be made to existing files.

Prerequisites

The task only works with OFFLINE tables, so it's configured in the OFFLINE table config.

💡

Keep an eye on the data retention value set in the table config when ingesting old datasets. If the value is set too low, segments will be removed by the data retention task soon after they are ingested.
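
For orientation, here is a minimal sketch of an OFFLINE table config showing where the retention settings and the FileIngestionTask block sit. The table name and the 10-year retention are placeholder values chosen for illustration, not required settings.

    {
      "tableName": "myTable",
      "tableType": "OFFLINE",
      "segmentsConfig": {
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "3650"
      },
      "tableIndexConfig": {},
      "task": {
        "taskTypeConfigsMap": {
          "FileIngestionTask": {}
        }
      }
    }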

Pinot Table Configuration

The following sections show a few common use cases and how to customize the FileIngestionTask for them.

Bootstrap

Given a folder with a very large number of files, ingest all of them into segments with indexes and sizes as configured. During proofs of concept, it's common to bootstrap Pinot tables with very large datasets. For those cases we can assume the input files won't change during ingestion.

The task configurations are described below.

Access The Input Folder

Property Name | Required | Description
inputDirURI | Yes | The input folder.
inputFormat | Yes | The input file format.
includeFileNamePattern | No | Filters the target input files, e.g. to exclude marker files like _SUCCESS. It's empty by default to allow all files. The syntax is defined by FileSystem.getPathMatcher.
input.fs.className | No | The className used to ingest the files. It's inferred per URI if not set.
input.fs.prop.<> | No | These props depend on which fs.className is picked.
hasDirectories | No | Enable a sanity check on whether a file URI is a directory. DEPRECATED: the sanity check has been heavily optimized and is now always enabled.
skipFileIfFailToInitReader | No | Skip an input file if the record reader fails to initialize for it. Enabled by default; the names and count of skipped files are tracked in task metadata for debugging.
push.mode | No | Tar (default) or Metadata. Use Metadata push mode if the controller becomes a bottleneck when tasks upload segments.
💡

The OSS Pinot docs explain the push modes clearly. In short, with Tar mode, segments are uploaded from tasks directly to the controller; with Metadata mode, tasks extract metadata from segments, upload the segments to deep store, and upload the segment metadata to the controller separately. If only a few minion tasks run in parallel, Tar mode works fine. If tens or hundreds of tasks run in parallel, use Metadata mode.

Control The Ingestion Parallelism

Property Name | Required | Description
tableMaxNumTasks | No | The max number of parallel tasks a table can run at any time. It's 10 by default. If set to -1 explicitly, as many tasks are generated as needed to ingest all files in one round.
taskMaxDataSize | No | The max number of bytes one task can process, to spread the workload among parallel tasks. It's 256MB by default. If set, taskMaxNumFiles is ignored.
taskMaxNumFiles | No | The max number of files one task can process, to spread the workload among parallel tasks. It's 32 by default. This config is ignored if taskMaxDataSize is set explicitly.
maxNumRecordsPerSegment | No | The number of records to put in an output segment, to control the size of output segments. It's 5M by default.
desiredSegmentSize | No | The desired segment size (default is 500M; K for kilobytes, M for megabytes, G for gigabytes). With this configured, the maxNumRecordsPerSegment and taskMaxDataSize properties do not both need to be set.
schedule | No | CRON expression, per Quartz cron syntax, for when the job will be routinely triggered. If not set, the task is not cron scheduled but can still be triggered via the endpoint /tasks/schedule.
💡

If tableMaxNumTasks is set to -1, as many tasks as needed are generated to ingest all files in one round, with taskMaxDataSize controlling the workload of each subtask. Even in this one-shot mode, you should still check the subtask completion states and, if any subtasks failed, simply trigger another round of task generation to retry them.
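
For example, a one-shot bootstrap round might combine these settings as sketched below (the values are illustrative):

    "task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "tableMaxNumTasks": "-1",
          "taskMaxDataSize": "256M",
          "maxNumRecordsPerSegment": "5000000"
        }
      }
    }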

💡

Running the task every 10 minutes or more is a good starting point. You can then adjust the schedule and task parallelism to finish ingestion faster. We don't recommend running tasks very frequently (for example, every few seconds): the schedules of tasks across all tables are handled by a single cron scheduler in the Pinot Controller, and frequent schedules may delay other tasks.

A sample task configuration for S3

    "task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "...",
          "input.fs.prop.secretKey": "...",
          "input.fs.prop.accessKey": "...",
 
          "inputDirURI": "s3://..../",
          "inputFormat": "parquet",
          "includeFileNamePattern": "glob:**/*.parquet",
 
          "taskMaxNumFiles": "32",
          "tableMaxNumTasks": "10",
          "maxNumRecordsPerSegment": "5000000",
          "schedule": "0 */10 * * * ?"
        }
      }
    },
 
    "tableIndexConfig": {
      "segmentNameGeneratorType": "normalizedDate"
    }
💡

The segmentNameGeneratorType parameter is useful when the time column has date-formatted string values instead of numeric epoch values. By default, the 'simple' generator is used, and it only works with numeric epoch values.
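
As a sketch, a schema fragment with a date-formatted string time column that pairs with the normalizedDate generator might look like the following; the column name and date pattern are illustrative:

    "dateTimeFieldSpecs": [
      {
        "name": "eventDate",
        "dataType": "STRING",
        "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd",
        "granularity": "1:DAYS"
      }
    ]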

A sample task configuration for GCS

     "task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "input.fs.className": "org.apache.pinot.plugin.filesystem.GcsPinotFS",
          "input.fs.prop.projectId": "<project>",
          "input.fs.prop.gcpKey": "/home/pinot/gcp/credentials.json",
          "inputDirURI": "gs://<BucketName>/<Some prefix>/",
          "includeFileNamePattern": "glob:**/*.parquet",
          "schedule": "0 */10 * * * ?",
          "inputFormat": "parquet",
          "tableMaxNumTasks": "400",
          "taskMaxDataSize": "1G",
          "maxNumRecordsPerSegment": "2147483647",
          "push.mode": "metadata" 
        }
      }
    },

The following subsections discuss some advanced features and configs for processing the input data via the FileIngestionTask.

Derive Columns from The Source File Paths

This is useful when the source file directory is partitioned on dimensions present in the file path (e.g. time, with day as the smallest bucket). Those dimensions can be extracted as Pinot table columns.

Property Name | Required | Description
pathColumn.<seq>.name | Yes | The name of the value that is extracted from the file path or transformed from other values.
  1. The name should be unique in the list
  2. If pathColumn.<seq>.output (the next property) is true, the name must match one name defined in the schema, and the value must match the dataType and format defined in the schema
pathColumn.<seq>.output | No | Whether the value should be output to the Pinot table.
pathColumn.<seq>.pathComponentStartIndex | No | The start path component index used to extract a sub part of the file path.
  1. The scheme part is omitted when calculating the number of path components. For example, s3://bucket-name/year=2022/month=01/day=01/city=sunnyvale/file.csv has 6 components
  2. The index is inclusive
  3. If the value is non-negative, it is 0-based
  4. The value can also be negative, where the last path component has index -1 and the first path component has index -(the total number of path components)
  5. If it is not specified, it defaults to 0
pathColumn.<seq>.numPathComponentsToExtract | No | The number of path components to extract, starting from pathColumn.<seq>.pathComponentStartIndex. If it is not specified, path components are extracted from pathColumn.<seq>.pathComponentStartIndex to the last one
pathColumn.<seq>.charStartIndex | No | The start character index used to further extract a sub part from the part determined by pathColumn.<seq>.pathComponentStartIndex & pathColumn.<seq>.numPathComponentsToExtract.
  1. The index is inclusive
  2. If the value is non-negative, it is 0-based
  3. The value can also be negative, where the last character has index -1 and the first character has index -(the total number of characters)
  4. If it is not specified, it defaults to 0
pathColumn.<seq>.numCharsToExtract | No | The number of characters to extract, starting from pathColumn.<seq>.charStartIndex. If it is not specified, characters are extracted from pathColumn.<seq>.charStartIndex to the last one
pathColumn.<seq>.regex | No | A regex to further extract one or more groups from the part determined by pathColumn.<seq>.pathComponentStartIndex, pathColumn.<seq>.numPathComponentsToExtract, pathColumn.<seq>.charStartIndex and pathColumn.<seq>.numCharsToExtract
pathColumn.<seq>.transform | No | A function or an expression used to transform values referred to by their names into a new value.
  1. Currently, only string concatenation is supported. Values are referred to by their names (the first property), in the format ${another value's name}
  2. This field cannot be used together with the 3rd through 5th properties in this table

Note that <seq> is an identifier; properties with the same <seq> form a group and are used together to extract or transform a value.

To help readers understand how it works, here is an example. Say the file path is s3://bucket/year=2022/month=05/day=20/file.csv, and a table has the following schema and configurations:

{
  "schemaName": "...",
  "dateTimeFieldSpec": [
    {
      "name": "date",
      "dataType": "STRING",
      "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy/MM/dd"
    }
  ],
  ...
}
{
  ...
  "task": {
    "taskTypeConfigsMap": {
      "FileIngestionTask": {
        ...
        "pathColumn.0.name": "year",
        "pathColumn.0.output": "false",
        "pathColumn.0.pathComponentStartIndex": "1",
        "pathColumn.0.numPathComponentsToExtract": "1",
        "pathColumn.0.charStartIndex": "5",
        "pathColumn.0.numCharsToExtract": "4",
        
        "pathColumn.1.name": "month",
        "pathColumn.1.output": "false",
        "pathColumn.1.pathComponentStartIndex": "2",
        "pathColumn.1.numPathComponentsToExtract": "1",
        "pathColumn.1.regex": "^month=(.*)$")
        
        "pathColumn.2.name": "day",
        "pathColumn.2.output": "false",
        "pathColumn.2.pathComponentStartIndex": "3",
        "pathColumn.2.numPathComponentsToExtract": "1",
        "pathColumn.2.charStartIndex": "4",
        "pathColumn.2.numCharsToExtract": "2"
        
        "pathColumn.3.name": "date",
        "pathColumn.3.output": "true",
        "pathColumn.3.transform": "${year}/${month}/${day}",
        ...
      }
    }
  },

These 4 groups (indicated by 0 ~ 3) of pathColumn configs work as follows:

  1. The first group extracts 2022 as "year"
  2. The second group extracts 05 as "month"
  3. The third group extracts 20 as "day"
  4. The fourth group combines "year", "month" and "day" into "date", and marks "date" as a column in the Pinot table

Partition Data by Sub-directory Level in The Source File Paths

This feature is useful to group data from different folders and files into the same segment or set of segments.

Property Name | Description
subDirLevelForDataPartition | Treating inputDirURI as the root directory (sub-directory level = 0), subDirLevelForDataPartition defines the level of sub-directory used to group/partition data.
  1. Files within the same sub-directory at the level defined by subDirLevelForDataPartition are grouped into the same segment or set of segments
  2. As a special case, files with sub-directory levels less than subDirLevelForDataPartition are treated as a special group, and are also grouped into the same segment or set of segments

Note that the configuration taskMaxNumFiles is still respected when subDirLevelForDataPartition is configured.

To help readers understand how it works, here is an example. Given the following list of files and the following configuration:

s3://bucket/a/c/file1.csv
s3://bucket/a/c/file2.csv
s3://bucket/a/file1.csv
s3://bucket/a/file2.csv
s3://bucket/b/c/file1.csv
s3://bucket/b/c/file2.csv
s3://bucket/b/c/file3.csv
s3://bucket/b/c/file4.csv
s3://bucket/b/file1.csv
s3://bucket/b/file2.csv
{
  ...
  "task": {
    "taskTypeConfigsMap": {
      "FileIngestionTask": {
        ...
        "inputDirURI": "s3://bucket",
        "subDirLevelForDataPartition": "2",
        "taskMaxNumFiles": "3",
        ...
      }
    }
  },
  1. Files "s3://bucket/a/c/file1.csv" and "s3://bucket/a/c/file2.csv" will be group together
  2. Files "s3://bucket/a/file1.csv", "s3://bucket/a/file2.csv" and "s3://bucket/b/file1.csv" will be group together
  3. Files "s3://bucket/b/c/file1.csv", "s3://bucket/b/c/file2.csv" and "s3://bucket/b/c/file3.csv" will be group together
  4. File "s3://bucket/b/file2.csv" is a group
  5. File "s3://bucket/b/c/file4.csv" is a group
💡

Combined with the Derive Columns from The Source File Paths feature, this can achieve the goal of partitioning data by columns extracted from file paths.
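
As a rough sketch of that combination, assuming paths like s3://bucket/year=2022/month=05/day=20/file.csv and a "day" column defined in the schema (both are illustrative assumptions), the following groups data per day directory and also extracts the day value as a column:

    "task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "inputDirURI": "s3://bucket/",
          "subDirLevelForDataPartition": "3",

          "pathColumn.0.name": "day",
          "pathColumn.0.output": "true",
          "pathColumn.0.pathComponentStartIndex": "3",
          "pathColumn.0.numPathComponentsToExtract": "1",
          "pathColumn.0.regex": "^day=(.*)$"
        }
      }
    }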

Other Implicit Configurations

Data records can be partitioned and sorted based on configs in tableIndexConfig and transformed based on configs in ingestionConfig.

The FileIngestionTask uses the same segment processing library as the MergeRollupTask and RealtimeToOfflineTask, which means it can merge/rollup the data records when generating segments in a similar way.

The configs that tweak the segment processing library can be set in the task configs, e.g. mergeType, <metricName>.aggregationType, windowStartMs, windowEndMs, etc.

You can find some examples in the Pinot documentation for MergeRollupTask and RealtimeToOfflineTask.
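
For instance, a hedged sketch of rollup settings in the FileIngestionTask config might look like the following; "clicks" is a hypothetical metric column that would need to exist in your schema:

    "task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "mergeType": "rollup",
          "clicks.aggregationType": "sum"
        }
      }
    }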

Sync mode

This is for cases where existing files are updated regularly and users want to keep segments and files in sync. Most of the task configs are the same as those used for Bootstrap, with a few exceptions like setting enableSync. The segmentNameGeneratorType should be 'fixed' in sync mode, but it is set automatically if omitted.

Below is a sample task configuration:

"task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "...",
          "input.fs.prop.secretKey": "...",
          "input.fs.prop.accessKey": "...",
 
          "inputDirURI": "s3://.../",
          "inputFormat": "json",
          "includeFileNamePattern": "glob:**/*.json.gz",
 
          "enableSync": "true",
          "tableMaxNumTasks": "10",
          "schedule": "0 */10 * * * ?"
        }
      }
    },
 
    "tableIndexConfig": {
      "segmentNameGeneratorType": "fixed",
    }
💡
  1. One file is mapped to one segment, so files should be well prepared ahead of time (for example, appropriately sized).
  2. File updates are detected by file modification time alone.
  3. File deletion is not checked.
  4. The number of files should be less than 30K, because one file generates one segment and it's better to limit the total number of segments for a table. However, this is a soft limit and can be tuned.

Sync mode with atomic switch

Sync mode with atomic switch ensures that data in the Pinot table is consistent both during and after file ingestion: the table is either left unchanged (nothing ingested) or fully updated (all files ingested and the new segments made available). After the ingestion task completes, the table atomically switches to the new set of segments, and the old segments are purged.

Below is a sample task configuration.

"task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "...",
          "input.fs.prop.secretKey": "...",
          "input.fs.prop.accessKey": "...",
 
          "inputDirURI": "s3://.../",
          "inputFormat": "json",
          "includeFileNamePattern": "glob:**/*.json.gz",
 
          "enableSync": "true",
	  "enableAtomicUpload": "true",
          "schedule": "0 */30 * * * ?"
        }
      }
    },

Turn on sync mode with consistent push

  1. In the FileIngestionTask task config, set both enableAtomicUpload and enableSync to true.
  2. Provision additional storage as needed to handle more frequent updates. If the table storage is close to capacity, we recommend increasing the storage capacity to ensure there is room to hold the extra segments.
  3. Set the cron job schedule property to every 15 or 30 minutes.

Incremental ingestion

This is for cases where new files can be added during ingestion, and you want the tasks to detect and ingest the new files automatically. Sync mode can handle some of those cases, subject to the aforementioned limitations. To be more scalable, you can enable incremental ingestion as described below.

The input folders must be date partitioned.

e.g. <input>/YYYY/MM/DD/<filename>
     <input>/2022/05/11/000000.csv
     <input>/2022/05/11/000001.csv
     <input>/2022/05/11/000002.csv
     <input>/2022/05/12/000000.csv
     <input>/2022/05/12/000001.csv


e.g. <input>/year=YYYY/month=MM/day=DD/<filename>
     <input>/year=2022/month=05/day=11/000000.csv
     <input>/year=2022/month=05/day=11/000001.csv
     <input>/year=2022/month=05/day=11/000002.csv
     <input>/year=2022/month=05/day=12/000000.csv
     <input>/year=2022/month=05/day=12/000001.csv

Input folders should be added chronologically, and files should be added alphabetically. For example, <input>/2022/05/11/ should be added before <input>/2022/05/12/, and <input>/2022/05/11/000000.csv should be added before <input>/2022/05/11/000001.csv.

There are a few requirements on the date values in the file paths:

  1. The year, month and day are numbers. Year has four digits; month and day have two digits, so you'll need to pad with zeroes e.g. 05.
  2. It’s in year, month, day order. It doesn’t matter if you use / or - as separator, or if you have a prefix like year=2022/month=05.

A sample task configuration is shown below. In addition to the task configs explained for Bootstrap above, you need to add checkpointTTLSec to enable incremental ingestion. With this config, checkpoints are kept around for as long as the TTL so that files added within this time window can be detected.

    "task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "...",
          "input.fs.prop.secretKey": "...",
          "input.fs.prop.accessKey": "...",
 
          "inputDirURI": "s3://..../",
          "inputFormat": "csv",
          "includeFileNamePattern": "glob:**/*.csv",
 
          "checkpointTTLSec": "259200",
          "taskMaxNumFiles": "32",
          "tableMaxNumTasks": "10",
          "maxNumRecordsPerSegment": "5000000",
          "schedule": "0 */10 * * * ?"
        }
      }
    },
 
    "tableIndexConfig": {
      "segmentNameGeneratorType": "normalizedDate"
    }
💡

More about checkpointTTLSec:

  1. It's 0 by default, which removes checkpoints once the input files are confirmed as ingested and moves the watermark ahead; this is used by the Bootstrap use case. The watermark is an internal record of the last file that was ingested: files before it are considered ingested, and files are scanned in alphabetical order.
  2. It can be -1, to keep all checkpoints and disable the watermark, so that files can be added to any daily subfolder and still be detected and ingested. This results in many checkpoints in ZooKeeper and should work fine only for small input folders.
  3. It can be a positive value like 259200 (3 days) to allow new files to be added in the most recent few daily subfolders.

Incremental ingestion only detects whether new files are added. This is different from sync mode, where both new files and updates made to existing files are detected.

User driven ingestion

There are cases where the aforementioned automation doesn't work with the existing input folders, for example because the folders are not date partitioned as the automation assumes.

For these cases, APIs are provided for you to fully control the ingestion process.

Start ingestion

POST /tasks/execute with a request body like the one below to configure the ingestion job:
{
  "taskType": "FileIngestionTask",
  "tableName": "myTable",
  "taskName": "myTask-0",
  "taskConfigs": {
    "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
    "input.fs.prop.region": "...",
    "input.fs.prop.secretKey": "...",
    "input.fs.prop.accessKey": "...",

    "inputDirURI": "s3://..../",
    "inputFormat": "parquet",
    "includeFileNamePattern": "glob:**/*.parquet",

    "taskMaxNumFiles": "32",
    "tableMaxNumTasks": "1024",
    "maxNumRecordsPerSegment": "5000000"
  }
}
💡
  1. The taskName can be set explicitly or auto-generated if omitted.
  2. There is no need to set the cron schedule, as this POST call triggers the ingestion.
  3. You can set a large value for tableMaxNumTasks so that enough tasks are generated to ingest all input files in the folder. It's not a problem if more tasks are needed: you can call this endpoint again and again based on the ingestion progress, as shown in the sketch below.
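
For example, assuming the Pinot Controller is reachable at localhost:9000 (a placeholder; adjust for your deployment), the request can be submitted with curl. The body is abbreviated here; include the input.fs.* properties shown above.

    # Trigger a FileIngestionTask run ad hoc via the controller's /tasks/execute endpoint
    curl -X POST "http://localhost:9000/tasks/execute" \
      -H "Content-Type: application/json" \
      -d '{
            "taskType": "FileIngestionTask",
            "tableName": "myTable",
            "taskName": "myTask-0",
            "taskConfigs": {
              "inputDirURI": "s3://..../",
              "inputFormat": "parquet",
              "includeFileNamePattern": "glob:**/*.parquet",
              "tableMaxNumTasks": "1024"
            }
          }'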

Poll progress

GET /tasks/subtask/{taskName}/state with the taskName used to start ingestion 

This returns whether all subtasks are COMPLETED or any of them failed with ERROR.
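
A minimal sketch with curl, again assuming a controller at localhost:9000 and the taskName from the example above:

    # Check the state of the subtasks generated for the named task
    curl -X GET "http://localhost:9000/tasks/subtask/myTask-0/state"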

If there are failed subtasks, call /tasks/execute again with the same task configs but a different taskName. Don't worry about duplicating or missing any files: internal progress metadata ensures files are ingested exactly once, even if the same input folder is ingested again and again.

If all subtasks are completed, call /tasks/execute again with the same task configs but a different taskName, just to make sure no files are left behind. The response will say that no subtasks were generated once all files in the folder have been ingested.

Reset ingestion

Before ingesting a new folder, the progress metadata in ZooKeeper has to be reset. The following request deletes the progress metadata kept in ZK.

DELETE /tasks/{taskType}/{tableNameWithType}/metadata
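
For example, assuming a controller at localhost:9000 and an OFFLINE table named myTable (both placeholders), the call might look like:

    # Delete the FileIngestionTask progress metadata for the table so a new folder can be ingested
    curl -X DELETE "http://localhost:9000/tasks/FileIngestionTask/myTable_OFFLINE/metadata"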

Remove the schedule/cron parameter from the table's task configuration

Scheduled and user-driven task triggers should not coexist. This step removes the scheduled trigger so that the scheduled task no longer runs. In the table config, look for the following and remove the schedule parameter.

"task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "input.fs.prop.secretKey": "",
          "**schedule**": "",

Monitoring

There are metrics, listed in the OSS Pinot docs, that can be collected and plotted via tools like Prometheus and Grafana. It's also very handy to use the Pinot UI to check task runtime status and completion results.