chia7712 commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492834735



##########
File path: core/src/main/scala/kafka/log/LogValidator.scala
##########
@@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging {
 
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-    for (batch <- records.batches.asScala) {
+    records.batches.forEach { batch =>
       validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, 
brokerTopicStats)
 
       val recordErrors = new ArrayBuffer[ApiRecordError](0)
-      for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+      var batchIndex = 0
+      batch.forEach { record =>
         validateRecord(batch, topicPartition, record, batchIndex, now, 
timestampType,
           timestampDiffMaxMs, compactedTopic, 
brokerTopicStats).foreach(recordError => recordErrors += recordError)
         // we fail the batch if any record fails, so we stop appending if any 
record fails
-        if (recordErrors.isEmpty)
-          builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+        if (recordErrors.isEmpty) 
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+        batchIndex += 1

Review comment:
       > Have we benchmarked this path?
   
   I didn't benchmark this path, and you are right that the optimization is 
small since we have to convert the data in this path anyway. I will revert it to keep the patch small. 

##########
File path: core/src/main/scala/kafka/log/LogValidator.scala
##########
@@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging {
 
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-    for (batch <- records.batches.asScala) {
+    records.batches.forEach { batch =>
       validateBatch(topicPartition, firstBatch, batch, origin, magic, 
brokerTopicStats)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
 
       val recordErrors = new ArrayBuffer[ApiRecordError](0)
-      for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+      var batchIndex = 0

Review comment:
       copy that

##########
File path: core/src/main/scala/kafka/log/LogValidator.scala
##########
@@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging {
 
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-    records.batches.forEach { batch =>
+    for (batch <- records.batches.asScala) {

Review comment:
       ok~




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to