How to check segment to server assignment
In this recipe we'll learn how to work out to which servers our segments have been assigned.
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download recipes. The commands below also use kind, Helm, kubectl, kcat, jq and Python.
Navigate to recipe
- If you haven't already, download recipes.
- In a terminal, go to the recipe directory by running the following command:
cd pinot-recipes/recipes/segment-assignment
Launch Pinot Cluster
Create a Kubernetes cluster with kind:
kind create cluster
You can spin up a Pinot cluster with 4 servers by running the following commands:
helm repo add pinot https://raw.githubusercontent.com/apache/pinot/master/kubernetes/helm
kubectl create ns pinot-quickstart
helm install pinot pinot/pinot \
-n pinot-quickstart \
--set cluster.name=pinot \
--set server.replicaCount=4
Port-forward the Pinot UI on port 9000 (the command keeps running, so use a separate terminal):
kubectl port-forward service/pinot-controller 9000:9000 -n pinot-quickstart
Next, install Kafka using the Bitnami Helm chart:
helm repo add kafka https://charts.bitnami.com/bitnami
helm install -n pinot-quickstart kafka kafka/kafka --set replicas=1,zookeeper.image.tag=latest
Port-forward Kafka on port 9092 (again in a separate terminal):
kubectl port-forward service/kafka-headless 9092:9092 -n pinot-quickstart
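Add the following line to /etc/hosts, so that the Kafka broker's advertised hostname resolves to the port-forwarded address on localhost: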
127.0.0.1 kafka-0.kafka-headless.pinot-quickstart.svc.cluster.local
Next, we're going to deploy a Kafka client pod and open a shell in it:
kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r22 --namespace pinot-quickstart --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace pinot-quickstart -- bash
We're going to create the events topic with 5 partitions by running the following command inside the pod:
kafka-topics.sh \
--create --bootstrap-server kafka.pinot-quickstart.svc.cluster.local:9092 \
--replication-factor 1 \
--partitions 5 \
--topic events
...
Data generator
This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:
python datagen.py 2>/dev/null | head -n1 | jq
Output is shown below:
{
"ts": 1680171022742,
"uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
"count": 260
}
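The recipe's datagen.py isn't reproduced in this guide. As a rough, hypothetical sketch of the event shape, a generator producing the same three fields might look like this; the make_event name and the 0 to 1000 range for count are assumptions, not taken from the recipe.

```python
import json
import random
import time
import uuid


def make_event() -> dict:
    """Build one event shaped like the datagen.py output above (hypothetical sketch)."""
    return {
        "ts": int(time.time() * 1000),  # epoch milliseconds
        "uuid": str(uuid.uuid4()),
        "count": random.randint(0, 1000),  # assumed range; datagen.py may differ
    }


if __name__ == "__main__":
    # Emit one JSON document per line, the form that jq reads in the pipeline above
    for _ in range(3):
        print(json.dumps(make_event()))
```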
Kafka ingestion
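We're going to ingest this data into an Apache Kafka topic using the kcat command line tool.
We'll also use jq to convert each event into a key/payload line, with the UUID as the message key and the full event JSON as the payload, separated by the ø character. The -Kø flag tells kcat to split each line on that character: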
python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
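The jq and kcat steps can be mirrored in Python to see what each line looks like and roughly which partition it lands in. This is a sketch under two assumptions: that kcat uses librdkafka's default consistent_random partitioner, which for a non-empty key picks CRC32(key) modulo the partition count, and that jq's tostring emits compact JSON. The helper names are hypothetical.

```python
import json
import zlib

SEP = "ø"  # the delimiter passed to jq (--arg sep ø) and kcat (-Kø)
NUM_PARTITIONS = 5  # matches --partitions 5 on the events topic


def to_kcat_line(event: dict) -> str:
    """Mimic jq's '[.uuid, tostring] | join($sep)': key, separator, compact JSON payload."""
    return event["uuid"] + SEP + json.dumps(event, separators=(",", ":"))


def consistent_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """CRC32 of the key modulo the partition count (assumed librdkafka behaviour for non-empty keys)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


event = {"ts": 1680171022742, "uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841", "count": 260}
line = to_kcat_line(event)
key, payload = line.split(SEP, 1)
print(line)
print("partition:", consistent_partition(key))
```

Because the key is a random UUID, events spread across all 5 partitions. Java-based producers hash keys with murmur2 instead, so they would place the same keys on different partitions.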
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "events",
"dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
"metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
"dateTimeFieldSpecs": [
{
"name": "ts",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
schema.json
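The format 1:MILLISECONDS:EPOCH means ts holds milliseconds since the Unix epoch. As a quick check, the sample ts from the data generator output can be decoded in Python; epoch_millis_to_datetime is a made-up helper name for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_datetime(ms: int) -> datetime:
    """Interpret a value in 1:MILLISECONDS:EPOCH format as a UTC datetime."""
    # timedelta arithmetic keeps the millisecond part exact (no float rounding)
    return EPOCH + timedelta(milliseconds=ms)


print(epoch_millis_to_datetime(1680171022742).isoformat())
# → 2023-03-30T10:10:22.742000+00:00
```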
Now for the table config:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"timeType": "DAYS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "3650",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "events",
"replication": "2",
"replicasPerPartition": "2"
},
"tenants": {},
"ingestionConfig":{
"transformConfigs": [
{
"columnName": "ts",
"transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
}
]
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "simple",
"stream.kafka.topic.name": "events",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.hlc.zk.connect.string": "kafka-zookeeper:2181",
"stream.kafka.zk.broker.url": "kafka-zookeeper:2181",
"stream.kafka.broker.list": "kafka:9092",
"realtime.segment.flush.threshold.time": "3600000",
"realtime.segment.flush.threshold.size": "50000",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {
"customConfigs": {}
}
}
table.json
You can add the schema and table config by running the following command:
kubectl apply -f config/pinot-events.yml
As soon as this config has been applied, Pinot will start ingesting the data from Kafka.
Segment assignment
Next, we're going to look at how segments are assigned across our 4 servers. The table config sets segmentAssignmentStrategy to BalanceNumSegmentAssignmentStrategy, which is Pinot's default balanced segment assignment. This means that each new segment is assigned to the servers with the fewest segments already assigned.
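To make the "fewest segments" idea concrete, here is a toy Python model. It is an illustrative assumption, not Pinot's assignment code: it treats each partition as the unit of assignment and places its replicas on the servers with the lowest count so far, breaking ties by server order. With 4 servers, a replication factor of 2 and 5 partitions, it happens to produce the same server pairs that the script reports further down.

```python
def assign_balanced(num_partitions: int, servers: list[str], replication: int) -> dict[int, list[str]]:
    """Toy model of balanced assignment: each partition goes to the least-loaded servers."""
    load = {server: 0 for server in servers}
    assignment: dict[int, list[str]] = {}
    for partition in range(num_partitions):
        # sorted() is stable, so servers with equal load keep their list order (ties go to server-0 first)
        chosen = sorted(servers, key=lambda server: load[server])[:replication]
        for server in chosen:
            load[server] += 1
        assignment[partition] = chosen
    return assignment


servers = [f"Server_pinot-server-{i}" for i in range(4)]
for partition, chosen in assign_balanced(5, servers, replication=2).items():
    print(partition, chosen)
```

The model alternates between the pairs (server 0, server 1) and (server 2, server 3), so partitions 0, 2 and 4 share one pair and partitions 1 and 3 the other.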
First, install some dependencies:
pip install click ordered-set requests
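The recipe's segments_to_server.py isn't reproduced in this guide. As a hedged sketch of one place such a mapping can come from, the Pinot controller exposes a table's external view over REST. The code below assumes the port-forwarded controller at localhost:9000 and that GET /tables/events/externalview returns JSON shaped like {"OFFLINE": null, "REALTIME": {segment: {server: state}}}. The function names are hypothetical, and only the standard library is used.

```python
import json
import urllib.request

CONTROLLER = "http://localhost:9000"  # the port-forwarded Pinot controller


def fetch_external_view(table: str, controller: str = CONTROLLER) -> dict:
    """GET the table's external view from the controller (endpoint shape assumed)."""
    with urllib.request.urlopen(f"{controller}/tables/{table}/externalview") as response:
        return json.load(response)


def segment_to_servers(view: dict, table_type: str = "REALTIME") -> dict[str, list[str]]:
    """Map each segment to the servers hosting it, in any state, sorted by segment name."""
    segments = view.get(table_type) or {}  # the OFFLINE entry is null for a real-time table
    return {segment: sorted(states) for segment, states in sorted(segments.items())}
```

With the port-forward running, segment_to_servers(fetch_external_view("events")) should give a mapping similar to the script output, with server names as the controller reports them.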
We can then run the segments_to_server.py script to see which servers each segment has been assigned to:
python segments_to_server.py \
--segments-to-servers true \
--partitions-to-servers false \
--servers-to-partitions false
This will return the following (truncated) output:
Output
Segment to Servers
events__0__0__20230605T1334Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__1__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__2__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__3__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__4__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__0__5__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__1__0__20230605T1334Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__1__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__2__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__3__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__4__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__1__5__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__2__0__20230605T1334Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__1__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__2__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__3__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__4__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__2__5__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__3__0__20230605T1334Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__1__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__2__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__3__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__4__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__3__5__20230605T1335Z ['Server_pinot-server-2', 'Server_pinot-server-3']
events__4__0__20230605T1334Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__1__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__2__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__3__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__4__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
events__4__5__20230605T1335Z ['Server_pinot-server-0', 'Server_pinot-server-1']
We can see that segments with the same prefix are assigned to the same pair of servers. For example, segments that begin with events__4 are assigned to servers 0 and 1.
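The prefix encodes the Kafka partition. Pinot's real-time segment names follow the pattern table__partition__sequence__creationTime, so events__4__2__20230605T1335Z is the third segment (sequence 2) of partition 4. A minimal Python sketch of splitting a name into those parts; SegmentNameParts and parse_segment_name are hypothetical names, not part of the recipe.

```python
from typing import NamedTuple


class SegmentNameParts(NamedTuple):
    table: str
    partition: int
    sequence: int
    created: str  # creation time exactly as it appears in the name, e.g. 20230605T1334Z


def parse_segment_name(name: str) -> SegmentNameParts:
    """Split a real-time segment name of the form table__partition__sequence__creationTime."""
    # rsplit from the right so a table name containing "__" would stay intact
    table, partition, sequence, created = name.rsplit("__", 3)
    return SegmentNameParts(table, int(partition), int(sequence), created)


print(parse_segment_name("events__4__2__20230605T1335Z"))
# → SegmentNameParts(table='events', partition=4, sequence=2, created='20230605T1335Z')
```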
The script can also return the mapping of partitions to servers:
python segments_to_server.py \
--segments-to-servers false \
--partitions-to-servers true \
--servers-to-partitions false
Output
Partition to Servers
0 OrderedSet(['Server_pinot-server-0', 'Server_pinot-server-1'])
1 OrderedSet(['Server_pinot-server-2', 'Server_pinot-server-3'])
2 OrderedSet(['Server_pinot-server-0', 'Server_pinot-server-1'])
3 OrderedSet(['Server_pinot-server-2', 'Server_pinot-server-3'])
4 OrderedSet(['Server_pinot-server-0', 'Server_pinot-server-1'])
This gives us a tidier view of how data is being assigned to servers. Partition 0 is assigned to servers 0 and 1, partition 1 is assigned to servers 2 and 3, and so on.
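This roll-up from segments to partitions can be sketched in plain Python, using a dict in place of the script's OrderedSet to keep servers in first-seen order. The input is a hand-copied excerpt of the segment-to-servers output above, and partitions_to_servers is a hypothetical name; the recipe's script may compute it differently.

```python
def partitions_to_servers(segment_to_servers: dict[str, list[str]]) -> dict[int, list[str]]:
    """Collect, per partition, the distinct servers hosting any of its segments."""
    result: dict[int, dict[str, None]] = {}
    for segment, servers in segment_to_servers.items():
        # names look like events__<partition>__<sequence>__<time>
        partition = int(segment.rsplit("__", 3)[1])
        bucket = result.setdefault(partition, {})
        for server in servers:
            bucket[server] = None  # a dict doubles as an insertion-ordered set
    return {p: list(bucket) for p, bucket in sorted(result.items())}


sample = {
    "events__0__0__20230605T1334Z": ["Server_pinot-server-0", "Server_pinot-server-1"],
    "events__0__1__20230605T1335Z": ["Server_pinot-server-0", "Server_pinot-server-1"],
    "events__1__0__20230605T1334Z": ["Server_pinot-server-2", "Server_pinot-server-3"],
    "events__4__0__20230605T1334Z": ["Server_pinot-server-0", "Server_pinot-server-1"],
}
for partition, servers in partitions_to_servers(sample).items():
    print(partition, servers)
```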
We can also flip it around and see the mapping of servers to partitions:
python segments_to_server.py \
--segments-to-servers false \
--partitions-to-servers false \
--servers-to-partitions true
Output
Server to Partitions
Server_pinot-server-0 OrderedSet(['0', '2', '4'])
Server_pinot-server-1 OrderedSet(['0', '2', '4'])
Server_pinot-server-2 OrderedSet(['1', '3'])
Server_pinot-server-3 OrderedSet(['1', '3'])
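The server-to-partitions view is just the partition-to-servers mapping inverted. A minimal sketch, using the partition-to-servers output above as hand-copied input and a hypothetical servers_to_partitions function (partitions are ints here rather than the strings the script prints):

```python
def servers_to_partitions(partition_to_servers: dict[int, list[str]]) -> dict[str, list[int]]:
    """Invert a partition -> servers mapping into server -> partitions."""
    result: dict[str, list[int]] = {}
    for partition in sorted(partition_to_servers):
        for server in partition_to_servers[partition]:
            result.setdefault(server, []).append(partition)
    return dict(sorted(result.items()))


partition_to_servers = {
    0: ["Server_pinot-server-0", "Server_pinot-server-1"],
    1: ["Server_pinot-server-2", "Server_pinot-server-3"],
    2: ["Server_pinot-server-0", "Server_pinot-server-1"],
    3: ["Server_pinot-server-2", "Server_pinot-server-3"],
    4: ["Server_pinot-server-0", "Server_pinot-server-1"],
}
for server, partitions in servers_to_partitions(partition_to_servers).items():
    print(server, partitions)
```

This makes the imbalance visible: servers 0 and 1 each host three partitions while servers 2 and 3 host two, because 5 partitions don't divide evenly across 2 server pairs.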