showuon commented on code in PR #15273:
URL: https://github.com/apache/kafka/pull/15273#discussion_r1470688447


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -290,7 +320,15 @@ public Optional<String> serverConfigName(String configName) {
             .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
                 TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
             .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
-                TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+                TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+            .define(LOG_DIR_PROP, STRING, DEFAULT_LOG_DIR, HIGH, LOG_DIR_DOC)
+            .define(LOG_DIRS_PROP, STRING, null, HIGH, LOG_DIRS_DOC)
+            .define(METADATA_LOG_DIR_PROP, STRING, null, HIGH, METADATA_LOG_DIR_DOC)
+            .define(INTER_BROKER_PROTOCOL_VERSION_PROP, STRING, DEFAULT_INTER_BROKER_PROTOCOL_VERSION, new MetadataVersionValidator(), MEDIUM, INTER_BROKER_PROTOCOL_VERSION_DOC)
+            // This indicates whether unreleased APIs should be advertised by this node.
+            .defineInternal(UNSTABLE_API_VERSIONS_ENABLE_PROP, BOOLEAN, DEFAULT_UNSTABLE_API_VERSIONS_ENABLE, HIGH)
+            // This indicates whether unreleased MetadataVersions should be enabled on this node.
+            .defineInternal(UNSTABLE_METADATA_VERSIONS_ENABLE_PROP, BOOLEAN, DEFAULT_UNSTABLE_METADATA_VERSIONS_ENABLE, HIGH);

Review Comment:
   I don't see where these definitions are removed from the original `KafkaConfig`.



##########
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##########
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
         }
     }
 
+    private static Set<String> parseProcessRoles(List<String> processRoles, Map<Integer, AddressSpec> voterConnections, int nodeId) {

Review Comment:
   Where do we remove the original `parseProcessRoles` method?



##########
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##########
@@ -94,6 +94,18 @@ class LogConfigTest {
       case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
       case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
       case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
+      case LogConfig.LOG_DIR_PROP => assert(true)
+      case LogConfig.LOG_DIRS_PROP => assert(true)

Review Comment:
   What are we testing here?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -500,6 +553,34 @@ public static Map<String, ConfigKey> configKeys() {
         return Collections.unmodifiableMap(CONFIG.configKeys());
     }
 
+    public List<String> logDirs() {
+        String csvList = logDirs != null ? logDirs : logDir;
+        if (csvList == null || csvList.isEmpty()) {
+            return Collections.emptyList();
+        } else {
+            return Arrays.stream(csvList.split("\\s*,\\s*"))
+                .filter(v -> !v.equals(""))
+                .collect(Collectors.toList());
+        }
+    }
+
+    public String getMetadataLogDir() {
+        if (metadataLogDir != null) {
+            return metadataLogDir;
+        } else {
+            return logDirs().get(0);
+        }
+    }
+
+    public Optional<String> interBrokerProtocolVersion() {
+        String originalIBP = (String) originals().get(LogConfig.INTER_BROKER_PROTOCOL_VERSION_PROP);
+        return originalIBP != null ? Optional.of(originalIBP) : Optional.empty();
+    }
+
+    public Boolean unstableMetadataVersionsEnabled() {
+        return unstableMetadataVersionsEnabled;
+    }

Review Comment:
   Why do we need these methods?



##########
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##########
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
         }
     }
 
+    private static Set<String> parseProcessRoles(List<String> processRoles, Map<Integer, AddressSpec> voterConnections, int nodeId) {
+        Set<String> distinctRoles = new HashSet<>();
+        for (String role : processRoles) {
+            switch (role) {
+                case "broker":
+                    distinctRoles.add("BrokerRole");

Review Comment:
   Why don't we use `ProcessRole` as before?
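
   To illustrate the concern, here is a minimal, self-contained sketch of collecting roles into a typed set rather than a `Set<String>`. The `ProcessRole` enum and `ProcessRoleParser` class below are hypothetical stand-ins written for this example, not the existing Kafka types, and rejecting duplicates with `IllegalArgumentException` is an assumption of the sketch.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical typed role, standing in for the existing ProcessRole type.
enum ProcessRole {
    BROKER("broker"),
    CONTROLLER("controller");

    private final String configName;

    ProcessRole(String configName) {
        this.configName = configName;
    }

    // Maps a configured role name to its enum constant, failing on unknown names.
    static ProcessRole fromConfigName(String name) {
        for (ProcessRole role : values()) {
            if (role.configName.equals(name)) {
                return role;
            }
        }
        throw new IllegalArgumentException("Unknown process role '" + name + "'");
    }
}

// Hypothetical helper; the name and signature are illustrative only.
final class ProcessRoleParser {
    static Set<ProcessRole> parse(List<String> processRoles) {
        Set<ProcessRole> distinctRoles = EnumSet.noneOf(ProcessRole.class);
        for (String name : processRoles) {
            // Set.add returns false when the role is already present.
            if (!distinctRoles.add(ProcessRole.fromConfigName(name))) {
                throw new IllegalArgumentException("Duplicate role '" + name + "'");
            }
        }
        return distinctRoles;
    }
}
```

   With a typed set, callers compare against `ProcessRole.BROKER` instead of the string literal `"BrokerRole"`, so a misspelled role becomes a compile error rather than a silent mismatch.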



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
