hachikuji commented on a change in pull request #9916: URL: https://github.com/apache/kafka/pull/9916#discussion_r561283768
########## File path: raft/src/test/java/org/apache/kafka/raft/RaftConfigTest.java ########## @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.raft; - -import org.apache.kafka.common.config.ConfigException; -import org.junit.jupiter.api.Test; - -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashMap; -import java.util.Properties; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -public class RaftConfigTest { - - @Test - public void testSingleQuorumVoterConnections() { - Properties properties = new Properties(); - properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "[email protected]:9092"); - RaftConfig config = new RaftConfig(properties); - assertEquals(Collections.singletonMap(1, new InetSocketAddress("127.0.0.1", 9092)), - config.quorumVoterConnections()); - } - - @Test - public void testMultiQuorumVoterConnections() { - Properties properties = new Properties(); - properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@kafka1:9092,2@kafka2:9092,3@kafka3:9092"); - RaftConfig config = new RaftConfig(properties); - - HashMap<Integer, InetSocketAddress> expected = new HashMap<>(); - expected.put(1, new InetSocketAddress("kafka1", 9092)); - expected.put(2, new InetSocketAddress("kafka2", 9092)); - expected.put(3, new InetSocketAddress("kafka3", 9092)); - - assertEquals(expected, config.quorumVoterConnections()); Review comment: This test case seems stronger than the one that was ported to `KafkaConfigTest`. It is validating the endpoints in addition to the number of voters. Is there any way we can recover this? ########## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ########## @@ -236,4 +176,31 @@ private static Integer parseVoterId(String idString) { return voterMap; } + public static class ControllerQuorumVotersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + if (value == null) { + throw new ConfigException(name, null); + } + + @SuppressWarnings("unchecked") + List<String> voterStrings = (List) value; + + if (voterStrings.size() == 0) { + // TODO: Add a flag to skip validation for an empty voter string, conditionally. + // For now, skip anyway. See https://github.com/apache/kafka/pull/9916#discussion_r560611932 Review comment: We typically do not leave TODOs in the code. We can file a jira if we think it's important to remember. I'd suggest we just leave this check out and skip the empty check below. ########## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ########## @@ -941,4 +950,38 @@ class KafkaConfigTest { }) } + @Test + def testInvalidQuorumVotersConfig(): Unit = { + assertInvalidQuorumVoters("1") + assertInvalidQuorumVoters("1@") + assertInvalidQuorumVoters("1:") + assertInvalidQuorumVoters("blah@") + assertInvalidQuorumVoters("1@kafka1") + assertInvalidQuorumVoters("1@kafka1:9092,") + assertInvalidQuorumVoters("1@kafka1:9092,") + assertInvalidQuorumVoters("1@kafka1:9092,2") + assertInvalidQuorumVoters("1@kafka1:9092,2@") + assertInvalidQuorumVoters("1@kafka1:9092,2@blah") + assertInvalidQuorumVoters("1@kafka1:9092,2@blah,") + } + + private def assertInvalidQuorumVoters(value: String): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value) + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + } + + @Test + def testValidQuorumVotersConfig(): Unit = { + assertValidQuorumVoters("", 0) + assertValidQuorumVoters("[email protected]:9092", 1) + assertValidQuorumVoters("1@kafka1:9092,2@kafka2:9092,3@kafka3:9092", 3) + } + + private def assertValidQuorumVoters(value: String, expectedVoterCount: Int): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value) + assertDoesNotThrow(() => KafkaConfig.fromProps(props)) Review comment: Hmm.. `assertDoesNotThrow` seems kind of redundant. The test would fail anyway if it did throw. ########## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ########## @@ -1258,6 +1267,15 @@ object KafkaConfig { .define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc) .define(PasswordEncoderKeyLengthProp, INT, Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc) .define(PasswordEncoderIterationsProp, INT, Defaults.PasswordEncoderIterations, atLeast(1024), LOW, PasswordEncoderIterationsDoc) + + /** ********* Raft Quorum Configuration *********/ + .defineInternal(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QuorumVoters, new RaftConfig.ControllerQuorumVotersValidator(), HIGH) Review comment: Can we change `defineInternal` to include the documentation string so that we don't forget to add it back later? ########## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ########## @@ -35,24 +35,10 @@ public static RaftConfig buildRaftConfig( int appendLingerMs, List<Node> voterNodes ) { - Properties properties = new Properties(); - properties.put(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); - properties.put(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, retryBackoffMs); - properties.put(RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs); - properties.put(RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, electionBackoffMs); - properties.put(RaftConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, fetchTimeoutMs); - properties.put(RaftConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs); - - StringBuilder votersString = new StringBuilder(); - String prefix = ""; - for (Node voter : voterNodes) { - votersString.append(prefix); - votersString.append(voter.id()).append('@').append(voter.host()).append(':').append(voter.port()); - prefix = ","; - } - properties.put(RaftConfig.QUORUM_VOTERS_CONFIG, votersString.toString()); - - return new RaftConfig(properties); + Map<Integer, InetSocketAddress> voterConnections = voterNodes.stream() + .collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port()))); + return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs, Review comment: The usage is a tad odd in the tests. We use `voterNodesFromIds` to build `Node` objects from `InetSocketAddress`, then we call `buildRaftConfig` which then turns `Node` back into `InetSocketAddress`. It would be simpler to let `voterNodesFromIds` return `Map<Integer, InetSocketAddress>`. Then we could remove `buildRaftConfig`. ########## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ########## @@ -750,9 +748,9 @@ void start(int nodeId) { PersistentState persistentState = nodes.get(nodeId); MockNetworkChannel channel = new MockNetworkChannel(correlationIdCounter); MockMessageQueue messageQueue = new MockMessageQueue(); - List<Node> voterNodes = voterNodesFromIds(voters, Cluster::nodeAddress); - RaftConfig raftConfig = buildRaftConfig(REQUEST_TIMEOUT_MS, RETRY_BACKOFF_MS, ELECTION_TIMEOUT_MS, - ELECTION_JITTER_MS, FETCH_TIMEOUT_MS, LINGER_MS, voterNodes); + Map<Integer, InetSocketAddress> voterNodes = voterNodesFromIds(voters, Cluster::nodeAddress); Review comment: nit: `voterNodes` -> `voterAddresses` or `voterAddressMap`? ########## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ########## @@ -16,36 +16,15 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.Node; - import java.net.InetSocketAddress; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; public class RaftTestUtil { - public static RaftConfig buildRaftConfig( - int requestTimeoutMs, - int retryBackoffMs, - int electionTimeoutMs, - int electionBackoffMs, - int fetchTimeoutMs, - int appendLingerMs, - List<Node> voterNodes - ) { - Map<Integer, InetSocketAddress> voterConnections = voterNodes.stream() - .collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port()))); - return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs, - fetchTimeoutMs, appendLingerMs); - } - - public static List<Node> voterNodesFromIds(Set<Integer> voterIds, + public static Map<Integer, InetSocketAddress> voterNodesFromIds(Set<Integer> voterIds, Review comment: nit: maybe `buildAddressMap` or something like that? ########## File path: raft/src/test/java/org/apache/kafka/raft/RaftTestUtil.java ########## @@ -16,36 +16,15 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.Node; - import java.net.InetSocketAddress; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; public class RaftTestUtil { - public static RaftConfig buildRaftConfig( - int requestTimeoutMs, - int retryBackoffMs, - int electionTimeoutMs, - int electionBackoffMs, - int fetchTimeoutMs, - int appendLingerMs, - List<Node> voterNodes - ) { - Map<Integer, InetSocketAddress> voterConnections = voterNodes.stream() - .collect(Collectors.toMap(Node::id, node -> new InetSocketAddress(node.host(), node.port()))); - return new RaftConfig(voterConnections, requestTimeoutMs, retryBackoffMs, electionTimeoutMs, electionBackoffMs, - fetchTimeoutMs, appendLingerMs); - } - - public static List<Node> voterNodesFromIds(Set<Integer> voterIds, + public static Map<Integer, InetSocketAddress> voterNodesFromIds(Set<Integer> voterIds, Function<Integer, InetSocketAddress> voterAddressGenerator) { - return voterIds.stream().map(voterId -> { - InetSocketAddress voterAddress = voterAddressGenerator.apply(voterId); - return new Node(voterId, voterAddress.getHostName(), voterAddress.getPort()); - }).collect(Collectors.toList()); + return voterIds.stream().collect(Collectors.toMap(id -> id, voterAddressGenerator)); Review comment: Guess we can keep it, but this helper doesn't seem to be doing much for us anymore. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
