jsancio commented on a change in pull request #9881:
URL: https://github.com/apache/kafka/pull/9881#discussion_r557001148



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -62,11 +62,12 @@ default void handleResign() {}
     }
 
     /**
-     * Initialize the client. This should only be called once on startup.
+     * Initialize the client with the given voter string config.
+     * This should only be called once on startup.

Review comment:
       Add a `@param` to the documentation.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -149,6 +152,15 @@ public static ConfigDef configDef() {
         return new ConfigDef(CONFIG);
     }
 
+    public static List<Node> quorumVoterStringsToNodes(String 
quorumVotersString) {
+        return 
parseVoterConnections(Arrays.stream(quorumVotersString.split(","))
+                .filter(part -> !part.isEmpty())
+                .collect(Collectors.toList())).entrySet().stream()

Review comment:
       I think we can avoid this pattern if we refactor `parseVoterConnections` 
to use a new method called `parseVoterConnection` that knows how to convert a 
`String` to a `Node`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -325,8 +341,30 @@ private void fireHandleResign() {
     }
 
     @Override
-    public void initialize() throws IOException {
-        quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
+    public void initialize(String quorumVoterStrings) throws IOException {

Review comment:
       How about storing the `RaftConfig` as a field of `KafkaRaftClient`? This 
would allow us to remove a few of the configuration parameter passed through 
the constructor.
   
   @hachikuji is there a reason why `KafkaRaftClient` has one constructor which 
accepts a `RaftConfig` and one constructor that accepts the configurations as 
explicit parameters?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -325,8 +341,30 @@ private void fireHandleResign() {
     }
 
     @Override
-    public void initialize() throws IOException {
-        quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
+    public void initialize(String quorumVoterStrings) throws IOException {
+        List<Node> quorumVoterNodes = 
quorumVoterStringsToNodes(quorumVoterStrings);
+        Set<Integer> quorumVoterIds = 
quorumVoterNodes.stream().map(Node::id).collect(Collectors.toSet());

Review comment:
       This is the same as `voterAddresses.keySet()` which is computed below.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to