Groovy Transformation Functions

In this recipe we'll learn how to use Groovy transformation functions to compute column values from nested JSON documents that we ingest from Apache Kafka.

Pre-requisites

You will need to install Docker locally to follow the code examples in this guide.

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/groovy-transformation-functions

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster

You can spin up a Pinot Cluster and Kafka Broker by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka Broker, and Zookeeper. You can find the docker-compose.yml file on GitHub.
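If you want to confirm that everything is up before continuing, the Pinot Controller exposes a health endpoint on port 9000. This quick check isn't part of the recipe, but it should return OK once the controller has started:

curl localhost:9000/health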

Dataset

We're going to import a couple of JSON documents into Kafka and then from there into Pinot.

{"timestamp": "2019-10-09 21:25:25", "payload": {"firstName": "James", "lastName": "Smith", "before": {"id": 2}, "after": { "id": 3}}}
{"timestamp": "2019-10-10 21:33:25", "payload": {"firstName": "John", "lastName": "Gates", "before": {"id": 2}}}

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

Only the timestamp field from our data source maps directly to a schema column name; we'll be using transformation functions to populate the id and name columns.

config/schema.json
{
    "schemaName": "events",
    "dimensionFieldSpecs": [
        {
            "name": "id",
            "dataType": "INT"
        },
        {
            "name": "name",
            "dataType": "STRING"
        }
    ],
    "dateTimeFieldSpecs": [
        {
            "name": "timestamp",
            "dataType": "TIMESTAMP",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS"
        }
    ]
}

The table config indicates that data will be ingested from the Kafka events topic:

config/table.json
{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
        "timeColumnName": "timestamp",
        "schemaName": "events",
        "replication": "1",
        "replicasPerPartition": "1"
    },
    "tenants": {},
    "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "events",
            "stream.kafka.broker.list": "kafka-groovy:9093",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
        }
    },
    "ingestionConfig": {
        "batchIngestionConfig": {
            "segmentIngestionType": "APPEND",
            "segmentIngestionFrequency": "DAILY"
        },
        "transformConfigs": [
            {
                "columnName": "id",
                "transformFunction": "Groovy({def jsonSlurper = new groovy.json.JsonSlurper(); def object = jsonSlurper.parseText(new groovy.json.JsonBuilder(payload).toPrettyString()); def result = object.after == null ? Long.valueOf(object.before.id) : Long.valueOf(object.after.id); return result}, payload)"
            },
            {
                "columnName": "name",
                "transformFunction": "Groovy({def jsonSlurper = new groovy.json.JsonSlurper(); def object = jsonSlurper.parseText(new groovy.json.JsonBuilder(payload).toPrettyString()); return object.firstName + ' ' + object.lastName}, payload)"
            }
        ]
    },
    "metadata": {}
}
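Before we ingest any data, the schema and table config need to be applied to the cluster. A minimal sketch using pinot-admin.sh's AddTable command, assuming the controller container is called pinot-controller-groovy and that the config directory is mounted at /config (check docker-compose.yml for the exact container name and mount path used in this recipe):

docker exec -it pinot-controller-groovy bin/pinot-admin.sh AddTable \
  -schemaFile /config/schema.json \
  -tableConfigFile /config/table.json \
  -exec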

Let's dive into the transformation functions defined under ingestionConfig.transformConfigs:

  • The id function extracts payload.after.id if the after property exists; otherwise it falls back to payload.before.id
  • The name function concatenates payload.firstName and payload.lastName

Both functions use Groovy's JSON parser to build an object from the payload and then use ordinary Groovy logic to return the desired output.
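To see what those closures are doing outside of Pinot, here's a small standalone Groovy sketch (not part of the recipe; the extractId and extractName helpers are just for illustration) that applies the same logic to the first sample payload:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

// Mirror the transform functions: serialise the payload map to JSON,
// parse it back into an object, then pull out the values we want.
def extractId = { payload ->
    def object = new JsonSlurper().parseText(new JsonBuilder(payload).toPrettyString())
    object.after == null ? Long.valueOf(object.before.id) : Long.valueOf(object.after.id)
}

def extractName = { payload ->
    def object = new JsonSlurper().parseText(new JsonBuilder(payload).toPrettyString())
    object.firstName + ' ' + object.lastName
}

// The payload of the first sample document, as a Groovy map.
def payload = [firstName: "James", lastName: "Smith", before: [id: 2], after: [id: 3]]

assert extractId(payload) == 3L            // after exists, so after.id wins
assert extractName(payload) == "James Smith"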

tip

If you only need to do simple data transformations, you can use Pinot's built-in transformation functions.
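For example, the name column could probably be built without Groovy by combining Pinot's jsonPathString and concat functions, something along these lines (an untested sketch). The id column is harder to express that way, because the null check on payload.after is exactly the kind of conditional logic that Groovy handles neatly.

{
    "columnName": "name",
    "transformFunction": "concat(jsonPathString(payload, '$.firstName'), jsonPathString(payload, '$.lastName'), ' ')"
}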

Ingest Data into Kafka

We can run the following command to import a couple of documents into Kafka:

printf '{"timestamp": "2019-10-09 21:25:25", "payload": {"firstName": "James", "lastName": "Smith", "before": {"id": 2}, "after": { "id": 3}}}
{"timestamp": "2019-10-10 21:33:25", "payload": {"firstName": "John", "lastName": "Gates", "before": {"id": 2}}}\n' |
docker exec -i kafka-groovy /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic events

Let's check that those documents have been imported by running the following command:

docker exec -i kafka-groovy /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic events \
--from-beginning \
--max-messages 2

Output

{"timestamp": "2019-10-09 21:25:25", "payload": {"firstName": "James", "lastName": "Smith", "before": {"id": 2}, "after": { "id": 3}}}
{"timestamp": "2019-10-10 21:33:25", "payload": {"firstName": "John", "lastName": "Gates", "before": {"id": 2}}}

Looks good so far.

Querying

Once that's completed, navigate to localhost:9000/#/query and click on the events table or copy/paste the following query:

select * 
from events
limit 10

You will see the following output:

id    name           timestamp
3     James Smith    2019-10-09 21:25:25.0
2     John Gates     2019-10-10 21:33:25.0

Query Results