[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964576#comment-16964576 ] Guozhang Wang commented on KAFKA-9073: -- [~simplyamuthan] You can find the explanation in the above PR; it is actually a pretty straightforward bug.

> Kafka Streams State stuck in rebalancing after one of the StreamThread
> encounters java.lang.IllegalStateException: No current assignment for
> partition
> --
>
> Key: KAFKA-9073
> URL: https://issues.apache.org/jira/browse/KAFKA-9073
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.0
> Reporter: amuthan Ganeshan
> Priority: Major
> Fix For: 2.4.0
>
> Attachments: KAFKA-9073.log
>
> I have a Kafka Streams application that stores the incoming messages in a
> state store; later, during the punctuation period, we store them in a
> big-data persistent store after processing the messages.
> The application consumes from 120 partitions distributed across 40 instances.
> The application had been running fine without any problem for months, but all
> of a sudden some of the instances failed because of a stream-thread exception:
> ```java.lang.IllegalStateException: No current assignment for partition
> --changelog-98```
>
> The other instances are stuck in the REBALANCING state and never come out
> of it. Here is the full stack trace; I masked the application-specific
> app name and store name in the stack trace due to an NDA.
>
> ```
> 2019-10-21 13:27:13,481 ERROR [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition application.id-store_name-changelog-98
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> ```
>
> I checked the state store disk usage; it is less than 40% of the total
> disk space available. Restarting the application solves the problem for a
> short time, but the error soon pops up again on other instances. I tried
> changing the retries and retry.backoff.ms configuration, but it did not
> help at all:
> ```
> retries = 2147483647
> retry.backoff.ms
> ```
> After googling for some time I found a similar bug reported to the
> Kafka team in the past, and noticed my stack trace exactly matches the
> stack trace of the reported bug.
> Here is the link for the
[jira] [Resolved] (KAFKA-9114) Kafka broker fails to establish secure zookeeper connection via SSL.
[ https://issues.apache.org/jira/browse/KAFKA-9114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gangadhar Balikai resolved KAFKA-9114. -- Resolution: Invalid

> Kafka broker fails to establish secure zookeeper connection via SSL.
> --
>
> Key: KAFKA-9114
> URL: https://issues.apache.org/jira/browse/KAFKA-9114
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.3.0, 2.3.1
> Reporter: Gangadhar Balikai
> Priority: Minor
>
> When I try to enable TLS/SSL between a Kafka broker (tried 2.3.0 and 2.3.1) and
> a zookeeper (3.5.5 and 3.5.6) cluster of 3 nodes, the Kafka broker fails with the
> following stack trace. The stack trace and the Kafka and zookeeper
> configurations used are given below.
> *JDK*: 1_8_0_161_64
> ```
> [2019-10-30 03:52:10,036] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> java.io.IOException: Couldn't instantiate org.apache.zookeeper.ClientCnxnSocketNetty
>     at org.apache.zookeeper.ZooKeeper.getClientCnxnSocket(ZooKeeper.java:1851)
>     at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:453)
>     at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:384)
>     at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:103)
>     at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
>     at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:364)
>     at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:387)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:207)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
>     at kafka.Kafka$.main(Kafka.scala:84)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.NoSuchMethodException: org.apache.zookeeper.ClientCnxnSocketNetty.<init>()
>     at java.lang.Class.getConstructor0(Class.java:3082)
>     at java.lang.Class.getDeclaredConstructor(Class.java:2178)
>     at org.apache.zookeeper.ZooKeeper.getClientCnxnSocket(ZooKeeper.java:1848)
>     ... 10 more
> [2019-10-30 03:52:10,039] INFO shutting down (kafka.server.KafkaServer)
> [2019-10-30 03:52:10,046] INFO shut down completed (kafka.server.KafkaServer)
> [2019-10-30 03:52:10,046] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
> [2019-10-30 03:52:10,048] INFO shutting down (kafka.server.KafkaServer)
> ```
> STEPS.
> 1) I copied the following zookeeper dependencies into the Kafka bin:
> a) kafka 2.3.0 and zookeeper 3.5.5:
> "zookeeper-3.5.6.jar" "zookeeper-jute-3.5.6.jar" "netty*.jar"
> "commons-cli-1.2.jar"
> b) kafka 2.3.1 and zookeeper 3.5.6:
> "zookeeper-3.5.6.jar" "zookeeper-jute-3.5.6.jar"
> "netty-buffer-4.1.42.Final.jar" "netty-buffer-4.1.42.Final.LICENSE.txt"
> "netty-codec-4.1.42.Final.jar" "netty-codec-4.1.42.Final.LICENSE.txt"
> "netty-common-4.1.42.Final.jar" "netty-common-4.1.42.Final.LICENSE.txt"
> "netty-handler-4.1.42.Final.jar" "netty-handler-4.1.42.Final.LICENSE.txt"
> "netty-resolver-4.1.42.Final.jar" "netty-resolver-4.1.42.Final.LICENSE.txt"
> "netty-transport-4.1.42.Final.jar" "netty-transport-4.1.42.Final.LICENSE.txt"
> "netty-transport-native-epoll-4.1.42.Final.jar"
> "netty-transport-native-epoll-4.1.42.Final.LICENSE.txt"
> "netty-transport-native-unix-common-4.1.42.Final.jar"
> "netty-transport-native-unix-common-4.1.42.Final.LICENSE.txt"
> "commons-cli-1.2.jar"
> *2) Configurations:*
> The *zookeeper* cluster looks good with
> 1) configuration *zoo.conf*:
> ```
> quorum.auth.server.loginContext=QuorumServer
> quorum.auth.learner.loginContext=QuorumLearner
> syncLimit=2
> tickTime=2000
> server.3=broker1\:2888\:3888
> server.2=broker2\:2888\:3888
> server.1=broker3\:2888\:3888
> authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
> initLimit=10
> secureClientPort=2281
> quorum.auth.learnerRequireSasl=true
> quorum.auth.enableSasl=true
> quorum.auth.kerberos.servicePrincipal=servicename/_HOST
> quorum.cnxn.threads.size=20
> zookeeper.client.secure=true
> quorum.auth.serverRequireSasl=true
> ```
>
[jira] [Commented] (KAFKA-9114) Kafka broker fails to establish secure zookeeper connection via SSL.
[ https://issues.apache.org/jira/browse/KAFKA-9114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964570#comment-16964570 ] Gangadhar Balikai commented on KAFKA-9114: -- found issue on my side.
[jira] [Created] (KAFKA-9127) Needless group coordination overhead for GlobalKTables
Chris Toomey created KAFKA-9127:
---
Summary: Needless group coordination overhead for GlobalKTables
Key: KAFKA-9127
URL: https://issues.apache.org/jira/browse/KAFKA-9127
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 2.3.0
Reporter: Chris Toomey

When creating a simple stream topology that just populates a GlobalKTable, I noticed from logging that the stream consumer was making group-coordination requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to do, since the global consumer thread populating the table fetches from all partitions and thus doesn't use the group requests. So this adds needless overhead on the client, network, and server.

I tracked this down to the stream-thread consumer, which is created regardless of whether it's needed, based solely on NUM_STREAM_THREADS_CONFIG, which defaults to 1. I found that setting NUM_STREAM_THREADS_CONFIG to 0 prevents this from happening, but it would be a worthwhile improvement to be able to override this setting for topologies like this that have no need for stream threads. Hence this ticket.

I originally asked about this on the users mailing list, where Bruno suggested I file it as an improvement request.

Here's the Scala code that I'm using that exhibits this:

{code:scala}
val builder: StreamsBuilder = new StreamsBuilder()
val gTable = builder.globalTable[K, V](...)
val stream = new KafkaStreams(builder.build(), props)
stream.start()
{code}

Not shown is the state store that I'm populating/using.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
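The workaround mentioned in the report (setting the number of stream threads to 0) can be sketched as a plain properties fragment. This is a hedged sketch, not the reporter's code: "num.stream.threads", "application.id", and "bootstrap.servers" are the real Kafka Streams config keys (NUM_STREAM_THREADS_CONFIG resolves to "num.stream.threads"), while the app id and server address values are placeholders.

```java
import java.util.Properties;

public class GlobalTableConfig {
    // Sketch of the workaround from the report: with "num.stream.threads" set
    // to "0", no StreamThread (and hence no group-coordinating consumer) is
    // created; only the global consumer thread runs.
    static Properties workaroundProps() {
        Properties props = new Properties();
        props.setProperty("application.id", "global-table-app"); // placeholder app id
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("num.stream.threads", "0"); // the actual workaround
        return props;
    }

    public static void main(String[] args) {
        System.out.println(workaroundProps().getProperty("num.stream.threads"));
    }
}
```

These properties would then be passed to the KafkaStreams constructor in place of `props` in the Scala snippet above.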
[jira] [Resolved] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id
[ https://issues.apache.org/jira/browse/KAFKA-9093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx resolved KAFKA-9093. --- Resolution: Fixed

> NullPointerException in KafkaConsumer with group.instance.id
> --
>
> Key: KAFKA-9093
> URL: https://issues.apache.org/jira/browse/KAFKA-9093
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.3.0
> Reporter: Rolef Heinrich
> Assignee: huxihx
> Priority: Minor
> Fix For: 2.3.2
>
> When using *group.instance.id=myUniqId[0]*, the KafkaConsumer's constructor
> throws a NullPointerException in close():
>
> {code:java}
> Caused by: java.lang.NullPointerException
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
> {code}
> It turns out that the exception is thrown because the *log* member is not
> yet initialized (still null) in the constructor when the original exception
> is handled; the original exception is thrown before *log* is initialized.
> The side effect of this error is that close() does not clean up resources
> as it is supposed to do.
> *The consumer properties used, for reference:*
> {code:java}
> key.deserializer=com.ibm.streamsx.kafka.serialization
> request.timeout.ms=25000
> value.deserializer=com.ibm.streamsx.kafka.serialization
> client.dns.lookup=use_all_dns_ips
> metadata.max.age.ms=2000
> enable.auto.commit=false
> group.instance.id=myUniqId[0]
> max.poll.interval.ms=30
> group.id=consumer-0
> metric.reporters=com.ibm.streamsx.kafka.clients.consum...
> reconnect.backoff.max.ms=1
> bootstrap.servers=localhost:9092
> max.poll.records=50
> session.timeout.ms=2
> client.id=C-J37-ReceivedMessages[0]
> allow.auto.create.topics=false
> metrics.sample.window.ms=1
> retry.backoff.ms=500
> reconnect.backoff.ms=250
> {code}
> *Expected behaviour:* throw an exception indicating that something is wrong
> with the chosen group.instance.id.
> The documentation does not say anything about valid values for
> group.instance.id.
> *Reproduce:*
>
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class Main {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-Id1");
>         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "myUniqId[0]");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         KafkaConsumer<String, String> c = new KafkaConsumer<>(props);
>     }
> }
> {code}
> Running it throws:
> {code}
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
> at Main.main(Main.java:15)
> {code}
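The failure mode described above (an error path dereferencing a field that the constructor has not yet assigned, so a NullPointerException masks the original exception) can be reproduced in isolation. The class and field names below are hypothetical stand-ins, not KafkaConsumer's actual internals; only the pattern matches the report.

```java
// Minimal self-contained sketch of the bug pattern from KAFKA-9093:
// the constructor fails before `log` is assigned, the catch block calls
// close(), and close() dereferences the still-null `log`, so the caller
// sees a NullPointerException instead of the real configuration error.
class SketchConsumer {
    private Object log; // assigned late in the constructor, like KafkaConsumer's logger

    SketchConsumer(boolean failEarly) {
        try {
            if (failEarly) {
                // stands in for the config-validation error on group.instance.id
                throw new IllegalArgumentException("invalid group.instance.id");
            }
            this.log = new Object();
        } catch (RuntimeException e) {
            close(); // NPE thrown here masks the original IllegalArgumentException
            throw e;
        }
    }

    void close() {
        log.toString(); // log is still null on the early-failure path
    }
}
```

Initializing `log` before any code that can throw (which is the direction the fix took) would let the original exception surface instead.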
[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964526#comment-16964526 ] amuthan Ganeshan commented on KAFKA-9073: - I am more interested in understanding the root cause; [~guozhang], could you please explain the scenario? Also, is this bug fix going to be a minor release under 2.3?
[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964486#comment-16964486 ] ASF GitHub Bot commented on KAFKA-9073: --- guozhangwang commented on pull request #7630: KAFKA-9073: check assignment in requestFailed to avoid NPE
URL: https://github.com/apache/kafka/pull/7630
This is a cherry-pick of the bug fix included in https://github.com/apache/kafka/pull/6884 to the 2.3 and older branches.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state
[ https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964469#comment-16964469 ] ASF GitHub Bot commented on KAFKA-8972: --- guozhangwang commented on pull request #7617: KAFKA-8972 (2.4 blocker): bug fix for restoring task URL: https://github.com/apache/kafka/pull/7617 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback > state > > > Key: KAFKA-8972 > URL: https://issues.apache.org/jira/browse/KAFKA-8972 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Blocker > Fix For: 2.4.0 > > > Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the > following: > {code} > this.subscriptions.unsubscribe(); > this.coordinator.onLeavePrepare(); > this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics"); > {code} > And inside {{onLeavePrepare}} we would look into the assignment and try to > revoke them and notify users via {{RebalanceListener#onPartitionsRevoked}}, > and then clear the assignment. > However, the subscription's assignment is already cleared in > {{this.subscriptions.unsubscribe();}} which means user's rebalance listener > would never be triggered. In other words, from consumer client's pov nothing > is owned after unsubscribe, but from the user caller's pov the partitions are > not revoked yet. For callers like Kafka Streams which rely on the rebalance > listener to maintain their internal state, this leads to inconsistent state > management and failure cases. 
> Before KIP-429 this issue was hidden, since every time the consumer
> re-joined the group later it would still revoke everything anyway, regardless
> of the passed-in parameters of the rebalance listener; with KIP-429 this is
> easier to reproduce now.
> I think we can summarize our fix as:
> * Inside `unsubscribe`, first do `onLeavePrepare` / `maybeLeaveGroup` and then
> `subscription.unsubscribe`. This way we are guaranteed that the Streams tasks
> are all closed as revoked by then.
> * [Optimization] If the generation is reset due to a fatal error from a join / hb
> response etc., then we know that all partitions are lost, and we should not
> trigger `onPartitionsRevoked`, but instead just `onPartitionsLost` inside
> `onLeavePrepare`.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
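The call-ordering fix proposed above can be sketched with minimal stand-in methods. These are not the real classes (the actual code lives in KafkaConsumer, SubscriptionState, and the consumer coordinator); the sketch only models the ordering constraint: revoke callbacks must fire while the assignment is still intact, and clearing the subscription must come last.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in modeling only the fixed ordering of KafkaConsumer.unsubscribe:
// onLeavePrepare (fires the user's onPartitionsRevoked while the assignment
// is still populated), then maybeLeaveGroup, then clearing the subscription.
class UnsubscribeOrder {
    final List<String> calls = new ArrayList<>();

    void onLeavePrepare()    { calls.add("onLeavePrepare"); }    // user callback sees the assignment
    void maybeLeaveGroup()   { calls.add("maybeLeaveGroup"); }
    void clearSubscription() { calls.add("subscriptions.unsubscribe"); }

    // The buggy order cleared the subscription first, so onLeavePrepare
    // found an empty assignment and never invoked onPartitionsRevoked.
    void unsubscribe() {
        onLeavePrepare();
        maybeLeaveGroup();
        clearSubscription();
    }
}
```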
[jira] [Commented] (KAFKA-9113) Clean up task management
[ https://issues.apache.org/jira/browse/KAFKA-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964468#comment-16964468 ] Guozhang Wang commented on KAFKA-9113: -- Also related to the above call: we should refactor the {{StreamTask.close}} function to be more robust, so that it closes only whatever is needed for a restoring task, a suspended task, etc. More specifically, we consider a task to be: 1. created: topology not initialized, states not initialized. -> on close, basically nothing needs to be done 2. restoring: topology not initialized, states initialized. -> on close, we only need to close the state manager 3. running: topology initialized, states initialized. -> on close, first suspend (close the topology), and then close as suspended 4. suspended: topology closed, states not closed. -> it is similar to restoring: we just call closeSuspended, which only needs to close the state manager > Clean up task management > > > Key: KAFKA-9113 > URL: https://issues.apache.org/jira/browse/KAFKA-9113 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > > Along KIP-429 we did a lot of refactoring of the task management classes, > including the TaskManager and AssignedTasks (and children). While hopefully > easier to reason about there's still significant opportunity for further > cleanup including safer state tracking. Some potential improvements: > 1) Verify that no tasks are ever in more than one state at once. One > possibility is to just check that the suspended, created, restoring, and > running maps are all disjoint, but this begs the question of when and where > to do those checks, and how often. Another idea might be to put all tasks > into a single map and just track their state on a per-task basis. 
Whatever it > is should be aware that some methods are on the critical code path, and > should not be burdened with excessive safety checks (i.e. > AssignedStreamTasks#process). Alternatively, it seems to make sense to just > make each state its own type. We can then do some cleanup of the AbstractTask > and StreamTask classes, which currently contain a number of methods specific > to only one type/state of task. For example > * only active running tasks ever need to be suspendable, yet every task goes > through suspend then closeSuspended during close. > * as the name suggests, closeSuspended should technically only ever apply to > suspended tasks > * the code paths needed to perform certain actions such as closing or > committing a task vary widely between the different states. A restoring task > need only close its state manager, but skipping the task.close call and > calling only closeStateManager has led to confusion and time wasted trying > to remember or ask someone why that is sufficient > 2) Cleanup of closing and/or shutdown logic – there are some potential > improvements to be made here as well, for example AssignedTasks currently > implements a closeZombieTask method despite the fact that standby tasks are > never zombies. > 3) The StoreChangelogReader also interacts with (only) the > AssignedStreamsTasks class, through the TaskManager. It can be difficult to > reason about these interactions and the state of the changelog reader. > 4) All 4 classes and their state have very strict consistency requirements > that currently are almost impossible to verify, which has already resulted in > several bugs that we were lucky to catch in time. 
We should tighten up how > these classes manage their own state, and how the overall state is managed > between them, so that it is easy to make changes without introducing new bugs > because one class updated its own state without knowing it needed to tell > another class to also update its -- This message was sent by Atlassian Jira (v8.3.4#803005)
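The four task states and their per-state close requirements described in the comment above can be sketched as follows (a hypothetical Python model with illustrative names, not the actual Streams classes):

```python
# Toy sketch of per-state close handling for a stream task. The state names
# mirror the comment above; the class and method names are illustrative.
from enum import Enum

class State(Enum):
    CREATED = 1    # topology not initialized, state stores not initialized
    RESTORING = 2  # state stores initialized, topology not initialized
    RUNNING = 3    # both initialized
    SUSPENDED = 4  # topology already closed, state stores still open

class ToyTask:
    def __init__(self, state):
        self.state = state
        self.actions = []  # records what close() actually had to do

    def close_topology(self):
        self.actions.append("close_topology")

    def close_state_manager(self):
        self.actions.append("close_state_manager")

    def close(self):
        if self.state == State.CREATED:
            pass  # nothing was initialized, so nothing to close
        elif self.state in (State.RESTORING, State.SUSPENDED):
            self.close_state_manager()  # only the state manager is open
        elif self.state == State.RUNNING:
            self.close_topology()       # first suspend (close the topology)...
            self.close_state_manager()  # ...then close as if suspended

running = ToyTask(State.RUNNING)
running.close()
```

A `RUNNING` task performs both steps; `RESTORING` and `SUSPENDED` tasks touch only the state manager, and a `CREATED` task does nothing.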
[jira] [Commented] (KAFKA-9113) Clean up task management
[ https://issues.apache.org/jira/browse/KAFKA-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964462#comment-16964462 ] Guozhang Wang commented on KAFKA-9113: -- Dumping my thoughts here, for closing as zombies: created: call close; restoring: call closeStateMgr; running: call close; suspended: call closeSuspended. 3 and 4 make sense to me, but 1/2 are a bit weird: for created tasks we do not initialize the topology or the state managers, but we still call close, whereas for restoring ones (where we initialized the state manager but not the topology) we call closeStateMgr. Taking a closer look at the code base, I think the reason is that in closeNonRunningTasks (which is triggered in onPartitionsRevoked) we actually trigger the following: task.close(false /* clean */, false /* isZombie */) I.e. we treat it as an "unclean" close, and hence we won't write a checkpoint, and most importantly, when closeTopology throws (which would be the case since it is not initialized at all) we just ignore it. So "accidentally" this becomes correct. This is not a great pattern, and I like @ableegoldman's suggestion that we should consider making the "close" call more robust in a follow-up PR; for now let's stay with what it is. -- This message was sent by Atlassian Jira (v8.3.4#803005)
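The "accidentally correct" unclean close described above can be reduced to a small model (illustrative Python, not the actual StreamTask code): closeTopology throws on a created task because the topology was never initialized, but the unclean path swallows the error and carries on.

```python
# Toy model of why task.close(false /* clean */, ...) happens to work on a
# created task: closeTopology raises, but the unclean close ignores it.
# Names are illustrative, not the actual Streams classes.
class CreatedTask:
    topology_initialized = False

    def close_topology(self):
        if not self.topology_initialized:
            raise RuntimeError("topology was never initialized")

    def close(self, clean):
        try:
            self.close_topology()
        except RuntimeError:
            if clean:
                raise  # a clean close would surface the problem
            # unclean close: swallow the error, skip checkpointing,
            # and fall through to the rest of the shutdown
        return "closed"

task = CreatedTask()
result = task.close(clean=False)  # succeeds only by accident
```

The unclean close returns normally, while a clean close on the same task would propagate the error, which is exactly why the pattern is fragile.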
[jira] [Resolved] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-9073. -- Resolution: Fixed > Kafka Streams State stuck in rebalancing after one of the StreamThread > encounters java.lang.IllegalStateException: No current assignment for > partition > -- > > Key: KAFKA-9073 > URL: https://issues.apache.org/jira/browse/KAFKA-9073 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: amuthan Ganeshan >Priority: Major > Fix For: 2.4.0 > > Attachments: KAFKA-9073.log > > > I have a Kafka stream application that stores the incoming messages into a > state store, and later during the punctuation period, we store them into a > big data persistent store after processing the messages. > The application consumes from 120 partitions distributed across 40 instances. > The application has been running fine without any problem for months, but all > of a sudden some of the instances failed because of a stream thread exception > saying > ```java.lang.IllegalStateException: No current assignment for partition > --changelog-98``` > > And other instances are stuck in the REBALANCING state, and never comes out > of it. Here is the full stack trace, I just masked the application-specific > app name and store name in the stack trace due to NDA. 
> > ``` > 2019-10-21 13:27:13,481 ERROR > [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] > [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread > [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] > Encountered the following error during processing: > java.lang.IllegalStateException: No current assignment for partition > application.id-store_name-changelog-98 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618) > at > org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > ``` > > Now I checked the state store disk usage; it is less than 40% of the total > disk space available. Restarting the application solves the problem for a > short amount of time, but the error pops up randomly on some other > instances soon after. I tried to change the retries and retry.backoff.ms > configuration, but it did not help at all > ``` > retries = 2147483647 > retry.backoff.ms > ``` > After googling for some time I found there was a similar bug reported to the > Kafka team in the past, and also noticed my stack trace exactly matches > the stack trace of the reported bug. > Here is the link for the bug reported on a comparable basis a year ago. > https://issues.apache.org/jira/browse/KAFKA-7181 > > Now I am wondering is
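The failure in the stack trace above can be reduced to a small race model (toy Python; the real logic lives in the Java consumer's SubscriptionState): an asynchronous fetch-failure callback fires for a partition that a rebalance has already removed from the assignment, and the assigned-state lookup rejects it.

```python
# Toy model of the "No current assignment for partition" failure above
# (illustrative only, not the actual org.apache.kafka.clients code).
class ToySubscriptionState:
    def __init__(self, partitions):
        self.assignment = dict.fromkeys(partitions, "assigned")

    def assigned_state(self, tp):
        if tp not in self.assignment:
            raise RuntimeError(f"No current assignment for partition {tp}")
        return self.assignment[tp]

    def request_failed(self, tp):
        # The fetch-failure handler assumes the partition is still assigned.
        self.assigned_state(tp)

subs = ToySubscriptionState(["changelog-98"])
pending_failure = "changelog-98"      # a fetch is still in flight for this
del subs.assignment["changelog-98"]   # ...but a rebalance removes it meanwhile

error = None
try:
    subs.request_failed(pending_failure)  # the late callback now blows up
except RuntimeError as e:
    error = str(e)
```

The late callback raises exactly the "No current assignment" error, which is why the thread dies even though nothing is wrong with the store or the disk.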
[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964437#comment-16964437 ] Guozhang Wang commented on KAFKA-9073: -- We can resolve this as fixed in 2.4.0 only for now, once I have other PR merged in I will update the ticket.
[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964436#comment-16964436 ] Matthias J. Sax commented on KAFKA-9073: [~simplyamuthan] [~ableegoldman] [~guozhang] -- should we close this ticket as fixed?
[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext
[ https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964422#comment-16964422 ] Sophie Blee-Goldman commented on KAFKA-9088: It's still pretty awkward to make users initialize stores when unit testing, especially since we explicitly forbid and prevent them from doing so in "real" code. But I've been thinking, regardless of how we want to solve the store unit testing problem we should go ahead and put the new MockInternalProcessorContext (or whatever it'll be called) in the test-utils package. It doesn't seem that unlikely we might want or need it for something else in the future, given how ubiquitous the processor context is > Consolidate InternalMockProcessorContext and MockInternalProcessorContext > - > > Key: KAFKA-9088 > URL: https://issues.apache.org/jira/browse/KAFKA-9088 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Bruno Cadonna >Priority: Minor > Labels: newbie > > Currently, we have two mocks for the {{InternalProcessorContext}}. The goal > of this ticket is to merge both into one mock or replace it with an > {{EasyMock}} mock. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9072) Add Section to Streams Developer Guide for Topology Naming (KIP-307)
[ https://issues.apache.org/jira/browse/KAFKA-9072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964423#comment-16964423 ] ASF GitHub Bot commented on KAFKA-9072: --- bbejeck commented on pull request #7629: KAFKA-9072: Add Topology naming to the dev guide URL: https://github.com/apache/kafka/pull/7629 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Section to Streams Developer Guide for Topology Naming (KIP-307) > > > Key: KAFKA-9072 > URL: https://issues.apache.org/jira/browse/KAFKA-9072 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.4.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Labels: docs > Fix For: 2.5.0 > > > WIth KIP-307 users can name operators in a topology. Naming is important as > it can help with pinning state store, changelog topic, and repartition topic > names keeping the topology robust in the face of adding/removing operators in > a Kafka Streams DSL. We should add a section to the developer guide to > explain why this is important. -- This message was sent by Atlassian Jira (v8.3.4#803005)
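Why the naming described in this ticket matters can be shown with a toy model (illustrative Python, not the actual Streams name-generation code): Kafka Streams derives auto-generated operator names such as `KSTREAM-AGGREGATE-0000000002` from an incrementing index, so inserting or removing an upstream operator shifts every downstream auto-generated name, and with it any state store, changelog, or repartition topic name derived from it. Explicitly named operators keep their names across topology changes.

```python
# Toy model of index-based operator naming in a Streams-like topology
# builder. TopologyBuilder and its methods are hypothetical stand-ins.
class TopologyBuilder:
    def __init__(self):
        self.index = 0
        self.nodes = []

    def add(self, op, name=None):
        auto = f"KSTREAM-{op.upper()}-{self.index:010d}"  # e.g. KSTREAM-FILTER-0000000001
        self.index += 1
        self.nodes.append(name or auto)
        return self

# Named aggregate: stable across a topology change.
v1 = TopologyBuilder().add("source").add("aggregate", name="my-store").nodes
v2 = (TopologyBuilder().add("source").add("filter")  # new operator inserted upstream
      .add("aggregate", name="my-store").nodes)

# Unnamed aggregate: its auto-generated name shifts when a filter is inserted.
u1 = TopologyBuilder().add("source").add("aggregate").nodes
u2 = TopologyBuilder().add("source").add("filter").add("aggregate").nodes
```

In this model `v1[-1]` and `v2[-1]` are both `"my-store"`, while `u1[-1]` and `u2[-1]` differ, which is the upgrade hazard the developer-guide section is meant to explain.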
[jira] [Updated] (KAFKA-9113) Clean up task management
[ https://issues.apache.org/jira/browse/KAFKA-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9113: --- Description: Along KIP-429 we did a lot of refactoring of the task management classes, including the TaskManager and AssignedTasks (and children). While hopefully easier to reason about there's still significant opportunity for further cleanup including safer state tracking. Some potential improvements: 1) Verify that no tasks are ever in more than one state at once. One possibility is to just check that the suspended, created, restoring, and running maps are all disjoint, but this begs the question of when and where to do those checks, and how often. Another idea might be to put all tasks into a single map and just track their state on a per-task basis. Whatever it is should be aware that some methods are on the critical code path, and should not be burdened with excessive safety checks (i.e. AssignedStreamTasks#process). Alternatively, it seems to make sense to just make each state its own type. We can then do some cleanup of the AbstractTask and StreamTask classes, which currently contain a number of methods specific to only one type/state of task. For example * only active running tasks ever need to be suspendable, yet every task goes through suspend then closeSuspended during close. * as the name suggests, closeSuspended should technically only ever apply to suspended tasks * the code paths needed to perform certain actions such as closing or committing a task vary widely between the different states. 
A restoring task need only close its state manager, but skipping the task.close call and calling only closeStateManager has led to confusion and time wasted trying to remember or ask someone why that is sufficient 2) Cleanup of closing and/or shutdown logic – there are some potential improvements to be made here as well, for example AssignedTasks currently implements a closeZombieTask method despite the fact that standby tasks are never zombies. 3) The StoreChangelogReader also interacts with (only) the AssignedStreamsTasks class, through the TaskManager. It can be difficult to reason about these interactions and the state of the changelog reader. 4) All 4 classes and their state have very strict consistency requirements that currently are almost impossible to verify, which has already resulted in several bugs that we were lucky to catch in time. We should tighten up how these classes manage their own state, and how the overall state is managed between them, so that it is easy to make changes without introducing new bugs because one class updated its own state without knowing it needed to tell another class to also update its

was: Along KIP-429 we did a lot of refactoring of the task management classes, including the TaskManager and AssignedTasks (and children). While hopefully easier to reason about there's still significant opportunity for further cleanup including safer state tracking. Some potential improvements: 1) Verify that no tasks are ever in more than one state at once. One possibility is to just check that the suspended, created, restoring, and running maps are all disjoint, but this begs the question of when and where to do those checks, and how often. Another idea might be to put all tasks into a single map and just track their state on a per-task basis. 
Whatever it is should be aware that some methods are on the critical code path, and should not be burdened with excessive safety checks (ie AssignedStreamTasks#process) 2) Cleanup of closing and/or shutdown logic – there are some potential improvements to be made here as well, for example AssignedTasks currently implements a closeZombieTask method despite the fact that standby tasks are never zombies. 3) The StoreChangelogReader also interacts with (only) the AssignedStreamsTasks class, through the TaskManager. It can be difficult to reason about these interactions and the state of the changelog reader. 4) All 4 classes and their state have very strict consistency requirements that currently are almost impossible to verify, which has already resulted in several bugs that we were lucky to catch in time. We should tighten up how these classes manage their own state, and how the overall state is managed between them, so that it is easy to make changes without introducing new bugs because one class updated its own state without knowing it needed to tell another class to also update its > Clean up task management > > > Key: KAFKA-9113 > URL: https://issues.apache.org/jira/browse/KAFKA-9113 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >
[jira] [Commented] (KAFKA-9080) System Test Failure: MessageFormatChangeTest.testCompatibilty
[ https://issues.apache.org/jira/browse/KAFKA-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964417#comment-16964417 ] ASF GitHub Bot commented on KAFKA-9080: --- tuvtran commented on pull request #7628: KAFKA-9080: Addresses MessageFormatChangeTest.testCompatibilty with version 0.9.0.1 URL: https://github.com/apache/kafka/pull/7628 TODO ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > System Test Failure: MessageFormatChangeTest.testCompatibilty > - > > Key: KAFKA-9080 > URL: https://issues.apache.org/jira/browse/KAFKA-9080 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Manikumar >Assignee: Tu Tran >Priority: Blocker > Fix For: 2.4.0 > > > MessageFormatChangeTest tests are failing on 2.4 and trunk for 0.9.0.1 > version. > http://confluent-kafka-2-4-system-test-results.s3-us-west-2.amazonaws.com/2019-10-21--001.1571716233--confluentinc--2.4--cb4944f/report.html > {code} > Module: kafkatest.tests.client.message_format_change_test > Class: MessageFormatChangeTest > Method: test_compatibility > Arguments: > { > "consumer_version": "0.9.0.1", > "producer_version": "0.9.0.1" > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8737) TaskMigrated Exception while rebalancing kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-8737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964410#comment-16964410 ] KUMAR commented on KAFKA-8737: -- Hi Bill Is there a plan to fix this issue similar to earlier fix for KIP-6269? > TaskMigrated Exception while rebalancing kafka streams > -- > > Key: KAFKA-8737 > URL: https://issues.apache.org/jira/browse/KAFKA-8737 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0, 1.0.1 > Environment: 20 partitions > 1 topic > 8 Streamer service > topic-region-1 9 7841726 8236017 > 394291 > streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663/ > > streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer > topic-region-1 15 7421710 7467666 > 45956 > streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663/ > > streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer > topic-region-1 19 7737360 8120611 > 383251 > streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663/ > > streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer > topic-region-1 >Reporter: KUMAR >Assignee: Bill Bejeck >Priority: Major > > Kafka streams throws following exception while restart of a stream client > service - > o.a.k.s.p.internals.StreamThread.? - stream-thread > [streams-subscriberstopic-region-1-32d968e3-f892-4772-a7a4-6f684d7e43c9-StreamThread-1] > Detected a task that got migrated to another thread. This implies that this > thread missed a rebalance and dropped out of the consumer group. Trying to > rejoin the consumer group now. 
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of > topic-region-1-12 should not change while restoring: old end offset 6286727, > current offset 6380997 > > Kafka version is 1.0.0 and we have back-merged the fix for KAFKA-6269 - > [https://github.com/apache/kafka/pull/4300/files#|https://github.com/apache/kafka/pull/4300/files] > However we observe that there seems to be an issue in rebalance when > "auto.offset.reset" is configured as "latest". Based on log analysis we see the > following behavior - > # StreamThread starts a restore consumer > # While fetching it gets offset out of range > o.a.k.c.consumer.internals.Fetcher.? - [Consumer > clientId=streams-subscriberstopic-region-1-11b2d7fb-11ce-4b0b-a40a-388d3c7b6bc9-StreamThread-1-restore- > consumer, groupId=] Fetch READ_UNCOMMITTED at offset 246431 for partition > topic-region-1-12 returned fetch data (error=OFFSET_OUT_OF_RANGE, > highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, > abortedTransactions = null, recordsSizeInBytes=0) > # Fetcher tries to reset the offset > # While resetting the offset it appears it is changing the offset position and > causing the TaskMigrated exception > The same test repeated with "auto.offset.reset" configured as "earliest" does > not throw any TaskMigrated exception, as in the earliest case we are not resetting > the restore consumer position. > > Please let us know if this is possible and if a fix would be needed for the > offset reset piece when set to latest. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-8558) KIP-479 - Add StreamJoined Overload to KStream#Join
[ https://issues.apache.org/jira/browse/KAFKA-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8558: --- Description: To prevent a topology incompatibility with the release of 2.4 and the naming of Join operations we'll add an overloaded KStream#join method accepting a Materialized parameter. This will allow users to explicitly name state stores created by Kafka Streams in the join operation. The overloads will apply to all flavors of KStream#join (inner, left, and right). KIP-479: https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join was: To prevent a topology incompatibility with the release of 2.4 and the naming of Join operations we'll add an overloaded KStream#join method accepting a Materialized parameter. This will allow users to explicitly name state stores created by Kafka Streams in the join operation. The overloads will apply to all flavors of KStream#join (inner, left, and right). > KIP-479 - Add StreamJoined Overload to KStream#Join > > > Key: KAFKA-8558 > URL: https://issues.apache.org/jira/browse/KAFKA-8558 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Blocker > Labels: kip > Fix For: 2.4.0 > > > To prevent a topology incompatibility with the release of 2.4 and the naming > of Join operations we'll add an overloaded KStream#join method accepting a > Materialized parameter. This will allow users to explicitly name state stores > created by Kafka Streams in the join operation. > The overloads will apply to all flavors of KStream#join (inner, left, and > right). > KIP-479: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9126) Extend `StreamJoined` to allow more store configs
Matthias J. Sax created KAFKA-9126: -- Summary: Extend `StreamJoined` to allow more store configs Key: KAFKA-9126 URL: https://issues.apache.org/jira/browse/KAFKA-9126 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.4.0 Reporter: Matthias J. Sax In the 2.4.0 release, we introduced the `StreamJoined` configuration object via KIP-479 (KAFKA-8558). The idea of `StreamJoined` is to be an equivalent to `Materialized` but customized for stream-stream joins, which have two stores (in contrast to the usage of `Materialized`, which is used for single-store operators). During the KIP discussion, the idea to allow setting the store retention time and enabling/disabling changelogging for the store was discussed. However, at some point this idea was dropped for unknown reasons (it seems it slipped). We should consider extending `StreamJoined` with `withRetentionPeriod()` and `loggingEnabled()`/`loggingDisabled()` methods to reach feature parity with `Materialized`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-8558) KIP-479 - Add StreamJoined Overload to KStream#Join
[ https://issues.apache.org/jira/browse/KAFKA-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8558: --- Labels: kip (was: needs-kip) > KIP-479 - Add StreamJoined Overload to KStream#Join > > > Key: KAFKA-8558 > URL: https://issues.apache.org/jira/browse/KAFKA-8558 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Blocker > Labels: kip > Fix For: 2.4.0 > > > To prevent a topology incompatibility with the release of 2.4 and the naming > of Join operations we'll add an overloaded KStream#join method accepting a > Materialized parameter. This will allow users to explicitly name state stores > created by Kafka Streams in the join operation. > > The overloads will apply to all flavors of KStream#join (inner, left, and > right). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-8558) KIP-479 - Add StreamJoined Overload to KStream#Join
[ https://issues.apache.org/jira/browse/KAFKA-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8558: --- Summary: KIP-479 - Add StreamJoined Overload to KStream#Join (was: KIP-479 - Add Materialized Overload to KStream#Join ) > KIP-479 - Add StreamJoined Overload to KStream#Join > > > Key: KAFKA-8558 > URL: https://issues.apache.org/jira/browse/KAFKA-8558 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Blocker > Labels: needs-kip > Fix For: 2.4.0 > > > To prevent a topology incompatibility with the release of 2.4 and the naming > of Join operations we'll add an overloaded KStream#join method accepting a > Materialized parameter. This will allow users to explicitly name state stores > created by Kafka Streams in the join operation. > > The overloads will apply to all flavors of KStream#join (inner, left, and > right). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964218#comment-16964218 ] Guozhang Wang commented on KAFKA-9073: -- Thanks for the find-out, I can prepare a PR for 2.3 and try to cherry-pick to older branches :) > Kafka Streams State stuck in rebalancing after one of the StreamThread > encounters java.lang.IllegalStateException: No current assignment for > partition > -- > > Key: KAFKA-9073 > URL: https://issues.apache.org/jira/browse/KAFKA-9073 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: amuthan Ganeshan >Priority: Major > Fix For: 2.4.0 > > Attachments: KAFKA-9073.log > > > I have a Kafka stream application that stores the incoming messages into a > state store, and later during the punctuation period, we store them into a > big data persistent store after processing the messages. > The application consumes from 120 partitions distributed across 40 instances. > The application has been running fine without any problem for months, but all > of a sudden some of the instances failed because of a stream thread exception > saying > ```java.lang.IllegalStateException: No current assignment for partition > --changelog-98``` > > And other instances are stuck in the REBALANCING state, and never comes out > of it. Here is the full stack trace, I just masked the application-specific > app name and store name in the stack trace due to NDA. 
> > ``` > 2019-10-21 13:27:13,481 ERROR > [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] > [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread > [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] > Encountered the following error during processing: > java.lang.IllegalStateException: No current assignment for partition > application.id-store_name-changelog-98 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618) > at > org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > ``` > > Now I checked the state store disk usage; it is less than 40% of the total > disk space available. Restarting the application solves the problem for a > short amount of time, but the error pops up randomly on some other > instances quickly. I tried to change the retries and retry.backoff.ms > configuration but it did not help at all > ``` > retries = 2147483647 > retry.backoff.ms > ``` > After googling for some time I found there was a similar bug reported to the > Kafka team in the past, and also noticed my stack trace exactly matches > the stack trace of the reported bug. > Here is the link for the bug reported on
[jira] [Created] (KAFKA-9125) GroupMetadataManager and TransactionStateManager should use metadata cache instead of zkClient
Viktor Somogyi-Vass created KAFKA-9125: -- Summary: GroupMetadataManager and TransactionStateManager should use metadata cache instead of zkClient Key: KAFKA-9125 URL: https://issues.apache.org/jira/browse/KAFKA-9125 Project: Kafka Issue Type: Sub-task Reporter: Viktor Somogyi-Vass Both classes query their respective topic's partition count via the zkClient. This, however, could be served from the broker's local metadata cache instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-9124) ISR changes should be propagated via Kafka protocol
[ https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass updated KAFKA-9124: --- Comment: was deleted (was: Opened by mistake.) > ISR changes should be propagated via Kafka protocol > --- > > Key: KAFKA-9124 > URL: https://issues.apache.org/jira/browse/KAFKA-9124 > Project: Kafka > Issue Type: Sub-task >Reporter: Viktor Somogyi-Vass >Priority: Major > > Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} updates > Zookeeper which is listened by the controller and that's how it notices the > ISR changes and sends out metadata requests. > Instead of this the brokers should use Kafka protocol messages to send out > ISR change notifications. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9124) ISR changes should be propagated via Kafka protocol
[ https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass updated KAFKA-9124: --- Description: Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} updates Zookeeper which is listened by the controller and that's how it notices the ISR changes and sends out metadata requests. Instead of this the brokers should use Kafka protocol messages to send out ISR change notifications. was: Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} updates Zookeeper which is listened by the controller and that's how it notices the ISR changes and sends out metadata requests. Instead of this the brokers should use Kafka protocol messages to send out isr change notifications. > ISR changes should be propagated via Kafka protocol > --- > > Key: KAFKA-9124 > URL: https://issues.apache.org/jira/browse/KAFKA-9124 > Project: Kafka > Issue Type: Sub-task >Reporter: Viktor Somogyi-Vass >Priority: Major > > Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} updates > Zookeeper which is listened by the controller and that's how it notices the > ISR changes and sends out metadata requests. > Instead of this the brokers should use Kafka protocol messages to send out > ISR change notifications. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9124) ISR changes should be propagated via Kafka protocol
[ https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass updated KAFKA-9124: --- Description: Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} updates Zookeeper which is listened by the controller and that's how it notices the ISR changes and sends out metadata requests. Instead of this the brokers should use Kafka protocol messages to send out isr change notifications. was:Currently {{KafkaApis}} invokes the {{adminManager}} (which esentially wraps the zkClient) to create a topic. Instead of this we should create a protocol that sends a request to the broker and listens for confirmation. > ISR changes should be propagated via Kafka protocol > --- > > Key: KAFKA-9124 > URL: https://issues.apache.org/jira/browse/KAFKA-9124 > Project: Kafka > Issue Type: Sub-task >Reporter: Viktor Somogyi-Vass >Priority: Major > > Currently {{Partition.expandIsr}} and {{Partition.shrinkIsr}} updates > Zookeeper which is listened by the controller and that's how it notices the > ISR changes and sends out metadata requests. > Instead of this the brokers should use Kafka protocol messages to send out > isr change notifications. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-9124) Topic creation should be done by the controller
[ https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass reopened KAFKA-9124: > Topic creation should be done by the controller > --- > > Key: KAFKA-9124 > URL: https://issues.apache.org/jira/browse/KAFKA-9124 > Project: Kafka > Issue Type: Sub-task >Reporter: Viktor Somogyi-Vass >Priority: Major > > Currently {{KafkaApis}} invokes the {{adminManager}} (which essentially wraps > the zkClient) to create a topic. Instead of this we should create a protocol > that sends a request to the broker and listens for confirmation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9124) ISR changes should be propagated via Kafka protocol
[ https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass updated KAFKA-9124: --- Summary: ISR changes should be propagated via Kafka protocol (was: Topic creation should be done by the controller) > ISR changes should be propagated via Kafka protocol > --- > > Key: KAFKA-9124 > URL: https://issues.apache.org/jira/browse/KAFKA-9124 > Project: Kafka > Issue Type: Sub-task >Reporter: Viktor Somogyi-Vass >Priority: Major > > Currently {{KafkaApis}} invokes the {{adminManager}} (which essentially wraps > the zkClient) to create a topic. Instead of this we should create a protocol > that sends a request to the broker and listens for confirmation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9124) Topic creation should be done by the controller
[ https://issues.apache.org/jira/browse/KAFKA-9124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass resolved KAFKA-9124. Resolution: Invalid Opened by mistake. > Topic creation should be done by the controller > --- > > Key: KAFKA-9124 > URL: https://issues.apache.org/jira/browse/KAFKA-9124 > Project: Kafka > Issue Type: Sub-task >Reporter: Viktor Somogyi-Vass >Priority: Major > > Currently {{KafkaApis}} invokes the {{adminManager}} (which essentially wraps > the zkClient) to create a topic. Instead of this we should create a protocol > that sends a request to the broker and listens for confirmation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9122) Externalizing DB password is not working
[ https://issues.apache.org/jira/browse/KAFKA-9122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964122#comment-16964122 ] Dwijadas commented on KAFKA-9122: - Hi After removing the quotes from the credential file, the connector managed to connect to the DB. {{$ cat /home/kfk/data/ora_credentials.properties ora.username=apps ora.password=Passw0rd!}} > Externalizing DB password is not working > > > Key: KAFKA-9122 > URL: https://issues.apache.org/jira/browse/KAFKA-9122 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.1 > Environment: CentOS 6.7 >Reporter: Dwijadas >Priority: Trivial > Attachments: Screenshot_1.png > > > Hi > I am trying to externalizing user name and password for oracle DB using > {{FileConfigProvider}} provider. > For that i have created a properties file that contains user name and > password. > > {{$ cat /home/kfk/data/ora_credentials.properties > ora.username="apps" > ora.password="Passw0rd!"}} > Added the config providers as file and also the config.providers.file.class > as FileConfigProvider in the worker config: > > {{$ cat /home/kfk/etc/kafka/connect-distributed.properties > ... > ... > config.providers=file > config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider > ... > ...}} > Restarted the worker and submitted a task using REST with the following config > > {{"config": \{ >"connector.class": > "io.confluent.connect.jdbc.JdbcSourceConnector", >"tasks.max": "1", > "connection.user": > "${file:/home/kfk/data/ora_credentials.properties:ora.username}", >"connection.password": > "${file:/home/kfk/data/ora_credentials.properties:ora.password}", >... >... 
> }}} > Submitting the above task resulting in the following error: > > {{{ > "error_code": 400, > "message": "Connector configuration is invalid and contains the following 2 > error(s):\nInvalid value java.sql.SQLException: ORA-01017: invalid > username/password; logon denied\n for configuration Couldn't open connection > to jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nInvalid value > java.sql.SQLException: ORA-01017: invalid username/password; logon denied\n > for configuration Couldn't open connection to > jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nYou can also find the > above list of errors at the endpoint `/\{connectorType}/config/validate`" > }}} > Assuming the above config does not replaces the user name and password at all > rather entire values for connection.user and connection.password are used to > connect to the DB resulting in ORA-01017: invalid username/password error. > Is it a bug ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
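For context on why removing the quotes fixed it: `java.util.Properties` treats quote characters as literal parts of the value, so `ora.password="Passw0rd!"` yields a password that still contains both quotes, which the database then rejects with ORA-01017. A minimal stdlib sketch of this behavior (the key and value are the ones from the report; the class and method names are illustrative):

```java
import java.io.StringReader;
import java.util.Properties;

public class QuotedPropsDemo {
    // Load a single key=value line the way a properties file is ultimately
    // parsed, via java.util.Properties, and return the raw value.
    static String load(String line, String key) throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(line));
        return props.getProperty(key);
    }

    public static void main(String[] args) throws Exception {
        // Quotes are NOT stripped: the loaded value includes both quote characters.
        System.out.println(load("ora.password=\"Passw0rd!\"", "ora.password")); // "Passw0rd!"
        // Without quotes, the value is exactly the password.
        System.out.println(load("ora.password=Passw0rd!", "ora.password"));     // Passw0rd!
    }
}
```

The same parsing applies to any provider that reads a standard properties file, which is why unquoted values are the safe form.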
[jira] [Commented] (KAFKA-9122) Externalizing DB password is not working
[ https://issues.apache.org/jira/browse/KAFKA-9122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964095#comment-16964095 ] Dwijadas commented on KAFKA-9122: - In the distributed connector log, i can see value of username is correctly replaced from the properties file and the password is hidden. But still there is an error code of 400 about invalid values ? !Screenshot_1.png! > Externalizing DB password is not working > > > Key: KAFKA-9122 > URL: https://issues.apache.org/jira/browse/KAFKA-9122 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.1 > Environment: CentOS 6.7 >Reporter: Dwijadas >Priority: Trivial > Attachments: Screenshot_1.png > > > Hi > I am trying to externalizing user name and password for oracle DB using > {{FileConfigProvider}} provider. > For that i have created a properties file that contains user name and > password. > > {{$ cat /home/kfk/data/ora_credentials.properties > ora.username="apps" > ora.password="Passw0rd!"}} > Added the config providers as file and also the config.providers.file.class > as FileConfigProvider in the worker config: > > {{$ cat /home/kfk/etc/kafka/connect-distributed.properties > ... > ... > config.providers=file > config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider > ... > ...}} > Restarted the worker and submitted a task using REST with the following config > > {{"config": \{ >"connector.class": > "io.confluent.connect.jdbc.JdbcSourceConnector", >"tasks.max": "1", > "connection.user": > "${file:/home/kfk/data/ora_credentials.properties:ora.username}", >"connection.password": > "${file:/home/kfk/data/ora_credentials.properties:ora.password}", >... >... 
> }}} > Submitting the above task resulting in the following error: > > {{{ > "error_code": 400, > "message": "Connector configuration is invalid and contains the following 2 > error(s):\nInvalid value java.sql.SQLException: ORA-01017: invalid > username/password; logon denied\n for configuration Couldn't open connection > to jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nInvalid value > java.sql.SQLException: ORA-01017: invalid username/password; logon denied\n > for configuration Couldn't open connection to > jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nYou can also find the > above list of errors at the endpoint `/\{connectorType}/config/validate`" > }}} > Assuming the above config does not replaces the user name and password at all > rather entire values for connection.user and connection.password are used to > connect to the DB resulting in ORA-01017: invalid username/password error. > Is it a bug ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9122) Externalizing DB password is not working
[ https://issues.apache.org/jira/browse/KAFKA-9122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dwijadas updated KAFKA-9122: Attachment: Screenshot_1.png > Externalizing DB password is not working > > > Key: KAFKA-9122 > URL: https://issues.apache.org/jira/browse/KAFKA-9122 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.1 > Environment: CentOS 6.7 >Reporter: Dwijadas >Priority: Trivial > Attachments: Screenshot_1.png > > > Hi > I am trying to externalizing user name and password for oracle DB using > {{FileConfigProvider}} provider. > For that i have created a properties file that contains user name and > password. > > {{$ cat /home/kfk/data/ora_credentials.properties > ora.username="apps" > ora.password="Passw0rd!"}} > Added the config providers as file and also the config.providers.file.class > as FileConfigProvider in the worker config: > > {{$ cat /home/kfk/etc/kafka/connect-distributed.properties > ... > ... > config.providers=file > config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider > ... > ...}} > Restarted the worker and submitted a task using REST with the following config > > {{"config": \{ >"connector.class": > "io.confluent.connect.jdbc.JdbcSourceConnector", >"tasks.max": "1", > "connection.user": > "${file:/home/kfk/data/ora_credentials.properties:ora.username}", >"connection.password": > "${file:/home/kfk/data/ora_credentials.properties:ora.password}", >... >... 
> }}} > Submitting the above task resulting in the following error: > > {{{ > "error_code": 400, > "message": "Connector configuration is invalid and contains the following 2 > error(s):\nInvalid value java.sql.SQLException: ORA-01017: invalid > username/password; logon denied\n for configuration Couldn't open connection > to jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nInvalid value > java.sql.SQLException: ORA-01017: invalid username/password; logon denied\n > for configuration Couldn't open connection to > jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nYou can also find the > above list of errors at the endpoint `/\{connectorType}/config/validate`" > }}} > Assuming the above config does not replaces the user name and password at all > rather entire values for connection.user and connection.password are used to > connect to the DB resulting in ORA-01017: invalid username/password error. > Is it a bug ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9121) Mirror Maker 2.0 doesn't handle the topic names in consumer checkpoints properly when topic name contain separator
[ https://issues.apache.org/jira/browse/KAFKA-9121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964089#comment-16964089 ] Jakub Scholz commented on KAFKA-9121: - I wondered if the source/target cluster alias can be used to remove the prefix more elegantly, instead of the current implementation which requires the separator to be unique. But TBH I don't know enough about how MM2 works when you chain multiple clusters, so I'm not sure it would be as easy as when I played with it with just two Kafka clusters. > Mirror Maker 2.0 doesn't handle the topic names in consumer checkpoints > properly when topic name contain separator > -- > > Key: KAFKA-9121 > URL: https://issues.apache.org/jira/browse/KAFKA-9121 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Jakub Scholz >Priority: Major > > I was trying the Kafka Mirror Maker 2.0 and ran into the following situation: > 1) I have 2 Kafka clusters with the topic {{kafka-test-apps}} > 2) I configured Mirror Maker with {{replication.policy.separator=-}} and with > mirroring between clusters {{a}} and {{b}}. > 3) When running Mirror Maker the mirroring of topics works fine. But when I > use the {{RemoteClusterUtils}} to recover the offsets, the names of the > topics for which the offsets are found are {{a-kafka-test-apps}} and > {{apps}}, while the expected topic names would be {{a-kafka-test-apps}} and > {{kafka-test-apps}}. > I tried to find the issue, but haven't found it so far. But it doesn't seem to > be in {{RemoteClusterUtils}} because the topic names seem to be wrong already > in the {{checkpoints.internal}} topic. So it is probably already processed in the > wrong way in the source cluster. > When I use {{.}} as the separator, it seems to work fine for me. It looks > like the problem occurs only when the topic name already contains the separator. 
But using the right separator might not be a > solution for this, because you might have topics with different characters > and always have this problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
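The corrupted name in the report ({{apps}} instead of {{kafka-test-apps}}) is consistent with splitting the remote topic name on the last occurrence of the separator, whereas stripping the known `alias + separator` prefix, as the comment above suggests, is unambiguous even when the topic name itself contains the separator. A hypothetical stdlib sketch of the two approaches (this is not MM2's actual code; the method names are illustrative):

```java
public class RemoteTopicNameDemo {
    // Ambiguous: splitting on the LAST separator occurrence eats part of a
    // topic name that itself contains the separator.
    static String splitOnLastSeparator(String remoteTopic, String separator) {
        return remoteTopic.substring(remoteTopic.lastIndexOf(separator) + separator.length());
    }

    // Unambiguous: strip the known "<source alias><separator>" prefix.
    static String stripAliasPrefix(String remoteTopic, String alias, String separator) {
        String prefix = alias + separator;
        return remoteTopic.startsWith(prefix) ? remoteTopic.substring(prefix.length()) : remoteTopic;
    }

    public static void main(String[] args) {
        String remote = "a-kafka-test-apps"; // cluster alias "a", separator "-"
        System.out.println(splitOnLastSeparator(remote, "-"));  // apps (the corrupted name from the report)
        System.out.println(stripAliasPrefix(remote, "a", "-")); // kafka-test-apps
    }
}
```

As noted above, chained clusters complicate this, since a remote topic may carry several alias prefixes.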
[jira] [Commented] (KAFKA-9120) Convert ./bin/kafka-reassign-partitions.sh to use KIP-455
[ https://issues.apache.org/jira/browse/KAFKA-9120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964077#comment-16964077 ] Stanislav Kozlovski commented on KAFKA-9120: This is tracked in https://issues.apache.org/jira/browse/KAFKA-8820 > Convert ./bin/kafka-reassign-partitions.sh to use KIP-455 > - > > Key: KAFKA-9120 > URL: https://issues.apache.org/jira/browse/KAFKA-9120 > Project: Kafka > Issue Type: Sub-task >Reporter: Colin McCabe >Priority: Major > > We should convert ./bin/kafka-reassign-partitions.sh to use the KIP-455 API. > The KIP was already approved and the server-side work is done. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9123) Add system test with large number of partitions
[ https://issues.apache.org/jira/browse/KAFKA-9123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16964017#comment-16964017 ] ASF GitHub Bot commented on KAFKA-9123: --- mumrah commented on pull request #7621: KAFKA-9123 Test a large number of replicas URL: https://github.com/apache/kafka/pull/7621 TBD This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add system test with large number of partitions > --- > > Key: KAFKA-9123 > URL: https://issues.apache.org/jira/browse/KAFKA-9123 > Project: Kafka > Issue Type: Test > Components: system tests >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 2.5.0 > > > Let's add a system test with several thousand replicas and do some validation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9123) Add system test with large number of partitions
David Arthur created KAFKA-9123: --- Summary: Add system test with large number of partitions Key: KAFKA-9123 URL: https://issues.apache.org/jira/browse/KAFKA-9123 Project: Kafka Issue Type: Test Components: system tests Reporter: David Arthur Assignee: David Arthur Fix For: 2.5.0 Let's add a system test with several thousand replicas and do some validation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-6542) Tables should trigger joins too, not just streams
[ https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963970#comment-16963970 ] Chad Preisler commented on KAFKA-6542: -- [~mjsax] Thank you for the clarification. I was looking at the issue only from the perspective of bootstrapping the kTables on startup. The code appears to only get to a State.Running when all kTable partitions have been restored. It also looks like the partitions are only "restored" when they reach the endOffsets that were set when the application started. > Tables should trigger joins too, not just streams > - > > Key: KAFKA-6542 > URL: https://issues.apache.org/jira/browse/KAFKA-6542 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Antony Stubbs >Priority: Major > > At the moment it's quite possible to have a race condition when joining a > stream with a table, if the stream event arrives first, before the table > event, in which case the join will fail. > This is also related to bootstrapping KTables (which is what a GKTable does). > Related to: KAFKA-4113 Allow KTable bootstrap -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9034) kafka-run-class.sh will fail if JAVA_HOME has space
[ https://issues.apache.org/jira/browse/KAFKA-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963923#comment-16963923 ] Xu JianHai commented on KAFKA-9034: --- The new IntelliJ Java path uses "_" in place of the blank space. > kafka-run-class.sh will fail if JAVA_HOME has space > --- > > Key: KAFKA-9034 > URL: https://issues.apache.org/jira/browse/KAFKA-9034 > Project: Kafka > Issue Type: Bug > Components: tools > Environment: macOS >Reporter: Fenjin Wang >Priority: Minor > Labels: easyfix, newbie > > If JAVA_HOME is set to IntelliJ's Java, bin/zookeeper-server-start.sh can't work > because the path has a space in it. > > {quote}export JAVA_HOME="/Applications/IntelliJ > IDEA.app/Contents/jbr/Contents/Home/" > {quote} > > We can fix this by quoting "$JAVA" in the shell script, according to: > [https://stackoverflow.com/a/7740746/1203241] > > I can send a PR if you like. -- This message was sent by Atlassian Jira (v8.3.4#803005)
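The quoting fix the reporter proposes can be sketched as follows (a minimal stand-in, not the real kafka-run-class.sh; the fake `java` launcher and temp path are illustrative only). Unquoted, `$JAVA` word-splits on the space in the IntelliJ-style path; quoted, it resolves as a single path.

```shell
# Build a fake JAVA_HOME whose path contains a space, like IntelliJ's.
JAVA_HOME="$(mktemp -d)/IntelliJ IDEA.app/Contents/jbr/Contents/Home"
mkdir -p "$JAVA_HOME/bin"
printf '#!/bin/sh\necho ok\n' > "$JAVA_HOME/bin/java"
chmod +x "$JAVA_HOME/bin/java"

JAVA="$JAVA_HOME/bin/java"

# Unquoted: the shell splits the path into two words and the command fails.
$JAVA 2>/dev/null || echo "unquoted invocation failed"

# Quoted: the space is preserved and the launcher runs.
"$JAVA"
```

The same rule explains why every expansion of `$JAVA` in kafka-run-class.sh needs double quotes, not just the definition.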
[jira] [Assigned] (KAFKA-9034) kafka-run-class.sh will fail if JAVA_HOME has space
[ https://issues.apache.org/jira/browse/KAFKA-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xu JianHai reassigned KAFKA-9034: - Assignee: (was: Xu JianHai) > kafka-run-class.sh will fail if JAVA_HOME has space > --- > > Key: KAFKA-9034 > URL: https://issues.apache.org/jira/browse/KAFKA-9034 > Project: Kafka > Issue Type: Bug > Components: tools > Environment: macOS >Reporter: Fenjin Wang >Priority: Minor > Labels: easyfix, newbie > > If JAVA_HOME is set to IntelliJ's Java, bin/zookeeper-server-start.sh can't work > because the path has a space in it. > > {quote}export JAVA_HOME="/Applications/IntelliJ > IDEA.app/Contents/jbr/Contents/Home/" > {quote} > > We can fix this by quoting "$JAVA" in the shell script, according to: > [https://stackoverflow.com/a/7740746/1203241] > > I can send a PR if you like. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9122) Externalizing DB password is not working
Dwijadas created KAFKA-9122: --- Summary: Externalizing DB password is not working Key: KAFKA-9122 URL: https://issues.apache.org/jira/browse/KAFKA-9122 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.2.1 Environment: CentOS 6.7 Reporter: Dwijadas Hi, I am trying to externalize the username and password for an Oracle DB using the {{FileConfigProvider}} provider. For that I have created a properties file that contains the username and password. {{$ cat /home/kfk/data/ora_credentials.properties ora.username="apps" ora.password="Passw0rd!"}} I added config.providers as file and config.providers.file.class as FileConfigProvider in the worker config: {{$ cat /home/kfk/etc/kafka/connect-distributed.properties ... ... config.providers=file config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider ... ...}} I restarted the worker and submitted a task using REST with the following config {{"config": \{ "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.user": "${file:/home/kfk/data/ora_credentials.properties:ora.username}", "connection.password": "${file:/home/kfk/data/ora_credentials.properties:ora.password}", ... ... 
}}} Submitting the above task results in the following error: {{{ "error_code": 400, "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: ORA-01017: invalid username/password; logon denied\n for configuration Couldn't open connection to jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nInvalid value java.sql.SQLException: ORA-01017: invalid username/password; logon denied\n for configuration Couldn't open connection to jdbc:oracle:thin:@oebsr122.infodetics.com:1521:VIS\nYou can also find the above list of errors at the endpoint `/\{connectorType}/config/validate`" }}} It appears the above config does not substitute the username and password at all; rather, the raw values of connection.user and connection.password are used to connect to the DB, resulting in the ORA-01017: invalid username/password error. Is it a bug? -- This message was sent by Atlassian Jira (v8.3.4#803005)
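One detail worth checking in the report above: the credentials file quotes its values (ora.username="apps"), and java.util.Properties, which backs file-based config loading, does not strip quote characters, so the resolved username would literally contain the quotes and Oracle would reject it with ORA-01017. The sketch below is self-contained and illustrative only (class and method names are hypothetical, and it is not Connect's actual FileConfigProvider code); it only demonstrates the quoting behavior of java.util.Properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class PropsQuoteDemo {
    // Loads a key from properties content mirroring ora_credentials.properties.
    public static String load(String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(
                "ora.username=\"apps\"\n" +
                "ora.password=\"Passw0rd!\"\n"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String user = load("ora.username");
        // The quotes are part of the value, not delimiters.
        System.out.println(user);                     // prints "apps" with quotes
        System.out.println(user.equals("apps"));      // false
        System.out.println(user.equals("\"apps\""));  // true
    }
}
```

If this is the cause, writing the file as `ora.username=apps` (no quotes) should make the substitution produce the intended credentials.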
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963785#comment-16963785 ] Matthias J. Sax commented on KAFKA-8769: I have not read the KIP yet, but I personally don't think it's a good idea, because flushing `suppress()` for "final window emit" based on wall-clock time breaks the underlying idea of the suppress contract. For regular "rate limitation" based on wall-clock time, there is already a KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time Lastly, wall-clock-time-based rate control is not related to this Jira ticket at all; they are two independent/orthogonal issues. Why do we need KIP-539, i.e., what is different from KIP-424? In fact, skimming over the KIP, it seems that you might have copied from KIP-424 anyway? > Consider computing stream time independently per key > > > Key: KAFKA-8769 > URL: https://issues.apache.org/jira/browse/KAFKA-8769 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: needs-discussion, needs-kip > > Currently, Streams uses a concept of "stream time", which is computed as the > highest timestamp observed by stateful operators, per partition. This concept > of time backs grace period, retention time, and suppression. > For use cases in which data is produced to topics in roughly chronological > order (as in db change capture), this reckoning is fine. > Some use cases have a different pattern, though. For example, in IOT > applications, it's common for sensors to save up quite a bit of data and then > dump it all at once into the topic. See > https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware > for a concrete example of the use case. > I have heard of cases where each sensor dumps 24 hours' worth of data at a > time into the topic. 
This results in a pattern in which, when reading a > single partition, the operators observe a lot of consecutive records for one > key that increase in timestamp for 24 hours, then a bunch of consecutive > records for another key that are also increasing in timestamp over the same > 24 hour period. With our current stream-time definition, this means that the > partition's stream time increases while reading the first key's data, but > then stays paused while reading the second key's data, since the second batch > of records all have timestamps in the "past". > E.g: > {noformat} > A@t0 (stream time: 0) > A@t1 (stream time: 1) > A@t2 (stream time: 2) > A@t3 (stream time: 3) > B@t0 (stream time: 3) > B@t1 (stream time: 3) > B@t2 (stream time: 3) > B@t3 (stream time: 3) > {noformat} > This pattern results in an unfortunate compromise in which folks are required > to set the grace period to the max expected time skew, for example 24 hours, > or Streams will just drop the second key's data (since it is late). But, this > means that if they want to use Suppression for "final results", they have to > wait 24 hours for the result. > This tradeoff is not strictly necessary, though, because each key represents > a logically independent sequence of events. Tracking by partition is simply > convenient, but typically not logically meaningful. That is, the partitions > are just physically independent sequences of events, so it's convenient to > track stream time at this granularity. It would be just as correct, and more > useful for IOT-like use cases, to track time independently for each key. > However, before considering this change, we need to solve the > testing/low-traffic problem. This is the opposite issue, where a partition > doesn't get enough traffic to advance stream time and results remain "stuck" > in the suppression buffers. 
We can provide some mechanism to force the > advancement of time across all partitions, for use in testing when you want > to flush out all results, or in production when some topic is low volume. We > shouldn't consider tracking time _more_ granularly until this problem is > solved, since it would just make the low-traffic problem worse. -- This message was sent by Atlassian Jira (v8.3.4#803005)
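The per-partition vs. per-key reckoning described in the ticket can be sketched as follows (a self-contained illustration with hypothetical names, not Streams internals): stream time is the maximum timestamp observed so far, and the only change under the proposal is the granularity at which that maximum is tracked.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamTimeDemo {
    // Current behavior: one clock per partition; a later key's older
    // timestamps never advance it, so its records look "late".
    public static long[] perPartition(long[] timestamps) {
        long streamTime = Long.MIN_VALUE;
        long[] observed = new long[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            streamTime = Math.max(streamTime, timestamps[i]);
            observed[i] = streamTime;
        }
        return observed;
    }

    // Proposed behavior: one clock per key; each key's batch advances
    // its own stream time independently.
    public static long[] perKey(String[] keys, long[] timestamps) {
        Map<String, Long> clocks = new HashMap<>();
        long[] observed = new long[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            long t = Math.max(clocks.getOrDefault(keys[i], Long.MIN_VALUE), timestamps[i]);
            clocks.put(keys[i], t);
            observed[i] = t;
        }
        return observed;
    }

    public static void main(String[] args) {
        // The A@t0..A@t3, B@t0..B@t3 example from the ticket:
        String[] keys = {"A", "A", "A", "A", "B", "B", "B", "B"};
        long[] ts =    { 0,   1,   2,   3,   0,   1,   2,   3 };
        // perPartition(ts)      -> 0 1 2 3 3 3 3 3  (B's data is all "late")
        // perKey(keys, ts)      -> 0 1 2 3 0 1 2 3  (B advances independently)
    }
}
```

With a per-key clock, B's records would be compared against B's own stream time, so the grace period would not need to cover the 24-hour skew between keys.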
[jira] [Commented] (KAFKA-6542) Tables should trigger joins too, not just streams
[ https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963776#comment-16963776 ] Matthias J. Sax commented on KAFKA-6542: [~cpreisler], it's unfortunately not that easy. In fact, `StreamThread` will not process any stream-side records before it goes to state RUNNING. However, during processing, the table might get updated at any point in time, and if there is out-of-order data, a stream-side record might join with incorrect table state. However, the Jira title is misleading anyway: a table update can never trigger a join computation, because conceptually the stream side is empty. Hence, a join can only happen when a stream-record is processed. Also note, if there is no out-of-order data, Kafka Streams ensures that the table is updated first if the table-record has a smaller timestamp than the stream-record (cf. KAFKA-3514). Fixing the out-of-order data issue is a larger and more complex change and would thus even require a KIP. > Tables should trigger joins too, not just streams > - > > Key: KAFKA-6542 > URL: https://issues.apache.org/jira/browse/KAFKA-6542 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Antony Stubbs >Priority: Major > > At the moment it's quite possible to have a race condition when joining a > stream with a table, if the stream event arrives first, before the table > event, in which case the join will fail. > This is also related to bootstrapping KTables (which is what a GKTable does). > Related to: KAFKA-4113 Allow KTable bootstrap -- This message was sent by Atlassian Jira (v8.3.4#803005)
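The timestamp-ordering rule in the comment above can be sketched with a toy stream-table join (hypothetical names, not Streams internals; assumes records from both sides are processed in timestamp order, table side first on ties): when the table update carries the smaller-or-equal timestamp it is applied before the join fires, and the race only appears when the stream record's timestamp is strictly smaller, i.e., with out-of-order data.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinOrderDemo {
    static final class Rec {
        final String side, key, value;
        final long ts;
        Rec(String side, String key, String value, long ts) {
            this.side = side; this.key = key; this.value = value; this.ts = ts;
        }
    }

    public static List<String> run(List<Rec> input) {
        List<Rec> recs = new ArrayList<>(input);
        // Process by timestamp; on equal timestamps, update the table first.
        recs.sort(Comparator.comparingLong((Rec r) -> r.ts)
                .thenComparing(r -> r.side.equals("table") ? 0 : 1));
        Map<String, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Rec r : recs) {
            if (r.side.equals("table")) {
                table.put(r.key, r.value);           // table update: no join fires
            } else {
                joined.add(r.key + ":" + r.value + "+" + table.get(r.key));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Equal timestamps: the table row is applied first, the join sees it.
        List<String> ok = run(List.of(
                new Rec("table", "k", "T", 5),
                new Rec("stream", "k", "S", 5)));    // -> [k:S+T]
        // Out-of-order: the stream record's timestamp is smaller, so it is
        // processed before any table row exists and the join misses.
        List<String> miss = run(List.of(
                new Rec("stream", "k", "S", 3),
                new Rec("table", "k", "T", 5)));     // -> [k:S+null]
    }
}
```

This is why fixing the miss case requires more than reordering: the stream record at t=3 legitimately precedes the table row at t=5, so only buffering or versioned table state could resolve it.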
[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext
[ https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963757#comment-16963757 ] Matthias J. Sax commented on KAFKA-9088: [~ableegoldman] I have no objections; as long as it's not leaking into the public API, I am fine with it. Given the current "flood" of related tickets that reveal the internal mess with regard to those interfaces, it's unclear to me atm what a "clean" fix would look like. > Consolidate InternalMockProcessorContext and MockInternalProcessorContext > - > > Key: KAFKA-9088 > URL: https://issues.apache.org/jira/browse/KAFKA-9088 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Bruno Cadonna >Priority: Minor > Labels: newbie > > Currently, we have two mocks for the {{InternalProcessorContext}}. The goal > of this ticket is to merge both into one mock or replace it with an > {{EasyMock}} mock. -- This message was sent by Atlassian Jira (v8.3.4#803005)