
Ingest from a streaming data source

info

In this guide you'll learn how to import data from a streaming data source. You should have a local Pinot cluster up and running, following the instructions in Create Cluster.

In this guide we'll learn how to ingest data from Kafka, a stream processing platform.

Install and Launch Kafka

Let's start by downloading Kafka to our local machine.

To pull down the latest Docker image, run the following command:

docker pull wurstmeister/kafka:latest

Next we'll spin up a Kafka broker:

docker run \
  --network pinot-demo_default \
  --name=kafka \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ADVERTISED_HOST_NAME=kafka \
  wurstmeister/kafka:latest
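
The broker will usually auto-create the events topic the first time we produce to it, but if you'd rather create it up front, you can use the kafka-topics.sh script that ships with the image. This is just a sketch; it assumes the broker is reachable at localhost:9092 from inside the container, the same address the producer command later in this guide uses:

docker exec -i kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic events \
  --partitions 1 \
  --replication-factor 1

You can also run docker logs kafka to check that the broker started cleanly.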

Data Source

We're going to generate some JSON messages from the terminal using the following script:

while true; do
  # Current time as an epoch timestamp in milliseconds
  ts=$(date +%s%N | cut -b1-13);
  # Random 20-character hex identifier
  uuid=$(cat /proc/sys/kernel/random/uuid | sed 's/[-]//g' | head -c 20)
  # Random count between 0 and 999
  count=$((RANDOM % 1000))
  echo "{\"ts\": ${ts}, \"uuid\": \"${uuid}\", \"count\": $count}";
  sleep 0.01;
done

If you run this script, you'll see the following output:

Output
{"ts": 1644586485807, "uuid": "93633f7c01d54453a144", "count": 807}
{"ts": 1644586485836, "uuid": "87ebf97feead4e848a2e", "count": 41}
{"ts": 1644586485866, "uuid": "960d4ffa201a4425bb18", "count": 146}

Ingesting Data into Kafka

Let's now pipe that stream of messages into Kafka by running the following command:

while true; do
  ts=$(date +%s%N | cut -b1-13);
  uuid=$(cat /proc/sys/kernel/random/uuid | sed 's/[-]//g' | head -c 20)
  count=$((RANDOM % 1000))
  echo "{\"ts\": ${ts}, \"uuid\": \"${uuid}\", \"count\": $count}"
  sleep 0.01;
done | docker exec -i kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic events;

We can check how many messages have been ingested by running the following command, which prints the latest offset for each partition in the form topic:partition:offset:

docker exec -i kafka kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events
Output
events:0:11940

And we can print out the messages themselves by running the following command:

docker exec -i kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic events
Output
...
{"ts": 1644586485807, "uuid": "93633f7c01d54453a144", "count": 807}
{"ts": 1644586485836, "uuid": "87ebf97feead4e848a2e", "count": 41}
{"ts": 1644586485866, "uuid": "960d4ffa201a4425bb18", "count": 146}
...

Schema

A schema defines which fields are present in the table along with their data types, and is specified in JSON format.

Create a file called /tmp/pinot/schema-stream.json and add the following content to it.

/tmp/pinot/schema-stream.json
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Create the schema by running the following command:

docker run --rm -ti \
  --network=pinot-demo_default \
  -v /tmp/pinot:/tmp/pinot \
  apachepinot/pinot:0.9.3 AddSchema \
  -schemaFile /tmp/pinot/schema-stream.json \
  -controllerHost pinot-controller \
  -controllerPort 9000 -exec
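
If you want to confirm that the schema was registered, one option is to fetch it back from the controller's REST API. This is a sketch that assumes the controller's port 9000 is mapped to localhost, as in the Create Cluster guide:

curl -X GET "http://localhost:9000/schemas/events" -H "accept: application/json"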

Table Config

A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The table config defines the table's properties in JSON format.

Create a file called /tmp/pinot/table-config-stream.json and add the following content to it.

/tmp/pinot/table-config-stream.json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "events",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

Create the table by running the following command:

docker run --rm -ti \
  --network=pinot-demo_default \
  -v /tmp/pinot:/tmp/pinot \
  apachepinot/pinot:0.9.3 AddTable \
  -tableConfigFile /tmp/pinot/table-config-stream.json \
  -controllerHost pinot-controller \
  -controllerPort 9000 -exec
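
As with the schema, you can optionally sanity-check the table by fetching its config back from the controller. Again, this assumes the controller is reachable on localhost:9000:

curl -X GET "http://localhost:9000/tables/events" -H "accept: application/json"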

Querying

Navigate to localhost:9000/#/query and click on the events table to run a query that shows the first 10 rows in this table.

Querying the events table
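
If you prefer the command line to the query console, you should also be able to send the same query through the controller, which proxies SQL queries to the broker. A sketch, again assuming the controller is exposed on localhost:9000:

curl -X POST "http://localhost:9000/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select * from events limit 10"}'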