aloknnikhil commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r561315012



##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -941,4 +950,38 @@ class KafkaConfigTest {
     })
   }
 
+  @Test
+  def testInvalidQuorumVotersConfig(): Unit = {
+    assertInvalidQuorumVoters("1")
+    assertInvalidQuorumVoters("1@")
+    assertInvalidQuorumVoters("1:")
+    assertInvalidQuorumVoters("blah@")
+    assertInvalidQuorumVoters("1@kafka1")
+    assertInvalidQuorumVoters("1@kafka1:9092,")
+    assertInvalidQuorumVoters("1@kafka1:9092,")
+    assertInvalidQuorumVoters("1@kafka1:9092,2")
+    assertInvalidQuorumVoters("1@kafka1:9092,2@")
+    assertInvalidQuorumVoters("1@kafka1:9092,2@blah")
+    assertInvalidQuorumVoters("1@kafka1:9092,2@blah,")
+  }
+
+  private def assertInvalidQuorumVoters(value: String): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
+  }
+
+  @Test
+  def testValidQuorumVotersConfig(): Unit = {
+    assertValidQuorumVoters("", 0)
+    assertValidQuorumVoters("[email protected]:9092", 1)
+    assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3)
+  }
+
+  private def assertValidQuorumVoters(value: String, expectedVoterCount: Int): 
Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+    assertDoesNotThrow(() => KafkaConfig.fromProps(props))

Review comment:
       Makes sense. Removed.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) {
         return voterMap;
     }
 
