Chaining Transformation Functions

Chaining Transformation Functions

In this recipe we'll learn how to chain transformation functions during the data ingestion process.

Prerequisites

To follow the code examples in this guide, you must install Docker (opens in a new tab) locally and download recipes.

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/chaining-transformation-functions

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml (opens in a new tab) file on GitHub.

Dataset

We're going to import the following JSON file:

{"payload": {"userId": "3287651__David Smith"}}
{"payload": {"userId": "4987622__Jenny Jones"}}
{"payload": {"userId": "1965900__Stephen Davis"}}

data/import.json

We're going to pull apart the numeric and string parts of the userId and store them in individual columns.

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName":"people",
  "dimensionFieldSpecs":[
    {
      "name":"userIdString",
      "dataType":"STRING"
    },
    {
      "name":"id",
      "dataType":"INT"
    },
    {
      "name":"name",
      "dataType":"STRING"
    }
  ]
}

config/schema.json

The userId column will store the userId value from the JSON document. The name and id columns will store values extracted from the userId.

We'll also have the following table config:

{
  "tableName":"people",
  "tableType":"OFFLINE",
  "segmentsConfig":{
    "replication":1,
    "schemaName":"people"
  },
  "ingestionConfig":{
    "transformConfigs":[
      {
        "columnName":"userId",
        "transformFunction":"jsonPathString(payload, '$.userId')"
      },
      {
        "columnName":"id",
        "transformFunction":"Groovy({Long.valueOf(userId.split('__')[0])}, userId)"
      },
      {
        "columnName":"name",
        "transformFunction":"Groovy({userId.split('__')[1]}, userId)"
      }
    ],
    "batchIngestionConfig":{
      "segmentIngestionType":"APPEND",
      "segmentIngestionFrequency":"DAILY"
    }
  },
  "tenants":{
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableIndexConfig":{
    "loadMode":"MMAP"
  },
  "metadata":{}
}

config/table.json

In this config we define transform configs (ingestionConfig.transformConfigs) that do the following:

  • Extract payload.userId using the jsonPathString (opens in a new tab) function.
  • Split the corresponding string on __ and extracting the id and name using Groovy transformation functions.

You can create the table and schema by running the following command:`

docker run \
   --network json \
   -v $PWD/config:/config \
    apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-json" \
    -exec

You should see a message similar to the following if everything is working correctly:

2022/02/28 10:08:40.333 INFO [AddTableCommand] [main] Executing command: AddTable -tableConfigFile /config/table.json -schemaFile /config/schema.json -controllerProtocol http -controllerHost 172.31.0.3 -controllerPort 9000 -user null -password [hidden] -exec
2022/02/28 10:08:40.747 INFO [AddTableCommand] [main] {"status":"Table people_OFFLINE succesfully added"}

Ingestion Job

Now we’re going to import the JSON file into Pinot. We'll do this with the following ingestion spec:

executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/import.json'
outputDirURI: '/opt/pinot/data/people/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'json'
  className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
tableSpec:
  tableName: 'people'
pinotClusterSpecs:
  - controllerURI: 'http://pinot-controller-json:9000'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000

config/job-spec.yml

The import job will map fields in each JSON document to a corresponding column in the people schema. If one of the fields doesn't exist in the schema it will be skipped.

In this case our JSON documents only have one top level field, payload, which doesn't have a corresponding column in the schema. Instead, transformation functions extract the payload.userId field and then store parts of it in different columns.

You can run the following command to run the import:

docker run \
   --network json \
   -v $PWD/config:/config \
   -v $PWD/data:/data \
   apachepinot/pinot:1.0.0 LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml

Querying

Once that's completed, navigate to localhost:9000/#/query (opens in a new tab) and click on the people table or copy/paste the following query:

select * 
from people 
limit 10

You will see the following output:

idnameuserId
3287651David Smith3287651__David Smith
4987622Jenny Jones4987622__Jenny Jones
1965900Stephen Davis1965900__Stephen Davis

Query Results