[ 
https://issues.apache.org/jira/browse/KAFKA-9203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986927#comment-16986927
 ] 

ASF GitHub Bot commented on KAFKA-9203:
---------------------------------------

ijuma commented on pull request #7769: KAFKA-9203: Revert "MINOR: Remove 
workarounds for lz4-java bug affecting byte buffers (#6679)"
URL: https://github.com/apache/kafka/pull/7769
 
 
   This reverts commit 90043d5f as it caused a regression in some cases:
   
   > Caused by: java.io.IOException: Stream frame descriptor corrupted
   >         at 
org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
   >         at 
org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
   >         at 
org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
   
   I will investigate why after, but I want to get the safe fix into 2.4.0.
   The reporter of KAFKA-9203 has verified that reverting this change
   makes the problem go away.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> kafka-client 2.3.1 fails to consume lz4 compressed topic in kafka 2.1.1
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-9203
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9203
>             Project: Kafka
>          Issue Type: Bug
>          Components: compression, consumer
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: David Watzke
>            Priority: Critical
>         Attachments: kafka-clients-2.3.2-SNAPSHOT.jar
>
>
> I run kafka cluster 2.1.1
> when I upgraded the consumer app to use kafka-client 2.3.0 (or 2.3.1) instead 
> of 2.2.0, I immediately started getting the following exceptions in a loop 
> when consuming a topic with LZ4-compressed messages:
> {noformat}
> 2019-11-18 11:59:16.888 ERROR [afka-consumer-thread]     
> com.avast.utils2.kafka.consumer.ConsumerThread: Unexpected exception occurred 
> while polling and processing messages: org.apache.kafka.common.KafkaExce
> ption: Received exception when fetching the next record from 
> FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue 
> consumption. 
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the 
> record to continue consumption. 
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>  
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
>  
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) 
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) 
>         at 
> com.avast.utils2.kafka.consumer.ConsumerThread.pollAndProcessMessagesFromKafka(ConsumerThread.java:180)
>  
>         at 
> com.avast.utils2.kafka.consumer.ConsumerThread.run(ConsumerThread.java:149) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$8(RequestSaver.scala:28) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$8$adapted(RequestSaver.scala:19)
>  
>         at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>         at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>         at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>         at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
>         at 
> resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
>         at 
> resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
>         at 
> resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
>         at 
> resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>  
>         at 
> resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$6(RequestSaver.scala:19) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$6$adapted(RequestSaver.scala:18)
>  
>         at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>         at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>         at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>         at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
>         at 
> resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
>         at 
> resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
>         at 
> resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
>         at 
> resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>  
>         at 
> resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$4(RequestSaver.scala:18) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$4$adapted(RequestSaver.scala:17)
>  
>         at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>         at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>         at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>         at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
>         at 
> resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
>         at 
> resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
>         at 
> resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
>         at 
> resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>  
>         at 
> resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$2(RequestSaver.scala:17) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$2$adapted(RequestSaver.scala:16)
>  
>         at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>         at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>         at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>         at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
>         at 
> resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
>         at 
> resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
>         at 
> resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
>         at 
> resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>  
>         at 
> resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
>         at com.avast.filerep.saver.RequestSaver$.main(RequestSaver.scala:16) 
>         at com.avast.filerep.saver.RequestSaver.main(RequestSaver.scala) 
> Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: 
> Stream frame descriptor corrupted 
>         at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113)
>  
>         at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257)
>  
>         at 
> org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:335)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1450)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1487)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>  
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
>  
>         ... 70 common frames omitted 
> Caused by: java.io.IOException: Stream frame descriptor corrupted 
>         at 
> org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
>  
>         at 
> org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
>  
>         at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
>  
>         ... 78 common frames omitted
> {noformat}
>  (the producer app is using kafka client 0.10.2.1)
> I retried with a new consumer group but it didn't help. Kafka-client 
> downgrade back to 2.2.0 helped. This makes me think LZ4 may be broken in 
> kafka-client 2.3.x



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to