vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r453086734
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -359,11 +359,8 @@ public static StreamThread create(final InternalTopologyBuilder builder, consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode); final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE); consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs); - String originalReset = null; - if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) { - originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); - consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); - } Review comment: Aha! Thanks, @mjsax. This makes sense. I hadn't considered that the consumer never throws `InvalidOffsetException` if there's a valid policy configured, so the prior logic only needs the fallback when there's an override. Now that we also use the resetting logic for our own "manual" reset when handling TaskCorruptedException, we do need to capture the Consumer config value, but we can still enforce that if there are any overrides in the topology, we set the consumer to "none". Just updated the code to do this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org