[ https://issues.apache.org/jira/browse/KAFKA-17581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lianet Magrans resolved KAFKA-17581.
------------------------------------
Resolution: Fixed
> AsyncKafkaConsumer can't unsubscribe invalid topics
> ---------------------------------------------------
>
> Key: KAFKA-17581
> URL: https://issues.apache.org/jira/browse/KAFKA-17581
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: PoAn Yang
> Assignee: PoAn Yang
> Priority: Major
> Labels: consumer-threading-refactor, kip-848-client-support
>
> When a consumer subscribes to an invalid topic name like " this is test", the
> classic consumer can unsubscribe without error. However, the async consumer
> can't. We can use the following integration test to validate this:
>
> {code:java}
> @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
>   // Invalid topic name due to space
>   val invalidTopicName = "topic abc"
>   val consumer = createConsumer()
>   consumer.subscribe(List(invalidTopicName).asJava)
>   var exception: InvalidTopicException = null
>   TestUtils.waitUntilTrue(() => {
>     try consumer.poll(Duration.ofMillis(500)) catch {
>       case e: InvalidTopicException => exception = e
>       case e: Throwable => fail(s"An InvalidTopicException should be thrown. But ${e.getClass} is thrown")
>     }
>     exception != null
>   }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
>   assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
>   // AsyncKafkaConsumer sends request in background thread. Wait enough time to send next request.
>   Thread.sleep(1000)
>   assertDoesNotThrow(new Executable {
>     override def execute(): Unit = consumer.unsubscribe()
>   })
> }{code}
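
For context on why "topic abc" is rejected in the test above, here is a minimal sketch of the topic naming rules as they are commonly documented for Kafka (ASCII alphanumerics, '.', '_' and '-' only, at most 249 characters, and not "." or ".."). TopicNameCheck and invalidReason are hypothetical names used for illustration; this is not the client's own validation code, and the exact checks in a given Kafka version may differ.

```scala
// Illustrative only: approximates the documented Kafka topic naming rules.
// TopicNameCheck and invalidReason are hypothetical, not Kafka APIs.
object TopicNameCheck {
  // Assumed limit, taken from the commonly documented naming rules.
  private val MaxNameLength = 249
  // ASCII alphanumerics plus '.', '_' and '-'.
  private val LegalChars = "[a-zA-Z0-9._-]+".r

  /** Returns None if the name passes the checks, or Some(reason) otherwise. */
  def invalidReason(name: String): Option[String] =
    if (name.isEmpty) Some("name is empty")
    else if (name == "." || name == "..") Some("name cannot be \".\" or \"..\"")
    else if (name.length > MaxNameLength) Some(s"name is longer than $MaxNameLength characters")
    else if (!LegalChars.pattern.matcher(name).matches()) Some("name contains an illegal character")
    else None
}
```

Under these assumed rules, TopicNameCheck.invalidReason("topic abc") returns a reason because of the space, while "topic-abc" passes.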