Connecting to Kafka with SSL and SASL authentication
In this guide we'll learn how to ingest data into Apache Pinot from an Apache Kafka cluster configured with SSL and SASL authentication.
Pinot Version | 0.10.0
Code | startreedata/pinot-recipes/kafka-ssl-sasl
Pre-requisites
You will need to install Docker locally to follow the code examples in this guide.
This guide assumes that you have a Kafka cluster running with SSL enabled. Confluent Cloud offers a hosted Kafka service with some free credits to get you started.
Download Recipe
First, clone the GitHub repository to your local machine and navigate to this recipe:
git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/kafka-ssl-sasl
If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
Launch Pinot Cluster
You can spin up a Pinot cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml file on GitHub.
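The docker-compose.yml in the recipe is the source of truth, but a minimal sketch of such a file might look roughly like the following. The image tags, the container name, and the ./config volume mount are assumptions based on the commands used later in this guide:

version: "3"
services:
  zookeeper:
    image: zookeeper:3.5.6
    ports:
      - "2181:2181"
  pinot-controller:
    image: apachepinot/pinot:0.10.0
    container_name: pinot-controller-ssl-sasl
    command: "StartController -zkAddress zookeeper:2181"
    volumes:
      - ./config:/config
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.10.0
    command: "StartBroker -zkAddress zookeeper:2181"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.10.0
    command: "StartServer -zkAddress zookeeper:2181"
    depends_on:
      - pinot-broker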
Pinot Schema and Table
Let's create a Pinot Schema and Table.
The schema is defined below:
{
"schemaName":"events",
"dimensionFieldSpecs":[
{
"name":"uuid",
"dataType":"STRING"
}
],
"metricFieldSpecs":[
{
"name":"count",
"dataType":"INT"
}
],
"dateTimeFieldSpecs":[
{
"name":"ts",
"dataType":"TIMESTAMP",
"format":"1:MILLISECONDS:EPOCH",
"granularity":"1:MILLISECONDS"
}
]
}
And the table config below:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "<bootstrap.servers>",
"security.protocol": "SASL_SSL",
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<cluster-api-key>\" password=\"<cluster-api-secret>\";",
"sasl.mechanism": "PLAIN",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
}
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "DAILY"
}
},
"tenants": {},
"metadata": {}
}
The part of this configuration that we're interested in is the security section of streamConfigs: security.protocol, sasl.jaas.config, and sasl.mechanism. You'll need to replace <bootstrap.servers> with the host and port of your Kafka cluster. The credentials that we want to use are specified in the sasl.jaas.config property: replace <cluster-api-key> and <cluster-api-secret> with your own credentials.
If our Kafka cluster does not have SSL enabled, we would need to set security.protocol to SASL_PLAINTEXT instead of SASL_SSL. For an example of using SASL without SSL, see Connecting to Kafka with SASL authentication.
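As a sketch, the security-related entries in streamConfigs would then look like this, with illustrative username and password placeholders and everything else in the table config left unchanged:

"security.protocol": "SASL_PLAINTEXT",
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<username>\" password=\"<password>\";",
"sasl.mechanism": "PLAIN",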
Create the table and schema by running the following command:
docker exec -it pinot-controller-ssl-sasl bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table.json \
-schemaFile /config/schema.json \
-exec
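You can check that the table and schema were created by calling the Pinot Controller's REST API, assuming its port 9000 is mapped to localhost:

curl -X GET "http://localhost:9000/tables"
curl -X GET "http://localhost:9000/schemas"

The first call should return a list of tables that includes events.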
Ingesting Data
Ingest a few messages into your Kafka cluster:
{"ts": "1649757242937", "uuid": "fc43b2fafbf64d9e8dff8d6be75d881d", "count": 308}
{"ts": "1649757242941", "uuid": "11f2500386ec42be84debba1d5bfd2f7", "count": 515}
{"ts": "1649757242945", "uuid": "f2dcf496957146eaa12605c5d8c005a0", "count": 142}
If you're using Confluent Cloud, you can ingest these messages via the UI.
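Another option is the console producer that ships with Kafka, pointed at a client properties file that mirrors the SASL_SSL settings from streamConfigs above. The file name client.properties is just an example; its contents would be:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=<the same value used for sasl.jaas.config in the table config>

With that in place you can create the topic (if it doesn't already exist) and start a producer, then paste each of the JSON messages above on its own line:

kafka-topics.sh --bootstrap-server <bootstrap.servers> \
  --command-config client.properties --create --topic events

kafka-console-producer.sh --bootstrap-server <bootstrap.servers> \
  --producer.config client.properties --topic events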
Querying
Now let's navigate to localhost:9000/#/query and copy/paste the following query:
select count(*), sum(count)
from events
You will see the following output:
count(*) | sum(count)
---|---
3 | 965
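If you'd rather query from the command line, you can run the same query against the Pinot Broker's SQL endpoint, assuming its port 8099 is exposed locally:

curl -H "Content-Type: application/json" -X POST \
  -d '{"sql":"select count(*), sum(count) from events"}' \
  http://localhost:8099/query/sql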