
How to ingest data from Apache Pulsar

Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo!. In this guide we'll learn how to ingest data from Pulsar into Pinot.

note
Pinot Version: 0.10.0
Code: startreedata/pinot-recipes/pulsar

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download the recipes.

  1. If you haven't already, download the recipes.
  2. In a terminal, go to the recipe by running the following command:
    cd pinot-recipes/recipes/pulsar

Download the Pulsar plugin

The plugin for ingesting data from Apache Pulsar doesn't ship with Apache Pinot, but we can download it from Maven Central. Download the file whose name follows this pattern:

pinot-pulsar-<version>-shaded.jar

Then copy or move it into the plugins directory.
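If you prefer to script the download, the sketch below builds the jar name from the pattern above and fetches it with the standard library. It assumes the plugin is published on Maven Central under the org.apache.pinot group with artifact pinot-pulsar and classifier shaded, and that the target folder is a local directory named plugins; both are assumptions, and plugin_jar_name, plugin_url and download_plugin are hypothetical helpers, not part of the recipe.

```python
import os
import urllib.request

# Assumption: Maven Central layout is <base>/<group path>/<artifact>/<version>/<file>,
# with the plugin under group org.apache.pinot and artifact pinot-pulsar.
MAVEN_BASE = "https://repo1.maven.org/maven2/org/apache/pinot/pinot-pulsar"


def plugin_jar_name(version: str) -> str:
    """Return the file name that follows the pinot-pulsar-<version>-shaded.jar pattern."""
    return f"pinot-pulsar-{version}-shaded.jar"


def plugin_url(version: str) -> str:
    """Build the (assumed) Maven Central download URL for a given Pinot version."""
    return f"{MAVEN_BASE}/{version}/{plugin_jar_name(version)}"


def download_plugin(version: str, plugins_dir: str = "plugins") -> str:
    """Download the shaded jar into plugins_dir (hypothetical default) and return its path."""
    os.makedirs(plugins_dir, exist_ok=True)
    dest = os.path.join(plugins_dir, plugin_jar_name(version))
    urllib.request.urlretrieve(plugin_url(version), dest)
    return dest
```

For example, download_plugin("0.10.0") would place pinot-pulsar-0.10.0-shaded.jar in ./plugins, matching the version used throughout this guide.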

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command runs a single instance each of the Pinot Controller, Pinot Server, Pinot Broker, Pulsar, and ZooKeeper. You can find the docker-compose.yml file on GitHub.

We have mounted the Pulsar plugin at /opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar.
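The containers take a little while to start, so it can help to wait for the Controller before creating tables. The sketch below polls a URL until it answers HTTP 200. It assumes the Controller is reachable on localhost:9000 (the port this guide later uses for the query console) and exposes a /health endpoint; wait_until_healthy and http_status are hypothetical helpers, and the fetch parameter exists only so the polling logic can be tested without a running cluster.

```python
import time
import urllib.error
import urllib.request
from typing import Callable

# Assumption: the Pinot Controller is published on localhost:9000 with a /health endpoint.
CONTROLLER_HEALTH_URL = "http://localhost:9000/health"


def http_status(url: str) -> int:
    """Return the HTTP status code of a GET request, or 0 if the server is unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status
    except urllib.error.HTTPError as err:  # must come before URLError (its parent class)
        return err.code
    except (urllib.error.URLError, OSError):
        return 0


def wait_until_healthy(url: str = CONTROLLER_HEALTH_URL, timeout_s: float = 120.0,
                       interval_s: float = 2.0,
                       fetch: Callable[[str], int] = http_status) -> bool:
    """Poll url until it returns HTTP 200 (True) or timeout_s elapses (False)."""
    deadline = time.monotonic() + timeout_s
    while True:
        if fetch(url) == 200:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

Run wait_until_healthy() after docker-compose up; if it returns False, check the container logs before moving on.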

Pinot Schema and Tables

Now let's create a Pinot schema, as well as a real-time table.

Schema

Our schema is going to capture some simple events, and looks like this:

config/schema.json
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
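Each Pulsar message should carry one JSON field per schema column: uuid as a string, count as an integer, and ts as epoch milliseconds. The sketch below collects the column names and types from the schema and flags events that are missing a column or carry an unexpected Python type. It is a client-side sanity check under an assumed type mapping (PY_TYPES, a hypothetical name), not how Pinot itself validates records, and it is stricter than Pinot's decoder may be.

```python
# The schema from config/schema.json, inlined so the sketch is self-contained.
SCHEMA = {
    "schemaName": "events",
    "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
    "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
    "dateTimeFieldSpecs": [{"name": "ts", "dataType": "TIMESTAMP",
                            "format": "1:MILLISECONDS:EPOCH",
                            "granularity": "1:MILLISECONDS"}],
}

# Assumed mapping from Pinot data types to the Python types we expect in a decoded message.
PY_TYPES = {"STRING": str, "INT": int, "TIMESTAMP": int}


def schema_columns(schema: dict) -> dict:
    """Map every column name in the schema to its Pinot data type."""
    cols = {}
    for key in ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs"):
        for spec in schema.get(key, []):
            cols[spec["name"]] = spec["dataType"]
    return cols


def check_event(event: dict, schema: dict) -> list:
    """Return a list of problems; an empty list means the event matches the schema."""
    problems = []
    for name, data_type in schema_columns(schema).items():
        value = event.get(name)
        if name not in event:
            problems.append(f"missing column {name!r}")
        # bool is a subclass of int in Python, so reject it explicitly
        elif not isinstance(value, PY_TYPES[data_type]) or isinstance(value, bool):
            problems.append(f"{name!r} should be {data_type}")
    return problems
```

For instance, check_event({"ts": 1650000000000, "uuid": "abc", "count": 5}, SCHEMA) returns an empty list, while dropping ts reports the missing column.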

Table

The table config is defined below:

config/table.json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.topic.name": "events",
      "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}
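A few parts of the table config have to line up with the schema: segmentsConfig.schemaName must name the schema, timeColumnName must be one of its dateTimeFieldSpecs, and the stream keys use the prefix stream.<streamType>. The sketch below checks those relationships before you submit the files. The list of stream keys it insists on is simply the set this guide relies on, not an official list of required properties, and load_json and check_table_config are hypothetical helpers.

```python
import json

# The stream keys this guide sets; treating them as required is an assumption.
REQUIRED_STREAM_SUFFIXES = (
    "topic.name",
    "bootstrap.servers",
    "consumer.type",
    "consumer.factory.class.name",
    "decoder.class.name",
)


def load_json(path: str) -> dict:
    """Load a config file; json.load raises JSONDecodeError on syntax errors such as trailing commas."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def check_table_config(table: dict, schema: dict) -> list:
    """Return a list of mismatches between a real-time table config and its schema."""
    problems = []
    segments = table.get("segmentsConfig", {})
    if segments.get("schemaName") != schema.get("schemaName"):
        problems.append("segmentsConfig.schemaName does not match the schema")
    time_columns = {spec["name"] for spec in schema.get("dateTimeFieldSpecs", [])}
    if segments.get("timeColumnName") not in time_columns:
        problems.append("timeColumnName is not a dateTimeFieldSpec in the schema")
    streams = table.get("tableIndexConfig", {}).get("streamConfigs", {})
    stream_type = streams.get("streamType")
    if not stream_type:
        problems.append("streamConfigs.streamType is missing")
        return problems
    # Every stream property is namespaced by the stream type, e.g. stream.pulsar.topic.name
    prefix = f"stream.{stream_type}."
    for suffix in REQUIRED_STREAM_SUFFIXES:
        if prefix + suffix not in streams:
            problems.append(f"missing {prefix + suffix}")
    return problems
```

Usage: check_table_config(load_json("config/table.json"), load_json("config/schema.json")) should return an empty list for the files in this recipe.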

You can create the schema and table by running the following command:

docker run \
--network pulsar \
-v $PWD/config:/config \
apachepinot/pinot:0.12.0-arm64 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-pulsar" \
-exec
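The AddTable command uploads the schema and then the table config to the Controller. If you would rather do that over the Controller's REST API from the host, the sketch below builds one POST to /schemas and one to /tables. It assumes the Controller is published on localhost:9000 and accepts JSON bodies at those two paths; build_add_table_requests and add_table are hypothetical helpers, and the request building is separated from sending so it can be inspected without a cluster.

```python
import json
import urllib.request

# Assumption: the Controller's REST API is reachable from the host on localhost:9000.
CONTROLLER_URL = "http://localhost:9000"


def build_add_table_requests(schema: dict, table: dict,
                             controller: str = CONTROLLER_URL) -> list:
    """Build POST requests that upload the schema first, then the table config."""
    reqs = []
    for path, body in (("/schemas", schema), ("/tables", table)):
        reqs.append(urllib.request.Request(
            url=controller + path,
            data=json.dumps(body).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        ))
    return reqs


def add_table(schema_path: str, table_path: str, controller: str = CONTROLLER_URL) -> None:
    """Read both config files and send them to the Controller, printing each response."""
    with open(schema_path, encoding="utf-8") as fh:
        schema = json.load(fh)
    with open(table_path, encoding="utf-8") as fh:
        table = json.load(fh)
    for req in build_add_table_requests(schema, table, controller):
        with urllib.request.urlopen(req) as resp:
            print(req.full_url, resp.status, resp.read().decode("utf-8"))
```

Usage from the recipe directory: add_table("config/schema.json", "config/table.json"). The schema goes first because the table config refers to it by name.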

Data Ingestion

Let's import some data into our table. We're going to do this using the Pulsar Python client, which we can install by running the following command:

pip install pulsar-client

Create a file named producer.py with the following contents:

producer.py
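import json
import random
import time
import uuid

import pulsar

# Connect to the Pulsar broker on localhost:6650
client = pulsar.Client('pulsar://localhost:6650')
# Publish to the "events" topic that the Pinot table consumes
producer = client.create_producer('events')

try:
    # Send events continuously until the script is interrupted (Ctrl+C)
    while True:
        message = {
            "ts": int(time.time() * 1000.0),  # epoch milliseconds, matching the ts column
            "uuid": str(uuid.uuid4()).replace("-", ""),
            "count": random.randint(0, 1000)
        }
        payload = json.dumps(message, ensure_ascii=False).encode('utf-8')
        producer.send(payload)
except KeyboardInterrupt:
    pass
finally:
    # Release the connection to the broker
    client.close()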
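The message-building part of the producer can be pulled into small functions so the payload can be checked without a running Pulsar broker. In this sketch make_event and encode are hypothetical helpers; the random generator is passed in (and can be seeded) only to make the output reproducible, and the uuid is derived from it to give the same 32-character dash-free hex form that producer.py sends.

```python
import json
import random
import uuid


def make_event(rng: random.Random, now_ms: int) -> dict:
    """Build one event with the same fields as producer.py: ts, uuid, count."""
    return {
        "ts": now_ms,  # epoch milliseconds
        # 32 lowercase hex characters, the same form as str(uuid4()).replace("-", "")
        "uuid": uuid.UUID(int=rng.getrandbits(128), version=4).hex,
        "count": rng.randint(0, 1000),
    }


def encode(event: dict) -> bytes:
    """Serialize an event to the UTF-8 JSON bytes the producer sends."""
    return json.dumps(event, ensure_ascii=False).encode("utf-8")
```

Inside the producer loop this would read payload = encode(make_event(rng, int(time.time() * 1000))) with rng = random.Random(), which keeps the JSON shape in one place.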

You can import data into Pulsar by running this script:

python producer.py

Querying

Navigate to localhost:9000/#/query and click on the events table or copy/paste the following query:

select * 
from events
limit 10
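You can also run the same query programmatically against the Pinot Broker. The sketch below posts SQL to the broker's /query/sql endpoint and turns the resultTable (column names plus row arrays) into dictionaries. It assumes the Broker's HTTP port, 8099 by default, is published on localhost, which may not hold for every docker-compose setup; run_query and rows_as_dicts are hypothetical helpers.

```python
import json
import urllib.request

# Assumption: the Pinot Broker's HTTP port (8099 by default) is published on localhost.
BROKER_URL = "http://localhost:8099/query/sql"


def run_query(sql: str, broker_url: str = BROKER_URL) -> dict:
    """POST a SQL query to the broker and return the decoded JSON response."""
    req = urllib.request.Request(
        broker_url,
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def rows_as_dicts(response: dict) -> list:
    """Zip the resultTable's column names with each row to get one dict per row."""
    table = response.get("resultTable", {})
    columns = table.get("dataSchema", {}).get("columnNames", [])
    return [dict(zip(columns, row)) for row in table.get("rows", [])]
```

Usage: for row in rows_as_dicts(run_query("select * from events limit 10")): print(row). Re-running it while producer.py is active should show new events arriving.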