Pause and Resume Consumption

How to pause and resume ingestion of a stream

Pause and resume ingestion of a stream for the following reasons:

  • There's a problem with the underlying stream, and you need to restart the server, reset offsets, or recreate a topic.
  • You want to ingest data from different streams into the same table.
  • There's a mistake in the Pinot ingestion configuration, and Pinot is throwing exceptions and can't ingest any more data.

To learn how to use the pauseConsumption and resumeConsumption APIs to pause and resume ingestion of a stream, watch the following video, or complete the tutorial below, starting with Prerequisites.

For changes to the table configuration, you don't need to pause and resume; use the forceCommit API.
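
For example, a forceCommit call against the controller used in this recipe might look like the following; this assumes the events table and a controller on localhost:9000, as set up below:

curl -X POST \
  "http://localhost:9000/tables/events/forceCommit" \
  -H "accept: application/json"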

Prerequisites

You will need to install Docker to follow the code examples in this guide.

Navigate to recipe

  1. If you haven't already, download the recipes (a clone command is shown below).
  2. In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/pause-resume
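
If you don't have the recipes yet, cloning the repository should do it; this assumes the recipes still live in the startreedata/pinot-recipes repository on GitHub:

git clone https://github.com/startreedata/pinot-recipes.git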

Launch Pinot Cluster

You can spin up a Pinot cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and ZooKeeper. You can find the docker-compose.yml file on GitHub.
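
Once the containers have started, you can check that they're all running with the following command, executed from the same directory as the docker-compose.yml file:

docker-compose ps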

Data generator

This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:

python datagen.py 2>/dev/null | head -n1 | jq

Output is shown below:

{
  "tsString": "2022-11-23T12:08:44.127481Z",
  "uuid": "e1c58795-a009-4e21-ae76-cdd66e090797",
  "count": 203
}

Kafka ingestion

We're going to ingest this data into an Apache Kafka topic using the kcat command line tool. We'll also use jq to join each message's key and payload with a separator character (ø), matching the key/payload structure that kcat's -K flag expects:

python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
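
To sanity check what we've published, we can consume a single message back and print the key and payload separately, using kcat's standard format tokens:

kcat -C -b localhost:9092 -t events -c 1 \
  -f 'key: %k\npayload: %s\n'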

We can check that Kafka has some data by running the following command:

docker exec -it kafka-pauseresume kafka-run-class.sh \
  kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events

We'll see output in the format topic:partition:latest offset, something like the following:

events:0:19960902

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
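
The AddTable command we run below adds the schema and table config in one step, but if you'd rather add the schema on its own first, something like the following should work against the controller's REST API:

curl -F schemaName=@config/schema.json localhost:9000/schemas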

Now for the table config:

{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "ingestionConfig":{
      "transformConfigs": [
        {
            "columnName": "ts",
            "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SS''Z''')"
        }
      ]
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-pauseresume:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
      }
    },
    "tenants": {},
    "metadata": {}
  }

config/table.json

The transformConfigs section above contains a transformation function with a subtle error. The second parameter passed to the FromDateTime function describes the format of the DateTime string, which we defined as:

YYYY-MM-dd''T''HH:mm:ss.SS''Z''

But tsString has values in the following format:

2022-11-23T12:08:44.127550Z

i.e., we don't have enough S values - there should be six rather than two, since tsString contains six fractional-second digits.

We'll create the table by running the following:

docker run \
   --network pauseresume \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-pauseresume" \
     -exec
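
To confirm that the table was created, we can ask the controller for its config (assuming the controller's REST API is exposed on localhost:9000):

curl -X GET \
  "http://localhost:9000/tables/events" \
  -H "accept: application/json"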

If we navigate to the Pinot UI, we'll notice that no records have been ingested. Instead, our server logs are full of the following error:

2023/06/02 15:00:31.705 ERROR [LLRealtimeSegmentDataManager_events__0__0__20230602T1459Z] [events__0__0__20230602T1459Z] Caught exception while transforming the record: org.apache.pinot.spi.stream.StreamDataDecoderResult@19717934
java.lang.RuntimeException: Caught exception while evaluation transform function for column: ts
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:126) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.CompositeTransformer.transform(CompositeTransformer.java:90) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.segment.creator.TransformPipeline.processPlainRow(TransformPipeline.java:97) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.segment.creator.TransformPipeline.processRow(TransformPipeline.java:92) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.processStreamEvents(LLRealtimeSegmentDataManager.java:559) [pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.consumeLoop(LLRealtimeSegmentDataManager.java:434) [pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:629) [pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Caught exception while executing function: fromDateTime(tsString,'YYYY-MM-dd'T'HH:mm:ss.SS'Z'')
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:231) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	... 7 more
Caused by: java.lang.IllegalStateException: Caught exception while invoking method: public static long org.apache.pinot.common.function.scalar.DateTimeFunctions.fromDateTime(java.lang.String,java.lang.String) with arguments: [2023-06-02T15:59:58.186685Z, YYYY-MM-dd'T'HH:mm:ss.SS'Z']
	at org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:130) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:229) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	... 7 more
Caused by: java.lang.reflect.InvocationTargetException
	at jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:127) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:229) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	... 7 more
Caused by: java.lang.IllegalArgumentException: Invalid format: "2023-06-02T15:59:58.186685Z" is malformed at "6685Z"
	at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.common.function.DateTimePatternHandler.parseDateTimeStringToEpochMillis(DateTimePatternHandler.java:38) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.common.function.scalar.DateTimeFunctions.fromDateTime(DateTimeFunctions.java:271) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:127) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:229) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	... 7 more

The Pause/Resume workflow

We're going to run the following workflow to sort this out:

  • Pause ingestion for the table
  • Fix the transformation function
  • Resume ingestion

Pause consumption by running the following command:

curl -X POST \
  "http://localhost:9000/tables/events/pauseConsumption" \
  -H "accept: application/json"

Then we need to fix the config, replacing it with the following:

{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "ingestionConfig":{
      "transformConfigs": [
        {
            "columnName": "ts",
            "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
        }
      ]
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-pauseresume:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
      }
    },
    "tenants": {},
    "metadata": {}
  }

config/table-fixed.json

Let's apply that:

docker run \
   --network pauseresume \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table-fixed.json \
     -controllerHost "pinot-controller-pauseresume" \
     -exec -update

And resume consumption:

curl -X POST \
  "http://localhost:9000/tables/events/resumeConsumption?consumeFrom=smallest" \
  -H "accept: application/json"

If we navigate back to the Pinot UI, we'll see that it's now ingesting data, and there are no more errors in our server logs either!
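
We can also verify this from the command line by running a query against the broker. Assuming the broker's REST port is mapped to localhost:8099 in this recipe's docker-compose.yml, a count query should return a growing number:

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select count(*) from events"}'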