[
https://issues.apache.org/jira/browse/KAFKA-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364752#comment-16364752
]
ASF GitHub Bot commented on KAFKA-6364:
---
guozhangwang closed pull request #4511: KAFKA-6364: second check for ensuring
changelog topic not changed during restore
URL: https://github.com/apache/kafka/pull/4511
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it is merged, the diff is reproduced below for the sake
of provenance:
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c4929959211..ceb7024b97b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -51,7 +51,7 @@
private final Map> partitions;
private final SubscriptionState subscriptions;
private final Map beginningOffsets;
-private final Map<TopicPartition, Long> endOffsets;
+private final Map<TopicPartition, List<Long>> endOffsets;
private final Map committed;
private final Queue pollTasks;
private final Set paused;
@@ -290,8 +290,26 @@ public synchronized void
seekToEnd(Collection partitions) {
subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
}
-public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
-endOffsets.putAll(newOffsets);
+// needed for cases where you make a second call to endOffsets
+public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+innerUpdateEndOffsets(newOffsets, false);
+}
+
+public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+innerUpdateEndOffsets(newOffsets, true);
+}
+
+private void innerUpdateEndOffsets(final Map<TopicPartition, Long> newOffsets,
+ final boolean replace) {
+
+for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
+List<Long> offsets = endOffsets.get(entry.getKey());
+if (replace || offsets == null) {
+offsets = new ArrayList<>();
+}
+offsets.add(entry.getValue());
+endOffsets.put(entry.getKey(), offsets);
+}
}
@Override
@@ -354,7 +372,7 @@ public synchronized void resume(Collection
partitions) {
public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
Map<TopicPartition, Long> result = new HashMap<>();
for (TopicPartition tp : partitions) {
-Long endOffset = endOffsets.get(tp);
+Long endOffset = getEndOffset(endOffsets.get(tp));
if (endOffset == null)
throw new IllegalStateException("The partition " + tp + " does
not have an end offset.");
result.put(tp, endOffset);
@@ -430,7 +448,7 @@ private void resetOffsetPosition(TopicPartition tp) {
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have
beginning offset specified, but tried to seek to beginning");
} else if (strategy == OffsetResetStrategy.LATEST) {
-offset = endOffsets.get(tp);
+offset = getEndOffset(endOffsets.get(tp));
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end
offset specified, but tried to seek to end");
} else {
@@ -438,4 +456,11 @@ private void resetOffsetPosition(TopicPartition tp) {
}
seek(tp, offset);
}
+
+private Long getEndOffset(List<Long> offsets) {
+if (offsets == null || offsets.isEmpty()) {
+return null;
+}
+return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
+}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index b11c45ba313..5fcba76570e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -255,6 +255,15 @@ private void restorePartition(final
ConsumerRecords allRecords,
throw new TaskMigratedException(task, topicPartition,
endOffset, pos);
}
+// need to check for changelog topic
+if (restorer.offsetLimit() == Long.MAX_VALUE) {
+final Long updatedEndOffset =
restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
+if (!restorer.hasCompleted(pos, updatedEndOffset)) {
+throw new TaskMigratedException(task, topicPartition,
updatedEndOffset, pos);
+