How to use MinIO as a Deep Store
In this recipe we'll learn how to use MinIO as a Deep Store for Apache Pinot segments. The deep store (or deep storage) is the permanent store for segment files and is used for backup and restore operations.
Pinot Version | 0.12.1
Code | startreedata/pinot-recipes/minio-real-time
Prerequisites
To follow the code examples in this guide, do the following:
Navigate to recipe
- If you haven't already, download recipes.
- In a terminal window, go to the recipe directory by running the following command:
cd pinot-recipes/recipes/minio-real-time
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, Zookeeper, and MinIO. You can find the docker-compose.yml file on GitHub.
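Once the services are up, you can optionally check that all of the containers started successfully:

docker-compose ps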
Create MinIO bucket
The MinIO server can be accessed from the host operating system via ports 9100 (S3 API) and 9101 (console).
Navigate to localhost:9101 and log in using the username minioadmin and password minioadmin.
Click on Identity > Users and create a miniodeepstorage user with the password miniodeepstorage. Assign this user the readwrite policy.
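If you prefer the command line to the console, the same user and policy can be created with the MinIO client (mc). This is only a sketch: it assumes mc is installed locally and that your mc version supports the policy attach syntax.

# Point mc at the local MinIO server (S3 API on port 9100)
mc alias set local http://localhost:9100 minioadmin minioadmin

# Create the deep store user and attach the readwrite policy
mc admin user add local miniodeepstorage miniodeepstorage
mc admin policy attach local readwrite --user miniodeepstorage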
Configure this user's credentials as environment variables so that the AWS CLI can authenticate against MinIO:
export AWS_ACCESS_KEY_ID="miniodeepstorage"
export AWS_SECRET_ACCESS_KEY="miniodeepstorage"
Finally, create a bucket called pinot-events
by running the following command:
aws s3 mb s3://pinot-events \
--endpoint-url http://localhost:9100
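You can confirm that the bucket exists by listing the buckets on the MinIO server:

aws s3 ls \
  --endpoint-url http://localhost:9100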
Controller configuration
We need to provide configuration parameters to the Pinot Controller so that it uses MinIO as the Deep Store. This is done in the following section of the Docker Compose file:
pinot-controller:
image: apachepinot/pinot:0.10.0
command: "StartController -zkAddress zookeeper-minio:2181 -config /config/controller-conf.conf"
The configuration is specified in /config/controller-conf.conf, the contents of which are shown below:
controller.access.protocols.http.port=9000
controller.zk.str=zookeeper-minio:2181
controller.helix.cluster.name=PinotCluster
controller.host=pinot-controller-minio
controller.port=9000
controller.data.dir=s3://pinot-events
controller.local.temp.dir=/tmp/pinot-tmp-data
pinot.controller.segment.fetcher.protocols=file,http,s3
pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.controller.storage.factory.s3.region=us-west-2
pinot.controller.storage.factory.s3.disableAcl=false
pinot.controller.storage.factory.s3.accessKey=miniodeepstorage
pinot.controller.storage.factory.s3.secretKey=miniodeepstorage
pinot.controller.storage.factory.s3.endpoint=http://minio:9000
/config/controller-conf.conf
Let's go through some of these properties:
- controller.data.dir contains the name of our bucket.
- pinot.controller.storage.factory.s3.accessKey contains our MinIO user.
- pinot.controller.storage.factory.s3.secretKey contains our MinIO password.
- pinot.controller.storage.factory.s3.endpoint contains the URL of our MinIO server.
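Note that the Pinot Server typically needs an equivalent set of S3 properties so that it can fetch segments directly from the deep store, for example when restoring a replica. Check the recipe's docker-compose.yml for the exact server configuration it uses; the snippet below is only a sketch of what those properties usually look like, reusing this recipe's bucket, credentials, and endpoint as assumed values.

# S3-compatible storage factory pointing at MinIO (assumed values)
pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.server.storage.factory.s3.region=us-west-2
pinot.server.storage.factory.s3.endpoint=http://minio:9000
pinot.server.storage.factory.s3.accessKey=miniodeepstorage
pinot.server.storage.factory.s3.secretKey=miniodeepstorage

# Allow the server to download segments over s3 as well as file/http
pinot.server.segment.fetcher.protocols=file,http,s3
pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher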
Pinot Schema and Tables
Now let's create a Pinot Schema and real-time table.
Schema
Our schema is going to capture some simple events, and looks like this:
{
"schemaName": "events",
"dimensionFieldSpecs": [
{
"name": "uuid",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "count",
"dataType": "INT"
}
],
"dateTimeFieldSpecs": [{
"name": "ts",
"dataType": "TIMESTAMP",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}
config/schema.json
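The AddTable command used later in this recipe uploads this schema along with the table config, but if you want to manage the schema on its own you can also post it directly to the controller's REST API (this assumes the controller is reachable on localhost:9000):

curl -F schemaName=@config/schema.json localhost:9000/schemas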
Real-Time Table
And the real-time table is defined below:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-minio:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "10000",
"realtime.segment.flush.threshold.time": "1h",
"realtime.segment.flush.threshold.segment.size": "5M"
}
},
"tenants": {},
"metadata": {},
"task": {
"taskTypeConfigsMap": {
}
}
}
config/table-realtime.json
The realtime.segment.flush.threshold.rows config is intentionally set to an extremely small value so that a segment will be committed after 10,000 records have been ingested.
In a production system this value should be set much higher, as described in the configuring segment threshold guide.
You can create the table and schema by running the following command:
docker run \
--network minio \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table-realtime.json \
-controllerHost "pinot-controller-minio" \
-exec
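You can check that the table and schema were created by asking the controller to list its tables; you should see events in the response:

curl -X GET "http://localhost:9000/tables" \
  -H "accept: application/json" 2>/dev/null | jq '.'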
Ingesting Data
Let's ingest data into the events Kafka topic by running the following:
python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
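To sanity check that messages are reaching the topic, you can consume a few of them in another terminal (this assumes kcat can still reach the broker on localhost:9092):

kcat -C -b localhost:9092 -t events -Kø -c 5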
Data will make its way into the real-time table. We can see how many records have been ingested by running the following query:
SELECT count(*)
FROM events
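You can run this query from the Pinot query console, or post it to the broker's REST API. The sketch below assumes the broker's query port 8099 is mapped to the host:

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT count(*) FROM events"}' 2>/dev/null | jq '.resultTable'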
Exploring Deep Store
Now we're going to check what segments we have and where they're stored.
You can get a list of all segments by running the following:
curl -X GET \
"http://localhost:9000/segments/events" \
-H "accept: application/json" 2>/dev/null |
jq '.[] | .REALTIME[]'
The output is shown below:
Output
"events__0__0__20220505T1038Z"
"events__0__10__20220505T1154Z"
"events__0__1__20220505T1041Z"
"events__0__2__20220505T1041Z"
"events__0__3__20220505T1041Z"
"events__0__4__20220505T1043Z"
"events__0__5__20220505T1044Z"
"events__0__6__20220505T1053Z"
"events__0__7__20220505T1053Z"
"events__0__8__20220505T1054Z"
"events__0__9__20220505T1054Z"
Let's pick one of these segments, events__0__7__20220505T1053Z, and get its metadata by running the following:
tableName="events"
segmentName="events__0__7__20220505T1053Z"
curl -X GET \
"http://localhost:9000/segments/${tableName}/${segmentName}/metadata" \
-H "accept: application/json" 2>/dev/null |
jq '.'
The output is shown below:
Output
{
"segment.crc": "681941539",
"segment.creation.time": "1651748015770",
"segment.end.time": "1651748049814",
"segment.flush.threshold.size": "10000",
"segment.index.version": "v3",
"segment.name": "events__0__7__20220505T1053Z",
"segment.realtime.download.url": "s3://pinot-events/events/events__0__7__20220505T1053Z",
"segment.realtime.endOffset": "80000",
"segment.realtime.numReplicas": "1",
"segment.realtime.startOffset": "70000",
"segment.realtime.status": "DONE",
"segment.start.time": "1651748015219",
"segment.table.name": "events",
"segment.time.unit": "MILLISECONDS",
"segment.total.docs": "10000",
"segment.type": "REALTIME"
}
We can see from the segment.realtime.download.url property that this segment is persisted at s3://pinot-events/events/events__0__7__20220505T1053Z.
Let's go back to the terminal and list all of the segments in the bucket:
aws s3 ls s3://pinot-events/events/ \
--endpoint-url http://localhost:9100 \
--human-readable
The output is shown below:
Output
2022-05-05 11:41:09 250.7 KiB events__0__0__20220505T1038Z
2022-05-05 11:41:09 250.9 KiB events__0__1__20220505T1041Z
2022-05-05 11:41:32 250.5 KiB events__0__2__20220505T1041Z
2022-05-05 11:43:25 250.3 KiB events__0__3__20220505T1041Z
2022-05-05 11:44:02 251.0 KiB events__0__4__20220505T1043Z
2022-05-05 11:53:08 250.5 KiB events__0__5__20220505T1044Z
2022-05-05 11:53:35 250.4 KiB events__0__6__20220505T1053Z
2022-05-05 11:54:10 250.5 KiB events__0__7__20220505T1053Z
2022-05-05 11:54:45 250.8 KiB events__0__8__20220505T1054Z
2022-05-05 12:54:47 160.6 KiB events__0__9__20220505T1054Z
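Because the deep store is just a bucket of objects, you can also copy a segment out of it, for example to inspect it locally or to take an ad hoc backup:

aws s3 cp s3://pinot-events/events/events__0__7__20220505T1053Z . \
  --endpoint-url http://localhost:9100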