[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071383#comment-17071383 ]

Guozhang Wang commented on KAFKA-8677:
--------------------------------------

I'm going to investigate the newly reported error here; it has also been reported at least one other time as:

{code}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 65541, only 5 bytes available
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
        at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:345)
        at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:725)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:839)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1310)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1250)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
        at kafka.utils.TestUtils$.$anonfun$pollRecordsUntilTrue$1(TestUtils.scala:819)
        at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:864)
        at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1382)
        at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:537)
        at kafka.api.EndToEndAuthorizationTest.consumeRecordsIgnoreOneAuthorizationException(EndToEndAuthorizationTest.scala:556)
        at kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:376)
{code}

In some soak tests, we also saw:

{code}
[2020-03-30T15:32:21-07:00] (streams-soak-2-5_soak_i-002fe2f777065e280_streamslog) [2020-03-30 22:32:20,765] ERROR [stream-soak-test-7da4ed8b-dc6c-4fd4-b14c-5fe68c71559e-StreamThread-1] stream-thread [stream-soak-test-7da4ed8b-dc6c-4fd4-b14c-5fe68c71559e-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-03-30T15:32:21-07:00] (streams-soak-2-5_soak_i-002fe2f777065e280_streamslog) org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
        at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeVersion(ConsumerProtocol.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:304)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:349)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
{code}

{code}
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) [2020-03-28 02:13:00,513] INFO [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] [Consumer instanceId=ip-172-31-31-29.us-west-2.compute.internal-1, clientId=stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1-consumer, groupId=stream-soak-test] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) [2020-03-28 02:13:00,513] ERROR [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] stream-thread [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
        at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeVersion(ConsumerProtocol.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:304)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:349)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) [2020-03-28 02:13:00,514] INFO [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] stream-thread [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) [2020-03-28 02:13:00,514] INFO [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] stream-thread [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
{code}
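To make the failure mode concrete, here is a minimal sketch of the hazard described in the issue below: poll() advances the consumed position before returning records, so an exception escaping the pipelined follow-up poll drops those records permanently if the caller catches it and keeps polling. The classes here are hypothetical stand-ins, not the real consumer internals.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelinedFetchDataLossSketch {

    // Stand-in for the partition log on the broker, indexed by offset.
    static final List<String> LOG = List.of("r0", "r1", "r2");

    static class SketchConsumer {
        long position = 0;               // consumed position, advanced before return
        boolean failNextPipelinedPoll;   // injects the error pollNoWakeup() may surface

        List<String> poll() {
            List<String> records = new ArrayList<>();
            if (position < LOG.size()) {
                records.add(LOG.get((int) position));
                position++;              // position moves BEFORE records reach the caller
            }
            // Mirrors the pipelining step: send the next fetch before returning.
            if (!records.isEmpty() && failNextPipelinedPoll) {
                failNextPipelinedPoll = false;
                // The records fetched above are dropped on the floor here.
                throw new IllegalStateException("error in pipelined fetch");
            }
            return records;
        }
    }

    // Runs three polls with a failure injected on the first one.
    static List<String> runScenario() {
        SketchConsumer consumer = new SketchConsumer();
        consumer.failNextPipelinedPoll = true;
        List<String> consumed = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            try {
                consumed.addAll(consumer.poll());
            } catch (RuntimeException e) {
                // Caller "captures and continues": the position is already past
                // r0's offset, so r0 is never redelivered.
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // [r1, r2] -- r0 is silently lost
    }
}
```

The first record never reaches the application even though the consumer committed past it internally, which is exactly why the position must not be allowed to advance past records that were never handed to the caller.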

> Flakey test 
> GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8677
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8677
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, security, unit tests
>    Affects Versions: 2.5.0
>            Reporter: Boyang Chen
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]
>  
> *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED
> *18:44:00* kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout
> *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED
> *18:44:00* org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records
> ---------------------------
> I found this flaky test is actually exposing a real bug in the consumer: within 
> {{KafkaConsumer.poll}}, we have an optimization that tries to send the next fetch 
> request before returning the data, in order to pipeline the fetch requests:
> {code}
> if (!records.isEmpty()) {
>     // before returning the fetched records, we can send off the next round of fetches
>     // and avoid block waiting for their responses to enable pipelining while the user
>     // is handling the fetched records.
>     //
>     // NOTE: since the consumed position has already been updated, we must not allow
>     // wakeups or any other errors to be triggered prior to returning the fetched records.
>     if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
>         client.pollNoWakeup();
>     }
>     return this.interceptors.onConsume(new ConsumerRecords<>(records));
> }
> {code}
> As the NOTE mentions, this pollNoWakeup call should NOT throw any exceptions, 
> since at this point the fetch position has already been updated. If an exception is 
> thrown here and the caller decides to catch it and continue, those records 
> would never be returned again, causing data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
