[ https://issues.apache.org/jira/browse/KAFKA-6587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-6587.
----------------------------------
       Resolution: Fixed
         Assignee: Ted Yu
    Fix Version/s: 2.1.0

> Kafka Streams hangs when not able to access internal topics
> -----------------------------------------------------------
>
>                 Key: KAFKA-6587
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6587
>             Project: Kafka
>          Issue Type: Bug
>          Components: security, streams
>    Affects Versions: 1.0.0
>            Reporter: Chris Medved
>            Assignee: Ted Yu
>            Priority: Minor
>             Fix For: 2.1.0
>
>
> *Expectation:* Kafka Streams client will throw an exception, log errors, or 
> crash when a fatal error occurs.
> *Observation:* Kafka Streams does not log an error or throw an exception when 
> necessary permissions for internal state store topics are not granted. It 
> will hang indefinitely and not start running the topology.
> *Steps to reproduce:*
>  # Create a Kafka Cluster with ACLs enabled (allow.everyone.if.no.acl.found 
> should be set to false, or deny permissions must be set on the intermediate 
> topics).
>  # Create a simple streams application that does a stateful operation such as 
> count.
>  # Grant ACLs on source and sink topics to principal used for testing (would 
> recommend using ANONYMOUS user if possible for ease of testing).
>  # Grant ACLs for consumer group and cluster create. Add deny permissions to 
> state store topics if the default is "allow". You can run the application to 
> create the topics or use the topology describe method to get the names.
>  # Run streams application. It should hang on "(Re-)joining group" with no 
> errors printed.
> *Detailed Explanation*
> I spent some time trying to figure out what was wrong with my streams app. 
> I'm using ACLs on my Kafka cluster and it turns out I forgot to grant 
> read/write access to the internal state store topic for an aggregation.
> The streams client would hang on "(Re-)joining group" until killed (note ^C 
> is ctrl+c, which I used to kill the app): 
> {code:java}
> 10:29:10.064 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=kafka-consumer-client-StreamThread-1-consumer, 
> groupId=kafka-consumer-test] Discovered coordinator localhost:9092 (id: 
> 2147483647 rack: null)
> 10:29:10.105 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=kafka-consumer-client-StreamThread-1-consumer, 
> groupId=kafka-consumer-test] Revoking previously assigned partitions []
> 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [kafka-consumer-client-StreamThread-1] State transition from RUNNING to 
> PARTITIONS_REVOKED
> 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [kafka-consumer-client]State transition from RUNNING to REBALANCING
> 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [kafka-consumer-client-StreamThread-1] partition revocation took 1 ms.
> suspended active tasks: []
> suspended standby tasks: []
> 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=kafka-consumer-client-StreamThread-1-consumer, 
> groupId=kafka-consumer-test] (Re-)joining group
> ^C
> 10:34:53.609 [Thread-3] INFO org.apache.kafka.streams.KafkaStreams - 
> stream-client [kafka-consumer-client]State transition from REBALANCING to 
> PENDING_SHUTDOWN
> 10:34:53.610 [Thread-3] INFO 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [kafka-consumer-client-StreamThread-1] Informed to shut down
> 10:34:53.610 [Thread-3] INFO 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [kafka-consumer-client-StreamThread-1] State transition from 
> PARTITIONS_REVOKED to PENDING_SHUTDOWN{code}
> The server log would show:
> {code:java}
> [2018-02-23 10:29:10,408] INFO [Partition 
> kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-0 
> broker=0] 
> kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog-0 
> starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 
> (kafka.cluster.Partition)
> [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Member 
> kafka-consumer-client-StreamThread-1-consumer-f86e4ca8-4c45-4883-bdaa-2383193eabbe 
> in group kafka-consumer-test has failed, removing 
> it from the group (kafka.coordinator.group.GroupCoordinator)
> [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Preparing to rebalance 
> group kafka-consumer-test with old generation 1 (__consumer_offsets-23) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Group 
> kafka-consumer-test with generation 2 is now empty (__consumer_offsets-23) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-02-23 10:31:23,448] INFO [GroupMetadataManager brokerId=0] Group 
> kafka-consumer-test transitioned to Dead in generation 2 
> (kafka.coordinator.group.GroupMetadataManager){code}
> In this example, the internal topic was created. If the internal topic 
> already exists, it will try to create it again and fail with a "topic already 
> exists" exception (shown in the server log, not the client).
> The streams client then just remains stuck indefinitely. No errors or 
> warnings are printed, and it does not seem to actually shut down at any point.
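
Because the client logs nothing while it sits in REBALANCING, one way to notice the hang from the application side is a small watchdog that flags a state held for too long. The sketch below is only an illustration under stated assumptions, not the fix that shipped in 2.1.0: StuckStateWatchdog, onStateChange, and isStuck are hypothetical names, the 60-second limit is arbitrary, and states are passed as plain strings so the example runs without Kafka on the classpath. In a real application the state changes could be fed from a listener registered with KafkaStreams#setStateListener.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper (not part of Kafka): remembers the most recent state
 * and when it was entered, and reports whether a watched state has been
 * held for longer than a configured limit.
 */
final class StuckStateWatchdog {
    private final String watchedState;
    private final Duration limit;
    private String currentState; // null until the first state change arrives
    private Instant enteredAt;

    StuckStateWatchdog(String watchedState, Duration limit) {
        this.watchedState = watchedState;
        this.limit = limit;
    }

    /** Record a state transition; intended to be called from a state listener. */
    synchronized void onStateChange(String newState, Instant now) {
        currentState = newState;
        enteredAt = now;
    }

    /** True only if the watched state is current and has exceeded the limit. */
    synchronized boolean isStuck(Instant now) {
        return watchedState.equals(currentState)
                && Duration.between(enteredAt, now).compareTo(limit) > 0;
    }

    public static void main(String[] args) {
        // Arbitrary 60-second limit, chosen for illustration only.
        StuckStateWatchdog watchdog =
                new StuckStateWatchdog("REBALANCING", Duration.ofSeconds(60));

        // Timestamps mirror the client log: REBALANCING at 10:29:10,
        // still in that state when the app was killed at 10:34:53.
        Instant entered = Instant.parse("2018-02-23T10:29:10Z");
        Instant killed = Instant.parse("2018-02-23T10:34:53Z");

        watchdog.onStateChange("REBALANCING", entered);
        System.out.println("stuck=" + watchdog.isStuck(killed));
    }
}
```

A periodic task (for example on a ScheduledExecutorService) could call isStuck and log a warning or close the application. Whether a long rebalance really indicates missing ACLs still has to be confirmed from the broker logs, as in the report above.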



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
