jeffkbkim commented on code in PR #16215: URL: https://github.com/apache/kafka/pull/16215#discussion_r1628486465
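Before the individual comments, a rough standalone illustration of the batching lifecycle the hunks below introduce (allocate a batch on demand, reject oversized writes, flush a full batch and re-check, and flush immediately for transactional writes or once the linger time elapses). All names and types in this sketch are simplified placeholders, not the PR's actual classes:

import java.util.ArrayList;
import java.util.List;

// Heavily simplified, standalone sketch of the batching lifecycle discussed in the
// review comments below. Placeholder names only; this is not the PR's code.
class CoordinatorBatchingSketch {
    private final int maxBatchSize;
    private final long appendLingerMs;

    private List<byte[]> currentBatch;   // pending serialized records
    private long batchCreateTimeMs;

    CoordinatorBatchingSketch(int maxBatchSize, long appendLingerMs) {
        this.maxBatchSize = maxBatchSize;
        this.appendLingerMs = appendLingerMs;
    }

    void append(List<byte[]> records, boolean transactional, long nowMs) {
        int estimatedSize = records.stream().mapToInt(r -> r.length).sum();

        // A transactional write gets a batch of its own, so flush whatever is pending.
        if (transactional) {
            flushCurrentBatch();
        }

        maybeAllocateNewBatch(nowMs);

        // The size is checked before any state is mutated, so nothing has to be
        // reverted when the records do not fit.
        if (currentBytes() + estimatedSize > maxBatchSize) {
            if (currentBatch.isEmpty()) {
                throw new IllegalArgumentException(estimatedSize
                    + " bytes exceeds the max batch size of " + maxBatchSize);
            }
            // Flush the open batch and re-check against a fresh, empty one.
            flushCurrentBatch();
            maybeAllocateNewBatch(nowMs);
            if (estimatedSize > maxBatchSize) {
                throw new IllegalArgumentException(estimatedSize
                    + " bytes exceeds the max batch size of " + maxBatchSize);
            }
        }

        currentBatch.addAll(records);

        // Transactional writes are flushed right away; otherwise the batch lingers
        // until the append linger time has elapsed (or it fills up on a later append).
        if (transactional || nowMs - batchCreateTimeMs >= appendLingerMs) {
            flushCurrentBatch();
        }
    }

    private void maybeAllocateNewBatch(long nowMs) {
        if (currentBatch == null) {
            currentBatch = new ArrayList<>();
            batchCreateTimeMs = nowMs;
        }
    }

    private int currentBytes() {
        return currentBatch.stream().mapToInt(r -> r.length).sum();
    }

    private void flushCurrentBatch() {
        if (currentBatch == null) return;
        System.out.println("flushing batch of " + currentBatch.size() + " record(s)");
        currentBatch = null;
    }
}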
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -443,6 +445,81 @@ public int size() { } } + /** + * A simple container class to hold all the attributes + * related to a pending batch. + */ + private static class CoordinatorBatch { + /** + * The base (or first) offset of the batch. If the batch fails + * for any reason, the state machines is rolled back to it. + */ + final long baseOffset; + + /** + * The time at which the batch was created. + */ + final long appendTimeMs; + + /** + * The max batch size. + */ + final int maxBatchSize; + + /** + * The verification guard associated to the batch if it is + * transactional. + */ + final VerificationGuard verificationGuard; + + /** + * The byte buffer backing the records builder. + */ + final ByteBuffer buffer; + + /** + * The records builder. + */ + final MemoryRecordsBuilder builder; + + /** + * The timer used to enfore the append linger time if + * it is non-zero. + */ + final Optional<TimerTask> lingerTimeoutTask; + + /** + * The list of events associated with the batch. + */ + final List<DeferredEvent> events; Review Comment: nit: understandably events is simpler but i still think pendingEvents or deferredEvents gives more context, especially when we add events to this list. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -583,11 +674,330 @@ private void unload() { } timer.cancelAll(); deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); + failCurrentBatch(Errors.NOT_COORDINATOR.exception()); if (coordinator != null) { coordinator.onUnloaded(); } coordinator = null; } + + /** + * Frees the current batch. + */ + private void freeCurrentBatch() { + // Cancel the linger timeout. + currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel); + + // Release the buffer. + bufferSupplier.release(currentBatch.buffer); + + currentBatch = null; + } + + /** + * Writes the current (or pending) batch to the log. When the batch is written + * locally, a new snapshot is created in the snapshot registry and the events + * associated with the batch are added to the deferred event queue. + */ + private void writeCurrentBatch() { + if (currentBatch != null) { + try { + // Write the records to the log and update the last written offset. + long offset = partitionWriter.append( + tp, + currentBatch.verificationGuard, + currentBatch.builder.build() + ); + coordinator.updateLastWrittenOffset(offset); + + // Add all the pending deferred events to the deferred event queue. + for (DeferredEvent event : currentBatch.events) { + deferredEventQueue.add(offset, event); + } + + // Free up the current batch. + freeCurrentBatch(); + } catch (Throwable t) { + failCurrentBatch(t); + } + } + } + + /** + * Writes the current batch if it is transactional or if it has past the append linger time. + */ + private void maybeWriteCurrentBatch(long currentTimeMs) { + if (currentBatch != null) { + if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) { + writeCurrentBatch(); + } + } + } + + /** + * Fails the current batch, reverts to the snapshot to the base/start offset of the + * batch, fails all the associated events. 
+ */ + private void failCurrentBatch(Throwable t) { + if (currentBatch != null) { + coordinator.revertLastWrittenOffset(currentBatch.baseOffset); + for (DeferredEvent event : currentBatch.events) { + event.complete(t); + } + freeCurrentBatch(); + } + } + + /** + * Allocates a new batch if none already exists. + */ + private void maybeAllocateNewBatch( + long producerId, + short producerEpoch, + VerificationGuard verificationGuard, + long currentTimeMs + ) { + if (currentBatch == null) { + LogConfig logConfig = partitionWriter.config(tp); + byte magic = logConfig.recordVersion().value; + int maxBatchSize = logConfig.maxMessageSize(); + long prevLastWrittenOffset = coordinator.lastWrittenOffset(); + ByteBuffer buffer = bufferSupplier.get(maxBatchSize); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder( + buffer, + magic, + compression, + TimestampType.CREATE_TIME, + 0L, + currentTimeMs, + producerId, + producerEpoch, + 0, + producerId != RecordBatch.NO_PRODUCER_ID, + false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + maxBatchSize + ); + + Optional<TimerTask> lingerTimeoutTask = Optional.empty(); + if (appendLingerMs > 0) { + lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) { + @Override + public void run() { + scheduleInternalOperation("FlushBatch", tp, () -> { + if (this.isCancelled()) return; + withActiveContextOrThrow(tp, CoordinatorContext::writeCurrentBatch); + }); + } + }); + } + + currentBatch = new CoordinatorBatch( + prevLastWrittenOffset, + currentTimeMs, + maxBatchSize, + verificationGuard, + buffer, + builder, + lingerTimeoutTask + ); + } + } + + /** + * Appends records to the log and replay them to the state machine. + * + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @param verificationGuard The verification guard. + * @param records The records to append. + * @param replay A boolean indicating whether the records + * must be replayed or not. + * @param event The event that must be completed when the + * records are written. + */ + private void append( + long producerId, + short producerEpoch, + VerificationGuard verificationGuard, + List<U> records, + boolean replay, + DeferredEvent event + ) { + if (state != CoordinatorState.ACTIVE) { + throw new IllegalStateException("Coordinator must be active to append records"); + } + + if (records.isEmpty()) { + // If the records are empty, it was a read operation after all. In this case, + // the response can be returned directly iff there are no pending write operations; + // otherwise, the read needs to wait on the last write operation to be completed. + if (currentBatch != null) { + currentBatch.events.add(event); + } else { + OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset(); + if (pendingOffset.isPresent()) { + deferredEventQueue.add(pendingOffset.getAsLong(), event); + } else { + event.complete(null); + } + } + } else { + // If the records are not empty, first, they are applied to the state machine, + // second, then are appended to the opened batch. + long currentTimeMs = time.milliseconds(); + + // If the current write operation is transactional, the current batch + // is written before proceeding with it. + if (producerId != RecordBatch.NO_PRODUCER_ID) { + writeCurrentBatch(); + } + + // Allocate a new batch if none exists. + maybeAllocateNewBatch( + producerId, + producerEpoch, + verificationGuard, + currentTimeMs + ); + + // Prepare the records. 
+ List<SimpleRecord> recordsToAppend = new ArrayList<>(records.size()); + for (U record : records) { + recordsToAppend.add(new SimpleRecord( + currentTimeMs, + serializer.serializeKey(record), + serializer.serializeValue(record) + )); + } + + // Compute the estimated size of the records. + int estimatedSize = AbstractRecords.estimateSizeInBytes( + currentBatch.builder.magic(), + compression.type(), + recordsToAppend + ); + + // Check if the current batch has enough space. We check is before + // replaying the records in order to avoid having to revert back + // changes if the records do not fit within a batch. + if (!currentBatch.builder.hasRoomFor(estimatedSize)) { + if (currentBatch.builder.numRecords() == 0) { + // If the number of records in the current batch is zero, it means that + // the records are larger than the max batch size. + throw new RecordTooLargeException("Message batch size is " + estimatedSize + + " bytes in append to partition " + tp + " which exceeds the maximum " + + "configured size of " + currentBatch.maxBatchSize + "."); + } else { + // Otherwise, we write the current batch, allocate a new one and re-verify + // whether the records fit in it. + writeCurrentBatch(); + maybeAllocateNewBatch( + producerId, + producerEpoch, + verificationGuard, + currentTimeMs + ); + if (!currentBatch.builder.hasRoomFor(estimatedSize)) { + throw new RecordTooLargeException("Message batch size is " + estimatedSize + + " bytes in append to partition " + tp + " which exceeds the maximum " + + "configured size of " + currentBatch.maxBatchSize + "."); + } + } + } + + // Add the event to the list of pending events associated with the batch. + currentBatch.events.add(event); + + try { + // Apply record to the state machine. + if (replay) { + for (int i = 0; i < records.size(); i++) { + // We compute the offset of the record based on the last written offset. The + // coordinator is the single writer to the underlying partition so we can + // deduce it like this. + coordinator.replay( + currentBatch.nextOffset + i, Review Comment: the naming seems a bit off. nextOffset is initialized as baseOffset and we increment it below which means it's pointing to LEO - 1 right? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -626,89 +833,113 @@ private void append( // If the records are empty, it was a read operation after all. In this case, // the response can be returned directly iff there are no pending write operations; // otherwise, the read needs to wait on the last write operation to be completed. - OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset(); - if (pendingOffset.isPresent()) { - deferredEventQueue.add(pendingOffset.getAsLong(), event); + if (currentBatch != null) { + currentBatch.events.add(event); } else { - event.complete(null); + OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset(); + if (pendingOffset.isPresent()) { + deferredEventQueue.add(pendingOffset.getAsLong(), event); + } else { + event.complete(null); + } } } else { // If the records are not empty, first, they are applied to the state machine, - // second, then are written to the partition/log, and finally, the response - // is put into the deferred event queue. 
- long prevLastWrittenOffset = coordinator.lastWrittenOffset(); - LogConfig logConfig = partitionWriter.config(tp); - byte magic = logConfig.recordVersion().value; - int maxBatchSize = logConfig.maxMessageSize(); + // second, then are appended to the opened batch. Review Comment: nit: they are appended ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -592,14 +674,139 @@ private void unload() { } timer.cancelAll(); deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); + failCurrentBatch(Errors.NOT_COORDINATOR.exception()); if (coordinator != null) { coordinator.onUnloaded(); } coordinator = null; } /** - * Appends records the the log and replay them to the state machine. + * Frees the current batch. + */ + private void freeCurrentBatch() { + // Cancel the linger timeout. + currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel); + + // Release the buffer. + bufferSupplier.release(currentBatch.buffer); + + currentBatch = null; + } + + /** + * Writes the current (or pending) batch to the log. When the batch is written + * locally, a new snapshot is created in the snapshot registry and the events + * associated with the batch are added to the deferred event queue. + */ + private void writeCurrentBatch() { Review Comment: nit: flushCurrentBatch makes more sense to me as we're dealing with batches ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -592,14 +674,139 @@ private void unload() { } timer.cancelAll(); deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); + failCurrentBatch(Errors.NOT_COORDINATOR.exception()); if (coordinator != null) { coordinator.onUnloaded(); } coordinator = null; } /** - * Appends records the the log and replay them to the state machine. + * Frees the current batch. + */ + private void freeCurrentBatch() { + // Cancel the linger timeout. + currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel); + + // Release the buffer. + bufferSupplier.release(currentBatch.buffer); + + currentBatch = null; + } + + /** + * Writes the current (or pending) batch to the log. When the batch is written + * locally, a new snapshot is created in the snapshot registry and the events + * associated with the batch are added to the deferred event queue. + */ + private void writeCurrentBatch() { + if (currentBatch != null) { + try { + // Write the records to the log and update the last written offset. + long offset = partitionWriter.append( + tp, + currentBatch.verificationGuard, + currentBatch.builder.build() + ); + coordinator.updateLastWrittenOffset(offset); + + // Add all the pending deferred events to the deferred event queue. + for (DeferredEvent event : currentBatch.events) { + deferredEventQueue.add(offset, event); + } + + // Free up the current batch. + freeCurrentBatch(); + } catch (Throwable t) { + failCurrentBatch(t); + } + } + } + + /** + * Writes the current batch if it is transactional or if it has past the append linger time. + */ + private void maybeWriteCurrentBatch(long currentTimeMs) { + if (currentBatch != null) { + if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) { + writeCurrentBatch(); + } + } + } + + /** + * Fails the current batch, reverts to the snapshot to the base/start offset of the + * batch, fails all the associated events. 
+ */ + private void failCurrentBatch(Throwable t) { Review Comment: i think we need https://github.com/apache/kafka/pull/16060 or we won't have visibility into the error ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -626,89 +833,113 @@ private void append( // If the records are empty, it was a read operation after all. In this case, // the response can be returned directly iff there are no pending write operations; // otherwise, the read needs to wait on the last write operation to be completed. - OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset(); - if (pendingOffset.isPresent()) { - deferredEventQueue.add(pendingOffset.getAsLong(), event); + if (currentBatch != null) { + currentBatch.events.add(event); } else { - event.complete(null); + OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset(); + if (pendingOffset.isPresent()) { + deferredEventQueue.add(pendingOffset.getAsLong(), event); + } else { + event.complete(null); + } } } else { // If the records are not empty, first, they are applied to the state machine, - // second, then are written to the partition/log, and finally, the response - // is put into the deferred event queue. - long prevLastWrittenOffset = coordinator.lastWrittenOffset(); - LogConfig logConfig = partitionWriter.config(tp); - byte magic = logConfig.recordVersion().value; - int maxBatchSize = logConfig.maxMessageSize(); + // second, then are appended to the opened batch. long currentTimeMs = time.milliseconds(); - ByteBuffer buffer = bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize)); - try { - MemoryRecordsBuilder builder = new MemoryRecordsBuilder( - buffer, - magic, - compression, - TimestampType.CREATE_TIME, - 0L, + // If the current write operation is transactional, the current batch + // is written before proceeding with it. + if (producerId != RecordBatch.NO_PRODUCER_ID) { + writeCurrentBatch(); + } + + // Allocate a new batch if none exists. + maybeAllocateNewBatch( + producerId, + producerEpoch, + verificationGuard, + currentTimeMs + ); + + // Prepare the records. + List<SimpleRecord> recordsToAppend = new ArrayList<>(records.size()); + for (U record : records) { + recordsToAppend.add(new SimpleRecord( currentTimeMs, - producerId, - producerEpoch, - 0, - producerId != RecordBatch.NO_PRODUCER_ID, - false, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - maxBatchSize - ); + serializer.serializeKey(record), + serializer.serializeValue(record) + )); + } + + // Compute the estimated size of the records. + int estimatedSize = AbstractRecords.estimateSizeInBytes( + currentBatch.builder.magic(), + compression.type(), + recordsToAppend + ); + + // Check if the current batch has enough space. We check is before + // replaying the records in order to avoid having to revert back + // changes if the records do not fit within a batch. + if (!currentBatch.builder.hasRoomFor(estimatedSize)) { + if (currentBatch.builder.numRecords() == 0) { + // If the number of records in the current batch is zero, it means that + // the records are larger than the max batch size. + throw new RecordTooLargeException("Message batch size is " + estimatedSize + + " bytes in append to partition " + tp + " which exceeds the maximum " + + "configured size of " + currentBatch.maxBatchSize + "."); + } else { + // Otherwise, we write the current batch, allocate a new one and re-verify + // whether the records fit in it. 
+ writeCurrentBatch(); + maybeAllocateNewBatch( + producerId, + producerEpoch, + verificationGuard, + currentTimeMs + ); + if (!currentBatch.builder.hasRoomFor(estimatedSize)) { + throw new RecordTooLargeException("Message batch size is " + estimatedSize + + " bytes in append to partition " + tp + " which exceeds the maximum " + + "configured size of " + currentBatch.maxBatchSize + "."); + } + } + } - // Apply the records to the state machine and add them to the batch. - for (int i = 0; i < records.size(); i++) { - U record = records.get(i); + // Add the event to the list of pending events associated with the batch. + currentBatch.events.add(event); - if (replay) { + try { + // Apply record to the state machine. + if (replay) { + for (int i = 0; i < records.size(); i++) { // We compute the offset of the record based on the last written offset. The // coordinator is the single writer to the underlying partition so we can // deduce it like this. coordinator.replay( - prevLastWrittenOffset + i, + currentBatch.nextOffset + i, producerId, producerEpoch, - record + records.get(i) ); } - - byte[] keyBytes = serializer.serializeKey(record); - byte[] valBytes = serializer.serializeValue(record); - - if (builder.hasRoomFor(currentTimeMs, keyBytes, valBytes, EMPTY_HEADERS)) { - builder.append( - currentTimeMs, - keyBytes, - valBytes, - EMPTY_HEADERS - ); - } else { - throw new RecordTooLargeException("Message batch size is " + builder.estimatedSizeInBytes() + - " bytes in append to partition " + tp + " which exceeds the maximum " + - "configured size of " + maxBatchSize + "."); - } } - // Write the records to the log and update the last written - // offset. - long offset = partitionWriter.append( - tp, - verificationGuard, - builder.build() - ); - coordinator.updateLastWrittenOffset(offset); + // Append to the batch. + for (SimpleRecord record : recordsToAppend) { + currentBatch.builder.append(record); + currentBatch.nextOffset++; + } - // Add the operation to the deferred queue. - deferredEventQueue.add(offset, event); + // Write the current batch if it is transactional or if the linger timeout + // has expired. + maybeWriteCurrentBatch(currentTimeMs); } catch (Throwable t) { - coordinator.revertLastWrittenOffset(prevLastWrittenOffset); - event.complete(t); - } finally { - bufferSupplier.release(buffer); + // If an exception is thrown, we fail the entire batch. Exceptions should be + // really exceptional in this code patch and they would usually be the results Review Comment: nit: in this code path, right? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -626,89 +833,113 @@ private void append( // If the records are empty, it was a read operation after all. In this case, // the response can be returned directly iff there are no pending write operations; // otherwise, the read needs to wait on the last write operation to be completed. 
- OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset(); - if (pendingOffset.isPresent()) { - deferredEventQueue.add(pendingOffset.getAsLong(), event); + if (currentBatch != null) { + currentBatch.events.add(event); } else { - event.complete(null); + OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset(); + if (pendingOffset.isPresent()) { + deferredEventQueue.add(pendingOffset.getAsLong(), event); + } else { + event.complete(null); + } } } else { // If the records are not empty, first, they are applied to the state machine, - // second, then are written to the partition/log, and finally, the response - // is put into the deferred event queue. - long prevLastWrittenOffset = coordinator.lastWrittenOffset(); - LogConfig logConfig = partitionWriter.config(tp); - byte magic = logConfig.recordVersion().value; - int maxBatchSize = logConfig.maxMessageSize(); + // second, then are appended to the opened batch. long currentTimeMs = time.milliseconds(); - ByteBuffer buffer = bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize)); - try { - MemoryRecordsBuilder builder = new MemoryRecordsBuilder( - buffer, - magic, - compression, - TimestampType.CREATE_TIME, - 0L, + // If the current write operation is transactional, the current batch + // is written before proceeding with it. Review Comment: To confirm, we want to flush transactional batches immediately to ensure correctness; records from a transactional write may end up in different batches. is this correct? also, when would we hit this case if we do a flush if transactional in L937? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -583,11 +674,330 @@ private void unload() { } timer.cancelAll(); deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); + failCurrentBatch(Errors.NOT_COORDINATOR.exception()); if (coordinator != null) { coordinator.onUnloaded(); } coordinator = null; } + + /** + * Frees the current batch. + */ + private void freeCurrentBatch() { + // Cancel the linger timeout. + currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel); + + // Release the buffer. + bufferSupplier.release(currentBatch.buffer); + + currentBatch = null; + } + + /** + * Writes the current (or pending) batch to the log. When the batch is written + * locally, a new snapshot is created in the snapshot registry and the events + * associated with the batch are added to the deferred event queue. + */ + private void writeCurrentBatch() { + if (currentBatch != null) { + try { + // Write the records to the log and update the last written offset. + long offset = partitionWriter.append( + tp, + currentBatch.verificationGuard, + currentBatch.builder.build() + ); + coordinator.updateLastWrittenOffset(offset); + + // Add all the pending deferred events to the deferred event queue. + for (DeferredEvent event : currentBatch.events) { + deferredEventQueue.add(offset, event); + } + + // Free up the current batch. + freeCurrentBatch(); + } catch (Throwable t) { + failCurrentBatch(t); + } + } + } + + /** + * Writes the current batch if it is transactional or if it has past the append linger time. 
+ */ + private void maybeWriteCurrentBatch(long currentTimeMs) { + if (currentBatch != null) { + if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) { + writeCurrentBatch(); + } + } + } + + /** + * Fails the current batch, reverts to the snapshot to the base/start offset of the + * batch, fails all the associated events. + */ + private void failCurrentBatch(Throwable t) { + if (currentBatch != null) { + coordinator.revertLastWrittenOffset(currentBatch.baseOffset); + for (DeferredEvent event : currentBatch.events) { + event.complete(t); + } + freeCurrentBatch(); + } + } + + /** + * Allocates a new batch if none already exists. + */ + private void maybeAllocateNewBatch( + long producerId, + short producerEpoch, + VerificationGuard verificationGuard, + long currentTimeMs + ) { + if (currentBatch == null) { + LogConfig logConfig = partitionWriter.config(tp); + byte magic = logConfig.recordVersion().value; + int maxBatchSize = logConfig.maxMessageSize(); + long prevLastWrittenOffset = coordinator.lastWrittenOffset(); + ByteBuffer buffer = bufferSupplier.get(maxBatchSize); Review Comment: i'm wondering if it's because we're batching multiple events now. i think we'll need to assess the common case - whether the linger time is met vs. batch is filled first, and the common batch size. given that this varies heavily depending on the workload, i still think it's slightly better to start small and grow as needed ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -583,11 +674,330 @@ private void unload() { } timer.cancelAll(); deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); + failCurrentBatch(Errors.NOT_COORDINATOR.exception()); if (coordinator != null) { coordinator.onUnloaded(); } coordinator = null; } + + /** + * Frees the current batch. + */ + private void freeCurrentBatch() { + // Cancel the linger timeout. + currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel); + + // Release the buffer. + bufferSupplier.release(currentBatch.buffer); + + currentBatch = null; + } + + /** + * Writes the current (or pending) batch to the log. When the batch is written + * locally, a new snapshot is created in the snapshot registry and the events + * associated with the batch are added to the deferred event queue. + */ + private void writeCurrentBatch() { + if (currentBatch != null) { + try { + // Write the records to the log and update the last written offset. + long offset = partitionWriter.append( + tp, + currentBatch.verificationGuard, + currentBatch.builder.build() + ); + coordinator.updateLastWrittenOffset(offset); + + // Add all the pending deferred events to the deferred event queue. + for (DeferredEvent event : currentBatch.events) { + deferredEventQueue.add(offset, event); + } + + // Free up the current batch. + freeCurrentBatch(); + } catch (Throwable t) { + failCurrentBatch(t); + } + } + } + + /** + * Writes the current batch if it is transactional or if it has past the append linger time. + */ + private void maybeWriteCurrentBatch(long currentTimeMs) { + if (currentBatch != null) { + if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) { + writeCurrentBatch(); + } + } + } + + /** + * Fails the current batch, reverts to the snapshot to the base/start offset of the + * batch, fails all the associated events. 
+ */ + private void failCurrentBatch(Throwable t) { + if (currentBatch != null) { + coordinator.revertLastWrittenOffset(currentBatch.baseOffset); + for (DeferredEvent event : currentBatch.events) { + event.complete(t); + } + freeCurrentBatch(); + } + } + + /** + * Allocates a new batch if none already exists. + */ + private void maybeAllocateNewBatch( + long producerId, + short producerEpoch, + VerificationGuard verificationGuard, + long currentTimeMs + ) { + if (currentBatch == null) { + LogConfig logConfig = partitionWriter.config(tp); + byte magic = logConfig.recordVersion().value; + int maxBatchSize = logConfig.maxMessageSize(); + long prevLastWrittenOffset = coordinator.lastWrittenOffset(); + ByteBuffer buffer = bufferSupplier.get(maxBatchSize); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder( + buffer, + magic, + compression, + TimestampType.CREATE_TIME, + 0L, + currentTimeMs, + producerId, + producerEpoch, + 0, + producerId != RecordBatch.NO_PRODUCER_ID, + false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + maxBatchSize + ); + + Optional<TimerTask> lingerTimeoutTask = Optional.empty(); + if (appendLingerMs > 0) { + lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) { + @Override + public void run() { + scheduleInternalOperation("FlushBatch", tp, () -> { Review Comment: not sure i fully understand the question but only 1 thread would be mutating the state of a batch and its associated timer task at a given time.
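As a rough standalone illustration of the point in the last comment — the linger timer never mutates the batch itself; it only hands a flush back to the single thread that owns the batch — here is a minimal sketch with made-up names, not the PR's actual CoordinatorRuntime code:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Standalone sketch of a linger-timeout flush with a single mutating thread.
// Placeholder names; in the PR the timer task re-enqueues a "FlushBatch" internal
// operation onto the runtime's event loop, while here the single-threaded scheduler
// plays both roles.
public class LingerFlushSketch {
    private final ScheduledExecutorService eventLoop = Executors.newSingleThreadScheduledExecutor();
    private final long lingerMs;

    private List<String> currentBatch;          // only touched on the event-loop thread
    private ScheduledFuture<?> lingerTimeout;   // only touched on the event-loop thread

    public LingerFlushSketch(long lingerMs) {
        this.lingerMs = lingerMs;
    }

    // Appends a record; the first append of a batch arms the linger timer.
    public void append(String record) {
        eventLoop.execute(() -> {
            if (currentBatch == null) {
                currentBatch = new ArrayList<>();
                lingerTimeout = eventLoop.schedule(this::flushCurrentBatch, lingerMs, TimeUnit.MILLISECONDS);
            }
            currentBatch.add(record);
        });
    }

    // Flushes (here: prints) the batch and cancels any outstanding linger timer.
    private void flushCurrentBatch() {
        if (currentBatch == null) {
            return; // timer fired after the batch was already flushed
        }
        if (lingerTimeout != null) {
            lingerTimeout.cancel(false);
        }
        System.out.println("flushing " + currentBatch.size() + " record(s): " + currentBatch);
        currentBatch = null;
        lingerTimeout = null;
    }

    public void close() throws InterruptedException {
        eventLoop.execute(this::flushCurrentBatch); // flush whatever is still pending
        eventLoop.shutdown();
        eventLoop.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        LingerFlushSketch sketch = new LingerFlushSketch(10);
        sketch.append("record-1");
        sketch.append("record-2");
        Thread.sleep(50); // give the linger timer time to fire and flush
        sketch.close();
    }
}

In the PR, the analogous hand-off is the scheduleInternalOperation("FlushBatch", ...) call visible in the hunk above, which is why the batch and its timer task only ever see one mutating thread.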