ijuma commented on a change in pull request #9206: URL: https://github.com/apache/kafka/pull/9206#discussion_r492821097
########## File path: core/src/main/scala/kafka/log/LogValidator.scala ########## @@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) - for (batch <- records.batches.asScala) { + records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, brokerTopicStats) val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 + batch.forEach { record => validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).foreach(recordError => recordErrors += recordError) // we fail the batch if any record fails, so we stop appending if any record fails - if (recordErrors.isEmpty) - builder.appendWithOffset(offsetCounter.getAndIncrement(), record) + if (recordErrors.isEmpty) builder.appendWithOffset(offsetCounter.getAndIncrement(), record) + batchIndex += 1 Review comment: Have we benchmarked this path? It seems doubtful that these micro-optimizations help given that we are `converting`. ########## File path: core/src/main/scala/kafka/log/LogValidator.scala ########## @@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) - for (batch <- records.batches.asScala) { + records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, magic, brokerTopicStats) var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 Review comment: It's worth adding a comment here noting that this is a hot path and that we want to avoid any unnecessary allocations. 
########## File path: core/src/main/scala/kafka/log/LogValidator.scala ########## @@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) - records.batches.forEach { batch => + for (batch <- records.batches.asScala) { Review comment: I liked your changes that made the code more concise; I'd keep them. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org