How to ingest data from Apache Pulsar

Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo!. In this guide we'll learn how to ingest data from Pulsar into Pinot.

note
Pinot Version: 0.10.0
Code: startreedata/pinot-recipes/pulsar

Pre-requisites

You will need to install Docker locally to follow the code examples in this guide.
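You can check that both tools are installed by asking them for their versions:

docker --version
docker-compose --version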

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/pulsar

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.
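For example, assuming the recipes live on the repository's main branch, you could download and extract the archive like this:

curl -L -o pinot-recipes.zip https://github.com/startreedata/pinot-recipes/archive/refs/heads/main.zip
unzip pinot-recipes.zip
cd pinot-recipes-main/recipes/pulsar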

Build the Pulsar plugin

The plugin for ingesting data from Apache Pulsar doesn't ship with Apache Pinot, so we'll need to build it ourselves and then add it to Pinot's plugins directory.

We can build the plugin by first cloning the Pinot repository and switching to the 0.10.0 tag:

git clone git@github.com:apache/pinot.git
cd pinot
git checkout release-0.10.0

Next, navigate to the Pulsar directory and build the plugin:

cd pinot-plugins/pinot-stream-ingestion/pinot-pulsar
mvn clean install -DskipTests

Once this command has completed, the Pulsar plugin JAR will be available at target/pinot-pulsar-0.10.0-shaded.jar. We'll need to make that JAR available in Pinot's plugins directory.
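One way to do that is to copy the JAR into the recipe directory, from where it can be mounted into the Pinot containers. A minimal sketch, assuming the pinot and pinot-recipes repositories were cloned side by side:

cp target/pinot-pulsar-0.10.0-shaded.jar ../../../../pinot-recipes/recipes/pulsar/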

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Pulsar, and Zookeeper. You can find the docker-compose.yml file on GitHub.

We have mounted the Pulsar plugin at /opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar.
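Once the containers are up, you can sanity-check that the plugin was mounted where Pinot expects to find it:

docker exec pinot-controller-pulsar ls /opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/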

Pinot Schema and Tables

Now let's create a Pinot schema, as well as a real-time table.

Schema

Our schema is going to capture some simple events, and looks like this:

config/schema.json
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

You can create the schema by running the following command:

docker exec -it pinot-controller-pulsar bin/pinot-admin.sh AddSchema   \
-schemaFile /config/schema.json \
-exec
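To confirm that the schema was created, you can list schemas via the controller's REST API (assuming port 9000 is exposed on localhost, as it is in this recipe):

curl -X GET localhost:9000/schemas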

Table

The table config is defined below:

config/table.json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.topic.name": "events",
      "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}

You can create the table by running the following command:

docker exec -it pinot-controller-pulsar bin/pinot-admin.sh AddTable   \
-tableConfigFile /config/table.json \
-exec
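As with the schema, you can confirm that the table exists by listing tables via the controller's REST API:

curl -X GET localhost:9000/tables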

Data Ingestion

Let's import some data into our table. We're going to do this using the Pulsar Python client, which we can install by running the following command:

pip install pulsar-client

Create a file producer.py with the following contents:

producer.py
import pulsar
import json
import time
import random
import uuid

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('events')

while True:
    # Build a random event with the current epoch timestamp in milliseconds
    message = {
        "ts": int(time.time() * 1000.0),
        "uuid": str(uuid.uuid4()).replace("-", ""),
        "count": random.randint(0, 1000)
    }
    payload = json.dumps(message, ensure_ascii=False).encode('utf-8')
    producer.send(payload)

You can start publishing events to Pulsar by running this script:

python producer.py
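If you want to verify that events are reaching Pulsar before they get to Pinot, one option is the pulsar-client tool inside the Pulsar container. A sketch, assuming the container is named pulsar and using a throwaway subscription name:

docker exec -it pulsar bin/pulsar-client consume events -s "debug-sub" -n 5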

Querying

Navigate to localhost:9000/#/query and click on the events table or copy/paste the following query:

select * 
from events
limit 10
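You can also run queries over HTTP rather than through the UI. A sketch, assuming the controller's sql endpoint is reachable on port 9000:

curl -X POST -H "Content-Type: application/json" \
  -d '{"sql": "select * from events limit 10"}' \
  localhost:9000/sql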