How to index a JSON column
To learn how to configure an index for a JSON column, watch the following video, or complete the tutorial below, starting with Prerequisites.
Pinot Version | 1.0.0 |
Code | startreedata/pinot-recipes/json-index |
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download recipes.
Navigate to recipe
- If you haven't already, download recipes.
- In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/json-index
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
Data generator
This recipe contains a data generator that creates events with data about people.
It uses the Faker library, so you'll first need to install that:
pip install faker
You can generate data by running the following command:
python datagen.py 2>/dev/null | head -n1 | jq
Output is shown below:
{
"ts": 1680172162791,
"person": {
"id": "ead572cb-c8bc-457a-b331-4f380c5ebe18",
"name": "Melissa Howard",
"email": "daniel75@example.com",
"age": 33,
"address": {
"street_address": "974 Michael Hollow",
"city": "Johnstonport",
"state": "South Dakota",
"country": "Latvia"
},
"phone_number": "+1-697-077-6893x6562",
"job": {
"company": "Ramirez-Hunt",
"position": "Communications engineer",
"department": "iterate efficient e-markets"
},
"interests": [
"Swimming"
],
"friend_ids": [
"88a696de-34d1-4300-ac3c-3084a6bdceeb"
]
}
}
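To make the event shape concrete, here is a minimal stand-in for a generator, written with the standard library only. It is not the recipe's datagen.py: the function name `make_event`, the value pools, and the age range are assumptions chosen for illustration, and only a subset of the fields shown above is produced.

```python
import json
import random
import time
import uuid


def make_event(rng: random.Random) -> dict:
    """Build one event with the same top-level shape as the sample output."""
    return {
        "ts": int(time.time() * 1000),  # epoch milliseconds, like the sample
        "person": {
            # Random version-4 UUID derived from the seeded generator.
            "id": str(uuid.UUID(int=rng.getrandbits(128), version=4)),
            "name": rng.choice(["Melissa Howard", "Robert Alvarado"]),
            "age": rng.randint(18, 90),  # hypothetical range
            "address": {"state": rng.choice(["Kentucky", "Alabama", "Ohio"])},
            "interests": rng.sample(["Swimming", "Reading", "Art"], k=2),
        },
    }


if __name__ == "__main__":
    # One compact JSON document per line, ready to pipe into jq.
    print(json.dumps(make_event(random.Random(42))))
```

The nested `person` object is what ends up in the single JSON column later in this guide, so any field you add here becomes a path you can filter on.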
Kafka ingestion
We're going to ingest this data into an Apache Kafka topic using the kcat command line tool.
We'll also use jq to structure the data in the key:payload structure that Kafka expects:
python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.person.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t people -Kø
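The jq step above turns each event into one line of the form key, separator, payload, which kcat then splits on the separator given to -K. A rough Python equivalent is sketched below. The helper name `to_keyed_line` is hypothetical, and keying on the person's id is an assumption made for this sketch.

```python
import json

SEP = "ø"  # same separator character passed to jq and to kcat's -K flag


def to_keyed_line(event: dict, sep: str = SEP) -> str:
    """Return '<key><sep><compact JSON payload>' for one event."""
    key = event["person"]["id"]  # assumption: key each message on the person's id
    # Compact separators mirror jq's -c output; ensure_ascii=False keeps UTF-8 as-is.
    payload = json.dumps(event, separators=(",", ":"), ensure_ascii=False)
    return f"{key}{sep}{payload}"


if __name__ == "__main__":
    event = {"ts": 1680172162791, "person": {"id": "ead572cb-c8bc-457a-b331-4f380c5ebe18"}}
    print(to_keyed_line(event))
```

The separator has to be a character that never appears in the key, which is why an unusual character such as ø is a convenient choice.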
We can check that Kafka has some data by running the following command:
docker exec -it kafka-jsonindex kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic people
We'll see something like the following:
people:0:19960902
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "people",
"dimensionFieldSpecs": [
{
"name": "person",
"dataType": "JSON"
}
],
"metricFieldSpecs": [],
"dateTimeFieldSpecs": [
{
"name": "ts",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
Our schema has only two columns - one for the timestamp and another one that stores the person.
Now for the table config:
{
"tableName": "people",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "people",
"replication": "1",
"replicasPerPartition": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"jsonIndexConfigs": {
"person": {
"maxLevels": 2,
"excludeArray": false,
"disableCrossArrayUnnest": true,
"includePaths": null,
"excludePaths": null,
"excludeFields": ["age"]
}
},
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "people",
"stream.kafka.broker.list": "kafka-jsonindex:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.rows":"100000",
"realtime.segment.flush.threshold.time":"1h"
}
},
"ingestionConfig": {
},
"tenants": {},
"metadata": {}
}
The jsonIndexConfigs section contains the config for the JSON index. An explanation of each of the config parameters is shown below:
Config Key | Description | Type | Default |
---|---|---|---|
maxLevels | Maximum number of levels to flatten in the JSON object (an array also counts as one level) | int | -1 (unlimited) |
excludeArray | Whether to exclude arrays when flattening the object | boolean | false (include arrays) |
disableCrossArrayUnnest | Whether to skip unnesting multiple arrays (the unique combination of all elements) | boolean | false (calculate the unique combination of all elements) |
includePaths | Only include the given paths, e.g. "$.a.b", "$.a.c[*]" (mutually exclusive with excludePaths). Paths under the included paths will be included, e.g. "$.a.b.c" will be included when "$.a.b" is configured to be included. | Set<String> | null (include all paths) |
excludePaths | Exclude the given paths, e.g. "$.a.b", "$.a.c[*]" (mutually exclusive with includePaths). Paths under the excluded paths will also be excluded, e.g. "$.a.b.c" will be excluded when "$.a.b" is configured to be excluded. | Set<String> | null (include all paths) |
excludeFields | Exclude the given fields, e.g. "b", "c", even if they are under the included paths. | Set<String> | null (include all fields) |
So we are including all fields except age in the JSON index.
Fields included in the JSON index can be filtered using the JSON_MATCH function.
If you use this function with a field that isn't included in the index, it won't return any records.
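To build some intuition for why an excluded field cannot be matched, the sketch below flattens a JSON object into path and value pairs while skipping the names listed in excludeFields. This is a simplified illustration, not Pinot's implementation: the function name `indexed_paths` is hypothetical, and the sketch ignores maxLevels, array unnesting, includePaths, and excludePaths.

```python
from typing import Any, Iterator, Tuple


def indexed_paths(
    value: Any, exclude_fields: frozenset = frozenset(), path: str = "$"
) -> Iterator[Tuple[str, Any]]:
    """Yield (json_path, scalar) pairs, skipping keys named in exclude_fields."""
    if isinstance(value, dict):
        for key, child in value.items():
            if key in exclude_fields:
                continue  # dropped, so no path for this field reaches the index
            yield from indexed_paths(child, exclude_fields, f"{path}.{key}")
    elif isinstance(value, list):
        for i, child in enumerate(value):
            yield from indexed_paths(child, exclude_fields, f"{path}[{i}]")
    else:
        yield path, value


if __name__ == "__main__":
    person = {"age": 33, "address": {"state": "South Dakota"}, "interests": ["Swimming"]}
    paths = dict(indexed_paths(person, frozenset({"age"})))
    print(paths)
    print("$.age" in paths)
```

In this toy model there is simply no entry for `$.age`, so a lookup on that path has nothing to match, which mirrors the behavior described above.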
We'll create the table by running the following:
docker run \
--network jsonindex \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-jsonindex" \
-exec
Querying by JSON index
Let's now try to query this table using the JSON index.
JSON indexes support the following predicates: =, <>, IN, and NOT IN
State = Kentucky
select ts, person
from people
WHERE JSON_MATCH(person, '"$.address.state" = ''Kentucky''')
LIMIT 3
ts | person |
---|---|
2023-03-30 09:40:59.788 | {"address":{"street_address":"8173 Diaz Hollow Apt. 175","country":"British Indian Ocean Territory (Chagos Archipelago)","city":"East Carlos","state":"Kentucky"},"friend_ids":["de27ec51-2895-4044-98bc-5643853d7d0d","84080eb3-1c17-4b4a-ada7-a764ed44cef7"],"name":"Robert Alvarado","phone_number":"085.136.9039x6810","id":"874a4543-f528-49e2-85a8-d31a6cbae269","job":{"company":"Gray, Huerta and Moyer","position":"Chemical engineer","department":"transition collaborative networks"},"interests":["Swimming","Meditation","Reading","Cycling","Reading"],"email":"jamiebarker@example.net","age":52} |
2023-03-30 09:40:59.823 | {"address":{"street_address":"277 Robert Crossroad Apt. 622","country":"Haiti","city":"Port Karenshire","state":"Kentucky"},"friend_ids":["2285322d-0049-4386-b41c-9f96a0ed5038"],"name":"Diana Thomas","phone_number":"171-966-4122","id":"b996770e-478c-412d-89bd-d3a085140016","job":{"company":"Diaz LLC","position":"Scientist, research (maths)","department":"deploy granular convergence"},"interests":["Sports","Photography","Art","Photography","Reading"],"email":"ncook@example.org","age":40} |
2023-03-30 09:40:59.843 | {"address":{"street_address":"85628 William Curve","country":"Djibouti","city":"Gregoryborough","state":"Kentucky"},"friend_ids":["1a7ee9e4-6daa-4994-a077-3170bffab04f"],"name":"Monica Harrell","phone_number":"(120)253-7632","id":"c71df52f-49c4-4d61-b6d6-d94134c92e8b","job":{"company":"Larson-Mcdaniel","position":"Ranger/warden","department":"seize leading-edge web services"},"interests":["Traveling","Music","Art"],"email":"allen60@example.net","age":33} |
Query Results
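The filter passed to JSON_MATCH is a string literal that itself contains string literals, which is why Kentucky is wrapped in doubled single quotes in the query above. If you generate these queries from code, a small helper can handle the nesting. The sketch below is one way to do it for string values. `sql_quote` and `json_match` are hypothetical helper names, not part of any Pinot client.

```python
def sql_quote(value: str) -> str:
    """Quote a string as a SQL literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def json_match(column: str, path: str, op: str, *values: str) -> str:
    """Build a JSON_MATCH(...) clause for =, <>, IN, or NOT IN on string values."""
    op = op.upper()
    if op in ("=", "<>"):
        if len(values) != 1:
            raise ValueError(f"{op} takes exactly one value")
        rhs = sql_quote(values[0])
    elif op in ("IN", "NOT IN"):
        if not values:
            raise ValueError(f"{op} needs at least one value")
        rhs = "(" + ", ".join(sql_quote(v) for v in values) + ")"
    else:
        raise ValueError(f"unsupported predicate: {op}")
    inner = f'"{path}" {op} {rhs}'
    # The filter is itself a SQL string literal, so it is quoted a second time.
    return f"JSON_MATCH({column}, {sql_quote(inner)})"


if __name__ == "__main__":
    print(json_match("person", "$.address.state", "=", "Kentucky"))
    print(json_match("person", "$.address.state", "IN", "Kentucky", "Alabama"))
```

Quoting in two passes keeps the inner and outer literals independent, so the helper reproduces the hand-written clauses used in this guide.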
State <> Kentucky
select count(*)
from people
WHERE JSON_MATCH(person, '"$.address.state" <> ''Kentucky''')
count(*) |
---|
319584 |
Query Results
State IN Kentucky or Alabama
select json_extract_scalar(person, '$.address.state', 'STRING') AS state, count(*)
from people
WHERE JSON_MATCH(person, '"$.address.state" IN (''Kentucky'', ''Alabama'')')
GROUP BY state
ORDER BY count(*) DESC
state | count(*) |
---|---|
Kentucky | 6682 |
Alabama | 6414 |
Query Results
State NOT IN Kentucky or Alabama
select json_extract_scalar(person, '$.address.state', 'STRING') AS state, count(*)
from people
WHERE JSON_MATCH(person, '"$.address.state" NOT IN (''Kentucky'', ''Alabama'')')
GROUP BY state
ORDER BY count(*) DESC
state | count(*) |
---|---|
Montana | 6743 |
Kansas | 6718 |
Maryland | 6693 |
Ohio | 6677 |
Connecticut | 6662 |
Mississippi | 6639 |
Rhode Island | 6630 |
Arkansas | 6621 |
Query Results
Unindexed field
What if we try to query by the age field, which hasn't been indexed?
select count(*)
from people
WHERE JSON_MATCH(person, '"$.age"=59')
count(*) |
---|
0 |
Query Results
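We get back a count of 0, because age is excluded from the index. We can run the following query, which extracts the value from the stored JSON instead of using JSON_MATCH, to confirm that the table contains records with an age of 59: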
select count(*)
from people
WHERE JSON_EXTRACT_SCALAR(person, '$.age', 'STRING') = 59
count(*) |
---|
5018 |
Query Results
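If you would rather run these queries from a script than from the Pinot UI, the broker accepts SQL over HTTP. The sketch below posts a query to the broker's /query/sql endpoint using only the standard library. The URL is an assumption: it presumes the broker's default port 8099 is published on localhost, which depends on the recipe's docker-compose.yml. The helper names are hypothetical.

```python
import json
import urllib.request

# Assumption: the broker's default port 8099 is reachable on localhost.
BROKER_URL = "http://localhost:8099/query/sql"


def build_request(sql: str, url: str = BROKER_URL) -> urllib.request.Request:
    """Wrap a SQL string in the JSON body the broker expects."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def run_query(sql: str, url: str = BROKER_URL) -> list:
    """Send the query and return the result rows (empty if none came back)."""
    with urllib.request.urlopen(build_request(sql, url), timeout=10) as resp:
        result = json.load(resp)
    return result.get("resultTable", {}).get("rows", [])


if __name__ == "__main__":
    # Requires the cluster from this recipe to be running.
    indexed = "select count(*) from people WHERE JSON_MATCH(person, '\"$.age\"=59')"
    extracted = "select count(*) from people WHERE JSON_EXTRACT_SCALAR(person, '$.age', 'STRING') = 59"
    print(run_query(indexed))
    print(run_query(extracted))
```

Running both queries side by side makes the difference visible: the JSON_MATCH version relies on the index, while the JSON_EXTRACT_SCALAR version reads the stored JSON.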