[
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640218#comment-16640218
]
ASF GitHub Bot commented on KAFKA-7467:
---------------------------------------
hachikuji closed pull request #5727: KAFKA-7467: NoSuchElementException is
raised because controlBatch is …
URL: https://github.com/apache/kafka/pull/5727
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 2a6ac0cfec7..6b42d07cc3f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1300,8 +1300,7 @@ private boolean containsAbortMarker(RecordBatch batch) {
Iterator<Record> batchIterator = batch.iterator();
if (!batchIterator.hasNext())
- throw new InvalidRecordException("Invalid batch for partition
" + partition + " at offset " +
- batch.baseOffset() + " with control sequence set, but
no records");
+ return false;
Record firstRecord = batchIterator.next();
return ControlRecordType.ABORT ==
ControlRecordType.parse(firstRecord.key());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 75a34ccf908..42f6beb16c2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2639,6 +2639,52 @@ private void verifySessionPartitions() {
assertEquals(0, future.get());
}
+ @Test
+ public void testEmptyControlBatch() {
+ Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new
Metrics(), new ByteArrayDeserializer(),
+ new ByteArrayDeserializer(), Integer.MAX_VALUE,
IsolationLevel.READ_COMMITTED);
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ int currentOffset = 1;
+
+ // Empty control batch should not cause an exception
+ DefaultRecordBatch.writeEmptyHeader(buffer,
RecordBatch.MAGIC_VALUE_V2, 1L,
+ (short) 0, -1, 0, 0,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
TimestampType.CREATE_TIME, time.milliseconds(),
+ true, true);
+
+ currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+ new SimpleRecord(time.milliseconds(), "key".getBytes(),
"value".getBytes()),
+ new SimpleRecord(time.milliseconds(), "key".getBytes(),
"value".getBytes()));
+
+ commitTransaction(buffer, 1L, currentOffset);
+ buffer.flip();
+
+ List<FetchResponse.AbortedTransaction> abortedTransactions = new
ArrayList<>();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ subscriptions.assignFromUser(singleton(tp0));
+
+ subscriptions.seek(tp0, 0);
+
+ // normal fetch
+ assertEquals(1, fetcher.sendFetches());
+ assertFalse(fetcher.hasCompletedFetches());
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ FetchRequest request = (FetchRequest) body;
+ assertEquals(IsolationLevel.READ_COMMITTED,
request.isolationLevel());
+ return true;
+ }
+ }, fullFetchResponseWithAbortedTransactions(records,
abortedTransactions, Errors.NONE, 100L, 100L, 0));
+
+ consumerClient.poll(time.timer(0));
+ assertTrue(fetcher.hasCompletedFetches());
+
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords = fetcher.fetchedRecords();
+ assertTrue(fetchedRecords.containsKey(tp0));
+ assertEquals(fetchedRecords.get(tp0).size(), 2);
+ }
+
private MemoryRecords buildRecords(long baseOffset, int count, long
firstMessageId) {
MemoryRecordsBuilder builder =
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, baseOffset);
for (int i = 0; i < count; i++)
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 21f4a3dcad9..31cd3615675 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,17 +555,20 @@ class GroupMetadataManager(brokerId: Int,
memRecords.batches.asScala.foreach { batch =>
val isTxnOffsetCommit = batch.isTransactional
if (batch.isControlBatch) {
- val record = batch.iterator.next()
- val controlRecord = ControlRecordType.parse(record.key)
- if (controlRecord == ControlRecordType.COMMIT) {
- pendingOffsets.getOrElse(batch.producerId,
mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
- .foreach {
- case (groupTopicPartition, commitRecordMetadataAndOffset)
=>
- if (!loadedOffsets.contains(groupTopicPartition) ||
loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
- loadedOffsets.put(groupTopicPartition,
commitRecordMetadataAndOffset)
- }
+ val recordIterator = batch.iterator
+ if (recordIterator.hasNext) {
+ val record = recordIterator.next()
+ val controlRecord = ControlRecordType.parse(record.key)
+ if (controlRecord == ControlRecordType.COMMIT) {
+ pendingOffsets.getOrElse(batch.producerId,
mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
+ .foreach {
+ case (groupTopicPartition,
commitRecordMetadataAndOffset) =>
+ if (!loadedOffsets.contains(groupTopicPartition) ||
loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
+ loadedOffsets.put(groupTopicPartition,
commitRecordMetadataAndOffset)
+ }
+ }
+ pendingOffsets.remove(batch.producerId)
}
- pendingOffsets.remove(batch.producerId)
} else {
var batchBaseOffset: Option[Long] = None
for (record <- batch.asScala) {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala
b/core/src/main/scala/kafka/log/LogCleaner.scala
index f1ac1fc5e94..bf4f7e1fcba 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -990,24 +990,30 @@ private[log] class CleanedTransactionMetadata(val
abortedTransactions: mutable.P
def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
consumeAbortedTxnsUpTo(controlBatch.lastOffset)
- val controlRecord = controlBatch.iterator.next()
- val controlType = ControlRecordType.parse(controlRecord.key)
- val producerId = controlBatch.producerId
- controlType match {
- case ControlRecordType.ABORT =>
- ongoingAbortedTxns.remove(producerId) match {
- // Retain the marker until all batches from the transaction have
been removed
- case Some(abortedTxnMetadata) if
abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
- transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
- false
- case _ => true
- }
+ val controlRecordIterator = controlBatch.iterator
+ if (controlRecordIterator.hasNext) {
+ val controlRecord = controlRecordIterator.next()
+ val controlType = ControlRecordType.parse(controlRecord.key)
+ val producerId = controlBatch.producerId
+ controlType match {
+ case ControlRecordType.ABORT =>
+ ongoingAbortedTxns.remove(producerId) match {
+ // Retain the marker until all batches from the transaction have
been removed
+ case Some(abortedTxnMetadata) if
abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
+ transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+ false
+ case _ => true
+ }
- case ControlRecordType.COMMIT =>
- // This marker is eligible for deletion if we didn't traverse any
batches from the transaction
- !ongoingCommittedTxns.remove(producerId)
+ case ControlRecordType.COMMIT =>
+ // This marker is eligible for deletion if we didn't traverse any
batches from the transaction
+ !ongoingCommittedTxns.remove(producerId)
- case _ => false
+ case _ => false
+ }
+ } else {
+ // An empty control batch was already cleaned, so it's safe to discard
+ true
}
}
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 2f711234bdb..a5c182c2ce3 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -254,10 +254,16 @@ private[log] class ProducerAppendInfo(val producerId:
Long,
def append(batch: RecordBatch): Option[CompletedTxn] = {
if (batch.isControlBatch) {
- val record = batch.iterator.next()
- val endTxnMarker = EndTransactionMarker.deserialize(record)
- val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch,
batch.baseOffset, record.timestamp)
- Some(completedTxn)
+ val recordIterator = batch.iterator
+ if (recordIterator.hasNext) {
+ val record = recordIterator.next()
+ val endTxnMarker = EndTransactionMarker.deserialize(record)
+ val completedTxn = appendEndTxnMarker(endTxnMarker,
batch.producerEpoch, batch.baseOffset, record.timestamp)
+ Some(completedTxn)
+ } else {
+ // An empty control batch means the entire transaction has been
cleaned from the log, so no need to append
+ None
+ }
} else {
append(batch.producerEpoch, batch.baseSequence, batch.lastSequence,
batch.maxTimestamp, batch.lastOffset,
batch.isTransactional)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 1792c7bff81..b5b0e6eced5 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -424,7 +424,8 @@ object DumpLogSegments {
print("baseOffset: " + batch.baseOffset + " lastOffset: " +
batch.lastOffset + " count: " + batch.countOrNull +
" baseSequence: " + batch.baseSequence + " lastSequence: " +
batch.lastSequence +
" producerId: " + batch.producerId + " producerEpoch: " +
batch.producerEpoch +
- " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + "
isTransactional: " + batch.isTransactional)
+ " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + "
isTransactional: " + batch.isTransactional +
+ " isControl: " + batch.isControlBatch)
else
print("offset: " + batch.lastOffset)
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index b48f297ad78..f6482573841 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -35,6 +35,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
import org.junit.{Before, Test}
import java.nio.ByteBuffer
+import java.util.Collections
import java.util.Optional
import com.yammer.metrics.Metrics
@@ -1766,6 +1767,61 @@ class GroupMetadataManagerTest {
verifySerde(version)
}
+ @Test
+ def testLoadOffsetsWithEmptyControlBatch() {
+ val groupMetadataTopicPartition = groupTopicPartition
+ val startOffset = 15L
+ val generation = 15
+
+ val committedOffsets = Map(
+ new TopicPartition("foo", 0) -> 23L,
+ new TopicPartition("foo", 1) -> 455L,
+ new TopicPartition("bar", 0) -> 8992L
+ )
+
+ val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+ val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+ offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+ // Prepend empty control batch to valid records
+ val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+
EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
+ EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
+ EasyMock.expect(mockBatch.isTransactional).andReturn(true)
+ EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
+ EasyMock.replay(mockBatch)
+ val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+
EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch)
++ records.batches.asScala).asJava).anyTimes()
+
EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
+
EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD
+ records.sizeInBytes()).anyTimes()
+ EasyMock.replay(mockRecords)
+
+ val logMock = EasyMock.mock(classOf[Log])
+ EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
+ EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(),
EasyMock.eq(None),
+ EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+ .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords))
+
EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
+
EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(18))
+ EasyMock.replay(logMock)
+ EasyMock.replay(replicaManager)
+
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> ())
+
+ // Empty control batch should not have caused the load to fail
+ val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group
was not loaded into the cache"))
+ assertEquals(groupId, group.groupId)
+ assertEquals(Empty, group.currentState)
+ assertEquals(generation, group.generationId)
+ assertEquals(Some(protocolType), group.protocolType)
+ assertNull(group.leaderOrNull)
+ assertNull(group.protocolOrNull)
+ committedOffsets.foreach { case (topicPartition, offset) =>
+ assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+ }
+ }
+
private def appendAndCaptureCallback(): Capture[Map[TopicPartition,
PartitionResponse] => Unit] = {
val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] =>
Unit] = EasyMock.newCapture()
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 73dfa7e39cd..ff5af6123e2 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -465,6 +465,37 @@ class LogCleanerTest extends JUnitSuite {
assertEquals(List(1, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
}
+ @Test
+ def testCleanEmptyControlBatch(): Unit = {
+ val tp = new TopicPartition("test", 0)
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals,
logProps))
+
+ val producerEpoch = 0.toShort
+
+ // [{Producer1: Commit}, {2}, {3}]
+ log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0,
isFromClient = false) // offset 7
+ log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 2
+ log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 3
+ log.roll()
+
+ // first time through the control batch is retained as an empty batch
+ // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+ var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L),
deleteHorizonMs = Long.MaxValue)._1
+ assertEquals(List(2, 3), LogTest.keysInLog(log))
+ assertEquals(List(1, 2), offsetsInLog(log))
+ assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+
+ // the empty control batch does not cause an exception when cleaned
+ // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L),
deleteHorizonMs = Long.MaxValue)._1
+ assertEquals(List(2, 3), LogTest.keysInLog(log))
+ assertEquals(List(1, 2), offsetsInLog(log))
+ assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+ }
+
@Test
def testAbortMarkerRemoval(): Unit = {
val tp = new TopicPartition("test", 0)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index f9f4a239023..9afb145c4c6 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -21,14 +21,16 @@ import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
+import java.util.Collections
import kafka.server.LogOffsetMetadata
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.record.{ControlRecordType,
EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnitSuite
@@ -746,6 +748,22 @@ class ProducerStateManagerTest extends JUnitSuite {
}
}
+ @Test
+ def testAppendEmptyControlBatch(): Unit = {
+ val producerId = 23423L
+ val producerEpoch = 145.toShort
+ val baseOffset = 15
+
+ val batch = EasyMock.createMock(classOf[RecordBatch])
+ EasyMock.expect(batch.isControlBatch).andReturn(true).once
+
EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
+ EasyMock.replay(batch)
+
+ // Appending the empty control batch should not throw and a new
transaction shouldn't be started
+ append(stateManager, producerId, producerEpoch, baseOffset, batch,
isFromClient = true)
+ assertEquals(None,
stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
+ }
+
private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel =>
Unit): Unit = {
val epoch = 0.toShort
val producerId = 1L
@@ -806,6 +824,18 @@ class ProducerStateManagerTest extends JUnitSuite {
stateManager.updateMapEndOffset(offset + 1)
}
+ private def append(stateManager: ProducerStateManager,
+ producerId: Long,
+ producerEpoch: Short,
+ offset: Long,
+ batch: RecordBatch,
+ isFromClient : Boolean): Unit = {
+ val producerAppendInfo = stateManager.prepareUpdate(producerId,
isFromClient)
+ producerAppendInfo.append(batch)
+ stateManager.update(producerAppendInfo)
+ stateManager.updateMapEndOffset(offset + 1)
+ }
+
private def currentSnapshotOffsets =
logDir.listFiles.map(Log.offsetFromFile).toSet
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> NoSuchElementException is raised because controlBatch is empty
> --------------------------------------------------------------
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.0
> Reporter: Badai Aqrandista
> Assignee: Bob Barrett
> Priority: Major
>
> Somehow, log cleaner died because of NoSuchElementException when it calls
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
> at java.util.Collections$EmptyIterator.next(Collections.java:4189)
> at
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
> at
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
> at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
> at
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
> at
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
> at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
> at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
> at kafka.log.Cleaner.clean(LogCleaner.scala:438)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
> def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
> consumeAbortedTxnsUpTo(controlBatch.lastOffset)
> val controlRecord = controlBatch.iterator.next()
> val controlType = ControlRecordType.parse(controlRecord.key)
> val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty
> batches, which results in empty control batches. Trying to read the control
> type of an empty batch causes the error.
> {noformat}
> else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
> if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
> throw new IllegalStateException("Empty batches are only supported for
> magic v2 and above");
>
> bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
> DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(),
> batchMagic, batch.producerId(),
> batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(),
> batch.lastOffset(),
> batch.partitionLeaderEpoch(), batch.timestampType(),
> batch.maxTimestamp(),
> batch.isTransactional(), batch.isControlBatch());
> filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)