This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 573152d  HOTFIX: Allow multi-batches for old format and no compression (#6871)
573152d is described below

commit 573152dfa8087e40ee69b27fb9fb8f45d3825eb6
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Mon Jun 3 16:56:28 2019 -0700

    HOTFIX: Allow multi-batches for old format and no compression (#6871)

    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/common/record/AbstractRecords.java      |  9 +++
 core/src/main/scala/kafka/log/LogValidator.scala  | 77 ++++++++++++----------
 .../scala/unit/kafka/log/LogValidatorTest.scala   | 44 +++++++------
 3 files changed, 74 insertions(+), 56 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 1994a71..411e647 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -43,6 +43,15 @@ public abstract class AbstractRecords implements Records {
         return true;
     }
 
+    public boolean firstBatchHasCompatibleMagic(byte magic) {
+        Iterator<? extends RecordBatch> iterator = batches().iterator();
+
+        if (!iterator.hasNext())
+            return true;
+
+        return iterator.next().magic() <= magic;
+    }
+
     /**
      * Get an iterator over the deep records.
      * @return An iterator over the records
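[Editor's note, not part of the patch: the semantics of the new firstBatchHasCompatibleMagic helper can be seen in a small stand-alone Scala sketch. It assumes the kafka-clients classes are on the classpath; the object name FirstBatchMagicCheck and the record contents are invented for illustration.]

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}

    object FirstBatchMagicCheck extends App {
      // Single-batch MemoryRecords written with the old v1 format
      val v1Records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1,
        CompressionType.NONE, new SimpleRecord("a".getBytes))
      // Single-batch MemoryRecords written with the v2 format
      val v2Records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2,
        CompressionType.NONE, new SimpleRecord("a".getBytes))

      // v1 <= v1: old-format data, so the relaxed multi-batch path may apply
      println(v1Records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) // true
      // v2 > v1: the single-batch invariant must be enforced
      println(v2Records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) // false
    }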
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 77a4b2b..d10eed8 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -74,21 +74,18 @@ private[kafka] object LogValidator extends Logging {
     }
   }
 
-  private[kafka] def validateOneBatchRecords(records: MemoryRecords): RecordBatch = {
-    // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
+  private[kafka] def validateOneBatchRecords(records: MemoryRecords) {
     val batchIterator = records.batches.iterator
 
     if (!batchIterator.hasNext) {
       throw new InvalidRecordException("Compressed outer record has no batches at all")
     }
 
-    val batch = batchIterator.next()
+    batchIterator.next()
 
     if (batchIterator.hasNext) {
       throw new InvalidRecordException("Compressed outer record has more than one batch")
     }
-
-    batch
   }
 
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
@@ -197,11 +194,12 @@ private[kafka] object LogValidator extends Logging {
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
 
-    for (batch <- records.batches.asScala) {
-      if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-        validateOneBatchRecords(records)
-      }
+    if (!records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
+      // for v2 and beyond, we should check there's only one batch.
+      validateOneBatchRecords(records)
+    }
 
+    for (batch <- records.batches.asScala) {
       validateBatch(batch, isFromClient, magic)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
@@ -280,38 +278,47 @@ private[kafka] object LogValidator extends Logging {
 
     var uncompressedSizeInBytes = 0
 
-    val batch = validateOneBatchRecords(records)
+    // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
+    // One exception though is that with format smaller than v2, if sourceCodec is noCompression, then each batch is actually
+    // a single record so we'd need to special handle it by creating a single wrapper batch that includes all the records
+    if (sourceCodec != NoCompressionCodec || !records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
+      validateOneBatchRecords(records)
+    }
+
+    val batches = records.batches.asScala
 
-    validateBatch(batch, isFromClient, toMagic)
-    uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
+    for (batch <- batches) {
+      validateBatch(batch, isFromClient, toMagic)
+      uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
 
-    // Do not compress control records unless they are written compressed
-    if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
-      inPlaceAssignment = true
+      // Do not compress control records unless they are written compressed
+      if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+        inPlaceAssignment = true
 
-    for (record <- batch.asScala) {
-      if (sourceCodec != NoCompressionCodec && record.isCompressed)
-        throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-          s"compression attribute set: $record")
-      if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
-        throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
-          "are not allowed to use ZStandard compression")
-      validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+      for (record <- batch.asScala) {
+        if (sourceCodec != NoCompressionCodec && record.isCompressed)
+          throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+            s"compression attribute set: $record")
+        if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+          throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
+            "are not allowed to use ZStandard compression")
+        validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
 
-      uncompressedSizeInBytes += record.sizeInBytes()
-      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-        // Check if we need to overwrite offset
-        // No in place assignment situation 3
-        if (record.offset != expectedInnerOffset.getAndIncrement())
-          inPlaceAssignment = false
-        if (record.timestamp > maxTimestamp)
-          maxTimestamp = record.timestamp
-      }
+        uncompressedSizeInBytes += record.sizeInBytes()
+        if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
+          // Check if we need to overwrite offset
+          // No in place assignment situation 3
+          if (record.offset != expectedInnerOffset.getAndIncrement())
+            inPlaceAssignment = false
+          if (record.timestamp > maxTimestamp)
+            maxTimestamp = record.timestamp
+        }
 
-      // No in place assignment situation 4
-      if (!record.hasMagic(toMagic))
-        inPlaceAssignment = false
+        // No in place assignment situation 4
+        if (!record.hasMagic(toMagic))
+          inPlaceAssignment = false
 
-      validatedRecords += record
+        validatedRecords += record
+      }
     }
 
     if (!inPlaceAssignment) {
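[Editor's note, not part of the patch: the two call sites above boil down to one batch-count rule. The pure-Scala sketch below restates it; mustBeSingleBatch, sourceCompressed, and firstBatchMagic are invented names summarizing sourceCodec and the first batch's magic byte.]

    object BatchCountPolicy extends App {
      // True when the validator should insist on exactly one batch, mirroring
      // the checks above: compressed input, or a first batch of magic v2+.
      def mustBeSingleBatch(sourceCompressed: Boolean, firstBatchMagic: Byte): Boolean =
        sourceCompressed || firstBatchMagic >= 2

      // Old-format (v0/v1) uncompressed input may span multiple batches, since
      // in those formats each uncompressed record is its own shallow batch.
      assert(!mustBeSingleBatch(sourceCompressed = false, firstBatchMagic = 1))
      // Compressed input of any format is expected to be one wrapper batch.
      assert(mustBeSingleBatch(sourceCompressed = true, firstBatchMagic = 0))
      // v2 input must be a single batch even without compression.
      assert(mustBeSingleBatch(sourceCompressed = false, firstBatchMagic = 2))
      println("policy checks passed")
    }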
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index d306b16..324314f 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -37,40 +37,42 @@ class LogValidatorTest {
   val time = Time.SYSTEM
 
   @Test
-  def testOnlyOneBatchCompressedV0(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
+  def testOnlyOneBatch(): Unit = {
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, CompressionType.GZIP)
   }
 
   @Test
-  def testOnlyOneBatchCompressedV1(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
+  def testAllowMultiBatch(): Unit = {
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, CompressionType.NONE)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.NONE)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, CompressionType.GZIP)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.GZIP)
   }
 
-  @Test
-  def testOnlyOneBatchCompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP)
-  }
-
-  @Test
-  def testOnlyOneBatchUncompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE)
-  }
-
-  private def checkOnlyOneBatchCompressed(magic: Byte, compressionType: CompressionType) {
-    validateMessages(createRecords(magic, 0L, compressionType), magic, compressionType)
-
+  private def checkOnlyOneBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) {
     assertThrows[InvalidRecordException] {
-      validateMessages(createTwoBatchedRecords(magic, 0L, compressionType), magic, compressionType)
+      validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
     }
   }
 
-  private def validateMessages(records: MemoryRecords, magic: Byte, compressionType: CompressionType): Unit = {
+  private def checkAllowMultiBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) {
+    validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
+  }
+
+  private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = {
     LogValidator.validateMessagesAndAssignOffsets(records,
       new LongRef(0L),
       time,
       now = 0L,
-      CompressionCodec.getCompressionCodec(compressionType.name),
-      CompressionCodec.getCompressionCodec(compressionType.name),
+      CompressionCodec.getCompressionCodec(sourceCompressionType.name),
+      CompressionCodec.getCompressionCodec(targetCompressionType.name),
       compactedTopic = false,
       magic,
       TimestampType.CREATE_TIME,
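[Editor's note, not part of the patch: the new tests rely on a createTwoBatchedRecords helper whose body lies outside this hunk. A two-batch MemoryRecords can be assembled along these lines; this is a sketch, not the actual helper, and TwoBatchSketch plus the record contents are invented.]

    import java.nio.ByteBuffer
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, TimestampType}

    object TwoBatchSketch extends App {
      def twoBatchedRecords(magic: Byte, codec: CompressionType): MemoryRecords = {
        val buf = ByteBuffer.allocate(2048)
        // first batch, base offset 0
        var builder = MemoryRecords.builder(buf, magic, codec, TimestampType.CREATE_TIME, 0L)
        builder.append(10L, "1".getBytes, "a".getBytes)
        builder.close()
        // second batch appended to the same buffer, base offset 1
        builder = MemoryRecords.builder(buf, magic, codec, TimestampType.CREATE_TIME, 1L)
        builder.append(11L, "2".getBytes, "b".getBytes)
        builder.close()
        buf.flip()
        MemoryRecords.readableRecords(buf.slice())
      }

      // two v1 uncompressed batches: the case this hotfix allows through validation
      println(twoBatchedRecords(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE).batches.iterator.hasNext)
    }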