Consistent Push

Consistent Push in File Ingestion Task

Overview

Consistent Push guarantees that the data in the Pinot table is either left unchanged or fully updated, with all files ingested and the new segments activated together. Once the ingestion task completes, the table atomically switches to the new segments, and the old segments are purged to maintain data consistency.

Configuring File Ingestion Task Parameters for Consistent Push

| Parameter | Type | Default | Purpose | Behavior / When to Use |
|---|---|---|---|---|
| consistentPushSwapEnabled | Mandatory (Boolean) | false | Enables a full table refresh to atomically replace the entire table with new files. | Overrides consistentPushEnabled. Deletes data in Pinot if the corresponding source files are deleted. |
| consistentPushEnabled | Mandatory (Boolean) | false | Allows atomic ingestion of a new set of files without full table replacement. | Manages atomic transitions; retains the original behavior for sync or append operations. |
| clearConsistentPushState | Optional (Boolean) | false | Clears the metadata and progress state associated with the consistent push session in ZK. | Use for debugging or resetting corrupted metadata. By default, the last 2 ingestion sessions are retained for troubleshooting. |
| consistentPushMaxRetries | Optional (Integer, max: INT_MAX) | 0 | Configures the maximum number of retries for a task if file ingestion fails. | Useful for large ingestions to avoid reprocessing successful files when retrying after failures. |
| retryCount (read-only) | Integer | N/A | Tracks the number of retries for a task. | Stored in ZK at /pinot/PROPERTYSTORE/MINION_TASK_METADATA/<tableName>/FileIngestionTask/consistentPush/<cpSessionNode>. |
| state (read-only) | Enum (INIT, IN_PROGRESS, SWITCH, DONE) | N/A | Tracks the current status of an ingestion session. | INIT: session starts. IN_PROGRESS: ingestion ongoing. SWITCH: preparing the final ingestion stage. DONE: all data ingested and queryable. Retries are required if the state isn't DONE. |
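
For reference, the two read-only fields above are tracked per ingestion session in the ZK node listed in the table. Below is a minimal sketch of what that session metadata might contain, assuming a JSON payload; the field names come from the table above, but the exact znode layout may differ and the values are illustrative only.

{
  "state": "IN_PROGRESS",
  "retryCount": 1
}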

Below is a sample task configuration.

 
"task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "...",
          "input.fs.prop.secretKey": "...",
          "input.fs.prop.accessKey": "...",
          "inputDirURI": "s3://.../",
          "inputFormat": "json",
          "includeFileNamePattern": "glob:**/*.json.gz",
          "consistentPushEnabled": "true",
          "schedule": "0 */30 * * * ?"
        }
      }
    },
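
The sample above atomically adds a new set of files. If the goal is instead to atomically replace the entire table with the new file set, consistentPushSwapEnabled from the parameter table can be used. A minimal sketch of the FileIngestionTask block, assuming the same file system properties as above (omitted here for brevity):

"FileIngestionTask": {
  "inputDirURI": "s3://.../",
  "inputFormat": "json",
  "includeFileNamePattern": "glob:**/*.json.gz",
  "consistentPushSwapEnabled": "true",
  "schedule": "0 */30 * * * ?"
}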
 

Consistent Push with Retry on Failure

The retry mechanism reuses partial progress from a failed session so that successfully ingested files are not re-read. Consistent Push handles failures while preserving atomicity across retries: the goal is to ingest either all of the files or none of them, so the table never exposes a partially ingested state.

Why Retry is Needed

In typical file ingestion, if any subtask fails, the files assigned to those subtasks remain un-ingested. Consistent Push ensures that no partial or inconsistent data is ever exposed. Retrying ingestion is vital to:

  1. Ensure atomicity (all-or-none ingestion) in consistent push flows.
  2. Prevent the loss of previously ingested data.
  3. Optimize ingestion tasks by reusing the progress of successful sub-tasks in the previous session.

Consistent Push Retry Handling

To avoid restarting ingestion from scratch due to a few failed subtasks, Pinot enables extending a consistent push session across multiple task triggers:

  1. On Failure: Failed subtasks do not cause loss of progress. Subsequent task triggers are aware of the previous progress and generate subtasks only for files that have not yet been read.
  2. Retry Cycle: This process continues until all files are ingested or retry limits are met.
  3. Session Completion: Upon successful ingestion, segments become available for querying, and the next trigger initiates a new session.
  4. Retry Exhaustion: If retries are exhausted without a successful ingestion, Pinot clears the session progress and begins a new ingestion attempt from scratch, which ingests all files again. (The session state can also be reset manually, as sketched below.)
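
If a session's metadata becomes corrupted, or a fresh start is preferred over waiting for retry exhaustion, the optional clearConsistentPushState parameter from the table above can be set for a run to reset the session state. A minimal illustrative sketch (file system properties omitted; values are examples):

"FileIngestionTask": {
  "inputDirURI": "s3://.../",
  "inputFormat": "json",
  "consistentPushEnabled": "true",
  "consistentPushMaxRetries": "3",
  "clearConsistentPushState": "true"
}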

Key Points to Note

  1. By default, consistent push performs no retries (consistentPushMaxRetries = 0). On failure, it throws away the successfully read files, cleans up the earlier session completely, and re-ingests everything on the next run.
  2. Consistent Push can be used with both Append Mode and Sync Mode.
  3. Retry Tracking: Each retry count is stored within consistent push metadata.
  4. Manual/Cron Triggers: Retries are not triggered immediately on failure, which leaves time to resolve the underlying issue, and they must be enabled via consistentPushMaxRetries. By default, a retry runs with the next scheduled (or manual) trigger.
  5. Data Consistency During Session: Files successfully processed in earlier attempts are not reprocessed within the same session, even if they have since been modified. Only unprocessed files are picked up in subsequent retries, ensuring data integrity across retries.
  6. Single Session Limit: Only one ingestion session per table can run when consistent push is enabled, with subsequent triggers paused until the current session completes.
  7. Backward Compatibility: For sync mode compatibility, enableAtomicUpload functions as previously configured (see the sketch after this list).
  8. It’s advisable to closely monitor Consistent Push when handling data volumes over 1 TB. The process has been tested with 10,000 segments, but it is not limited to that number.
  9. Since segments aren’t available for querying until the last segment is fully uploaded, it may appear that more space is being used than expected during ingestion: old and new segments coexist until the table switches over.
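
For illustration, a sync-mode table that predates consistent push can keep using the existing flag unchanged. A minimal sketch, assuming the same task block structure as the earlier sample (how sync mode itself is configured is omitted here):

"FileIngestionTask": {
  "inputDirURI": "s3://.../",
  "inputFormat": "json",
  "enableAtomicUpload": "true",
  "schedule": "0 */30 * * * ?"
}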

This retry mechanism ensures robust and consistent data ingestion in scenarios where atomicity is critical, helping maintain data integrity while maximizing ingestion efficiency.

Example

A File Ingestion Task is scheduled to run every 4 hours, with consistentPushEnabled = true and consistentPushMaxRetries = 3.
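
Expressed as task configuration, the setup might look like the following sketch (file system properties omitted; the schedule is a Quartz cron for every 4 hours):

"FileIngestionTask": {
  "inputDirURI": "s3://.../",
  "inputFormat": "json",
  "consistentPushEnabled": "true",
  "consistentPushMaxRetries": "3",
  "schedule": "0 0 */4 * * ?"
}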

Scene 1 - Initial Ingestion Attempt: The File Ingestion Task (FIT_1) is scheduled for 12 PM.

  • 5 new files are found: F1, F2, F3, F4, F5.
  • F1 and F2 are ingested successfully, but F3, F4, and F5 fail.
  • Because of the failure, Consistent Push keeps F1 and F2 non-queryable, so no partial data is available for querying.

Scene 2 - At 3 PM, after the initial failure, a new file, F6, is added

  • FIT_Retry_1 will run at 4 PM because the cron schedule for FIT_1 is every 4 hours.
  • FIT_Retry_1 retains the original session of FIT_1, retries the remaining files F3, F4, and F5, and also picks up F6.

Scene 3 - At 3 PM, the failed file F3 is rewritten after the initial failure; let’s call the new version F3.1

  • FIT_Retry_1 will run at 4 PM because the cron schedule for FIT_1 is every 4 hours.
  • FIT_Retry_1 retains the original session of FIT_1, recognizes the updated version F3.1 (F3 had not been ingested successfully), and ingests F3.1 along with F4 and F5.

Scene 4 - At 3 PM, the successfully ingested file F1 is rewritten after the initial failure; let’s call the new version F1.1

  • FIT_Retry_1 will run at 4 PM because the cron schedule for FIT_1 is every 4 hours.
  • FIT_Retry_1 retains the original session of FIT_1; however, because F1 was already ingested in this session, it does not pick up the updated version F1.1 and ingests only F3, F4, and F5. This retry succeeds.
  • The next scheduled run will be at 8 PM because the cron schedule for FIT_1 is every 4 hours.
  • The new task, FIT_2, will not retain any context from the earlier session of FIT_1 and will process F1.1 along with any other new files.

Scene 5 - At 3 PM, the successfully ingested file F1 is deleted from the source.

  • FIT_Retry_1 retains the original session of FIT_1, runs at 4 PM, and processes the remaining files from the failed session: F3, F4, and F5. Because F1 has been removed from the source, the segment corresponding to F1 is also removed during this retry, ensuring that the data set reflects the current state of the source files.

Scene 6 - At 3 PM, the failed file F3 is deleted from the source.

  • When FIT_Retry_1 runs at 4 PM, it processes F4 and F5. Since F3 no longer exists at the source, there is nothing for the retry to process for it: F3 is simply omitted and no segment is created for it, keeping the ingested data consistent with the source.