Google Cloud Pubsub Ingestion

Prerequisites

The following settings need to be configured so that messages can be consumed correctly from a Pubsub topic.

Pubsub Topic Settings

The pubsub topic containing the data to be consumed should have the following configurations:

  1. Topic retention must be enabled. Otherwise, data prior to the creation of the subscriber will be lost.
  2. The topic's message storage policy must be configured to use a single region. This is required for ordering guarantees in Pubsub.

Pubsub Publisher Settings

The published data should contain certain attributes to work correctly with Pinot.

  1. If data is expected to be partitioned (see attributeFilterKey for more info), then a user attribute representing the partition ID should be added to each Pubsub message. This attribute key should be specified in the stream configs.
  2. Set orderingKey in the Pubsub message to be the same as the partitionId value used above.
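
The two publisher requirements above can be sketched as a small helper. This is only an illustration under stated assumptions: the attribute name partitionId, the partition count of 4, and the function names are hypothetical, and hashing the record key with CRC32 is just one possible way to pick a partition.

```python
import zlib

NUM_PARTITIONS = 4                   # hypothetical; should match numPartitions in the table config
PARTITION_ATTRIBUTE = "partitionId"  # hypothetical; should match attributeFilterKey


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a record key to a stable partition ID in [0, num_partitions)."""
    # crc32 is deterministic across processes, unlike Python's built-in hash().
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def build_publish_kwargs(key: str, num_partitions: int = NUM_PARTITIONS) -> dict:
    """Return the partition attribute and ordering key for one message."""
    partition_id = str(partition_for(key, num_partitions))
    return {
        # Pubsub attribute values are strings.
        PARTITION_ATTRIBUTE: partition_id,
        # The ordering key mirrors the partition ID, as required above.
        "ordering_key": partition_id,
    }
```

With the google-cloud-pubsub Python client, the returned dictionary could be passed as keyword arguments to the publisher's publish call (for example, publisher.publish(topic_path, data, **build_publish_kwargs("user-42"))), on a publisher created with message ordering enabled.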

Pinot Table Configuration

Apart from the common real-time table configuration, there are some extra config values that you need to set up.

Segment Configs

Segment completion under segmentsConfig should be set to DOWNLOAD in order to avoid errors during the segment completion protocol.

{
  "tableName": "<tableName>",
  ...,
  "segmentsConfig": {
    "completionConfig": {
      "completionMode": "DOWNLOAD"
    }, 
    ...
  }
}

Stream Configs

The following stream configs are available:

| Config Property | Type | Required | Description |
| --- | --- | --- | --- |
| stream.pubsub.consumer.factory.class.name | String | Yes | Class name of the Pubsub consumer factory. Allowed values: ai.startree.pinot.plugin.stream.pubsub.PubsubConsumerFactory |
| stream.pubsub.idle.timeout.millis* | Integer | No | Allowed idle time for the stream. If this threshold is crossed, Pinot closes the current consumer and instantiates a new one. A higher value is recommended to avoid the excessive admin operations that occur during Pubsub consumer creation. This behavior can be disabled by using the value -1. Default is 3 minutes. |
| gcpProject | String, should not contain underscore (_) | Yes | Project identifier configured in GCP |
| topicId | String | Yes | Pubsub topic name |
| credentialsFile | String | No | File name of the GCP credentials file. This config is recommended only for development purposes. See Environment variables. |
| numPartitions | Integer | No | Number of partitions imposed on the Pubsub topic. This is required when attributeFilterKey is set. Otherwise, the value is assumed to be 1. |
| attributeFilterKey | String | No | Default value is null. If not set, the Pubsub subscription does not use an attribute-based filter and the configured numPartitions is overwritten to 1. When set, this config provides the name of the attribute key used in the subscription's filter, with the following side effects: (1) numPartitions has to be configured and cannot be null; (2) the data published to the topic should always contain this attribute in its attributes map, and its valid values are [0, numPartitions). Failing to satisfy these conditions may cause data loss. |
| maxConsumedMessages | Integer | No | Max number of messages that can be indexed in a single Pinot fetch call. Default value is 10000. |
| queueOperationTimeoutMs | Integer | No | Timeout used for accessing the consumer's internal queues. Default value is 10 ms. |
| checkpointRetry* | Integer | No | The Pubsub consumer implementation validates the snapshots taken during segment commit to ensure that all indexed messages are acknowledged and not re-delivered. This config specifies the max number of snapshot attempts that the consumer may take for this validation. Default is 10. |
| subscription.ackDeadlineSeconds* | Integer | No | Deadline after which an unacknowledged message is sent again to the Pubsub client. See Acknowledgment Deadline for more details. Default value used by the Pubsub connector is 600 seconds. |
| flowcontrol.maxOutstandingCount* | Integer | No | Max number of messages that the subscriber receives before pausing the stream. This is used as the maxOutstandingCount in the Pubsub client's FlowControlSettings. See the Pubsub flow control documentation for details. If unspecified, the Pubsub client uses its own defaults. |
| enableRetry* | Boolean | No | Controls request retries by the Pubsub client on admin operations such as create/get subscription/snapshot. Default is false. |
| exponentialRetry.maxAttempts* | Integer | Yes, when enableRetry is true | Max number of attempts for an exponential back-off retry policy |
| exponentialRetry.initialDelayMs* | Long | Yes, when enableRetry is true | Initial delay time for an exponential back-off retry policy |
| exponentialRetry.delayScaleFactor* | Double | Yes, when enableRetry is true | Scaling factor for an exponential back-off retry policy |

* Available since st-distribution 0.4.0
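
The dependencies between these properties (required keys, the underscore restriction on gcpProject, numPartitions with attributeFilterKey, and the exponentialRetry.* settings with enableRetry) can be checked before a table config is submitted. The following is a minimal sketch, not part of the connector: the function name and the example values are hypothetical, the property keys are copied as written in the table, and writing every value as a string is an assumption.

```python
import json

RETRY_KEYS = (
    "exponentialRetry.maxAttempts",
    "exponentialRetry.initialDelayMs",
    "exponentialRetry.delayScaleFactor",
)


def validate_stream_configs(cfg: dict) -> list:
    """Return a list of problems found; an empty list means none were found."""
    problems = []

    # Properties the table marks as unconditionally required.
    for key in ("stream.pubsub.consumer.factory.class.name", "gcpProject", "topicId"):
        if not cfg.get(key):
            problems.append(f"missing required property: {key}")

    # The table notes that gcpProject should not contain an underscore.
    if "_" in str(cfg.get("gcpProject", "")):
        problems.append("gcpProject should not contain '_'")

    # numPartitions is required once attributeFilterKey is set.
    if cfg.get("attributeFilterKey") and cfg.get("numPartitions") is None:
        problems.append("numPartitions is required when attributeFilterKey is set")

    # The exponentialRetry.* properties are required when enableRetry is true.
    if str(cfg.get("enableRetry", "false")).lower() == "true":
        for key in RETRY_KEYS:
            if key not in cfg:
                problems.append(f"{key} is required when enableRetry is true")

    return problems


# Hypothetical example values; only the keys come from the table above.
example = {
    "stream.pubsub.consumer.factory.class.name":
        "ai.startree.pinot.plugin.stream.pubsub.PubsubConsumerFactory",
    "gcpProject": "my-project",
    "topicId": "my-topic",
    "attributeFilterKey": "partitionId",
    "numPartitions": "4",
    "enableRetry": "true",
    "exponentialRetry.maxAttempts": "5",
    "exponentialRetry.initialDelayMs": "1000",
    "exponentialRetry.delayScaleFactor": "2.0",
}

print(json.dumps(example, indent=2))
print(validate_stream_configs(example))
```

An empty list from validate_stream_configs only means that none of the rules encoded here were violated; it does not guarantee that Pinot will accept the config.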

Environment variables

The PubsubConsumerFactory requires an environment variable called GOOGLE_APPLICATION_CREDENTIALS that contains the path to the GCP credentials file. Alternatively, the path to the GCP credentials file can be set using the config property credentialsFile. The former approach is strongly recommended.

Google Pubsub emulator

The Google Cloud SDK provides an emulator that can be used for local testing. To install the emulator, see the "Testing apps locally with the emulator" documentation.

All configuration is the same as above, except the following:

| Config Property | Value |
| --- | --- |
| stream.pubsub.consumer.factory.class.name | ai.startree.pinot.plugin.stream.pubsub.EmulatorPubsubConsumerFactory |

The environment variable GOOGLE_APPLICATION_CREDENTIALS is not required. Instead, the environment variable PUBSUB_EMULATOR_HOST should be set to the emulator endpoint in the form <hostname>:<port>.
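
When the same tooling generates table configs for both local and production use, the choice of factory class can be derived from the environment. The sketch below is a hypothetical helper, not something the connector provides: it checks that PUBSUB_EMULATOR_HOST has the <hostname>:<port> form and returns the emulator factory class when the variable is set. It does not handle bracketed IPv6 addresses.

```python
import os

FACTORY = "ai.startree.pinot.plugin.stream.pubsub.PubsubConsumerFactory"
EMULATOR_FACTORY = "ai.startree.pinot.plugin.stream.pubsub.EmulatorPubsubConsumerFactory"


def parse_emulator_host(value: str) -> tuple:
    """Split '<hostname>:<port>' into (hostname, port), or raise ValueError."""
    host, sep, port = value.rpartition(":")
    valid_port = port.isascii() and port.isdigit() and 0 < int(port) <= 65535
    if not sep or not host or not valid_port:
        raise ValueError(f"expected <hostname>:<port>, got {value!r}")
    return host, int(port)


def consumer_factory_class(env=None) -> str:
    """Pick the factory class based on whether PUBSUB_EMULATOR_HOST is set."""
    env = os.environ if env is None else env
    endpoint = env.get("PUBSUB_EMULATOR_HOST")
    if endpoint is None:
        return FACTORY
    parse_emulator_host(endpoint)  # fail early on a malformed endpoint
    return EMULATOR_FACTORY
```

For example, consumer_factory_class({"PUBSUB_EMULATOR_HOST": "localhost:8085"}) selects the emulator factory, while an environment without the variable selects the regular one.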

Config Tuning Recommendations

subscription.ackDeadlineSeconds

This config is applied to the Pubsub subscription and specifies the deadline after which an unacknowledged message is sent again to the Pubsub client. Because the Pinot consumer acknowledges messages in batches, it is recommended to use the maximum allowed value. The default value is 600 seconds (the maximum allowed by the Pubsub client). Ideally, there should be no need to tune this config.

flowcontrol.maxOutstandingCount

This config is used for flow control, as the consumer implementation uses an asynchronous pull-based subscription. The more unacknowledged messages there are in the subscriber's buffer, the higher the chance of re-delivery in Pubsub. Hence, if you notice many log warnings about duplicate messages received, you can lower this number.

maxConsumedMessages

This config controls the number of messages processed (i.e., records extracted and indexed into an in-memory Pinot segment) in a batch. Message flow in Pubsub is adaptive and varies over time. Hence, this value can be tuned in cases where indexing time is large and causes message re-delivery from Pubsub.
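
One rough way to reason about this tuning is to compare the time needed to index a batch against subscription.ackDeadlineSeconds. The sketch below assumes a simplified model, not the connector's actual behavior: a batch is acknowledged only after every message in it has been indexed, and time spent waiting in the subscriber's buffer is ignored. The per-message indexing time and the safety factor are hypothetical inputs that you would measure or choose yourself.

```python
def max_batch_size(ack_deadline_seconds: float,
                   per_message_index_ms: float,
                   safety_factor: float = 0.5) -> int:
    """Largest batch whose estimated indexing time fits within a fraction
    of the acknowledgment deadline (simplified model, see assumptions)."""
    if ack_deadline_seconds <= 0 or per_message_index_ms <= 0:
        raise ValueError("deadline and per-message time must be positive")
    if not 0 < safety_factor <= 1:
        raise ValueError("safety_factor must be in (0, 1]")
    # Time budget for indexing one batch, in milliseconds.
    budget_ms = ack_deadline_seconds * 1000 * safety_factor
    return int(budget_ms // per_message_index_ms)


# With the default 600 s deadline and a measured 50 ms per message,
# half of the deadline leaves room for 6000 messages per batch.
print(max_batch_size(600, 50))  # 6000
```

If the estimate comes out below the configured maxConsumedMessages, that is a hint (under this model only) that lowering the value may reduce re-delivery.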

queueOperationTimeoutMs

This timeout is used by the shared internal Pubsub queues. The default value should be sufficient for most cases, as little contention is expected. If maxConsumedMessages is configured correctly, the queue will have sufficient capacity for buffering.