Ingest CSV files from an S3 bucket

In this recipe, we will learn how Pinot can be configured to ingest CSV files from an AWS S3 bucket.

Prerequisites

You will need a running Pinot cluster locally to follow the code examples in this guide.

Make sure the AWS CLI is installed and your AWS credentials are configured on the machine where you are going to deploy Apache Pinot.
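
If you are unsure whether your credentials are set up correctly, you can verify them with the AWS CLI before continuing:

aws sts get-caller-identity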

Create an S3 bucket

Let's create an S3 bucket called pinot-demo to keep the source CSV files.

You can use the AWS CLI to do that.

aws s3 mb s3://pinot-demo

Copy CSV files

Create a CSV file called transcript.csv with the following content.

studentID,firstName,lastName,gender,subject,score,timestampInEpoch
200,Lucy,Smith,Female,Maths,3.8,1570863600000
200,Lucy,Smith,Female,English,3.5,1571036400000
201,Bob,King,Male,Maths,3.2,1571900400000
202,Nick,Young,Male,Physics,3.6,1572418800000

Now, copy that file into the S3 bucket using the CLI.

aws s3 cp transcript.csv s3://pinot-demo/rawdata/transcript.csv
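
You can confirm that the file landed in the expected location before moving on:

aws s3 ls s3://pinot-demo/rawdata/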

Configure Pinot

Now that we have the CSV file in the S3 bucket, let's configure Pinot to ingest it and create a segment out of it. First, let's create a schema and a table definition for the transcript data set.

Create the transcript_schema.json as follows.

{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {
      "name": "studentID",
      "dataType": "INT"
    },
    {
      "name": "firstName",
      "dataType": "STRING"
    },
    {
      "name": "lastName",
      "dataType": "STRING"
    },
    {
      "name": "gender",
      "dataType": "STRING"
    },
    {
      "name": "subject",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "score",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "timestampInEpoch",
    "dataType": "LONG",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}

Create the transcript_table.json as follows.

{
  "tableName": "transcript",
  "segmentsConfig" : {
    "timeColumnName": "timestampInEpoch",
    "timeType": "MILLISECONDS",
    "replication" : "1",
    "schemaName" : "transcript"
  },
  "tableIndexConfig" : {
    "invertedIndexColumns" : [],
    "loadMode"  : "MMAP"
  },
  "tenants" : {
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableType":"OFFLINE",
  "metadata": {}
}

Add the table and schema to Pinot using the following command.

bin/pinot-admin.sh AddTable \
  -tableConfigFile transcript_table.json \
  -schemaFile transcript_schema.json -exec
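
To verify that the table and schema were created, you can query the controller's REST API (this assumes the controller is running on the default port 9000):

curl http://localhost:9000/schemas/transcript
curl http://localhost:9000/tables/transcript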

Create the ingestion job spec file

Create a file called job-spec.yml and add the following content to it.

executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: 's3://pinot-demo/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: 's3://pinot-demo/segments'
overwriteOutput: true
pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: us-east-1
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

Let's break down this specification further for better understanding.

The following configuration block instructs Pinot to use S3 as the underlying file system implementation, with the className pointing to the implementation class.

pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: us-east-1
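
By default, S3PinotFS picks up credentials from the standard AWS credential provider chain (environment variables, ~/.aws/credentials, or an instance profile). If you prefer to pass keys explicitly, they can go in the same configs block; the key names below follow the S3PinotFS documentation, and the values are placeholders, so double-check them against your Pinot version:

pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: us-east-1
      accessKey: <your-access-key>
      secretKey: <your-secret-key>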

Below, inputDirURI specifies the S3 location that Pinot should ingest the data from; this is the folder we copied transcript.csv into earlier. The includeFileNamePattern directive selects all CSV files in that folder.

Once the ingestion is completed, Pinot writes the generated segments to the location specified by outputDirURI.

inputDirURI: 's3://pinot-demo/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: 's3://pinot-demo/segments'

Since we are reading CSV files, the following configuration block uses the CSV record reader.

recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
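
By default, the CSV reader expects a header row and a comma delimiter, which matches our transcript.csv file. If your files use a different delimiter or contain multi-value fields, reader options can be passed through a configs block; the keys shown here (delimiter, multiValueDelimiter) are common CSVRecordReaderConfig options, but verify them against your Pinot version:

recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
  configs:
    delimiter: ','
    multiValueDelimiter: ';'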

Initiate the ingestion job

Now we have everything in place. Let's go ahead and kick off the ingestion by running:

bin/pinot-admin.sh LaunchDataIngestionJob \
    -jobSpecFile job-spec.yml
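
Because the job reads directly from S3, the machine running this command needs valid AWS credentials. If you have not embedded keys in pinotFSSpecs, the standard AWS environment variables are picked up automatically (placeholder values shown here):

export AWS_ACCESS_KEY_ID=<your-access-key>
export AWS_SECRET_ACCESS_KEY=<your-secret-key>
export AWS_REGION=us-east-1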

Once the job completes, you should see the transcript table populated with data in the Query Console.
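
As a quick check, run a simple query against the new table from the Query Console:

SELECT * FROM transcript LIMIT 10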