Debezium Message Decoder
Debezium provides a unified format for Change Data Capture (CDC) data from various database sources such as MySQL, MongoDB, and Postgres. The generated CDC data is written to a streaming system such as Kafka or Kinesis and is then available in real time to downstream applications.
Native support for the Debezium data format in Pinot allows users to consume CDC data in real time from traditional OLTP / NoSQL stores in a source-agnostic manner. As long as the data is available through any of Pinot's supported streaming connectors, it can be ingested into a Pinot table.
Debezium Message Decoder Configurations
To configure a Pinot table to use a Debezium-formatted streaming source, Pinot provides the decoder ai.startree.pinot.plugin.inputformat.debezium.DebeziumMessageDecoder.
The properties of this decoder are listed below:
Configuration Key | Description |
---|---|
dbz.source | Specifies the source database used for CDC. Allowed values: mysql, postgres |
dbz.envelope.decoder.class.name | Specifies the envelope decoder class used to decode the Debezium payload in the stream into a Pinot record. Allowed values: any of the supported StreamMessageDecoder classes in Pinot |
dbz.envelope.decoder.prop. | Prefix for any properties associated with the envelope decoder class specified by dbz.envelope.decoder.class.name |
dbz.source.timeColumnName | (Optional) Name of the Pinot table column into which the source DB's event timestamp is extracted. This is useful when using CDC data with an upsert-enabled Pinot table: the timestamp can serve as the table's default time column if the payload itself doesn't have one. |
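To make the role of the envelope and the optional time column more concrete, here is a minimal Python sketch of what decoding a Debezium change event might look like conceptually. It is not the decoder's actual implementation. The function name flatten_envelope, the sample row values, and the choice of source.ts_ms as the extracted timestamp are assumptions made for illustration. The before / after / source / op / ts_ms fields, and the optional schema / payload wrapper, are part of the standard Debezium event envelope.

```python
import json


def flatten_envelope(raw, time_column=None):
    """Illustrative only: turn one JSON-encoded Debezium event into a flat row.

    Assumption: the row comes from "after" (or "before" for deletes), and the
    source DB's event timestamp is taken from source.ts_ms.
    """
    event = json.loads(raw)
    # With schemas enabled, Debezium's JSON converter wraps the envelope in "payload".
    envelope = event.get("payload", event)
    # Deletes carry the row image in "before" and a null "after".
    row = dict(envelope.get("after") or envelope.get("before") or {})
    if time_column is not None:
        row[time_column] = envelope["source"]["ts_ms"]
    return row


# Hypothetical insert event for a MySQL table; values are made up.
sample_event = {
    "before": None,
    "after": {"ID": 1, "Name": "Kabul"},
    "source": {"connector": "mysql", "db": "world", "table": "city", "ts_ms": 1700000000000},
    "op": "c",
    "ts_ms": 1700000000123,
}

row = flatten_envelope(json.dumps(sample_event), time_column="created_at_timestamp")
print(row)
```

The sketch shows why a dedicated time column can matter: the row image itself (ID, Name) has no timestamp, so the only time information comes from the envelope's source metadata.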
Sample Stream Configuration
When ingesting a Debezium-formatted payload from a stream, the decoder used for the stream must be ai.startree.pinot.plugin.inputformat.debezium.DebeziumMessageDecoder.
The following is an example stream config in which the Pinot table consumes from a JSON-encoded Kafka topic containing Debezium CDC payloads from a MySQL source DB.
{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.consumer.type": "simple",
    "stream.kafka.topic.name": "test.world.city",
    "stream.kafka.decoder.class.name": "ai.startree.pinot.plugin.inputformat.debezium.DebeziumMessageDecoder",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.zk.broker.url": "localhost:2181",
    "stream.kafka.broker.list": "localhost:9092",
    "realtime.segment.flush.threshold.time": "1h",
    "realtime.segment.flush.threshold.size": "100",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "stream.kafka.decoder.prop.dbz.source": "mysql",
    "stream.kafka.decoder.prop.cdc.timeColumn": "created_at_timestamp",
    "stream.kafka.decoder.prop.dbz.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
  }
}
In the above sample, the Kafka consumer factory is org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory and the decoder associated with the stream is ai.startree.pinot.plugin.inputformat.debezium.DebeziumMessageDecoder.
Since the topic carries a JSON-encoded Debezium payload, dbz.envelope.decoder.class.name is configured as org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder.
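The constraints described in this section (the required decoder class, the allowed dbz.source values, and an envelope decoder class) can be sanity-checked before a table config is submitted. The following Python sketch is one possible way to do that. It is not part of Pinot, the helper name check_debezium_stream_config is hypothetical, and treating the envelope decoder class as mandatory is an assumption based on it not being marked optional.

```python
DEBEZIUM_DECODER = "ai.startree.pinot.plugin.inputformat.debezium.DebeziumMessageDecoder"
ALLOWED_SOURCES = {"mysql", "postgres"}  # allowed dbz.source values per the table above


def check_debezium_stream_config(stream_configs):
    """Return a list of human-readable problems; an empty list means none found."""
    stream_type = stream_configs.get("streamType")
    if not stream_type:
        return ["streamType is missing"]

    base = f"stream.{stream_type}.decoder"
    problems = []
    if stream_configs.get(f"{base}.class.name") != DEBEZIUM_DECODER:
        problems.append(f"{base}.class.name must be {DEBEZIUM_DECODER}")
    source = stream_configs.get(f"{base}.prop.dbz.source")
    if source not in ALLOWED_SOURCES:
        problems.append(f"dbz.source must be one of {sorted(ALLOWED_SOURCES)}, got {source!r}")
    # Assumption: an envelope decoder class must always be supplied.
    if not stream_configs.get(f"{base}.prop.dbz.envelope.decoder.class.name"):
        problems.append("dbz.envelope.decoder.class.name is missing")
    return problems


good = {
    "streamType": "kafka",
    "stream.kafka.decoder.class.name": DEBEZIUM_DECODER,
    "stream.kafka.decoder.prop.dbz.source": "mysql",
    "stream.kafka.decoder.prop.dbz.envelope.decoder.class.name":
        "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
}
bad = dict(good, **{"stream.kafka.decoder.prop.dbz.source": "mongodb"})

print(check_debezium_stream_config(good))
print(check_debezium_stream_config(bad))
```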
Properties related to a stream's decoder carry the prefix stream.$streamType.decoder.prop followed by a dot. In this example the stream type is kafka, so every Debezium message decoder property begins with stream.kafka.decoder.prop, as seen above in the stream.kafka.decoder.prop.dbz.source, stream.kafka.decoder.prop.cdc.timeColumn, and stream.kafka.decoder.prop.dbz.envelope.decoder.class.name properties.
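The prefixing rule is mechanical, so it can be sketched as a small helper. The Python below is an illustration only: the function name decoder_props is hypothetical, and the input dictionary simply reuses two of the decoder properties from the sample config.

```python
def decoder_props(stream_type, props):
    """Prefix each decoder property with stream.<streamType>.decoder.prop."""
    prefix = f"stream.{stream_type}.decoder.prop."
    return {prefix + key: value for key, value in props.items()}


prefixed = decoder_props(
    "kafka",
    {
        "dbz.source": "mysql",
        "dbz.envelope.decoder.class.name":
            "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
    },
)

for key, value in prefixed.items():
    print(f"{key} = {value}")
```

Keeping the unprefixed property names in one place and deriving the full keys this way makes it harder to mistype the long prefix, and the same helper would apply to a different stream type by changing the first argument.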