Google Cloud Pubsub Ingestion
Prerequisites
The following settings need to be configured so that messages can be consumed correctly from a Pubsub topic.
Pubsub Topic Settings
The pubsub topic containing the data to be consumed should have the following configurations:
- Topic retention must be enabled. Otherwise, data prior to the creation of the subscriber will be lost.
- Topic's message storage policy (opens in a new tab) must configured to use a single region. This is required for ordering guarantees in pubsub.
Pubsub Publisher Settings
The published data should contain certain attributes to work correctly with Pinot.
- If data is expected to be partitioned (See
attributeFilterKey
for more info), then a user-attribute representing the partition ID should be added to each Pubsub message (opens in a new tab). This attribute key should be specified in the streams configuration. - Set
orderingKey
in the Pubsub message to be the same as thepartitionId
value used above.
Pinot Table Configuration
Apart from the common real-time table configuration (opens in a new tab), there are some extra config values that you need to setup.
Segment Configs
Segment completion under segmentsConfig
should be set to DOWNLOAD
in order to avoid errors during the segment completion protocol.
{
"tableName": "<tableName>",
...,
"segmentsConfig": {
"completionConfig": {
"completionMode": "DOWNLOAD"
},
...
}
}
Stream Configs
The following streams configs are available for configuration:
Config Property | Type | Required | Description |
---|---|---|---|
stream.pubsub.consumer.factory.class.name | String | Yes | Class name of the pubsub consumer factory. Allowed values: ai.startree.pinot.plugin.stream.pubsub.PubsubConsumerFactory |
stream.pubsub.idle.timeout.millis***** | Integer | No | Allowed idle time for the stream. If this threshold is crossed, Pinot will close the current consumer and instantiate a new one. A higher value is recommended to avoid excessive admin operations that occurs during pubsub consumer creation. This behavior can be disable by using value -1 . Default is 3 minutes. |
gcpProject | String, should not contain underscore (_ ) | Yes | Project Identifier configured in GCP |
topicId | String | Yes | Pubsub Topic Name |
credentialsFile | String | No | File name of the GCP credentials file. This config is recommended only for developmental purposes. See Environmental Variables |
numPartitions | Integer | No | Number of partitions imposed on the pubsub topic. This is required when attributeFilterKey is set. Otherwise, the value is assumed to be 1 . |
attributeFilterKey | String | No | Default value is null . If not set, the pubsub subscription will not use attribute-based filter function and will overwrite the configured numPartitions to 1 . When configured, this config provides the name of attribute key that will be used in the subscription's filter. This configuration has the following side-effects: 1. numPartitions has to be configured and cannot be null. 2. The data published to the topic should always contain this attribute in its attributes map and its valid values are [ 0 , numPartitions ). Failing to satisfy the above condition may cause data loss. |
maxConsumedMessages | Integer | No | Represents the max number of messages that can be indexed in a single Pinot fetch call. Default value is 10000 |
queueOperationTimeoutMs | Integer | No | Used for accessing the consumer's internal queues. Default value is 10ms" |
checkpointRetry***** | Integer | No | Pubsub consumer implementation performs validation on the snapshot images taken during segment commit to ensure that all indexed messages are acknowledged and not re-delivered. This configuration specifies the max number of snapshot attempts that the consumer may take for this validation. Default is 10. |
subscription.ackDeadlineSeconds***** | Integer | No | Specifies the deadline after which an unacknowledged message is sent again to the pubsub client. See Acknowledgment Deadline (opens in a new tab) for more details. Default value used by the Pubsub connector is 600 seconds. |
flowcontrol.maxOutstandingCount***** | Integer | No | Controls the max number of messages that the subscriber receives before pausing the stream. This will be used as the maxOutstandingCount in the Pubsub client's FlowControlSettings . Read more about flow control in pubsub here (opens in a new tab). If unspecified, the pubsub client will use its own defaults. |
enableRetry***** | Boolean | No | Controls the request retries for Pubsub client on admin operations like create/get subscription/snapshot. Default is false |
exponentialRetry.maxAttempts***** | Integer | Yes, when enableRetry is true | Max number of attempts for an exponential back-off retry policy |
exponentialRetry.initialDelayMs***** | Long | Yes, when enableRetry is true | Initial delay time for an exponential back-off retry policy |
exponentialRetry.delayScaleFactor***** | Double | Yes, when enableRetry is true | Scaling factor for an exponential back-off retry policy |
* Available since st-distribution 0.4.0
Environment variables
The PubsubConsumerFactory
requires setting up an environmental variable called GOOGLE_APPLICATION_CREDENTIALS
which contains the path to the GCP credentials file.
Alternatively, the path to the GCP credentials file can be set using the config property credentialsFile
.
The former approach is strongly recommended.
Google Pubsub emulator
Google cloud SDK provides an emulator that can be used for local testing. To install the emulator, see the Testing apps locally with the emulator (opens in a new tab) documentation.
All configuration is the same as above, except the following:
Config Property | Value |
---|---|
stream.pubsub.consumer.factory.class.name | ai.startree.pinot.plugin.stream.pubsub.EmulatorPubsubConsumerFactory |
The environment variable GOOGLE_APPLICATION_CREDENTIALS
is not required.
Instead, the environmental variable PUBSUB_EMULATOR_HOST
should be set to the emulator endpoint in the form <hostname>:<port>
.
Config Tuning Recommendations
subscription.ackDeadlineSeconds
This config is used for the pubsub subscription and specifies the deadline after which unacknowledged message is sent again to the pubsub client. It is recommended to use the max allowed value as Pinot consumer acknowledges messages in batches. Default value is 600 seconds (allowed max in pubsub client). Ideally, there should be no need to tune this config.
subscription.maxOutstandingCount
This config is used for flow control as the consumer implementation uses an asynchronous pull-based subscription. If there are more unack'd messages in the subscriber's buffer, there is an increased chance of re-delivery in pubsub. Hence, if you notice a lot of log warnings about duplicate message received, you can tune down this number.
maxConsumedMessages
This config is used to control the number of messages processed (ie. record extracted and indexed into an in-memory pinot segment) in a batch. Message flow in pubsub is adaptive and varies over time. Hence, this value can be tuned in cases where indexing time is large and causes message re-delivery from pubsub.
queueOperationTimeout
This timeout is used by shared internal pubsub queues. The default value should be sufficient for most cases as there
isn't expected to be a lot of contention. If the maxConsumedMessages
is configured correctly, then the queue will
have sufficient capacity for buffering.