Boyang Chen created KAFKA-9999:
----------------------------------

             Summary: Topic description should be triggered after each failed topic creation iteration
                 Key: KAFKA-9999
                 URL: https://issues.apache.org/jira/browse/KAFKA-9999
             Project: Kafka
          Issue Type: Bug
          Components: admin, streams
    Affects Versions: 2.4.0
            Reporter: Boyang Chen


We spotted a system test failure in which the topics already exist but the admin client still attempts to recreate them:

{code:java}
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog. Topic is probably marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-uwin-cnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-cntByCnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Topics [SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog, SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made ready with 5 retries left (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,221] ERROR stream-thread [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.
        at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
        at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
        at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
{code}

On closer inspection, it seems that during the topic description phase we did not find these topics because the brokers were temporarily unavailable. However, we gave up on the topic description after the first run, so every later iteration only retried the creation, hit {{TopicExistsException}}, and assumed the topic was marked for deletion until the retries were exhausted. A more resilient approach is to retry the topic description on each iteration, so that we can filter out topics that already exist with the expected number of partitions instead of waiting for a deletion that is not actually pending.
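
The proposed retry flow can be illustrated with a minimal sketch. It assumes a hypothetical {{TopicAdmin}} interface ({{describe}}/{{create}}) standing in for Kafka's real {{Admin}} client and {{InternalTopicManager}}; the class name, method signatures and the {{FlakyFakeAdmin}} test double are all illustrative, not Kafka code. The point it shows is that the still-pending topics are re-described at the start of every iteration, so a topic missed by one transient describe failure is later recognized as ready instead of being treated as pending deletion.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch; this is NOT Kafka's InternalTopicManager or Admin API.
class TopicReadiness {

    /** Hypothetical minimal admin abstraction (assumption for illustration). */
    interface TopicAdmin {
        /** Returns partition counts for the topics it could describe; topics it could not see are omitted. */
        Map<String, Integer> describe(Set<String> topics);

        /** Tries to create the topic; returns false if it already exists (cf. TopicExistsException). */
        boolean create(String topic, int partitions);
    }

    /**
     * Tries to make the expected topics ready in up to retries + 1 iterations, re-describing
     * the still-pending topics at the start of every iteration. Returns the topics that could
     * not be made ready (empty on success).
     */
    static Set<String> makeReady(TopicAdmin admin, Map<String, Integer> expected, int retries) {
        Set<String> pending = new HashSet<>(expected.keySet());
        for (int attempt = 0; attempt <= retries && !pending.isEmpty(); attempt++) {
            // Describe again on every iteration, not only once up front.
            Map<String, Integer> described = admin.describe(pending);
            for (Map.Entry<String, Integer> entry : described.entrySet()) {
                // Filter out topics that already exist with the expected partition count.
                if (Objects.equals(entry.getValue(), expected.get(entry.getKey()))) {
                    pending.remove(entry.getKey());
                }
            }
            // Only topics not (yet) seen with the expected partition count are created.
            for (String topic : new HashSet<>(pending)) {
                if (admin.create(topic, expected.get(topic))) {
                    pending.remove(topic);
                }
                // On false, do not assume a pending deletion; the next describe decides.
            }
            // A real implementation would back off here before the next iteration.
        }
        return pending;
    }

    /** Test double whose first describe() sees nothing, mimicking briefly unavailable brokers. */
    static class FlakyFakeAdmin implements TopicAdmin {
        final Map<String, Integer> topics = new HashMap<>();
        int describeCalls = 0;
        int createCalls = 0;

        @Override
        public Map<String, Integer> describe(Set<String> names) {
            describeCalls++;
            Map<String, Integer> result = new HashMap<>();
            if (describeCalls == 1) {
                return result; // transient failure: nothing could be described
            }
            for (String name : names) {
                if (topics.containsKey(name)) {
                    result.put(name, topics.get(name));
                }
            }
            return result;
        }

        @Override
        public boolean create(String name, int partitions) {
            createCalls++;
            return topics.putIfAbsent(name, partitions) == null;
        }
    }
}
```

With an existing topic and a first describe that sees nothing, the first creation attempt fails as in the log above, but the second iteration's describe finds the topic and the call succeeds. In this sketch a topic that exists with the wrong partition count simply stays pending until the retries run out; how Kafka Streams should handle that case is outside the scope of this issue.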

Another angle to this problem is that, as of today, we cannot tell whether a topic is pending deletion or running properly. We could discuss a follow-up effort to expose that information as part of the topic description result.
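
If the description result did report whether a topic is pending deletion, the retry logic would no longer have to guess from a {{TopicExistsException}}. The following is a minimal sketch of how one iteration could then decide what to do per topic. It assumes a hypothetical {{TopicState}} enum and {{DescribedTopic}} record (neither exists in Kafka; per this issue, the description result does not expose this state today) and uses a Java 16+ record.

```java
import java.util.Optional;

// Hypothetical sketch of a follow-up; these types are NOT part of Kafka's Admin API.
class TopicStateSketch {

    /** Hypothetical lifecycle state a richer topic description could report. */
    enum TopicState { ACTIVE, PENDING_DELETION }

    /** Hypothetical description result carrying the state alongside the partition count. */
    record DescribedTopic(String name, int partitions, TopicState state) { }

    /** What makeReady-style logic could do next for a single topic. */
    enum Action { READY, WAIT_FOR_DELETION, CREATE, PARTITION_MISMATCH }

    static Action decide(Optional<DescribedTopic> described, int expectedPartitions) {
        if (described.isEmpty()) {
            return Action.CREATE; // topic not found: try to create it
        }
        DescribedTopic topic = described.get();
        if (topic.state() == TopicState.PENDING_DELETION) {
            return Action.WAIT_FOR_DELETION; // genuinely being deleted: back off, then retry
        }
        // Active topic: ready only if it already has the expected partition count.
        return topic.partitions() == expectedPartitions ? Action.READY : Action.PARTITION_MISMATCH;
    }
}
```

With such a state available, "wait and retry" would be reserved for topics that are really being deleted, rather than inferred from a failed creation.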


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
