[jira] [Commented] (KAFKA-15108) task.timeout.ms does not work when TimeoutException is thrown by streams producer
[ https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735625#comment-17735625 ]

Tomonari Yamashita commented on KAFKA-15108:

Hi [~mjsax],

Thank you. I wasn't sure whether this issue was unexpected behavior or not, so your advice was very helpful.

> task.timeout.ms does not work when TimeoutException is thrown by streams producer
>
> Key: KAFKA-15108
> URL: https://issues.apache.org/jira/browse/KAFKA-15108
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.0
> Reporter: Tomonari Yamashita
> Priority: Major
>
> [Problem]
> - task.timeout.ms does not work when TimeoutException is thrown by the streams producer
> -- The Kafka Streams upgrade guide says, "Kafka Streams is now handling TimeoutException thrown by the consumer, producer, and admin client."(1) and "To bound how long Kafka Streams retries a task, you can set task.timeout.ms (default is 5 minutes)."(1).
> -- However, task.timeout.ms does not appear to take effect for the streams producer, which seems to keep retrying forever.
>
> [Environment]
> - Kafka Streams 3.5.0
>
> [Reproduction procedure]
> # Create the "input-topic" topic
> # Put several messages on "input-topic"
> # DON'T create the "output-topic" topic, so that a TimeoutException is fired
> # Create the following simple Kafka Streams program; it just transfers messages from "input-topic" to "output-topic".
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Produced;
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
> // Not needed to reproduce the issue; com.example.CustomProductionExceptionHandler is a custom class not shown here:
> // props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, "com.example.CustomProductionExceptionHandler");
> StreamsBuilder builder = new StreamsBuilder();
> // Copy every record from "input-topic" to the (deliberately missing) "output-topic"
> builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
> KafkaStreams streams = new KafkaStreams(builder.build(), props);
> streams.start(); // nothing is processed until start() is called
> {code}
> # Wait for task.timeout.ms (default is 5 minutes).
> ## If the debug log is enabled, a large number of UNKNOWN_TOPIC_OR_PARTITION errors are logged because "output-topic" does not exist.
> ## In addition, a TimeoutException is thrown roughly once every minute (2).
> # ==> However, task.timeout.ms does not appear to take effect for the streams producer, which seems to keep retrying forever.
> ## My expected behavior is that task.timeout.ms takes effect and the client shuts down, because the default response when an exception is thrown is StreamThreadExceptionResponse.SHUTDOWN_CLIENT.
>
> [As far as I have investigated]
> - The TimeoutException thrown by the streams producer is replaced with a TaskCorruptedException in RecordCollectorImpl.recordSendError(...) (3).
> - After that, no code containing logic related to task.timeout.ms appears to be executed.
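The investigation above can be illustrated with a small, self-contained simulation. It is a hypothetical model, not Kafka Streams code: `SendTimeout` stands in for the producer's `TimeoutException`, `Corrupted` stands in for `TaskCorruptedException`, and the one-minute interval between failed attempts is an assumption taken from the "roughly once every minute" observation. If the timeout reaches the retry logic unchanged, a per-task deadline bounds the retries; if it is converted first, the deadline is never consulted and the task retries until the simulation's attempt cap.

```java
// Hypothetical model of the reported behavior; all names are illustrative
// stand-ins and do not mirror Kafka Streams internals.
final class RetryPathDemo {

    /** Stand-in for the producer's TimeoutException. */
    static final class SendTimeout extends RuntimeException {
        SendTimeout() { super("send timed out"); }
    }

    /** Stand-in for TaskCorruptedException. */
    static final class Corrupted extends RuntimeException {
        Corrupted(Throwable cause) { super(cause); }
    }

    /**
     * Simulates a task whose every send times out, one attempt per retryIntervalMs.
     * Returns the simulated time (ms) at which the task gives up, or -1 if it is
     * still retrying after maxAttempts attempts.
     */
    static long simulate(boolean convertTimeout, long taskTimeoutMs,
                         long retryIntervalMs, int maxAttempts) {
        long deadlineMs = -1; // -1 means the per-task clock has not started yet
        long nowMs = 0;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            nowMs += retryIntervalMs;
            try {
                RuntimeException error = new SendTimeout();
                // Reported path: the timeout is converted before any retry logic sees it.
                throw convertTimeout ? new Corrupted(error) : error;
            } catch (SendTimeout e) {
                // Documented path: the first timeout starts the clock; give up once it expires.
                if (deadlineMs < 0) {
                    deadlineMs = nowMs + taskTimeoutMs;
                } else if (nowMs >= deadlineMs) {
                    return nowMs;
                }
            } catch (Corrupted e) {
                // The task is simply reset and retried; the per-task clock is never checked.
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false, 300_000L, 60_000L, 1_000)); // 360000
        System.out.println(simulate(true, 300_000L, 60_000L, 1_000));  // -1
    }
}
```

With a 300 000 ms task timeout and one failed attempt per minute, the unconverted path gives up at the sixth timeout (t = 360 000 ms, five minutes after its clock started), while the converted path never gives up.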
>
> (1) Kafka Streams upgrade guide
> - [https://kafka.apache.org/35/documentation/streams/upgrade-guide]
> - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
> {code:java}
> Kafka Streams is now handling TimeoutException thrown by the consumer,
> producer, and admin client. If a timeout occurs on a task, Kafka Streams
> moves to the next task and retries to make progress on the failed task in the
> next iteration. To bound how long Kafka Streams retries a task, you can set
> task.timeout.ms (default is 5 minutes). If a task does not make progress
> within the specified task timeout, which is tracked on a per-task basis,
> Kafka Streams throws a TimeoutException (cf. KIP-572).
> {code}
>
> (2) TimeoutException occurs
> {code:java}
> 2023-06-19 19:51:26 WARN NetworkClient:1145 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Error while fetching metadata with correlation id 1065 : {output-topic=UNKNOWN_TOPIC_OR_PARTITION}
> 2023-06-19 19:51:26 DEBUG Metadata:363 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Requesting metadata update for topic output-topic due to error UNKNOWN_TOPIC_OR_PARTITION
> 2023-06-19 19:51:26 DEBUG Metadata:291 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Updated cluster metadata updateVersion 1064 to MetadataCache{clusterId='ulBlb0C3QdaurHgFmPLYew',
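The per-task behavior described in the upgrade-guide excerpt (move on to the next task, retry later, and fail only if the task makes no progress within task.timeout.ms) can be sketched as a small tracker. This is a simplified, hypothetical model rather than Kafka Streams' implementation: `TaskTimeoutTracker`, `TaskTimeoutExpired`, and the task id "0_0" are illustrative names, and starting a task's clock at its first timeout is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the TimeoutException that ends a stuck task. */
final class TaskTimeoutExpired extends RuntimeException {
    TaskTimeoutExpired(String message, Throwable cause) { super(message, cause); }
}

/** Simplified, hypothetical model of per-task timeout tracking (cf. KIP-572). */
final class TaskTimeoutTracker {
    private final long taskTimeoutMs;                            // plays the role of task.timeout.ms
    private final Map<String, Long> deadlines = new HashMap<>(); // taskId -> deadline in ms

    TaskTimeoutTracker(long taskTimeoutMs) { this.taskTimeoutMs = taskTimeoutMs; }

    /**
     * Records a timeout for one task. The first timeout starts that task's clock;
     * a later timeout at or after the deadline throws. Otherwise the caller moves on
     * to the next task and retries this one in the next iteration.
     */
    void onTimeout(String taskId, long nowMs, Exception cause) {
        Long deadlineMs = deadlines.get(taskId);
        if (deadlineMs == null) {
            deadlines.put(taskId, nowMs + taskTimeoutMs);
        } else if (nowMs >= deadlineMs) {
            throw new TaskTimeoutExpired(
                "Task " + taskId + " made no progress within " + taskTimeoutMs + " ms", cause);
        }
    }

    /** Any progress resets the task's clock. */
    void onProgress(String taskId) { deadlines.remove(taskId); }

    boolean isTimerRunning(String taskId) { return deadlines.containsKey(taskId); }

    public static void main(String[] args) {
        TaskTimeoutTracker tracker = new TaskTimeoutTracker(300_000L); // 5-minute default
        // One timeout per simulated minute for task "0_0", with no progress in between.
        for (long nowMs = 60_000L; ; nowMs += 60_000L) {
            try {
                tracker.onTimeout("0_0", nowMs, new RuntimeException("send timed out"));
            } catch (TaskTimeoutExpired e) {
                System.out.println("gave up at " + nowMs + " ms"); // gave up at 360000 ms
                break;
            }
        }
    }
}
```

Keeping one deadline per task id means a single stuck task can fail on its own schedule while other tasks keep their independent clocks, which is what "tracked on a per-task basis" suggests.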
[jira] [Commented] (KAFKA-15108) task.timeout.ms does not work when TimeoutException is thrown by streams producer
[ https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735476#comment-17735476 ]

Matthias J. Sax commented on KAFKA-15108:

There are a few cases in which we cannot handle a `TimeoutException` more gracefully, and the docs gloss over this fact. The scenario you describe is one of these cases. I agree that we should maybe try to include it; the challenge (and the reason it was not included in the original work) is that it will need different handling compared to how we handle `TimeoutException` in the regular case...