hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509714949
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -1443,15 +1485,79 @@ private void pollShutdown(GracefulShutdown shutdown) throws IOException { } } + private void appendBatch( + LeaderState state, + BatchAccumulator.CompletedBatch<T> batch, + long appendTimeMs + ) { + try { + List<T> records = batch.records; + int epoch = state.epoch(); + + LogAppendInfo info = appendAsLeader(batch.data); + OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch); + CompletableFuture<Long> future = appendPurgatory.await( + LogOffset.awaitCommitted(offsetAndEpoch.offset), + Integer.MAX_VALUE + ); + + future.whenComplete((commitTimeMs, exception) -> { + int numRecords = batch.records.size(); + if (exception != null) { + logger.debug("Failed to commit {} records at {}", numRecords, offsetAndEpoch, exception); + } else { + long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs); + double elapsedTimePerRecord = (double) elapsedTime / numRecords; + kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs); + logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch); + listener.handleCommit(epoch, info.lastOffset, records); + } + }); + } finally { + batch.release(); + } + } + + private long maybeAppendBatches( + LeaderState state, + long currentTimeMs + ) { + long timeUnitFlush = accumulator.timeUntilFlush(currentTimeMs); + if (timeUnitFlush <= 0) { + List<BatchAccumulator.CompletedBatch<T>> batches = accumulator.flush(); + Iterator<BatchAccumulator.CompletedBatch<T>> iterator = batches.iterator(); + + try { + while (iterator.hasNext()) { + BatchAccumulator.CompletedBatch<T> batch = iterator.next(); + appendBatch(state, batch, currentTimeMs); + } + flushLeaderLog(state, currentTimeMs); Review comment: Yes, I agree with you. Of course it is ok if unflushed data gets replicated. The main thing we need to protect is incrementing the high watermark. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org