Chaining Transformation Functions
In this recipe we'll learn how to chain transformation functions during the data ingestion process.
| Pinot Version | 0.9.3 |
|---|---|
| Code | startreedata/pinot-recipes/chaining-transformation-functions |
Pre-requisites
You will need to install Docker locally to follow the code examples in this guide.
Download Recipe
First, clone the GitHub repository to your local machine and navigate to this recipe:
git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/chaining-transformation-functions
If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml file on GitHub.
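Before moving on, you can check that the cluster is ready by hitting the controller's health endpoint. This is just a quick sanity check and assumes the controller's default /health endpoint is mapped to localhost:9000, as in the docker-compose file; since docker-compose up runs in the foreground, run it from a second terminal:
# The controller should respond with OK once it's up
curl localhost:9000/health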
Dataset
We're going to import the following JSON file:
{"payload": {"userId": "3287651__David Smith"}}
{"payload": {"userId": "4987622__Jenny Jones"}}
{"payload": {"userId": "1965900__Stephen Davis"}}
We're going to pull apart the numeric and string parts of the userId and store them in individual columns.
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName":"people",
"dimensionFieldSpecs":[
{
"name":"userIdString",
"dataType":"STRING"
},
{
"name":"id",
"dataType":"INT"
},
{
"name":"name",
"dataType":"STRING"
}
]
}
The userId column will store the userId value from the JSON document. The name and id columns will store values extracted from the userId.
We'll also have the following table config:
{
"tableName":"people",
"tableType":"OFFLINE",
"segmentsConfig":{
"replication":1,
"schemaName":"people"
},
"ingestionConfig":{
"transformConfigs":[
{
"columnName":"userId",
"transformFunction":"jsonPathString(payload, '$.userId')"
},
{
"columnName":"id",
"transformFunction":"Groovy({Long.valueOf(userId.split('__')[0])}, userId)"
},
{
"columnName":"name",
"transformFunction":"Groovy({userId.split('__')[1]}, userId)"
}
],
"batchIngestionConfig":{
"segmentIngestionType":"APPEND",
"segmentIngestionFrequency":"DAILY"
}
},
"tenants":{
"broker":"DefaultTenant",
"server":"DefaultTenant"
},
"tableIndexConfig":{
"loadMode":"MMAP"
},
"metadata":{}
}
In this config we define transform configs (ingestionConfig.transformConfigs) that do the following:
- Extract payload.userId into the userId column using the jsonPathString function.
- Split that string on __ and extract the id and name columns using Groovy transformation functions.
The Groovy functions are chained on the output of the first transform: they take the userId column produced by jsonPathString as their input, which is what lets us derive one column from another during ingestion.
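If you want to preview what the chained transforms will produce before ingesting anything, you can replicate the same split locally with jq. This is only a sketch: it assumes you have jq installed and that the source file lives at data/import.json inside the recipe directory:
# Mimic the chained transforms: extract payload.userId, then split it on '__'
jq '.payload.userId | split("__") | {id: .[0], name: .[1]}' data/import.json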
You can create the table and schema by running the following command:
docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table.json \
-schemaFile /config/schema.json \
-exec
You should see a message similar to the following if everything is working correctly:
2022/02/28 10:08:40.333 INFO [AddTableCommand] [main] Executing command: AddTable -tableConfigFile /config/table.json -schemaFile /config/schema.json -controllerProtocol http -controllerHost 172.31.0.3 -controllerPort 9000 -user null -password [hidden] -exec
2022/02/28 10:08:40.747 INFO [AddTableCommand] [main] {"status":"Table people_OFFLINE succesfully added"}
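You can also confirm that the schema and table were created by calling the controller's REST API (assuming the standard endpoints on port 9000):
# Fetch the schema and table config we just added
curl localhost:9000/schemas/people
curl localhost:9000/tables/people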
Ingestion Job
Now we’re going to import the JSON file into Pinot. We'll do this with the following ingestion spec:
executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/import.json'
outputDirURI: '/opt/pinot/data/people/'
overwriteOutput: true
pinotFSSpecs:
- scheme: file
className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
dataFormat: 'json'
className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
tableSpec:
tableName: 'people'
pinotClusterSpecs:
- controllerURI: 'http://localhost:9000'
The import job will map fields in each JSON document to a corresponding column in the people schema. If a field doesn't have a corresponding column in the schema, it will be skipped. In this case our JSON documents only have one top-level field, payload, which doesn't have a corresponding column in the schema. Instead, the transformation functions extract the payload.userId field and then store parts of it in different columns.
You can run the following command to run the import:
docker exec -it pinot-controller-json bin/pinot-admin.sh LaunchDataIngestionJob \
-jobSpecFile /config/job-spec.yml
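Once the job finishes, one way to confirm that a segment was created and pushed is to list the table's segments via the controller API (again assuming the standard REST endpoints on port 9000):
# List the segments that were pushed for the people table
curl localhost:9000/segments/people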
Querying
Once that's completed, navigate to localhost:9000/#/query and click on the people table or copy/paste the following query:
select *
from people
limit 10
You will see the following output:
| id | name | userId |
|---|---|---|
| 3287651 | David Smith | 3287651__David Smith |
| 4987622 | Jenny Jones | 4987622__Jenny Jones |
| 1965900 | Stephen Davis | 1965900__Stephen Davis |
Query Results
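If you'd rather query from the command line than the query console, you can post SQL to the controller, which forwards the query to the broker. This is a sketch that assumes the controller's /sql endpoint is available on port 9000:
# Look up a user by the extracted numeric id
curl -X POST -H "Content-Type: application/json" \
  -d '{"sql": "select name from people where id = 3287651"}' \
  localhost:9000/sql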