How to query a table by segment
In this recipe we'll learn how to query a table to find out which records are in a particular segment.
| Pinot Version | 0.12.0 |
| Code | startreedata/pinot-recipes/query-by-segment |
Prerequisites
You will need to install Docker to follow the code examples in this guide.
Download Recipe
First, clone the GitHub repository to your local machine and navigate to this recipe:
git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/query-by-segment
If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
Data generator
This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:
python datagen.py 2>/dev/null | head -n1 | jq
Output is shown below:
{
"ts": 1680171022742,
"uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
"count": 260
}
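If you're curious what the generator does under the hood, here's a minimal Python sketch that produces events of the same shape. This is an illustration only, not the recipe's actual datagen.py; the --sleep flag is modeled on the usage shown in the next section.

import argparse
import json
import random
import time
import uuid

# Hypothetical sketch of a generator like datagen.py (the real script may differ)
parser = argparse.ArgumentParser()
parser.add_argument("--sleep", type=float, default=0.0)  # pause between events, in seconds
args = parser.parse_args()

while True:
    event = {
        "ts": int(time.time() * 1000),     # epoch milliseconds
        "uuid": str(uuid.uuid4()),         # unique id per event
        "count": random.randint(0, 1000),  # random metric value
    }
    print(json.dumps(event), flush=True)
    if args.sleep:
        time.sleep(args.sleep)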
Kafka ingestion
We're going to ingest this data into an Apache Kafka topic using the kcat command-line tool. We'll also use jq to structure the data in the key:payload format that Kafka expects:
python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
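If you'd rather not install kcat, here's a rough Python equivalent that reads events from stdin and publishes each one keyed by its uuid. It assumes the kafka-python package (pip install kafka-python); any Kafka client would work the same way.

import json
import sys

from kafka import KafkaProducer  # assumed dependency: pip install kafka-python

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Pipe datagen.py into this script; each line becomes one message on the
# events topic, keyed by the event's uuid (mirroring the kcat pipeline above).
for line in sys.stdin:
    event = json.loads(line)
    producer.send(
        "events",
        key=event["uuid"].encode("utf-8"),
        value=line.strip().encode("utf-8"),
    )
producer.flush()

You could then run python datagen.py --sleep 0.0001 2>/dev/null | python produce.py, where produce.py is the hypothetical script above.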
We can check that Kafka has some data by running the following command:
docker exec -it kafka-querysegment kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic events
We'll see something like the following:
events:0:19960902
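As an aside, you can do the same check without the Kafka shell scripts. This Python sketch (again assuming kafka-python) asks the broker for the end offset of partition 0 of the events topic:

from kafka import KafkaConsumer, TopicPartition

# Look up the end offset of the events topic without consuming any messages
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
tp = TopicPartition("events", 0)
end_offset = consumer.end_offsets([tp])[tp]
print(f"events:0:{end_offset}")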
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "events",
"dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
"metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
"dateTimeFieldSpecs": [
{
"name": "ts",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
Now for the table config:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"realtime.segment.flush.threshold.rows":"100000",
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-querysegment:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.time":"1h"
}
},
"ingestionConfig": {
},
"tenants": {},
"metadata": {}
}
The realtime.segment.flush.threshold.rows setting in streamConfigs means that a new segment will be created after every 100,000 rows.
We'll create the table by running the following:
docker run \
--network querysegment \
-v $PWD/config:/config \
apachepinot/pinot:0.12.0-arm64 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-querysegment" \
-exec
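Alternatively, the controller accepts the schema and table config over its REST API. Here's a sketch using the requests package, assuming the controller is mapped to localhost:9000 (Pinot's default controller port):

import requests

CONTROLLER = "http://localhost:9000"  # assumed host/port mapping

# Register the schema, then create the table that references it
with open("config/schema.json") as f:
    requests.post(f"{CONTROLLER}/schemas", data=f.read(),
                  headers={"Content-Type": "application/json"}).raise_for_status()

with open("config/table.json") as f:
    requests.post(f"{CONTROLLER}/tables", data=f.read(),
                  headers={"Content-Type": "application/json"}).raise_for_status()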
Querying by segment
Once the table has been created, we can head over to the Pinot UI and run some queries.
Pinot has several built-in virtual columns inside every schema that can be used for debugging purposes:
| Column Name | Column Type | Data Type | Description |
|---|---|---|---|
| $hostName | Dimension | STRING | Name of the server hosting the data |
| $segmentName | Dimension | STRING | Name of the segment containing the record |
| $docId | Dimension | INT | Document id of the record within the segment |
The one that's useful for us is $segmentName, which we can use to count the number of records in each segment:
select $segmentName, count(*)
from events
group by $segmentName
limit 10
| $segmentName | count(*) |
|---|---|
| events__0__142__20230330T1004Z | 100000 |
| events__0__27__20230330T1003Z | 100000 |
| events__0__123__20230330T1004Z | 100000 |
| events__0__15__20230330T1003Z | 100000 |
| events__0__185__20230330T1004Z | 100000 |
| events__0__96__20230330T1003Z | 100000 |
| events__0__169__20230330T1004Z | 100000 |
| events__0__160__20230330T1004Z | 100000 |
| events__0__77__20230330T1003Z | 100000 |
| events__0__71__20230330T1003Z | 100000 |
Query Results
We can then pick one of those segments and see what records are stored in that segment:
select *
from events
-- Change the segment name to match one that's returned by the previous query
where $segmentName = 'events__0__142__20230330T1004Z'
limit 10
| count | ts | uuid |
|---|---|---|
| 666 | 2023-03-30 10:03:04.498 | 8e4f5bb3-ad5c-45ec-89cb-946acee52625 |
| 298 | 2023-03-30 10:03:04.498 | f439c3ad-86a1-4043-895e-681d5497cda0 |
| 128 | 2023-03-30 10:03:04.498 | 9e29317f-c8fd-44d8-8313-aa28a80f1e5e |
| 40 | 2023-03-30 10:03:04.498 | 697e2dbc-aee5-466e-b24e-763b18aff003 |
| 659 | 2023-03-30 10:03:04.498 | 8d981789-7e61-4a25-81d1-b62616e877e8 |
| 436 | 2023-03-30 10:03:04.498 | f4a883f3-3e6a-4169-a7ca-2469a5100ef2 |
| 408 | 2023-03-30 10:03:04.498 | dc21499e-cdb3-4fae-93c1-0b9bd0233ae0 |
| 457 | 2023-03-30 10:03:04.498 | 0de9a860-5af3-4155-93c9-d396df3cd0b3 |
| 197 | 2023-03-30 10:03:04.498 | c4ddd15a-814f-4036-9eee-3121f59783f8 |
| 362 | 2023-03-30 10:03:04.498 | 11b00463-2587-4838-9f05-089c05ad9fea |
Query Results
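If you want to run these queries programmatically rather than through the Pinot UI, the pinotdb Python driver (pip install pinotdb) can talk to the broker. A minimal sketch, assuming the broker is exposed on localhost:8099 (Pinot's default broker port):

from pinotdb import connect  # assumed dependency: pip install pinotdb

conn = connect(host="localhost", port=8099, path="/query/sql", scheme="http")
curs = conn.cursor()

# Count records per segment, same as the query we ran in the UI
curs.execute("""
    select $segmentName, count(*)
    from events
    group by $segmentName
    limit 10
""")
for segment_name, num_records in curs:
    print(segment_name, num_records)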