
How to index a JSON column

In this recipe we'll learn how to configure an index for a JSON column.

note

Prerequisites

You will need to install Docker to follow the code examples in this guide.
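You can check whether Docker and Docker Compose are already installed by running:

docker --version
docker-compose --version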

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/json-index

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
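For example, assuming the repository's default branch is main, you could download and extract the archive like this:

curl -L -o pinot-recipes.zip https://github.com/startreedata/pinot-recipes/archive/refs/heads/main.zip
unzip pinot-recipes.zip
cd pinot-recipes-main/recipes/json-index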

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
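For reference, a representative sketch of such a file is shown below. The image tags, ports, and startup commands here are assumptions based on Pinot's usual Docker setup; the docker-compose.yml in the repository is the source of truth and also defines the jsonindex network used later in this recipe.

version: "3"
services:
  zookeeper:
    image: zookeeper:3.8.0
    container_name: zookeeper-jsonindex
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:latest
    container_name: kafka-jsonindex
    ports:
      - "9092:9092"
    # Kafka listener/advertised-listener environment settings omitted for brevity
    depends_on:
      - zookeeper
  pinot-controller:
    image: apachepinot/pinot:0.12.0-arm64
    container_name: pinot-controller-jsonindex
    command: "StartController -zkAddress zookeeper-jsonindex:2181"
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.12.0-arm64
    container_name: pinot-broker-jsonindex
    command: "StartBroker -zkAddress zookeeper-jsonindex:2181"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.12.0-arm64
    container_name: pinot-server-jsonindex
    command: "StartServer -zkAddress zookeeper-jsonindex:2181"
    depends_on:
      - pinot-broker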

Data generator

This recipe contains a data generator that creates events with data about people.

It uses the Faker library, so you'll first need to install that:

pip install faker

You can generate data by running the following command:

python datagen.py 2>/dev/null | head -n1 | jq

Output is shown below:

{
  "ts": 1680172162791,
  "person": {
    "id": "ead572cb-c8bc-457a-b331-4f380c5ebe18",
    "name": "Melissa Howard",
    "email": "daniel75@example.com",
    "age": 33,
    "address": {
      "street_address": "974 Michael Hollow",
      "city": "Johnstonport",
      "state": "South Dakota",
      "country": "Latvia"
    },
    "phone_number": "+1-697-077-6893x6562",
    "job": {
      "company": "Ramirez-Hunt",
      "position": "Communications engineer",
      "department": "iterate efficient e-markets"
    },
    "interests": [
      "Swimming"
    ],
    "friend_ids": [
      "88a696de-34d1-4300-ac3c-3084a6bdceeb"
    ]
  }
}
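The recipe ships with its own datagen.py, but a minimal Faker-based sketch that emits events in this shape might look something like the following. This is a hypothetical illustration only: it omits the real script's --sleep option and is not the recipe's actual generator.

import json
import random
import time
import uuid

from faker import Faker

fake = Faker()

def generate_event():
    # Build one event in the same shape as the output shown above.
    return {
        "ts": int(time.time() * 1000),
        "person": {
            "id": str(uuid.uuid4()),
            "name": fake.name(),
            "email": fake.email(),
            "age": random.randint(18, 80),
            "address": {
                "street_address": fake.street_address(),
                "city": fake.city(),
                "state": fake.state(),
                "country": fake.country(),
            },
            "phone_number": fake.phone_number(),
            "job": {
                "company": fake.company(),
                "position": fake.job(),
                "department": fake.bs(),
            },
            "interests": random.sample(
                ["Swimming", "Reading", "Cycling", "Music", "Art"],
                k=random.randint(1, 3),
            ),
            "friend_ids": [str(uuid.uuid4()) for _ in range(random.randint(1, 3))],
        },
    }

if __name__ == "__main__":
    # Print one JSON event per line, indefinitely.
    while True:
        print(json.dumps(generate_event()))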

Kafka ingestion

We're going to ingest this data into an Apache Kafka topic using the kcat command line tool. We'll also use jq to prepend a message key to each JSON payload, joining the two with the ø character that we pass to kcat as its key delimiter (-K):

python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.person.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t people -Kø

We can check that Kafka has some data by running the following command:

docker exec -it kafka-jsonindex kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic people

We'll see something like the following:

people:0:19960902

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "people",
  "dimensionFieldSpecs": [
    {
      "name": "person",
      "dataType": "JSON"
    }
  ],
  "metricFieldSpecs": [],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Our schema has only two columns: one for the timestamp and one that stores the person JSON document.

Now for the table config:

{
  "tableName": "people",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "people",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "jsonIndexConfigs": {
      "person": {
        "maxLevels": 2,
        "excludeArray": false,
        "disableCrossArrayUnnest": true,
        "includePaths": null,
        "excludePaths": null,
        "excludeFields": ["age"]
      }
    },
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "people",
      "stream.kafka.broker.list": "kafka-jsonindex:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "100000",
      "realtime.segment.flush.threshold.time": "1h"
    }
  },
  "ingestionConfig": {},
  "tenants": {},
  "metadata": {}
}

The jsonIndexConfigs section of tableIndexConfig contains the config for the JSON index. An explanation of each of the config parameters is shown below:

Config Key | Description | Type | Default
maxLevels | Max levels to flatten the JSON object (an array also counts as one level) | int | -1 (unlimited)
excludeArray | Whether to exclude arrays when flattening the object | boolean | false (include arrays)
disableCrossArrayUnnest | Whether to not unnest multiple arrays (unique combination of all elements) | boolean | false (calculate unique combination of all elements)
includePaths | Only include the given paths, e.g. "$.a.b", "$.a.c[*]" (mutually exclusive with excludePaths). Paths under the included paths are also included, e.g. "$.a.b.c" is included when "$.a.b" is configured to be included. | Set<String> | null (include all paths)
excludePaths | Exclude the given paths, e.g. "$.a.b", "$.a.c[*]" (mutually exclusive with includePaths). Paths under the excluded paths are also excluded, e.g. "$.a.b.c" is excluded when "$.a.b" is configured to be excluded. | Set<String> | null (include all paths)
excludeFields | Exclude the given fields, e.g. "b", "c", even if they are under the included paths. | Set<String> | null (include all fields)

So we are including all fields except age in the JSON index.
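If, for example, we only cared about filtering on the address sub-document, we could instead index just that path with includePaths. This is a hypothetical alternative shown for illustration; it is not what this recipe uses:

"jsonIndexConfigs": {
  "person": {
    "maxLevels": 2,
    "includePaths": ["$.address"]
  }
}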

tip

Fields included in the JSON index can be filtered using the JSON_MATCH function. If you use this function with a field that isn't included in the index, the query won't return any records.

We'll create the table by running the following:

docker run \
--network jsonindex \
-v $PWD/config:/config \
apachepinot/pinot:0.12.0-arm64 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-jsonindex" \
-exec
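Once the command completes, you can check that the table was created via the Pinot Controller's REST API. This assumes the controller's default port 9000 is mapped to localhost in the Docker Compose setup:

curl -X GET "http://localhost:9000/tables"

The response should include people in its list of tables.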

Querying by JSON index

Let's now try to query this table using the JSON index. JSON indexes support the following predicates: =, <>, IN, and NOT IN.

State = Kentucky

select ts, person
from people
WHERE JSON_MATCH(person, '"$.address.state" = ''Kentucky''')
LIMIT 3
ts | person
2023-03-30 09:40:59.788 | {"address":{"street_address":"8173 Diaz Hollow Apt. 175","country":"British Indian Ocean Territory (Chagos Archipelago)","city":"East Carlos","state":"Kentucky"},"friend_ids":["de27ec51-2895-4044-98bc-5643853d7d0d","84080eb3-1c17-4b4a-ada7-a764ed44cef7"],"name":"Robert Alvarado","phone_number":"085.136.9039x6810","id":"874a4543-f528-49e2-85a8-d31a6cbae269","job":{"company":"Gray, Huerta and Moyer","position":"Chemical engineer","department":"transition collaborative networks"},"interests":["Swimming","Meditation","Reading","Cycling","Reading"],"email":"jamiebarker@example.net","age":52}
2023-03-30 09:40:59.823 | {"address":{"street_address":"277 Robert Crossroad Apt. 622","country":"Haiti","city":"Port Karenshire","state":"Kentucky"},"friend_ids":["2285322d-0049-4386-b41c-9f96a0ed5038"],"name":"Diana Thomas","phone_number":"171-966-4122","id":"b996770e-478c-412d-89bd-d3a085140016","job":{"company":"Diaz LLC","position":"Scientist, research (maths)","department":"deploy granular convergence"},"interests":["Sports","Photography","Art","Photography","Reading"],"email":"ncook@example.org","age":40}
2023-03-30 09:40:59.843 | {"address":{"street_address":"85628 William Curve","country":"Djibouti","city":"Gregoryborough","state":"Kentucky"},"friend_ids":["1a7ee9e4-6daa-4994-a077-3170bffab04f"],"name":"Monica Harrell","phone_number":"(120)253-7632","id":"c71df52f-49c4-4d61-b6d6-d94134c92e8b","job":{"company":"Larson-Mcdaniel","position":"Ranger/warden","department":"seize leading-edge web services"},"interests":["Traveling","Music","Art"],"email":"allen60@example.net","age":33}

Query Results

State <> Kentucky

select count(*)
from people
WHERE JSON_MATCH(person, '"$.address.state" <> ''Kentucky''')
count(*)
319584

Query Results

State IN Kentucky or Alabama

select json_extract_scalar(person, '$.address.state', 'STRING') AS state, count(*)
from people
WHERE JSON_MATCH(person, '"$.address.state" IN (''Kentucky'', ''Alabama'')')
GROUP BY state
ORDER BY count(*) DESC
state | count(*)
Kentucky | 6682
Alabama | 6414

Query Results

State NOT IN Kentucky or Alabama

select json_extract_scalar(person, '$.address.state', 'STRING') AS state, count(*)
from people
WHERE JSON_MATCH(person, '"$.address.state" NOT IN (''Kentucky'', ''Alabama'')')
GROUP BY state
ORDER BY count(*) DESC
state | count(*)
Montana | 6743
Kansas | 6718
Maryland | 6693
Ohio | 6677
Connecticut | 6662
Mississippi | 6639
Rhode Island | 6630
Arkansas | 6621

Query Results

Unindexed field

What if we try to query by the age column, which hasn't been indexed?

select count(*)
from people
WHERE JSON_MATCH(person, '"$.age" = 59')
count(*)
0

Query Results

We get back no results! We can even run the following query to confirm that the table contains records with an age of 59:

select count(*)
from people
WHERE JSON_EXTRACT_SCALAR(person, '$.age', 'INT') = 59
count(*)
5018

Query Results