guozhangwang commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r512389580



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1605,100 +1708,18 @@ public void poll() throws IOException {
         }
     }
 
-    private void failPendingAppends(KafkaException exception) {
-        for (UnwrittenAppend unwrittenAppend : unwrittenAppends) {
-            unwrittenAppend.fail(exception);
-        }
-        unwrittenAppends.clear();
-    }
-
-    private void pollPendingAppends(LeaderState state, long currentTimeMs) {
-        int numAppends = 0;
-        int maxNumAppends = unwrittenAppends.size();
-
-        while (!unwrittenAppends.isEmpty() && numAppends < maxNumAppends) {
-            final UnwrittenAppend unwrittenAppend = unwrittenAppends.poll();
-
-            if (unwrittenAppend.future.isDone())
-                continue;
-
-            if (unwrittenAppend.isTimedOut(currentTimeMs)) {
-                unwrittenAppend.fail(new TimeoutException("Request timeout " + 
unwrittenAppend.requestTimeoutMs
-                    + " expired before the records could be appended to the 
log"));
-            } else {
-                int epoch = quorum.epoch();
-                LogAppendInfo info = appendAsLeader(unwrittenAppend.records);
-                OffsetAndEpoch offsetAndEpoch = new 
OffsetAndEpoch(info.lastOffset, epoch);
-                long numRecords = info.lastOffset - info.firstOffset + 1;
-                logger.debug("Completed write of {} records at {}", 
numRecords, offsetAndEpoch);
-
-                if (unwrittenAppend.ackMode == AckMode.LEADER) {
-                    unwrittenAppend.complete(offsetAndEpoch);
-                } else if (unwrittenAppend.ackMode == AckMode.QUORUM) {
-                    CompletableFuture<Long> future = appendPurgatory.await(
-                        LogOffset.awaitCommitted(offsetAndEpoch.offset),
-                        unwrittenAppend.requestTimeoutMs);
-
-                    future.whenComplete((completionTimeMs, exception) -> {
-                        if (exception != null) {
-                            logger.error("Failed to commit append at {} due to 
{}", offsetAndEpoch, exception);
-
-                            unwrittenAppend.fail(exception);
-                        } else {
-                            long elapsedTime = Math.max(0, completionTimeMs - 
currentTimeMs);
-                            double elapsedTimePerRecord = (double) elapsedTime 
/ numRecords;
-                            
kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, currentTimeMs);
-                            unwrittenAppend.complete(offsetAndEpoch);
-
-                            logger.debug("Completed commit of {} records at 
{}", numRecords, offsetAndEpoch);
-                        }
-                    });
-                }
-            }
-
-            numAppends++;
-        }
-
-        if (numAppends > 0) {
-            flushLeaderLog(state, currentTimeMs);
-        }
-    }
-
-    /**
-     * Append a set of records to the log. Successful completion of the future 
indicates a success of
-     * the append, with the uncommitted base offset and epoch.
-     *
-     * @param records The records to write to the log
-     * @param ackMode The commit mode for the appended records
-     * @param timeoutMs The maximum time to wait for the append operation to 
complete (including
-     *                  any time needed for replication)
-     * @return The uncommitted base offset and epoch of the appended records
-     */
     @Override
-    public CompletableFuture<OffsetAndEpoch> append(
-        Records records,
-        AckMode ackMode,
-        long timeoutMs
-    ) {
-        if (records.sizeInBytes() == 0)
-            throw new IllegalArgumentException("Attempt to append empty record 
set");
-
-        if (shutdown.get() != null)
-            throw new IllegalStateException("Cannot append records while we 
are shutting down");
-
-        if (quorum.isObserver())
-            throw new IllegalStateException("Illegal attempt to write to an 
observer");
-
-        CompletableFuture<OffsetAndEpoch> future = new CompletableFuture<>();
-        UnwrittenAppend unwrittenAppend = new UnwrittenAppend(
-            records, time.milliseconds(), timeoutMs, ackMode, future);
+    public Long scheduleAppend(int epoch, List<T> records) {
+        BatchAccumulator<T> accumulator = this.accumulator;
+        if (accumulator == null) {
+            return Long.MAX_VALUE;

Review comment:
       Sounds good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to