Connecting to Kafka with SSL and SASL authentication

In this guide we'll learn how to ingest data into Apache Pinot from an Apache Kafka cluster configured with SSL and SASL authentication.

Prerequisites

To follow the code examples in this guide, do the following:

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/kafka-ssl-sasl

Launch Pinot Cluster

You can spin up a Pinot cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml file on GitHub.

Pinot Schema and Table

Let's create a Pinot Schema and Table.

The schema is defined below:

{
  "schemaName":"events",
  "dimensionFieldSpecs":[
    {
      "name":"uuid",
      "dataType":"STRING"
    }
  ],
  "metricFieldSpecs":[
    {
      "name":"count",
      "dataType":"INT"
    }
  ],
  "dateTimeFieldSpecs":[
    {
      "name":"ts",
      "dataType":"TIMESTAMP",
      "format":"1:MILLISECONDS:EPOCH",
      "granularity":"1:MILLISECONDS"
    }
  ]
}

config/schema.json

And the table configuration below:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "<bootstrap.servers>",
      "security.protocol": "SASL_SSL",
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<cluster-api-key>\" password=\"<cluster-api-secret>\";",
      "sasl.mechanism": "PLAIN",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}

config/table.json

You'll need to replace <bootstrap.servers> with the host and port of your Kafka cluster.

The credentials that we want to use are specified in the sasl.jaas.config property. You'll need to replace <cluster-api-key> and <cluster-api-secret> with your own credentials.
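Rather than editing the placeholders by hand, you could substitute them from a mapping such as the process environment. The following is a minimal sketch, not part of the recipe: the helper name fill_placeholders and the variable names KAFKA_BOOTSTRAP_SERVERS, KAFKA_API_KEY and KAFKA_API_SECRET are hypothetical, and the inline config is a cut-down stand-in for config/table.json.

```python
import copy
import json

# Placeholder tokens as they appear in config/table.json, mapped to
# hypothetical variable names (assumptions, not defined by the recipe).
PLACEHOLDERS = {
    "<bootstrap.servers>": "KAFKA_BOOTSTRAP_SERVERS",
    "<cluster-api-key>": "KAFKA_API_KEY",
    "<cluster-api-secret>": "KAFKA_API_SECRET",
}


def fill_placeholders(table_config, env):
    """Return a copy of the table config with placeholder tokens replaced.

    Raises KeyError if a variable needed for a token is missing from env.
    """
    filled = copy.deepcopy(table_config)
    stream = filled["tableIndexConfig"]["streamConfigs"]
    for key in list(stream):
        for token, env_name in PLACEHOLDERS.items():
            if token in stream[key]:
                stream[key] = stream[key].replace(token, env[env_name])
    return filled


# Cut-down stand-in for config/table.json, for illustration only.
sample = {
    "tableIndexConfig": {
        "streamConfigs": {
            "stream.kafka.broker.list": "<bootstrap.servers>",
            "sasl.jaas.config": (
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                'username="<cluster-api-key>" password="<cluster-api-secret>";'
            ),
        }
    }
}
example_env = {
    "KAFKA_BOOTSTRAP_SERVERS": "broker.example.com:9092",
    "KAFKA_API_KEY": "my-key",
    "KAFKA_API_SECRET": "my-secret",
}
print(json.dumps(fill_placeholders(sample, example_env), indent=2))
```

In practice you would load the real file with json.load, pass os.environ as the mapping, and write the result to the path that the AddTable command reads. Note that this sketch does no escaping, so a secret containing a double quote would produce an invalid sasl.jaas.config value.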

💡

If our Kafka cluster did not have SSL enabled, we would need to set security.protocol to SASL_PLAINTEXT instead of SASL_SSL. For an example of using SASL without SSL, see Connecting to Kafka with SASL authentication.

Create the table and schema by running the following command:

docker exec -it pinot-controller-ssl-sasl bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table.json   \
  -schemaFile /config/schema.json \
  -exec

Ingesting Data

Ingest a few messages into your Kafka cluster:

{"ts": "1649757242937", "uuid": "fc43b2fafbf64d9e8dff8d6be75d881d", "count": 308}
{"ts": "1649757242941", "uuid": "11f2500386ec42be84debba1d5bfd2f7", "count": 515}
{"ts": "1649757242945", "uuid": "f2dcf496957146eaa12605c5d8c005a0", "count": 142}

If you're using Confluent Cloud you can ingest these messages via the UI.
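If you would rather send the messages from a script, one option is the confluent-kafka Python client. The sketch below is an illustration under assumptions rather than the recipe's own tooling: it assumes confluent-kafka is installed, that the topic is named events as in the table config, and the helper names (producer_config, encode, send_events) are hypothetical. The client settings mirror the SASL_SSL and PLAIN values used in the table config.

```python
import json

# The three sample messages from this guide.
EVENTS = [
    {"ts": "1649757242937", "uuid": "fc43b2fafbf64d9e8dff8d6be75d881d", "count": 308},
    {"ts": "1649757242941", "uuid": "11f2500386ec42be84debba1d5bfd2f7", "count": 515},
    {"ts": "1649757242945", "uuid": "f2dcf496957146eaa12605c5d8c005a0", "count": 142},
]


def producer_config(bootstrap_servers, api_key, api_secret):
    """Build client settings for a SASL_SSL cluster using the PLAIN mechanism."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,
        "sasl.password": api_secret,
    }


def encode(event):
    """Serialize one event as UTF-8 JSON so the JSON decoder can read it."""
    return json.dumps(event).encode("utf-8")


def send_events(config, topic="events"):
    """Send every sample event to the topic and wait for delivery."""
    # Imported lazily so the helpers above work without the client installed.
    from confluent_kafka import Producer

    producer = Producer(config)
    for event in EVENTS:
        producer.produce(topic, value=encode(event))
    producer.flush()  # wait for outstanding messages to be delivered
```

Calling send_events(producer_config("<bootstrap.servers>", "<cluster-api-key>", "<cluster-api-secret>")) with your own values in place of the placeholders would publish the three messages.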

Querying

Now let's navigate to localhost:9000/#/query and copy/paste the following query:

select count(*), sum(count) 
from events 
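The same query can also be sent over HTTP instead of through the query console. The sketch below uses only the Python standard library and rests on an assumption you should verify against your setup: that the controller from this recipe is reachable at localhost:9000 and accepts SQL via POST /sql. The helper names are hypothetical.

```python
import json
import urllib.request

# Assumption: the recipe's controller listens on localhost:9000 and
# accepts SQL queries at POST /sql.
CONTROLLER_SQL_URL = "http://localhost:9000/sql"


def build_request(sql, url=CONTROLLER_SQL_URL):
    """Create a POST request carrying the SQL statement as a JSON body."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def extract_rows(response):
    """Pull the result rows out of a decoded query response."""
    return response["resultTable"]["rows"]


def run_query(sql):
    """Send the query and return its rows; requires a running cluster."""
    with urllib.request.urlopen(build_request(sql)) as resp:
        return extract_rows(json.load(resp))
```

With the cluster running, run_query("select count(*), sum(count) from events") would return the result rows as a list of lists.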

You will see the following output:

count(*)    sum(count)
3           965

Query Results
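As a sanity check, the expected values can be recomputed directly from the three sample messages. This is a small illustrative sketch; the helper name expected_aggregates is hypothetical.

```python
import json

# The three sample messages ingested earlier in this guide.
MESSAGES = """
{"ts": "1649757242937", "uuid": "fc43b2fafbf64d9e8dff8d6be75d881d", "count": 308}
{"ts": "1649757242941", "uuid": "11f2500386ec42be84debba1d5bfd2f7", "count": 515}
{"ts": "1649757242945", "uuid": "f2dcf496957146eaa12605c5d8c005a0", "count": 142}
"""


def expected_aggregates(text):
    """Return (row count, sum of the count field) for newline-delimited JSON."""
    rows = [json.loads(line) for line in text.splitlines() if line.strip()]
    return len(rows), sum(row["count"] for row in rows)


print(expected_aggregates(MESSAGES))  # prints (3, 965)
```

If the query returns different numbers, the topic probably contains more or fewer messages than the three shown here.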