jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828471282

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -673,10 +679,19 @@ public void run() throws Exception {
                     replay(message.message(), Optional.empty(), offset);
                 }
                 snapshotRegistry.getOrCreateSnapshot(offset);
+
                 log.debug("Read-write operation {} will be completed when the log " +
                     "reaches offset {}.", this, resultAndOffset.offset());
             }
-            purgatory.add(resultAndOffset.offset(), this);
+
+            // After every controller write event, schedule a leader rebalance if there are any topic partition
+            // with leader that is not the preferred leader.
+            maybeScheduleNextBalancePartitionLeaders();
+
+            // Remember the latest offset and future if it is not already completed
+            if (!future.isDone()) {

Review comment:
We need this check. The old code returned early and completed the future when the event didn't have any records to append. For a write event there are three cases:

1. The write operation generated records. In this case we append them to the log, wait for the returned offset to be committed, and finally complete the future when the committed offset is reached.
2. The write operation doesn't generate any records but we have uncommitted state in memory. This case is similar to 1, but instead of waiting for the appended offset we wait for the highest offset in the purgatory.
3. The write operation doesn't generate any records and there is no uncommitted state. In this case we complete the future immediately and don't store it in the purgatory.

I had to add this check because I removed the early `return` in the previous code.
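
The three cases above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the `QuorumController` code: the class `WriteCompletionSketch`, its methods `handleWrite`, `commitUpTo`, and `pending`, and the `TreeMap`-based purgatory are all hypothetical names introduced for illustration, and "uncommitted state in memory" is approximated here as "the purgatory is not empty".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the three write-event cases.
// None of these names come from the Kafka code base.
class WriteCompletionSketch {
    // Futures keyed by the offset that must be committed before they complete.
    private final TreeMap<Long, List<CompletableFuture<Void>>> purgatory = new TreeMap<>();
    private long nextOffset = 0;

    /** Handles one write event that generated recordCount records. */
    CompletableFuture<Void> handleWrite(int recordCount) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        long waitOffset = -1;
        if (recordCount > 0) {
            // Case 1: "append" the records and wait for the last appended offset.
            nextOffset += recordCount;
            waitOffset = nextOffset - 1;
        } else if (!purgatory.isEmpty()) {
            // Case 2: no new records, but earlier writes are still uncommitted,
            // so wait for the highest offset already in the purgatory.
            waitOffset = purgatory.lastKey();
        } else {
            // Case 3: nothing appended and nothing pending; complete right away.
            future.complete(null);
        }
        // Counterpart of the `if (!future.isDone())` check under review:
        // only futures that are not yet completed are stored.
        if (!future.isDone()) {
            purgatory.computeIfAbsent(waitOffset, k -> new ArrayList<>()).add(future);
        }
        return future;
    }

    /** Completes every stored future whose offset is at or below committedOffset. */
    void commitUpTo(long committedOffset) {
        Map<Long, List<CompletableFuture<Void>>> ready = purgatory.headMap(committedOffset, true);
        ready.values().forEach(futures -> futures.forEach(f -> f.complete(null)));
        ready.clear(); // headMap is a view, so this removes the entries from the purgatory
    }

    /** Number of futures still waiting in the purgatory. */
    int pending() {
        return purgatory.values().stream().mapToInt(List::size).sum();
    }
}
```

In this model, a write with no records completes immediately on an idle controller (case 3), while the same write issued behind an uncommitted three-record write waits until `commitUpTo(2)` is called (case 2). Without the `isDone()` guard, the case 3 future would be stored in the purgatory even though it has already completed.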