[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071393#comment-17071393 ]
Boyang Chen commented on KAFKA-8677:
------------------------------------
This also happens on trunk, FYI:
[2020-03-30T17:12:15-07:00] (streams-soak-trunk_soak_i-0f9c49376aaf33346_streamslog) [2020-03-31 00:12:14,667] ERROR [stream-soak-test-a3babca8-96ba-4db2-b378-730a1b9f7c0f-StreamThread-1] stream-thread [stream-soak-test-a3babca8-96ba-4db2-b378-730a1b9f7c0f-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-03-30T17:12:15-07:00] (streams-soak-trunk_soak_i-0f9c49376aaf33346_streamslog) org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
    at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeVersion(ConsumerProtocol.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:304)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:491)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1264)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1205)
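For reference, a minimal standalone sketch (an assumed illustration, not Kafka code) of why the deserialization fails this way: the 'version' field is an int16 read from the front of the assignment buffer, so an empty or truncated buffer underflows immediately:
{code}
import java.nio.ByteBuffer;

// Standalone sketch: reading the leading int16 'version' field from an
// empty (or truncated) assignment buffer throws BufferUnderflowException,
// matching the stack trace above. The empty buffer is a hypothetical input.
public class VersionUnderflowSketch {
    public static void main(String[] args) {
        ByteBuffer assignment = ByteBuffer.allocate(0);
        short version = assignment.getShort(); // java.nio.BufferUnderflowException
        System.out.println(version);
    }
}
{code}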
> Flaky test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> ------------------------------------------------------------------------------------------------
>
> Key: KAFKA-8677
> URL: https://issues.apache.org/jira/browse/KAFKA-8677
> Project: Kafka
> Issue Type: Bug
> Components: core, security, unit tests
> Affects Versions: 2.5.0
> Reporter: Boyang Chen
> Assignee: Guozhang Wang
> Priority: Critical
> Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]
>
> *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED
> *18:44:00* kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout
> *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records
> ---------------------------
> I found that this flaky test actually exposes a real bug in the consumer: within {{KafkaConsumer.poll}}, we have an optimization that sends the next fetch request before returning the data, in order to pipeline the fetch requests:
> {code}
> if (!records.isEmpty()) {
>     // before returning the fetched records, we can send off the next round of fetches
>     // and avoid block waiting for their responses to enable pipelining while the user
>     // is handling the fetched records.
>     //
>     // NOTE: since the consumed position has already been updated, we must not allow
>     // wakeups or any other errors to be triggered prior to returning the fetched records.
>     if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
>         client.pollNoWakeup();
>     }
>     return this.interceptors.onConsume(new ConsumerRecords<>(records));
> }
> {code}
> As the NOTE says, this pollNoWakeup must NOT throw any exceptions, since at this point the fetch position has already been updated. If an exception is thrown here and the caller decides to catch it and continue, those records will never be returned again, causing data loss.
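> A hypothetical catch-and-continue poll loop (a sketch only; the class name and handling are illustrative, not Kafka test code) showing where the fetched batch would be silently dropped:
> {code}
> import java.time.Duration;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.KafkaException;
>
> // Sketch of a catch-and-continue caller: if poll() throws from
> // pollNoWakeup() after the consumed position was already advanced, the
> // records fetched in that round are discarded in the catch block, and the
> // next poll() resumes from the advanced position, so they are never
> // redelivered.
> public class CatchAndContinueLoop {
>     static void run(KafkaConsumer<String, String> consumer) {
>         while (true) {
>             try {
>                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
>                 records.forEach(r -> System.out.println(r.value()));
>             } catch (KafkaException e) {
>                 // swallow and retry: the in-flight batch is lost here
>             }
>         }
>     }
> }
> {code}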