dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662545695


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -936,62 +941,90 @@ private void append(
                     ));
                 }
 
-                // Compute the estimated size of the records.
-                int estimatedSize = AbstractRecords.estimateSizeInBytes(
-                    currentBatch.builder.magic(),
-                    compression.type(),
-                    recordsToAppend
-                );
+                if (isAtomic) {
+                    // Compute the estimated size of the records.
+                    int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                        currentBatch.builder.magic(),
+                        compression.type(),
+                        recordsToAppend
+                    );
 
-                // Check if the current batch has enough space. We check this before
-                // replaying the records in order to avoid having to revert back
-                // changes if the records do not fit within a batch.
-                if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-                    throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                        " bytes in append to partition " + tp + " which exceeds the maximum " +
-                        "configured size of " + currentBatch.maxBatchSize + ".");
-                }
+                    // Check if the current batch has enough space. We check this before
+                    // replaying the records in order to avoid having to revert back
+                    // changes if the records do not fit within a batch.
+                    if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
+                        throw new RecordTooLargeException("Message batch size is " + estimatedSize +
+                            " bytes in append to partition " + tp + " which exceeds the maximum " +
+                            "configured size of " + currentBatch.maxBatchSize + ".");
+                    }
 
-                if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
-                    // Otherwise, we write the current batch, allocate a new one and re-verify
-                    // whether the records fit in it.
-                    // If flushing fails, we don't catch the exception in order to let
-                    // the caller fail the current operation.
-                    flushCurrentBatch();
-                    maybeAllocateNewBatch(
-                        producerId,
-                        producerEpoch,
-                        verificationGuard,
-                        currentTimeMs
-                    );
+                    if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                        // Otherwise, we write the current batch, allocate a new one and re-verify
+                        // whether the records fit in it.
+                        // If flushing fails, we don't catch the exception in order to let
+                        // the caller fail the current operation.
+                        flushCurrentBatch();
+                        maybeAllocateNewBatch(
+                            producerId,
+                            producerEpoch,
+                            verificationGuard,
+                            currentTimeMs
+                        );
+                    }
                 }
 
-                // Add the event to the list of pending events associated with the batch.
-                currentBatch.deferredEvents.add(event);
-
                 try {
-                    // Apply record to the state machine.
-                    if (replay) {
-                        for (int i = 0; i < records.size(); i++) {
-                            // We compute the offset of the record based on the last written offset. The
-                            // coordinator is the single writer to the underlying partition so we can
-                            // deduce it like this.
+                    for (int i = 0; i < records.size(); i++) {
+                        U recordToReplay = records.get(i);
+                        SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+                        if (replay) {
                             coordinator.replay(
-                                currentBatch.nextOffset + i,
+                                currentBatch.nextOffset,
                                 producerId,
                                 producerEpoch,
-                                records.get(i)
+                                recordToReplay
                             );
                         }
-                    }
 
-                    // Append to the batch.
-                    for (SimpleRecord record : recordsToAppend) {
-                        currentBatch.builder.append(record);
+                        if (!isAtomic) {
+                            boolean hasRoomFor = currentBatch.builder.hasRoomFor(
+                                recordToAppend.timestamp(),
+                                recordToAppend.key(),
+                                recordToAppend.value(),
+                                recordToAppend.headers()
+                            );
+
+                            if (!hasRoomFor) {
+                                if (currentBatch.builder.numRecords() == 0) {
+                                    throw new RecordTooLargeException("Record " + recordToAppend + " in append to partition " + tp +
+                                        " exceeds the maximum configured size of " + currentBatch.maxBatchSize + ".");
+                                }
+
+                                // If the current batch is not empty, we flush it and allocate a new batch.
+                                flushCurrentBatch();
+                                maybeAllocateNewBatch(
+                                    producerId,
+                                    producerEpoch,
+                                    verificationGuard,
+                                    currentTimeMs
+                                );
+                            }
+                        }
+
+                        currentBatch.builder.append(recordToAppend);
                         currentBatch.nextOffset++;
                     }
+
+                    // Add the event to the list of pending events associated with the batch.
+                    currentBatch.deferredEvents.add(event);
                 } catch (Throwable t) {
                     log.error("Replaying records to {} failed due to: {}.", tp, t.getMessage());
+
+                    // Add the event to the list of pending events associated with the last
+                    // batch in order to fail it too.
+                    currentBatch.deferredEvents.add(event);

Review Comment:
   Partially. The main reason was that the event must be attached to the last batch (vs. the first one) in order to only complete the write when all records are committed.
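
   A minimal sketch of that invariant, using hypothetical simplified types rather than the actual `CoordinatorRuntime` internals: since the coordinator is the single writer and batches are committed in order, attaching the deferred event to the last batch of a write guarantees that the event completes only after every batch of that write has been committed.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical, simplified stand-ins for the runtime's batch bookkeeping.
   final class BatchSketch {
       final List<CompletableFuture<Void>> deferredEvents = new ArrayList<>();
   }

   final class SingleWriterLog {
       private final Queue<BatchSketch> pending = new ArrayDeque<>();

       // A write spanning several batches attaches its completion future to
       // the LAST batch only (numBatches >= 1 assumed).
       CompletableFuture<Void> write(int numBatches) {
           CompletableFuture<Void> event = new CompletableFuture<>();
           BatchSketch last = null;
           for (int i = 0; i < numBatches; i++) {
               last = new BatchSketch();
               pending.add(last);
           }
           last.deferredEvents.add(event);
           return event;
       }

       // Batches commit strictly in order, so an event completes only once
       // every batch up to and including the one holding it is committed.
       void commitNext() {
           BatchSketch batch = pending.poll();
           if (batch != null) {
               batch.deferredEvents.forEach(f -> f.complete(null));
           }
       }
   }
   ```

   With `write(2)`, the returned future is still pending after the first `commitNext()` and completes only after the second one.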



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -936,62 +941,90 @@ private void append(
                     ));
                 }
 
-                // Compute the estimated size of the records.
-                int estimatedSize = AbstractRecords.estimateSizeInBytes(
-                    currentBatch.builder.magic(),
-                    compression.type(),
-                    recordsToAppend
-                );
+                if (isAtomic) {
+                    // Compute the estimated size of the records.
+                    int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                        currentBatch.builder.magic(),
+                        compression.type(),
+                        recordsToAppend
+                    );
 
-                // Check if the current batch has enough space. We check this before
-                // replaying the records in order to avoid having to revert back
-                // changes if the records do not fit within a batch.
-                if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-                    throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                        " bytes in append to partition " + tp + " which exceeds the maximum " +
-                        "configured size of " + currentBatch.maxBatchSize + ".");
-                }
+                    // Check if the current batch has enough space. We check this before
+                    // replaying the records in order to avoid having to revert back
+                    // changes if the records do not fit within a batch.
+                    if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
+                        throw new RecordTooLargeException("Message batch size is " + estimatedSize +
+                            " bytes in append to partition " + tp + " which exceeds the maximum " +
+                            "configured size of " + currentBatch.maxBatchSize + ".");
+                    }
 
-                if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
-                    // Otherwise, we write the current batch, allocate a new one and re-verify
-                    // whether the records fit in it.
-                    // If flushing fails, we don't catch the exception in order to let
-                    // the caller fail the current operation.
-                    flushCurrentBatch();
-                    maybeAllocateNewBatch(
-                        producerId,
-                        producerEpoch,
-                        verificationGuard,
-                        currentTimeMs
-                    );
+                    if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                        // Otherwise, we write the current batch, allocate a new one and re-verify
+                        // whether the records fit in it.
+                        // If flushing fails, we don't catch the exception in order to let
+                        // the caller fail the current operation.
+                        flushCurrentBatch();
+                        maybeAllocateNewBatch(
+                            producerId,
+                            producerEpoch,
+                            verificationGuard,
+                            currentTimeMs
+                        );
+                    }
                 }
 
-                // Add the event to the list of pending events associated with the batch.
-                currentBatch.deferredEvents.add(event);
-
                 try {
-                    // Apply record to the state machine.
-                    if (replay) {
-                        for (int i = 0; i < records.size(); i++) {
-                            // We compute the offset of the record based on the last written offset. The
-                            // coordinator is the single writer to the underlying partition so we can
-                            // deduce it like this.
+                    for (int i = 0; i < records.size(); i++) {
+                        U recordToReplay = records.get(i);
+                        SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+                        if (replay) {
                             coordinator.replay(
-                                currentBatch.nextOffset + i,
+                                currentBatch.nextOffset,
                                 producerId,
                                 producerEpoch,
-                                records.get(i)
+                                recordToReplay
                             );
                         }
-                    }
 
-                    // Append to the batch.
-                    for (SimpleRecord record : recordsToAppend) {
-                        currentBatch.builder.append(record);
+                        if (!isAtomic) {
+                            boolean hasRoomFor = currentBatch.builder.hasRoomFor(
+                                recordToAppend.timestamp(),
+                                recordToAppend.key(),
+                                recordToAppend.value(),
+                                recordToAppend.headers()
+                            );
+
+                            if (!hasRoomFor) {
+                                if (currentBatch.builder.numRecords() == 0) {
+                                    throw new RecordTooLargeException("Record " + recordToAppend + " in append to partition " + tp +
+                                        " exceeds the maximum configured size of " + currentBatch.maxBatchSize + ".");
+                                }

Review Comment:
   Oh, I forgot that `hasRoomFor` always accepts one record. In this case, the oversized record will fail when the batch is written. I updated the code to reflect this. This is also covered by the test that I added.
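
   To illustrate that semantic with a hedged, self-contained model (a hypothetical simplified builder, not the real `MemoryRecordsBuilder` API): an empty builder reports room for any single record, so an oversized record passes the room check and is only rejected when the batch is written.

   ```java
   // Hypothetical, simplified model of the builder semantics described above;
   // not the real MemoryRecordsBuilder API.
   final class BuilderSketch {
       private final int maxBatchSize;
       private int bytesUsed = 0;
       private int numRecords = 0;

       BuilderSketch(int maxBatchSize) {
           this.maxBatchSize = maxBatchSize;
       }

       // An empty builder always reports room for one record, even an
       // oversized one.
       boolean hasRoomFor(int recordSize) {
           return numRecords == 0 || bytesUsed + recordSize <= maxBatchSize;
       }

       void append(int recordSize) {
           bytesUsed += recordSize;
           numRecords++;
       }

       // An oversized first record is therefore only rejected here, at
       // write time.
       void write() {
           if (bytesUsed > maxBatchSize) {
               throw new IllegalStateException("Batch of " + bytesUsed +
                   " bytes exceeds the configured maximum of " + maxBatchSize + ".");
           }
       }
   }
   ```

   For example, appending a 150-byte record to an empty builder with `maxBatchSize = 100` passes `hasRoomFor`, and the failure only surfaces in `write()`.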



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -936,62 +941,90 @@ private void append(
                     ));
                 }
 
-                // Compute the estimated size of the records.
-                int estimatedSize = AbstractRecords.estimateSizeInBytes(
-                    currentBatch.builder.magic(),
-                    compression.type(),
-                    recordsToAppend
-                );
+                if (isAtomic) {
+                    // Compute the estimated size of the records.
+                    int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                        currentBatch.builder.magic(),
+                        compression.type(),
+                        recordsToAppend
+                    );
 
-                // Check if the current batch has enough space. We check this before
-                // replaying the records in order to avoid having to revert back
-                // changes if the records do not fit within a batch.
-                if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-                    throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                        " bytes in append to partition " + tp + " which exceeds the maximum " +
-                        "configured size of " + currentBatch.maxBatchSize + ".");
-                }
+                    // Check if the current batch has enough space. We check this before
+                    // replaying the records in order to avoid having to revert back
+                    // changes if the records do not fit within a batch.
+                    if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
+                        throw new RecordTooLargeException("Message batch size is " + estimatedSize +
+                            " bytes in append to partition " + tp + " which exceeds the maximum " +
+                            "configured size of " + currentBatch.maxBatchSize + ".");
+                    }
 
-                if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
-                    // Otherwise, we write the current batch, allocate a new one and re-verify
-                    // whether the records fit in it.
-                    // If flushing fails, we don't catch the exception in order to let
-                    // the caller fail the current operation.
-                    flushCurrentBatch();
-                    maybeAllocateNewBatch(
-                        producerId,
-                        producerEpoch,
-                        verificationGuard,
-                        currentTimeMs
-                    );
+                    if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                        // Otherwise, we write the current batch, allocate a new one and re-verify
+                        // whether the records fit in it.
+                        // If flushing fails, we don't catch the exception in order to let
+                        // the caller fail the current operation.
+                        flushCurrentBatch();
+                        maybeAllocateNewBatch(
+                            producerId,
+                            producerEpoch,
+                            verificationGuard,
+                            currentTimeMs
+                        );
+                    }
                 }
 
-                // Add the event to the list of pending events associated with the batch.
-                currentBatch.deferredEvents.add(event);
-
                 try {
-                    // Apply record to the state machine.
-                    if (replay) {
-                        for (int i = 0; i < records.size(); i++) {
-                            // We compute the offset of the record based on the last written offset. The
-                            // coordinator is the single writer to the underlying partition so we can
-                            // deduce it like this.
+                    for (int i = 0; i < records.size(); i++) {
+                        U recordToReplay = records.get(i);
+                        SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+                        if (replay) {
                             coordinator.replay(
-                                currentBatch.nextOffset + i,
+                                currentBatch.nextOffset,
                                 producerId,
                                 producerEpoch,
-                                records.get(i)
+                                recordToReplay
                             );
                         }
-                    }
 
-                    // Append to the batch.
-                    for (SimpleRecord record : recordsToAppend) {
-                        currentBatch.builder.append(record);
+                        if (!isAtomic) {
+                            boolean hasRoomFor = currentBatch.builder.hasRoomFor(
+                                recordToAppend.timestamp(),
+                                recordToAppend.key(),
+                                recordToAppend.value(),
+                                recordToAppend.headers()
+                            );
+
+                            if (!hasRoomFor) {
+                                if (currentBatch.builder.numRecords() == 0) {
+                                    throw new RecordTooLargeException("Record " + recordToAppend + " in append to partition " + tp +
+                                        " exceeds the maximum configured size of " + currentBatch.maxBatchSize + ".");
+                                }

Review Comment:
   > I think I'm missing though what happens in the case we did have some records already in the builder but the record is too big for the batch.

   The new test does exactly this.
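
   For reference, a self-contained sketch of the flow that case exercises, under hypothetical simplified bookkeeping (record sizes as plain byte counts, no real Kafka types): a record that does not fit into a non-empty batch flushes that batch and lands in a freshly allocated one.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public final class FlushOnFullSketch {
       static final int MAX_BATCH_SIZE = 100;

       public static void main(String[] args) {
           List<List<Integer>> flushed = new ArrayList<>();
           List<Integer> current = new ArrayList<>();
           int currentBytes = 0;

           for (int recordSize : new int[]{60, 30, 60}) {
               // Room check: an empty batch accepts any single record.
               if (!current.isEmpty() && currentBytes + recordSize > MAX_BATCH_SIZE) {
                   flushed.add(current);          // flush the non-empty batch
                   current = new ArrayList<>();   // allocate a new one
                   currentBytes = 0;
               }
               current.add(recordSize);
               currentBytes += recordSize;
           }

           // The third record (60 bytes) did not fit after 60 + 30, so exactly
           // one batch was flushed and the record went into a new batch.
           System.out.println(flushed.size() + " flushed batch(es): " + flushed);
           System.out.println("current batch: " + current);
       }
   }
   ```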



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -912,7 +915,9 @@ private void append(
 
                 // If the current write operation is transactional, the current batch
                 // is written before proceeding with it.
-                if (producerId != RecordBatch.NO_PRODUCER_ID) {
+                boolean isTransactional = producerId != RecordBatch.NO_PRODUCER_ID;

Review Comment:
   Nope. Reverted.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3839,6 +3839,114 @@ public void close() {}
         assertEquals("response1", write.get(5, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testScheduleWriteOperationWithNonAtomicRecords() throws ExecutionException, InterruptedException, TimeoutException {

Review Comment:
   I think that the two new tests cover the non-atomic write. All the existing 
one must be kept as they are.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
