[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640218#comment-16640218 ]

ASF GitHub Bot commented on KAFKA-7467:
---

hachikuji closed pull request #5727: KAFKA-7467: NoSuchElementException is raised because controlBatch is …
URL: https://github.com/apache/kafka/pull/5727

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 2a6ac0cfec7..6b42d07cc3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1300,8 +1300,7 @@ private boolean containsAbortMarker(RecordBatch batch) {
 
         Iterator<Record> batchIterator = batch.iterator();
         if (!batchIterator.hasNext())
-            throw new InvalidRecordException("Invalid batch for partition " + partition + " at offset " +
-                    batch.baseOffset() + " with control sequence set, but no records");
+            return false;
 
         Record firstRecord = batchIterator.next();
         return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 75a34ccf908..42f6beb16c2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2639,6 +2639,52 @@ private void verifySessionPartitions() {
         assertEquals(0, future.get());
     }
 
+    @Test
+    public void testEmptyControlBatch() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int currentOffset = 1;
+
+        // Empty control batch should not cause an exception
+        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, 1L,
+                (short) 0, -1, 0, 0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH, TimestampType.CREATE_TIME, time.milliseconds(),
+                true, true);
+
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+
+        commitTransaction(buffer, 1L, currentOffset);
+        buffer.flip();
+
+        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        subscriptions.assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                FetchRequest request = (FetchRequest) body;
+                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+                return true;
+            }
+        }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
+
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
+        assertTrue(fetchedRecords.containsKey(tp0));
+        assertEquals(fetchedRecords.get(tp0).size(), 2);
+    }
+
     private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
         for (int i = 0; i < count; i++)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 21f4a3dcad9..31cd3615675 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,17 +555,20 @@ class GroupMetadataManager(brokerId: Int,
 memRecords.batches.
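For readers tracing the consumer-side part of the merged change: the Fetcher hunk above replaces the InvalidRecordException with "return false", so a control batch that carries no records is no longer an error when the consumer decides whether the batch is an abort marker. The sketch below models only that decision. SimpleBatch (a control flag plus a list of control-record types) and the two-value ControlType enum are hypothetical stand-ins, not Kafka's RecordBatch or ControlRecordType API.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of the consumer-side check; not Kafka's RecordBatch API.
final class AbortMarkerCheck {

    enum ControlType { ABORT, COMMIT }

    // A batch reduced to its control flag and the control-record types it holds.
    static final class SimpleBatch {
        final boolean isControl;
        final List<ControlType> controlRecords;

        SimpleBatch(boolean isControl, List<ControlType> controlRecords) {
            this.isControl = isControl;
            this.controlRecords = controlRecords;
        }
    }

    // Models the patched decision: a non-control batch, or a control batch with
    // no records, is "not an abort marker". Before the patch the empty case threw.
    static boolean containsAbortMarker(SimpleBatch batch) {
        if (!batch.isControl)
            return false;
        Iterator<ControlType> it = batch.controlRecords.iterator();
        if (!it.hasNext())
            return false; // empty control batch: tolerated instead of rejected
        return it.next() == ControlType.ABORT;
    }
}
```

Returning false rather than throwing means a malformed-but-harmless empty batch is simply skipped, which is what the new testEmptyControlBatch exercises: the two committed records after the empty control batch are still returned.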
[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635692#comment-16635692 ]

ASF GitHub Bot commented on KAFKA-7467:
---

bob-barrett opened a new pull request #5727: KAFKA-7467: NoSuchElementException is raised because controlBatch is …
URL: https://github.com/apache/kafka/pull/5727

…empty

This patch changes the behavior of MemoryRecords.filterTo() to always mark empty batches as non-control batches, rather than taking the attribute from the original batch. Empty control batches cause fatal errors in segment recovery and in the log cleaner, due to an attempt to access a record in an empty batch. To prevent errors from logs that are already in this state, this patch also adds a check to DefaultRecordBatch.isControlBatch() so that it returns true only if the batch is non-empty. Finally, it adds isControl to the output of DumpLogSegments.

The MemoryRecords and DefaultRecordBatch changes were tested with unit tests; the DumpLogSegments change was tested manually.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NoSuchElementException is raised because controlBatch is empty
> --
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.0
> Reporter: Badai Aqrandista
> Assignee: Bob Barrett
> Priority: Major
>
> Somehow, log cleaner died because of NoSuchElementException when it calls
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>     at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>     at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>     at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>     at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>     at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>     at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>     at scala.collection.immutable.List.foreach(List.scala:392)
>     at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>     at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
> def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
>   consumeAbortedTxnsUpTo(controlBatch.lastOffset)
>   val controlRecord = controlBatch.iterator.next()
>   val controlType = ControlRecordType.parse(controlRecord.key)
>   val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty
> batches, which results in empty control batches. Trying to read the control
> type of an empty batch causes the error.
> {noformat}
> else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
>   if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
>     throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
>
>   bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
>   DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
>       batch.producerEpoch(), batch.baseSequence(), ba
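The two safeguards proposed in the pull request description (clear the control attribute when filterTo() retains an empty batch, and have isControlBatch() report true only for a non-empty batch) can be modeled in a few lines. This is a minimal sketch under stated assumptions: Batch, controlAttribute and filterRetainingEmpty are hypothetical names, records are plain strings, and none of it mirrors the actual MemoryRecords or DefaultRecordBatch code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the two safeguards described in the PR; names are illustrative.
final class EmptyBatchSafeguards {

    static final class Batch {
        final boolean controlAttribute; // the control bit as stored in the batch header
        final List<String> records;

        Batch(boolean controlAttribute, List<String> records) {
            this.controlAttribute = controlAttribute;
            this.records = Collections.unmodifiableList(new ArrayList<>(records));
        }

        // Safeguard 2: only report a control batch when there is a record to read,
        // which also covers empty control batches already present in old logs.
        boolean isControlBatch() {
            return controlAttribute && !records.isEmpty();
        }
    }

    // Safeguard 1: when filtering removes every record but the batch is retained,
    // write it as a non-control batch instead of copying the original attribute.
    static Batch filterRetainingEmpty(Batch batch, Predicate<String> keep) {
        List<String> kept = new ArrayList<>();
        for (String record : batch.records)
            if (keep.test(record))
                kept.add(record);
        boolean control = !kept.isEmpty() && batch.controlAttribute;
        return new Batch(control, kept);
    }
}
```

Either safeguard alone keeps a reader from calling next() on an empty control batch: the first stops new empty control batches from being written, and the second covers logs that already contain them, which is the reason the description gives for adding both.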
[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635078#comment-16635078 ]

Badai Aqrandista commented on KAFKA-7467:
-

Changed Priority to Major.

> NoSuchElementException is raised because controlBatch is empty
> --
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.0
> Reporter: Badai Aqrandista
> Assignee: Bob Barrett
> Priority: Major
>
> Somehow, log cleaner died because of NoSuchElementException when it calls
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>     at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>     at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>     at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>     at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>     at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>     at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>     at scala.collection.immutable.List.foreach(List.scala:392)
>     at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>     at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
> def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
>   consumeAbortedTxnsUpTo(controlBatch.lastOffset)
>   val controlRecord = controlBatch.iterator.next()
>   val controlType = ControlRecordType.parse(controlRecord.key)
>   val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty
> batches, which results in empty control batches. Trying to read the control
> type of an empty batch causes the error.
> {noformat}
> else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
>   if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
>     throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
>
>   bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
>   DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
>       batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
>       batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
>       batch.isTransactional(), batch.isControlBatch());
>   filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
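The stack trace shows the exception coming from java.util.Collections$EmptyIterator.next, that is, from calling next() on an iterator with no elements, which is what the quoted onControlBatchRead does through controlBatch.iterator.next(). The Java sketch below reproduces that failure mode with an empty list and shows a guarded alternative that returns Optional.empty() instead. ControlRecordRead and its methods are hypothetical helpers for illustration, not part of Kafka, and records are assumed to be non-null.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;

// Illustrative only: reproduces the failure mode in the stack trace and a guarded read.
final class ControlRecordRead {

    // Unguarded read, like controlBatch.iterator.next() in onControlBatchRead:
    // on an empty batch this throws NoSuchElementException.
    static <T> T firstRecordUnchecked(Iterable<T> batch) {
        return batch.iterator().next();
    }

    // Guarded read: an empty batch yields Optional.empty() instead of an exception,
    // leaving the caller to decide how to treat it (e.g. as a non-control batch).
    static <T> Optional<T> firstRecord(Iterable<T> batch) {
        Iterator<T> it = batch.iterator();
        return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
    }

    // Returns true if the unguarded read fails on an empty batch.
    static boolean throwsOnEmpty() {
        try {
            firstRecordUnchecked(Collections.<String>emptyList());
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }
}
```

Because the exception escapes the cleaner thread's work loop, a single empty control batch is enough to stop the thread, as the final "Stopped" log line in the report shows.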
[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634878#comment-16634878 ]

Ismael Juma commented on KAFKA-7467:


How come this is considered minor? It seems severe since it kills the cleaner.