Removing a server

How to remove a server

In this recipe we'll learn how to remove a server from a Pinot cluster.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download recipes.

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/removing-server

Launching Pinot Cluster

Create a Kubernetes cluster:

kind create cluster

You can spin up a Pinot Cluster by running the following command:

helm repo add pinot https://raw.githubusercontent.com/apache/pinot/master/kubernetes/helm
kubectl create ns pinot-quickstart
helm install pinot pinot/pinot \
    -n pinot-quickstart \
    --set cluster.name=pinot \
    --set server.replicaCount=4

Port forward the Pinot UI on port 9000:

kubectl port-forward service/pinot-controller 9000:9000 -n pinot-quickstart

Next, install Kafka:

helm repo add kafka https://charts.bitnami.com/bitnami
helm install -n pinot-quickstart kafka kafka/kafka --set replicas=1,zookeeper.image.tag=latest

Port forward Kafka on port 9092:

kubectl port-forward service/kafka-headless 9092:9092 -n pinot-quickstart

Add the following line to /etc/hosts:

127.0.0.1 kafka-0.kafka-headless.pinot-quickstart.svc.cluster.local

Next, we're going to deploy a Kafka client pod and connect to it:

kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r22 --namespace pinot-quickstart --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace pinot-quickstart -- bash

We're going to create the events topic with 5 partitions, by running the following command:

kafka-topics.sh \
  --create --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 5 \
  --topic events

Generating data

This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:

python datagen.py 2>/dev/null | head -n1 | jq

Output is shown below:

{
  "ts": 1680171022742,
  "uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
  "count": 260
}
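
The contents of datagen.py aren't shown in this guide, so the following is only a minimal sketch of a generator that produces events with the same three fields. The function name `make_event` and the `max_count` parameter are our own assumptions, not taken from the recipe's script:

```python
import json
import random
import time
import uuid


def make_event(max_count=1000):
    """Build one event with the same fields as the sample output above.

    max_count is a hypothetical upper bound chosen for illustration.
    """
    return {
        "ts": int(time.time() * 1000),  # current time in epoch milliseconds
        "uuid": str(uuid.uuid4()),      # random UUID as a string
        "count": random.randint(0, max_count),
    }


# Print a single event as one line of JSON
print(json.dumps(make_event()))
```

Each call returns a new dictionary, so a stream of events can be produced by calling the function in a loop and printing one JSON document per line.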

Ingesting data into Kafka

We're going to ingest this data into an Apache Kafka topic using the kcat command line tool. We'll also use jq to structure the data in the key:payload structure that Kafka expects:

python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
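
To make the key:payload structure concrete, here is a rough Python equivalent of what the jq expression above produces for each event: the UUID, the ø separator, and then the whole event as compact JSON. The helper names are our own and are only meant as an illustration:

```python
import json

SEPARATOR = "ø"  # same delimiter that is passed to jq (--arg sep) and kcat (-K)


def to_keyed_message(event, sep=SEPARATOR):
    """Return 'key<sep>payload', using the event's uuid as the key."""
    # Compact separators mimic jq's tostring, which emits no extra whitespace
    payload = json.dumps(event, separators=(",", ":"), ensure_ascii=False)
    return f"{event['uuid']}{sep}{payload}"


def split_keyed_message(line, sep=SEPARATOR):
    """Split a 'key<sep>payload' line back into (key, event)."""
    key, payload = line.split(sep, 1)
    return key, json.loads(payload)


event = {
    "ts": 1680171022742,
    "uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
    "count": 260,
}
print(to_keyed_message(event))
```

The separator only needs to be a character that never appears in the key, which is why an unusual character such as ø is a convenient choice.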

Adding Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

schema.json

Now for the table config:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "timeType": "DAYS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "3650",
    "segmentPushType": "APPEND",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
    "schemaName": "events",
    "replication": "2",
    "replicasPerPartition": "2"
  },
  "tenants": {},
  "ingestionConfig":{
    "transformConfigs": [
      {
          "columnName": "ts",
          "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
      }
    ]
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "simple",
      "stream.kafka.topic.name": "events",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.hlc.zk.connect.string": "kafka-zookeeper:2181",
      "stream.kafka.zk.broker.url": "kafka-zookeeper:2181",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.time": "3600000",
      "realtime.segment.flush.threshold.size": "50000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

table.json

Apply the config by running the following command:

kubectl apply -f config/pinot-events.yml
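
Because this recipe goes on to remove a server, it's worth checking that the table's replication still fits on the servers that remain. The sketch below assumes, as a rule of thumb, that each replica of a partition needs its own server; the function name is hypothetical:

```python
def max_removable_servers(num_servers, replicas_per_partition):
    """How many servers could go while each replica keeps its own server.

    Assumption: a table with N replicas needs at least N servers.
    """
    return max(num_servers - int(replicas_per_partition), 0)


# Values taken from this recipe: 4 servers and replicasPerPartition = "2"
table_config = {"segmentsConfig": {"replicasPerPartition": "2"}}
replicas = table_config["segmentsConfig"]["replicasPerPartition"]
print(max_removable_servers(4, replicas))  # prints 2
```

With four servers and two replicas per partition, removing one server leaves enough servers to host both replicas.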

Checking segment assignment

As soon as this config has been applied, Pinot will start ingesting the data from Kafka. We'll let it run for a little while and then run the following script to check segment assignment:

python segments_to_server.py \
  --segments-to-servers false \
  --partitions-to-servers false \
  --servers-to-partitions true

Output

Server_pinot-server-0 OrderedSet(['1', '3'])
Server_pinot-server-1 OrderedSet(['0', '2', '4'])
Server_pinot-server-2 OrderedSet(['0', '2', '4'])
Server_pinot-server-3 OrderedSet(['1', '3'])

We can see that the partitions are spread across servers 0-3.
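
The contents of segments_to_server.py aren't shown here, so the following is only a sketch of how a server-to-partitions view like the one above could be derived from a mapping of servers to segment names. It assumes realtime segment names of the form table__partition__sequence__creationTime, and the segment names in the sample are made up for illustration:

```python
def partition_of(segment_name):
    """Extract the partition id from a realtime segment name.

    Assumed name format: table__partition__sequence__creationTime
    """
    return int(segment_name.rsplit("__", 3)[1])


def servers_to_partitions(server_to_segments):
    """Map each server to the sorted list of partitions it hosts."""
    return {
        server: sorted({partition_of(name) for name in segments})
        for server, segments in sorted(server_to_segments.items())
    }


# Hypothetical input; real data would come from the Pinot controller
sample = {
    "Server_pinot-server-0": [
        "events__1__0__20230330T1010Z",
        "events__3__0__20230330T1010Z",
    ],
    "Server_pinot-server-1": [
        "events__0__0__20230330T1010Z",
        "events__2__1__20230330T1110Z",
    ],
}
for server, partitions in servers_to_partitions(sample).items():
    print(server, partitions)
```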

Removing server from cluster

Next, we're going to remove a server from the Pinot cluster:

kubectl scale statefulset.apps/pinot-server -n pinot-quickstart --replicas=3

After we've done this, we should see one 'dead' server in the Pinot UI:

List of Pinot servers with one dead

In this example server 3 has been removed, but your mileage may vary.

So the underlying server has been removed, but Pinot still thinks it's there. Pinot has an API that lets us remove instances, so let's try that out:

instance="Server_pinot-server-3.pinot-server-headless.pinot-quickstart.svc.cluster.local_8098"
curl -X DELETE \
  "http://localhost:9000/instances/${instance}" \
  -H "accept: application/json"

Output (shortened for brevity)

It didn't work because the server is still part of the ideal state for the table.
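
One way to see which segments are holding the server in place is to look through the table's ideal state for the instance name. The sketch below assumes the ideal state is available as a mapping of segment name to {instance: state}; the sample data and the function name are hypothetical:

```python
INSTANCE = (
    "Server_pinot-server-3.pinot-server-headless"
    ".pinot-quickstart.svc.cluster.local_8098"
)


def segments_on_instance(ideal_state, instance):
    """Return the segments whose replica set still includes the instance."""
    return sorted(
        segment
        for segment, replicas in ideal_state.items()
        if instance in replicas
    )


# Made-up ideal state: segment name -> {instance name: state}
ideal_state = {
    "events__0__0__20230330T1010Z": {"server-1": "ONLINE", "server-2": "ONLINE"},
    "events__1__0__20230330T1010Z": {"server-0": "ONLINE", INSTANCE: "ONLINE"},
}
print(segments_on_instance(ideal_state, INSTANCE))
```

As long as this list is not empty, the instance is still referenced by the table and the delete call is expected to be rejected.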

Removing tags from server

To fix this, we need to first remove all the tags from that server, by running the following:

instance="Server_pinot-server-3.pinot-server-headless.pinot-quickstart.svc.cluster.local_8098"
curl -X PUT \
  "http://localhost:9000/instances/${instance}/updateTags?tags=&updateBrokerResource=false" \
  -H "accept: application/json"

Output

{
    "status":"Updated tags: [] for instance: Server_pinot-server-3.pinot-server-headless.pinot-quickstart.svc.cluster.local_8098"
}

This prevents any new segments from being assigned to this server, but we still have one more step to do.
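
If you'd rather make this call from Python than from curl, the same request can be built with the standard library. This is a sketch that mirrors the URL used in the curl command above; the function name and its `controller` parameter are our own:

```python
from urllib.parse import quote, urlencode
from urllib.request import Request


def update_tags_request(instance, tags=(), controller="http://localhost:9000"):
    """Build (but do not send) the PUT request that sets an instance's tags."""
    # An empty tags value clears all tags, as in the curl command above
    query = urlencode({"tags": ",".join(tags), "updateBrokerResource": "false"})
    url = f"{controller}/instances/{quote(instance, safe='')}/updateTags?{query}"
    return Request(url, method="PUT", headers={"accept": "application/json"})


instance = (
    "Server_pinot-server-3.pinot-server-headless"
    ".pinot-quickstart.svc.cluster.local_8098"
)
request = update_tags_request(instance)
print(request.get_method(), request.full_url)
```

The request is only constructed here. Passing it to `urllib.request.urlopen` would send it to the controller.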

Rebalancing segments

The next step is to rebalance the segments so that any assigned to server 3 are moved elsewhere. The API that does this takes a lot of parameters, so we've wrapped it in the rebalance.py script, which you can download from GitHub. The contents are shown below:

import json

import click
import requests


@click.command()
@click.option('--table_name', default="events", help='Table name')
def run_rebalance(table_name):
    # Rebalance endpoint on the Pinot controller (port-forwarded to localhost:9000)
    base_url = f"http://localhost:9000/tables/{table_name}/rebalance"
    params = {
        'type': 'realtime',
        'dryRun': 'false',
        'reassignInstances': 'true',
        'includeConsuming': 'false',
        'bootstrap': 'false',
        'downtime': 'false',
        'minAvailableReplicas': '1',
        'bestEfforts': 'false',
        'externalViewCheckIntervalInMs': '1000',
        'externalViewStabilizationTimeoutInMs': '3600000',
        'updateTargetTier': 'false',
    }

    # The options are sent as query parameters on a POST request
    response = requests.post(base_url, params=params)

    print(response.url)
    print(response.status_code)
    print(json.dumps(response.json(), indent=4))


if __name__ == '__main__':
    run_rebalance()

rebalance.py

Adjust these parameters as needed before you run the script. You can run the script like this:

python rebalance.py

If you then wait a few seconds, or longer depending on how many segments you have, the segments will be rebalanced. At the time of writing, there isn't an API to check on the progress of rebalancing, but one will be added in Pinot 0.13.
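
Until a progress API is available, one option is to poll the segment assignment until the removed server no longer appears in it. The following is a minimal sketch: `fetch_assignment` is a hypothetical callable that you supply, assumed to return a mapping of segment name to {instance: state}:

```python
import time


def wait_until_drained(fetch_assignment, instance, interval_s=5.0, timeout_s=600.0):
    """Poll until no segment lists the instance; return False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        assignment = fetch_assignment()
        remaining = [
            segment
            for segment, replicas in assignment.items()
            if instance in replicas
        ]
        if not remaining:
            return True  # nothing is assigned to the instance any more
        if time.monotonic() >= deadline:
            return False  # gave up waiting
        time.sleep(interval_s)


# Stand-in for real controller calls: the second poll shows the server drained
snapshots = iter([
    {"events__1__0": {"server-0": "ONLINE", "server-3": "ONLINE"}},
    {"events__1__0": {"server-0": "ONLINE", "server-1": "ONLINE"}},
])
print(wait_until_drained(lambda: next(snapshots), "server-3", interval_s=0))
```

Passing the fetch step in as a callable keeps the polling logic independent of how the assignment is actually retrieved.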

You can manually check if it's completed by running the following script again:

python segments_to_server.py \
  --segments-to-servers false \
  --partitions-to-servers false \
  --servers-to-partitions true

Once rebalancing has completed, you shouldn't see server 3 in the output:

Output

Server_pinot-server-0 OrderedSet(['1', '2', '4'])
Server_pinot-server-1 OrderedSet(['0', '1', '3', '4'])
Server_pinot-server-2 OrderedSet(['0', '2', '3'])

Removing instance from Pinot

Once that's the case, we can retry the API call that removes the instance from Pinot:

instance="Server_pinot-server-3.pinot-server-headless.pinot-quickstart.svc.cluster.local_8098"
curl -X DELETE \
  "http://localhost:9000/instances/${instance}" \
  -H "accept: application/json"

Output

{
  "status":"Successfully dropped instance"
}