jsancio commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r555472538
##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -211,6 +214,33 @@ public long timeUntilDrain(long currentTimeMs) {
         }
     }

+    /**
+     * Check if the current batch size has exceeded the min flush size.
+     *
+     * Note that this method works on a best-effort basis, i.e. it tries to acquire the append lock and,
+     * if it can't, it returns false instead of blocking.
+     *
+     * This means that if the thread responsible for appending is holding the lock and the linger time
+     * hasn't expired yet, the records won't be drained even though the batch size exceeds the min flush
+     * size, because the lock couldn't be acquired. It also means that on subsequent run(s), this method
+     * should be able to acquire the lock and return true in the event the linger time hasn't expired yet.
+     *
+     * @return true if the append lock could be acquired and the accumulated bytes are greater than the
+     *         configured min flush bytes size, false otherwise.
+     */
+    public boolean batchSizeExceedsMinFlushSize() {

Review comment:
       > 1. Inside append, while we are already holding the lock, we can check if the accumulated bytes (including completed and currentBatch) have reached minFlushSize. If so, we can call completeCurrentBatch to ensure that completed holds all the data that needs to be drained.
       > 2. Inside timeUntilDrain, if the linger timer hasn't been reached, we can iterate completed and check if there are enough bytes to flush. Then we don't need to acquire the lock unless we need to drain.

       I like this suggestion.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
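For context, the best-effort behavior the javadoc above describes can be sketched roughly as follows. This is an illustrative standalone class, not the actual `BatchAccumulator` code; the names `appendLock`, `accumulatedBytes`, and `minFlushSizeBytes` are assumptions for the sketch.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the best-effort check under review: try to take the
// append lock and, if an appender currently holds it, return false rather
// than blocking. Field names are illustrative, not the real internals.
class BestEffortFlushCheck {
    private final ReentrantLock appendLock = new ReentrantLock();
    private final int minFlushSizeBytes;
    private final AtomicLong accumulatedBytes = new AtomicLong(0);

    BestEffortFlushCheck(int minFlushSizeBytes) {
        this.minFlushSizeBytes = minFlushSizeBytes;
    }

    void addBytes(long n) {
        accumulatedBytes.addAndGet(n);
    }

    boolean batchSizeExceedsMinFlushSize() {
        // tryLock() returns immediately; false means another thread holds the
        // lock, so this run skips the check and a later run retries it.
        if (!appendLock.tryLock()) {
            return false;
        }
        try {
            return accumulatedBytes.get() > minFlushSizeBytes;
        } finally {
            appendLock.unlock();
        }
    }
}
```

The tradeoff the javadoc calls out falls directly out of `tryLock()`: a concurrent appender makes the method report false even when enough bytes have accumulated, deferring the drain to a later run.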
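The two-part suggestion quoted in the review comment can be sketched as below. Batches are modeled as byte counts only, and the names (`completeCurrentBatch`, `minFlushSizeBytes`, `completedBytes`) mirror the discussion but are not the real `BatchAccumulator` internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the reviewer's suggestion: seal the current batch
// inside append() while the lock is already held, so the drain check in
// timeUntilDrain() can look at completed bytes without taking the lock.
class FlushSizeSketch {
    private final ReentrantLock appendLock = new ReentrantLock();
    private final int minFlushSizeBytes;
    // Sizes of completed (sealed) batches; only mutated under the lock.
    private final List<Integer> completed = new ArrayList<>();
    private int currentBatchBytes = 0;
    // Drainable byte count, published for lock-free reads in timeUntilDrain.
    private volatile int completedBytes = 0;

    FlushSizeSketch(int minFlushSizeBytes) {
        this.minFlushSizeBytes = minFlushSizeBytes;
    }

    // Suggestion 1: while already holding the append lock, check whether the
    // accumulated bytes (completed + current batch) reached minFlushSize and,
    // if so, seal the current batch so `completed` holds all drainable data.
    void append(int recordBytes) {
        appendLock.lock();
        try {
            currentBatchBytes += recordBytes;
            if (completedBytes + currentBatchBytes >= minFlushSizeBytes) {
                completeCurrentBatch();
            }
        } finally {
            appendLock.unlock();
        }
    }

    private void completeCurrentBatch() {
        completed.add(currentBatchBytes);
        completedBytes += currentBatchBytes;
        currentBatchBytes = 0;
    }

    // Suggestion 2: before the linger timer expires, decide whether to drain
    // by looking only at completed bytes; no lock is taken for the check.
    long timeUntilDrain(long lingerRemainingMs) {
        return completedBytes >= minFlushSizeBytes ? 0 : lingerRemainingMs;
    }
}
```

The design point of the suggestion is that all bookkeeping happens where the lock is already held (inside `append`), so the periodic drain decision never has to contend for it, avoiding the tryLock failure mode entirely.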