Connecting to Kafka with SASL authentication

In this guide we'll learn how to ingest data into Apache Pinot from an Apache Kafka cluster configured with SASL authentication.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download the recipes.

Navigate to the recipe

  1. If you haven't already, download the recipes.
  2. In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/kafka-sasl

Launch Pinot and Kafka Clusters

You can spin up Pinot and Kafka clusters by running the following command:

docker-compose up

This command runs a single instance each of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and ZooKeeper. You can find the docker-compose.yml file on GitHub.

The Kafka cluster is launched with the following JAAS (Java Authentication and Authorization Service) configuration for SASL:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};
Client{};

The user_<name> entries define two users that clients can authenticate as:

  • admin with the password admin-secret
  • alice with the password alice-secret
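
As a minimal sketch of how the user_<name> entries fit together, the following Bash function renders a KafkaServer section from name:password pairs. render_kafka_jaas is a hypothetical helper that isn't part of the recipe.

```shell
#!/usr/bin/env bash
# render_kafka_jaas is a hypothetical helper, not part of the recipe.
# Usage: render_kafka_jaas BROKER_USER BROKER_PASSWORD NAME:PASSWORD...
render_kafka_jaas() {
  local broker_user=$1 broker_password=$2
  shift 2
  printf 'KafkaServer {\n'
  printf '  org.apache.kafka.common.security.plain.PlainLoginModule required\n'
  # username/password are the credentials the broker itself uses.
  printf '  username="%s"\n' "$broker_user"
  printf '  password="%s"' "$broker_password"
  local entry
  for entry in "$@"; do
    # Each user_<name>="<password>" option defines one account the broker accepts.
    printf '\n  user_%s="%s"' "${entry%%:*}" "${entry#*:}"
  done
  # The option list is terminated by a single semicolon after the last entry.
  printf ';\n};\n'
}
```

Calling render_kafka_jaas admin admin-secret admin:admin-secret alice:alice-secret should reproduce the KafkaServer section shown above. Appending another pair, such as a made-up bob:bob-secret, would add a third user.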

Pinot Schema and Table

Let's create a Pinot Schema and Table.

The schema is defined below:

{
  "schemaName":"events",
  "dimensionFieldSpecs":[
    {
      "name":"uuid",
      "dataType":"STRING"
    }
  ],
  "metricFieldSpecs":[
    {
      "name":"count",
      "dataType":"INT"
    }
  ],
  "dateTimeFieldSpecs":[
    {
      "name":"ts",
      "dataType":"TIMESTAMP",
      "format":"1:MILLISECONDS:EPOCH",
      "granularity":"1:MILLISECONDS"
    }
  ]
}

config/schema.json

And the table config below:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-sasl:9093",
      "security.protocol": "SASL_PLAINTEXT",
      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";",
      "sasl.mechanism": "PLAIN",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}

config/table.json

The part of this configuration that we're interested in is the group of SASL properties under streamConfigs: security.protocol, sasl.jaas.config, and sasl.mechanism. The credentials that we want to use are specified in the sasl.jaas.config property.

💡

If our Kafka cluster had SSL enabled, we would need to set security.protocol to SASL_SSL instead of SASL_PLAINTEXT. For an example of using SSL and SASL together, see Connecting to Kafka with SSL and SASL authentication.
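
The sasl.mechanism value is usually paired with a matching login module in sasl.jaas.config. The sketch below checks that pairing before we create the table. Both functions are hypothetical helpers, and the mapping only covers the PLAIN and SCRAM mechanisms as an assumption for illustration.

```shell
#!/usr/bin/env bash
# Both helpers are hypothetical; they only illustrate the usual pairing.

# Print the login module class that is conventionally used with a mechanism.
expected_login_module() {
  case "$1" in
    PLAIN)   echo "org.apache.kafka.common.security.plain.PlainLoginModule" ;;
    SCRAM-*) echo "org.apache.kafka.common.security.scram.ScramLoginModule" ;;
    *)       return 1 ;;  # mechanism not covered by this sketch
  esac
}

# Succeed when the JAAS string starts with the module expected for the mechanism.
jaas_matches_mechanism() {
  local mechanism=$1 jaas=$2 module
  module=$(expected_login_module "$mechanism") || return 1
  [[ $jaas == "$module "* ]]
}
```

For example, jaas_matches_mechanism PLAIN "$jaas" succeeds only when $jaas begins with the PlainLoginModule class name.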

Create the table and schema by running the following command:

docker exec -it pinot-controller-sasl bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table.json   \
  -schemaFile /config/schema.json \
  -exec

Ingesting Data

Next, we're going to ingest some data into Kafka:

while true; do
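  ts=$(date +%s%N | cut -b1-13)                      # epoch milliseconds (needs GNU date for %N)
  uuid=$(tr -d '-' < /proc/sys/kernel/random/uuid)   # random UUID without dashes (Linux only)
  count=$(( RANDOM % 1000 ))                         # random integer from 0 to 999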
  echo "{\"ts\": \"${ts}\", \"uuid\": \"${uuid}\", \"count\": $count}"
done | docker exec -i kafka-sasl /opt/kafka/bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --producer.config /etc/kafka/kafka_client.conf \
    --topic events;
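
To inspect the shape of a single message before piping anything into Kafka, we could split the generator into small functions. This is only a sketch: make_event and random_event are hypothetical names, and random_event assumes GNU date and a Linux /proc filesystem, as the loop above does.

```shell
#!/usr/bin/env bash
# make_event and random_event are hypothetical helpers, not part of the recipe.

# Print one JSON event from an explicit timestamp, UUID, and count.
make_event() {
  printf '{"ts": "%s", "uuid": "%s", "count": %d}\n' "$1" "$2" "$3"
}

# Print one JSON event with generated values.
random_event() {
  local ts uuid
  ts=$(date +%s%N | cut -b1-13)                      # epoch milliseconds (GNU date)
  uuid=$(tr -d '-' < /proc/sys/kernel/random/uuid)   # Linux only
  make_event "$ts" "$uuid" "$(( RANDOM % 1000 ))"
}
```

Running make_event 1700000000000 abc 42 prints {"ts": "1700000000000", "uuid": "abc", "count": 42}, which has the same fields as the events schema.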

We need to pass in a configuration file with SASL credentials when producing events as well. The contents of the configuration file are shown below:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="alice" \
        password="alice-secret";

/etc/kafka/kafka_client.conf
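
If we wanted to produce events as a different user, a small function could render the same client configuration with other credentials. This is a sketch under that assumption; render_client_conf is a hypothetical helper and the recipe ships the file shown above as-is.

```shell
#!/usr/bin/env bash
# render_client_conf is a hypothetical helper, not part of the recipe.
# Usage: render_client_conf USER PASSWORD > kafka_client.conf
render_client_conf() {
  local user=$1 password=$2
  # In an unquoted heredoc, \\ emits a single backslash, which the
  # properties format treats as a line continuation.
  cat <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\
        username="${user}" \\
        password="${password}";
EOF
}
```

Calling render_client_conf alice alice-secret should print the same five lines as the file above. The user must also exist as a user_<name> entry in the broker's JAAS configuration.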

Querying

Now let's navigate to localhost:9000/#/query, then paste and run the following query:

select count(*), sum(count) 
from events 

You will see the following output:

count(*)    sum(count)
64209       31917036

Query Results
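
The same query could also be sent from the terminal instead of the query console. The sketch below assumes that the Pinot Controller accepts SQL as a JSON body on POST /sql at localhost:9000. That URL, the PINOT_SQL_URL variable, and both function names are assumptions for illustration, so adjust them to match your setup.

```shell
#!/usr/bin/env bash
# Both helpers are hypothetical; the endpoint URL is an assumption.

# Wrap a single-line SQL string in a JSON body, escaping backslashes and quotes.
pinot_query_payload() {
  local escaped
  escaped=$(printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')
  printf '{"sql": "%s"}\n' "$escaped"
}

# POST the query; override PINOT_SQL_URL if your endpoint differs.
run_pinot_query() {
  curl -s -X POST -H 'Content-Type: application/json' \
    -d "$(pinot_query_payload "$1")" \
    "${PINOT_SQL_URL:-http://localhost:9000/sql}"
}
```

With the clusters running, run_pinot_query 'select count(*), sum(count) from events' would send the query from this section. The counts will differ from the output above because events keep arriving.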