Google Cloud Storage as Deep Store

How to use Google Cloud Storage as a Deep Store

In this recipe we'll learn how to use Google Cloud Storage as a Deep Store for Apache Pinot segments. The deep store (or deep storage) is the permanent store for segment files and is used for backup and restore operations.

Prerequisites

To follow the code examples in this guide, do the following:

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/google-cloud-storage

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
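
Before moving on, you can check that all of the containers came up. A minimal check, assuming the service names defined in this recipe's docker-compose.yml, is shown below:

# List the containers started by docker-compose and their current status
docker-compose ps

All five services should show an Up state before you continue.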

Controller configuration

We need to provide configuration parameters to the Pinot Controller to configure Google Cloud Storage as the Deep Store. This is done in the following section of the Docker Compose file:

pinot-controller:
  image: apachepinot/pinot:0.10.0
  command: "StartController -zkAddress zookeeper-gcs:2181 -config /config/controller-conf.conf"

The configuration is specified in /config/controller-conf.conf, the contents of which are shown below:

controller.access.protocols.http.port=9000
controller.zk.str=zookeeper-gcs:2181
controller.helix.cluster.name=PinotCluster
controller.host=pinot-controller-gcs
controller.port=9000

controller.data.dir=gs://<bucket-name>
controller.local.temp.dir=/tmp/pinot-tmp-data

pinot.controller.segment.fetcher.protocols=file,http,gs
pinot.controller.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher

pinot.controller.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
pinot.controller.storage.factory.gs.projectId=<project-id>
pinot.controller.storage.factory.gs.gcpKey=/config/service-account.json

/config/controller-conf.conf

Let's go through some of these properties:

  • controller.data.dir contains the name of our bucket.
  • pinot.controller.storage.factory.gs.projectId contains the name of our GCP project.
  • pinot.controller.storage.factory.gs.gcpKey contains the path to our GCP JSON key file.

You'll need to update the following lines:

controller.data.dir=gs://<bucket-name>
pinot.controller.storage.factory.gs.projectId=<project-id>
  • Replace <bucket-name> with the name of your bucket. If you haven't created a bucket yet, see the sketch just after this list.
  • Replace <project-id> with the name of your GCP project.
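
You can create the bucket with the gsutil CLI. This is a minimal sketch, assuming you have already authenticated with the gcloud CLI and that you substitute your own project id and bucket name:

# Create the bucket that Pinot will use as its deep store
gsutil mb -p <project-id> gs://<bucket-name>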

You should also paste the contents of your GCP JSON key file into config/service-account.json.
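
If you don't yet have a JSON key file, you can generate one for a service account that has read/write access to the bucket (for example via the Storage Object Admin role). The command below is a sketch, assuming a hypothetical service account named pinot-deep-store that already exists in your project:

# Generate a JSON key for the service account and write it to config/service-account.json
gcloud iam service-accounts keys create config/service-account.json \
  --iam-account=pinot-deep-store@<project-id>.iam.gserviceaccount.com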

Pinot Schema and Tables

Now let's create a Pinot Schema and real-time table.

Schema

Our schema is going to capture some simple events, and looks like this:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "uuid",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "count",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "ts",
    "dataType": "TIMESTAMP",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}

config/schema.json

You can create the schema by running the following command:

docker exec -it pinot-controller-gcs bin/pinot-admin.sh AddSchema   \
  -schemaFile /config/schema.json \
  -exec
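
You can check that the schema was registered by querying the Pinot Controller's REST API, which is mapped to port 9000 on localhost in this recipe:

curl -X GET "http://localhost:9000/schemas/events" \
  -H "accept: application/json" 2>/dev/null | jq '.'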

Real-Time Table

And the real-time table is defined below:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-gcs:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "10000",
      "realtime.segment.flush.threshold.time": "1h",
      "realtime.segment.flush.threshold.segment.size": "5M"
    }
  },
  "tenants": {},
  "metadata": {},
  "task": {
    "taskTypeConfigsMap": {
    }
  }
}

config/table-realtime.json

⚠️

The realtime.segment.flush.threshold.rows config is intentionally set to an extremely small value so that the segment will be committed after 10,000 records have been ingested. In a production system this value should be set much higher, as described in the configuring segment threshold guide.

You can create the table by running the following command:

docker exec -it pinot-controller-gcs bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table-realtime.json   \
  -exec
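
As with the schema, you can confirm that the table was created via the controller's REST API:

curl -X GET "http://localhost:9000/tables/events" \
  -H "accept: application/json" 2>/dev/null | jq '.'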

Ingesting Data

Let's ingest data into the events Kafka topic by running the following:

while true; do
  ts=`date +%s%N | cut -b1-13`;
  uuid=`cat /proc/sys/kernel/random/uuid | sed 's/[-]//g'`
  count=$[ $RANDOM % 1000 + 0 ]
  echo "{\"ts\": \"${ts}\", \"uuid\": \"${uuid}\", \"count\": $count}"
done |
docker exec -i kafka-gcs /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic events
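
If you want to verify that events are reaching Kafka, you can consume a few messages directly from the topic. This is a sketch that assumes the kafka-gcs container and the Kafka installation path used elsewhere in this recipe:

# Read the first five messages from the events topic
docker exec -i kafka-gcs /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic events \
  --from-beginning \
  --max-messages 5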

Data will make its way into the real-time table. We can see how many records have been ingested by running the following query:

SELECT count(*)
FROM events
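
You can run this query in the Query Console at http://localhost:9000, or from the command line, as sketched below. This sketch assumes the controller's /sql endpoint, which proxies queries to the broker:

curl -X POST "http://localhost:9000/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT count(*) FROM events"}' 2>/dev/null | jq '.'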

Exploring Deep Store

Now we're going to check what segments we have and where they're stored.

You can get a list of all segments by running the following:

curl -X GET \
  "http://localhost:9000/segments/events" \
  -H "accept: application/json" 2>/dev/null | 
  jq '.[] | .REALTIME[]'

The output is shown below:

Output

"events__0__0__20220505T1339Z"
"events__0__1__20220505T1342Z"
"events__0__2__20220505T1342Z"
"events__0__3__20220505T1343Z"
"events__0__4__20220505T1344Z"

Let's pick one of these segments, events__0__3__20220505T1343Z, and get its metadata by running the following:

tableName="events"
segmentName="events__0__3__20220505T1343Z"
curl -X GET \
  "http://localhost:9000/segments/${tableName}/${segmentName}/metadata" \
  -H "accept: application/json" 2>/dev/null | 
  jq '.'

The output is shown below:

Output

{
  "segment.crc": "532660340",
  "segment.creation.time": "1651758198369",
  "segment.download.url": "gs://pinot-events/events/events__0__3__20220505T1343Z",
  "segment.end.time": "1651758238283",
  "segment.flush.threshold.size": "10000",
  "segment.index.version": "v3",
  "segment.realtime.endOffset": "40000",
  "segment.realtime.numReplicas": "1",
  "segment.realtime.startOffset": "30000",
  "segment.realtime.status": "DONE",
  "segment.start.time": "1651758188443",
  "segment.time.unit": "MILLISECONDS",
  "segment.total.docs": "10000"
}

We can see from the segment.download.url property that this segment is persisted at gs://pinot-events/events/events__0__3__20220505T1343Z. Let's go back to the terminal and return a list of all the segments in the bucket:

bucketName="pinot-events"
gsutil ls -l gs://${bucketName}/events/

The output is shown below:

Output

    256712  2022-05-05T13:42:07Z  gs://pinot-events/events/events__0__0__20220505T1339Z
    256817  2022-05-05T13:42:32Z  gs://pinot-events/events/events__0__1__20220505T1342Z
    257174  2022-05-05T13:43:15Z  gs://pinot-events/events/events__0__2__20220505T1342Z
    257224  2022-05-05T13:44:05Z  gs://pinot-events/events/events__0__3__20220505T1343Z
TOTAL: 4 objects, 1027927 bytes (1003.83 KiB)
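
Note that the most recent segment, events__0__4__20220505T1344Z, doesn't appear in the bucket yet: it is still the consuming segment and will only be written to the deep store once it has been committed. If you want to inspect one of the committed segments locally, you can copy it out of the bucket, as sketched below:

# Download a committed segment from the deep store for local inspection
gsutil cp gs://${bucketName}/events/events__0__3__20220505T1343Z /tmp/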