This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push: new 6c89a3f3655 KAFKA-14894: MetadataLoader must call finishSnapshot after loading a snapshot (#13541) 6c89a3f3655 is described below commit 6c89a3f3655cbf310cefc7c5aff5f330f9cde69d Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Tue Apr 11 15:02:33 2023 -0700 KAFKA-14894: MetadataLoader must call finishSnapshot after loading a snapshot (#13541) The MetadataLoader must call finishSnapshot after loading a snapshot. This function removes whatever was in the old snapshot that is not in the new snapshot that was just loaded. While this is not significant when the old snapshot was the empty snapshot, it is important to do when we are loading a snapshot on top of an existing non-empty image. In initializeNewPublishers, the newly installed publishers should be given a MetadataDelta based on MetadataImage.EMPTY, reflecting the fact that they are seeing everything for the first time. Reviewers: David Arthur <mum...@gmail.com> --- .../apache/kafka/image/loader/MetadataLoader.java | 6 ++- .../apache/kafka/image/writer/ImageReWriter.java | 1 + .../kafka/image/loader/MetadataLoaderTest.java | 48 ++++++++++++++++++++++ .../kafka/image/writer/ImageReWriterTest.java | 30 ++++++++++++++ 4 files changed, 84 insertions(+), 1 deletion(-) diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java index 4b820fbcdfb..4b8d564971a 100644 --- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java +++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java @@ -274,13 +274,16 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> log.debug("InitializeNewPublishers: setting up snapshot image for new publisher(s): {}", uninitializedPublisherNames()); long startNs = time.nanoseconds(); + // We base this delta off of the empty image, reflecting the fact that these publishers + // haven't seen anything previously. MetadataDelta delta = new MetadataDelta.Builder(). - setImage(image). + setImage(MetadataImage.EMPTY). build(); ImageReWriter writer = new ImageReWriter(delta); image.write(writer, new ImageWriterOptions.Builder(). setMetadataVersion(image.features().metadataVersion()). build()); + // ImageReWriter#close invokes finishSnapshot, so we don't need to invoke it here. SnapshotManifest manifest = new SnapshotManifest( image.provenance(), time.nanoseconds() - startNs); @@ -477,6 +480,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> snapshotIndex++; } } + delta.finishSnapshot(); MetadataProvenance provenance = new MetadataProvenance(reader.lastContainedLogOffset(), reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp()); return new SnapshotManifest(provenance, diff --git a/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java b/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java index 42a0aaa93a1..3a1245000e8 100644 --- a/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java +++ b/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java @@ -49,6 +49,7 @@ public class ImageReWriter implements ImageWriter { if (closed) return; closed = true; if (complete) { + delta.finishSnapshot(); image = delta.apply(delta.image().provenance()); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java index 489a9bca80e..8e9d2ed12db 100644 --- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java @@ -523,4 +523,52 @@ public class MetadataLoaderTest { ))); loader.waitForAllEventsToBeHandled(); } + + private void loadTestSnapshot2( + MetadataLoader loader, + long offset + ) throws Exception { + loader.handleSnapshot(MockSnapshotReader.fromRecordLists( + new MetadataProvenance(offset, 100, 4000), asList( + asList(new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(IBP_3_3_IV2.featureLevel()), (short) 0)), + asList(new ApiMessageAndVersion(new TopicRecord(). + setName("bar"). + setTopicId(Uuid.fromString("VcL2Mw-cT4aL6XV9VujzoQ")), (short) 0)) + ))); + loader.waitForAllEventsToBeHandled(); + } + + /** + * Test that loading a snapshot clears the previous state. + */ + @Test + public void testReloadSnapshot() throws Exception { + MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset"); + List<MockPublisher> publishers = asList(new MockPublisher("a")); + try (MetadataLoader loader = new MetadataLoader.Builder(). + setFaultHandler(faultHandler). + setHighWaterMarkAccessor(() -> OptionalLong.of(0)). + build()) { + loadTestSnapshot(loader, 100); + loader.installPublishers(publishers).get(); + loader.waitForAllEventsToBeHandled(); + assertTrue(publishers.get(0).firstPublish.isDone()); + assertTrue(publishers.get(0).latestDelta.image().isEmpty()); + assertEquals(100L, publishers.get(0).latestImage.provenance().lastContainedOffset()); + + loadTestSnapshot(loader, 200); + assertEquals(200L, loader.lastAppliedOffset()); + assertFalse(publishers.get(0).latestDelta.image().isEmpty()); + + loadTestSnapshot2(loader, 400); + assertEquals(400L, loader.lastAppliedOffset()); + + // Make sure the topic in the initial snapshot was overwritten by loading the new snapshot. + assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo")); + assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar")); + } + faultHandler.maybeRethrowFirstException(); + } } diff --git a/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java b/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java index 640924fe076..fd81cc98dcc 100644 --- a/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java @@ -18,11 +18,17 @@ package org.apache.kafka.image.writer; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.Collections; + +import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.metadata.RecordTestUtils.testRecord; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -60,4 +66,28 @@ public class ImageReWriterTest { setName("foo"). setTopicId(Uuid.fromString("3B134hrsQgKtz8Sp6QBIfg")))); } + + @Test + public void testCloseInvokesFinishSnapshot() { + MetadataDelta delta = new MetadataDelta.Builder().build(); + ImageReWriter writer = new ImageReWriter(delta); + writer.write(0, new TopicRecord(). + setName("foo"). + setTopicId(Uuid.fromString("3B134hrsQgKtz8Sp6QBIfg"))); + writer.close(true); + + MetadataDelta delta2 = new MetadataDelta.Builder().setImage(writer.image()).build(); + ImageReWriter writer2 = new ImageReWriter(delta2); + writer2.write(0, new ConfigRecord(). + setResourceName(""). + setResourceType(BROKER.id()). + setName("num.io.threads"). + setValue("12")); + writer2.close(true); + MetadataImage newImage = writer2.image(); + + assertEquals(Collections.emptyMap(), newImage.topics().topicsById()); + assertEquals(Collections.singletonMap("num.io.threads", "12"), + newImage.configs().configMapForResource(new ConfigResource(BROKER, ""))); + } }