jeffkbkim commented on code in PR #17619:
URL: https://github.com/apache/kafka/pull/17619#discussion_r1827950561


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2476,20 +2479,21 @@ class KafkaApis(val requestChannel: RequestChannel,
           }
         }
 
-        replicaManager.appendRecords(
-          timeout = config.requestTimeoutMs.toLong,
-          requiredAcks = -1,
-          internalTopicsAllowed = true,
-          origin = AppendOrigin.COORDINATOR,
-          entriesPerPartition = controlRecords,
-          requestLocal = requestLocal,
-          responseCallback = errors => {
-            errors.foreachEntry { (tp, partitionResponse) =>
-              markerResults.put(tp, partitionResponse.error)
+        if (!controlRecords.isEmpty) {

Review Comment:
   nit: controlRecords.nonEmpty?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2431,8 +2431,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
 
         val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
-        def maybeComplete(): Unit = {
-          if (partitionsWithCompatibleMessageFormat.size == 
markerResults.size) {
+        val numPartitions = new 
AtomicInteger(partitionsWithCompatibleMessageFormat.size)

Review Comment:
   To understand the first case:
   
   we call replicaManager#appendRecords (with empty records) while 
simultaneously appending via the new coordinator. This incorrectly decrements 
the number of appends counter and thus sends the premature response.
   
   The similarity in both cases is that we prematurely decrement this 
numAppends counter.
   
   Is my understanding correct?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -3068,6 +3068,44 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
   }
 
+  @Test
+  def WriteTxnMarkersShouldAllBeIncludedInTheResponse(): Unit = {
+    // This test verifies the response will not be sent prematurely because of 
calling replicaManager append
+    // with no records.
+    val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
+    val writeTxnMarkersRequest = new 
WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
+      asList(
+        new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, 
asList(topicPartition)),
+        new TxnMarkerEntry(2, 1.toShort, 0, TransactionResult.COMMIT, 
asList(topicPartition)),
+      )).build()
+    val request = buildRequest(writeTxnMarkersRequest)
+    val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = 
ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
+
+    when(replicaManager.getMagic(any()))
+      .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+    when(groupCoordinator.isNewGroupCoordinator())

Review Comment:
   nit: we can remove the call parenthesis



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to