This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push: new b3cfc6f KAFKA-6367: StateRestoreListener use actual last restored offset for restored batch (#4507) b3cfc6f is described below commit b3cfc6fb3dfd029c5d9914419408492edd6919a7 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Wed Feb 7 14:07:32 2018 -0500 KAFKA-6367: StateRestoreListener use actual last restored offset for restored batch (#4507) Author: Bill Bejeck <b...@confluent.io> Reviewers: Guozhang Wang <guozh...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../streams/processor/StateRestoreListener.java | 4 ++-- .../processor/internals/StoreChangelogReader.java | 5 +++-- .../internals/StoreChangelogReaderTest.java | 24 +++++++++++++++++----- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java index c80a736..ea1c288 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java @@ -43,7 +43,7 @@ public interface StateRestoreListener { * @param topicPartition the TopicPartition containing the values to restore * @param storeName the name of the store undergoing restoration * @param startingOffset the starting offset of the entire restoration process for this TopicPartition - * @param endingOffset the ending offset of the entire restoration process for this TopicPartition + * @param endingOffset the exclusive ending offset of the entire restoration process for this TopicPartition */ void onRestoreStart(final TopicPartition topicPartition, final String storeName, @@ -62,7 +62,7 @@ public interface StateRestoreListener { * * @param topicPartition the TopicPartition containing the values to restore * @param storeName the name of the store undergoing restoration - * @param batchEndOffset the ending offset for the current restored batch for this TopicPartition + * @param batchEndOffset the inclusive ending offset for the current restored batch for this TopicPartition * @param numRestored the total number of records restored in this batch for this TopicPartition */ void onBatchRestored(final TopicPartition topicPartition, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 950a2c6..8d85b1d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -260,12 +260,14 @@ public class StoreChangelogReader implements ChangelogReader { long nextPosition = -1; int numberRecords = records.size(); int numberRestored = 0; + long lastRestoredOffset = -1; for (final ConsumerRecord<byte[], byte[]> record : records) { final long offset = record.offset(); if (restorer.hasCompleted(offset, endOffset)) { nextPosition = record.offset(); break; } + lastRestoredOffset = offset; numberRestored++; if (record.key() != null) { restoreRecords.add(KeyValue.pair(record.key(), record.value())); @@ -281,8 +283,7 @@ public class StoreChangelogReader implements ChangelogReader { if (!restoreRecords.isEmpty()) { restorer.restore(restoreRecords); - restorer.restoreBatchCompleted(nextPosition, records.size()); - + restorer.restoreBatchCompleted(lastRestoredOffset, records.size()); } return nextPosition; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 9401029..bb0c51e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -198,15 +198,28 @@ public class StoreChangelogReaderTest { assertThat(callbackTwo.restored.size(), equalTo(3)); assertAllCallbackStatesExecuted(callback, "storeName1"); - assertCorrectOffsetsReportedByListener(callback, 0L, 10L, 10L); + assertCorrectOffsetsReportedByListener(callback, 0L, 9L, 10L); assertAllCallbackStatesExecuted(callbackOne, "storeName2"); - assertCorrectOffsetsReportedByListener(callbackOne, 0L, 5L, 5L); + assertCorrectOffsetsReportedByListener(callbackOne, 0L, 4L, 5L); assertAllCallbackStatesExecuted(callbackTwo, "storeName3"); - assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 3L, 3L); + assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 2L, 3L); } + @Test + public void shouldOnlyReportTheLastRestoredOffset() { + setupConsumer(10, topicPartition); + changelogReader + .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1")); + changelogReader.restore(active); + + assertThat(callback.restored.size(), equalTo(5)); + assertAllCallbackStatesExecuted(callback, "storeName1"); + assertCorrectOffsetsReportedByListener(callback, 0L, 4L, 5L); + } + + private void assertAllCallbackStatesExecuted(final MockStateRestoreListener restoreListener, final String storeName) { assertThat(restoreListener.storeNameCalledStates.get(RESTORE_START), equalTo(storeName)); @@ -217,11 +230,12 @@ public class StoreChangelogReaderTest { private void assertCorrectOffsetsReportedByListener(final MockStateRestoreListener restoreListener, final long startOffset, - final long batchOffset, final long endOffset) { + final long batchOffset, + final long totalRestored) { assertThat(restoreListener.restoreStartOffset, equalTo(startOffset)); assertThat(restoreListener.restoredBatchOffset, equalTo(batchOffset)); - assertThat(restoreListener.restoreEndOffset, equalTo(endOffset)); + assertThat(restoreListener.totalNumRestored, equalTo(totalRestored)); } @Test -- To stop receiving notification emails like this one, please contact mj...@apache.org.