How to pause and resume ingestion of a stream
In this recipe we'll learn how to use the pauseConsumption and resumeConsumption APIs to pause and resume ingestion of a stream.
We might want to do this for the following reasons:
- There's a problem with the underlying stream, and we need to restart the server, reset offsets, or recreate a topic.
- We want to ingest data from different streams into the same table.
- We made a mistake in our ingestion config in Pinot, and it’s now throwing exceptions and isn’t able to ingest any more data.
If we've only made an update to our table config and want it to be picked up, we don't need to pause and resume; we can instead use the forceCommit API.
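For reference, with the controller running on localhost:9000 (as it will be later in this recipe), a forceCommit call looks like this:

```bash
# Force-commit the consuming segments so that a table config change is picked up
curl -X POST \
  "http://localhost:9000/tables/events/forceCommit" \
  -H "accept: application/json"
```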
| Pinot Version | 0.12.0 |
| Code | startreedata/pinot-recipes/pause-resume |
Prerequisites
You will need to install Docker to follow the code examples in this guide.
Navigate to recipe
- If you haven't already, download the recipes.
- In a terminal, go to this recipe by running the following command:
cd pinot-recipes/recipes/pause-resume
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
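Before moving on, it's worth checking that everything came up cleanly. A quick sanity check, assuming the default ports from the docker-compose.yml:

```bash
# List the containers started by docker-compose
docker-compose ps

# The controller is ready once this returns OK
curl -s "http://localhost:9000/health"
```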
Data generator
This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:
python datagen.py 2>/dev/null | head -n1 | jq
Output is shown below:
{
"tsString": "2022-11-23T12:08:44.127481Z",
"uuid": "e1c58795-a009-4e21-ae76-cdd66e090797",
"count": 203
}
Kafka ingestion
We're going to ingest this data into an Apache Kafka topic using the kcat command line tool. We'll also use jq to prefix each payload with the event's uuid as the message key, using ø as the key/payload separator to match the -K flag that we pass to kcat:
python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
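To spot-check what's landing in the topic, we can consume a few messages back with kcat. This is just a sanity check; the -c flag limits how many messages we read before exiting:

```bash
# Consume 3 messages, printing the key and payload separately
kcat -C -b localhost:9092 -t events -c 3 \
  -f 'key: %k\npayload: %s\n'
```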
We can check that Kafka has some data by running the following command:
docker exec -it kafka-pauseresume kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic events
We'll see output in topic:partition:offset format, something like the following:
events:0:19960902
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "events",
"dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
"metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
"dateTimeFieldSpecs": [
{
"name": "ts",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
Now for the table config:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"ingestionConfig":{
"transformConfigs": [
{
"columnName": "ts",
"transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SS''Z''')"
}
]
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-pauseresume:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
}
},
"tenants": {},
"metadata": {}
}
The transformConfigs section above contains a transformation function with a subtle error. The second parameter passed to the FromDateTime function describes the format of the DateTime string. Once the doubled single quotes in the JSON string are unescaped, that format is:
YYYY-MM-dd'T'HH:mm:ss.SS'Z'
But tsString has values in the following format:
2022-11-23T12:08:44.127550Z
i.e., we don't have enough S values - there should be 6 rather than 2, since the timestamps have six digits of fractional seconds.
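We can see this for ourselves by pulling tsString out of a freshly generated event and counting the digits after the decimal point:

```bash
# Prints e.g. 2022-11-23T12:08:44.127481Z - six fractional digits
python datagen.py 2>/dev/null | head -n1 | jq -r '.tsString'
```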
We'll create the table by running the following:
docker run \
--network pauseresume \
-v $PWD/config:/config \
apachepinot/pinot:0.12.0-arm64 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-forcecommit" \
-exec
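To check that the table was created, we can ask the controller for its config and pull out the transform function (this assumes the standard GET /tables/{tableName} endpoint, which returns the config keyed by table type):

```bash
curl -s "http://localhost:9000/tables/events" \
  -H "accept: application/json" |
jq -r '.REALTIME.ingestionConfig.transformConfigs[0].transformFunction'
```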
If we navigate to the Pinot UI, we'll notice that no records have been ingested. Instead, our server logs are full of the following error:
2023/06/02 15:00:31.705 ERROR [LLRealtimeSegmentDataManager_events__0__0__20230602T1459Z] [events__0__0__20230602T1459Z] Caught exception while transforming the record: org.apache.pinot.spi.stream.StreamDataDecoderResult@19717934
java.lang.RuntimeException: Caught exception while evaluation transform function for column: ts
at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:126) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.recordtransformer.CompositeTransformer.transform(CompositeTransformer.java:90) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.segment.creator.TransformPipeline.processPlainRow(TransformPipeline.java:97) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.segment.creator.TransformPipeline.processRow(TransformPipeline.java:92) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.processStreamEvents(LLRealtimeSegmentDataManager.java:559) [pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.consumeLoop(LLRealtimeSegmentDataManager.java:434) [pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:629) [pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Caught exception while executing function: fromDateTime(tsString,'YYYY-MM-dd'T'HH:mm:ss.SS'Z'')
at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:231) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
... 7 more
Caused by: java.lang.IllegalStateException: Caught exception while invoking method: public static long org.apache.pinot.common.function.scalar.DateTimeFunctions.fromDateTime(java.lang.String,java.lang.String) with arguments: [2023-06-02T15:59:58.186685Z, YYYY-MM-dd'T'HH:mm:ss.SS'Z']
at org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:130) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:229) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
... 7 more
Caused by: java.lang.reflect.InvocationTargetException
at jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:127) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:229) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
... 7 more
Caused by: java.lang.IllegalArgumentException: Invalid format: "2023-06-02T15:59:58.186685Z" is malformed at "6685Z"
at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.common.function.DateTimePatternHandler.parseDateTimeStringToEpochMillis(DateTimePatternHandler.java:38) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.common.function.scalar.DateTimeFunctions.fromDateTime(DateTimeFunctions.java:271) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:127) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:229) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
... 7 more
The Pause/Resume workflow
We're going to run the following workflow to sort this out:
- Pause ingestion for the table
- Fix the transformation function
- Resume ingestion
Pause consumption by running the following command:
curl -X POST \
"http://localhost:9000/tables/events/pauseConsumption" \
-H "accept: application/json"
Next, we need to fix the table config. The corrected config, config/table-fixed.json, looks like this:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"ingestionConfig":{
"transformConfigs": [
{
"columnName": "ts",
"transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
}
]
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-pauseresume:9093",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
}
},
"tenants": {},
"metadata": {}
}
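Before applying it, we can double check that the new transform function has the right number of S values:

```bash
# Should print: FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')
jq -r '.ingestionConfig.transformConfigs[0].transformFunction' config/table-fixed.json
```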
Let's apply that:
docker run \
--network pauseresume \
-v $PWD/config:/config \
apachepinot/pinot:0.12.0-arm64 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table-fixed.json \
-controllerHost "pinot-controller-pauseresume" \
-exec -update
And then resume consumption. Passing consumeFrom=smallest tells Pinot to consume from the earliest available offset in the topic, so the records that failed the first time around will be re-read:
curl -X POST \
"http://localhost:9000/tables/events/resumeConsumption?consumeFrom=smallest" \
-H "accept: application/json"
If we navigate back to the Pinot UI, we'll see that records are now being ingested. And there are no more errors in our server logs either!
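We can also verify this from the command line by counting the records in the table. This assumes the controller's SQL endpoint on localhost:9000, which proxies queries through to the broker; the count should keep climbing on repeated runs while ingestion catches up:

```bash
curl -s -X POST "http://localhost:9000/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT count(*) FROM events"}' | jq '.resultTable'
```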