[
https://issues.apache.org/jira/browse/KAFKA-17286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872433#comment-17872433
]
Lianet Magrans edited comment on KAFKA-17286 at 8/9/24 7:09 PM:
----------------------------------------------------------------
[~brandboat] in case it helps: since the test subscribes to an invalid topic,
the error probably stays in the network layer as an unrecoverable error and is
propagated on close, as the log shows. We should first understand exactly why
that's not the case with the classic consumer (I expect the classic consumer
also keeps the invalid topic error as unrecoverable but just doesn't poll
on close?). If we conclude that the new consumer's close is not doing anything
unexpected, then we probably just need to close explicitly in this test,
expecting that it may throw the InvalidTopicException wrapped in a
KafkaException?
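
A rough sketch of what "close explicitly and tolerate the wrapped error" could look like. This is only an illustration, not the actual test change: the two exception classes are local stand-ins for org.apache.kafka.common.KafkaException and org.apache.kafka.common.errors.InvalidTopicException so the snippet compiles without the client jar, and the class and helper names (CloseAfterInvalidSubscribe, closeToleratingInvalidTopic) are hypothetical.

```java
public class CloseAfterInvalidSubscribe {

    // Stand-in for org.apache.kafka.common.KafkaException (assumption: local copy for the sketch).
    static class KafkaException extends RuntimeException {
        KafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for org.apache.kafka.common.errors.InvalidTopicException.
    static class InvalidTopicException extends KafkaException {
        InvalidTopicException(String message) {
            super(message, null);
        }
    }

    /**
     * Closes the consumer and returns true if close() failed with a
     * KafkaException whose cause is an InvalidTopicException.
     * Returns false on a clean close; any other failure is rethrown.
     */
    static boolean closeToleratingInvalidTopic(AutoCloseable consumer) throws Exception {
        try {
            consumer.close();
            return false;
        } catch (KafkaException e) {
            if (e.getCause() instanceof InvalidTopicException) {
                return true; // expected after subscribing to an invalid topic
            }
            throw e; // anything else is still a real failure
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulates the failure seen in the stack trace of this ticket.
        AutoCloseable failing = () -> {
            throw new KafkaException("Failed to close kafka consumer",
                    new InvalidTopicException("Invalid topics: [topic abc]"));
        };
        AutoCloseable clean = () -> { };

        System.out.println(closeToleratingInvalidTopic(failing));
        System.out.println(closeToleratingInvalidTopic(clean));
    }
}
```

In the real test the AutoCloseable would be the consumer created by the test, closed before tearDown runs, so that the harness does not hit the same exception again.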
> Flaky PlaintextConsumerSubscriptionTest#testSubscribeInvalidTopic
> -----------------------------------------------------------------
>
> Key: KAFKA-17286
> URL: https://issues.apache.org/jira/browse/KAFKA-17286
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Kuan Po Tseng
> Assignee: Kuan Po Tseng
> Priority: Critical
> Labels: flaky-test, integration-test, integration-tests,
> kip-848-client-support
> Fix For: 4.0.0
>
>
> This test is flaky under `kraft+kip848.groupProtocol=consumer`
>
> {code:java}
> Gradle Test Run :core:test > Gradle Test Executor 335 > PlaintextConsumerSubscriptionTest > testSubscribeInvalidTopic(String, String) > testSubscribeInvalidTopic(String, String).quorum=kraft+kip848.groupProtocol=consumer FAILED
>     org.apache.kafka.common.KafkaException: Failed to close kafka consumer
>         at app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1265)
>         at app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1220)
>         at app//org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1777)
>         at app//kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:226)
>         at app//kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:226)
>         at app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
>         at app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
>         at app//scala.collection.AbstractIterable.foreach(Iterable.scala:935)
>         at app//kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:226)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>         at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024)
>         at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
>         at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024)
>         at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>         at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>         at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
>         at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
>         at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>         at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>         at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
>         at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>         at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>         at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
>         at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
>         at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>         at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>         at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
>         at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>         at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>         at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
>         at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
>         at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>         at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
>         at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
>     Caused by:
>     org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic abc]
> {code}
>
>