PoAn Yang created KAFKA-17581:
---------------------------------
Summary: AsyncKafkaConsumer can't unsubscribe invalid topics
Key: KAFKA-17581
URL: https://issues.apache.org/jira/browse/KAFKA-17581
Project: Kafka
Issue Type: Bug
Reporter: PoAn Yang
Assignee: PoAn Yang
When consumer subscribes an invalid topic name like " this is test", classic
consumer can unsubscribe without error. However, async consumer can't. We can
use following integration test to validate:
{code:java}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
// Invalid topic name due to space
val invalidTopicName = "topic abc"
val consumer = createConsumer()
consumer.subscribe(List(invalidTopicName).asJava)
var exception : InvalidTopicException = null
TestUtils.waitUntilTrue(() => {
try consumer.poll(Duration.ofMillis(500)) catch {
case e : InvalidTopicException => exception = e
case e : Throwable => fail(s"An InvalidTopicException should be thrown.
But ${e.getClass} is thrown")
}
exception != null
}, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
// AsyncKafkaConsumer sends request in background thread. Wait enough time to
send next request.
Thread.sleep(1000)
assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.unsubscribe()
})
}{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)