dajac commented on code in PR #16215: URL: https://github.com/apache/kafka/pull/16215#discussion_r1633209182
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -583,11 +674,339 @@ private void unload() { } timer.cancelAll(); deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); + failCurrentBatch(Errors.NOT_COORDINATOR.exception()); if (coordinator != null) { coordinator.onUnloaded(); } coordinator = null; } + + /** + * Frees the current batch. + */ + private void freeCurrentBatch() { + // Cancel the linger timeout. + currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel); + + // Release the buffer. + bufferSupplier.release(currentBatch.buffer); + + currentBatch = null; + } + + /** + * Flushes the current (or pending) batch to the log. When the batch is written + * locally, a new snapshot is created in the snapshot registry and the events + * associated with the batch are added to the deferred event queue. + */ + private void flushCurrentBatch() { + if (currentBatch != null) { + try { + // Write the records to the log and update the last written offset. + long offset = partitionWriter.append( + tp, + currentBatch.verificationGuard, + currentBatch.builder.build() + ); + coordinator.updateLastWrittenOffset(offset); + + if (offset != currentBatch.nextOffset) { + throw new IllegalStateException("The state machine of coordinator " + tp + " is out of sync with the " + + "underlying log. The last write returned " + offset + " while " + currentBatch.nextOffset + " was expected"); + } + + // Add all the pending deferred events to the deferred event queue. + for (DeferredEvent event : currentBatch.deferredEvents) { + deferredEventQueue.add(offset, event); + } + + // Free up the current batch. + freeCurrentBatch(); + } catch (Throwable t) { + log.error("Writing records to {} failed due to: {}.", tp, t.getMessage()); + failCurrentBatch(t); + // We rethrow the exception for the caller to handle it too. 
+ throw t; + } + } + } + + /** + * Flushes the current batch if it is transactional or if it has passed the append linger time. + */ + private void maybeFlushCurrentBatch(long currentTimeMs) { + if (currentBatch != null) { + if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) { + flushCurrentBatch(); + } + } + } + + /** + * Fails the current batch, reverts to the snapshot to the base/start offset of the + * batch, fails all the associated events. + */ + private void failCurrentBatch(Throwable t) { + if (currentBatch != null) { + coordinator.revertLastWrittenOffset(currentBatch.baseOffset); + for (DeferredEvent event : currentBatch.deferredEvents) { + event.complete(t); + } + freeCurrentBatch(); + } + } + + /** + * Allocates a new batch if none already exists. + */ + private void maybeAllocateNewBatch( + long producerId, + short producerEpoch, + VerificationGuard verificationGuard, + long currentTimeMs + ) { + if (currentBatch == null) { + LogConfig logConfig = partitionWriter.config(tp); + byte magic = logConfig.recordVersion().value; + int maxBatchSize = logConfig.maxMessageSize(); + long prevLastWrittenOffset = coordinator.lastWrittenOffset(); + ByteBuffer buffer = bufferSupplier.get(maxBatchSize); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder( + buffer, + magic, + compression, + TimestampType.CREATE_TIME, + 0L, + currentTimeMs, + producerId, + producerEpoch, + 0, + producerId != RecordBatch.NO_PRODUCER_ID, + false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + maxBatchSize + ); + + Optional<TimerTask> lingerTimeoutTask = Optional.empty(); + if (appendLingerMs > 0) { + lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) { + @Override + public void run() { + scheduleInternalOperation("FlushBatch", tp, () -> { Review Comment: Updated the code. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org