[ https://issues.apache.org/jira/browse/KAFKA-9638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17051072#comment-17051072 ]
Levani Kokhreidze edited comment on KAFKA-9638 at 3/4/20, 10:10 AM:
--------------------------------------------------------------------

Hi [~bchen225242]

Not sure I understand. Maybe I am missing something, but whenever a stream thread encounters an unrecoverable exception, all of its assigned partitions, with their corresponding state, are migrated to other threads (not necessarily on the same node).

Let's take a concrete example: a topology similar to the one below, running with 6 threads spread across 2 physical machines (3 threads on each machine). For simplicity, let's say that `input-topic` also has 6 partitions.

{code:java}
streamBuilder.stream("input-topic")
    .selectKey((key, value) -> value.newKey())
    .groupByKey()
    .aggregate(
        0d,
        // Bug: invoiceValue() can return null; unboxing it in `aggr + ...` throws an NPE
        (key, value, aggr) -> aggr + value.invoiceValue(),
        Materialized.as("my-store"));
{code}

The aggregator function has a bug: `value.invoiceValue()` may sometimes return null, and the code above will then throw an NPE. As a result, the stream thread dies and its partitions are reassigned (with their state) to another thread, possibly on another node altogether. Because of the error, the offset of the problematic event is never committed, so every other thread that picks up that partition is doomed to die with the same exception. All the rebalancing, state migration, etc. is pretty much useless in this case.

Instead, it would be great if Kafka Streams killed only the thread that owns the partition with the problematic event, rather than rebalancing that partition across other threads. We would accumulate lag on that single partition, but all the other threads would keep processing data. So instead of a global outage on all 6 partitions, we would have an outage on only one.

Does this make sense?

Regards,
Levani

> Do not trigger REBALANCING when specific exceptions occur in Kafka Streams
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-9638
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9638
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Levani Kokhreidze
>            Priority: Major
>
> As of now, when a StreamThread encounters an exception in a Kafka Streams
> application, it results in REBALANCING of all the tasks that were the
> responsibility of that thread.
> The problem is that if the exception was, let's say, a logic error such as an
> NPE, REBALANCING is pretty much useless, because all the other threads will
> also die with the same NPE. This kind of pointless rebalancing incurs extra
> costs in terms of network traffic, IOPS, etc. for large stateful applications.
> In addition, this behaviour causes a global outage of the Kafka Streams
> application instead of a localized outage of only the affected tasks. It would
> be great if Kafka Streams users could specify, via some interface, exceptions
> that must not trigger rebalancing of the tasks. The StreamThread may still
> die, but in that case we would have an isolated incident.
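To make the cost comparison in the comment and the issue description concrete, here is a toy model in Java. It is a sketch, not Kafka Streams code: `RebalanceExemption` is a hypothetical stand-in for the "some interface" the issue asks for (it is not an existing Kafka API), `RebalanceToy` is likewise a made-up name, the round-robin reassignment is a simplification of the real partition assignor, and the problematic event is modelled as a partition that fails on every attempt because its offset is never committed. It replays the 6-thread, 6-partition example with and without an exemption for `NullPointerException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model of the KAFKA-9638 scenario; NOT Kafka Streams code.
class RebalanceToy {

    // Returns how many partitions end up with no live thread processing them.
    // Partition p starts on thread p % threads; `poison` fails on every attempt
    // because its offset is never committed.
    static int stalledPartitions(int threads, int partitions, int poison,
                                 RebalanceExemption policy) {
        Set<Integer> alive = new TreeSet<>();
        for (int t = 0; t < threads; t++) alive.add(t);
        int[] owner = new int[partitions];
        for (int p = 0; p < partitions; p++) owner[p] = p % threads;
        Set<Integer> parked = new TreeSet<>(); // partitions of threads that died without a rebalance

        Throwable failure = new NullPointerException("invoiceValue() returned null");
        while (!parked.contains(poison) && !alive.isEmpty()) {
            int victim = owner[poison];
            alive.remove(victim); // the thread processing the poison event dies
            if (policy.isExempt(failure)) {
                // Proposed behaviour: no rebalance, the dead thread's partitions just stall.
                for (int p = 0; p < partitions; p++) {
                    if (owner[p] == victim) parked.add(p);
                }
            } else {
                // Current behaviour (simplified): hand every partition round-robin to the survivors.
                List<Integer> survivors = new ArrayList<>(alive);
                for (int p = 0; p < partitions && !survivors.isEmpty(); p++) {
                    owner[p] = survivors.get(p % survivors.size());
                }
            }
        }
        return alive.isEmpty() ? partitions : parked.size();
    }

    public static void main(String[] args) {
        RebalanceExemption none = t -> false; // every failure triggers a rebalance
        RebalanceExemption npe = t -> t instanceof NullPointerException;
        System.out.println(stalledPartitions(6, 6, 0, none)); // 6
        System.out.println(stalledPartitions(6, 6, 0, npe));  // 1
    }
}

// Hypothetical hook of the kind the issue asks for; not an existing Kafka API.
interface RebalanceExemption {
    // true = a thread dying with this exception should NOT have its tasks reassigned
    boolean isExempt(Throwable t);
}
```

With no exemption, each surviving thread inherits the poison partition in turn and dies, so all 6 partitions stall; exempting the NPE leaves the failed thread's partitions where they are, so only one partition stalls. Matching purely on exception type is the simplest possible policy; a real hook would probably want more context (task, record) to decide, which this sketch does not model.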