Segment Import Task


The SegmentImportTask is an extended version of Apache Pinot's RealtimeToOfflineSegmentsTask. Both are built on the Pinot Minion framework to convert segments from real-time tables to offline tables.

The SegmentImportTask does this conversion with multiple tasks in parallel, which means it's less likely to lag behind. This task is a drop-in replacement for the RealtimeToOfflineSegmentsTask.

Prerequisites

As with the RealtimeToOfflineSegmentsTask, both the real-time and offline tables must already be created. However, where the RealtimeToOfflineSegmentsTask is configured on the REALTIME table, the SegmentImportTask is configured on the OFFLINE table, following the convention that minion tasks that ingest data are configured on the destination table.

Pinot Table Configuration

The task configs used by RealtimeToOfflineSegmentsTask are supported by SegmentImportTask as well. There are also some new parameters that control task parallelism, as listed below.

| Property Name | Required | Description |
| --- | --- | --- |
| tableMaxNumTasks | No | The maximum number of parallel tasks the table can run at any time. Default: 10. |
| maxNumRecordsPerTask | No | The maximum number of records one task can process, used to spread the workload among parallel tasks. Default: 50M. |
| initialWatermarkMs | No | The epoch time in milliseconds from which the task starts processing. The default (-1) starts from the smallest start time among the real-time segments. |
| desiredSegmentSize | No | The desired segment size. Default: 500M (K for kilobyte, M for megabyte, G for gigabyte). |
| schedule | No | CRON, per Quartz cron syntax, for when the job is routinely triggered. If not set, the task is not cron-scheduled but can still be triggered via the /tasks/schedule endpoint. |

Below is a sample task configuration:

    "task": {
      "taskTypeConfigsMap": {
        "SegmentImportTask": {
          "bucketTimePeriod": "1d",
          "bufferTimePeriod": "1d",
          "schedule": "0 */10 * * * ?",
          "maxNumRecordsPerTask": "50000000",
          "maxNumRecordsPerSegment": "5000000",
          "tableMaxNumTasks": "4",
          "initialWatermarkMs": 1654680177745
        }
      }
    },
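
Besides the cron schedule, the task can also be triggered on demand through the /tasks/schedule endpoint mentioned in the table above. Below is a minimal sketch, assuming a controller at localhost:9000 and an offline table named myTable_OFFLINE (both placeholders); the exact shape of the response may vary by version.

    # Hypothetical example: trigger SegmentImportTask on demand instead of
    # waiting for the cron schedule. Controller URL and table name are assumed.
    import requests

    CONTROLLER_URL = "http://localhost:9000"
    TABLE_NAME = "myTable_OFFLINE"

    resp = requests.post(
        f"{CONTROLLER_URL}/tasks/schedule",
        params={"taskType": "SegmentImportTask", "tableName": TABLE_NAME},
    )
    resp.raise_for_status()
    print(resp.json())  # typically a map of task type to the generated task name
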
💡

Usually, running the task every 10 minutes or longer is a good starting point. You can always adjust the schedule and task parallelism to make sure the task keeps up with the load from the real-time table.

Don't run the task only a few times a day when bucketTimePeriod is set to 1d, as the task might keep lagging until data retention kicks in and deletes the oldest segments in the real-time table, causing data loss.
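
As a rough, illustrative back-of-the-envelope check (the ingestion rate below is a made-up number, not something the task reports), you can compare the capacity of one scheduled run against the real-time ingestion rate:

    # Hypothetical sizing check using the sample configs above.
    table_max_num_tasks = 4             # tableMaxNumTasks
    max_records_per_task = 50_000_000   # maxNumRecordsPerTask
    schedule_interval_min = 10          # cron fires every 10 minutes

    ingest_rate_per_min = 1_000_000     # assumed real-time ingestion rate (records/minute)

    capacity_per_run = table_max_num_tasks * max_records_per_task
    ingested_per_interval = ingest_rate_per_min * schedule_interval_min

    # If ingestion per interval exceeds per-run capacity, increase
    # tableMaxNumTasks or run the task more often.
    print("keeping up" if capacity_per_run >= ingested_per_interval else "lagging")
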

Migrating from RealtimeToOfflineSegmentsTask

The key step is to find the current watermark of the RealtimeToOfflineSegmentsTask and set initialWatermarkMs to that value before starting the SegmentImportTask.

  1. Remove the RealtimeToOfflineSegmentsTask task config from the REALTIME table and wait for existing tasks to finish.
  2. Use the tasks API /tasks/RealtimeToOfflineSegmentsTask/{tableNameWithType}/metadata to get the current watermark (see the sketch after this list).
  3. Add the SegmentImportTask task configs to the OFFLINE table, and set initialWatermarkMs to the watermark obtained in the previous step.
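
For step 2, a minimal sketch of fetching the watermark from the controller, assuming a controller at localhost:9000 and a real-time table named myTable_REALTIME (both placeholders; the exact field name in the response may differ across versions):

    # Hypothetical example: read the RealtimeToOfflineSegmentsTask metadata so
    # the watermark can be copied into initialWatermarkMs. Controller URL and
    # table name are assumed.
    import requests

    CONTROLLER_URL = "http://localhost:9000"
    TABLE_NAME_WITH_TYPE = "myTable_REALTIME"

    resp = requests.get(
        f"{CONTROLLER_URL}/tasks/RealtimeToOfflineSegmentsTask/"
        f"{TABLE_NAME_WITH_TYPE}/metadata"
    )
    resp.raise_for_status()
    print(resp.json())  # look for the watermark (e.g. a watermarkMs field)
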

During this process, you can check the Grafana dashboards to confirm that the RealtimeToOfflineSegmentsTask has stopped and the SegmentImportTask has started.