Ingest from a streaming data source
In this guide, you'll learn how to ingest data from a streaming data source, using Kafka, a stream processing platform, as the example. Before you begin, you should have a local cluster up and running by following the instructions in Create Cluster.
Install and Launch Kafka
Let's start by downloading Kafka to our local machine.
To pull down the latest Docker image, run the following command:
docker pull wurstmeister/kafka:latest
Next, we'll spin up a Kafka broker:
docker run \
--network pinot-demo \
--name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ADVERTISED_HOST_NAME=kafka \
-d wurstmeister/kafka:latest
Data Source
We're going to generate some JSON messages from the terminal using the following script:
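import datetime
import uuid
import random
import json
while True:
    # Milliseconds since the Unix epoch, matching the schema's 1:MILLISECONDS:EPOCH format
    ts = int(datetime.datetime.now().timestamp() * 1000)
    # 20-character hex identifier, like the ids in the sample output
    event_id = uuid.uuid4().hex[:20]
    count = random.randint(0, 1000)
    # One JSON document per line; flush so each line goes through the pipe immediately
    print(json.dumps({"ts": ts, "uuid": event_id, "count": count}), flush=True)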
If you run this script with Python, you'll see output similar to the following:
{"ts": 1644586485807, "uuid": "93633f7c01d54453a144", "count": 807}
{"ts": 1644586485836, "uuid": "87ebf97feead4e848a2e", "count": 41}
{"ts": 1644586485866, "uuid": "960d4ffa201a4425bb18", "count": 146}
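The script above runs forever, which is what you want when feeding Kafka but awkward for a quick local test. Below is a minimal sketch of a bounded variant, assuming Python 3; the names `make_event` and `generate` are hypothetical and not part of Pinot or Kafka. It prints with `flush=True` because Python block-buffers stdout when it is piped to another command, so without flushing, messages would reach Kafka in bursts.

```python
import json
import random
import time
import uuid


def make_event() -> dict:
    """Build one event shaped like the sample output above (hypothetical helper)."""
    return {
        # Epoch milliseconds, like the sample "ts" values
        "ts": int(time.time() * 1000),
        # First 20 hex characters of a random UUID, like the sample ids
        "uuid": uuid.uuid4().hex[:20],
        "count": random.randint(0, 1000),
    }


def generate(n: int):
    """Yield n JSON-encoded events instead of looping forever."""
    for _ in range(n):
        yield json.dumps(make_event())


if __name__ == "__main__":
    for line in generate(5):
        # flush=True pushes each line through the pipe immediately
        print(line, flush=True)
```

Running it prints five JSON lines and exits, which makes it easy to check the message format before starting the endless stream.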
Ingesting Data into Kafka
Let's now pipe that stream of messages into Kafka by running the following command:
python |
docker exec -i kafka /opt/kafka/bin/ \
--bootstrap-server localhost:9092 \
--topic events;
We can check how many messages have been ingested by running the following command:
docker exec -i kafka \
--broker-list localhost:9092 \
--topic events
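The offset command prints one line per partition. Assuming each line has the form `topic:partition:offset` (for example `events:0:42`, an illustrative value rather than real output) and that retention has not yet deleted any records, summing the latest offsets across partitions gives the number of messages in the topic. A minimal sketch, with the hypothetical helper `total_messages`:

```python
def total_messages(offset_output: str) -> int:
    """Sum the latest offsets from lines shaped like topic:partition:offset."""
    total = 0
    for line in offset_output.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # Split from the right: the last field is the offset, the one before it the partition
        _topic, _partition, offset = line.rsplit(":", 2)
        total += int(offset)
    return total
```

For a single-partition topic, such as one created automatically with default settings, the result is simply the one offset printed.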
We can also print the messages themselves by running the following command:
docker exec -i kafka /opt/kafka/bin/ \
--bootstrap-server localhost:9092 \
--topic events
{"ts": 1644586485807, "uuid": "93633f7c01d54453a144", "count": 807}
{"ts": 1644586485836, "uuid": "87ebf97feead4e848a2e", "count": 41}
{"ts": 1644586485866, "uuid": "960d4ffa201a4425bb18", "count": 146}
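Before defining the table, it can help to confirm that the consumed messages carry the fields Pinot will expect. A minimal sketch, assuming one JSON document per line; `EXPECTED_TYPES` and `validate` are hypothetical names that mirror the schema in the next section (uuid as a string, count and ts as integers).

```python
import json

# Hypothetical mapping of field name -> expected JSON type, mirroring the schema:
# uuid is a STRING dimension, count is an INT metric, ts is epoch milliseconds
EXPECTED_TYPES = {"ts": int, "uuid": str, "count": int}


def validate(line: str) -> dict:
    """Parse one consumed message and check its fields and types."""
    event = json.loads(line)
    missing = EXPECTED_TYPES.keys() - event.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    for name, expected in EXPECTED_TYPES.items():
        value = event[name]
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, expected):
            raise TypeError(
                f"{name} should be {expected.__name__}, got {type(value).__name__}"
            )
    return event
```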
Schema
A schema defines, in JSON format, which fields are present in the table and what their data types are.
Create a file called /tmp/pinot/schema-stream.json and add the following content to it:
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "ts",
    "dataType": "TIMESTAMP",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}
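The format `1:MILLISECONDS:EPOCH` means `ts` holds milliseconds since the Unix epoch (1970-01-01 UTC). As a quick worked example, the hypothetical helper `ts_to_utc` below converts the first sample value. It uses integer `timedelta` arithmetic rather than dividing by 1000, which avoids floating-point rounding of the milliseconds.

```python
from datetime import datetime, timedelta, timezone

# Start of the Unix epoch in UTC
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ts_to_utc(ts_millis: int) -> datetime:
    """Interpret a 1:MILLISECONDS:EPOCH value as a UTC datetime."""
    return EPOCH + timedelta(milliseconds=ts_millis)


print(ts_to_utc(1644586485807).isoformat())  # 2022-02-11T13:34:45.807000+00:00
```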
Table Config
A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The table config defines the table's properties in JSON format.
Create a file called /tmp/pinot/table-config-stream.json and add the following content to it:
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "": "events",
      "": "",
      "": "",
      "": "kafka:9092",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
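Because the table config refers to the schema by name and to `ts` as its time column, a small pre-flight check can catch mismatches before anything is sent to the controller. This is a minimal sketch, not part of Pinot; `check_table_against_schema` is a hypothetical helper, and it only inspects the fields used in the files above.

```python
def check_table_against_schema(table: dict, schema: dict) -> list:
    """Return human-readable problems; an empty list means no mismatch was found."""
    problems = []
    segments = table.get("segmentsConfig", {})

    # The table config should point at the schema by name
    if segments.get("schemaName") != schema.get("schemaName"):
        problems.append(
            f"schemaName {segments.get('schemaName')!r} does not match "
            f"{schema.get('schemaName')!r}"
        )

    # The time column should be declared as a dateTimeFieldSpec in the schema
    time_columns = {spec.get("name") for spec in schema.get("dateTimeFieldSpecs", [])}
    if segments.get("timeColumnName") not in time_columns:
        problems.append(
            f"timeColumnName {segments.get('timeColumnName')!r} is not a dateTimeFieldSpec"
        )

    # In the layout used above, a REALTIME table carries its stream settings
    # in tableIndexConfig.streamConfigs
    index_config = table.get("tableIndexConfig", {})
    if table.get("tableType") == "REALTIME" and "streamConfigs" not in index_config:
        problems.append("REALTIME table has no tableIndexConfig.streamConfigs")

    return problems
```

To use it, load /tmp/pinot/table-config-stream.json and /tmp/pinot/schema-stream.json with `json.load` and print whatever the function returns.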
Create schema and table
Create the schema and table by running the following command:
docker run --rm -ti \
--network=pinot-demo \
-v /tmp/pinot:/tmp/pinot \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /tmp/pinot/schema-stream.json \
-tableConfigFile /tmp/pinot/table-config-stream.json \
-controllerHost pinot-controller \
-controllerPort 9000 -exec
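If you prefer not to start the admin container, the same two files can be sent to the controller's REST API. This is a minimal sketch that assumes the controller is reachable at localhost:9000 (the address the query console uses) and accepts JSON bodies on `POST /schemas` and `POST /tables`; `json_post`, `schema_request` and `table_request` are hypothetical helper names. Upload the schema before the table, because the table config refers to it by name.

```python
import json
import urllib.request

# Assumption: the controller's HTTP port is published on the host
CONTROLLER = "http://localhost:9000"


def json_post(url: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request with a JSON body."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def schema_request(schema: dict) -> urllib.request.Request:
    return json_post(f"{CONTROLLER}/schemas", schema)


def table_request(table: dict) -> urllib.request.Request:
    return json_post(f"{CONTROLLER}/tables", table)
```

For example, load /tmp/pinot/schema-stream.json with `json.load`, send it with `urllib.request.urlopen(schema_request(schema))`, and then do the same for the table config with `table_request`.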
Navigate to localhost:9000/#/query and click on the events table to run a query that shows the first 10 rows in this table.
Querying the events table
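The query console is the simplest way to look at the data. To query from code instead, the sketch below assumes the broker's HTTP port (8099 here, an assumption about your cluster setup) is reachable from the host and serves `/query/sql`, which takes a JSON body with a `sql` field. The helpers `query_request` and `rows_as_dicts` are hypothetical names. The parsing assumes results arrive under `resultTable`, with column names in `dataSchema.columnNames` and values in `rows`.

```python
import json
import urllib.request

# Assumption: the broker's HTTP port is published on the host
BROKER = "http://localhost:8099"


def query_request(sql: str) -> urllib.request.Request:
    """Build (but do not send) a SQL query request for the broker."""
    return urllib.request.Request(
        f"{BROKER}/query/sql",
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def rows_as_dicts(response: dict) -> list:
    """Pair each result row with the column names from dataSchema."""
    table = response["resultTable"]
    columns = table["dataSchema"]["columnNames"]
    return [dict(zip(columns, row)) for row in table["rows"]]
```

For example, `rows_as_dicts(json.load(urllib.request.urlopen(query_request("SELECT * FROM events LIMIT 10"))))` returns the same first 10 rows as a list of dictionaries.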