hachikuji commented on a change in pull request #9916:
URL: https://github.com/apache/kafka/pull/9916#discussion_r560467367
##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -138,31 +75,35 @@ public String toString() {
private final int appendLingerMs;
private final Map<Integer, InetSocketAddress> voterConnections;
- public RaftConfig(Map<?, ?> props) {
- this(props, true);
- }
-
- protected RaftConfig(Map<?, ?> props, boolean doLog) {
- super(CONFIG, props, doLog);
- requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
- retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
- electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
- electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
- fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
- appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);
- voterConnections = parseVoterConnections(getList(QUORUM_VOTERS_CONFIG));
- }
-
- public static Set<String> configNames() {
- return CONFIG.names();
- }
-
- public static ConfigDef configDef() {
- return new ConfigDef(CONFIG);
- }
-
- public static void main(String[] args) {
- System.out.println(CONFIG.toHtml());
+ public RaftConfig(AbstractConfig abstractConfig) {
+ this(abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG),
+ abstractConfig.getString(QUORUM_VOTERS_CONFIG));
+ }
+
+ public RaftConfig(
+ int requestTimeoutMs,
Review comment:
nit: usual alignment is just 4 spaces
##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -574,6 +583,15 @@ object KafkaConfig {
val PasswordEncoderKeyLengthProp = "password.encoder.key.length"
val PasswordEncoderIterationsProp = "password.encoder.iterations"
+ /** ********* Raft Quorum Configuration *********/
+ val QuorumVotersProp = RaftConfig.QUORUM_VOTERS_CONFIG
Review comment:
I know we have done so for other configs, but do we really need to
duplicate these property strings?
##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -138,31 +75,35 @@ public String toString() {
private final int appendLingerMs;
private final Map<Integer, InetSocketAddress> voterConnections;
- public RaftConfig(Map<?, ?> props) {
- this(props, true);
- }
-
- protected RaftConfig(Map<?, ?> props, boolean doLog) {
- super(CONFIG, props, doLog);
- requestTimeoutMs = getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
- retryBackoffMs = getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
- electionTimeoutMs = getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
- electionBackoffMaxMs = getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
- fetchTimeoutMs = getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
- appendLingerMs = getInt(QUORUM_LINGER_MS_CONFIG);
- voterConnections = parseVoterConnections(getList(QUORUM_VOTERS_CONFIG));
- }
-
- public static Set<String> configNames() {
- return CONFIG.names();
- }
-
- public static ConfigDef configDef() {
- return new ConfigDef(CONFIG);
- }
-
- public static void main(String[] args) {
- System.out.println(CONFIG.toHtml());
+ public RaftConfig(AbstractConfig abstractConfig) {
+ this(abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG),
+ abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG),
+ abstractConfig.getString(QUORUM_VOTERS_CONFIG));
+ }
+
+ public RaftConfig(
+ int requestTimeoutMs,
+ int retryBackoffMs,
+ int electionTimeoutMs,
+ int electionBackoffMaxMs,
+ int fetchTimeoutMs,
+ int appendLingerMs,
+ String votersConnectString
Review comment:
nit: doesn't matter too much, but could we move this argument to the top
to emphasize its importance?
##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1258,6 +1276,15 @@ object KafkaConfig {
.define(PasswordEncoderCipherAlgorithmProp, STRING,
Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc)
.define(PasswordEncoderKeyLengthProp, INT,
Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc)
.define(PasswordEncoderIterationsProp, INT,
Defaults.PasswordEncoderIterations, atLeast(1024), LOW,
PasswordEncoderIterationsDoc)
+
+ /** ********* Raft Quorum Configuration *********/
+ .defineInternal(QuorumVotersProp, STRING, Defaults.QuorumVoters, HIGH)
Review comment:
Why don't we make the type a LIST like we had previously?
##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1687,6 +1714,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
def compressionType = getString(KafkaConfig.CompressionTypeProp)
+ /** ********* Raft Quorum Configuration *********/
+ val quorumVoters = getString(KafkaConfig.QuorumVotersProp)
Review comment:
Can we add a check somewhere that `quorum.voters` is required if
`process.roles` is non-empty?
##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -18,117 +18,54 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import java.net.InetSocketAddress;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+public class RaftConfig {
-public class RaftConfig extends AbstractConfig {
- private static final ConfigDef CONFIG;
-
- private static final String QUORUM_PREFIX = "quorum.";
+ private static final String QUORUM_PREFIX = "controller.quorum.";
public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters";
- private static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " +
+ public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " +
"the set of voters in a comma-separated list of `{id}@{host}:{port}`
entries. " +
"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`";
+ public static final String DEFAULT_QUORUM_VOTERS = "";
public static final String QUORUM_ELECTION_TIMEOUT_MS_CONFIG =
QUORUM_PREFIX + "election.timeout.ms";
- private static final String QUORUM_ELECTION_TIMEOUT_MS_DOC = "Maximum time in milliseconds to wait " +
+ public static final String QUORUM_ELECTION_TIMEOUT_MS_DOC = "Maximum time in milliseconds to wait " +
"without being able to fetch from the leader before triggering a new
election";
+ public static final int DEFAULT_QUORUM_ELECTION_TIMEOUT_MS = 5_000;
public static final String QUORUM_FETCH_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "fetch.timeout.ms";
- private static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
+ public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
"the current leader before becoming a candidate and triggering a
election for voters; Maximum time without " +
"receiving fetch from a majority of the quorum before asking around to
see if there's a new epoch for leader";
+ public static final int DEFAULT_QUORUM_FETCH_TIMEOUT_MS = 15_000;
public static final String QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG =
QUORUM_PREFIX + "election.backoff.max.ms";
- private static final String QUORUM_ELECTION_BACKOFF_MAX_MS_DOC = "Maximum time in milliseconds before starting new elections. " +
+ public static final String QUORUM_ELECTION_BACKOFF_MAX_MS_DOC = "Maximum time in milliseconds before starting new elections. " +
"This is used in the binary exponential backoff mechanism that helps
prevent gridlocked elections";
+ public static final int DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS = 5_000;
public static final String QUORUM_LINGER_MS_CONFIG = QUORUM_PREFIX +
"append.linger.ms";
- private static final String QUORUM_LINGER_MS_DOC = "The duration in milliseconds that the leader will " +
+ public static final String QUORUM_LINGER_MS_DOC = "The duration in milliseconds that the leader will " +
"wait for writes to accumulate before flushing them to disk.";
+ public static final int DEFAULT_QUORUM_LINGER_MS = 25;
- // Package-private for testing
- static final String QUORUM_REQUEST_TIMEOUT_MS_CONFIG = QUORUM_PREFIX +
+ public static final String QUORUM_REQUEST_TIMEOUT_MS_CONFIG = QUORUM_PREFIX +
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+ public static final String QUORUM_REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+ public static final int DEFAULT_QUORUM_REQUEST_TIMEOUT_MS = 20_000;
- // Package-private for testing
- static final String QUORUM_RETRY_BACKOFF_MS_CONFIG = QUORUM_PREFIX +
+ public static final String QUORUM_RETRY_BACKOFF_MS_CONFIG = QUORUM_PREFIX +
CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
-
- static {
- CONFIG = new ConfigDef()
- .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG,
- ConfigDef.Type.INT,
- 20000,
- atLeast(0),
- ConfigDef.Importance.MEDIUM,
- REQUEST_TIMEOUT_MS_DOC)
- .define(QUORUM_RETRY_BACKOFF_MS_CONFIG,
- ConfigDef.Type.INT,
- 100,
- atLeast(0L),
- ConfigDef.Importance.LOW,
- CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
- .define(QUORUM_VOTERS_CONFIG,
- ConfigDef.Type.LIST,
- ConfigDef.NO_DEFAULT_VALUE,
- new ConfigDef.Validator() {
- @Override
- public void ensureValid(String name, Object value) {
- if (value == null) {
- throw new ConfigException(name, null);
- }
-
- @SuppressWarnings("unchecked")
- Map<Integer, InetSocketAddress> voterConnections = parseVoterConnections((List) value);
Review comment:
We seem to have lost this validation. Can we just move it to
`KafkaConfig` instead?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]