File Ingestion
The FileIngestionTask converts files into segments.
These files can come from Amazon S3, Google Cloud Storage (GCS), Azure Data Lake Storage (ADLS), or a local file system; other file systems can be supported as well by implementing the PinotFS interface.
Customize the task as needed for various use cases. For example, specify whether new files can be added and whether updates can be made to existing files.
Prerequisites
The task only works with OFFLINE tables, so it's configured in the OFFLINE table config.
Keep an eye on the data retention value set in the table config when ingesting old datasets. If the value is set too low, segments will be removed by the data retention task soon after they are ingested.
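As an illustration, retention is controlled in the segmentsConfig section of the OFFLINE table config; the value below is only an example, so pick one that covers your oldest data:
"segmentsConfig": {
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "3650"
}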
Pinot Table Configuration
The following sections show a few common use cases and how to customize the FileIngestionTask for them.
Bootstrap
Given a folder of many many files, ingest all of them into segments with indices and sizes as configured. During Proof of Concepts, it's common to bootstrap Pinot tables with very large datasets. For those cases we can assume the input files won't change during ingestion.
The task configurations are described below.
Access The Input Folder
Property Name | Required | Description |
---|---|---|
inputDirURI | Yes | The input folder. |
inputFormat | Yes | The input file format. |
includeFileNamePattern | No | Selects the target input files, e.g. to exclude marker files like _SUCCESS. It's empty by default to allow all files. The syntax is defined by FileSystem.getPathMatcher. |
input.fs.className | No | The class name used to read the input files. If not set, it's inferred from the URI. |
input.fs.prop.<> | No | File system properties; which ones apply depends on the fs.className that is used. |
 | No | Enable a sanity check on whether a file URI is a directory. DEPRECATED: the sanity check has been optimized and is now always enabled. |
skipFileIfFailToInitReader | No | Skip an input file if the record reader fails to initialize for it. Enabled by default; the names and count of skipped files are tracked in task metadata for debugging. |
push.mode | No | Tar (default) or Metadata. Use Metadata push mode if the controller becomes a bottleneck when tasks upload segments. |
The OSS Pinot docs explain the push modes in detail. In short, with Tar mode the segments are uploaded from the tasks directly to the controller; with Metadata mode the tasks extract metadata from the segments, upload the segments to deep store, and upload the segment metadata to the controller separately. If only a few minion tasks run in parallel, Tar mode works fine. If many tens or hundreds of tasks run in parallel, use Metadata mode instead.
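For example, a heavily parallel ingestion could switch to Metadata push by adding a fragment like this to the FileIngestionTask configs (the parallelism value is only illustrative):
"push.mode": "metadata",
"tableMaxNumTasks": "200"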
Control The Ingestion Parallelism
Property Name | Required | Description |
---|---|---|
tableMaxNumTasks | No | The max number of parallel tasks a table can run at any time. It's 10 by default. If set to -1 explicitly, as many tasks as needed are generated to ingest all files in one round. |
taskMaxDataSize | No | The max number of bytes one task can process, to spread workload among parallel tasks. It's 1G by default. If set, taskMaxNumFiles is ignored. |
taskMaxNumFiles | No | The max number of files one task can process, to spread workload among parallel tasks. It's 32 by default. This config is ignored if taskMaxDataSize is set explicitly. |
maxNumRecordsPerSegment | No | The max number of records to put in an output segment, to control the size of output segments. It's 5M by default. |
desiredSegmentSize | No | The desired output segment size (500M by default; K for kilobytes, M for megabytes, G for gigabytes). When this is set, maxNumRecordsPerSegment and taskMaxDataSize do not need to be configured. |
schedule | No | CRON expression, per Quartz cron syntax, for when the job will be routinely triggered. If not set, the task is not cron scheduled but can still be triggered via the endpoint /tasks/schedule. |
If tableMaxNumTasks is set to -1, as many tasks as needed are generated to ingest all files in one round, with taskMaxDataSize controlling the workload of each subtask. Even in this one-shot manner, you should still check the subtask completion states and, if any subtasks failed, simply trigger another round of task generation to retry them.
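For example, a one-shot bootstrap could use a fragment like the following in the FileIngestionTask configs (the taskMaxDataSize value is only illustrative):
"tableMaxNumTasks": "-1",
"taskMaxDataSize": "2G"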
Running the task every 10 minutes or more is a good starting point. You can then adjust the schedule and task parallelism to finish ingestion faster. We don't recommend running tasks very frequently (for example, every few seconds), because the schedules of tasks across all tables are handled by a single cron scheduler in the Pinot Controller, and frequent schedules may delay other tasks.
A sample task configuration for S3
"task": {
"taskTypeConfigsMap": {
"FileIngestionTask": {
"input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
"input.fs.prop.region": "...",
"input.fs.prop.secretKey": "...",
"input.fs.prop.accessKey": "...",
"inputDirURI": "s3://..../",
"inputFormat": "parquet",
"includeFileNamePattern": "glob:**/*.parquet",
"taskMaxNumFiles": "32",
"tableMaxNumTasks": "10",
"maxNumRecordsPerSegment": "5000000",
"schedule": "0 */10 * * * ?"
}
}
},
"tableIndexConfig": {
"segmentNameGeneratorType": "normalizedDate"
}
The segmentNameGeneratorType parameter is useful when the time column has date-formatted string values instead of numeric epoch values. By default the 'simple' generator is used, and it only works with numeric epoch values.
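For instance, the normalizedDate generator fits a schema whose time column holds date-formatted strings, as in this sketch (the column name and pattern are illustrative):
"dateTimeFieldSpecs": [
{
"name": "eventDate",
"dataType": "STRING",
"format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd",
"granularity": "1:DAYS"
}
]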
A sample task configuration for GCS
"task": {
"taskTypeConfigsMap": {
"FileIngestionTask": {
"input.fs.className": "org.apache.pinot.plugin.filesystem.GcsPinotFS",
"input.fs.prop.projectId": "<project>",
"input.fs.prop.gcpKey": "/home/pinot/gcp/credentials.json",
"inputDirURI": "gs://<BucketName>/<Some prefix>/",
"includeFileNamePattern": "glob:**/*.parquet",
"schedule": "0 */10 * * * ?",
"inputFormat": "parquet",
"tableMaxNumTasks": "400",
"taskMaxDataSize": "1G",
"maxNumRecordsPerSegment": "2147483647",
"push.mode": "metadata"
}
}
},
In the following subsections, we discuss some advanced features and configs for processing the input data via the FileIngestionTask.
Derive Columns from The Source File Paths
This is useful when the source file directory is partitioned on some dimensions present in the file path (e.g. time, with day as the smallest bucket). Those dimensions can be extracted as Pinot table columns.
Property Name | Required | Description |
---|---|---|
pathColumn.<seq>.name | Yes | The name of the value that is extracted from the file path or transformed from other values. |
pathColumn.<seq>.output | No | Whether the value should be output to the Pinot table. |
pathColumn.<seq>.pathComponentStartIndex | No | The start path component index used to extract a sub part of the file path. |
pathColumn.<seq>.numPathComponentsToExtract | No | The number of path components to extract starting from pathColumn.<seq>.pathComponentStartIndex. If it is not specified, path components are extracted from pathColumn.<seq>.pathComponentStartIndex to the last one. |
pathColumn.<seq>.charStartIndex | No | The start character index used to further extract a sub part from the part determined by pathColumn.<seq>.pathComponentStartIndex and pathColumn.<seq>.numPathComponentsToExtract. |
pathColumn.<seq>.numCharsToExtract | No | The number of chars to extract starting from pathColumn.<seq>.charStartIndex. If it is not specified, chars are extracted from pathColumn.<seq>.charStartIndex to the last one. |
pathColumn.<seq>.regex | No | A regex to further extract one or more groups from the part determined by pathColumn.<seq>.pathComponentStartIndex, pathColumn.<seq>.numPathComponentsToExtract, pathColumn.<seq>.charStartIndex and pathColumn.<seq>.numCharsToExtract. |
pathColumn.<seq>.transform | No | A function or an expression used to transform the values referred to by the names into a new value. |
Note that <seq> is an identifier; properties with the same <seq> form a group and are used together to extract or transform a value.
To help readers understand how it works, here is an example. Say the file path is s3://bucket/year=2022/month=05/day=20/file.csv, and the table has the following schema and configurations:
{
"schemaName": "...",
"dateTimeFieldSpec": [
{
"name": "date",
"dataType": "STRING",
"format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy/MM/dd"
}
],
...
}
{
...
"task": {
"taskTypeConfigsMap": {
"FileIngestionTask": {
...
"pathColumn.0.name": "year",
"pathColumn.0.output": "false",
"pathColumn.0.pathComponentStartIndex": "1",
"pathColumn.0.numPathComponentsToExtract": "1",
"pathColumn.0.charStartIndex": "5",
"pathColumn.0.numCharsToExtract": "4",
"pathColumn.1.name": "month",
"pathColumn.1.output": "false",
"pathColumn.1.pathComponentStartIndex": "2",
"pathColumn.1.numPathComponentsToExtract": "1",
"pathColumn.1.regex": "^month=(.*)$")
"pathColumn.2.name": "day",
"pathColumn.2.output": "false",
"pathColumn.2.pathComponentStartIndex": "3",
"pathColumn.2.numPathComponentsToExtract": "1",
"pathColumn.2.charStartIndex": "4",
"pathColumn.2.numCharsToExtract": "2"
"pathColumn.3.name": "date",
"pathColumn.3.output": "true",
"pathColumn.3.transform": "${year}/${month}/${day}",
...
}
}
},
Those 4 groups of pathColumn properties (indicated by <seq> 0 ~ 3) work as follows:
- The first group extracts 2022 as "year"
- The second group extracts 05 as "month"
- The third group extracts 20 as "day"
- The fourth group combines "year", "month" and "day" into "2022/05/20" as "date", and marks "date" as a column in the Pinot table
Partition Data by Sub-directory Level in The Source File Paths
This feature is useful to group data from different folders and files into the same segment or set of segments.
Property Name | Description |
---|---|
subDirLevelForDataPartition | Treating inputDirURI as the root directory (sub-directory level = 0), subDirLevelForDataPartition defines the level of sub-directory used to group/partition data. |
Note that the configuration taskMaxNumFiles is still respected when subDirLevelForDataPartition is configured.
To help readers understand how it works, here is an example. Given the following list of files and the following configurations:
s3://bucket/a/c/file1.csv
s3://bucket/a/c/file2.csv
s3://bucket/a/file1.csv
s3://bucket/a/file2.csv
s3://bucket/b/c/file1.csv
s3://bucket/b/c/file2.csv
s3://bucket/b/c/file3.csv
s3://bucket/b/c/file4.csv
s3://bucket/b/file1.csv
s3://bucket/b/file2.csv
{
...
"task": {
"taskTypeConfigsMap": {
"FileIngestionTask": {
...
"inputDirURI": "s3://bucket",
"subDirLevelForDataPartition": "2",
"taskMaxNumFiles": "3",
...
}
}
},
- Files "s3://bucket/a/c/file1.csv" and "s3://bucket/a/c/file2.csv" will be group together
- Files "s3://bucket/a/file1.csv", "s3://bucket/a/file2.csv" and "s3://bucket/b/file1.csv" will be group together
- Files "s3://bucket/b/c/file1.csv", "s3://bucket/b/c/file2.csv" and "s3://bucket/b/c/file3.csv" will be group together
- File "s3://bucket/b/file2.csv" is a group
- File "s3://bucket/b/c/file4.csv" is a group
Combined with the feature Derive Columns from The Source File Paths, one can partition data by columns extracted from file paths, as sketched below.
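For instance, with a hypothetical layout like s3://bucket/<region>/<date>/file.csv, data could be partitioned per region and date folder while also emitting the region as a table column, with a fragment along these lines (the names and values are illustrative):
"inputDirURI": "s3://bucket/",
"subDirLevelForDataPartition": "2",
"pathColumn.0.name": "region",
"pathColumn.0.output": "true",
"pathColumn.0.pathComponentStartIndex": "1",
"pathColumn.0.numPathComponentsToExtract": "1"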
Other Implicit Configurations
Data records can be partitioned and sorted based on configs in tableIndexConfig and transformed based on configs in ingestionConfig.
The FileIngestionTask uses the same segment processing library used by MergeRollupTask and RealtimeToOfflineTask, which means it can merge/rollup the data records when generating the segments in a similar way.
Those configs to tweak the segment processing library can be set in the task configs, e.g. mergeType, <metricName>.aggregationType, windowStartMs, windowEndMs, etc.
You can find some examples in the Pinot documentation for MergeRollupTask and RealtimeToOfflineTask.
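As a sketch, rolling up records and summing a hypothetical metric named clicks could be configured in the FileIngestionTask configs like this:
"mergeType": "rollup",
"clicks.aggregationType": "sum"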
Sync mode
This is for cases where the existing files are updated regularly and users want to keep the segments and files in sync.
Most of the task configs are the same as those used for Bootstrap, with a few exceptions like setting enableSync.
The segmentNameGeneratorType should be fixed in sync mode, but it is set automatically if omitted.
Below is a sample task configuration:
"task": {
"taskTypeConfigsMap": {
"FileIngestionTask": {
"input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
"input.fs.prop.region": "...",
"input.fs.prop.secretKey": "...",
"input.fs.prop.accessKey": "...",
"inputDirURI": "s3://.../",
"inputFormat": "json",
"includeFileNamePattern": "glob:**/*.json.gz",
"enableSync": "true",
"tableMaxNumTasks": "10",
"schedule": "0 */10 * * * ?"
}
}
},
"tableIndexConfig": {
"segmentNameGeneratorType": "fixed",
}
- One file is mapped to one segment, so files should be well prepared ahead of time, e.g. sized appropriately.
- File update is detected by file modification time alone.
- File deletion is not checked.
- The number of files should be less than 30K, because one file generates one segment and it's better to limit the total number of segments for a table. However, this is a soft limit and can be tuned.
Sync mode with atomic switch
A sync mode with atomic switch ensures that data in the Pinot table is consistent both during and after file ingestion. The table is either unchanged (nothing ingested) or fully updated (all files ingested and new segments made available). After the ingestion task is complete, the table atomically switches to the new set of segments, and the old segments are purged.
Incremental ingestion
This is for cases where new files can be added during ingestion, and you want the tasks to detect and ingest the new files automatically. The sync mode can handle some of those cases, subject to the aforementioned limitations. To be more scalable, you can enable incremental ingestion as described below.
The input folders must be date partitioned.
e.g. <input>/YYYY/MM/DD/<filename>
<input>/2022/05/11/000000.csv
<input>/2022/05/11/000001.csv
<input>/2022/05/11/000002.csv
<input>/2022/05/12/000000.csv
<input>/2022/05/12/000001.csv
e.g. <input>/year=YYYY/month=MM/day=DD/<filename>
<input>/year=2022/month=05/day=11/000000.csv
<input>/year=2022/month=05/day=11/000001.csv
<input>/year=2022/month=05/day=11/000002.csv
<input>/year=2022/month=05/day=12/000000.csv
<input>/year=2022/month=05/day=12/000001.csv
Input folders should be added chronologically, and files should be added alphabetically. For example, <input>/2022/05/11/ should be added before <input>/2022/05/12/, and <input>/2022/05/11/000000.csv should be added before <input>/2022/05/11/000001.csv.
There are a few requirements on the date values in the file paths:
- The year, month and day are numbers. Year has four digits; month and day have two digits, so you'll need to pad with zeroes, e.g. 05.
- It's in year, month, day order. It doesn't matter if you use / or - as the separator, or if you have a prefix like year=2022/month=05.
A sample task configuration is shown below.
In addition to the task configs explained for Bootstrap above, you need to add checkpointTTLSec to enable incremental ingestion. With this config, the checkpoints are kept around for as long as the TTL so that files added within this time window can be detected.
"task": {
"taskTypeConfigsMap": {
"FileIngestionTask": {
"input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
"input.fs.prop.region": "...",
"input.fs.prop.secretKey": "...",
"input.fs.prop.accessKey": "...",
"inputDirURI": "s3://..../",
"inputFormat": "csv",
"includeFileNamePattern": "glob:**/*.csv",
"checkpointTTLSec": "259200",
"taskMaxNumFiles": "32",
"tableMaxNumTasks": "10",
"maxNumRecordsPerSegment": "5000000",
"schedule": "0 */10 * * * ?"
}
}
},
"tableIndexConfig": {
"segmentNameGeneratorType": "normalizedDate"
}
More about checkpointTTLSec:
- It's 0 by default: checkpoints are removed once the input files are confirmed as ingested and the watermark is bumped ahead, as in the Bootstrap use case. The watermark is an internal record of the last ingested file; files before it are considered ingested, and files are scanned in alphabetical order.
- It can be -1, to keep all checkpoints and disable the watermark, so that files can be added to any daily subfolder and still be detected and ingested. This results in many checkpoints in ZooKeeper and works fine only for small input folders.
- It can be a positive value like 259200 (3 days) to allow new files to be added to the most recent few daily subfolders.
Incremental ingestion only detects whether new files are added. This is different from sync mode, where both new files and updates to existing files are detected.
User driven ingestion
There are cases where the aforementioned automation doesn't work with the existing input folders, for example because the folder is not date partitioned as the automation assumes.
For these cases, APIs are provided for you to fully control the ingestion process.
Start ingestion
POST /tasks/execute with a request body like the one below to configure the ingestion job:
{
"taskType": "FileIngestionTask",
"tableName": "myTable",
"taskName": "myTask-0",
"taskConfigs": {
"input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
"input.fs.prop.region": "...",
"input.fs.prop.secretKey": "...",
"input.fs.prop.accessKey": "...",
"inputDirURI": "s3://..../",
"inputFormat": "parquet",
"includeFileNamePattern": "glob:**/*.parquet",
"taskMaxNumFiles": "32",
"tableMaxNumTasks": "1024",
"maxNumRecordsPerSegment": "5000000"
}
}
- The taskName can be set explicitly, or auto-generated if omitted.
- There is no need to set the cron schedule, as this POST call triggers the ingestion.
- You can set a large value for tableMaxNumTasks so that enough tasks are generated to ingest all input files in the folder. It's not a problem if more tasks are needed, as you can call this endpoint repeatedly based on the ingestion progress.
Poll progress
GET /tasks/subtask/{taskName}/state with the taskName used to start ingestion.
This returns whether all subtasks are COMPLETED or whether any have failed with ERROR.
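The response maps each subtask to its state; the shape below is only illustrative, with hypothetical subtask names:
{
"Task_FileIngestionTask_myTask-0_0": "COMPLETED",
"Task_FileIngestionTask_myTask-0_1": "ERROR"
}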
If there are failed subtasks, call /tasks/execute again with the same task configs but a different taskName. Don't worry about duplicating or missing any files; internal progress metadata makes sure files are ingested exactly once even if the same input folder is ingested again and again.
If all subtasks are completed, also call /tasks/execute again with the same task configs but a different taskName, just to make sure no files are left behind. The response will say that no subtasks were generated once all files in the folder have been ingested.
Reset ingestion
Before ingesting a new folder, the progress metadata in ZooKeeper has to be reset. The following request deletes the progress metadata kept in ZK.
DELETE /tasks/{taskType}/{tableNameWithType}/metadata
Remove the schedule/cron parameter from the table's task configuration
Scheduled and user-driven task triggers should not co-exist. This step is to remove the scheduled trigger, so that the scheduled task no longer runs. In the table config, look for the following and remove the schedule parameter.
"task": {
"taskTypeConfigsMap": {
"FileIngestionTask": {
"input.fs.prop.secretKey": "",
"**schedule**": "",
Monitoring
There are metrics, as listed in the OSS Pinot docs, that can be collected and plotted via tools like Prometheus and Grafana. It's also very handy to use the Pinot UI to check task runtime status and completion results.