Query table by segment

How to query a table by segment

In this recipe we'll learn how to query a table to find out which records are in a particular segment.

Prerequisites

To follow the code examples in this guide, you must install Docker (opens in a new tab) locally and download recipes.

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/query-by-segment

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml (opens in a new tab) file on GitHub.

Data generator

This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:

python datagen.py 2>/dev/null | head -n1 | jq

Output is shown below:

{
  "ts": 1680171022742,
  "uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
  "count": 260
}

Kafka ingestion

We're going to ingest this data into an Apache Kafka topic using the kcat (opens in a new tab) command line tool. We'll also use jq to structure the data in the key:payload structure that Kafka expects:

python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø

We can check that Kafka has some data by running the following command:

docker exec -it kafka-querysegment kafka-run-class.sh \
  kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events

We'll see something like the following:

events:0:19960902

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Now for the table config:

{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "realtime.segment.flush.threshold.rows":"100000",
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-querysegment:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.time":"1h"
      }
    },
    "ingestionConfig": {
       
      },
    "tenants": {},
    "metadata": {}
}

This highlighted section indicates that we're going to create new segments after every 100,000 rows.

We'll create the table by running the following:

docker run \
   --network querysegment \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-querysegment" \
    -exec

Querying by segment

Once that's been created, we can head over to the Pinot UI (opens in a new tab) and run some queries.

Pinot has several built-in virtual columns inside every schema that can be used for debugging purposes:

Column NameColumn TypeData TypeDescription
$hostNameDimensionSTRINGName of the server hosting the data
$segmentNameDimensionSTRINGName of the segment containing the record
$docIdDimensionINTDocument id of the record within the segment

The one that's useful for us is $segmentName, which we can use like this to count the number of records in each segment:

select $segmentName, count(*)
from events
group by $segmentName
limit 10
$segmentNamecount(*)
events__0__142__20230330T1004Z100000
events__0__27__20230330T1003Z100000
events__0__123__20230330T1004Z100000
events__0__15__20230330T1003Z100000
events__0__185__20230330T1004Z100000
events__0__96__20230330T1003Z100000
events__0__169__20230330T1004Z100000
events__0__160__20230330T1004Z100000
events__0__77__20230330T1003Z100000
events__0__71__20230330T1003Z100000

Query Results

We can then pick one of those segments and see what records are stored in that segment:

select *
from events
-- Change the segment name to match one that's returned by the previous query
WHERE $segmentName = 'events__0__142__20230330T1004Z'
limit 10
counttsuuid
6662023-03-30 10:03:04.4988e4f5bb3-ad5c-45ec-89cb-946acee52625
2982023-03-30 10:03:04.498f439c3ad-86a1-4043-895e-681d5497cda0
1282023-03-30 10:03:04.4989e29317f-c8fd-44d8-8313-aa28a80f1e5e
402023-03-30 10:03:04.498697e2dbc-aee5-466e-b24e-763b18aff003
6592023-03-30 10:03:04.4988d981789-7e61-4a25-81d1-b62616e877e8
4362023-03-30 10:03:04.498f4a883f3-3e6a-4169-a7ca-2469a5100ef2
4082023-03-30 10:03:04.498dc21499e-cdb3-4fae-93c1-0b9bd0233ae0
4572023-03-30 10:03:04.4980de9a860-5af3-4155-93c9-d396df3cd0b3
1972023-03-30 10:03:04.498c4ddd15a-814f-4036-9eee-3121f59783f8
3622023-03-30 10:03:04.49811b00463-2587-4838-9f05-089c05ad9fea

Query Results