[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640218#comment-16640218
 ] 

ASF GitHub Bot commented on KAFKA-7467:
---

hachikuji closed pull request #5727: KAFKA-7467: NoSuchElementException is 
raised because controlBatch is …
URL: https://github.com/apache/kafka/pull/5727
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 2a6ac0cfec7..6b42d07cc3f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1300,8 +1300,7 @@ private boolean containsAbortMarker(RecordBatch batch) {
 
 Iterator batchIterator = batch.iterator();
 if (!batchIterator.hasNext())
-throw new InvalidRecordException("Invalid batch for partition 
" + partition + " at offset " +
-batch.baseOffset() + " with control sequence set, but 
no records");
+return false;
 
 Record firstRecord = batchIterator.next();
 return ControlRecordType.ABORT == 
ControlRecordType.parse(firstRecord.key());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 75a34ccf908..42f6beb16c2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2639,6 +2639,52 @@ private void verifySessionPartitions() {
 assertEquals(0, future.get());
 }
 
+@Test
+public void testEmptyControlBatch() {
+Fetcher fetcher = createFetcher(subscriptions, new 
Metrics(), new ByteArrayDeserializer(),
+new ByteArrayDeserializer(), Integer.MAX_VALUE, 
IsolationLevel.READ_COMMITTED);
+ByteBuffer buffer = ByteBuffer.allocate(1024);
+int currentOffset = 1;
+
+// Empty control batch should not cause an exception
+DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.MAGIC_VALUE_V2, 1L,
+(short) 0, -1, 0, 0,
+RecordBatch.NO_PARTITION_LEADER_EPOCH, 
TimestampType.CREATE_TIME, time.milliseconds(),
+true, true);
+
+currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+new SimpleRecord(time.milliseconds(), "key".getBytes(), 
"value".getBytes()),
+new SimpleRecord(time.milliseconds(), "key".getBytes(), 
"value".getBytes()));
+
+commitTransaction(buffer, 1L, currentOffset);
+buffer.flip();
+
+List abortedTransactions = new 
ArrayList<>();
+MemoryRecords records = MemoryRecords.readableRecords(buffer);
+subscriptions.assignFromUser(singleton(tp0));
+
+subscriptions.seek(tp0, 0);
+
+// normal fetch
+assertEquals(1, fetcher.sendFetches());
+assertFalse(fetcher.hasCompletedFetches());
+client.prepareResponse(new MockClient.RequestMatcher() {
+@Override
+public boolean matches(AbstractRequest body) {
+FetchRequest request = (FetchRequest) body;
+assertEquals(IsolationLevel.READ_COMMITTED, 
request.isolationLevel());
+return true;
+}
+}, fullFetchResponseWithAbortedTransactions(records, 
abortedTransactions, Errors.NONE, 100L, 100L, 0));
+
+consumerClient.poll(time.timer(0));
+assertTrue(fetcher.hasCompletedFetches());
+
+Map>> 
fetchedRecords = fetcher.fetchedRecords();
+assertTrue(fetchedRecords.containsKey(tp0));
+assertEquals(fetchedRecords.get(tp0).size(), 2);
+}
+
 private MemoryRecords buildRecords(long baseOffset, int count, long 
firstMessageId) {
 MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME, baseOffset);
 for (int i = 0; i < count; i++)
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 21f4a3dcad9..31cd3615675 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,17 +555,20 @@ class GroupMetadataManager(brokerId: Int,
   memRecords.batches.

[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635692#comment-16635692
 ] 

ASF GitHub Bot commented on KAFKA-7467:
---

bob-barrett opened a new pull request #5727: KAFKA-7467: NoSuchElementException 
is raised because controlBatch is …
URL: https://github.com/apache/kafka/pull/5727
 
 
   …empty
   
   This patch changes the behavior of MemoryRecords.filterTo() to always mark 
empty batches as non-control batches, rather than taking the attribute from the 
original batch. Empty control batches will cause fatal errors in segment 
recovery or the log cleaner due to trying access a record in an empty batch. In 
order to prevent errors from logs that are already in this state, this patch 
also adds a check to DefaultRecordBatch.isControlBatch() so that it will only 
return true if the batch is non-empty. Finally, it adds isControl to the output 
of DumpLogSegments. MemoryRecords and DefaultRecordsBatch changes were tested 
with unit tests, DumpLogSegments change was tested manually.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NoSuchElementException is raised because controlBatch is empty
> --
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Badai Aqrandista
>Assignee: Bob Barrett
>Priority: Major
>
> Somehow, log cleaner died because of NoSuchElementException when it calls 
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at 
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at 
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
> consumeAbortedTxnsUpTo(controlBatch.lastOffset)
> val controlRecord = controlBatch.iterator.next()
> val controlType = ControlRecordType.parse(controlRecord.key)
> val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty 
> batches, which results in empty control batches. Trying to read the control 
> type of an empty batch causes the error.
> {noformat}
>   else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
> if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
> throw new IllegalStateException("Empty batches are only supported for 
> magic v2 and above");
> 
> bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
> DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), 
> batchMagic, batch.producerId(),
> batch.producerEpoch(), batch.baseSequence(), ba

[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-01 Thread Badai Aqrandista (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635078#comment-16635078
 ] 

Badai Aqrandista commented on KAFKA-7467:
-

Changed Priority to Major.

> NoSuchElementException is raised because controlBatch is empty
> --
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Badai Aqrandista
>Assignee: Bob Barrett
>Priority: Major
>
> Somehow, log cleaner died because of NoSuchElementException when it calls 
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at 
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at 
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
> consumeAbortedTxnsUpTo(controlBatch.lastOffset)
> val controlRecord = controlBatch.iterator.next()
> val controlType = ControlRecordType.parse(controlRecord.key)
> val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty 
> batches, which results in empty control batches. Trying to read the control 
> type of an empty batch causes the error.
> {noformat}
>   else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
> if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
> throw new IllegalStateException("Empty batches are only supported for 
> magic v2 and above");
> 
> bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
> DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), 
> batchMagic, batch.producerId(),
> batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), 
> batch.lastOffset(),
> batch.partitionLeaderEpoch(), batch.timestampType(), 
> batch.maxTimestamp(),
> batch.isTransactional(), batch.isControlBatch());
> filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-01 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634878#comment-16634878
 ] 

Ismael Juma commented on KAFKA-7467:


How come this is considered minor? It seems severe since it kills the cleaner.

> NoSuchElementException is raised because controlBatch is empty
> --
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Badai Aqrandista
>Assignee: Bob Barrett
>Priority: Minor
>
> Somehow, log cleaner died because of NoSuchElementException when it calls 
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at 
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at 
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
> consumeAbortedTxnsUpTo(controlBatch.lastOffset)
> val controlRecord = controlBatch.iterator.next()
> val controlType = ControlRecordType.parse(controlRecord.key)
> val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty 
> batches, which results in empty control batches. Trying to read the control 
> type of an empty batch causes the error.
> {noformat}
>   else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
> if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
> throw new IllegalStateException("Empty batches are only supported for 
> magic v2 and above");
> 
> bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
> DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), 
> batchMagic, batch.producerId(),
> batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), 
> batch.lastOffset(),
> batch.partitionLeaderEpoch(), batch.timestampType(), 
> batch.maxTimestamp(),
> batch.isTransactional(), batch.isControlBatch());
> filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)