# Ingesting JSON files from Kafka
In this recipe we'll learn how to ingest JSON documents from Apache Kafka into Apache Pinot.
*Video: Mark Needham shows how to ingest JSON files.*
| Pinot Version | 0.10.0 |
|---|---|
| Code | startreedata/pinot-recipes/ingest-json-files-kafka |
## Prerequisites
You will need to install Docker locally to follow the code examples in this guide.
## Download Recipe
First, clone the GitHub repository to your local machine and navigate to this recipe:
```bash
git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/ingest-json-files-kafka
```
If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
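A minimal sketch of the zip approach, assuming the repository's default branch is called `main` (GitHub's archive URL pattern is standard, but check the repository if the branch name differs):

```bash
# Download and unpack the repository as a zip archive,
# then navigate to this recipe (the -main suffix matches the branch name).
curl -L -o pinot-recipes.zip \
  https://github.com/startreedata/pinot-recipes/archive/refs/heads/main.zip
unzip pinot-recipes.zip
cd pinot-recipes-main/recipes/ingest-json-files-kafka
```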
## Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
```bash
docker-compose up
```
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the `docker-compose.yml` file on GitHub.
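Before moving on, you can optionally confirm the cluster came up. A quick sketch, assuming the Controller is mapped to port 9000 on localhost (which matches the query console URL used later in this recipe):

```bash
# The Pinot Controller responds with OK from its health endpoint once it's up.
curl localhost:9000/health
```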
## Dataset
We're going to import the following JSON documents, which are split across two files, `data/import1.jsonl` and `data/import2.jsonl`:
{"title": "Valentine's Day", "genre": "Comedy", "year": 2010, "releaseDate": "2010-02-12 00:00:00", "budget": 52000000, "boxOffice": 216500000}
{"title": "The Ugly Truth", "genre": "Comedy", "year": 2009, "releaseDate": "2010-04-14 00:00:00", "budget": 38000000, "boxOffice": 205300000}
{"title": "P.S. I Love You", "genre": "Romance", "year": 2007, "releaseDate": "2007-12-21 00:00:00", "budget": 30000000, "boxOffice": 156800000}
{"title": "Dear John", "genre": "Drama", "year": 2010, "releaseDate": "2010-04-14 00:00:00", "budget": 25000000, "boxOffice": 115000000}
{"title": "The Curious Case of Benjamin Button", "genre": "Fantasy", "year": 2008, "releaseDate": "2008-12-25 00:00:00", "budget": 167000000, "boxOffice": 335800000}
{"title": "Pirates of the Caribbean: Salazar's Revenge", "genre": "Action", "year": 2017, "releaseDate": "2017-05-26 00:00:00", "budget": 230000000, "boxOffice": 794881442}
{"title": "The Hunger Games", "genre": "Action", "year": 2012, "releaseDate": "2012-03-23 00:00:00", "budget": 78000000, "boxOffice": 694394724}
{"title": "Pride & Prejudice", "genre": "Romance", "year": 2005, "releaseDate": "2005-09-16 00:00:00", "budget": 28000000, "boxOffice": 121616555}
## Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
```json
{
  "schemaName": "movies",
  "dimensionFieldSpecs": [
    {
      "name": "title",
      "dataType": "STRING"
    },
    {
      "name": "genre",
      "dataType": "STRING"
    },
    {
      "name": "year",
      "dataType": "INT"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "budget",
      "dataType": "INT"
    },
    {
      "name": "boxOffice",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "releaseDate",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```
We'll also have the following table config:
```json
{
  "tableName": "movies",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "releaseDate",
    "timeType": "MILLISECONDS",
    "schemaName": "movies",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.broker.list": "kafka-json:9093",
      "stream.kafka.consumer.type": "lowLevel",
      "stream.kafka.topic.name": "events",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    },
    "loadMode": "MMAP"
  },
  "task": {
    "taskTypeConfigsMap": {}
  },
  "metadata": {
    "customConfigs": {}
  }
}
```
We need to tell Pinot where our Kafka cluster lives, as well as the topic that we want to pull events from.

Finally, we need to specify an offset value, which indicates where Pinot should start pulling data in each topic partition. A value of `smallest` means it will start from the earliest offset; a value of `largest` means it will start from the latest offset.
You can create the table and schema by running the following command:
```bash
docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable \
  -tableConfigFile /config/table.json \
  -schemaFile /config/schema.json \
  -exec
```
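Once the command completes, you can optionally verify that both objects were created via the Controller's REST API. A quick sketch, again assuming the Controller is reachable on localhost:9000:

```bash
# Both responses should include "movies".
curl localhost:9000/schemas
curl localhost:9000/tables
```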
## Importing data
Now we're going to import the JSON files into Kafka:
```bash
docker exec -i kafka-json kafka-console-producer.sh \
  --bootstrap-server kafka-json:9092 \
  --topic events < data/import1.jsonl
```

```bash
docker exec -i kafka-json kafka-console-producer.sh \
  --bootstrap-server kafka-json:9092 \
  --topic events < data/import2.jsonl
```
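If you'd like to confirm that the messages landed in Kafka, one option is the console consumer that ships with Kafka. A sketch, reusing the same container and broker address as the producer commands above:

```bash
# Read the 8 movie documents back from the events topic, then exit.
docker exec -it kafka-json kafka-console-consumer.sh \
  --bootstrap-server kafka-json:9092 \
  --topic events \
  --from-beginning \
  --max-messages 8
```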
## Querying
Once that's completed, navigate to [localhost:9000/#/query](http://localhost:9000/#/query) and click on the `movies` table or copy/paste the following query:
```sql
select *
from movies
limit 10
```
You will see the following output:
| boxOffice | budget | genre | releaseDate | title | year |
|---|---|---|---|---|---|
| 216500000 | 52000000 | Comedy | 2010-02-12 00:00:00.0 | Valentine's Day | 2010 |
| 205300000 | 38000000 | Comedy | 2010-04-14 00:00:00.0 | The Ugly Truth | 2009 |
| 156800000 | 30000000 | Romance | 2007-12-21 00:00:00.0 | P.S. I Love You | 2007 |
| 115000000 | 25000000 | Drama | 2010-04-14 00:00:00.0 | Dear John | 2010 |
| 335800000 | 167000000 | Fantasy | 2008-12-25 00:00:00.0 | The Curious Case of Benjamin Button | 2008 |
| 794881442 | 230000000 | Action | 2017-05-26 00:00:00.0 | Pirates of the Caribbean: Salazar's Revenge | 2017 |
| 694394724 | 78000000 | Action | 2012-03-23 00:00:00.0 | The Hunger Games | 2012 |
| 121616555 | 28000000 | Romance | 2005-09-16 00:00:00.0 | Pride & Prejudice | 2005 |
*Query Results*
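From here you can run any query you like against the table. As one sketch, here's how you might submit an aggregation over the same data from the command line, assuming the Controller's `/sql` endpoint is available on localhost:9000 (it proxies queries to the Broker):

```bash
# Count movies and total box office takings per genre.
curl -X POST localhost:9000/sql \
  -H "Content-Type: application/json" \
  -d '{"sql": "select genre, count(*) as movies, sum(boxOffice) as totalBoxOffice from movies group by genre"}'
```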