[ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640218#comment-16640218
 ] 

ASF GitHub Bot commented on KAFKA-7467:
---------------------------------------

hachikuji closed pull request #5727: KAFKA-7467: NoSuchElementException is 
raised because controlBatch is …
URL: https://github.com/apache/kafka/pull/5727
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 2a6ac0cfec7..6b42d07cc3f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1300,8 +1300,7 @@ private boolean containsAbortMarker(RecordBatch batch) {
 
             Iterator<Record> batchIterator = batch.iterator();
             if (!batchIterator.hasNext())
-                throw new InvalidRecordException("Invalid batch for partition 
" + partition + " at offset " +
-                        batch.baseOffset() + " with control sequence set, but 
no records");
+                return false;
 
             Record firstRecord = batchIterator.next();
             return ControlRecordType.ABORT == 
ControlRecordType.parse(firstRecord.key());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 75a34ccf908..42f6beb16c2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2639,6 +2639,52 @@ private void verifySessionPartitions() {
         assertEquals(0, future.get());
     }
 
+    @Test
+    public void testEmptyControlBatch() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new 
Metrics(), new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, 
IsolationLevel.READ_COMMITTED);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int currentOffset = 1;
+
+        // Empty control batch should not cause an exception
+        DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.MAGIC_VALUE_V2, 1L,
+                (short) 0, -1, 0, 0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH, 
TimestampType.CREATE_TIME, time.milliseconds(),
+                true, true);
+
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), 
"value".getBytes()));
+
+        commitTransaction(buffer, 1L, currentOffset);
+        buffer.flip();
+
+        List<FetchResponse.AbortedTransaction> abortedTransactions = new 
ArrayList<>();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        subscriptions.assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                FetchRequest request = (FetchRequest) body;
+                assertEquals(IsolationLevel.READ_COMMITTED, 
request.isolationLevel());
+                return true;
+            }
+        }, fullFetchResponseWithAbortedTransactions(records, 
abortedTransactions, Errors.NONE, 100L, 100L, 0));
+
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchedRecords = fetcher.fetchedRecords();
+        assertTrue(fetchedRecords.containsKey(tp0));
+        assertEquals(fetchedRecords.get(tp0).size(), 2);
+    }
+
     private MemoryRecords buildRecords(long baseOffset, int count, long 
firstMessageId) {
         MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME, baseOffset);
         for (int i = 0; i < count; i++)
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 21f4a3dcad9..31cd3615675 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,17 +555,20 @@ class GroupMetadataManager(brokerId: Int,
           memRecords.batches.asScala.foreach { batch =>
             val isTxnOffsetCommit = batch.isTransactional
             if (batch.isControlBatch) {
-              val record = batch.iterator.next()
-              val controlRecord = ControlRecordType.parse(record.key)
-              if (controlRecord == ControlRecordType.COMMIT) {
-                pendingOffsets.getOrElse(batch.producerId, 
mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
-                  .foreach {
-                    case (groupTopicPartition, commitRecordMetadataAndOffset) 
=>
-                      if (!loadedOffsets.contains(groupTopicPartition) || 
loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
-                        loadedOffsets.put(groupTopicPartition, 
commitRecordMetadataAndOffset)
-                  }
+              val recordIterator = batch.iterator
+              if (recordIterator.hasNext) {
+                val record = recordIterator.next()
+                val controlRecord = ControlRecordType.parse(record.key)
+                if (controlRecord == ControlRecordType.COMMIT) {
+                  pendingOffsets.getOrElse(batch.producerId, 
mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
+                    .foreach {
+                      case (groupTopicPartition, 
commitRecordMetadataAndOffset) =>
+                        if (!loadedOffsets.contains(groupTopicPartition) || 
loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
+                          loadedOffsets.put(groupTopicPartition, 
commitRecordMetadataAndOffset)
+                    }
+                }
+                pendingOffsets.remove(batch.producerId)
               }
-              pendingOffsets.remove(batch.producerId)
             } else {
               var batchBaseOffset: Option[Long] = None
               for (record <- batch.asScala) {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index f1ac1fc5e94..bf4f7e1fcba 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -990,24 +990,30 @@ private[log] class CleanedTransactionMetadata(val 
abortedTransactions: mutable.P
   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
 
-    val controlRecord = controlBatch.iterator.next()
-    val controlType = ControlRecordType.parse(controlRecord.key)
-    val producerId = controlBatch.producerId
-    controlType match {
-      case ControlRecordType.ABORT =>
-        ongoingAbortedTxns.remove(producerId) match {
-          // Retain the marker until all batches from the transaction have 
been removed
-          case Some(abortedTxnMetadata) if 
abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
-            transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
-            false
-          case _ => true
-        }
+    val controlRecordIterator = controlBatch.iterator
+    if (controlRecordIterator.hasNext) {
+      val controlRecord = controlRecordIterator.next()
+      val controlType = ControlRecordType.parse(controlRecord.key)
+      val producerId = controlBatch.producerId
+      controlType match {
+        case ControlRecordType.ABORT =>
+          ongoingAbortedTxns.remove(producerId) match {
+            // Retain the marker until all batches from the transaction have 
been removed
+            case Some(abortedTxnMetadata) if 
abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
+              transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+              false
+            case _ => true
+          }
 
-      case ControlRecordType.COMMIT =>
-        // This marker is eligible for deletion if we didn't traverse any 
batches from the transaction
-        !ongoingCommittedTxns.remove(producerId)
+        case ControlRecordType.COMMIT =>
+          // This marker is eligible for deletion if we didn't traverse any 
batches from the transaction
+          !ongoingCommittedTxns.remove(producerId)
 
-      case _ => false
+        case _ => false
+      }
+    } else {
+      // An empty control batch was already cleaned, so it's safe to discard
+      true
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala 
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 2f711234bdb..a5c182c2ce3 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -254,10 +254,16 @@ private[log] class ProducerAppendInfo(val producerId: 
Long,
 
   def append(batch: RecordBatch): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
-      val record = batch.iterator.next()
-      val endTxnMarker = EndTransactionMarker.deserialize(record)
-      val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, 
batch.baseOffset, record.timestamp)
-      Some(completedTxn)
+      val recordIterator = batch.iterator
+      if (recordIterator.hasNext) {
+        val record = recordIterator.next()
+        val endTxnMarker = EndTransactionMarker.deserialize(record)
+        val completedTxn = appendEndTxnMarker(endTxnMarker, 
batch.producerEpoch, batch.baseOffset, record.timestamp)
+        Some(completedTxn)
+      } else {
+        // An empty control batch means the entire transaction has been 
cleaned from the log, so no need to append
+        None
+      }
     } else {
       append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, 
batch.maxTimestamp, batch.lastOffset,
         batch.isTransactional)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 1792c7bff81..b5b0e6eced5 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -424,7 +424,8 @@ object DumpLogSegments {
           print("baseOffset: " + batch.baseOffset + " lastOffset: " + 
batch.lastOffset + " count: " + batch.countOrNull +
             " baseSequence: " + batch.baseSequence + " lastSequence: " + 
batch.lastSequence +
             " producerId: " + batch.producerId + " producerEpoch: " + 
batch.producerEpoch +
-            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " 
isTransactional: " + batch.isTransactional)
+            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " 
isTransactional: " + batch.isTransactional +
+            " isControl: " + batch.isControlBatch)
         else
           print("offset: " + batch.lastOffset)
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index b48f297ad78..f6482573841 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -35,6 +35,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
 import org.junit.{Before, Test}
 import java.nio.ByteBuffer
+import java.util.Collections
 import java.util.Optional
 
 import com.yammer.metrics.Metrics
@@ -1766,6 +1767,61 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testLoadOffsetsWithEmptyControlBatch() {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val startOffset = 15L
+    val generation = 15
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    // Prepend empty control batch to valid records
+    val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+    
EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
+    EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
+    EasyMock.expect(mockBatch.isTransactional).andReturn(true)
+    EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
+    EasyMock.replay(mockBatch)
+    val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+    
EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch)
 ++ records.batches.asScala).asJava).anyTimes()
+    
EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
+    
EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD
 + records.sizeInBytes()).anyTimes()
+    EasyMock.replay(mockRecords)
+
+    val logMock = EasyMock.mock(classOf[Log])
+    EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), 
EasyMock.eq(None),
+      EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords))
+    
EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
+    
EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(18))
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> ())
+
+    // Empty control batch should not have caused the load to fail
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group 
was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertNull(group.leaderOrNull)
+    assertNull(group.protocolOrNull)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
   private def appendAndCaptureCallback(): Capture[Map[TopicPartition, 
PartitionResponse] => Unit] = {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => 
Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 73dfa7e39cd..ff5af6123e2 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -465,6 +465,37 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(List(1, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
   }
 
+  @Test
+  def testCleanEmptyControlBatch(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+    val producerEpoch = 0.toShort
+
+    // [{Producer1: Commit}, {2}, {3}]
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, 
isFromClient = false) // offset 7
+    log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 2
+    log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 3
+    log.roll()
+
+    // first time through the control batch is retained as an empty batch
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+
+    // the empty control batch does not cause an exception when cleaned
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+  }
+
   @Test
   def testAbortMarkerRemoval(): Unit = {
     val tp = new TopicPartition("test", 0)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index f9f4a239023..9afb145c4c6 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -21,14 +21,16 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.StandardOpenOption
+import java.util.Collections
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.record.{ControlRecordType, 
EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -746,6 +748,22 @@ class ProducerStateManagerTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testAppendEmptyControlBatch(): Unit = {
+    val producerId = 23423L
+    val producerEpoch = 145.toShort
+    val baseOffset = 15
+
+    val batch = EasyMock.createMock(classOf[RecordBatch])
+    EasyMock.expect(batch.isControlBatch).andReturn(true).once
+    
EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
+    EasyMock.replay(batch)
+
+    // Appending the empty control batch should not throw and a new 
transaction shouldn't be started
+    append(stateManager, producerId, producerEpoch, baseOffset, batch, 
isFromClient = true)
+    assertEquals(None, 
stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
+  }
+
   private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => 
Unit): Unit = {
     val epoch = 0.toShort
     val producerId = 1L
@@ -806,6 +824,18 @@ class ProducerStateManagerTest extends JUnitSuite {
     stateManager.updateMapEndOffset(offset + 1)
   }
 
+  private def append(stateManager: ProducerStateManager,
+                     producerId: Long,
+                     producerEpoch: Short,
+                     offset: Long,
+                     batch: RecordBatch,
+                     isFromClient : Boolean): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, 
isFromClient)
+    producerAppendInfo.append(batch)
+    stateManager.update(producerAppendInfo)
+    stateManager.updateMapEndOffset(offset + 1)
+  }
+
   private def currentSnapshotOffsets =
     logDir.listFiles.map(Log.offsetFromFile).toSet
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> NoSuchElementException is raised because controlBatch is empty
> --------------------------------------------------------------
>
>                 Key: KAFKA-7467
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7467
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.0
>            Reporter: Badai Aqrandista
>            Assignee: Bob Barrett
>            Priority: Major
>
> Somehow, log cleaner died because of NoSuchElementException when it calls 
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at 
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at 
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
>     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
>     val controlRecord = controlBatch.iterator.next()
>     val controlType = ControlRecordType.parse(controlRecord.key)
>     val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty 
> batches, which results in empty control batches. Trying to read the control 
> type of an empty batch causes the error.
> {noformat}
>   else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
>     if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
>         throw new IllegalStateException("Empty batches are only supported for 
> magic v2 and above");
>     
> bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
>     DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), 
> batchMagic, batch.producerId(),
>             batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), 
> batch.lastOffset(),
>             batch.partitionLeaderEpoch(), batch.timestampType(), 
> batch.maxTimestamp(),
>             batch.isTransactional(), batch.isControlBatch());
>     filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to