This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 6c89a3f3655 KAFKA-14894: MetadataLoader must call finishSnapshot after loading a snapshot (#13541)
6c89a3f3655 is described below

commit 6c89a3f3655cbf310cefc7c5aff5f330f9cde69d
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Tue Apr 11 15:02:33 2023 -0700

    KAFKA-14894: MetadataLoader must call finishSnapshot after loading a snapshot (#13541)
    
    The MetadataLoader must call finishSnapshot after loading a snapshot. This function removes
    whatever was in the old snapshot that is not in the new snapshot that was just loaded. While this
    is not significant when the old snapshot was the empty snapshot, it is important to do when we are
    loading a snapshot on top of an existing non-empty image.
    
    In initializeNewPublishers, the newly installed publishers should be given a MetadataDelta based on
    MetadataImage.EMPTY, reflecting the fact that they are seeing everything for the first time.
    
    Reviewers: David Arthur <mum...@gmail.com>
---
 .../apache/kafka/image/loader/MetadataLoader.java  |  6 ++-
 .../apache/kafka/image/writer/ImageReWriter.java   |  1 +
 .../kafka/image/loader/MetadataLoaderTest.java     | 48 ++++++++++++++++++++++
 .../kafka/image/writer/ImageReWriterTest.java      | 30 ++++++++++++++
 4 files changed, 84 insertions(+), 1 deletion(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 4b820fbcdfb..4b8d564971a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -274,13 +274,16 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         log.debug("InitializeNewPublishers: setting up snapshot image for new 
publisher(s): {}",
                 uninitializedPublisherNames());
         long startNs = time.nanoseconds();
+        // We base this delta off of the empty image, reflecting the fact that 
these publishers
+        // haven't seen anything previously.
         MetadataDelta delta = new MetadataDelta.Builder().
-                setImage(image).
+                setImage(MetadataImage.EMPTY).
                 build();
         ImageReWriter writer = new ImageReWriter(delta);
         image.write(writer, new ImageWriterOptions.Builder().
                 setMetadataVersion(image.features().metadataVersion()).
                 build());
+        // ImageReWriter#close invokes finishSnapshot, so we don't need to 
invoke it here.
         SnapshotManifest manifest = new SnapshotManifest(
                 image.provenance(),
                 time.nanoseconds() - startNs);
@@ -477,6 +480,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                 snapshotIndex++;
             }
         }
+        delta.finishSnapshot();
         MetadataProvenance provenance = new 
MetadataProvenance(reader.lastContainedLogOffset(),
                 reader.lastContainedLogEpoch(), 
reader.lastContainedLogTimestamp());
         return new SnapshotManifest(provenance,
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java 
b/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java
index 42a0aaa93a1..3a1245000e8 100644
--- a/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java
@@ -49,6 +49,7 @@ public class ImageReWriter implements ImageWriter {
         if (closed) return;
         closed = true;
         if (complete) {
+            delta.finishSnapshot();
             image = delta.apply(delta.image().provenance());
         }
     }
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 489a9bca80e..8e9d2ed12db 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -523,4 +523,52 @@ public class MetadataLoaderTest {
                 )));
         loader.waitForAllEventsToBeHandled();
     }
+
+    private void loadTestSnapshot2(
+        MetadataLoader loader,
+        long offset
+    ) throws Exception {
+        loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
+                new MetadataProvenance(offset, 100, 4000), asList(
+                        asList(new ApiMessageAndVersion(new 
FeatureLevelRecord().
+                                setName(MetadataVersion.FEATURE_NAME).
+                                setFeatureLevel(IBP_3_3_IV2.featureLevel()), 
(short) 0)),
+                        asList(new ApiMessageAndVersion(new TopicRecord().
+                                setName("bar").
+                                
setTopicId(Uuid.fromString("VcL2Mw-cT4aL6XV9VujzoQ")), (short) 0))
+                )));
+        loader.waitForAllEventsToBeHandled();
+    }
+
+    /**
+     * Test that loading a snapshot clears the previous state.
+     */
+    @Test
+    public void testReloadSnapshot() throws Exception {
+        MockFaultHandler faultHandler = new 
MockFaultHandler("testLastAppliedOffset");
+        List<MockPublisher> publishers = asList(new MockPublisher("a"));
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
+                build()) {
+            loadTestSnapshot(loader, 100);
+            loader.installPublishers(publishers).get();
+            loader.waitForAllEventsToBeHandled();
+            assertTrue(publishers.get(0).firstPublish.isDone());
+            assertTrue(publishers.get(0).latestDelta.image().isEmpty());
+            assertEquals(100L, 
publishers.get(0).latestImage.provenance().lastContainedOffset());
+
+            loadTestSnapshot(loader, 200);
+            assertEquals(200L, loader.lastAppliedOffset());
+            assertFalse(publishers.get(0).latestDelta.image().isEmpty());
+
+            loadTestSnapshot2(loader, 400);
+            assertEquals(400L, loader.lastAppliedOffset());
+
+            // Make sure the topic in the initial snapshot was overwritten by 
loading the new snapshot.
+            
assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
+            
assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar"));
+        }
+        faultHandler.maybeRethrowFirstException();
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java 
b/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java
index 640924fe076..fd81cc98dcc 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java
@@ -18,11 +18,17 @@
 package org.apache.kafka.image.writer;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.util.Collections;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static org.apache.kafka.metadata.RecordTestUtils.testRecord;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -60,4 +66,28 @@ public class ImageReWriterTest {
                         setName("foo").
                         
setTopicId(Uuid.fromString("3B134hrsQgKtz8Sp6QBIfg"))));
     }
+
+    @Test
+    public void testCloseInvokesFinishSnapshot() {
+        MetadataDelta delta = new MetadataDelta.Builder().build();
+        ImageReWriter writer = new ImageReWriter(delta);
+        writer.write(0, new TopicRecord().
+                setName("foo").
+                setTopicId(Uuid.fromString("3B134hrsQgKtz8Sp6QBIfg")));
+        writer.close(true);
+
+        MetadataDelta delta2 = new 
MetadataDelta.Builder().setImage(writer.image()).build();
+        ImageReWriter writer2 = new ImageReWriter(delta2);
+        writer2.write(0, new ConfigRecord().
+                setResourceName("").
+                setResourceType(BROKER.id()).
+                setName("num.io.threads").
+                setValue("12"));
+        writer2.close(true);
+        MetadataImage newImage = writer2.image();
+
+        assertEquals(Collections.emptyMap(), newImage.topics().topicsById());
+        assertEquals(Collections.singletonMap("num.io.threads", "12"),
+            newImage.configs().configMapForResource(new ConfigResource(BROKER, 
"")));
+    }
 }

Reply via email to