Google Cloud Pubsub Ingestion
The following settings need to be configured so that messages can be consumed correctly from a Pubsub topic.
Pubsub Topic Settings
The pubsub topic containing the data to be consumed should have the following configurations:
- Topic retention must be enabled. Otherwise, data prior to the creation of the subscriber will be lost.
- The topic's message storage policy must be configured to use a single region. This is required for ordering guarantees in Pubsub.
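As an illustration of the two topic requirements above, the following minimal sketch builds a topic definition shaped like the Topic resource of the Pub/Sub REST API, with message retention enabled and the storage policy restricted to one region. The helper name, project, topic, region, and the 7-day retention are illustrative assumptions, not values required by Pinot.

```python
import json


def build_topic_definition(project_id, topic_id, region, retention_days):
    """Return a dict shaped like a Pub/Sub REST Topic resource.

    Hypothetical helper: all argument values are placeholders chosen by the caller.
    """
    retention_seconds = retention_days * 24 * 60 * 60
    return {
        "name": f"projects/{project_id}/topics/{topic_id}",
        # Topic retention enabled: durations are written as seconds with an "s" suffix.
        "messageRetentionDuration": f"{retention_seconds}s",
        # Exactly one allowed region keeps message storage in a single region.
        "messageStoragePolicy": {"allowedPersistenceRegions": [region]},
    }


topic = build_topic_definition("my-project", "events", "us-central1", retention_days=7)
print(json.dumps(topic, indent=2))
```

The same two settings can also be applied through the Cloud console or the gcloud CLI; the dict above only shows which fields are involved.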
Pubsub Publisher Settings
The published data should contain certain attributes to work correctly with Pinot.
- If data is expected to be partitioned (see attributeFilterKey for more info), then a user attribute representing the partition ID should be added to each Pubsub message. This attribute key should be specified in the streams configuration.
- Set the orderingKey in the Pubsub message to be the same as the partitionId value used above.
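The publisher requirements above can be sketched as a small helper that derives a partition ID for a record and uses the same value for both the message attribute and the ordering key. This is only a sketch under stated assumptions: the attribute key name `partitionId`, integer partition IDs counted from 0, and the CRC32-based assignment are all illustrative choices; use whatever attribute key and partitioning scheme your table's stream configuration expects.

```python
import zlib


def build_message_fields(record_key, num_partitions, attribute_key="partitionId"):
    """Return the attributes map and ordering key for one Pubsub message.

    Hypothetical helper; the attribute key name and hashing scheme are assumptions.
    """
    # CRC32 is stable across processes, unlike Python's built-in hash() for str.
    partition_id = zlib.crc32(record_key.encode("utf-8")) % num_partitions
    partition_value = str(partition_id)  # Pubsub attribute values are strings
    return {
        "attributes": {attribute_key: partition_value},
        # The ordering key carries the same value as the partition attribute.
        "ordering_key": partition_value,
    }


fields = build_message_fields("user-42", num_partitions=4)
print(fields)
```

With the google-cloud-pubsub Python client, these values would typically be passed to `publish()` as the `ordering_key` argument and as keyword attributes, on a publisher created with message ordering enabled.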
Pinot Table Configuration
Apart from the common realtime table configuration, there are some extra config values that you need to set up.
Segment completion under segmentsConfig should be set to DOWNLOAD in order to avoid errors during the segment completion protocol.
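For orientation, the fragment below shows where this setting would sit in a table config, assuming the usual Pinot `completionConfig` / `completionMode` keys under `segmentsConfig`. It is a sketch that omits every other field of the table config.

```python
import json

# Fragment of a realtime table config; all other segmentsConfig fields are omitted.
segments_config_fragment = {
    "segmentsConfig": {
        "completionConfig": {
            # Assumed key names; verify against your Pinot version's table config reference.
            "completionMode": "DOWNLOAD"
        }
    }
}

print(json.dumps(segments_config_fragment, indent=2))
```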
The following streams configs are available for configuration:
|Property||Type||Required||Description|
|stream.pubsub.consumer.factory.class.name||String||Yes||Class name of the pubsub consumer factory. Allowed values: |
|stream.pubsub.idle.timeout.millis*||Integer||No||Allowed idle time for the stream. If this threshold is crossed, Pinot will close the current consumer and instantiate a new one. A higher value is recommended to avoid the excessive admin operations that occur during Pubsub consumer creation. This behavior can be disabled by using value |
|gcpProject||String, should not contain underscore (||Yes||Project Identifier configured in GCP|
|topicId||String||Yes||Pubsub Topic Name|
|credentialsFile||String||No||File name of the GCP credentials file. This config is recommended only for development purposes. See Environmental Variables.|
|numPartitions||Integer||No||Number of partitions imposed on the pubsub topic. This is required when |
|attributeFilterKey||String||No||Default value is |
When configured, this config provides the name of the attribute key that will be used in the subscription's filter.
This configuration has the following side-effects:
2. The data published to the topic should always contain this attribute in its attributes map and its valid values are [
Failing to satisfy the above condition may cause data loss.
|maxConsumedMessages||Integer||No||Represents the max number of messages that can be indexed in a single Pinot fetch call. Default value is 10000|
|queueOperationTimeoutMs||Integer||No||Timeout used when accessing the consumer's internal queues. Default value is 10 ms.|
|checkpointRetry*||Integer||No||The Pubsub consumer implementation validates the snapshot images taken during segment commit to ensure that all indexed messages are acknowledged and not re-delivered. This configuration specifies the max number of snapshot attempts that the consumer may make for this validation. Default is 10.|
|subscription.ackDeadlineSeconds*||Integer||No||Specifies the deadline after which an unacknowledged message is sent again to the pubsub client. See Acknowledgment Deadline for more details. Default value used by the Pubsub connector is 600 seconds.|
|flowcontrol.maxOutstandingCount*||Integer||No||Controls the max number of messages that the subscriber receives before pausing the stream. This will be used as the |
|enableRetry*||Boolean||No||Controls the request retries for Pubsub client on admin operations like create/get subscription/snapshot. Default is |
|exponentialRetry.maxAttempts*||Integer||Yes, when ||Max number of attempts for an exponential back-off retry policy|
|exponentialRetry.initialDelayMs*||Long||Yes, when ||Initial delay time for an exponential back-off retry policy|
|exponentialRetry.delayScaleFactor*||Double||Yes, when ||Scaling factor for an exponential back-off retry policy|
* Available since st-distribution 0.4.0
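As a rough pre-flight check for the table above, the sketch below verifies that the configs marked as required are present and that gcpProject contains no underscore. The helper is hypothetical and not part of Pinot; the keys are written exactly as they appear in the table, and the factory class value is a placeholder because the allowed class names depend on your distribution.

```python
REQUIRED_KEYS = (
    "stream.pubsub.consumer.factory.class.name",
    "gcpProject",
    "topicId",
)


def validate_stream_configs(stream_configs):
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []
    for key in REQUIRED_KEYS:
        if not stream_configs.get(key):
            problems.append(f"missing required config: {key}")
    # The table states that gcpProject should not contain an underscore.
    if "_" in str(stream_configs.get("gcpProject") or ""):
        problems.append("gcpProject should not contain an underscore")
    return problems


example = {
    # Placeholder: use one of the factory class names allowed by your distribution.
    "stream.pubsub.consumer.factory.class.name": "<consumer factory class>",
    "gcpProject": "my_project",
    "topicId": "events",
    "maxConsumedMessages": "10000",
}
print(validate_stream_configs(example))
```

A check like this only catches missing or malformed values before the table is created; it does not confirm that the project, topic, or credentials actually exist.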
Environmental Variables
PubsubConsumerFactory requires an environment variable called GOOGLE_APPLICATION_CREDENTIALS, which contains the path to the GCP credentials file.
Alternatively, the path to the GCP credentials file can be set using the config property credentialsFile.
The former approach is strongly recommended.
Google Pubsub emulator
The Google Cloud SDK provides an emulator that can be used for local testing. To install the emulator, see the Testing apps locally with the emulator documentation.
All configuration is the same as above, except the following:
The environment variable GOOGLE_APPLICATION_CREDENTIALS is not required.
Instead, the environment variable PUBSUB_EMULATOR_HOST should be set to the emulator endpoint in the form
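To make the difference between the production and emulator setups concrete, the sketch below reports which connection setup a given environment implies. It is a hypothetical helper, not the connector's actual logic: the order in which the three sources are checked is an assumption, and `localhost:8085` is only an example endpoint value.

```python
import os


def describe_pubsub_auth(env, stream_configs):
    """Describe which connection setup the given environment and configs imply."""
    emulator_host = env.get("PUBSUB_EMULATOR_HOST")
    if emulator_host:
        return f"emulator at {emulator_host} (no credentials needed)"
    credentials_path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if credentials_path:
        return f"credentials file from environment: {credentials_path}"
    if stream_configs.get("credentialsFile"):
        return f"credentials file from config: {stream_configs['credentialsFile']}"
    return "no credentials configured"


# Inspect the current process environment, then a simulated emulator setup.
print(describe_pubsub_auth(os.environ, {}))
print(describe_pubsub_auth({"PUBSUB_EMULATOR_HOST": "localhost:8085"}, {}))
```

Passing the environment in as a mapping keeps the helper easy to exercise in tests without modifying real environment variables.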
Config Tuning Recommendations
subscription.ackDeadlineSeconds
This config is used for the Pubsub subscription and specifies the deadline after which an unacknowledged message is sent again to the Pubsub client. It is recommended to use the max allowed value because the Pinot consumer acknowledges messages in batches. The default value is 600 seconds (the allowed max in the Pubsub client). Ideally, there should be no need to tune this config.
flowcontrol.maxOutstandingCount
This config is used for flow control, as the consumer implementation uses an asynchronous pull-based subscription. The more unacknowledged messages there are in the subscriber's buffer, the higher the chance of re-delivery in Pubsub. Hence, if you notice a lot of log warnings about duplicate messages being received, you can tune down this number.
maxConsumedMessages
This config is used to control the number of messages processed (i.e., records extracted and indexed into an in-memory Pinot segment) in a batch. Message flow in Pubsub is adaptive and varies over time. Hence, this value can be tuned down in cases where indexing time is large and causes message re-delivery from Pubsub.
queueOperationTimeoutMs
This timeout is used by the shared internal Pubsub queues. The default value should be sufficient for most cases, as little contention is expected. If maxConsumedMessages is configured correctly, then the queue will have sufficient capacity for buffering.
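The relationship between batch size, indexing time, and the acknowledgment deadline can be estimated with simple arithmetic. The sketch below is a simplified model, not the connector's behavior: it assumes a batch is acknowledged only after it has been indexed, that per-message indexing cost is roughly constant, and that half of the deadline is kept as headroom. The helper name, the safety factor, and the per-message costs are all made-up values for illustration.

```python
def max_batch_for_deadline(ack_deadline_seconds, index_ms_per_message, safety_factor=0.5):
    """Largest batch whose estimated indexing time fits in a fraction of the ack deadline."""
    budget_ms = ack_deadline_seconds * 1000 * safety_factor
    return int(budget_ms // index_ms_per_message)


# Defaults from the table: 600 s ack deadline, 10000 messages per fetch.
# The 5 ms and 50 ms per-message indexing costs are hypothetical measurements.
for cost_ms in (5, 50):
    limit = max_batch_for_deadline(600, cost_ms)
    verdict = "fits" if 10000 <= limit else "consider lowering maxConsumedMessages"
    print(f"{cost_ms} ms/message -> at most {limit} messages per batch: {verdict}")
```

Under these assumptions the default batch size is comfortable at 5 ms per message but too large at 50 ms per message, which is the situation where lowering the batch size is worth trying.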