Instructions as of October 2022
Steps to enable Tiered Storage on an existing table
Decide the segment age boundary for splitting across local and remote tier
For example, if you have 30 days of data and pick a segment age boundary of 7 days, segments less than 7 days old will stay on the local tier, and segments older than 7 days will be moved to the remote tier.
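The boundary rule can be sketched in a few lines of Python. This is only an illustration of the 7-day example above, not the product's implementation: the helper `pick_tier`, the segment names, and the choice to measure age from the segment's end time are all assumptions made for the sketch.

```python
from datetime import datetime, timedelta, timezone


def pick_tier(segment_end_time, now, age_boundary=timedelta(days=7)):
    """Return "remote" once a segment is older than the boundary, else "local".

    Assumption: segment age is measured from the segment's end time.
    """
    age = now - segment_end_time
    return "remote" if age > age_boundary else "local"


# Hypothetical segments that ended 1, 6, 8 and 30 days before "now".
now = datetime(2022, 10, 31, tzinfo=timezone.utc)
segments = {f"seg_{d}d": now - timedelta(days=d) for d in (1, 6, 8, 30)}

placement = {name: pick_tier(end, now) for name, end in segments.items()}
print(placement)
```

With a 7-day boundary, the two youngest segments stay local and the two oldest are assigned to the remote tier.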
Choose your tenant setup
- Use the same tenant as the table
You could use the same tenant as the one you've set in tableConfig->tenants->server, in which case this step is a no-op for you.
Note that while this is a great way to get started quickly, it is not a recommended setup for production scenarios. Processing segments from a cloud object store causes higher than usual memory and CPU utilization on the nodes, and might affect the performance of local data queries. If your local data queries serve latency-sensitive, user-facing traffic, we recommend setup 2.2 (a dedicated tenant).
- Use a dedicated tenant (Recommended)
You can create a new tenant, which will serve as compute-only nodes. This tenant will be dedicated to serving the segments that are on S3, and hence does not need as much local storage as your table's default local tenant.
- Add servers (you need at least as many as the table's replication).
- Tag these as RemoteTenant_OFFLINE (or any other appropriate tag) using the update tags API.
Add cluster configs and restart servers
Using the POST /cluster/configs API from Swagger, add the following cluster-level tiered storage configurations.
In future releases, many of these properties will be made default, and will not have to be set.
{
  "pinot.server.instance.use.local.tier.migrator": "true",
  "pinot.server.instance.local.tier.migrator.schedule.interval.sec": "3600"
}
|Property||Description||Default|
|tier.enabled||Enables tiered storage for the cluster.||false|
|ondemand.total.size||Direct memory available for the OnDemand cache. Ensure this is smaller than the direct memory available on the server (otherwise you'll get OutOfDirectMemory exceptions).||2G|
|ondemand.reserved.size||A portion of the above total.size. Ensure this is larger than the size of the largest uncompressed segment.||500M|
|pinot.helix.instance.state.maxStateTransitions||It is a good idea to cap this if you will be doing frequent restarts, re-indexing, or capacity changes.||unlimited|
|use.local.tier.migrator||Needed if using the same tenant for local and remote (setup 2.1). This is the periodic task that facilitates the movement of segments to S3.||false|
|ondemand.use.async.client||Whether to use the async client library to fetch data, e.g. AsyncS3Client. Usually the sync client works just fine. If there is abundant ondemand space to allow hundreds of concurrent I/O requests, using the async client avoids creating too many I/O threads, whose number is controlled by the next config.||false|
|ondemand.fetch.worker.threads||Size of the I/O thread pool. With the sync client, threads do the I/O themselves; with the async client, threads process I/O completion callbacks.||5 * numCore|
After adding these configs, restart any existing servers for changes to take effect on them.
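Before restarting, it can help to sanity-check the sizing constraints described in the table. The following is a minimal sketch under stated assumptions: the helper names are hypothetical, size strings such as "2G" are assumed to use binary units, and the direct memory and largest-segment figures are values you would supply yourself.

```python
import os

# Assumption: "K", "M" and "G" suffixes denote binary units.
UNITS = {"K": 1024, "M": 1024**2, "G": 1024**3}


def parse_size(text):
    """Convert a size string such as "2G" or "500M" into bytes."""
    text = text.strip().upper()
    if text[-1] in UNITS:
        return int(float(text[:-1]) * UNITS[text[-1]])
    return int(text)


def check_ondemand_sizing(total, reserved, direct_memory, largest_segment):
    """Return a list of violated constraints (empty if the sizing looks sane)."""
    problems = []
    total_b, reserved_b = parse_size(total), parse_size(reserved)
    if total_b >= parse_size(direct_memory):
        problems.append("ondemand.total.size must be smaller than direct memory")
    if reserved_b > total_b:
        problems.append("ondemand.reserved.size must be a portion of ondemand.total.size")
    if reserved_b <= parse_size(largest_segment):
        problems.append("ondemand.reserved.size must exceed the largest uncompressed segment")
    return problems


def default_fetch_worker_threads(num_cores=None):
    """Mirror the documented default of 5 * numCore for the I/O thread pool."""
    return 5 * (num_cores or os.cpu_count() or 1)


# Documented defaults checked against a hypothetical 4G of direct memory.
print(check_ondemand_sizing("2G", "500M", direct_memory="4G", largest_segment="300M"))
print(default_fetch_worker_threads(8))
```

An empty list means none of the three constraints from the table is violated for the numbers you passed in.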
Update table config
- Ensure you have set timeColumn in the tableConfig.
- Ensure you don't have any star-tree index or text index, as these are not supported yet (you can still enable tiered storage for such tables, but these two indexes will be ignored).
- Add tierConfigs to the tableConfig. The segmentAge is calculated using the primary timeColumnName set in the tableConfig.
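As a rough illustration, a tierConfigs entry could be assembled as below. The field names come from the property table in this section, but every value is an assumption: "time" and "pinot_server" follow open-source Apache Pinot tier config conventions, "s3" is a guess at the backend identifier, the keys inside tierBackendProperties (region, bucket, pathPrefix) and their values are hypothetical placeholders, and `make_tier_config` is a helper invented for this sketch.

```python
import json


def make_tier_config(name, segment_age, server_tag, backend_properties):
    """Build one tierConfigs entry; values marked below are assumptions."""
    return {
        "name": name,
        "segmentSelectorType": "time",   # assumption: age-based selection
        "segmentAge": segment_age,
        "storageType": "pinot_server",   # assumption: open-source Pinot value
        "serverTag": server_tag,
        "tierBackend": "s3",             # assumption: backend identifier
        "tierBackendProperties": backend_properties,
    }


# Hypothetical S3 details; replace with your own.
s3_properties = {
    "region": "us-west-2",
    "bucket": "my-tiered-storage-bucket",
    "pathPrefix": "pinot/tiered",
}

table_config_patch = {
    "tierConfigs": [
        make_tier_config("remoteTier", "7d", "RemoteTenant_OFFLINE", s3_properties)
    ]
}
print(json.dumps(table_config_patch, indent=2))
```

The printed JSON is the fragment you would merge into the tableConfig; check each value against the property table before using it.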
On adding this configuration, a periodic task will move segments to the right tier as and when the segments cross the segmentAge. For setup 2.1 (same tenant), the periodic task is called LocalTierMigrator and it runs on the servers (the configs to enable it and change its frequency are in Step 3, Add cluster configs and restart servers). For setup 2.2 (dedicated tenant), the periodic task is called SegmentRelocator and it runs on the controller (it has its own configs to enable it and change its frequency). Both run at hourly cadence by default.
|Property||Description|
|name||Set this to anything you want; it is used to refer to this tier uniquely.|
|segmentSelectorType||Can be |
|segmentAge||Applicable if |
|segmentList||Applicable if |
|storageType||Has to be |
|serverTag||If using setup 2.1, set this to the table's server tag. If using setup 2.2, set this to the tag of the remote tier created in Step 2.2.|
|tierBackend||Has to be |
|tierBackendProperties||Set these as per your S3 details. Supported properties are |
This segmentAge setting is independent of the table's retention. Segments will continue to be removed from Pinot once the table's retention is reached, regardless of which tier the segments are on.
Upload new segments + migrate existing segments
New segments - When you upload segments, they first land on the DefaultTenant_OFFLINE servers, and are only periodically moved to the new tenant and S3 by the periodic tasks.
Existing segments - When the periodic task runs, it will also move the existing segments over to the right tier.
Validate and Query
After uploading segments, verify the following for those that move to the remote tenant:
- Segments are not present on the pinot-server’s local disk (in dataDir and indexDir)
- Segments are present in the S3 bucket used, in uncompressed form
When querying, increase the timeout of your query.
- Restart/reload of your server will take much longer than it did before this feature, depending on the number of segments on S3. This is because loading each segment on the server requires a call to S3.
- Make sure to set pinot.helix.instance.state.maxStateTransitions if doing any schema evolution. A change to the schema will result in the servers downloading the segments for segment processing.
- As of now, we do not have the ability to automatically delete segments from S3 after retention/deletion. This is different from deep store segments, which will continue to follow the usual segment lifecycle.
S3 client configs
Sometimes you may need to fine-tune the S3 client, for example to tolerate longer request latency or to use fewer
threads for I/O. Below is a list of the current configs for customizing the S3 clients used to fetch data from remote
segments. All of these configs go inside tierBackendProperties. Most of them require a table reload to take effect.
Restarting servers also works, but overall we would like to make these configs apply more dynamically at runtime.
We support both sync and async S3 clients. By default, the sync S3 client is used, and a thread pool is created to fetch
data via sync S3 clients in parallel without blocking the query processing threads. The configs for the sync S3 client
are prefixed with s3client.http. The async S3 client can greatly reduce the size of that thread pool. It uses async I/O
under the hood to fetch data in parallel in a non-blocking manner, and uses the thread pool only to process the I/O
completion callbacks. The configs for the async S3 client are prefixed with s3client.asynchttp. Configs prefixed with
s3client.general apply to both kinds of clients.
In general, we try to expose as many of the configs supported by S3Client or AsyncS3Client as possible, as directly as
possible, and leave their default values and behaviors as is. Depending on the use case, such as how many segments may
be accessed during a query execution, maxConcurrency might be increased, together with the size of the I/O thread pool
controlled by ondemand.fetch.worker.threads (explained earlier), in order to reduce query latency. The request timeout
configs are usually left at their default values, which tolerate delays to some extent. They can be decreased so that
queries fail faster, which keeps the system responsive at the cost of returning errors.
|Config||Description||Default|
|s3client.general.apiCallTimeout||Timeout for the end-to-end API call, which may involve a few HTTP request retries.||no limit|
|s3client.general.apiCallAttemptTimeout||Timeout for a single HTTP request.||no limit|
|s3client.general.numRetries||How many times to retry the HTTP request, with exponential backoff and jitter.||3|
|s3client.general.readBufferSize||Size of the buffer used to stream data into buffers for query processing.||8KB|
|s3client.http.maxConnections||The max number of connections to pool for reuse||50|
|s3client.http.socketTimeout||Timeout to read/write data||30s|
|s3client.http.connectionTimeout||Timeout to establish connection||2s|
|s3client.http.connectionTimeToLive||Timeout to reclaim the idle connection||0s|
|s3client.http.connectionAcquireTimeout||Timeout to get a connection from pool||10s|
|s3client.http.connectionMaxIdleTimeout||Timeout to mark a connection as idle to be candidate to reclaim||60s|
|s3client.http.reapIdleConnections||Reclaim the idle connection in pool||true|
|s3client.asynchttp.maxConcurrency||The max number of concurrent requests allowed. We use HTTP/1.1, so this controls max connections.||50|
|s3client.asynchttp.readTimeout||Timeout to read data||30s|
|s3client.asynchttp.writeTimeout||Timeout to write data||30s|
|s3client.asynchttp.connectionTimeout||Timeout to establish connection||2s|
|s3client.asynchttp.connectionTimeToLive||Timeout to reclaim the idle connection||0s|
|s3client.asynchttp.connectionAcquireTimeout||Timeout to get a connection from pool||10s|
|s3client.asynchttp.connectionMaxIdleTimeout||Timeout to mark a connection as idle to be candidate to reclaim||60s|
|s3client.asynchttp.reapIdleConnections||Reclaim the idle connection in pool||true|
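Because these keys are free-form entries in tierBackendProperties, a misspelled key is easy to miss. The sketch below checks a set of overrides against the config names listed in the table above. The helper `unknown_s3client_keys`, the example override values, and their string formats (such as "20s") are assumptions for illustration only.

```python
# Config names copied from the table above, grouped by prefix.
GENERAL = {"apiCallTimeout", "apiCallAttemptTimeout", "numRetries", "readBufferSize"}
POOL = {
    "connectionTimeout", "connectionTimeToLive", "connectionAcquireTimeout",
    "connectionMaxIdleTimeout", "reapIdleConnections",
}
SYNC = POOL | {"maxConnections", "socketTimeout"}
ASYNC = POOL | {"maxConcurrency", "readTimeout", "writeTimeout"}

KNOWN_KEYS = (
    {f"s3client.general.{k}" for k in GENERAL}
    | {f"s3client.http.{k}" for k in SYNC}
    | {f"s3client.asynchttp.{k}" for k in ASYNC}
)


def unknown_s3client_keys(backend_properties):
    """Return s3client.* keys that do not match any documented config name."""
    return sorted(
        key for key in backend_properties
        if key.startswith("s3client.") and key not in KNOWN_KEYS
    )


# Hypothetical overrides; the last key is deliberately misspelled.
overrides = {
    "s3client.general.apiCallTimeout": "20s",
    "s3client.asynchttp.maxConcurrency": "200",
    "s3client.http.maxConnection": "100",
}
print(unknown_s3client_keys(overrides))
```

Keys without the s3client. prefix are ignored by the check, so the rest of tierBackendProperties can be passed in unchanged.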
Below are metrics we emit to help understand how tiered storage works. The metrics mainly cover: 1) how segments get uploaded to the tier backend, like data volume and operation duration, rates or failures; 2) how queries fetch data from the remote segments, like data volume and query duration breakdown over query operators, query rates or failures; 3) how segment data is kept on servers temporarily, like data volume from segments temporarily held in memory or on disk.
|Metric||Description|
|pinot_server_s3Segments_UploadedCount||Number of segments uploaded to the S3 tier backend when reloading a segment.|
|pinot_server_s3Segments_UploadedSizeBytes||Total number of bytes uploaded to the S3 tier backend when reloading a segment.|
|pinot_server_s3SegmentUploadTimeMs||How long segment uploading takes when reloading a segment.|
|pinot_server_s3Segments_DownloadedCount||Number of segments downloaded to the server when reloading a segment.|
|pinot_server_s3Segments_DownloadedSizeBytes||Total number of bytes downloaded to the server when reloading a segment.|
|pinot_server_s3SegmentDownloadTimeMs||How long segment downloading takes when reloading a segment (this is not on the query path).|
|pinot_server_totalPrefetch_99thPercentile/OneMinuteRate||Rate of, and time spent on, kicking off data prefetching; this should be low. The prefetching is done asynchronously. The metric is also broken down by cache layer: |
|pinot_server_totalAcquire_99thPercentile/OneMinuteRate||Rate of, and time spent on, waiting for segment data to become available; this may vary with the effectiveness of prefetching. The metric is also broken down by cache layer: |
|pinot_server_totalReleas_99thPercentile/OneMinuteRate||Rate of, and time spent on, releasing the segment data; this should be low. The metric is also broken down by cache layer: |
|pinot_server_bufferPrefetchExceptions_OneMinuteRate||Error rate when prefetching segment data. This metric is also broken down by cache layer: |
|pinot_server_bufferAcquireExceptions_OneMinuteRate||Error rate when waiting to acquire segment data. This metric is also broken down by cache layer: |
|pinot_server_bufferReleaseExceptions_OneMinuteRate||Error rate when releasing the prefetched segment data. This metric is also broken down by cache layer: |
|pinot_server_ondemandCacheLayerReserve_99thPercentile||Time that prefetching threads wait for ondemand memory space to become available to fetch data. If this is high, configure more ondemand memory, or adjust the query to access fewer segments, for example by making the query predicates more selective.|
|pinot_server_ondemandCacheLayerBufferDownloadBytes_OneMinuteRate||Bytes rate of data prefetching.|
|pinot_server_ondemandCacheLayerSingleBufferFetchersCount_OneMinuteRate||Number of column index pairs being prefetched.|
Besides the metrics emitted by the tiered storage modules, CPU utilization and network bandwidth utilization are also very important for fine-tuning the system. The set of metrics is updated frequently as we enhance the system. You can explore the tiered storage metrics via Grafana.
- GCS support
- Support for cleanup of segments in S3 past the table retention
- More optimizations for query latency
- Star-tree index support