How to use Google Cloud Storage as a Deep Store
In this recipe we'll learn how to use Google Cloud Storage as a Deep Store for Apache Pinot segments. The deep store (or deep storage) is the permanent store for segment files and is used for backup and restore operations.
Pinot Version | 0.9.3 |
Code | startreedata/pinot-recipes/google-cloud-storage |
Prerequisites
To follow the code examples in this guide, do the following:
- Install Docker and the Google Cloud CLI locally.
- Create a GCP project and a user or service account that has permission to list and create buckets, then navigate to https://console.cloud.google.com/storage/browser and create a bucket, for example pinot-deepstore.yourdomain.com. (A command-line sketch follows this list.)
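If you prefer to do this from the command line, here's a rough sketch using the Google Cloud CLI. The project ID, service account name, and bucket name below are placeholders, and the roles/storage.admin grant is one reasonable choice, not the only one:

# All values below are placeholders - substitute your own
PROJECT_ID="my-gcp-project"
SA_NAME="pinot-deepstore"
BUCKET="pinot-deepstore.yourdomain.com"

# Create a service account and download a JSON key for it
gcloud iam service-accounts create ${SA_NAME} --project=${PROJECT_ID}
gcloud iam service-accounts keys create service-account.json \
  --iam-account=${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com

# Let it manage Cloud Storage objects, then create the bucket
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role="roles/storage.admin"
gsutil mb -p ${PROJECT_ID} gs://${BUCKET}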
Navigate to recipe
- If you haven't already, download the recipes.
- In a terminal, go to this recipe's directory by running the following command:
cd pinot-recipes/recipes/google-cloud-storage
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
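To confirm that everything came up, you can list the running containers and their status:

docker ps --format "table {{.Names}}\t{{.Status}}"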
Controller configuration
We need to provide configuration parameters to the Pinot Controller to configure Google Cloud Storage as the Deep Store. This is done in the following section of the Docker Compose file:
pinot-controller:
image: apachepinot/pinot:0.10.0
command: "StartController -zkAddress zookeeper-gcs:2181 -config /config/controller-conf.conf"
The configuration is specified in /config/controller-conf.conf, the contents of which are shown below:
controller.access.protocols.http.port=9000
controller.zk.str=zookeeper-gcs:2181
controller.helix.cluster.name=PinotCluster
controller.host=pinot-controller-gcs
controller.port=9000
controller.data.dir=gs://<bucket-name>
controller.local.temp.dir=/tmp/pinot-tmp-data
pinot.controller.segment.fetcher.protocols=file,http,gs
pinot.controller.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
pinot.controller.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
pinot.controller.storage.factory.gs.projectId=<project-id>
pinot.controller.storage.factory.gs.gcpKey=/config/service-account.json
/config/controller-conf.conf
Let's go through some of these properties:
- controller.data.dir contains the name of our bucket.
- pinot.controller.storage.factory.gs.projectId contains the name of our GCP project.
- pinot.controller.storage.factory.gs.gcpKey contains the path to our GCP JSON key file.
You'll need to update the following lines:
controller.data.dir=gs://<bucket-name>
pinot.controller.storage.factory.gs.projectId=<project-id>
- Replace <bucket-name> with the name of your bucket.
- Replace <project-id> with the name of your GCP project.
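If you'd rather script these substitutions, something like the following sed sketch should work; the bucket and project values shown are placeholders:

BUCKET_NAME="pinot-deepstore.yourdomain.com"   # placeholder
PROJECT_ID="my-gcp-project"                    # placeholder

# Rewrite the placeholders in the controller config (a .bak backup is kept)
sed -i.bak \
  -e "s|gs://<bucket-name>|gs://${BUCKET_NAME}|" \
  -e "s|<project-id>|${PROJECT_ID}|" \
  config/controller-conf.conf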
You should also paste the contents of your GCP JSON key file into config/service-account.json.
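For example, assuming the key you downloaded earlier is called service-account.json and sits in your home directory (that path is just an assumption), you could copy it into place and sanity-check the project ID with jq:

cp ~/service-account.json config/service-account.json    # source path is a placeholder
jq -r '.project_id' config/service-account.json           # should print your GCP project ID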
Pinot Schema and Tables
Now let's create a Pinot Schema and real-time table.
Schema
Our schema is going to capture some simple events, and looks like this:
{
"schemaName": "events",
"dimensionFieldSpecs": [
{
"name": "uuid",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "count",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [{
"name": "ts",
"dataType": "TIMESTAMP",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}
config/schema.json
You can create the schema by running the following command:
docker exec -it pinot-controller-gcs bin/pinot-admin.sh AddSchema \
-schemaFile /config/schema.json \
-exec
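You can verify that the schema was created by asking the Controller's REST API for it:

curl -X GET \
  "http://localhost:9000/schemas/events" \
  -H "accept: application/json" 2>/dev/null |
  jq '.'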
Real-Time Table
And the real-time table is defined below:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-gcs:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "10000",
"realtime.segment.flush.threshold.time": "1h",
"realtime.segment.flush.threshold.segment.size": "5M"
}
},
"tenants": {},
"metadata": {},
"task": {
"taskTypeConfigsMap": {
}
}
}
config/table-realtime.json
The realtime.segment.flush.threshold.rows config is intentionally set to a very small value so that a segment is committed after every 10,000 ingested records. In a production system this value should be set much higher, as described in the configuring segment threshold guide.
You can create the table by running the following command:
docker exec -it pinot-controller-gcs bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table-realtime.json \
-exec
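Similarly, you can confirm that the table exists via the REST API:

curl -X GET \
  "http://localhost:9000/tables/events" \
  -H "accept: application/json" 2>/dev/null |
  jq '.'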
Ingesting Data
Let's ingest data into the events Kafka topic by running the following:
while true; do
ts=`date +%s%N | cut -b1-13`;
uuid=`cat /proc/sys/kernel/random/uuid | sed 's/[-]//g'`
count=$[ $RANDOM % 1000 + 0 ]
echo "{\"ts\": \"${ts}\", \"uuid\": \"${uuid}\", \"count\": $count}"
done |
docker exec -i kafka-gcs /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic events
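If you want to confirm that messages are reaching the topic, you can attach a console consumer in another terminal (the container name matches the one used above):

docker exec -i kafka-gcs /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic events \
  --from-beginning \
  --max-messages 5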
Data will make its way into the real-time table. We can see how many records have been ingested by running the following query:
SELECT count(*)
FROM events
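You can run this query in the Pinot Query Console at http://localhost:9000, or send it to the Controller's SQL endpoint from the command line, for example:

curl -X POST \
  "http://localhost:9000/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT count(*) FROM events"}' 2>/dev/null |
  jq '.resultTable'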
Exploring Deep Store
Now we're going to check what segments we have and where they're stored.
You can get a list of all segments by running the following:
curl -X GET \
"http://localhost:9000/segments/events" \
-H "accept: application/json" 2>/dev/null |
jq '.[] | .REALTIME[]'
The output is shown below:
Output
"events__0__0__20220505T1339Z"
"events__0__1__20220505T1342Z"
"events__0__2__20220505T1342Z"
"events__0__3__20220505T1343Z"
"events__0__4__20220505T1344Z"
Let's pick one of these segments, events__0__3__20220505T1343Z, and get its metadata by running the following:
tableName="events"
segmentName="events__0__3__20220505T1343Z"
curl -X GET \
"http://localhost:9000/segments/${tableName}/${segmentName}/metadata" \
-H "accept: application/json" 2>/dev/null |
jq '.'
The output is shown below:
Output
{
"segment.crc": "532660340",
"segment.creation.time": "1651758198369",
"segment.download.url": "gs://pinot-events/events/events__0__3__20220505T1343Z",
"segment.end.time": "1651758238283",
"segment.flush.threshold.size": "10000",
"segment.index.version": "v3",
"segment.realtime.endOffset": "40000",
"segment.realtime.numReplicas": "1",
"segment.realtime.startOffset": "30000",
"segment.realtime.status": "DONE",
"segment.start.time": "1651758188443",
"segment.time.unit": "MILLISECONDS",
"segment.total.docs": "10000"
}
We can see from the segment.download.url property that this segment is persisted at gs://pinot-events/events/events__0__3__20220505T1343Z.
Let's go back to the terminal and list all the segments stored in the bucket:
bucketName="pinot-events"
gsutil ls -l gs://${bucketName}/events/
The output is shown below:
Output
256712 2022-05-05T13:42:07Z gs://pinot-events/events/events__0__0__20220505T1339Z
256817 2022-05-05T13:42:32Z gs://pinot-events/events/events__0__1__20220505T1342Z
257174 2022-05-05T13:43:15Z gs://pinot-events/events/events__0__2__20220505T1342Z
257224 2022-05-05T13:44:05Z gs://pinot-events/events/events__0__3__20220505T1343Z
TOTAL: 4 objects, 1027927 bytes (1003.83 KiB)
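Only four segments show up in the bucket because the most recent segment, events__0__4__20220505T1344Z, is still being consumed; it will be copied to the deep store once it is committed. If you want to inspect one of the persisted segments locally, you can copy it down with gsutil, for example:

gsutil cp gs://${bucketName}/events/events__0__3__20220505T1343Z /tmp/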