guozhangwang commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r512389580
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -1605,100 +1708,18 @@ public void poll() throws IOException { } } - private void failPendingAppends(KafkaException exception) { - for (UnwrittenAppend unwrittenAppend : unwrittenAppends) { - unwrittenAppend.fail(exception); - } - unwrittenAppends.clear(); - } - - private void pollPendingAppends(LeaderState state, long currentTimeMs) { - int numAppends = 0; - int maxNumAppends = unwrittenAppends.size(); - - while (!unwrittenAppends.isEmpty() && numAppends < maxNumAppends) { - final UnwrittenAppend unwrittenAppend = unwrittenAppends.poll(); - - if (unwrittenAppend.future.isDone()) - continue; - - if (unwrittenAppend.isTimedOut(currentTimeMs)) { - unwrittenAppend.fail(new TimeoutException("Request timeout " + unwrittenAppend.requestTimeoutMs - + " expired before the records could be appended to the log")); - } else { - int epoch = quorum.epoch(); - LogAppendInfo info = appendAsLeader(unwrittenAppend.records); - OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch); - long numRecords = info.lastOffset - info.firstOffset + 1; - logger.debug("Completed write of {} records at {}", numRecords, offsetAndEpoch); - - if (unwrittenAppend.ackMode == AckMode.LEADER) { - unwrittenAppend.complete(offsetAndEpoch); - } else if (unwrittenAppend.ackMode == AckMode.QUORUM) { - CompletableFuture<Long> future = appendPurgatory.await( - LogOffset.awaitCommitted(offsetAndEpoch.offset), - unwrittenAppend.requestTimeoutMs); - - future.whenComplete((completionTimeMs, exception) -> { - if (exception != null) { - logger.error("Failed to commit append at {} due to {}", offsetAndEpoch, exception); - - unwrittenAppend.fail(exception); - } else { - long elapsedTime = Math.max(0, completionTimeMs - currentTimeMs); - double elapsedTimePerRecord = (double) elapsedTime / numRecords; - kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, currentTimeMs); - unwrittenAppend.complete(offsetAndEpoch); - - logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch); - } - }); - } - } - - numAppends++; - } - - if (numAppends > 0) { - flushLeaderLog(state, currentTimeMs); - } - } - - /** - * Append a set of records to the log. Successful completion of the future indicates a success of - * the append, with the uncommitted base offset and epoch. - * - * @param records The records to write to the log - * @param ackMode The commit mode for the appended records - * @param timeoutMs The maximum time to wait for the append operation to complete (including - * any time needed for replication) - * @return The uncommitted base offset and epoch of the appended records - */ @Override - public CompletableFuture<OffsetAndEpoch> append( - Records records, - AckMode ackMode, - long timeoutMs - ) { - if (records.sizeInBytes() == 0) - throw new IllegalArgumentException("Attempt to append empty record set"); - - if (shutdown.get() != null) - throw new IllegalStateException("Cannot append records while we are shutting down"); - - if (quorum.isObserver()) - throw new IllegalStateException("Illegal attempt to write to an observer"); - - CompletableFuture<OffsetAndEpoch> future = new CompletableFuture<>(); - UnwrittenAppend unwrittenAppend = new UnwrittenAppend( - records, time.milliseconds(), timeoutMs, ackMode, future); + public Long scheduleAppend(int epoch, List<T> records) { + BatchAccumulator<T> accumulator = this.accumulator; + if (accumulator == null) { + return Long.MAX_VALUE; Review comment: Sounds good. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org