Ingesting JSON files from Kafka
To learn how to ingest JSON files from Apache Kafka into Pinot, watch the following video or complete the tutorial below, starting with Prerequisites.
Mark Needham shows how to ingest JSON files
If your JSON documents have a nested structure, see how to ingest complex JSON documents with a nested structure from Kafka into Pinot.
Pinot Version | 0.10.0 |
Code | startreedata/pinot-recipes/ingest-json-files-kafka |
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download recipes.
Navigate to recipe
- If you haven't already, download recipes.
- In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/ingest-json-files-kafka
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
Dataset
We're going to import the following JSON files:
{"title": "Valentine's Day", "genre": "Comedy", "year": 2010, "releaseDate": "2010-02-12 00:00:00", "budget": 52000000, "boxOffice": 216500000}
{"title": "The Ugly Truth", "genre": "Comedy", "year": 2009, "releaseDate": "2010-04-14 00:00:00", "budget": 38000000, "boxOffice": 205300000}
{"title": "P.S. I Love You", "genre": "Romance", "year": 2007, "releaseDate": "2007-12-21 00:00:00", "budget": 30000000, "boxOffice": 156800000}
{"title": "Dear John", "genre": "Drama", "year": 2010, "releaseDate": "2010-04-14 00:00:00", "budget": 25000000, "boxOffice": 115000000}
{"title": "The Curious Case of Benjamin Button", "genre": "Fantasy", "year": 2008, "releaseDate": "2008-12-25 00:00:00", "budget": 167000000, "boxOffice": 335800000}
data/import1.jsonl
{"title": "Pirates of the Caribbean: Salazar's Revenge", "genre": "Action", "year": 2017, "releaseDate": "2017-05-26 00:00:00", "budget": 230000000, "boxOffice": 794881442}
{"title": "The Hunger Games", "genre": "Action", "year": 2012, "releaseDate": "2012-03-23 00:00:00", "budget": 78000000, "boxOffice": 694394724}
{"title": "Pride & Prejudice", "genre": "Romance", "year": 2005, "releaseDate": "2005-09-16 00:00:00", "budget": 28000000, "boxOffice": 121616555}
data/import2.jsonl
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "movies",
"dimensionFieldSpecs": [
{
"name": "title",
"dataType": "STRING"
},
{
"name": "genre",
"dataType": "STRING"
},
{
"name": "year",
"dataType": "INT"
}
],
"metricFieldSpecs": [
{
"name": "budget",
"dataType": "INT"
},
{
"name": "boxOffice",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [
{
"name": "releaseDate",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
config/schema.json
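Before creating the table, it can help to confirm that every line of a JSON Lines file parses and carries the fields the schema declares. The following is a minimal sketch, not part of the recipe: the function name `check_records` is hypothetical, and the type mapping assumes that `releaseDate` arrives as a string, as it does in the files above.

```python
import json

# Field names taken from config/schema.json, mapped to the Python types
# that the JSON values in the data files decode to.
# releaseDate is a string such as "2010-02-12 00:00:00" in the source files.
EXPECTED_FIELDS = {
    "title": str,
    "genre": str,
    "year": int,
    "budget": int,
    "boxOffice": int,
    "releaseDate": str,
}


def check_records(lines):
    """Return a list of (line_number, problem) tuples; an empty list means no problems."""
    problems = []
    for number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append((number, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(record, dict):
            problems.append((number, "not a JSON object"))
            continue
        for field, expected_type in EXPECTED_FIELDS.items():
            if field not in record:
                problems.append((number, f"missing field {field}"))
            elif not isinstance(record[field], expected_type):
                problems.append((number, f"unexpected type for {field}"))
    return problems
```

To check a file, pass an open file handle, for example `check_records(open("data/import1.jsonl"))`.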
We'll also have the following table config:
{
"tableName": "movies",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "releaseDate",
"timeType": "MILLISECONDS",
"schemaName": "movies",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.broker.list": "kafka-json:9093",
"stream.kafka.consumer.type": "lowLevel",
"stream.kafka.topic.name": "events",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
},
"loadMode": "MMAP"
},
"task": {
"taskTypeConfigsMap": {}
},
"metadata": {
"customConfigs": {}
}
}
config/table.json
The stream configuration tells Pinot where our Kafka cluster lives (stream.kafka.broker.list) and which topic to pull events from (stream.kafka.topic.name).
Finally, stream.kafka.consumer.prop.auto.offset.reset specifies an offset value, which indicates where Pinot should start pulling data in each topic partition.
A value of smallest means it will start from the earliest offset.
A value of largest means it will start from the latest offset.
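To make the difference concrete, here is a small illustrative model of the two offset values. It is only a sketch of the semantics described above, not Pinot or Kafka code; the function names are hypothetical, and `latest_offset` is assumed to mean the offset the next published message would receive.

```python
def starting_offset(reset_policy, earliest_offset, latest_offset):
    """Pick the offset at which consumption of a partition begins."""
    if reset_policy == "smallest":
        return earliest_offset
    if reset_policy == "largest":
        return latest_offset
    raise ValueError(f"unknown reset policy: {reset_policy}")


def offsets_consumed(reset_policy, earliest_offset, latest_offset):
    """List the already-published offsets that would be read under a policy."""
    start = starting_offset(reset_policy, earliest_offset, latest_offset)
    return list(range(start, latest_offset))


# A partition that already holds 8 events at offsets 0 through 7.
print(offsets_consumed("smallest", 0, 8))  # all 8 existing events
print(offsets_consumed("largest", 0, 8))   # none of the existing events
```

In this model, largest skips events that were published before consumption started, which is why smallest is the safer choice when the topic may already contain data.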
You can create the table and schema by running the following command:
docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table.json \
-schemaFile /config/schema.json \
-exec
Importing data
Now we're going to import the JSON files into Kafka:
docker exec -i kafka-json kafka-console-producer.sh \
--bootstrap-server kafka-json:9092 \
--topic events < data/import1.jsonl
docker exec -i kafka-json kafka-console-producer.sh \
--bootstrap-server kafka-json:9092 \
--topic events < data/import2.jsonl
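The console producer publishes each line of its input as one Kafka message, which is why the JSON Lines layout matters. The same per-line behavior can be sketched in Python. This is an illustration under assumptions: `publish_lines` is a hypothetical helper, and `send` stands in for whichever Kafka client call you use (no specific client library is assumed). Unlike the console producer, the sketch deliberately skips blank lines.

```python
def publish_lines(lines, send, topic="events"):
    """Send every non-blank line as one UTF-8 encoded message; return the count sent."""
    sent = 0
    for line in lines:
        payload = line.strip()
        if not payload:
            continue  # design choice of this sketch: do not publish empty messages
        send(topic, payload.encode("utf-8"))
        sent += 1
    return sent


# Example with a stand-in `send` that only records what would be published.
captured = []
count = publish_lines(
    ['{"title": "Dear John"}\n', "\n", '{"title": "The Hunger Games"}\n'],
    lambda topic, value: captured.append((topic, value)),
)
print(count)  # 2
```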
Querying
Once that's completed, navigate to localhost:9000/#/query and click on the movies table or copy/paste the following query:
select *
from movies
limit 10
You will see the following output:
boxOffice | budget | genre | releaseDate | title | year |
---|---|---|---|---|---|
216500000 | 52000000 | Comedy | 2010-02-12 00:00:00.0 | Valentine's Day | 2010 |
205300000 | 38000000 | Comedy | 2010-04-14 00:00:00.0 | The Ugly Truth | 2009 |
156800000 | 30000000 | Romance | 2007-12-21 00:00:00.0 | P.S. I Love You | 2007 |
115000000 | 25000000 | Drama | 2010-04-14 00:00:00.0 | Dear John | 2010 |
335800000 | 167000000 | Fantasy | 2008-12-25 00:00:00.0 | The Curious Case of Benjamin Button | 2008 |
794881442 | 230000000 | Action | 2017-05-26 00:00:00.0 | Pirates of the Caribbean: Salazar's Revenge | 2017 |
694394724 | 78000000 | Action | 2012-03-23 00:00:00.0 | The Hunger Games | 2012 |
121616555 | 28000000 | Romance | 2005-09-16 00:00:00.0 | Pride & Prejudice | 2005 |
Query Results
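The same query can also be issued from code instead of the query console. The sketch below uses only the Python standard library. It assumes that the controller at localhost:9000 accepts a POST to `/sql` with a JSON body of the form `{"sql": "..."}` and that the reply contains a `resultTable` with `dataSchema.columnNames` and `rows`; verify the endpoint and the response shape against your Pinot version. The function names are hypothetical.

```python
import json
import urllib.request


def build_request(sql, url="http://localhost:9000/sql"):
    """Build the HTTP POST request that carries the SQL statement."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def rows_as_dicts(response):
    """Turn a decoded query response into a list of {column: value} dicts."""
    table = response.get("resultTable") or {}
    columns = table.get("dataSchema", {}).get("columnNames", [])
    return [dict(zip(columns, row)) for row in table.get("rows", [])]


def run_query(sql, url="http://localhost:9000/sql"):
    """Send the query and return its rows; requires the cluster to be running."""
    with urllib.request.urlopen(build_request(sql, url), timeout=10) as reply:
        return rows_as_dicts(json.load(reply))
```

With the cluster running, `run_query("select * from movies limit 10")` would return one dictionary per movie.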