[jira] [Commented] (KAFKA-6587) Kafka Streams hangs when not able to access internal topics
[ https://issues.apache.org/jira/browse/KAFKA-6587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16549952#comment-16549952 ]

Guozhang Wang commented on KAFKA-6587:
--

This has been resolved as of https://issues.apache.org/jira/browse/KAFKA-5037.

> Kafka Streams hangs when not able to access internal topics
> ---
>
> Key: KAFKA-6587
> URL: https://issues.apache.org/jira/browse/KAFKA-6587
> Project: Kafka
> Issue Type: Bug
> Components: security, streams
> Affects Versions: 1.0.0
> Reporter: Chris Medved
> Priority: Minor
> Fix For: 2.1.0
>
>
> *Expectation:* Kafka Streams client will throw an exception, log errors, or
> crash when a fatal error occurs.
> *Observation:* Kafka Streams does not log an error or throw an exception when
> necessary permissions for internal state store topics are not granted. It
> will hang indefinitely and not start running the topology.
> *Steps to reproduce:*
> # Create a Kafka Cluster with ACLs enabled (allow.everyone.if.no.acl.found
> should be set to false, or deny permissions must be set on the intermediate
> topics).
> # Create a simple streams application that does a stateful operation such as
> count.
> # Grant ACLs on source and sink topics to principal used for testing (would
> recommend using ANONYMOUS user if possible for ease of testing).
> # Grant ACLs for consumer group and cluster create. Add deny permissions to
> state store topics if the default is "allow". You can run the application to
> create the topics or use the topology describe method to get the names.
> # Run streams application. It should hang on "(Re-)joining group" with no
> errors printed.
> *Detailed Explanation*
> I spent some time trying to figure out what was wrong with my streams app.
> I'm using ACLs on my Kafka cluster and it turns out I forgot to grant
> read/write access to the internal topic state store for an aggregation.
> The streams client would hang on "(Re-)joining group" until killed (note ^C
> is ctrl+c, which I used to kill the app):
> {code:java}
> 10:29:10.064 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
> clientId=kafka-consumer-client-StreamThread-1-consumer,
> groupId=kafka-consumer-test] Discovered coordinator localhost:9092 (id:
> 2147483647 rack: null)
> 10:29:10.105 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer
> clientId=kafka-consumer-client-StreamThread-1-consumer,
> groupId=kafka-consumer-test] Revoking previously assigned partitions []
> 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [kafka-consumer-client-StreamThread-1] State transition from RUNNING to
> PARTITIONS_REVOKED
> 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.streams.KafkaStreams - stream-client
> [kafka-consumer-client]State transition from RUNNING to REBALANCING
> 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [kafka-consumer-client-StreamThread-1] partition revocation took 1 ms.
> suspended active tasks: []
> suspended standby tasks: []
> 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
> clientId=kafka-consumer-client-StreamThread-1-consumer,
> groupId=kafka-consumer-test] (Re-)joining group
> ^C
> 10:34:53.609 [Thread-3] INFO org.apache.kafka.streams.KafkaStreams -
> stream-client [kafka-consumer-client]State transition from REBALANCING to
> PENDING_SHUTDOWN
> 10:34:53.610 [Thread-3] INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [kafka-consumer-client-StreamThread-1] Informed to shut down
> 10:34:53.610 [Thread-3] INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [kafka-consumer-client-StreamThread-1] State transition from
> PARTITIONS_REVOKED to PENDING_SHUTDOWN{code}
> The server log would show:
> {code:java}
> [2018-02-23 10:29:10,408] INFO [Partition
> kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-05-changelog-0
> broker=0]
> kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-05-changelog-0
> starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1
> (kafka.cluster.Partition)
> [2018-02-23 10:29:20,143] INFO [GroupCoordinator 0]: Member
> kafka-consumer-client-StreamThread-1-consumer-f86e4ca8-4c45-4883-bdaa-2383193eabbe
> in group kafka-consumer-test has failed, removing
> it from the group (kafka.coordinator.group.GroupCoordinator)
> [2018-02-23 10:29:20,143] INFO [GroupCoordinator
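Step 4 of the report asks for the names of the internal topics that need ACLs. In Kafka Streams, internal changelog and repartition topics are prefixed with the {{application.id}}, which is also used as the consumer group id; this is consistent with the server log in the report, where the changelog partition for group {{kafka-consumer-test}} starts with {{kafka-consumer-test-}}. A minimal sketch, assuming the {{<application.id>-<name>-changelog}} and {{<application.id>-<name>-repartition}} naming convention. The class and method names are hypothetical, and in a real application the store and repartition node names would be read from the {{Topology#describe()}} output rather than passed in by hand:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: derives the Kafka Streams internal topic names that the
// application's principal needs read/write access to. Assumes the convention
// <application.id>-<name>-changelog and <application.id>-<name>-repartition.
final class InternalTopicNames {
    private InternalTopicNames() {}

    // Changelog topic backing a state store (e.g. the store of a count()).
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Repartition topic created when records must be re-keyed before grouping.
    static String repartitionTopic(String applicationId, String repartitionName) {
        return applicationId + "-" + repartitionName + "-repartition";
    }

    // Collects every internal topic name for the given stores and repartition nodes.
    static List<String> allInternalTopics(String applicationId,
                                          List<String> storeNames,
                                          List<String> repartitionNames) {
        List<String> topics = new ArrayList<>();
        for (String store : storeNames) {
            topics.add(changelogTopic(applicationId, store));
        }
        for (String name : repartitionNames) {
            topics.add(repartitionTopic(applicationId, name));
        }
        return topics;
    }
}
```

For example, with a hypothetical store named {{counts}}, {{InternalTopicNames.changelogTopic("kafka-consumer-test", "counts")}} yields {{kafka-consumer-test-counts-changelog}}; each name produced this way needs the same read/write grants the reporter gave the source and sink topics.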
[jira] [Commented] (KAFKA-6587) Kafka Streams hangs when not able to access internal topics
[ https://issues.apache.org/jira/browse/KAFKA-6587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524306#comment-16524306 ]

Guozhang Wang commented on KAFKA-6587:
--

I looked into the source code, and I believe this issue still exists in trunk; the security setting is just one of the scenarios that can trigger it. The root cause is the same as https://issues.apache.org/jira/browse/KAFKA-5037. Here is the reasoning:

With the ACL setting missing, the TopicMetadata response filters out any topics that the principal has not been granted permission for, so the {{metadata}} object passed to {{assign(Cluster metadata, Map subscriptions)}} does not contain this topic, and the assignment is blocked as described in KAFKA-5037.
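The reasoning above can be illustrated with a simplified, hypothetical model; this is not the actual Kafka Streams assignor code. The assignment needs metadata for every internal topic, but the metadata snapshot it receives omits topics the principal may not access. If the assignor simply keeps waiting for those topics to appear, it waits forever, which matches the "(Re-)joining group" hang in the report. The sketch instead bounds the retries and reports the missing topics as an error, which is the behavior the reporter expected. {{AssignmentModel}}, {{awaitTopics}}, and {{maxAttempts}} are illustrative names, and a topic-to-partition-count map stands in for the {{Cluster}} metadata object:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

// Hypothetical model of the failure mode: required topics that are filtered
// out of the metadata (e.g. for lack of authorization) never become visible.
final class AssignmentModel {
    private AssignmentModel() {}

    // Topics the assignment needs but that are absent from the metadata snapshot.
    static Set<String> missingTopics(Set<String> requiredTopics,
                                     Map<String, Integer> partitionsPerTopic) {
        Set<String> missing = new TreeSet<>(requiredTopics);
        missing.removeAll(partitionsPerTopic.keySet());
        return missing;
    }

    // Fail-fast variant: re-fetch metadata at most maxAttempts times, then throw
    // with the names of the topics that never appeared instead of looping forever.
    static Map<String, Integer> awaitTopics(Set<String> requiredTopics,
                                            Supplier<Map<String, Integer>> fetchMetadata,
                                            int maxAttempts) {
        Set<String> missing = requiredTopics;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Map<String, Integer> snapshot = fetchMetadata.get();
            missing = missingTopics(requiredTopics, snapshot);
            if (missing.isEmpty()) {
                return snapshot;
            }
        }
        throw new IllegalStateException("Internal topics missing from metadata after "
                + maxAttempts + " attempts (not created or not authorized): " + missing);
    }
}
```

Replacing the bounded loop with an unbounded one reproduces the silent hang; the bound turns the same condition into an error message naming the topics whose ACLs are missing.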
[jira] [Commented] (KAFKA-6587) Kafka Streams hangs when not able to access internal topics
[ https://issues.apache.org/jira/browse/KAFKA-6587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374967#comment-16374967 ]

Matthias J. Sax commented on KAFKA-6587:

Thanks for reporting this. For the upcoming 1.1 release, we removed an internal client for topic administration within Kafka Streams and replaced it with {{KafkaAdminClient}} (cf. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier] for details). Would it be possible for you to test this scenario with the {{1.1.0-SNAPSHOT}} version to see if it is fixed there?
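{{KafkaAdminClient}} reports results per topic through futures (for example, {{createTopics}} returns one future per topic), so a failure such as a missing authorization surfaces as an exception when the caller waits on that topic's result. A minimal pure-JDK sketch of that pattern, assuming {{CompletableFuture}} as a stand-in for {{KafkaFuture}} and a hypothetical {{TopicNotAuthorizedException}} in place of the client's real error types. It collects one error message per failed or timed-out topic rather than blocking indefinitely:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for an authorization error; not a Kafka class.
class TopicNotAuthorizedException extends RuntimeException {
    TopicNotAuthorizedException(String topic) {
        super("Not authorized to access topic: " + topic);
    }
}

final class TopicCreationCheck {
    private TopicCreationCheck() {}

    // Waits (with a timeout) on one future per topic and returns a readable
    // error message for every topic whose future failed or did not complete.
    static Map<String, String> collectFailures(Map<String, CompletableFuture<Void>> results,
                                               long timeoutMs) {
        Map<String, String> failures = new LinkedHashMap<>();
        for (Map.Entry<String, CompletableFuture<Void>> e : results.entrySet()) {
            try {
                e.getValue().get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (ExecutionException ex) {
                // get() wraps the original cause (e.g. the authorization error).
                failures.put(e.getKey(), ex.getCause().getMessage());
            } catch (TimeoutException ex) {
                failures.put(e.getKey(), "timed out after " + timeoutMs + " ms");
            } catch (InterruptedException ex) {
                // Restore the interrupt flag and stop waiting on further topics.
                Thread.currentThread().interrupt();
                failures.put(e.getKey(), "interrupted while waiting");
                break;
            }
        }
        return failures;
    }
}
```

The per-topic timeout keeps a single unanswered request from stalling the whole check, and the returned map names exactly which topics failed and why.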