jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r622489394



##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -67,6 +74,30 @@ protected LeaderState(
         }
         this.grantingVoters.addAll(grantingVoters);
         this.log = logContext.logger(LeaderState.class);
+        this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+    }
+
+    public BatchAccumulator<T> accumulator() {
+        return this.accumulator;
+    }
+
+    private static List<Voter> convertToVoters(Set<Integer> voterIds) {
+        return voterIds.stream()
+            .map(follower -> new Voter().setVoterId(follower))
+            .collect(Collectors.toList());
+    }
+
+    public void appendLeaderChangeMessage(long currentTimeMs) {
+        List<Voter> voters = convertToVoters(voterStates.keySet());
+        List<Voter> grantingVoters = convertToVoters(this.grantingVoters());
+
+        LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
+            .setLeaderId(this.election().leaderId())
+            .setVoters(voters)
+            .setGrantingVoters(grantingVoters);
+        
+        accumulator.appendLeaderChangeMessage(leaderChangeMessage, 
currentTimeMs);
+        accumulator.forceDrain();

Review comment:
       Okay. We are attempting to keep the semantic for 
`appendLeaderChangeMessage` similar to that of the other `append` methods. In 
that case I am okay with this API.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to