How to ingest data from Apache Pulsar
Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo!. In this guide we'll learn how to ingest data from Pulsar into Pinot.
| Pinot Version | 0.10.0 |
| Code | startreedata/pinot-recipes/pulsar |
Prerequisites
You will need to install Docker locally to follow the code examples in this guide.
Download Recipe
First, clone the GitHub repository to your local machine and navigate to this recipe:
git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/pulsar
If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
Build the Pulsar plugin
The plugin for ingesting data from Apache Pulsar doesn't ship with Apache Pinot, so we'll need to build it ourselves and then add it to Pinot's plugins directory.
We can build the plugin by first cloning the Pinot repository and switching to the 0.10.0 tag:
git clone git@github.com:apache/pinot.git
cd pinot
git checkout release-0.10.0
Next, navigate to the Pulsar directory and build the plugin:
cd pinot-plugins/pinot-stream-ingestion/pinot-pulsar
mvn clean install -DskipTests
Once this command has completed, the Pulsar plugin JAR will be available at target/pinot-pulsar-0.10.0-shaded.jar. We will need to make that plugin available from Pinot's plugins directory.
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Pulsar, and Zookeeper. You can find the docker-compose.yml file on GitHub.
We have mounted the Pulsar plugin at /opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar.
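In Docker Compose, mounting the JAR is done with a volume entry on the Pinot containers. A minimal sketch of what that entry looks like (the service name and local JAR path here are illustrative; check the recipe's docker-compose.yml for the exact values):

```yaml
services:
  pinot-server:
    image: apachepinot/pinot:0.10.0
    volumes:
      # Mount the locally built plugin JAR into Pinot's plugins directory
      - ./pinot-pulsar-0.10.0-shaded.jar:/opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar
```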
Pinot Schema and Tables
Now let's create a Pinot Schema, as well as a real-time table.
Schema
Our schema is going to capture some simple events, and looks like this:
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
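To see how the schema's fields line up with actual events, here's a small Python sketch that builds one event matching the spec above: a milliseconds-since-epoch timestamp for the `ts` column, a hex UUID string for the `uuid` dimension, and an integer for the `count` metric. It mirrors the producer script used later in this recipe:

```python
import json
import random
import time
import uuid

def make_event():
    # ts matches the "1:MILLISECONDS:EPOCH" format in the dateTimeFieldSpec
    return {
        "ts": int(time.time() * 1000.0),
        "uuid": str(uuid.uuid4()).replace("-", ""),  # STRING dimension
        "count": random.randint(0, 1000),            # INT metric
    }

event = make_event()
payload = json.dumps(event)
```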
You can create the schema by running the following command:
docker exec -it pinot-controller-pulsar bin/pinot-admin.sh AddSchema \
-schemaFile /config/schema.json \
-exec
Table
The table config is defined below:
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.topic.name": "events",
      "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}
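If you generate table configs programmatically rather than hand-editing JSON, note that the streamConfigs block is a flat string-to-string map: every value, including numbers, is a string. A minimal sketch (values copied from the config above) that builds and serializes it:

```python
import json

stream_configs = {
    "streamType": "pulsar",
    "stream.pulsar.topic.name": "events",
    "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
    "stream.pulsar.consumer.type": "lowlevel",
    "stream.pulsar.fetch.timeout.millis": "10000",  # a string, not an int
    "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
    "stream.pulsar.consumer.factory.class.name":
        "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
    "stream.pulsar.decoder.class.name":
        "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
}
serialized = json.dumps(stream_configs, indent=2)
```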
You can create the table by running the following command:
docker exec -it pinot-controller-pulsar bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table.json \
-exec
Data Ingestion
Let's import some data into our table. We're going to do this using the Pulsar Python client, which we can install by running the following command:
pip install pulsar-client
Create a file called producer.py with the following contents:
import pulsar
import json
import time
import random
import uuid

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('events')

while True:
    # Build an event matching the Pinot schema: ts, uuid, count
    message = {
        "ts": int(time.time() * 1000.0),
        "uuid": str(uuid.uuid4()).replace("-", ""),
        "count": random.randint(0, 1000)
    }
    # Serialize to JSON bytes and publish to the events topic
    payload = json.dumps(message, ensure_ascii=False).encode('utf-8')
    producer.send(payload)
You can publish data to Pulsar by running this script:
python producer.py
Querying
Navigate to localhost:9000/#/query and click on the events table, or copy/paste the following query:
select *
from events
limit 10
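You can also run queries programmatically over HTTP. The sketch below uses only the Python standard library and POSTs to the Pinot broker's standard /query/sql endpoint; it assumes the broker is exposed on localhost:8099, which may differ depending on the port mapping in the recipe's docker-compose.yml:

```python
import json
from urllib import request

def run_query(sql, broker_url="http://localhost:8099"):
    # POST the SQL statement to the broker's /query/sql endpoint
    body = json.dumps({"sql": sql}).encode("utf-8")
    req = request.Request(
        broker_url + "/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with request.urlopen(req) as resp:
        return json.loads(resp.read())

# The JSON payload the function would send for the query above
payload = json.dumps({"sql": "select * from events limit 10"})
```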