DynamoDB Message Decoder

DynamoDB provides Change Data Capture (CDC) capabilities through DynamoDB Streams, which capture data modifications in DynamoDB tables. The generated CDC data is written to streaming systems such as Kafka and made available in real time to downstream applications.

Native support for the DynamoDB data format in Pinot lets users consume CDC data from DynamoDB tables in real time without complex transformations. As long as the data is reachable through one of Pinot's supported streaming connectors, it can be ingested into a Pinot table.

DynamoDB Message Decoder Configurations

To consume a DynamoDB-formatted streaming source, configure the Pinot table to use the decoder ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder.

The properties of this decoder are listed below:

| Configuration Key | Description |
| --- | --- |
| decoder.class.name | Specifies the primary decoder for DynamoDB messages. Set this to ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder to enable DynamoDB CDC ingestion. |
| dynamodb.timeColumnName | The column name where the ApproximateCreationDateTime from the DynamoDB JSON record should be stored. This timestamp can be used as the table's default time column. |
| dynamodb.deleteColumnName | The column name that will be set to true when a REMOVE record is received from DynamoDB, and false otherwise. This helps track deletion events in your Pinot table. |
| dynamodb.envelope.decoder.class.name | Specifies the underlying decoder used to parse the message format. Since DynamoDB messages are in JSON format, this should typically be set to org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder. |
| dynamodb.envelope.decoder.prop. | Prefix to be used for any properties associated with the envelope decoder class. |
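The behavior of the time and delete columns can be pictured with a minimal Python sketch. This is not the decoder's implementation. It assumes the record carries a top-level eventName field and a dynamodb object, as in the standard DynamoDB Streams record layout. The helper name add_cdc_columns and the sample values are hypothetical.

```python
from typing import Any, Dict


def add_cdc_columns(record: Dict[str, Any], row: Dict[str, Any],
                    time_column: str, delete_column: str) -> Dict[str, Any]:
    """Return a copy of `row` with the time and delete columns filled in."""
    out = dict(row)
    # Assumption: ApproximateCreationDateTime sits under the record's "dynamodb" object.
    out[time_column] = record["dynamodb"]["ApproximateCreationDateTime"]
    # REMOVE events mark the row as deleted; every other event type does not.
    out[delete_column] = record.get("eventName") == "REMOVE"
    return out


# Hypothetical REMOVE record; the timestamp value is illustrative only.
record = {
    "eventName": "REMOVE",
    "dynamodb": {"ApproximateCreationDateTime": 1700000000},
}
row = add_cdc_columns(record, {"venue_name": "Town Hall"},
                      time_column="created_at_timestamp",
                      delete_column="is_deleted")
print(row)
```

The column names passed in correspond to the values of dynamodb.timeColumnName and dynamodb.deleteColumnName in the stream configuration.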

Sample Stream Configuration

When ingesting a DynamoDB-formatted payload from a stream, the decoder used for the stream must be ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder.

The following example stream config shows a Pinot table consuming from a JSON-encoded Kafka topic that contains DynamoDB CDC payloads:

{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.consumer.type": "simple",
    "stream.kafka.topic.name": "dynamodb-cdc-topic",
    "stream.kafka.decoder.class.name": "ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.zk.broker.url": "localhost:2181",
    "stream.kafka.broker.list": "localhost:9092",
    "realtime.segment.flush.threshold.time": "1h",
    "realtime.segment.flush.threshold.size": "100",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "stream.kafka.decoder.prop.dynamodb.timeColumnName": "created_at_timestamp",
    "stream.kafka.decoder.prop.dynamodb.deleteColumnName": "is_deleted",
    "stream.kafka.decoder.prop.dynamodb.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
  }
}

In the above sample, the Kafka consumer factory used is org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory and the decoder associated with this stream is ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder.

The configuration uses several key components:

  • The primary decoder ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder handles the DynamoDB-specific message format
  • The column named by dynamodb.timeColumnName (created_at_timestamp here) is populated with the ApproximateCreationDateTime from the DynamoDB JSON record
  • The column named by dynamodb.deleteColumnName (is_deleted here) is set to true when a REMOVE record is received from DynamoDB, and false otherwise
  • The dynamodb.envelope.decoder.class.name is set to org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder since the underlying DynamoDB messages are in JSON format

Simplified Configuration

The DynamoDB Message Decoder significantly simplifies the configuration needed for ingesting DynamoDB CDC data. Without this decoder, you would need to specify multiple JSON path transformations for each column. Here's a comparison:

Without DynamoDB Decoder

{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "rsvp_count",
        "transformFunction": "JSONPATHLONG(dynamodb, '$.NewImage.rsvp_count.N')"
      },
      {
        "columnName": "venue_name",
        "transformFunction": "JSONPATHSTRING(dynamodb, '$.NewImage.venue_name.S')"
      },
      {
        "columnName": "group_city",
        "transformFunction": "JSONPATHSTRING(dynamodb, '$.NewImage.group_city.S')"
      }
      // ... more transform configs for each column
    ]
  }
}
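To make the per-column cost concrete, the sketch below generates one transform entry per column in the same shape as the example above. It is only an illustration. The helper name transform_configs is hypothetical, and the mapping of the N type tag to JSONPATHLONG is an assumption taken from the single numeric column shown (a fractional column would need a different function).

```python
import json
from typing import Dict, List

# Assumed mapping from DynamoDB type tag to a JSON-path transform function.
# Only the S and N cases appear in the example above.
JSONPATH_FUNCTION = {"S": "JSONPATHSTRING", "N": "JSONPATHLONG"}


def transform_configs(columns: Dict[str, str]) -> List[Dict[str, str]]:
    """Build one transformConfigs entry per (column name, type tag) pair."""
    return [
        {
            "columnName": name,
            "transformFunction":
                f"{JSONPATH_FUNCTION[tag]}(dynamodb, '$.NewImage.{name}.{tag}')",
        }
        for name, tag in columns.items()
    ]


configs = transform_configs(
    {"rsvp_count": "N", "venue_name": "S", "group_city": "S"})
print(json.dumps({"ingestionConfig": {"transformConfigs": configs}}, indent=2))
```

Every new column adds another entry and requires knowing its DynamoDB type tag in advance, which is the maintenance burden the decoder removes.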

With DynamoDB Decoder

{
  "streamConfigs": {
    "stream.kafka.decoder.class.name": "ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder",
    "stream.kafka.decoder.prop.dynamodb.timeColumnName": "created_at_timestamp",
    "stream.kafka.decoder.prop.dynamodb.deleteColumnName": "is_deleted",
    "stream.kafka.decoder.prop.dynamodb.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
  }
}

The decoder automatically handles the complex DynamoDB JSON structure, including the type indicators (S, N, B) and nested structure, making it much easier to ingest DynamoDB CDC data into Pinot.
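As a rough picture of what handling the type indicators involves, the following Python sketch unwraps DynamoDB typed values into plain values. It is a conceptual illustration under stated assumptions, not the decoder's actual code. The function names unwrap_attribute and flatten_image are hypothetical, the sample values are invented, and converting N to int or float is one possible choice among several.

```python
import base64
from typing import Any, Dict


def unwrap_attribute(value: Dict[str, Any]) -> Any:
    """Convert one DynamoDB typed value, e.g. {"N": "42"}, to a plain Python value."""
    # Each typed value is a single-entry object: {type tag: payload}.
    type_tag, payload = next(iter(value.items()))
    if type_tag == "S":
        return payload
    if type_tag == "N":
        # Numbers arrive as strings; keep integers exact, fall back to float.
        try:
            return int(payload)
        except ValueError:
            return float(payload)
    if type_tag == "B":
        # Binary values are base64-encoded in DynamoDB JSON.
        return base64.b64decode(payload)
    if type_tag == "BOOL":
        return bool(payload)
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unwrap_attribute(item) for item in payload]
    if type_tag == "M":
        return {key: unwrap_attribute(item) for key, item in payload.items()}
    raise ValueError(f"unsupported DynamoDB type tag: {type_tag}")


def flatten_image(image: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Unwrap every attribute of a NewImage-style object."""
    return {name: unwrap_attribute(value) for name, value in image.items()}


# Illustrative NewImage using the column names from the comparison above.
new_image = {
    "rsvp_count": {"N": "3"},
    "venue_name": {"S": "Town Hall"},
    "group_city": {"S": "Austin"},
}
flat = flatten_image(new_image)
print(flat)
```

Set types (SS, NS, BS) are left out to keep the sketch short; they would follow the same pattern.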

Properties related to a stream's decoder are prefixed with stream.$streamType.decoder.prop. (including the trailing dot). For a Kafka stream, all DynamoDB message decoder properties therefore begin with stream.kafka.decoder.prop., as seen above in the stream.kafka.decoder.prop.dynamodb.timeColumnName, stream.kafka.decoder.prop.dynamodb.deleteColumnName and stream.kafka.decoder.prop.dynamodb.envelope.decoder.class.name properties.
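The prefixing rule can be sketched as a small Python helper that turns the decoder's own property names into full stream config keys. The helper name decoder_stream_configs is hypothetical and exists only for illustration. The property names and class names are the ones used in the samples above.

```python
import json
from typing import Dict

DECODER_CLASS = "ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder"


def decoder_stream_configs(stream_type: str,
                           decoder_props: Dict[str, str]) -> Dict[str, str]:
    """Prefix decoder properties with stream.<streamType>.decoder.prop."""
    prefix = f"stream.{stream_type}.decoder.prop."
    configs = {f"stream.{stream_type}.decoder.class.name": DECODER_CLASS}
    for key, value in decoder_props.items():
        configs[prefix + key] = value
    return configs


configs = decoder_stream_configs("kafka", {
    "dynamodb.timeColumnName": "created_at_timestamp",
    "dynamodb.deleteColumnName": "is_deleted",
    "dynamodb.envelope.decoder.class.name":
        "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
})
print(json.dumps(configs, indent=2))
```

The resulting keys match the decoder-related entries in the sample stream configuration; the remaining streamConfigs entries (topic, broker list, flush thresholds) still need to be supplied separately.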