Checking segment assignment

How to check segment to server assignment

In this recipe we'll learn how to work out to which servers our segments have been assigned.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download recipes. The commands below also use kind, kubectl, Helm, kcat, and jq.

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/segment-assignment

Launch Pinot Cluster

Create a Kubernetes cluster:

kind create cluster

You can spin up a Pinot cluster with 4 servers by running the following commands:

helm repo add pinot https://raw.githubusercontent.com/apache/pinot/master/kubernetes/helm
kubectl create ns pinot-quickstart
helm install pinot pinot/pinot \
    -n pinot-quickstart \
    --set cluster.name=pinot \
    --set server.replicaCount=4

Port forward the Pinot UI on port 9000. The command stays in the foreground, so run it in a separate terminal:

kubectl port-forward service/pinot-controller 9000:9000 -n pinot-quickstart

Then install Kafka:

helm repo add kafka https://charts.bitnami.com/bitnami
helm install -n pinot-quickstart kafka kafka/kafka --set replicas=1,zookeeper.image.tag=latest

Port forward Kafka on port 9092:

kubectl port-forward service/kafka-headless 9092:9092 -n pinot-quickstart

Add the following line to /etc/hosts:

127.0.0.1 kafka-0.kafka-headless.pinot-quickstart.svc.cluster.local

Next, we're going to deploy a Kafka client pod and connect to it:

kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r22 --namespace pinot-quickstart --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace pinot-quickstart -- bash

We're going to create the events topic with 5 partitions, by running the following command:

kafka-topics.sh \
  --create --bootstrap-server kafka.pinot-quickstart.svc.cluster.local:9092 \
  --replication-factor 1 \
  --partitions 5 \
  --topic events

...

Data generator

This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:

python datagen.py 2>/dev/null | head -n1 | jq

Output is shown below:

{
  "ts": 1680171022742,
  "uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
  "count": 260
}
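
To make the shape of these events concrete, here is a minimal sketch of a generator that emits the same three fields. It is not the recipe's datagen.py: the function name make_event and the 0 to 1000 range for count are assumptions made for illustration.

```python
import json
import random
import time
import uuid


def make_event(rng: random.Random) -> dict:
    """Build one event with the same fields as the sample output above."""
    return {
        "ts": int(time.time() * 1000),   # epoch milliseconds
        "uuid": str(uuid.uuid4()),       # random identifier
        "count": rng.randint(0, 1000),   # assumed range, not taken from the recipe
    }


if __name__ == "__main__":
    # Print a single event as one line of JSON.
    print(json.dumps(make_event(random.Random())))
```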

Kafka ingestion

We're going to ingest this data into an Apache Kafka topic using the kcat command line tool. We'll also use jq to format each message as a key and a payload separated by ø, the delimiter passed to kcat with -K:

python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
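
The jq step in the pipeline above can be hard to read at first, so here is a rough Python equivalent of what it produces for one event. The function name to_keyed_line is hypothetical and exists only for this sketch; the separator is the same ø used in the command.

```python
import json

SEP = "ø"  # same separator given to jq (--arg sep) and kcat (-K) above


def to_keyed_line(event: dict, sep: str = SEP) -> str:
    """Return '<uuid><sep><compact JSON>', mirroring [.uuid, tostring] | join($sep)."""
    payload = json.dumps(event, separators=(",", ":"))  # compact, like jq's tostring
    return f"{event['uuid']}{sep}{payload}"


if __name__ == "__main__":
    event = {
        "ts": 1680171022742,
        "uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
        "count": 260,
    }
    print(to_keyed_line(event))
```

The part before the separator becomes the Kafka message key (the UUID) and the part after it becomes the message value (the full JSON event).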

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

schema.json

Now for the table config:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "timeType": "DAYS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "3650",
    "segmentPushType": "APPEND",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
    "schemaName": "events",
    "replication": "2",
    "replicasPerPartition": "2"
  },
  "tenants": {},
  "ingestionConfig":{
    "transformConfigs": [
      {
          "columnName": "ts",
          "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
      }
    ]
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "simple",
      "stream.kafka.topic.name": "events",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.hlc.zk.connect.string": "kafka-zookeeper:2181",
      "stream.kafka.zk.broker.url": "kafka-zookeeper:2181",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.time": "3600000",
      "realtime.segment.flush.threshold.size": "50000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

table.json

You can add the schema and table config by running the following command:

kubectl apply -f config/pinot-events.yml

As soon as this config has been applied, Pinot will start ingesting the data from Kafka.

Segment assignment

Next, we're going to have a look at segment assignment across our 4 servers. Pinot is using balanced segment assignment, which is the default. This means that each new segment is assigned to the servers that currently hold the fewest segments.
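
As a toy model of the rule just described, the sketch below picks, for each new segment, the servers that hold the fewest segments so far. It is not Pinot's implementation: the function name assign_balanced, the tie-break on server name, and the placeholder segment names are all assumptions made for illustration.

```python
from collections import Counter


def assign_balanced(servers, counts, replication=2):
    """Choose the `replication` servers that currently hold the fewest segments."""
    # Sort by (segment count, name) so ties are broken deterministically.
    chosen = sorted(servers, key=lambda s: (counts[s], s))[:replication]
    for server in chosen:
        counts[server] += 1  # record the new segment on each chosen server
    return chosen


if __name__ == "__main__":
    servers = [f"Server_pinot-server-{i}" for i in range(4)]
    counts = Counter()  # segments held per server, starts at zero
    for segment in ["seg_a", "seg_b", "seg_c"]:  # placeholder names
        print(segment, assign_balanced(servers, counts))
```

With a replication of 2 and 4 servers, the first segment lands on two servers, the second on the other two, and the counts stay even as more segments arrive.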

First, install some dependencies:

pip install click ordered-set requests

We can then run the script to see how segments have been assigned:

python segments_to_server.py \
  --segments-to-servers true \
  --partitions-to-servers false \
  --servers-to-partitions false

This will return the following (truncated) output:

Output

Segment to Servers
events__0__0__20230605T1334Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__1__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__2__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__3__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__4__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__5__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__1__0__20230605T1334Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__1__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__2__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__3__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__4__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__5__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__2__0__20230605T1334Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__1__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__2__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__3__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__4__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__5__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__3__0__20230605T1334Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__1__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__2__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__3__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__4__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__5__20230605T1335Z       ['Server_pinot-server-2', 'Server_pinot-server-3']
events__4__0__20230605T1334Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__1__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__2__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__3__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__4__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__5__20230605T1335Z       ['Server_pinot-server-0', 'Server_pinot-server-1']

Segments with the same prefix (the table name followed by the partition number) are assigned to the same pair of servers. For example, segments that begin with events__4 are assigned to servers 0 and 1.

The script also returns mappings of partitions to servers:

python segments_to_server.py \
  --segments-to-servers false \
  --partitions-to-servers true \
  --servers-to-partitions false

Output

Partition to Servers
0     OrderedSet(['Server_pinot-server-0', 'Server_pinot-server-1'])
1     OrderedSet(['Server_pinot-server-2', 'Server_pinot-server-3'])
2     OrderedSet(['Server_pinot-server-0', 'Server_pinot-server-1'])
3     OrderedSet(['Server_pinot-server-2', 'Server_pinot-server-3'])
4     OrderedSet(['Server_pinot-server-0', 'Server_pinot-server-1'])

This gives us a tidier view of how data is being assigned to servers. Partition 0 is assigned to servers 0 and 1, partition 1 is assigned to servers 2 and 3, and so on.
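
The partition view can be derived from the segment view by reading the partition number out of each segment name. The sketch below shows one way to do that. It is not the code of segments_to_server.py; the function names are hypothetical, and it assumes the four-part segment name seen in the output above (table, partition, sequence, creation time, joined by double underscores).

```python
from collections import defaultdict


def partition_of(segment_name: str) -> int:
    """Extract the partition number from a name like events__4__0__20230605T1334Z."""
    # rsplit keeps any double underscores inside the table name intact.
    _table, partition, _sequence, _created = segment_name.rsplit("__", 3)
    return int(partition)


def partitions_to_servers(segment_to_servers: dict) -> dict:
    """Group servers by partition, keeping first-seen order and dropping repeats."""
    result = defaultdict(list)
    for segment, servers in segment_to_servers.items():
        bucket = result[partition_of(segment)]
        for server in servers:
            if server not in bucket:
                bucket.append(server)
    return dict(result)


if __name__ == "__main__":
    # A few rows copied from the segment output above.
    sample = {
        "events__0__0__20230605T1334Z": ["Server_pinot-server-0", "Server_pinot-server-1"],
        "events__0__1__20230605T1335Z": ["Server_pinot-server-0", "Server_pinot-server-1"],
        "events__1__0__20230605T1334Z": ["Server_pinot-server-2", "Server_pinot-server-3"],
    }
    for partition, servers in partitions_to_servers(sample).items():
        print(partition, servers)
```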

We can also flip it around and see the mapping of servers to partitions:

python segments_to_server.py \
  --segments-to-servers false \
  --partitions-to-servers false \
  --servers-to-partitions true

Output

Server to Partitions
Server_pinot-server-0 OrderedSet(['0', '2', '4'])
Server_pinot-server-1 OrderedSet(['0', '2', '4'])
Server_pinot-server-2 OrderedSet(['1', '3'])
Server_pinot-server-3 OrderedSet(['1', '3'])
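
The server view is simply the partition view turned inside out. The sketch below inverts the partition to servers mapping shown earlier; the function name servers_to_partitions is hypothetical and this is not taken from segments_to_server.py.

```python
def servers_to_partitions(partition_to_servers: dict) -> dict:
    """Invert {partition: [servers]} into {server: [partitions]}."""
    result = {}
    # Visit partitions in ascending order so each server's list comes out sorted.
    for partition, servers in sorted(partition_to_servers.items()):
        for server in servers:
            result.setdefault(server, []).append(partition)
    return result


if __name__ == "__main__":
    # Values copied from the "Partition to Servers" output above.
    partition_to_servers = {
        0: ["Server_pinot-server-0", "Server_pinot-server-1"],
        1: ["Server_pinot-server-2", "Server_pinot-server-3"],
        2: ["Server_pinot-server-0", "Server_pinot-server-1"],
        3: ["Server_pinot-server-2", "Server_pinot-server-3"],
        4: ["Server_pinot-server-0", "Server_pinot-server-1"],
    }
    for server, partitions in servers_to_partitions(partition_to_servers).items():
        print(server, partitions)
```

With 5 partitions spread over two server pairs, one pair necessarily carries three partitions and the other two, which is the imbalance visible in the output above.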