vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r453086734
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -359,11 +359,8 @@ public static StreamThread create(final
InternalTopologyBuilder builder,
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE,
assignmentErrorCode);
final AtomicLong nextScheduledRebalanceMs = new
AtomicLong(Long.MAX_VALUE);
consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS,
nextScheduledRebalanceMs);
- String originalReset = null;
- if (!builder.latestResetTopicsPattern().pattern().equals("") ||
!builder.earliestResetTopicsPattern().pattern().equals("")) {
- originalReset = (String)
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
- consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"none");
- }
Review comment:
Aha! Thanks, @mjsax. This makes sense. I hadn't considered that the
consumer never throws `InvalidOffsetException` if there's a valid policy
configured, so the prior logic only needs the fallback when there's an override.
Now that we also use the resetting logic for our own "manual" reset when
handling TaskCorruptedException, we do need to capture the Consumer config
value, but we can still enforce that if there are any overrides in the
topology, we set the consumer to "none". Just updated the code to do this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]