[ 
https://issues.apache.org/jira/browse/KAFKA-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9999.
--------------------------------
    Resolution: Won't Fix

> Internal topic creation failure should be non-fatal and trigger explicit 
> rebalance 
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9999
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9999
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, streams
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> In a failed system test, we spotted a case where the topic already exists 
> but the admin client still attempts to recreate it:
>  
> {code:java}
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
> SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog. Topic is probably 
> marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete 
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog' already exists. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
> SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number 
> of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete 
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'SmokeTest-uwin-cnt-changelog' already exists. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager) 
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
> SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number 
> of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete 
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'SmokeTest-cntByCnt-changelog' already exists. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics 
> [SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog, 
> SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made 
> ready with 5 retries left 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics 
> after 5 retries. This can happen if the Kafka cluster is temporary not 
> available. You can increase admin client config `retries` to be resilient 
> against this error. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,221] ERROR stream-thread 
> [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered 
> the following unexpected Kafka exception during processing, this usually 
> indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: Could not create topics 
> after 5 retries. This can happen if the Kafka cluster is temporary not 
> available. You can increase admin client config `retries` to be resilient 
> against this error.
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743){code}
> On closer inspection, there is currently no way to tell whether a topic is 
> pending deletion or fully operational. A follow-up could expose that state 
> as part of the topic description result.
> The proposed fix is to explicitly trigger a rebalance once retries are 
> exhausted, which unblocks the group instead of killing the stream thread, 
> since a short-term failure like this is more likely a broker-side 
> unavailability.
>  
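
For illustration only, the proposal in the description (treat an exhausted retry budget as non-fatal and ask for a rebalance) might look roughly like the sketch below. Every name in it (InternalTopicRetrySketch, Outcome, TransientTopicException, TopicCreator) is hypothetical and stands in for the real Streams internals; it is not the code in InternalTopicManager, and the ticket itself was resolved as Won't Fix.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: none of these types are Kafka Streams classes.
final class InternalTopicRetrySketch {

    /** What the caller should do after one round of topic preparation. */
    enum Outcome { READY, REBALANCE_REQUESTED }

    /** Stand-in for a transient failure, e.g. "already exists" during an async delete. */
    static final class TransientTopicException extends RuntimeException {
        TransientTopicException(String message) { super(message); }
    }

    /** Stand-in for the admin-client call that creates a single topic. */
    interface TopicCreator {
        void create(String topic);
    }

    /**
     * Tries to create every topic, retrying the ones that fail transiently.
     * Returns REBALANCE_REQUESTED instead of throwing once retries run out.
     */
    static Outcome makeReady(Set<String> topics, TopicCreator creator,
                             int maxRetries, long backoffMs) {
        Set<String> pending = new LinkedHashSet<>(topics);
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            // Drop each topic from the pending set as soon as creation succeeds.
            pending.removeIf(topic -> {
                try {
                    creator.create(topic);
                    return true;
                } catch (TransientTopicException e) {
                    return false; // keep it for the next attempt
                }
            });
            if (pending.isEmpty()) {
                return Outcome.READY;
            }
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(backoffMs); // give the broker time to settle
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying, fall through to the non-fatal outcome
                }
            }
        }
        // Non-fatal: the caller is expected to trigger a new rebalance.
        return Outcome.REBALANCE_REQUESTED;
    }
}
```

In this sketch the caller, rather than the retry loop, decides how to trigger the rebalance, so the loop stays free of any consumer or coordinator dependency.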



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
