[ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Barrett updated KAFKA-7467:
-------------------------------
    Description: 
The log cleaner died because of a NoSuchElementException thrown when it called 
onControlBatchRead:
{noformat}
[2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, 
discarding deletes. (kafka.log.LogCleaner)
[2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.util.NoSuchElementException
  at java.util.Collections$EmptyIterator.next(Collections.java:4189)
  at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
  at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
  at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
  at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
  at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
  at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
{noformat}
The following code does not seem to expect the controlBatch to be empty:

[https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
{noformat}
  def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
    consumeAbortedTxnsUpTo(controlBatch.lastOffset)

    val controlRecord = controlBatch.iterator.next()
    val controlType = ControlRecordType.parse(controlRecord.key)
    val producerId = controlBatch.producerId
{noformat}
MemoryRecords.filterTo copies the original control attribute for empty batches, 
which results in empty control batches. Trying to read the control type of an 
empty batch causes the error.
{noformat}
  else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
    if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
        throw new IllegalStateException("Empty batches are only supported for magic v2 and above");

    bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
    DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
            batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
            batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
            batch.isTransactional(), batch.isControlBatch());
    filterResult.updateRetainedBatchMetadata(batch, 0, true);
{noformat}

  was:
Somehow, log cleaner died because of NoSuchElementException when it calls 
onControlBatchRead:
{noformat}
[2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, 
discarding deletes. (kafka.log.LogCleaner)
[2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.util.NoSuchElementException
  at java.util.Collections$EmptyIterator.next(Collections.java:4189)
  at 
kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
  at 
kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
  at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
  at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
{noformat}
The following code does not seem to expect the controlBatch to be empty:

[https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
{noformat}
  def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
    consumeAbortedTxnsUpTo(controlBatch.lastOffset)

    val controlRecord = controlBatch.iterator.next()
    val controlType = ControlRecordType.parse(controlRecord.key)
    val producerId = controlBatch.producerId
{noformat}
MemoryRecords.filterTo copies the original control attribute for empty batches, 
which results in empty control batches. Trying to read the control type of an 
empty batch causes the error.
{noformat}
else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
                if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
                    throw new IllegalStateException("Empty batches are only 
supported for magic v2 and above");

                
bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
                
DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, 
batch.producerId(),
                        batch.producerEpoch(), batch.baseSequence(), 
batch.baseOffset(), batch.lastOffset(),
                        batch.partitionLeaderEpoch(), batch.timestampType(), 
batch.maxTimestamp(),
                        batch.isTransactional(), batch.isControlBatch());
                filterResult.updateRetainedBatchMetadata(batch, 0, true);
{noformat}


> NoSuchElementException is raised because controlBatch is empty
> --------------------------------------------------------------
>
>                 Key: KAFKA-7467
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7467
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.0
>            Reporter: Badai Aqrandista
>            Assignee: Bob Barrett
>            Priority: Minor
>
> Somehow, log cleaner died because of NoSuchElementException when it calls 
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at 
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at 
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
>     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
>     val controlRecord = controlBatch.iterator.next()
>     val controlType = ControlRecordType.parse(controlRecord.key)
>     val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty 
> batches, which results in empty control batches. Trying to read the control 
> type of an empty batch causes the error.
> {noformat}
>   else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
>     if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
>         throw new IllegalStateException("Empty batches are only supported for 
> magic v2 and above");
>     
> bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
>     DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), 
> batchMagic, batch.producerId(),
>             batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), 
> batch.lastOffset(),
>             batch.partitionLeaderEpoch(), batch.timestampType(), 
> batch.maxTimestamp(),
>             batch.isTransactional(), batch.isControlBatch());
>     filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to