dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662545695
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -936,62 +941,90 @@ private void append(
             ));
         }
 
-        // Compute the estimated size of the records.
-        int estimatedSize = AbstractRecords.estimateSizeInBytes(
-            currentBatch.builder.magic(),
-            compression.type(),
-            recordsToAppend
-        );
+        if (isAtomic) {
+            // Compute the estimated size of the records.
+            int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                currentBatch.builder.magic(),
+                compression.type(),
+                recordsToAppend
+            );
 
-        // Check if the current batch has enough space. We check is before
-        // replaying the records in order to avoid having to revert back
-        // changes if the records do not fit within a batch.
-        if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-            throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                " bytes in append to partition " + tp + " which exceeds the maximum " +
-                "configured size of " + currentBatch.maxBatchSize + ".");
-        }
+            // Check if the current batch has enough space. We check is before
+            // replaying the records in order to avoid having to revert back
+            // changes if the records do not fit within a batch.
+            if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
+                throw new RecordTooLargeException("Message batch size is " + estimatedSize +
+                    " bytes in append to partition " + tp + " which exceeds the maximum " +
+                    "configured size of " + currentBatch.maxBatchSize + ".");
+            }
 
-        if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
-            // Otherwise, we write the current batch, allocate a new one and re-verify
-            // whether the records fit in it.
-            // If flushing fails, we don't catch the exception in order to let
-            // the caller fail the current operation.
-            flushCurrentBatch();
-            maybeAllocateNewBatch(
-                producerId,
-                producerEpoch,
-                verificationGuard,
-                currentTimeMs
-            );
+            if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                // Otherwise, we write the current batch, allocate a new one and re-verify
+                // whether the records fit in it.
+                // If flushing fails, we don't catch the exception in order to let
+                // the caller fail the current operation.
+                flushCurrentBatch();
+                maybeAllocateNewBatch(
+                    producerId,
+                    producerEpoch,
+                    verificationGuard,
+                    currentTimeMs
+                );
+            }
         }
 
-        // Add the event to the list of pending events associated with the batch.
-        currentBatch.deferredEvents.add(event);
-
         try {
-            // Apply record to the state machine.
-            if (replay) {
-                for (int i = 0; i < records.size(); i++) {
-                    // We compute the offset of the record based on the last written offset. The
-                    // coordinator is the single writer to the underlying partition so we can
-                    // deduce it like this.
+            for (int i = 0; i < records.size(); i++) {
+                U recordToReplay = records.get(i);
+                SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+                if (replay) {
                     coordinator.replay(
-                        currentBatch.nextOffset + i,
+                        currentBatch.nextOffset,
                         producerId,
                         producerEpoch,
-                        records.get(i)
+                        recordToReplay
                     );
                 }
-            }
 
-            // Append to the batch.
-            for (SimpleRecord record : recordsToAppend) {
-                currentBatch.builder.append(record);
+                if (!isAtomic) {
+                    boolean hasRoomFor = currentBatch.builder.hasRoomFor(
+                        recordToAppend.timestamp(),
+                        recordToAppend.key(),
+                        recordToAppend.value(),
+                        recordToAppend.headers()
+                    );
+
+                    if (!hasRoomFor) {
+                        if (currentBatch.builder.numRecords() == 0) {
+                            throw new RecordTooLargeException("Record " + recordToAppend + " in append to partition " + tp +
+                                " exceeds exceeds the maximum configured size of " + currentBatch.maxBatchSize + ".");
+                        }
+
+                        // If the current batch is not empty, we flush it and allocate a new batch.
+                        flushCurrentBatch();
+                        maybeAllocateNewBatch(
+                            producerId,
+                            producerEpoch,
+                            verificationGuard,
+                            currentTimeMs
+                        );
+                    }
+                }
+
+                currentBatch.builder.append(recordToAppend);
                 currentBatch.nextOffset++;
            }
+
+            // Add the event to the list of pending events associated with the batch.
+            currentBatch.deferredEvents.add(event);
         } catch (Throwable t) {
             log.error("Replaying records to {} failed due to: {}.", tp, t.getMessage());
+
+            // Add the event to the list of pending events associated with the last
+            // batch in order to fail it too.
+            currentBatch.deferredEvents.add(event);

Review Comment:
   Partially. The main reason was that the event must be attached to the last batch (vs the first one) in order to complete the write only when all records are committed.
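To make the point above concrete, here is a minimal, self-contained sketch of the completion semantics being described. The `Batch` and `DeferredEvent` types below are invented for illustration and are not the real `CoordinatorRuntime` classes: attaching the operation's deferred event to the last batch means its future completes only once the batch holding its final record has been committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class LastBatchAttachmentSketch {
    // Hypothetical stand-in for a pending write whose future completes when its batch commits.
    static class DeferredEvent {
        final CompletableFuture<Void> result = new CompletableFuture<>();
    }

    // Hypothetical stand-in for an in-flight batch of records.
    static class Batch {
        final List<String> records = new ArrayList<>();
        final List<DeferredEvent> deferredEvents = new ArrayList<>();

        // Committing a batch completes every event attached to it.
        void commit() {
            deferredEvents.forEach(e -> e.result.complete(null));
        }
    }

    public static void main(String[] args) {
        // One write operation whose records span two batches.
        DeferredEvent write = new DeferredEvent();

        Batch first = new Batch();
        first.records.add("record-0");

        Batch last = new Batch();
        last.records.add("record-1");

        // Attach the event to the last batch only: the write must not complete
        // until the batch holding its final record is committed.
        last.deferredEvents.add(write);

        first.commit();
        System.out.println("after first commit: done = " + write.result.isDone()); // false
        last.commit();
        System.out.println("after last commit:  done = " + write.result.isDone()); // true
    }
}
```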
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -936,62 +941,90 @@ private void append(
(excerpt of the same hunk as above; the comment is anchored on the empty-batch check in the new non-atomic path)
+                if (!isAtomic) {
+                    boolean hasRoomFor = currentBatch.builder.hasRoomFor(
+                        recordToAppend.timestamp(),
+                        recordToAppend.key(),
+                        recordToAppend.value(),
+                        recordToAppend.headers()
+                    );
+
+                    if (!hasRoomFor) {
+                        if (currentBatch.builder.numRecords() == 0) {
+                            throw new RecordTooLargeException("Record " + recordToAppend + " in append to partition " + tp +
+                                " exceeds exceeds the maximum configured size of " + currentBatch.maxBatchSize + ".");
+                        }

Review Comment:
   Oh, I forgot that `hasRoomFor` always accepts at least one record. In this case, the failure will only surface when the batch is written. I updated the code to reflect this. This is also covered by the test that I added.
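A small self-contained model of the contract described above; the `SizeBoundBatch` type and its byte accounting are invented and are not the real `MemoryRecordsBuilder` API. It shows a capacity check that always admits the first record, so a single oversized record is only rejected later, when the batch itself is written:

```java
public class HasRoomForSketch {
    // Invented stand-in for a size-bounded batch builder.
    static class SizeBoundBatch {
        private final int maxBytes;
        private int usedBytes = 0;
        private int numRecords = 0;

        SizeBoundBatch(int maxBytes) {
            this.maxBytes = maxBytes;
        }

        // Mirrors the described contract: an empty batch always reports room,
        // even for a record larger than the configured limit.
        boolean hasRoomFor(int recordBytes) {
            return numRecords == 0 || usedBytes + recordBytes <= maxBytes;
        }

        void append(int recordBytes) {
            usedBytes += recordBytes;
            numRecords++;
        }

        // The oversize error only surfaces here, when the batch is written.
        void write() {
            if (usedBytes > maxBytes) {
                throw new IllegalStateException("Batch of " + usedBytes +
                    " bytes exceeds the configured maximum of " + maxBytes + " bytes.");
            }
        }
    }

    public static void main(String[] args) {
        SizeBoundBatch batch = new SizeBoundBatch(100);
        System.out.println(batch.hasRoomFor(150)); // true: the first record is always accepted
        batch.append(150);
        try {
            batch.write();                         // the failure only surfaces at write time
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```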
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -936,62 +941,90 @@ private void append(
(excerpt of the same hunk as above, anchored on the same line)
+                    if (!hasRoomFor) {
+                        if (currentBatch.builder.numRecords() == 0) {
+                            throw new RecordTooLargeException("Record " + recordToAppend + " in append to partition " + tp +
+                                " exceeds exceeds the maximum configured size of " + currentBatch.maxBatchSize + ".");
+                        }

Review Comment:
   > I think I'm missing though what happens in the case we did have some records already in the builder but the record is too big for the batch.

   The new test does exactly this.
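As a rough sketch of the case raised in the quoted question (all types below are invented; this is not the PR's actual control flow), a non-atomic append that meets a record too big for the current, non-empty batch flushes that batch and retries the record in a fresh one:

```java
import java.util.ArrayList;
import java.util.List;

public class NonAtomicAppendSketch {
    // Invented record and batch types, only to illustrate the flush-and-retry flow.
    record Rec(String name, int sizeBytes) {}

    static class Batch {
        final List<Rec> records = new ArrayList<>();
        int usedBytes = 0;
    }

    static final int MAX_BATCH_BYTES = 100;
    static final List<Batch> flushed = new ArrayList<>();

    static void flush(Batch batch) {
        flushed.add(batch);
    }

    public static void main(String[] args) {
        List<Rec> records = List.of(new Rec("a", 60), new Rec("b", 60), new Rec("c", 30));
        Batch current = new Batch();

        for (Rec rec : records) {
            boolean hasRoom = current.records.isEmpty()
                || current.usedBytes + rec.sizeBytes() <= MAX_BATCH_BYTES;
            if (!hasRoom) {
                // The batch already holds records, so it is flushed as-is and the
                // record that did not fit starts a fresh batch.
                flush(current);
                current = new Batch();
            }
            current.records.add(rec);
            current.usedBytes += rec.sizeBytes();
        }
        flush(current);

        // "a" is flushed alone; "b" and "c" share the second batch.
        flushed.forEach(b -> System.out.println(b.records));
    }
}
```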
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -912,7 +915,9 @@ private void append(
 
         // If the current write operation is transactional, the current batch
         // is written before proceeding with it.
-        if (producerId != RecordBatch.NO_PRODUCER_ID) {
+        boolean isTransactional = producerId != RecordBatch.NO_PRODUCER_ID;

Review Comment:
   Nope. Reverted.

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3839,6 +3839,114 @@ public void close() {}
         assertEquals("response1", write.get(5, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testScheduleWriteOperationWithNonAtomicRecords() throws ExecutionException, InterruptedException, TimeoutException {

Review Comment:
   I think that the two new tests cover the non-atomic write. All the existing ones must be kept as they are.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org