
Google Cloud Pubsub Ingestion

Pre-requisites

The following settings need to be configured so that messages can be consumed correctly from a Pubsub topic.

Pubsub Topic Settings

The Pubsub topic containing the data to be consumed must be configured as follows:

  1. Topic retention must be enabled. Otherwise, data published before the subscription is created will be lost.
  2. The topic's message storage policy must be configured to use a single region. This is required for Pubsub's ordering guarantees.

Pubsub Publisher Settings

Published messages must carry certain attributes for Pinot to consume them correctly.

  1. If the data is expected to be partitioned (see attributeFilterKey below), each Pubsub message must include a user attribute holding its partition ID. The key of this attribute must be specified in the stream configuration.
  2. Set the orderingKey of each Pubsub message to the same value as the partition ID attribute described above.
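The publisher-side rules above can be sketched in Python. This minimal example assumes the record key is hashed with CRC32 to choose a partition ID; the hashing scheme and the helper names are hypothetical, not part of the plugin. Any scheme works as long as each message carries a partition ID in [0, numPartitions) and its ordering key equals that ID.

```python
import zlib


def partition_for_key(record_key: str, num_partitions: int) -> int:
    """Map a record key to a partition ID in [0, num_partitions).

    Hypothetical scheme: CRC32 of the UTF-8 key, modulo the partition count.
    The same key always maps to the same partition.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    return zlib.crc32(record_key.encode("utf-8")) % num_partitions


def message_routing(record_key: str, num_partitions: int, partition_attribute: str):
    """Return (attributes, ordering_key) for one Pubsub message.

    partition_attribute is the attribute key configured as attributeFilterKey.
    The ordering key is set to the same partition ID, as required above.
    """
    partition_id = partition_for_key(record_key, num_partitions)
    value = str(partition_id)  # Pubsub attribute values and ordering keys are strings
    return {partition_attribute: value}, value
```

With the google-cloud-pubsub Python client, the returned values would typically be passed as publisher.publish(topic_path, data, ordering_key=ordering_key, **attributes) on a PublisherClient created with PublisherOptions(enable_message_ordering=True).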

Pinot Table Configuration

Apart from the common realtime table configuration, you need to set up a few extra config values.

segmentsConfig

Set the segment completion mode to DOWNLOAD to avoid errors during the segment completion protocol.

"completionConfig": {
  "completionMode": "DOWNLOAD"
}
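If table configs are generated by a script, the completion setting can be patched in as below. This is a minimal sketch; the helper name is hypothetical, and it only touches segmentsConfig.completionConfig, leaving every other key of the input config unchanged.

```python
import copy


def with_download_completion(table_config: dict) -> dict:
    """Return a copy of a realtime table config with completionMode DOWNLOAD.

    completionConfig lives under segmentsConfig; existing keys are preserved.
    """
    patched = copy.deepcopy(table_config)  # never mutate the caller's config
    segments = patched.setdefault("segmentsConfig", {})
    completion = segments.setdefault("completionConfig", {})
    completion["completionMode"] = "DOWNLOAD"
    return patched
```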

tableConfig

The following streams configs should also be configured:

| Config Property | Type | Required | Description |
|---|---|---|---|
| stream.pubsub.consumer.factory.class.name | String | Yes | Class name of the Pubsub consumer factory. Allowed value: ai.startree.pinot.plugin.stream.pubsub.PubsubConsumerFactory |
| gcpProject | String | Yes | Project ID configured in GCP. Must not contain an underscore (_). |
| topicId | String | Yes | Pubsub topic name. |
| numPartitions | Integer | No | Number of partitions imposed on the Pubsub topic. Required when attributeFilterKey is set; otherwise, the value is assumed to be 1. |
| attributeFilterKey | String | No | Default: null. If not set, the Pubsub subscription does not use an attribute-based filter, and the configured numPartitions is overridden to 1. When set, this is the name of the attribute key used in the subscription's filter, with the following side effects: (1) numPartitions must be configured and cannot be null; (2) every message published to the topic must contain this attribute in its attributes map, with a value in the range [0, numPartitions). Failing to satisfy these conditions may cause data loss. |
| maxConsumedMessages | Integer | No | Maximum number of messages that can be consumed and remain unacknowledged in the consumer. Default: 10000. |
| queueOperationTimeoutMs | Integer | No | Timeout, in milliseconds, for operations on the consumer's internal queues. Default: 10 ms. |
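The rules in this table can be checked before a table config is submitted. The sketch below is hypothetical pre-flight validation, not part of the plugin; it assumes the properties sit in a flat dict of strings, as Pinot stream configs usually are, and it ignores the common realtime stream properties that are not listed above.

```python
FACTORY_KEY = "stream.pubsub.consumer.factory.class.name"
FACTORIES = {
    "ai.startree.pinot.plugin.stream.pubsub.PubsubConsumerFactory",
    # Only for local testing against the Pubsub emulator.
    "ai.startree.pinot.plugin.stream.pubsub.EmulatorPubsubConsumerFactory",
}


def validate_pubsub_stream_configs(cfg):
    """Return a list of problems found in a Pubsub stream config dict."""
    problems = []
    if cfg.get(FACTORY_KEY) not in FACTORIES:
        problems.append(f"{FACTORY_KEY} must be one of {sorted(FACTORIES)}")
    project = cfg.get("gcpProject")
    if not project:
        problems.append("gcpProject is required")
    elif "_" in project:
        problems.append("gcpProject must not contain '_'")
    if not cfg.get("topicId"):
        problems.append("topicId is required")
    num = cfg.get("numPartitions")
    if num is not None:
        try:
            valid = int(num) >= 1
        except (TypeError, ValueError):
            valid = False
        if not valid:
            problems.append("numPartitions must be a positive integer")
    if cfg.get("attributeFilterKey") and num is None:
        problems.append("numPartitions is required when attributeFilterKey is set")
    return problems


def effective_num_partitions(cfg):
    """Without attributeFilterKey, numPartitions is overridden to 1."""
    if not cfg.get("attributeFilterKey"):
        return 1
    return int(cfg["numPartitions"])
```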

Environment variables

The PubsubConsumerFactory requires an environment variable named GOOGLE_APPLICATION_CREDENTIALS that contains the path to the GCP credentials file. Alternatively, the path to the GCP credentials file can be set with the config property credentialsFile. The environment variable is strongly recommended.
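A quick pre-flight check on a server host can catch a missing credentials file before consumption starts. This is a hypothetical helper: it prefers the environment variable, in line with the recommendation above, but the plugin's actual precedence between the two settings is not documented here, and the check only confirms that the file exists, not that its contents are valid.

```python
import os

ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS"


def find_credentials_problem(stream_configs, environ=None):
    """Return None if a credentials file is configured and exists, else a message."""
    environ = os.environ if environ is None else environ
    # Assumption: the environment variable takes precedence over credentialsFile.
    path = environ.get(ENV_VAR) or stream_configs.get("credentialsFile")
    if not path:
        return f"set {ENV_VAR} (recommended) or the credentialsFile stream config"
    if not os.path.isfile(path):
        return f"credentials file not found: {path}"
    return None
```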

Google Pubsub emulator

The Google Cloud SDK provides an emulator that can be used for local testing. To install the emulator, see the "Testing apps locally with the emulator" documentation.

All configuration is the same as above, except the following:

| Config Property | Value |
|---|---|
| stream.pubsub.consumer.factory.class.name | ai.startree.pinot.plugin.stream.pubsub.EmulatorPubsubConsumerFactory |

The environment variable GOOGLE_APPLICATION_CREDENTIALS is not required. Instead, set the environment variable PUBSUB_EMULATOR_HOST to the emulator endpoint, in the form <hostname>:<port>.
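A malformed PUBSUB_EMULATOR_HOST value is an easy mistake in local setups. The hypothetical helper below checks that a value matches the <hostname>:<port> form described above; it splits on the last colon and does not handle bracketed IPv6 literals.

```python
def parse_emulator_host(value):
    """Split a PUBSUB_EMULATOR_HOST value of the form <hostname>:<port>."""
    host, sep, port = value.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected <hostname>:<port>, got {value!r}")
    port_num = int(port)
    if not 0 < port_num < 65536:
        raise ValueError(f"port out of range in {value!r}")
    return host, port_num
```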

Tuning Recommendations

maxConsumedMessages

This config is used for flow control, because the implementation uses an asynchronous pull-based subscription. The more unacknowledged messages sit in the subscriber's buffer, the higher the chance that Pubsub redelivers them. If you see many log warnings about duplicate messages being received, lower this value.

If you see many log warnings about messages being dropped because the buffer is full, increase this value.

In general, set this value less than or equal to the maximum number of rows expected in a segment. This threshold applies across all segments assigned to the server. For more details, see Configuring the segment threshold.
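The tuning guidance above can be summarized as a small decision rule. This is a hypothetical heuristic, not a documented procedure: the factor of 2 is an arbitrary illustrative step, and the warning counts are assumed to come from your own log analysis. It lowers the value when duplicate warnings dominate, raises it when buffer-full warnings dominate, and caps the result at the expected rows per segment.

```python
def next_max_consumed_messages(current, max_rows_per_segment,
                               duplicate_warnings, buffer_full_warnings):
    """Suggest a new maxConsumedMessages value from observed log warnings."""
    suggested = current
    if duplicate_warnings > buffer_full_warnings:
        suggested = current // 2  # fewer unacked messages, fewer redeliveries
    elif buffer_full_warnings > duplicate_warnings:
        suggested = current * 2  # more buffer room, fewer dropped messages
    # Keep the value at or below the rows expected in a segment, and at least 1.
    return max(1, min(suggested, max_rows_per_segment))
```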

queueOperationTimeoutMs

This timeout applies to operations on the consumer's shared internal queues. The default value should suffice in most cases, because little contention is expected. If maxConsumedMessages is configured correctly, the queue has sufficient capacity for buffering.
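To show how maxConsumedMessages and queueOperationTimeoutMs interact, here is an illustrative bounded buffer between an asynchronous subscriber callback and a fetch loop. It is a conceptual sketch under the assumption of a single bounded queue; the class and method names are hypothetical, and it is not the plugin's implementation.

```python
import queue


class BoundedMessageBuffer:
    """Capacity models maxConsumedMessages; timeout models queueOperationTimeoutMs."""

    def __init__(self, max_consumed_messages=10000, queue_operation_timeout_ms=10):
        self._queue = queue.Queue(maxsize=max_consumed_messages)
        self._timeout_s = queue_operation_timeout_ms / 1000.0

    def offer(self, message):
        """Called from the subscriber callback; returns False if the buffer stays full."""
        try:
            self._queue.put(message, timeout=self._timeout_s)
            return True
        except queue.Full:
            # A real consumer would log a buffer-full warning here. A Pubsub
            # message that is never acked is redelivered after its ack deadline.
            return False

    def poll(self, max_messages):
        """Drain up to max_messages, waiting at most the timeout for the first one."""
        batch = []
        if max_messages <= 0:
            return batch
        try:
            batch.append(self._queue.get(timeout=self._timeout_s))
            while len(batch) < max_messages:
                batch.append(self._queue.get_nowait())
        except queue.Empty:
            pass
        return batch
```

With little contention, each put or get completes well within the timeout, which is why the default rarely needs changing; a full buffer is a capacity problem addressed by maxConsumedMessages instead.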