How to ingest data from Apache Pulsar
Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo!. In this guide we'll learn how to ingest data from Pulsar into Pinot.
Pinot Version | 0.10.0
Code | startreedata/pinot-recipes/pulsar
Prerequisites
To follow the code examples in this guide, you'll need to install Docker locally and download the recipes.
Navigate to recipe
- If you haven't already, download the recipes.
- In a terminal, go to the recipe directory by running the following command:
cd pinot-recipes/recipes/pulsar
Download the Pulsar plugin
The plugin for ingesting data from Apache Pulsar doesn't ship with Apache Pinot, but you can download it from Maven Central. Download the file with the following name structure:
pinot-pulsar-<version>-shaded.jar
Then move or copy it into the plugins directory.
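If you have curl available, the download might look like this. The Maven Central path below is an assumption based on the jar's naming convention; adjust the version to match your Pinot release:

```shell
# Assumption: the plugin jar is published under org/apache/pinot on Maven Central
PINOT_VERSION="0.10.0"
JAR="pinot-pulsar-${PINOT_VERSION}-shaded.jar"
URL="https://repo1.maven.org/maven2/org/apache/pinot/pinot-pulsar/${PINOT_VERSION}/${JAR}"

# Download the jar straight into the recipe's plugins directory
curl -fL --create-dirs -o "plugins/${JAR}" "${URL}"
echo "${JAR}"
```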
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Pulsar, and Zookeeper. You can find the docker-compose.yml file on GitHub.
We have mounted the Pulsar plugin at /opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar.
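To double check that the plugin is visible inside the cluster, you can list it from within the controller container. The container name is an assumption here (it matches the controller host used later in this guide); check your docker-compose.yml if it differs:

```shell
# Assumption: the controller container is named pinot-controller-pulsar
PLUGIN_JAR="/opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar"
docker exec pinot-controller-pulsar ls -l "${PLUGIN_JAR}"
```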
Pinot Schema and Tables
Now let's create a Pinot schema, as well as a real-time table.
Schema
Our schema is going to capture some simple events, and looks like this:
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
config/schema.json
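Each Pulsar message will need to be a JSON document whose fields line up with this schema: ts as epoch milliseconds, uuid as a string dimension, and count as an integer metric. A sketch of one conforming event:

```python
import json
import random
import time
import uuid

# One event matching the schema: ts (TIMESTAMP, epoch millis),
# uuid (STRING dimension), count (INT metric)
event = {
    "ts": int(time.time() * 1000),
    "uuid": uuid.uuid4().hex,
    "count": random.randint(0, 1000),
}

# Pinot's JSON decoder expects one JSON object per message
payload = json.dumps(event).encode("utf-8")
print(payload)
```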
Table
The table config is defined below:
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.topic.name": "events",
      "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}
config/table.json
You can create the schema and table by running the following command:
docker run \
  --network pulsar \
  -v $PWD/config:/config \
  apachepinot/pinot:1.0.0 AddTable \
  -schemaFile /config/schema.json \
  -tableConfigFile /config/table.json \
  -controllerHost "pinot-controller-pulsar" \
  -exec
Data Ingestion
Let's import some data into our table. We're going to do this using the Pulsar Python client, which we can install by running the following command:
pip install pulsar-client
Create a file producer.py with the following contents:
import pulsar
import json
import time
import random
import uuid

# Connect to the local Pulsar broker and create a producer for the events topic
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('events')

# Publish a stream of randomly generated events
while True:
    message = {
        "ts": int(time.time() * 1000.0),
        "uuid": str(uuid.uuid4()).replace("-", ""),
        "count": random.randint(0, 1000)
    }
    payload = json.dumps(message, ensure_ascii=False).encode('utf-8')
    producer.send(payload)
producer.py
You can publish messages to Pulsar by running this script:
python producer.py
Querying
Navigate to localhost:9000/#/query and click on the events table, or copy/paste the following query:
select *
from events
limit 10
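If you'd rather query from a script than the UI, the same SQL can be sent to the Pinot Broker's REST endpoint. A minimal sketch using only the standard library; localhost:8099 is an assumption (the default broker query port) and may need adjusting to match your docker-compose.yml:

```python
import json
import urllib.request

# Assumption: the Pinot Broker's SQL endpoint is exposed on localhost:8099
BROKER_URL = "http://localhost:8099/query/sql"

request = urllib.request.Request(
    BROKER_URL,
    data=json.dumps({"sql": "select * from events limit 10"}).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)

try:
    with urllib.request.urlopen(request, timeout=5) as response:
        result = json.loads(response.read())
        # The rows come back under the resultTable key
        print(result.get("resultTable"))
except OSError as exc:
    # The cluster isn't reachable (e.g. docker-compose isn't running)
    print(f"query failed: {exc}")
```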