How to ingest data from Apache Pulsar
Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo!. In this guide we'll learn how to ingest data from Pulsar into Pinot.
Pinot Version: 0.10.0
Code: startreedata/pinot-recipes/pulsar
Prerequisites
To follow the code examples in this guide, you'll need to install Docker locally and download the recipes repository.
Navigate to recipe
- If you haven't already, download the recipes repository.
- In a terminal, navigate to the recipe by running the following command:
cd pinot-recipes/recipes/pulsar
Download the Pulsar plugin
The plugin for ingesting data from Apache Pulsar doesn't ship with Apache Pinot, but we can download it from Maven Central. Download the file with the following name structure:
pinot-pulsar-<version>-shaded.jar
Then move or copy it into the plugins directory.
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run single instances of the Pinot Controller, Pinot Server, Pinot Broker, Pulsar, and Zookeeper. You can find the docker-compose.yml file on GitHub.
We have mounted the Pulsar plugin at /opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar.
Pinot Schema and Tables
Now let's create a Pinot Schema, as well as a real-time table.
Schema
Our schema is going to capture some simple events, and looks like this:
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
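To make the schema concrete, here's a small Python sketch that builds one event with the three fields above (the `make_event` helper is our own name, not part of the recipe):

```python
import json
import random
import time
import uuid

def make_event():
    """Build one event matching the schema: ts, uuid, count."""
    return {
        "ts": int(time.time() * 1000.0),             # TIMESTAMP, 1:MILLISECONDS:EPOCH
        "uuid": str(uuid.uuid4()).replace("-", ""),  # STRING dimension
        "count": random.randint(0, 1000),            # INT metric
    }

event = make_event()
print(json.dumps(event))
```

Each Pulsar message will carry one such JSON object, which is what the JSON decoder in the table config below expects.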
Table
The table config is defined below:
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.topic.name": "events",
      "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}
You can create the schema and table by running the following command:
docker run \
--network pulsar \
-v $PWD/config:/config \
apachepinot/pinot:0.12.0-arm64 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-pulsar" \
-exec
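Once the command completes, you can sanity-check that the table exists via the controller's REST API. A minimal sketch using only the standard library, assuming the controller's port 9000 is mapped to localhost (the helper names are our own):

```python
import json
import urllib.request

CONTROLLER = "http://localhost:9000"  # assumption: controller port mapped to the host

def table_config_url(table_name, controller=CONTROLLER):
    """Build the controller REST URL that returns a table's config."""
    return f"{controller}/tables/{table_name}"

def fetch_table_config(table_name):
    """Fetch the table config; raises HTTPError if the table doesn't exist yet."""
    with urllib.request.urlopen(table_config_url(table_name)) as resp:
        return json.loads(resp.read())
```

Calling `fetch_table_config("events")` should return a JSON document containing the REALTIME table config you submitted.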
Data Ingestion
Let's ingest some data into our table. We're going to do this using the Pulsar Python client, which we can install by running the following command:
pip install pulsar-client
Create a file producer.py with the following contents:
import pulsar
import json
import time
import random
import uuid

# Connect to the Pulsar broker and create a producer for the 'events' topic
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('events')

while True:
    # Build an event matching the Pinot schema: ts, uuid, count
    message = {
        "ts": int(time.time() * 1000.0),
        "uuid": str(uuid.uuid4()).replace("-", ""),
        "count": random.randint(0, 1000)
    }
    # Serialise the event as a UTF-8 encoded JSON payload and publish it
    payload = json.dumps(message, ensure_ascii=False).encode('utf-8')
    producer.send(payload)
You can start publishing data into Pulsar by running this script:
python producer.py
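The table config tells Pinot to decode each message with a JSON decoder, i.e. one UTF-8 encoded JSON object per Pulsar message. A quick stdlib sketch (`decode_event` is our own helper name) to check a payload round-trips the way the producer encodes it:

```python
import json

def decode_event(payload: bytes) -> dict:
    """Decode one Pulsar message payload: a single UTF-8 encoded JSON object."""
    return json.loads(payload.decode("utf-8"))

# Example payload shaped like the producer's output
sample = b'{"ts": 1650000000000, "uuid": "abc123", "count": 42}'
print(decode_event(sample))
```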
Querying
Navigate to localhost:9000/#/query and click on the events table, or copy/paste the following query:
select *
from events
limit 10
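You can also run the same query programmatically against the broker's HTTP endpoint. A sketch using only the standard library, assuming the broker's port 8099 is mapped to localhost (the helper names are our own):

```python
import json
import urllib.request

BROKER = "http://localhost:8099"  # assumption: broker port mapped to the host

def build_query_request(sql, broker=BROKER):
    """Build a POST request for the broker's /query/sql endpoint."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        f"{broker}/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
    )

def run_query(sql):
    """Execute the SQL against Pinot and return the parsed JSON response."""
    with urllib.request.urlopen(build_query_request(sql)) as resp:
        return json.loads(resp.read())
```

Calling `run_query("select * from events limit 10")` returns a JSON document whose `resultTable` field holds the rows shown in the query console.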