vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452951286
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -359,11 +359,8 @@ public static StreamThread create(final
InternalTopologyBuilder builder,
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE,
assignmentErrorCode);
final AtomicLong nextScheduledRebalanceMs = new
AtomicLong(Long.MAX_VALUE);
consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS,
nextScheduledRebalanceMs);
- String originalReset = null;
- if (!builder.latestResetTopicsPattern().pattern().equals("") ||
!builder.earliestResetTopicsPattern().pattern().equals("")) {
- originalReset = (String)
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
- consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"none");
- }
Review comment:
I had to change this to get the StreamThread test to actually use the
configured `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG := earliest`
Reading the conditional, it doesn't make any sense to me, but it's been in
the codebase for a long time, so I'm doubting myself. It seems to say that we
will only use the provided client configuration if there **is** an override,
but it seems like it should have been "if there is **not** an override".
Regardless, the "originalReset" is only used as a fallback _after_ we apply
the builder reset patterns, so I don't see why we should leave it null in any
case.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]