
How to ingest data from Apache Pulsar

Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo!. In this guide we'll learn how to ingest data from Pulsar into Pinot.

Pinot Version: 0.10.0
Code: startreedata/pinot-recipes/pulsar

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download the recipes.

Navigate to recipe

  1. If you haven't already, download the recipes.
  2. In a terminal, navigate to the recipe directory by running the following command:
cd pinot-recipes/recipes/pulsar

Download the Pulsar plugin

The plugin for ingesting data from Apache Pulsar doesn't ship with Apache Pinot, but we can download it from Maven Central. Download the file with the following name structure:

pinot-pulsar-<version>-shaded.jar

Then move or copy it into the plugins directory.
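For example, assuming version 0.10.0 and the standard Maven Central repository layout (the exact URL is an assumption; check the artifact page if it doesn't resolve), the download could look like this:

# Download the shaded Pulsar plugin (version assumed: 0.10.0)
curl -O https://repo1.maven.org/maven2/org/apache/pinot/pinot-pulsar/0.10.0/pinot-pulsar-0.10.0-shaded.jar

# Copy it into the recipe's plugins directory (path assumed)
cp pinot-pulsar-0.10.0-shaded.jar plugins/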

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Pulsar, and Zookeeper. You can find the docker-compose.yml file on GitHub.

We have mounted the Pulsar plugin at /opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar.
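The mount comes from a volumes entry on the Pinot containers in docker-compose.yml. The relevant part looks roughly like this (a sketch; the local file path is assumed from the recipe):

volumes:
  - ./pinot-pulsar-0.10.0-shaded.jar:/opt/pinot/plugins/pinot-stream-ingestion/pinot-pulsar/pinot-pulsar-0.10.0-shaded.jar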

Pinot Schema and Tables

Now let's create a Pinot schema and real-time table.

Schema

Our schema is going to capture some simple events, and looks like this:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "ts",
    "dataType": "TIMESTAMP",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}

config/schema.json

Table

The table config is defined below. Its streamConfigs section tells Pinot to consume the events topic from the Pulsar broker at pulsar://pulsar:6650 using the low-level consumer, to start from the earliest available message, and to decode each message as JSON:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.topic.name": "events",
      "stream.pulsar.bootstrap.servers": "pulsar://pulsar:6650",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "stream.pulsar.consumer.prop.auto.offset.reset": "smallest",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}

config/table.json

You can create the schema and table by running the following command:

docker run \
   --network pulsar \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
   -schemaFile /config/schema.json \
   -tableConfigFile /config/table.json \
   -controllerHost "pinot-controller-pulsar" \
   -exec
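
Once the command completes, you can check that the table was created via the controller's REST API (the controller is exposed on localhost:9000 in this recipe):

curl localhost:9000/tables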

Data Ingestion

Let's import some data into our table. We're going to do this using the Pulsar Python client, which we can install by running the following command:

pip install pulsar-client

Create a file producer.py with the following contents:

import pulsar
import json
import time
import random
import uuid

# Connect to the Pulsar broker exposed by Docker Compose
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('events')

# Publish an endless stream of randomly generated events
while True:
    message = {
        "ts": int(time.time() * 1000.0),
        "uuid": str(uuid.uuid4()).replace("-", ""),
        "count": random.randint(0, 1000)
    }
    payload = json.dumps(message, ensure_ascii=False).encode('utf-8')
    producer.send(payload)

producer.py
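
To sanity-check that events are reaching Pulsar, you can consume a few messages with a separate script. This is a minimal sketch; the subscription name sanity-check is arbitrary:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# Subscribe to the events topic (subscription name is arbitrary)
consumer = client.subscribe('events', subscription_name='sanity-check')

# Read a few messages to confirm the producer is publishing
for _ in range(3):
    msg = consumer.receive(timeout_millis=10000)
    print(msg.data())
    consumer.acknowledge(msg)

client.close()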

You can start publishing data to Pulsar by running this script:

python producer.py

Querying

Navigate to localhost:9000/#/query and click on the events table or copy/paste the following query:

select * 
from events 
limit 10
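
Since the producer publishes continuously, re-running an aggregate query should show the row count increasing:

select count(*), max(ts)
from events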