vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r453086734



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -359,11 +359,8 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
         
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, 
assignmentErrorCode);
         final AtomicLong nextScheduledRebalanceMs = new 
AtomicLong(Long.MAX_VALUE);
         
consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, 
nextScheduledRebalanceMs);
-        String originalReset = null;
-        if (!builder.latestResetTopicsPattern().pattern().equals("") || 
!builder.earliestResetTopicsPattern().pattern().equals("")) {
-            originalReset = (String) 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none");
-        }

Review comment:
       Aha! Thanks, @mjsax. This makes sense. I hadn't considered that the 
consumer never throws `InvalidOffsetException` if there's a valid policy 
configured, so the prior logic only needs the fallback when there's an override.
   
   Now that we also use the resetting logic for our own "manual" reset when 
handling TaskCorruptedException, we do need to capture the Consumer config 
value, but we can still enforce that if there are any overrides in the 
topology, we set the consumer to "none". Just updated the code to do this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to