Ingesting JSON files from Kafka

To learn how to ingest JSON files from Apache Kafka into Pinot, watch the following video or complete the tutorial below, starting with Prerequisites.

Mark Needham shows how to ingest JSON files

If your JSON documents have a nested structure, see how to ingest complex JSON documents with a nested structure from Kafka into Pinot.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download the recipes.

Navigate to recipe

  1. If you haven't already, download the recipes.
  2. In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/ingest-json-files-kafka

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command runs a single instance each of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and ZooKeeper. You can find the docker-compose.yml file on GitHub.

Dataset

We're going to import the following JSON files:

{"title": "Valentine's Day", "genre": "Comedy", "year": 2010, "releaseDate": "2010-02-12 00:00:00", "budget": 52000000, "boxOffice": 216500000}
{"title": "The Ugly Truth", "genre": "Comedy", "year": 2009, "releaseDate": "2010-04-14 00:00:00", "budget": 38000000, "boxOffice": 205300000}
{"title": "P.S. I Love You", "genre": "Romance", "year": 2007, "releaseDate": "2007-12-21 00:00:00", "budget": 30000000, "boxOffice": 156800000}
{"title": "Dear John", "genre": "Drama", "year": 2010, "releaseDate": "2010-04-14 00:00:00", "budget": 25000000, "boxOffice": 115000000}
{"title": "The Curious Case of Benjamin Button", "genre": "Fantasy", "year": 2008, "releaseDate": "2008-12-25 00:00:00", "budget": 167000000, "boxOffice": 335800000}

data/import1.jsonl

{"title": "Pirates of the Caribbean: Salazar's Revenge", "genre": "Action", "year": 2017, "releaseDate": "2017-05-26 00:00:00", "budget": 230000000, "boxOffice": 794881442}
{"title": "The Hunger Games", "genre": "Action", "year": 2012, "releaseDate": "2012-03-23 00:00:00", "budget": 78000000, "boxOffice": 694394724}
{"title": "Pride & Prejudice", "genre": "Romance", "year": 2005, "releaseDate": "2005-09-16 00:00:00", "budget": 28000000, "boxOffice": 121616555}

data/import2.jsonl

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "movies",
  "dimensionFieldSpecs": [
    {
      "name": "title",
      "dataType": "STRING"
    },
    {
      "name": "genre",
      "dataType": "STRING"
    },
    {
      "name": "year",
      "dataType": "INT"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "budget",
      "dataType": "INT"
    },
    {
      "name": "boxOffice",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "releaseDate",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

config/schema.json
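Before loading data, it can help to confirm that each JSONL line matches the schema. The following is a minimal sketch, not part of the recipe: the helper names (check_record, to_epoch_millis) are hypothetical, the field types are copied from config/schema.json, and the timestamp conversion assumes the releaseDate strings are interpreted as UTC, which may differ from how your Pinot deployment parses them.

```python
import json
from datetime import datetime, timezone

# Field names and types copied from config/schema.json.
SCHEMA_TYPES = {
    "title": "STRING",
    "genre": "STRING",
    "year": "INT",
    "budget": "INT",
    "boxOffice": "INT",
    "releaseDate": "TIMESTAMP",
}

INT_MIN, INT_MAX = -2**31, 2**31 - 1  # range of a 32-bit signed integer


def to_epoch_millis(text):
    """Convert 'yyyy-MM-dd HH:mm:ss' to epoch milliseconds (assumes UTC)."""
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


def check_record(line):
    """Return a list of problems found in one JSONL line (empty if none)."""
    record = json.loads(line)
    problems = []
    for name, data_type in SCHEMA_TYPES.items():
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        if data_type == "STRING":
            if not isinstance(value, str):
                problems.append(f"{name}: expected a string")
        elif data_type == "INT":
            # bool is a subclass of int in Python, so reject it explicitly.
            if isinstance(value, bool) or not isinstance(value, int):
                problems.append(f"{name}: expected an integer")
            elif not INT_MIN <= value <= INT_MAX:
                problems.append(f"{name}: does not fit in a 32-bit INT")
        elif data_type == "TIMESTAMP":
            try:
                to_epoch_millis(value)
            except (TypeError, ValueError):
                problems.append(f"{name}: expected 'yyyy-MM-dd HH:mm:ss'")
    return problems


sample = (
    '{"title": "Dear John", "genre": "Drama", "year": 2010, '
    '"releaseDate": "2010-04-14 00:00:00", "budget": 25000000, '
    '"boxOffice": 115000000}'
)
print(check_record(sample))
```

The largest boxOffice value in the dataset (794881442) is below the 32-bit limit of 2147483647, so INT is sufficient for these files; a dataset with larger totals would need LONG.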

We'll also have the following table config:

{
  "tableName": "movies",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "releaseDate",
    "timeType": "MILLISECONDS",
    "schemaName": "movies",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.broker.list": "kafka-json:9093",
      "stream.kafka.consumer.type": "lowLevel",
      "stream.kafka.topic.name": "events",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    },
    "loadMode": "MMAP"
  },
  "task": {
    "taskTypeConfigsMap": {}
  },
  "metadata": {
    "customConfigs": {}
  }
}

config/table.json

The streamConfigs section tells Pinot where our Kafka cluster lives (stream.kafka.broker.list) and which topic to pull events from (stream.kafka.topic.name). Finally, stream.kafka.consumer.prop.auto.offset.reset specifies where Pinot should start pulling data in each topic partition. A value of smallest means it will start from the earliest available offset. A value of largest means it will start from the latest offset, so only messages that arrive afterwards are consumed.
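To make the difference between the two offset values concrete, here is a toy model of the choice. It is only an illustration of the semantics described above, not Pinot's or Kafka's consumer code, and the function name starting_offset is hypothetical.

```python
def starting_offset(policy, earliest, latest):
    """Pick the first offset to read when no earlier position is stored.

    earliest: offset of the oldest message still retained in the partition
    latest:   offset that the next produced message will receive
    """
    if policy == "smallest":
        return earliest
    if policy == "largest":
        return latest
    raise ValueError(f"unknown offset reset policy: {policy}")


# Toy partition that already holds five messages at offsets 0 to 4.
existing_offsets = list(range(5))
earliest, latest = 0, 5

for policy in ("smallest", "largest"):
    start = starting_offset(policy, earliest, latest)
    backlog = [o for o in existing_offsets if o >= start]
    print(policy, "starts at", start, "and reads existing offsets", backlog)
```

With smallest, messages produced before the table was created are still ingested; with largest, that backlog is skipped.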

You can create the table and schema by running the following command:

docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table.json   \
  -schemaFile /config/schema.json \
  -exec

Importing data

Now we're going to import the JSON files into Kafka:

docker exec -i kafka-json kafka-console-producer.sh \
  --bootstrap-server kafka-json:9092 \
  --topic events < data/import1.jsonl
docker exec -i kafka-json kafka-console-producer.sh \
  --bootstrap-server kafka-json:9092 \
  --topic events < data/import2.jsonl

Querying

Once that's completed, navigate to localhost:9000/#/query and either click on the movies table or copy and paste the following query:

select *
from movies
limit 10

You will see the following output:

| boxOffice | budget | genre | releaseDate | title | year |
| --- | --- | --- | --- | --- | --- |
| 216500000 | 52000000 | Comedy | 2010-02-12 00:00:00.0 | Valentine's Day | 2010 |
| 205300000 | 38000000 | Comedy | 2010-04-14 00:00:00.0 | The Ugly Truth | 2009 |
| 156800000 | 30000000 | Romance | 2007-12-21 00:00:00.0 | P.S. I Love You | 2007 |
| 115000000 | 25000000 | Drama | 2010-04-14 00:00:00.0 | Dear John | 2010 |
| 335800000 | 167000000 | Fantasy | 2008-12-25 00:00:00.0 | The Curious Case of Benjamin Button | 2008 |
| 794881442 | 230000000 | Action | 2017-05-26 00:00:00.0 | Pirates of the Caribbean: Salazar's Revenge | 2017 |
| 694394724 | 78000000 | Action | 2012-03-23 00:00:00.0 | The Hunger Games | 2012 |
| 121616555 | 28000000 | Romance | 2005-09-16 00:00:00.0 | Pride & Prejudice | 2005 |

Query Results
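The same query can also be run programmatically against the Pinot Broker's SQL endpoint. The sketch below uses only the Python standard library. It assumes the broker's HTTP port is reachable from your machine at localhost:8099, which depends on the port mappings in docker-compose.yml, so adjust BROKER_URL if your setup differs. The helper names run_query and rows_as_dicts are hypothetical.

```python
import json
from urllib import request

# Assumption: the broker's HTTP port is published to the host on 8099.
BROKER_URL = "http://localhost:8099/query/sql"


def run_query(sql, url=BROKER_URL):
    """POST a SQL query to the broker and return the decoded JSON response."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    req = request.Request(
        url, data=body, headers={"Content-Type": "application/json"}
    )
    with request.urlopen(req) as resp:
        return json.load(resp)


def rows_as_dicts(response):
    """Pair each row of the result table with its column names."""
    table = response["resultTable"]
    names = table["dataSchema"]["columnNames"]
    return [dict(zip(names, row)) for row in table["rows"]]
```

With the cluster running, rows_as_dicts(run_query("select * from movies limit 10")) should return one dictionary per movie, keyed by the column names shown in the table above.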