DynamoDB Message Decoder
DynamoDB provides Change Data Capture (CDC) capabilities through DynamoDB Streams, which capture data modifications in DynamoDB tables. The generated CDC data is written to streaming systems like Kafka and made available in real-time for downstream applications.
Native support for the DynamoDB data format in Pinot allows users to consume CDC data in real-time from DynamoDB tables without complex transformations. As long as the data is available in any of Pinot's supported streaming connectors, it can be ingested into a Pinot table.
DynamoDB Message Decoder Configurations
To configure a Pinot table to consume a DynamoDB-formatted streaming source, use the decoder ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder.
The properties of this decoder are listed below:
Configuration Key | Description |
---|---|
decoder.class.name | Specifies the primary decoder for DynamoDB messages. Set this to ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder to enable DynamoDB CDC ingestion. |
dynamodb.timeColumnName | The column name where the ApproximateCreationDateTime from the DynamoDB JSON record should be stored. This timestamp can be used as the table's default time column. |
dynamodb.deleteColumnName | The column name that will be set to true when a REMOVE record is received from DynamoDB, and false otherwise. This helps track deletion events in your Pinot table. |
dynamodb.envelope.decoder.class.name | Specifies the underlying decoder used to parse the message format. Since DynamoDB messages are in JSON format, this should typically be set to org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder. |
dynamodb.envelope.decoder.prop. | Prefix to be used for any properties associated with the envelope decoder class. |
Sample Stream Configuration
When ingesting a DynamoDB-formatted payload from a stream, the decoder used for the stream must be ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder.
The following is an example stream config where the Pinot table consumes from a JSON-encoded Kafka topic containing DynamoDB CDC payloads:
{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.consumer.type": "simple",
    "stream.kafka.topic.name": "dynamodb-cdc-topic",
    "stream.kafka.decoder.class.name": "ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.zk.broker.url": "localhost:2181",
    "stream.kafka.broker.list": "localhost:9092",
    "realtime.segment.flush.threshold.time": "1h",
    "realtime.segment.flush.threshold.size": "100",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "stream.kafka.decoder.prop.dynamodb.timeColumnName": "created_at_timestamp",
    "stream.kafka.decoder.prop.dynamodb.deleteColumnName": "is_deleted",
    "stream.kafka.decoder.prop.dynamodb.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
  }
}
In the above sample, the Kafka consumer factory is org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory and the decoder associated with the stream is ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder.
The configuration uses several key components:
- The primary decoder ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder handles the DynamoDB-specific message format.
- The column named by dynamodb.timeColumnName is populated with the ApproximateCreationDateTime from the DynamoDB JSON record.
- The column named by dynamodb.deleteColumnName is set to true when REMOVE records are received from DynamoDB, and false otherwise.
- The dynamodb.envelope.decoder.class.name is set to org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder since the underlying DynamoDB messages are in JSON format.
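The two metadata columns described above can be illustrated with a minimal Python sketch. It is not the decoder's implementation, only a model of the documented behavior. It assumes the standard DynamoDB Streams record layout, where the event type sits in a top-level eventName field and the timestamp sits under the dynamodb object; the function name metadata_columns and the sample record are hypothetical.

```python
def metadata_columns(record, time_column, delete_column):
    """Build the two decoder-managed columns for one stream record (illustrative only)."""
    change = record.get("dynamodb", {})
    return {
        # Timestamp copied as-is; its unit depends on the upstream producer.
        time_column: change.get("ApproximateCreationDateTime"),
        # True only for REMOVE events; INSERT and MODIFY yield False.
        delete_column: record.get("eventName") == "REMOVE",
    }


# Hypothetical REMOVE record, trimmed to the fields used here.
remove_record = {
    "eventName": "REMOVE",
    "dynamodb": {
        "ApproximateCreationDateTime": 1700000000,
        "Keys": {"event_id": {"S": "abc-123"}},
    },
}

row = metadata_columns(remove_record, "created_at_timestamp", "is_deleted")
print(row)
```

The column names passed in match the values of dynamodb.timeColumnName and dynamodb.deleteColumnName from the sample stream config.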
Simplified Configuration
The DynamoDB Message Decoder simplifies the configuration needed to ingest DynamoDB CDC data. Without this decoder, you would need to specify a separate JSON path transformation for each column. Here's a comparison:
Without DynamoDB Decoder
{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "rsvp_count",
        "transformFunction": "JSONPATHLONG(dynamodb, '$.NewImage.rsvp_count.N')"
      },
      {
        "columnName": "venue_name",
        "transformFunction": "JSONPATHSTRING(dynamodb, '$.NewImage.venue_name.S')"
      },
      {
        "columnName": "group_city",
        "transformFunction": "JSONPATHSTRING(dynamodb, '$.NewImage.group_city.S')"
      }
      // ... more transform configs for each column
    ]
  }
}
With DynamoDB Decoder
{
  "streamConfigs": {
    "stream.kafka.decoder.class.name": "ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder",
    "stream.kafka.decoder.prop.dynamodb.timeColumnName": "created_at_timestamp",
    "stream.kafka.decoder.prop.dynamodb.deleteColumnName": "is_deleted",
    "stream.kafka.decoder.prop.dynamodb.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
  }
}
The decoder automatically handles the complex DynamoDB JSON structure, including the type indicators (S, N, B) and nested structure, making it much easier to ingest DynamoDB CDC data into Pinot.
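To make the type-indicator handling concrete, here is a rough Python sketch of how a DynamoDB-typed image could be unwrapped into a flat row. It is an illustration under assumptions, not the decoder's actual logic: the function names unwrap and flatten_image are hypothetical, and the handling of the M, L, BOOL and NULL descriptors (standard DynamoDB types beyond the S, N, B mentioned above) is a guess at reasonable behavior.

```python
import base64


def parse_number(text):
    """DynamoDB sends numbers as strings; prefer int, fall back to float."""
    try:
        return int(text)
    except ValueError:
        return float(text)


def unwrap(attr):
    """Convert one typed attribute such as {"N": "12"} into a plain value."""
    (type_tag, value), = attr.items()  # each attribute has exactly one type tag
    if type_tag == "S":
        return value
    if type_tag == "N":
        return parse_number(value)
    if type_tag == "B":
        return base64.b64decode(value)  # binary arrives base64-encoded in JSON
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "M":
        return {key: unwrap(item) for key, item in value.items()}
    if type_tag == "L":
        return [unwrap(item) for item in value]
    raise ValueError(f"unsupported type tag: {type_tag}")


def flatten_image(image):
    """Unwrap every attribute of a NewImage or OldImage map."""
    return {name: unwrap(attr) for name, attr in image.items()}


# Hypothetical NewImage using the columns from the comparison above.
new_image = {
    "rsvp_count": {"N": "12"},
    "venue_name": {"S": "Town Hall"},
    "group_city": {"S": "Austin"},
}

flat_row = flatten_image(new_image)
print(flat_row)
```

Each key of the flattened row lines up with one of the columns that would otherwise need its own JSONPATH transform.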
Properties related to a stream's decoder are prefixed with stream.$streamType.decoder.prop. For the DynamoDB message decoder on a Kafka stream, all properties therefore begin with stream.kafka.decoder.prop, as seen above in the stream.kafka.decoder.prop.dynamodb.timeColumnName, stream.kafka.decoder.prop.dynamodb.deleteColumnName and stream.kafka.decoder.prop.dynamodb.envelope.decoder.class.name properties.
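The prefix convention can be sketched in a few lines of Python. This only models the naming rule described here and is not how Pinot itself resolves properties; the helper name decoder_props is hypothetical.

```python
def decoder_props(stream_configs, stream_type):
    """Return decoder properties with the stream-specific prefix removed."""
    prefix = f"stream.{stream_type}.decoder.prop."
    return {
        key[len(prefix):]: value
        for key, value in stream_configs.items()
        if key.startswith(prefix)
    }


# Subset of the sample stream config shown earlier.
stream_configs = {
    "streamType": "kafka",
    "stream.kafka.topic.name": "dynamodb-cdc-topic",
    "stream.kafka.decoder.prop.dynamodb.timeColumnName": "created_at_timestamp",
    "stream.kafka.decoder.prop.dynamodb.deleteColumnName": "is_deleted",
}

props = decoder_props(stream_configs, stream_configs["streamType"])
print(props)
```

After stripping the prefix, the remaining keys match the names in the configuration table, such as dynamodb.timeColumnName.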