How to query a table by segment

In this recipe we'll learn how to query a table to find out which records are in a particular segment.

Prerequisites

You will need to install Docker to follow the code examples in this guide.
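
If you're not sure whether Docker is already installed, you can check from a terminal before continuing:

docker --version
docker-compose --version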

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/query-by-segment

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
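
Once the containers are up, you can check that everything started correctly. The commands below assume the Pinot Controller is exposed on localhost port 9000, which you can confirm in docker-compose.yml:

docker-compose ps
curl http://localhost:9000/health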

Data generator

This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:

python datagen.py 2>/dev/null | head -n1 | jq

Output is shown below:

{
  "ts": 1680171022742,
  "uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
  "count": 260
}

Kafka ingestion

We're going to ingest this data into an Apache Kafka topic using the kcat command line tool. We'll also use jq to put each event into the key:payload structure that kcat expects, with the uuid as the message key and ø as the separator:

python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø

We can check that Kafka has some data by running the following command:

docker exec -it kafka-querysegment kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic events

We'll see something like the following:

events:0:19960902
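
If you'd like to see a few of the messages themselves rather than just the offsets, you can also consume them with kcat. This is just a sanity check and isn't required for the rest of the recipe:

kcat -C -b localhost:9092 -t events -o beginning -c 3 | jq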

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Now for the table config:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "realtime.segment.flush.threshold.rows": "100000",
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-querysegment:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.time": "1h"
    }
  },
  "ingestionConfig": {},
  "tenants": {},
  "metadata": {}
}

The realtime.segment.flush.threshold.rows setting tells Pinot to create a new segment after every 100,000 rows have been ingested.

We'll create the table by running the following:

docker run \
--network querysegment \
-v $PWD/config:/config \
apachepinot/pinot:0.12.0-arm64 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-querysegment" \
-exec
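
If you want to double-check that the schema and table were created, you can also query the Controller's REST API directly. This assumes the Controller is exposed on localhost port 9000 (check docker-compose.yml for the actual mapping):

curl http://localhost:9000/schemas/events
curl http://localhost:9000/tables/events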

Querying by segment

Once that's been created, we can head over to the Pinot UI and run some queries.
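
If you'd rather stay on the command line, you can also submit SQL to the Broker over HTTP instead of using the UI. This is a minimal sketch that assumes the Broker is exposed on localhost port 8099 (check docker-compose.yml for the actual mapping):

curl -X POST http://localhost:8099/query/sql \
  -H "Content-Type: application/json" \
  -d '{"sql": "select count(*) from events"}'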

Pinot has several built-in virtual columns inside every schema that can be used for debugging purposes:

Column Name  | Column Type | Data Type | Description
$hostName    | Dimension   | STRING    | Name of the server hosting the data
$segmentName | Dimension   | STRING    | Name of the segment containing the record
$docId       | Dimension   | INT       | Document id of the record within the segment

The one that's useful for us is $segmentName, which we can use like this to count the number of records in each segment:

select $segmentName, count(*)
from events
group by $segmentName
limit 10

$segmentName                   | count(*)
events__0__142__20230330T1004Z | 100000
events__0__27__20230330T1003Z  | 100000
events__0__123__20230330T1004Z | 100000
events__0__15__20230330T1003Z  | 100000
events__0__185__20230330T1004Z | 100000
events__0__96__20230330T1003Z  | 100000
events__0__169__20230330T1004Z | 100000
events__0__160__20230330T1004Z | 100000
events__0__77__20230330T1003Z  | 100000
events__0__71__20230330T1003Z  | 100000
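
We can also inspect these segments via the Controller's REST API. The sketch below assumes the Controller is exposed on localhost port 9000, and uses a segment name from the table above, so the name will be different in your environment:

curl http://localhost:9000/segments/events
curl http://localhost:9000/segments/events/events__0__142__20230330T1004Z/metadata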


We can then pick one of those segments and see what records are stored in that segment:

select *
from events
-- Change the segment name to match one that's returned by the previous query
WHERE $segmentName = 'events__0__142__20230330T1004Z'
limit 10

count | ts                      | uuid
666   | 2023-03-30 10:03:04.498 | 8e4f5bb3-ad5c-45ec-89cb-946acee52625
298   | 2023-03-30 10:03:04.498 | f439c3ad-86a1-4043-895e-681d5497cda0
128   | 2023-03-30 10:03:04.498 | 9e29317f-c8fd-44d8-8313-aa28a80f1e5e
40    | 2023-03-30 10:03:04.498 | 697e2dbc-aee5-466e-b24e-763b18aff003
659   | 2023-03-30 10:03:04.498 | 8d981789-7e61-4a25-81d1-b62616e877e8
436   | 2023-03-30 10:03:04.498 | f4a883f3-3e6a-4169-a7ca-2469a5100ef2
408   | 2023-03-30 10:03:04.498 | dc21499e-cdb3-4fae-93c1-0b9bd0233ae0
457   | 2023-03-30 10:03:04.498 | 0de9a860-5af3-4155-93c9-d396df3cd0b3
197   | 2023-03-30 10:03:04.498 | c4ddd15a-814f-4036-9eee-3121f59783f8
362   | 2023-03-30 10:03:04.498 | 11b00463-2587-4838-9f05-089c05ad9fea