This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0927049a617 KAFKA-14371: Remove unused clusterId field from quorum-state file (#13102) 0927049a617 is described below commit 0927049a617fa2937a211aab895f6590403130fb Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com> AuthorDate: Wed Mar 1 02:13:38 2023 +0000 KAFKA-14371: Remove unused clusterId field from quorum-state file (#13102) Remove clusterId field from the KRaft controller's quorum-state file $LOG_DIR/__cluster_metadata-0/quorum-state Reviewers: Luke Chen <show...@gmail.com>, dengziming <dengziming1...@gmail.com>, Christo Lolov <christolo...@gmail.com> --- .../resources/common/message/QuorumStateData.json | 4 +- .../apache/kafka/raft/FileBasedStateStoreTest.java | 46 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json index d71a32c75de..d0320d16b1b 100644 --- a/raft/src/main/resources/common/message/QuorumStateData.json +++ b/raft/src/main/resources/common/message/QuorumStateData.json @@ -16,10 +16,10 @@ { "type": "data", "name": "QuorumStateData", - "validVersions": "0", + // Version 1 removes clusterId field. 
+ "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ - {"name": "ClusterId", "type": "string", "versions": "0+"}, {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"}, {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"}, {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"}, diff --git a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java index 5fa4f5c6880..97002897265 100644 --- a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java @@ -16,6 +16,13 @@ */ package org.apache.kafka.raft; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -90,6 +97,45 @@ public class FileBasedStateStoreTest { assertFalse(stateFile.exists()); } + @Test + public void testCompatibilityWithClusterId() throws IOException { + final File stateFile = TestUtils.tempFile(); + stateStore = new FileBasedStateStore(stateFile); + + // We initialized a state from the metadata log + assertTrue(stateFile.exists()); + + final int epoch = 2; + final int leaderId = 1; + Set<Integer> voters = Utils.mkSet(leaderId); + String jsonString = "{\"clusterId\":\"abc\",\"leaderId\":" + leaderId + ",\"leaderEpoch\":" + epoch + ",\"votedId\":-1,\"appliedOffset\":0,\"currentVoters\":[],\"data_version\":0}"; + writeToStateFile(stateFile, jsonString); + + // verify that we can read the state file that contains the removed "clusterId" field. 
+ assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), stateStore.readElectionState()); + + stateStore.clear(); + assertFalse(stateFile.exists()); + } + + private void writeToStateFile(final File stateFile, String jsonString) { + try (final FileOutputStream fileOutputStream = new FileOutputStream(stateFile); + final BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(jsonString); + + writer.write(node.toString()); + writer.flush(); + fileOutputStream.getFD().sync(); + + } catch (IOException e) { + throw new UncheckedIOException( + String.format("Error while writing to Quorum state file %s", + stateFile.getAbsolutePath()), e); + } + } + @AfterEach public void cleanup() throws IOException { if (stateStore != null) {