Apache Kafka

Kafka connector

Kafka is an open-source, distributed event-streaming platform designed for high-throughput, fault-tolerant, real-time data streaming.

Use the Kafka connector to create a table in Apache Pinot by mapping it to a particular topic. StarTree automatically begins to consume data from the connected topic in Kafka.

Connect StarTree Cloud to Kafka

To connect StarTree to Kafka, complete the following procedures as needed:

Prerequisites for SSL

To use SSL encryption, you must do the following:

  • Obtain the bootstrap server endpoint (used as the broker URL when connecting)
  • Obtain the server certificate, the client certificate, and the client key
  1. To locate the bootstrap server information for the broker URL, open the server.properties file.

    Note: The location of the server.properties file varies depending on the operating system being used. Contact your server administrator if you cannot locate this file.

  2. Copy the list of bootstrap servers and ports that follows bootstrap.servers. For example, copy hostname1:9092, hostname2:9092 from the line below (a quick reachability check is sketched after these steps):

     bootstrap.servers: hostname1:9092, hostname2:9092
  3. If you're using SSL for authentication, obtain the server certificate, the client certificate, and the client key.

    • For Apache Kafka, click the Obtain SSL details for Apache Kafka tab
    • For Aiven Kafka, click the Obtain SSL details for Aiven Kafka tab
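
Optionally, before configuring the connection, you can confirm that the bootstrap endpoint you copied is reachable from your network. A minimal sketch using the Kafka CLI, assuming the Kafka command-line tools are installed on a host that can reach the brokers:

     # List topics through the bootstrap server; a successful response confirms connectivity
     bin/kafka-topics.sh --bootstrap-server hostname1:9092 --list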

Obtain SSL details for Apache Kafka

To obtain your Certificate Authority (CA) (server certificate), the client certificate, and the client key for Apache Kafka, complete the following steps:

  1. (Optional) If you already have a Certificate Authority (CA) to sign certificates with, skip this step. Otherwise, create a CA by running the following command on the Kafka server command line:

    openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
  2. To generate the client certificate and client key, do the following:

    a. Enter the following command on the Kafka server command line:

       keytool -keystore {key_store_name}.jks -alias {key_store_name_alias} -keyalg RSA -validity {validity} -genkey

    Note: The alias is simply a short name for the keystore; reuse the same alias throughout these steps. Remember the passwords for each keystore and truststore, because you'll need them later.

    b. Answer the questions displayed in the interactive prompt. For the question What is your first and last name?, enter the CN for your certificate.

    Note: The Common Name (CN) must match the fully qualified domain name (FQDN) of the server to ensure that StarTree connects to the correct server. Refer to this page to find the FQDN based on the server type.

  3. To add the CA to the broker's truststore, run the following on the Kafka server command line:

       keytool -keystore {broker_trust_store}.jks -alias CARoot -importcert -file ca-cert
  4. To sign the certificates with the CA file, do the following on the Kafka server command line:

    a. Export the certificate from the keystore:

       keytool -keystore {key_store_name}.jks -alias {key_store_name_alias} -certreq -file cert-file

    b. Sign the certificate with the CA:

       openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}

    c. Add the certificates back to the keystore:

       keytool -keystore {key_store_name}.jks -alias CARoot -importcert -file ca-cert  
       
       keytool -keystore {key_store_name}.jks -alias {key_store_name_alias} -importcert -file cert-signed

  5. To extract the client certificate key from the keystore, do the following:

    a. Convert the keystore file from its existing .jks format to the PKCS12 (.p12) format for interoperability. To do this, enter the following command on the Kafka server command line:

       keytool -v -importkeystore -srckeystore {key_store_name}.jks -srcalias {key_store_name_alias} -destkeystore {key_store_name}.p12 -deststoretype PKCS12

    b. Extract the client certificate key into a .pem file (the format StarTree Cloud uses):

       openssl pkcs12 -in {key_store_name}.p12 -nocerts -nodes > cert-key.pem

    c. Go to the Kafka server folder with the Kafka truststore and keystore, and upload the following files to StarTree Cloud (an optional verification sketch follows this list):

    • Client certificate: cert-signed
    • Client key: cert-key.pem
    • CA (server certificate): ca-cert
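
Before uploading these files, you can optionally verify that the signed client certificate's CN matches the broker's FQDN and that the extracted key pairs with the certificate. A minimal sketch, assuming OpenSSL is available on the Kafka server:

    # Show the subject (including the CN) of the signed client certificate
    openssl x509 -in cert-signed -noout -subject

    # The two digests below should match if cert-signed and cert-key.pem belong together
    openssl x509 -in cert-signed -noout -pubkey | openssl md5
    openssl pkey -in cert-key.pem -pubout | openssl md5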

Renew Kafka SSL certificates

To renew your Apache Kafka or Aiven Kafka SSL certificates, do the following:

  1. Update the SSL certificate entries in your table configuration using the Pinot APIs.
  2. Use the forceCommit API to flush the currently consuming segments and start new Kafka consumers, which pick up the updated SSL certificates (see the sketch below).
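
A minimal sketch of these two calls against the Pinot controller REST API, assuming a controller reachable at pinot-controller:9000 and a table named myTable (both are placeholders; your environment may also require authentication headers):

    # 1. Update the table configuration (table-config.json contains the revised SSL certificate entries)
    curl -X PUT "http://pinot-controller:9000/tables/myTable" \
      -H "Content-Type: application/json" \
      -d @table-config.json

    # 2. Force-commit the consuming segments so new consumers start with the updated certificates
    curl -X POST "http://pinot-controller:9000/tables/myTable/forceCommit"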

Create a dataset in StarTree Cloud

  1. On the StarTree Data Manager overview page, click Create a dataset. Then, on the Select Connection Type page, click Kafka.

  2. Enter the following information:

    • Broker URL: You can find this information in your Kafka cluster. The broker URL includes the port number at the end.
    • Authentication Type:
      • Default: Select this option if you've deployed StarTree Cloud BYOC and Kafka in the same VPC. In this case, connect to your Kafka cluster without any authentication. When selecting this option, no further information is needed to create a connection between StarTree Data Manager and the Kafka broker.
      • SASL: SASL is a framework for data security and authentication. Choose this option to ensure that only authenticated clients can connect to the Kafka cluster. This option supports both unencrypted and encrypted data. Enter the following information when selecting this option (an illustrative stream configuration generated from these choices is sketched after these steps):
        • Security Protocol: The communication type to use. Select one of the following options:
          • SASL_PLAINTEXT: The communication between the client (StarTree Data Manager in this case) and the Kafka broker is not encrypted.
          • SASL_SSL: The communication between the client (StarTree Data Manager in this case) and the Kafka broker is encrypted.
        • SASL Mechanism: The authentication mechanism, which is not coupled with the security protocol. Select one of the following options:
          • PLAIN: Authentication credentials are exchanged between StarTree Data Manager and the Kafka broker exactly as entered in the UI.
          • SCRAM-SHA-256: Authentication is established by passing a 32-byte hash token generated from the username and password.
          • SCRAM-SHA-512: Authentication is established by passing a 64-byte hash token generated from the username and password.
        • Username: Username to connect to the broker.
        • Password: Password associated with the username to connect to the broker.
      • Advanced Options: Available if you are using a schema registry. Currently, only Confluent Cloud is supported; schema registries hosted in other services (for example, AWS Glue) are not supported. For Confluent Cloud, enter the following information:
        • Schema Registry URL
        • Username:
        • Password:
      • SSL Setup: SSL can be set up in two ways: using an open certificate authority such as Let’s Encrypt, or using a commercial certificate authority. To set up SSL, you need a server certificate (CA), a client certificate, and a client key. If you haven't already obtained these, see Prerequisites for SSL.
  3. To configure dataset details, enter a dataset description and select an existing topic name.

  4. From the second drop-down list, select the appropriate Data Format. Supported data formats include the following:

    • Avro
    • JSON
    • Parquet
    • Protobuf

    Note: A schema registry is required for the Avro and Protobuf data formats. If you haven't configured it already, go back to the previous screen and provide the schema registry information in the connection details.
  5. (Optional) In the Record reader config field, provide additional details about the source schema for your data format. For more information, see Configure record reader.

  6. Data Modeling

    • Configure Columns: You can change the schema, add columns, remove columns, and configure the time columns. You can also create new transformed columns using the provided mapping functions.
    • Configure Pinot Time Column
  7. Configure encoding types and indexes to the appropriate columns to improve query performance.

  8. Configure additional tenant-related settings, such as the broker and server tenants, the data retention period, and the primary key for upserts.

  9. Review and Submit: Check the dataset and connection details. When ready, click Create Dataset.
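
For reference, the connection and authentication choices above map to Kafka stream configuration properties on the resulting Pinot table. The snippet below is an illustrative sketch only; the topic, broker, and credential values are placeholders, and Data Manager generates the actual configuration for you. It shows roughly what a SASL_SSL connection with the SCRAM-SHA-512 mechanism corresponds to:

    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "my-topic",
      "stream.kafka.broker.list": "hostname1:9092,hostname2:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "SCRAM-SHA-512",
      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"secret\";"
    }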

Configure record reader

Add a record reader configuration to specify how to read file formats during batch ingestion or streaming ingestion.

Note: For Delta ingestion tasks, record reader configurations are not supported for Parquet (ParquetRecordReaderConfig).

Configure files for batch ingestion (file ingestion task) for S3, Upload File, GCS

Configure the record reader to customize how the file format is read during ingestion. Data Manager supports configuring batch ingestion for the following file formats:

CSV

Data Manager provides the CSVRecordReaderConfig for handling CSV files with the following customizable options:

  • header: Provide a header when the input file has no headers.
  • skipHeader: Provide an alternate header when the input file has a corrupt header.
  • delimiter: Use an alternate delimiter when fields are not separated by the default comma delimiter.
  • skipUnParseableLines: Skip records that are not parseable instead of failing ingestion.

Example: Provide a header when the input file has no headers.

{
  "header": "colA,colB,colC"
}

Example: Provide an alternate header when the input file has a corrupt header.

 "header": "colA,colB,colC",
 "skipHeader": "true"
}

Example: Use an alternate delimiter when fields are not separated by the default delimiter comma.

 "delimiter": ";"
}

OR

  "delimiter": "\\t"
}

Example: Skip records that are not parseable instead of failing ingestion. Use this option with caution because it can lead to data loss.

{
  "skipUnParseableLines": "true"
}

Example: Handle CSV files with no header, tab-separated fields, empty lines, and unparseable records.

  "header": "colA,colB,colC",
  "delimiter": "\\t",
  "ignoreEmptyLines": "true",
  "skipUnParseableLines": "true"
}

For a comprehensive list of available CSV record reader configurations, see the Pinot CSV documentation.

AVRO

Data Manager supports one configuration option in AvroRecordReaderConfig:

enableLogicalTypes: Enable logical type conversions for specific Avro logical types, such as DECIMAL, UUID, DATE, TIME_MILLIS, TIME_MICROS, TIMESTAMP_MILLIS, and TIMESTAMP_MICROS.

  "enableLogicalTypes": "true"
}

For example, if the schema type is INT and the logical type is DATE, a time conversion is applied and a value V is interpreted as the date V days after the epoch start; V = 19723 corresponds to 2024-01-01.
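
For reference, a minimal sketch of how such a field might be declared in an Avro schema (the field name is hypothetical):

    {
      "name": "purchaseDate",
      "type": { "type": "int", "logicalType": "date" }
    }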

Parquet

For Parquet files, Data Manager provides the ParquetRecordReaderConfig with the following customizable configurations:

Use Parquet Avro Record Reader:

    "useParquetAvroRecordReader" : "true"
}

When this config is set, the Parquet record reader used is org.apache.pinot.plugin.inputformat.parquet.ParquetAvroRecordReader.

Use Parquet Native Record Reader:

    "useParquetNativeRecordReader" : "true"
}

When this config is set, the Parquet record reader used is org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader.

JSON

The JSONRecordReader does not support a record reader config, so no customizations are possible.

ORC

The ORCRecordReader in Pinot does not support a record reader config, hence no customizations are possible.

Streaming ingestion from Kafka

AVRO

The configurations for Kafka streaming ingestion are similar to those described in the Batch Ingestion section.

JSON

No specific record reader config is available for JSON in the streaming ingestion context.

PROTO

Pinot provides ProtoBufRecordReaderConfig, which supports the following configuration:

{
  "descriptorFile": "/path/to/descriptor/file"
}
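
The descriptor file referenced above is a compiled Protobuf descriptor set. A minimal sketch of producing one with protoc (the file names are placeholders):

    # Compile the .proto definition into a descriptor set that the record reader can load
    protoc --include_imports --descriptor_set_out=/path/to/descriptor/file my_schema.proto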

Streaming ingestion from Kinesis

AVRO

As with Kafka streaming ingestion, the AVRO configurations for Kinesis streaming ingestion are similar to those described in the Batch Ingestion section.

JSON

No specific record reader config is available for JSON in the Kinesis streaming ingestion context.

SQL ingestion from BigQuery

Record reader configs are not applicable for this data source, so the Data Manager UI does not expose one.

SQL ingestion from Snowflake

Record reader configs are not applicable for this data source, so the Data Manager UI does not expose one.