+    public static class ControllerQuorumVotersValidator implements 
ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                throw new ConfigException(name, null);
+            }
+
+            @SuppressWarnings("unchecked")
+            List<String> voterStrings = (List) value;
+
+            if (voterStrings.size() == 0) {
+                // TODO: Add a flag to skip validation for an empty voter 
string, conditionally.
+                //       For now, skip anyway. See 
https://github.com/apache/kafka/pull/9916#discussion_r560611932

Review comment:
       Fair enough. Removed.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1258,6 +1267,15 @@ object KafkaConfig {
       .define(PasswordEncoderCipherAlgorithmProp, STRING, 
Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc)
       .define(PasswordEncoderKeyLengthProp, INT, 
Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc)
       .define(PasswordEncoderIterationsProp, INT, 
Defaults.PasswordEncoderIterations, atLeast(1024), LOW, 
PasswordEncoderIterationsDoc)
+
+      /** ********* Raft Quorum Configuration *********/
+      .defineInternal(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, 
Defaults.QuorumVoters, new RaftConfig.ControllerQuorumVotersValidator(), HIGH)

Review comment:
       Ack. Fixed.

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##########
@@ -35,24 +35,10 @@ public static RaftConfig buildRaftConfig(
             int appendLingerMs,
             List<Node> voterNodes
     ) {
-        Properties properties = new Properties();
-        properties.put(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeoutMs);
-        properties.put(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, 
retryBackoffMs);
-        properties.put(RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, 
electionTimeoutMs);
-        properties.put(RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, 
electionBackoffMs);
-        properties.put(RaftConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, 
fetchTimeoutMs);
-        properties.put(RaftConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs);
-
-        StringBuilder votersString = new StringBuilder();
-        String prefix = "";
-        for (Node voter : voterNodes) {
-            votersString.append(prefix);
-            
votersString.append(voter.id()).append('@').append(voter.host()).append(':').append(voter.port());
-            prefix = ",";
-        }
-        properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, 
votersString.toString());
-
-        return new RaftConfig(properties);
+        Map<Integer, InetSocketAddress> voterConnections = voterNodes.stream()
+            .collect(Collectors.toMap(Node::id, node -> new 
InetSocketAddress(node.host(), node.port())));
+        return new RaftConfig(voterConnections, requestTimeoutMs, 
retryBackoffMs, electionTimeoutMs, electionBackoffMs,

Review comment:
       You're right. This is an artifact from the previous constructor usage. 
Fixed.

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java
##########
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.raft;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.jupiter.api.Test;
-
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-public class RaftConfigTest {
-
-    @Test
-    public void testSingleQuorumVoterConnections() {
-        Properties properties = new Properties();
-        properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "[email protected]:9092");
-        RaftConfig config = new RaftConfig(properties);
-        assertEquals(Collections.singletonMap(1, new 
InetSocketAddress("127.0.0.1", 9092)),
-            config.quorumVoterConnections());
-    }
-
-    @Test
-    public void testMultiQuorumVoterConnections() {
-        Properties properties = new Properties();
-        properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@kafka1:9092,2@kafka2:9092,3@kafka3:9092");
-        RaftConfig config = new RaftConfig(properties);
-
-        HashMap<Integer, InetSocketAddress> expected = new HashMap<>();
-        expected.put(1, new InetSocketAddress("kafka1", 9092));
-        expected.put(2, new InetSocketAddress("kafka2", 9092));
-        expected.put(3, new InetSocketAddress("kafka3", 9092));
-
-        assertEquals(expected, config.quorumVoterConnections());

Review comment:
       Ah, good catch. Fixed

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##########
@@ -16,36 +16,15 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.Node;
-
 import java.net.InetSocketAddress;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class RaftTestUtil {
-    public static RaftConfig buildRaftConfig(
-            int requestTimeoutMs,
-            int retryBackoffMs,
-            int electionTimeoutMs,
-            int electionBackoffMs,
-            int fetchTimeoutMs,
-            int appendLingerMs,
-            List<Node> voterNodes
-    ) {
-        Map<Integer, InetSocketAddress> voterConnections = voterNodes.stream()
-            .collect(Collectors.toMap(Node::id, node -> new 
InetSocketAddress(node.host(), node.port())));
-        return new RaftConfig(voterConnections, requestTimeoutMs, 
retryBackoffMs, electionTimeoutMs, electionBackoffMs,
-            fetchTimeoutMs, appendLingerMs);
-    }
-
-    public static List<Node> voterNodesFromIds(Set<Integer> voterIds,
+    public static Map<Integer, InetSocketAddress> 
voterNodesFromIds(Set<Integer> voterIds,
                                                Function<Integer, 
InetSocketAddress> voterAddressGenerator) {
-        return voterIds.stream().map(voterId -> {
-            InetSocketAddress voterAddress = 
voterAddressGenerator.apply(voterId);
-            return new Node(voterId, voterAddress.getHostName(), 
voterAddress.getPort());
-        }).collect(Collectors.toList());
+        return voterIds.stream().collect(Collectors.toMap(id -> id, 
voterAddressGenerator));

Review comment:
       I'm up for removing any extra files when we don't need them. Aaaaaaand, 
it's gone.

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java
##########
@@ -16,36 +16,15 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.Node;
-
 import java.net.InetSocketAddress;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class RaftTestUtil {
-    public static RaftConfig buildRaftConfig(
-            int requestTimeoutMs,
-            int retryBackoffMs,
-            int electionTimeoutMs,
-            int electionBackoffMs,
-            int fetchTimeoutMs,
-            int appendLingerMs,
-            List<Node> voterNodes
-    ) {
-        Map<Integer, InetSocketAddress> voterConnections = voterNodes.stream()
-            .collect(Collectors.toMap(Node::id, node -> new 
InetSocketAddress(node.host(), node.port())));
-        return new RaftConfig(voterConnections, requestTimeoutMs, 
retryBackoffMs, electionTimeoutMs, electionBackoffMs,
-            fetchTimeoutMs, appendLingerMs);
-    }
-
-    public static List<Node> voterNodesFromIds(Set<Integer> voterIds,
+    public static Map<Integer, InetSocketAddress> 
voterNodesFromIds(Set<Integer> voterIds,

Review comment:
       Removed in favor of inlined code 
   https://github.com/apache/kafka/pull/9916#discussion_r561375612

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -750,9 +748,9 @@ void start(int nodeId) {
             PersistentState persistentState = nodes.get(nodeId);
             MockNetworkChannel channel = new 
MockNetworkChannel(correlationIdCounter);
             MockMessageQueue messageQueue = new MockMessageQueue();
-            List<Node> voterNodes = voterNodesFromIds(voters, 
Cluster::nodeAddress);
-            RaftConfig raftConfig = buildRaftConfig(REQUEST_TIMEOUT_MS, 
RETRY_BACKOFF_MS, ELECTION_TIMEOUT_MS,
-                    ELECTION_JITTER_MS, FETCH_TIMEOUT_MS, LINGER_MS, 
voterNodes);
+            Map<Integer, InetSocketAddress> voterNodes = 
voterNodesFromIds(voters, Cluster::nodeAddress);

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to