How to query a table by segment
In this recipe we'll learn how to query a table to find out which records are in a particular segment.
Pinot Version | 1.0.0 |
Code | startreedata/pinot-recipes/query-by-segment |
Prerequisites
To follow the code examples in this guide, you must install Docker (opens in a new tab) locally and download recipes.
Navigate to recipe
- If you haven't already, download recipes.
- In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/query-by-segment
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml (opens in a new tab) file on GitHub.
Data generator
This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:
python datagen.py 2>/dev/null | head -n1 | jq
Output is shown below:
{
"ts": 1680171022742,
"uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
"count": 260
}
Kafka ingestion
We're going to ingest this data into an Apache Kafka topic using the kcat (opens in a new tab) command line tool.
We'll also use jq
to structure the data in the key:payload
structure that Kafka expects:
python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
We can check that Kafka has some data by running the following command:
docker exec -it kafka-querysegment kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic events
We'll see something like the following:
events:0:19960902
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "events",
"dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
"metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
"dateTimeFieldSpecs": [
{
"name": "ts",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
Now for the table config:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"realtime.segment.flush.threshold.rows":"100000",
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-querysegment:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.time":"1h"
}
},
"ingestionConfig": {
},
"tenants": {},
"metadata": {}
}
This highlighted section indicates that we're going to create new segments after every 100,000 rows.
We'll create the table by running the following:
docker run \
--network querysegment \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-querysegment" \
-exec
Querying by segment
Once that's been created, we can head over to the Pinot UI (opens in a new tab) and run some queries.
Pinot has several built-in virtual columns inside every schema that can be used for debugging purposes:
Column Name | Column Type | Data Type | Description |
---|---|---|---|
$hostName | Dimension | STRING | Name of the server hosting the data |
$segmentName | Dimension | STRING | Name of the segment containing the record |
$docId | Dimension | INT | Document id of the record within the segment |
The one that's useful for us is $segmentName
, which we can use like this to count the number of records in each segment:
select $segmentName, count(*)
from events
group by $segmentName
limit 10
$segmentName | count(*) |
---|---|
events__0__142__20230330T1004Z | 100000 |
events__0__27__20230330T1003Z | 100000 |
events__0__123__20230330T1004Z | 100000 |
events__0__15__20230330T1003Z | 100000 |
events__0__185__20230330T1004Z | 100000 |
events__0__96__20230330T1003Z | 100000 |
events__0__169__20230330T1004Z | 100000 |
events__0__160__20230330T1004Z | 100000 |
events__0__77__20230330T1003Z | 100000 |
events__0__71__20230330T1003Z | 100000 |
Query Results
We can then pick one of those segments and see what records are stored in that segment:
select *
from events
-- Change the segment name to match one that's returned by the previous query
WHERE $segmentName = 'events__0__142__20230330T1004Z'
limit 10
count | ts | uuid |
---|---|---|
666 | 2023-03-30 10:03:04.498 | 8e4f5bb3-ad5c-45ec-89cb-946acee52625 |
298 | 2023-03-30 10:03:04.498 | f439c3ad-86a1-4043-895e-681d5497cda0 |
128 | 2023-03-30 10:03:04.498 | 9e29317f-c8fd-44d8-8313-aa28a80f1e5e |
40 | 2023-03-30 10:03:04.498 | 697e2dbc-aee5-466e-b24e-763b18aff003 |
659 | 2023-03-30 10:03:04.498 | 8d981789-7e61-4a25-81d1-b62616e877e8 |
436 | 2023-03-30 10:03:04.498 | f4a883f3-3e6a-4169-a7ca-2469a5100ef2 |
408 | 2023-03-30 10:03:04.498 | dc21499e-cdb3-4fae-93c1-0b9bd0233ae0 |
457 | 2023-03-30 10:03:04.498 | 0de9a860-5af3-4155-93c9-d396df3cd0b3 |
197 | 2023-03-30 10:03:04.498 | c4ddd15a-814f-4036-9eee-3121f59783f8 |
362 | 2023-03-30 10:03:04.498 | 11b00463-2587-4838-9f05-089c05ad9fea |
Query Results