ijuma commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492821097



##########
File path: core/src/main/scala/kafka/log/LogValidator.scala
##########
@@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging {
 
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-    for (batch <- records.batches.asScala) {
+    records.batches.forEach { batch =>
       validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, 
brokerTopicStats)
 
       val recordErrors = new ArrayBuffer[ApiRecordError](0)
-      for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+      var batchIndex = 0
+      batch.forEach { record =>
         validateRecord(batch, topicPartition, record, batchIndex, now, 
timestampType,
           timestampDiffMaxMs, compactedTopic, 
brokerTopicStats).foreach(recordError => recordErrors += recordError)
         // we fail the batch if any record fails, so we stop appending if any 
record fails
-        if (recordErrors.isEmpty)
-          builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+        if (recordErrors.isEmpty) 
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+        batchIndex += 1

Review comment:
       Have we benchmarked this path? It seems doubtful that these 
micro-optimizations help, given that we are `converting`.

##########
File path: core/src/main/scala/kafka/log/LogValidator.scala
##########
@@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging {
 
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-    for (batch <- records.batches.asScala) {
+    records.batches.forEach { batch =>
       validateBatch(topicPartition, firstBatch, batch, origin, magic, 
brokerTopicStats)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
 
       val recordErrors = new ArrayBuffer[ApiRecordError](0)
-      for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+      var batchIndex = 0

Review comment:
       Worth adding a comment here that this is a hot path and we want to avoid 
any unnecessary allocations.

##########
File path: core/src/main/scala/kafka/log/LogValidator.scala
##########
@@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging {
 
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-    records.batches.forEach { batch =>
+    for (batch <- records.batches.asScala) {

Review comment:
       I liked your changes to make the code more concise; I'd keep them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to