Ingest from Apache Pulsar

How to ingest data from Apache Pulsar

Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo!. In this guide, we'll learn how to ingest data from Pulsar into Pinot.

Pinot Version: 0.10.0


To follow the code examples in this guide, you must install Docker locally and download the recipes.

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In a terminal, go to the recipe directory by running the following command:
cd pinot-recipes/recipes/pulsar

Download the Pulsar plugin

The plugin for ingesting data from Apache Pulsar doesn't ship with Apache Pinot, but we can download it from Maven Central. Download the file with the following name structure:


Then move or copy it into the plugins directory.
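If you prefer to script the download, the Maven Central URL can be derived from the standard Maven repository layout. The sketch below assumes the coordinates `org.apache.pinot:pinot-pulsar:0.10.0` with a `shaded` classifier, inferred from the jar name mounted into the cluster later in this guide, and assumes a local directory named `plugins`. `maven_jar_url` and `download_plugin` are hypothetical helpers, not part of Pinot.

```python
import os
import urllib.request
from typing import Optional

MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str,
                  classifier: Optional[str] = None) -> str:
    """Build a jar URL following the standard Maven repository layout."""
    group_path = group_id.replace(".", "/")
    file_name = f"{artifact_id}-{version}"
    if classifier:
        file_name += f"-{classifier}"
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{file_name}.jar"


def download_plugin(url: str, plugins_dir: str = "plugins") -> str:
    """Download the jar into plugins_dir (hypothetical helper; needs network access)."""
    os.makedirs(plugins_dir, exist_ok=True)
    target = os.path.join(plugins_dir, url.rsplit("/", 1)[-1])
    urllib.request.urlretrieve(url, target)
    return target


# Assumed coordinates, inferred from the jar name pinot-pulsar-0.10.0-shaded.jar
url = maven_jar_url("org.apache.pinot", "pinot-pulsar", "0.10.0", classifier="shaded")
print(url)
```

Calling `download_plugin(url)` would then save the jar into `plugins`. Since the exact artifact name is an assumption here, it is worth checking the artifact listing on Maven Central before relying on the URL.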

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Pulsar, and ZooKeeper. You can find the docker-compose.yml file on GitHub.

We have mounted the Pulsar plugin at /opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar.

Pinot Schema and Tables

Now let's create a Pinot Schema, as well as a real-time table.


Our schema is going to capture some simple events, and looks like this:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "ts",
    "dataType": "TIMESTAMP",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}
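As a quick sanity check, the schema can be loaded in Python to confirm that each of the fields the producer script sends (`ts`, `uuid`, `count`) maps to exactly one column. `schema_columns` is a hypothetical helper, and the sample event values are illustrative only.

```python
import json

# The events schema from above, condensed onto fewer lines
SCHEMA_JSON = """
{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [{"name": "ts", "dataType": "TIMESTAMP",
                          "format": "1:MILLISECONDS:EPOCH",
                          "granularity": "1:MILLISECONDS"}]
}
"""

FIELD_SPEC_KINDS = ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs")


def schema_columns(schema: dict) -> dict:
    """Map each column name to the field spec list that declares it."""
    columns = {}
    for kind in FIELD_SPEC_KINDS:
        for spec in schema.get(kind, []):
            if spec["name"] in columns:
                raise ValueError(f"column {spec['name']!r} declared twice")
            columns[spec["name"]] = kind
    return columns


schema = json.loads(SCHEMA_JSON)
columns = schema_columns(schema)

# Illustrative event with the same keys the producer script sends
event = {"ts": 1700000000000, "uuid": "0" * 32, "count": 42}
unmapped = set(event) - set(columns)
print(columns)
print("unmapped fields:", unmapped)
```

An empty `unmapped` set means every event field has a home in the schema; a non-empty set points at a field the schema would not capture.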



The table config is defined below:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "": "events",
      "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "": "smallest",
      "": "",
      "": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}
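The table config and the schema have to agree with each other. The sketch below, built around a hypothetical `check_table_config` helper, checks three things: the table references the schema by name, `timeColumnName` is declared in `dateTimeFieldSpecs`, and any `stream.*` key uses the prefix for the configured `streamType`. The condensed dicts include only values visible in the files above.

```python
def check_table_config(table: dict, schema: dict) -> list:
    """Return human-readable problems found between a REALTIME table config and its schema."""
    problems = []
    segments = table["segmentsConfig"]
    if segments["schemaName"] != schema["schemaName"]:
        problems.append("segmentsConfig.schemaName does not match the schema name")
    time_columns = {spec["name"] for spec in schema.get("dateTimeFieldSpecs", [])}
    if segments["timeColumnName"] not in time_columns:
        problems.append(f"timeColumnName {segments['timeColumnName']!r} is not a dateTimeFieldSpec")
    stream = table["tableIndexConfig"]["streamConfigs"]
    prefix = f"stream.{stream['streamType']}."
    for key in stream:
        # Keys such as realtime.segment.* are legitimately unprefixed, so only check stream.* keys
        if key.startswith("stream.") and not key.startswith(prefix):
            problems.append(f"stream config key {key!r} does not start with {prefix!r}")
    return problems


schema = {"schemaName": "events", "dateTimeFieldSpecs": [{"name": "ts"}]}
table = {
    "segmentsConfig": {"timeColumnName": "ts", "schemaName": "events"},
    "tableIndexConfig": {"streamConfigs": {
        "streamType": "pulsar",
        "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
        "stream.pulsar.consumer.type": "lowlevel",
        "stream.pulsar.fetch.timeout.millis": "10000",
    }},
}
print(check_table_config(table, schema))
```

An empty list means none of these three mismatches were found. This is not a substitute for the controller's own validation when the table is created.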


You can create the schema and table by running the following command:

docker run \
   --network pulsar \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
   -schemaFile /config/schema.json \
   -tableConfigFile /config/table.json \
   -controllerHost "pinot-controller-pulsar" \
   -exec
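As an alternative to the `AddTable` command, the same files can be posted to the Pinot controller's REST API (`POST /schemas`, then `POST /tables`). The sketch assumes the controller is reachable from the host at `localhost:9000`, the port used for the query console, and that `config/schema.json` and `config/table.json` hold the files shown above. `build_post` and `add_schema_and_table` are hypothetical helpers.

```python
import json
import urllib.request

CONTROLLER = "http://localhost:9000"  # assumed: controller port published on the host


def build_post(path: str, body: dict) -> urllib.request.Request:
    """Build, but do not send, a JSON POST request to the Pinot controller."""
    return urllib.request.Request(
        url=f"{CONTROLLER}{path}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def add_schema_and_table(schema_path: str, table_path: str) -> None:
    """Create the schema first, because the table config refers to it by name."""
    with open(schema_path) as f:
        schema = json.load(f)
    with open(table_path) as f:
        table = json.load(f)
    for request in (build_post("/schemas", schema), build_post("/tables", table)):
        with urllib.request.urlopen(request) as response:
            print(request.full_url, response.status)


request = build_post("/schemas", {"schemaName": "events"})
print(request.get_method(), request.full_url)
```

With the cluster running, `add_schema_and_table("config/schema.json", "config/table.json")` would send both requests in order. `urlopen` raises `urllib.error.HTTPError` if the controller rejects either one.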

Data Ingestion

Let's import some data into our table. We're going to do this using the Pulsar Python client, which we can install by running the following command:

pip install pulsar-client

Create a file with the following contents:

import json
import random
import time
import uuid

import pulsar

# Connect to Pulsar's binary protocol port on localhost
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('events')

# Publish one JSON event per iteration until the script is interrupted
while True:
    message = {
        "ts": int(time.time() * 1000.0),  # epoch milliseconds
        "uuid": str(uuid.uuid4()).replace("-", ""),
        "count": random.randint(0, 1000)
    }
    payload = json.dumps(message, ensure_ascii=False).encode('utf-8')
    producer.send(payload)

You can import data into Pulsar by running this script:



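To test the event format without a running Pulsar broker, the message construction in the producer script can be pulled into a pure function. In this hypothetical `make_event`, the random source and the clock are passed in so the output is reproducible. `uuid.UUID(int=..., version=4)` sets the version 4 bits on random input, and its `.hex` is the same 32-character, dash-free form the script produces with `str(uuid.uuid4()).replace("-", "")`.

```python
import json
import random
import uuid


def make_event(rng: random.Random, now_seconds: float) -> bytes:
    """Build one UTF-8 JSON event matching the events schema."""
    message = {
        # Epoch milliseconds, matching the 1:MILLISECONDS:EPOCH format of ts
        "ts": int(now_seconds * 1000.0),
        # Version 4 UUID built from the injected RNG, rendered as 32 hex characters
        "uuid": uuid.UUID(int=rng.getrandbits(128), version=4).hex,
        "count": rng.randint(0, 1000),
    }
    return json.dumps(message, ensure_ascii=False).encode("utf-8")


payload = make_event(random.Random(42), 1700000000.0)
print(payload.decode("utf-8"))
```

With a single `rng = random.Random()` created before the loop, the loop body of the producer script would reduce to `producer.send(make_event(rng, time.time()))`.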
Navigate to localhost:9000/#/query and click on the events table or copy/paste the following query:

select * 
from events 
limit 10
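The same query can also be sent programmatically. The sketch below assumes the broker's `/query/sql` endpoint is reachable at `localhost:8099` (Pinot's default broker port; whether the Docker Compose file publishes it to the host is an assumption) and that results come back under `resultTable`, with column names in `dataSchema.columnNames` and values in `rows`. `build_query`, `rows_as_dicts`, and `run_query` are hypothetical helpers.

```python
import json
import urllib.request

BROKER = "http://localhost:8099"  # assumed: default broker port, published on the host


def build_query(sql: str) -> urllib.request.Request:
    """Build a POST request for the broker's /query/sql endpoint."""
    return urllib.request.Request(
        url=f"{BROKER}/query/sql",
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def rows_as_dicts(response: dict) -> list:
    """Zip the column names in resultTable.dataSchema with each result row."""
    table = response["resultTable"]
    columns = table["dataSchema"]["columnNames"]
    return [dict(zip(columns, row)) for row in table["rows"]]


def run_query(sql: str) -> list:
    """Send the query and return the rows as dictionaries (needs a running broker)."""
    with urllib.request.urlopen(build_query(sql)) as response:
        return rows_as_dicts(json.load(response))


request = build_query("select * from events limit 10")
print(request.full_url)
```

Calling `run_query("select * from events limit 10")` against the running cluster would return up to ten events as dictionaries keyed by column name.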