Debezium Message Decoder

Debezium provides a unified format for Change Data Capture (CDC) data from database sources such as MySQL, MongoDB, and Postgres. The generated CDC data is written to a streaming system such as Kafka or Kinesis and, thereafter, made available in real time to downstream applications.

Native support for the Debezium data format in Pinot allows users to consume CDC data in real time from traditional OLTP / NoSQL stores in a source-agnostic manner. As long as the data is available in any of Pinot's supported streaming connectors, it can be ingested into a Pinot table.

Debezium Message Decoder Configurations

In order to configure a Pinot table to use a Debezium formatted streaming source, Pinot provides a decoder - ai.startree.pinot.plugin.inputformat.debezium.DebeziumMessageDecoder.

The properties of this decoder are listed below:

dbz.source
    Specifies the source database used for CDC.
    Allowed values: mysql, postgres

dbz.envelope.decoder.class.name
    Specifies the envelope decoder class name used to decode the Debezium payload in the stream into a Pinot record.
    Allowed values: any of the supported StreamMessageDecoder classes in Pinot.

dbz.envelope.decoder.prop.
    Prefix to be used for any of the properties associated with the envelope decoder class specified by dbz.envelope.decoder.class.name.

dbz.source.timeColumnName
    (Optional) If the source DB's event timestamp should be extracted into a Pinot table column, specify the column name here.
    This is useful when ingesting CDC data into an upsert-enabled Pinot table: the event timestamp can serve as the table's default time column when the payload itself doesn't have one.
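For reference, a Debezium change event envelope has the shape sketched below (an illustrative, trimmed MySQL insert event; the database, table, and column names are hypothetical). The decoder materializes the row image as a Pinot record, and dbz.source.timeColumnName can extract the source event timestamp (source.ts_ms in the envelope) into the named column:

```json
{
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "name": "Amsterdam",
      "created_at_timestamp": 1679580000000
    },
    "source": {
      "connector": "mysql",
      "db": "world",
      "table": "city",
      "ts_ms": 1679580000123
    },
    "op": "c",
    "ts_ms": 1679580000456
  }
}
```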

Sample Stream Configuration

When ingesting a Debezium formatted payload from a stream, the decoder used for the stream must be ai.startree.pinot.plugin.inputformat.debezium.DebeziumMessageDecoder.

The following is an example stream config in which the Pinot table consumes from a JSON-encoded Kafka topic containing Debezium CDC payloads from a MySQL source DB.

{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.consumer.type": "simple",
    "stream.kafka.topic.name": "test.world.city",
    "stream.kafka.decoder.class.name": "ai.startree.pinot.plugin.inputformat.debezium.DebeziumMessageDecoder",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.zk.broker.url": "localhost:2181",
    "stream.kafka.broker.list": "localhost:9092",
    "realtime.segment.flush.threshold.time": "1h",
    "realtime.segment.flush.threshold.size": "100",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "stream.kafka.decoder.prop.dbz.source": "mysql",
    "stream.kafka.decoder.prop.dbz.source.timeColumnName": "created_at_timestamp",
    "stream.kafka.decoder.prop.dbz.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
  }
}

In the above sample, the Kafka consumer factory used is org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory and the decoder associated with this stream is ai.startree.pinot.plugin.inputformat.debezium.DebeziumMessageDecoder.

Since it is a JSON-encoded Debezium payload, the dbz.envelope.decoder.class.name is configured as org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder.

Properties related to a stream's decoder are prefixed with stream.$streamType.decoder.prop.. For the Debezium message decoder, all properties therefore begin with stream.kafka.decoder.prop., as seen above in the stream.kafka.decoder.prop.dbz.source, stream.kafka.decoder.prop.dbz.source.timeColumnName, and stream.kafka.decoder.prop.dbz.envelope.decoder.class.name properties.
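Putting the prefixes together: a property destined for the envelope decoder is wrapped twice — once with the stream decoder prefix (stream.kafka.decoder.prop.) and once with the envelope decoder prefix (dbz.envelope.decoder.prop.). As an illustrative sketch (the property name someKey is hypothetical, not an actual JSONMessageDecoder property), such an entry in the stream config would look like:

```json
{
  "stream.kafka.decoder.prop.dbz.envelope.decoder.prop.someKey": "someValue"
}
```

The Debezium message decoder strips both prefixes before handing someKey=someValue to the configured envelope decoder.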