vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r453107700
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -760,44 +763,62 @@ void runOnce() {
try {
records = mainConsumer.poll(pollTime);
} catch (final InvalidOffsetException e) {
- resetInvalidOffsets(e);
+ resetOffsets(e.partitions(), e);
}
return records;
}
- private void resetInvalidOffsets(final InvalidOffsetException e) {
- final Set<TopicPartition> partitions = e.partitions();
+ private void resetOffsets(final Set<TopicPartition> partitions, final
Exception cause) {
final Set<String> loggedTopics = new HashSet<>();
final Set<TopicPartition> seekToBeginning = new HashSet<>();
final Set<TopicPartition> seekToEnd = new HashSet<>();
+ final Set<TopicPartition> notReset = new HashSet<>();
for (final TopicPartition partition : partitions) {
if
(builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
addToResetList(partition, seekToBeginning, "Setting topic '{}'
to consume from {} offset", "earliest", loggedTopics);
} else if
(builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
addToResetList(partition, seekToEnd, "Setting topic '{}' to
consume from {} offset", "latest", loggedTopics);
} else {
- if (originalReset == null ||
(!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
- final String errorMessage = "No valid committed offset
found for input topic %s (partition %s) and no valid reset policy configured." +
- " You need to set configuration parameter
\"auto.offset.reset\" or specify a topic specific reset " +
- "policy via StreamsBuilder#stream(...,
Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(...,
Consumed.with(Topology.AutoOffsetReset))";
- throw new StreamsException(String.format(errorMessage,
partition.topic(), partition.partition()), e);
- }
-
- if (originalReset.equals("earliest")) {
+ if ("earliest".equals(originalReset)) {
addToResetList(partition, seekToBeginning, "No custom
setting defined for topic '{}' using original config '{}' for offset reset",
"earliest", loggedTopics);
- } else { // can only be "latest"
+ } else if ("latest".equals(originalReset)) {
addToResetList(partition, seekToEnd, "No custom setting
defined for topic '{}' using original config '{}' for offset reset", "latest",
loggedTopics);
+ } else {
+ notReset.add(partition);
}
}
}
- if (!seekToBeginning.isEmpty()) {
- mainConsumer.seekToBeginning(seekToBeginning);
- }
- if (!seekToEnd.isEmpty()) {
- mainConsumer.seekToEnd(seekToEnd);
+ if (notReset.isEmpty()) {
+ if (!seekToBeginning.isEmpty()) {
Review comment:
Huh, I hadn't wondered about that before, but ... dear god. From the javadoc on
KafkaConsumer#seekToBeginning:
> If no partitions are provided, seek to the first offset for all of the
currently assigned partitions.
What a dangerous API!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]