Re: Kafka 2.7.1 Rebalance failed DisconnectException
Hi All, An update on this. Finally I could figure out the cause for this. I have a consumer with *MAX_POLL_INTERVAL_MS_CONFIG* set to *Integer.MAX_VALUE*, which was causing the problem. Looks like it's a combination of *group.initial.rebalance.delay.ms* on the broker and *max.poll.interval.ms* on the consumer that causes the *Rebalance failed. org.apache.kafka.common.errors.DisconnectException*. After debugging I could see the line below from the AbstractCoordinator class (line #337), which leads to an integer overflow if *max.poll.interval.ms* is greater than (Integer.MAX_VALUE - 5000), and thus *joinGroupTimeoutMs* falls back to the request timeout. Now if *request.timeout.ms* is less than *group.initial.rebalance.delay.ms* then the issue occurs. Let me know what you think. For now I can get away with changing max.poll.interval.ms. AbstractCoordinator #337: int joinGroupTimeoutMs = Math.max(this.client.defaultRequestTimeoutMs(), this.rebalanceConfig.rebalanceTimeoutMs + 5000); (see the sketch after this message) Thanks, Tony On Wed, Jul 14, 2021 at 10:56 PM Tony John wrote: > Hi Shilin, > > Thanks for the suggestion. But I am not upgrading an existing cluster. > I've got a fresh broker and application cluster and there are no consumer > offsets or topics present. When the app starts it creates the topics and > once it moves to RUNNING state I see the rebalance failed log every 30 > seconds. As I understand, the steps in the doc need to be followed only if > an existing cluster is being migrated to the new version. Am I missing > something here? Below is my KafkaConfig from one of the brokers during start > up. > > > [2021-07-14 07:27:06,271] INFO KafkaConfig values: > advertised.host.name = null > advertised.listeners = PLAINTEXT://broker100:9092 > advertised.port = null > alter.config.policy.class.name = null > alter.log.dirs.replication.quota.window.num = 11 > alter.log.dirs.replication.quota.window.size.seconds = 1 > authorizer.class.name = > auto.create.topics.enable = true > auto.leader.rebalance.enable = true > background.threads = 10 > broker.id = 100 > broker.id.generation.enable = true > broker.rack = null > client.quota.callback.class = null > compression.type = producer > connection.failed.authentication.delay.ms = 100 > connections.max.idle.ms = 108 > connections.max.reauth.ms = 0 > control.plane.listener.name = null > controlled.shutdown.enable = true > controlled.shutdown.max.retries = 3 > controlled.shutdown.retry.backoff.ms = 5000 > controller.quota.window.num = 11 > controller.quota.window.size.seconds = 1 > controller.socket.timeout.ms = 3 > create.topic.policy.class.name = null > default.replication.factor = 1 > delegation.token.expiry.check.interval.ms = 360 > delegation.token.expiry.time.ms = 8640 > delegation.token.master.key = null > delegation.token.max.lifetime.ms = 60480 > delete.records.purgatory.purge.interval.requests = 1 > delete.topic.enable = true > fetch.max.bytes = 57671680 > fetch.purgatory.purge.interval.requests = 1000 > group.initial.rebalance.delay.ms = 12 > group.max.session.timeout.ms = 120 > group.max.size = 2147483647 > group.min.session.timeout.ms = 6000 > host.name = > inter.broker.listener.name = null > inter.broker.protocol.version = 2.7-IV2 > kafka.metrics.polling.interval.secs = 10 > kafka.metrics.reporters = [] > leader.imbalance.check.interval.seconds = 300 > leader.imbalance.per.broker.percentage = 10 > listener.security.protocol.map = >
PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > listeners = PLAINTEXT://broker100:9092 > log.cleaner.backoff.ms = 15000 > log.cleaner.dedupe.buffer.size = 134217728 > log.cleaner.delete.retention.ms = 8640 > log.cleaner.enable = true > log.cleaner.io.buffer.load.factor = 0.9 > log.cleaner.io.buffer.size = 524288 > log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 > log.cleaner.max.compaction.lag.ms = 9223372036854775807 > log.cleaner.min.cleanable.ratio = 0.5 > log.cleaner.min.compaction.lag.ms = 0 > log.cleaner.threads = 1 > log.cleanup.policy = [delete] > log.dir = /tmp/kafka-logs > log.dirs
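A minimal sketch (Kotlin) of the overflow described at the top of this message. It assumes request.timeout.ms stays at its 30000 ms client default (the actual value is not shown in the thread) and max.poll.interval.ms is set to Integer.MAX_VALUE; the real logic lives in Kafka's AbstractCoordinator, this only reproduces the arithmetic:

```kotlin
fun main() {
    // Assumed values: max.poll.interval.ms = Integer.MAX_VALUE (as in the consumer above),
    // request.timeout.ms = 30000 ms (client default).
    val rebalanceTimeoutMs = Int.MAX_VALUE
    val defaultRequestTimeoutMs = 30_000

    // Mirrors the AbstractCoordinator line quoted above: the addition wraps around to a
    // negative Int, so Math.max() silently falls back to the request timeout.
    val joinGroupTimeoutMs = Math.max(defaultRequestTimeoutMs, rebalanceTimeoutMs + 5000)

    println(rebalanceTimeoutMs + 5000)  // -2147478649 (Int overflow)
    println(joinGroupTimeoutMs)         // 30000

    // If the broker holds the first JoinGroup for group.initial.rebalance.delay.ms and that
    // delay is longer than joinGroupTimeoutMs, the client gives up and disconnects, which
    // lines up with the DisconnectException being logged roughly every request-timeout interval.
}
```

Keeping max.poll.interval.ms at or below Integer.MAX_VALUE - 5000 avoids the wrap-around, which matches the workaround mentioned in the message above.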
Re: Kafka 2.7.1 Rebalance failed DisconnectException
false zookeeper.ssl.protocol = TLSv1.2 zookeeper.ssl.truststore.location = null zookeeper.ssl.truststore.password = null zookeeper.ssl.truststore.type = null zookeeper.sync.time.ms = 2000 Thanks, Tony On Wed, Jul 14, 2021 at 4:58 PM Shilin Wu wrote: > Depending on your original version, you may have to consult the upgrade > guide. > https://kafka.apache.org/27/documentation.html#upgrade > > Didn't see important compatibility settings like: > [image: image.png] > > > Perhaps you are not doing it correctly. > > > [image: Confluent] <https://www.confluent.io> > Wu Shilin > Solution Architect > +6581007012 > Follow us: [image: Blog] > <https://www.confluent.io/blog?utm_source=footer&utm_medium=email&utm_campaign=ch.email-signature_type.community_content.blog>[image: > Twitter] <https://twitter.com/ConfluentInc>[image: LinkedIn] > <https://www.linkedin.com/company/confluent/>[image: Slack] > <https://slackpass.io/confluentcommunity>[image: YouTube] > <https://youtube.com/confluent> > [image: Kafka Summit] <https://www.kafka-summit.org/> > > > On Wed, Jul 14, 2021 at 7:21 PM Tony John > wrote: > >> Can someone help me on this. >> >> Thanks, >> Tony >> >> On Fri, Jul 9, 2021 at 8:15 PM Tony John >> wrote: >> >> > Hi All, >> > >> > I am trying to upgrade my Kafka streams application to 2.7.1 version of >> > Kafka. The brokers are upgraded to 2.7.1 and kafka dependencies are >> also on >> > 2.7.1. But when I start the application, rebalance is failing with the >> > following message >> > >> > Rebalance failed. org.apache.kafka.common.errors.DisconnectException >> > >> > I am also seeing Group coordinator broker102:9092 (id: 2147483645 rack: >> > null) is unavailable or invalid due to cause: coordinator >> > unavailable.isDisconnected: false. Rediscovery will be attempted. >> > >> > The full set of logs (which gets printed every 30 seconds) is given >> below >> > >> > INFO 2021-07-09 09:33:20.229 | internals.AbstractCoordinator [Consumer >> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, >> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Group >> coordinator >> > broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid >> due to >> > cause: null.isDisconnected: true. Rediscovery will be attempted. >> > INFO 2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer >> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, >> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered >> group >> > coordinator broker102:9092 (id: 2147483645 rack: null) >> > INFO 2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer >> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, >> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Group >> coordinator >> > broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid >> due to >> > cause: coordinator unavailable.isDisconnected: false. Rediscovery will >> be >> > attempted. >> > INFO 2021-07-09 09:33:20.330 | internals.AbstractCoordinator [Consumer >> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, >> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered >> group >> > coordinator broker102:9092 (id: 2147483645 rack: null) >> > INFO 2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer >> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, >> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Rebalance >> failed. 
>> > org.apache.kafka.common.errors.DisconnectException >> > INFO 2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer >> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, >> > groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining >> group >> > INFO 2021-07-09 09:33:20.333 | internals.AbstractCoordinator [Consumer >> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, >> > groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining >> group >> > INFO 2021-07-09 09:33:20.419 | internals.AbstractCoordinator [Consumer >> > clientId=consumer-my-app-v1-master-coordinator-consumer-8, >> > groupId=my-app-v1-master-coordinator-consumer] Group coordinator >> > broker101:909
Re: Kafka 2.7.1 Rebalance failed DisconnectException
Can someone help me on this. Thanks, Tony On Fri, Jul 9, 2021 at 8:15 PM Tony John wrote: > Hi All, > > I am trying to upgrade my Kafka streams application to 2.7.1 version of > Kafka. The brokers are upgraded to 2.7.1 and kafka dependencies are also on > 2.7.1. But when I start the application, rebalance is failing with the > following message > > Rebalance failed. org.apache.kafka.common.errors.DisconnectException > > I am also seeing Group coordinator broker102:9092 (id: 2147483645 rack: > null) is unavailable or invalid due to cause: coordinator > unavailable.isDisconnected: false. Rediscovery will be attempted. > > The full set of logs (which gets printed every 30 seconds) is given below > > INFO 2021-07-09 09:33:20.229 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, > groupId=my-app-v1-task-coordinator-consumer-app-node100] Group coordinator > broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid due to > cause: null.isDisconnected: true. Rediscovery will be attempted. > INFO 2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, > groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered group > coordinator broker102:9092 (id: 2147483645 rack: null) > INFO 2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, > groupId=my-app-v1-task-coordinator-consumer-app-node100] Group coordinator > broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid due to > cause: coordinator unavailable.isDisconnected: false. Rediscovery will be > attempted. > INFO 2021-07-09 09:33:20.330 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, > groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered group > coordinator broker102:9092 (id: 2147483645 rack: null) > INFO 2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, > groupId=my-app-v1-task-coordinator-consumer-app-node100] Rebalance failed. > org.apache.kafka.common.errors.DisconnectException > INFO 2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, > groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining group > INFO 2021-07-09 09:33:20.333 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, > groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining group > INFO 2021-07-09 09:33:20.419 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-master-coordinator-consumer-8, > groupId=my-app-v1-master-coordinator-consumer] Group coordinator > broker101:9092 (id: 2147483646 rack: null) is unavailable or invalid due to > cause: null.isDisconnected: true. Rediscovery will be attempted. 
> INFO 2021-07-09 09:33:20.420 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-master-coordinator-consumer-8, > groupId=my-app-v1-master-coordinator-consumer] Discovered group coordinator > broker101:9092 (id: 2147483646 rack: null) > INFO 2021-07-09 09:33:20.420 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-master-coordinator-consumer-8, > groupId=my-app-v1-master-coordinator-consumer] Group coordinator > broker101:9092 (id: 2147483646 rack: null) is unavailable or invalid due to > cause: coordinator unavailable.isDisconnected: false. Rediscovery will be > attempted. > INFO 2021-07-09 09:33:20.521 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-master-coordinator-consumer-8, > groupId=my-app-v1-master-coordinator-consumer] Discovered group coordinator > broker101:9092 (id: 2147483646 rack: null) > INFO 2021-07-09 09:33:20.522 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-master-coordinator-consumer-8, > groupId=my-app-v1-master-coordinator-consumer] Rebalance failed. > org.apache.kafka.common.errors.DisconnectException > INFO 2021-07-09 09:33:20.523 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-master-coordinator-consumer-8, > groupId=my-app-v1-master-coordinator-consumer] (Re-)joining group > INFO 2021-07-09 09:33:20.524 | internals.AbstractCoordinator [Consumer > clientId=consumer-my-app-v1-master-coordinator-consumer-8, > groupId=my-app-v1-master-coordinator-consumer] (Re-)joining group > > The application was working fine on 2.5.1. Please note with 2.5.1 the > build used was kafka_*2.12*-2.5.1, but with 2.7.1 I used kaf
Kafka 2.7.1 Rebalance failed DisconnectException
Hi All, I am trying to upgrade my Kafka streams application to 2.7.1 version of Kafka. The brokers are upgraded to 2.7.1 and kafka dependencies are also on 2.7.1. But when I start the application, rebalance is failing with the following message Rebalance failed. org.apache.kafka.common.errors.DisconnectException I am also seeing Group coordinator broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted. The full set of logs (which gets printed every 30 seconds) is given below INFO 2021-07-09 09:33:20.229 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, groupId=my-app-v1-task-coordinator-consumer-app-node100] Group coordinator broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted. INFO 2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered group coordinator broker102:9092 (id: 2147483645 rack: null) INFO 2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, groupId=my-app-v1-task-coordinator-consumer-app-node100] Group coordinator broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted. INFO 2021-07-09 09:33:20.330 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered group coordinator broker102:9092 (id: 2147483645 rack: null) INFO 2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, groupId=my-app-v1-task-coordinator-consumer-app-node100] Rebalance failed. org.apache.kafka.common.errors.DisconnectException INFO 2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining group INFO 2021-07-09 09:33:20.333 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9, groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining group INFO 2021-07-09 09:33:20.419 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-master-coordinator-consumer-8, groupId=my-app-v1-master-coordinator-consumer] Group coordinator broker101:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted. INFO 2021-07-09 09:33:20.420 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-master-coordinator-consumer-8, groupId=my-app-v1-master-coordinator-consumer] Discovered group coordinator broker101:9092 (id: 2147483646 rack: null) INFO 2021-07-09 09:33:20.420 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-master-coordinator-consumer-8, groupId=my-app-v1-master-coordinator-consumer] Group coordinator broker101:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted. 
INFO 2021-07-09 09:33:20.521 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-master-coordinator-consumer-8, groupId=my-app-v1-master-coordinator-consumer] Discovered group coordinator broker101:9092 (id: 2147483646 rack: null) INFO 2021-07-09 09:33:20.522 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-master-coordinator-consumer-8, groupId=my-app-v1-master-coordinator-consumer] Rebalance failed. org.apache.kafka.common.errors.DisconnectException INFO 2021-07-09 09:33:20.523 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-master-coordinator-consumer-8, groupId=my-app-v1-master-coordinator-consumer] (Re-)joining group INFO 2021-07-09 09:33:20.524 | internals.AbstractCoordinator [Consumer clientId=consumer-my-app-v1-master-coordinator-consumer-8, groupId=my-app-v1-master-coordinator-consumer] (Re-)joining group The application was working fine on 2.5.1. Please note with 2.5.1 the build used was kafka_*2.12*-2.5.1, but with 2.7.1 I used kafka_*2.13*-2.7.1 *The broker config is * broker.id=2 listeners=PLAINTEXT://broker102:9092 advertised.listeners=PLAINTEXT://broker102:9092 num.network.threads=4 num.io.threads=4 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/mnt/store/kafka/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=4 offsets.topic.replication.factor=2 transaction.state.log.replication.factor=2 transaction.state.log
Re: Kafka Streams : Rack Aware deployment in EC2
Thanks a lot Guozhang. Will give it a try. Thanks, Tony On Tue, Oct 20, 2020 at 10:42 PM Guozhang Wang wrote: > Hello Tony, > > I think you already know the consumer-client side fetch-from-follower > feature: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica > > Say your Kafka deployment is cross-AZs, and your Streams is also deployed > cross-AZ, you can, by setting the "client.rack" through streams configs, to > let the embedded consumers to read from close replicas within the same AZ > as well. > > Guozhang > > > On Mon, Oct 19, 2020 at 9:19 PM Tony John > wrote: > > > Thanks Matthias.. I don't think I will be able to take it up.. Will wait > > for it to be available in future. :) > > > > On Mon, Oct 19, 2020 at 11:56 PM Matthias J. Sax > wrote: > > > > > No suggestions... Sorry. > > > > > > The task assignment algorithm cannot be customized. > > > > > > The only thing you _could_ do, is to pickup the ticket yourself (to get > > > the feature maybe in 2.8 release). Not sure if you would be interested > > > to contribute :) > > > > > > > > > -Matthias > > > > > > On 10/19/20 11:08 AM, Tony John wrote: > > > > Thanks for the quick response Matthias. We were planning to move to > AWS > > > > MSK, and I think we will have to defer it until rack awareness is in > > > place > > > > for Kafka Streams. Reason being consumers consuming from brokers in > > > > different availability zones would result in high data transfer > > charges. > > > Do > > > > you have any suggestions? > > > > > > > > Thanks, > > > > Tony > > > > > > > > On Mon, Oct 19, 2020 at 11:08 PM Matthias J. Sax > > > wrote: > > > > > > > >> There is no such feature for Kafka Streams yet. > > > >> > > > >> We have one ticket for rack aware standby task placement > > > >> (https://issues.apache.org/jira/browse/KAFKA-6718) but nobody is > > > working > > > >> on it. > > > >> > > > >> > > > >> -Matthias > > > >> > > > >> On 10/19/20 9:20 AM, Tony John wrote: > > > >>> Hi All, > > > >>> > > > >>> I have been trying to figure out some documentation on how to > enable > > > rack > > > >>> awareness on a Kafka Streams app. I do see the broker.rack > > > configuration > > > >>> which needs to be done at the broker side and client.rack > > configuration > > > >> for > > > >>> the consumers. Is there any specific configuration which is > required > > to > > > >>> enable rack awareness in a streams app? Please throw some light on > > > this. > > > >>> > > > >>> PS: I am using Kafka / Kafka Streams 2.2.1 > > > >>> > > > >>> Thanks, > > > >>> Tony > > > >>> > > > >> > > > > > > > > > > > > -- > -- Guozhang >
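A rough Kotlin sketch of what Guozhang's client.rack suggestion could look like from a Streams app. It assumes brokers on 2.4+ configured with broker.rack and the rack-aware replica selector, and a 2.3+ client (newer than the 2.2.1 mentioned later in this thread); the application id, bootstrap servers and AZ name are placeholders:

```kotlin
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.StreamsConfig

// Broker side (server.properties), per broker -- needed for fetch-from-follower (KIP-392):
//   broker.rack=use1-az1
//   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

fun streamsPropsForAz(az: String): Properties {
    val props = Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app")      // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker100:9092")   // placeholder
    // Tag the embedded consumers with this instance's AZ so fetches are served
    // by an in-AZ replica when one is in sync.
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.CLIENT_RACK_CONFIG), az)
    return props
}
```

Note this only affects where the embedded consumers fetch from; task placement itself is still not rack aware, which is the limitation discussed in the rest of this thread.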
Re: Kafka Streams : Rack Aware deployment in EC2
Thanks Matthias.. I don't think I will be able to take it up.. Will wait for it to be available in future. :) On Mon, Oct 19, 2020 at 11:56 PM Matthias J. Sax wrote: > No suggestions... Sorry. > > The task assignment algorithm cannot be customized. > > The only thing you _could_ do, is to pickup the ticket yourself (to get > the feature maybe in 2.8 release). Not sure if you would be interested > to contribute :) > > > -Matthias > > On 10/19/20 11:08 AM, Tony John wrote: > > Thanks for the quick response Matthias. We were planning to move to AWS > > MSK, and I think we will have to defer it until rack awareness is in > place > > for Kafka Streams. Reason being consumers consuming from brokers in > > different availability zones would result in high data transfer charges. > Do > > you have any suggestions? > > > > Thanks, > > Tony > > > > On Mon, Oct 19, 2020 at 11:08 PM Matthias J. Sax > wrote: > > > >> There is no such feature for Kafka Streams yet. > >> > >> We have one ticket for rack aware standby task placement > >> (https://issues.apache.org/jira/browse/KAFKA-6718) but nobody is > working > >> on it. > >> > >> > >> -Matthias > >> > >> On 10/19/20 9:20 AM, Tony John wrote: > >>> Hi All, > >>> > >>> I have been trying to figure out some documentation on how to enable > rack > >>> awareness on a Kafka Streams app. I do see the broker.rack > configuration > >>> which needs to be done at the broker side and client.rack configuration > >> for > >>> the consumers. Is there any specific configuration which is required to > >>> enable rack awareness in a streams app? Please throw some light on > this. > >>> > >>> PS: I am using Kafka / Kafka Streams 2.2.1 > >>> > >>> Thanks, > >>> Tony > >>> > >> > > >
Re: Kafka Streams : Rack Aware deployment in EC2
Thanks for the quick response Matthias. We were planning to move to AWS MSK, and I think we will have to defer it until rack awareness is in place for Kafka Streams. Reason being consumers consuming from brokers in different availability zones would result in high data transfer charges. Do you have any suggestions? Thanks, Tony On Mon, Oct 19, 2020 at 11:08 PM Matthias J. Sax wrote: > There is no such feature for Kafka Streams yet. > > We have one ticket for rack aware standby task placement > (https://issues.apache.org/jira/browse/KAFKA-6718) but nobody is working > on it. > > > -Matthias > > On 10/19/20 9:20 AM, Tony John wrote: > > Hi All, > > > > I have been trying to figure out some documentation on how to enable rack > > awareness on a Kafka Streams app. I do see the broker.rack configuration > > which needs to be done at the broker side and client.rack configuration > for > > the consumers. Is there any specific configuration which is required to > > enable rack awareness in a streams app? Please throw some light on this. > > > > PS: I am using Kafka / Kafka Streams 2.2.1 > > > > Thanks, > > Tony > > >
Kafka Streams : Rack Aware deployment in EC2
Hi All, I have been trying to figure out some documentation on how to enable rack awareness on a Kafka Streams app. I do see the broker.rack configuration which needs to be done at the broker side and client.rack configuration for the consumers. Is there any specific configuration which is required to enable rack awareness in a streams app? Please throw some light on this. PS: I am using Kafka / Kafka Streams 2.2.1 Thanks, Tony
Kafka Streams 1.1.0 - Significant Performance Drop
Hi All, I was trying to switch to the latest version of Kafka Streams (1.1.0) and started seeing a significant drop in the application's performance. I was using 0.11.0.2 before. After doing some checks I found that the choking point was the RocksDB flush, which contributes almost 80% of the CPU time (PFA the screenshots). One thing I was doing earlier was calling context.commit() from the Processor's process method for each record processed. After throttling this and restricting the commit to every 100K records, the performance was on par with the previous version. So below are my queries: 1. Is it wrong to commit the stores for each record that gets processed? 2. Are there any other configurations I need to change to get rid of this, other than throttling the commits? 3. Or is it actually an issue with 1.1.0? I don't think that's the case, as I haven't seen anyone else reporting it so far. Please suggest. Thanks, Tony
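A minimal sketch (Kotlin, old Processor API available in 0.11.x/1.1.0) of the throttled-commit idea described above; the class name and the 100K threshold are illustrative, not from the original message:

```kotlin
import org.apache.kafka.streams.processor.AbstractProcessor

// Instead of requesting a commit for every record, request one roughly every
// `commitEvery` records so the RocksDB flush / offset commit cost is amortised.
class ThrottledCommitProcessor(private val commitEvery: Long = 100_000) :
    AbstractProcessor<String, String>() {

    private var processedSinceCommit = 0L

    override fun process(key: String?, value: String?) {
        // ... per-record business logic and state-store writes would go here ...

        processedSinceCommit++
        if (processedSinceCommit >= commitEvery) {
            context().commit()   // asks Streams to commit at the next opportunity
            processedSinceCommit = 0
        }
    }
}
```

Relying on commit.interval.ms instead of explicit per-record commits achieves much the same effect, since Streams already commits periodically on its own.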
Re: Kafka Streams : TimeoutException Expiring Records
Thanks a lot Bill for looking in to this. I would definitely attempt the suggestions and let you know the outcome. I have gone through KIP-91, but struggling to understand the behavior. So does it mean that these errors are happening due to a failure in the broker? If so why would it kill all the threads which causes the consumers to be unavailable? Also is there anyway to handle these errors gracefully so that it does not kill the threads and hence the consumers? Thanks, Tony On Fri, Feb 23, 2018 at 1:15 AM, Bill Bejeck wrote: > Hi Tony, > > Looks like you have a known issue that KIP-91( > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 91+Provide+Intuitive+User+Timeouts+in+The+Producer) > will address. > > In the meantime, as a workaround, you could try > setting REQUEST_TIMEOUT_MS_CONFIG to a large value (Integer.MAX_VALUE ?) > other secondary configurations to consider changing would be increasing " > max.block.ms" and "retries" > > Thanks, > Bill > > On Thu, Feb 22, 2018 at 8:14 AM, Tony John > wrote: > > > Hi All, > > > > I am running into an issue with my Kafka Streams application. The > > application was running fine for almost 2 weeks, then it started throwing > > the below exception which caused the threads to die. Now when I restart > the > > application, it dies quickly (1-2 hrs) when trying to catch up the lag. > > > > The application is running on an AWS EC2 instance with 8 core processor > and > > 16GB of memory. The streams config is given below and more logs are > > available below (I have stripped of some logs which I though may not be > > relevant). Towards the end of this thread you will be able to see lot of > > exceptions similar to the below one + RocksDBExceptions > > (org.apache.kafka.streams.errors.ProcessorStateException: Error opening > > store vstore at location > > /mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please > take > > a look at it and let me know what could be wrong here? > > > > INFO 2018-02-21 08:37:20.758 [Engine2-StreamThread-2] > > org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > > [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and > > standby tasks [4_0, 1_30] in 0ms > > ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread | > > Engine2-StreamThread-6-producer] > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task > > [1_34] Error sending record to topic cv-v1-cv. No more offsets will be > > recorded for this task and the exception will eventually be thrown > > org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) > for > > cv-v1-cv-2: 31439 ms has passed since last append > > DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3] > > org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > > [Engine2-StreamThread-3] processing latency 46282 > commit time 3 for > > 1 records. Adjusting down recordsProcessedBeforeCommit=6482 > > ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] > > org.apache.kafka.streams.processor.internals.AssignedTasks - > stream-thread > > [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the > > following error: > > org.apache.kafka.streams.errors.StreamsException: task [1_34] exception > > caught when producing > > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. > > checkForException(RecordCollectorImpl.java:137) > > ~[kafka-streams-0.11.0.2.jar:?] > > at org.apache.kafka.streams.processor.internals. 
> RecordCollectorImpl.flush( > > RecordCollectorImpl.java:145) > > ~[kafka-streams-0.11.0.2.jar:?] > > at org.apache.kafka.streams.processor.internals.StreamTask.flushState( > > StreamTask.java:296) > > ~[kafka-streams-0.11.0.2.jar:?] > > at org.apache.kafka.streams.processor.internals. > > StreamTask$1.run(StreamTask.java:275) > > ~[kafka-streams-0.11.0.2.jar:?] > > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > > measureLatencyNs(StreamsMetricsImpl.java:201) > > ~[kafka-streams-0.11.0.2.jar:? > > ] > > at org.apache.kafka.streams.processor.internals. > > StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?] > > at org.apache.kafka.streams.processor.internals. > > StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?] > > at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply( > > AssignedTasks.java:374) > > ~[kafka-streams-0.11.0.2.jar:?] > > at org.apache.kafka.streams.processor.internals.AssignedTasks.
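A sketch (Kotlin) of the workaround Bill suggests above, applied to the embedded producer via the Streams producer prefix; the concrete values are illustrative, and on post-KIP-91 clients delivery.timeout.ms largely replaces this tuning:

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.streams.StreamsConfig

val props = Properties().apply {
    // ... application.id, bootstrap.servers, state.dir etc. as configured elsewhere in this thread ...

    // Make the embedded producer far more patient so batches are not expired while the
    // broker is slow or briefly unavailable: a long request timeout, a longer max.block.ms
    // for a full buffer, and retries for transient send failures.
    put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), Int.MAX_VALUE)
    put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 300_000)
    put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10)
}
```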
Kafka Streams : TimeoutException Expiring Records
Hi All, I am running into an issue with my Kafka Streams application. The application was running fine for almost 2 weeks, then it started throwing the below exception which caused the threads to die. Now when I restart the application, it dies quickly (1-2 hrs) when trying to catch up the lag. The application is running on an AWS EC2 instance with 8 core processor and 16GB of memory. The streams config is given below and more logs are available below (I have stripped of some logs which I though may not be relevant). Towards the end of this thread you will be able to see lot of exceptions similar to the below one + RocksDBExceptions (org.apache.kafka.streams.errors.ProcessorStateException: Error opening store vstore at location /mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please take a look at it and let me know what could be wrong here? INFO 2018-02-21 08:37:20.758 [Engine2-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and standby tasks [4_0, 1_30] in 0ms ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread | Engine2-StreamThread-6-producer] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [1_34] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 31439 ms has passed since last append DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-3] processing latency 46282 > commit time 3 for 1 records. Adjusting down recordsProcessedBeforeCommit=6482 ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the following error: org.apache.kafka.streams.errors.StreamsException: task [1_34] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?] 
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 31439 ms has passed since last append INFO 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] Shutting down INFO 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN. DEBUG 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] Shutting down all active tasks [6_12, 1_34, 7_15], standby tasks [1_2, 1_7], suspended tasks [1_18, 7_15, 6_3, 1_27], and suspended standby tasks [1_3, 1_30] INFO 2018-02-21 08:37:24.879 [Engine2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-3] Committed all active tasks [1_31, 1_24, 6_7] and standby tasks [1_0, 1_6,
Re: Kafka Streams : Connection to node -1 could not be established. Broker may not be available.
Thanks a lot Guozhang. I was able to nail it down by looking at the log which you suggested. The log revealed that it was trying to connect to localhost and it was a problem with one of my sub component. It was trying to read the broker configuration from a different property file which didn't exist and defaulted to loopback address. Thanks a lot for the help. Thanks, Tony On Wed, Feb 7, 2018 at 3:05 AM, Guozhang Wang wrote: > Hi Tony, > > Your Streams configs look good to me, and the additional streams log from > StreamThread are normal operational logs that do not related to the issue. > I suspect there is a network partition between your client to the broker > node, and to investigate which host this `node -1` is referring to (note > that -1 is actually a valid node id, read from the bootstrapped list), you > can look into the same log file and search for the following line: > > "Initiating connection to node.." > > and the find out the host:port of node with id -1 . > > Guozhang > > On Tue, Feb 6, 2018 at 8:16 AM, Tony John > wrote: > > > Hi Guozhang, > > > > Thanks for looking into this. Below are the stream config values. > > > > INFO 2018-02-02 08:33:25.708 [main] org.apache.kafka.streams. > > StreamsConfig > > - StreamsConfig values: > > application.id = cv-v1 > > application.server = > > bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092] > > buffered.records.per.partition = 1000 > > cache.max.bytes.buffering = 104857600 > > client.id = I2 > > commit.interval.ms = 3 > > connections.max.idle.ms = 54 > > default.key.serde = class > > org.apache.kafka.common.serialization.Serdes$ByteArraySerde > > default.timestamp.extractor = class > > org.apache.kafka.streams.processor.FailOnInvalidTimestamp > > default.value.serde = class > > org.apache.kafka.common.serialization.Serdes$ByteArraySerde > > key.serde = null > > metadata.max.age.ms = 30 > > metric.reporters = [] > > metrics.num.samples = 2 > > metrics.recording.level = DEBUG > > metrics.sample.window.ms = 3 > > num.standby.replicas = 1 > > num.stream.threads = 1 > > partition.grouper = class > > org.apache.kafka.streams.processor.DefaultPartitionGrouper > > poll.ms = 100 > > processing.guarantee = at_least_once > > receive.buffer.bytes = 32768 > > reconnect.backoff.max.ms = 1000 > > reconnect.backoff.ms = 50 > > replication.factor = 2 > > request.timeout.ms = 4 > > retry.backoff.ms = 100 > > rocksdb.config.setter = null > > security.protocol = PLAINTEXT > > send.buffer.bytes = 131072 > > state.cleanup.delay.ms = 60 > > state.dir = /mnt/store/kafka-streams > > timestamp.extractor = null > > value.serde = null > > windowstore.changelog.additional.retention.ms = 8640 > > zookeeper.connect = > > INFO 2018-02-02 08:33:25.870 [main] org.apache.kafka.streams. 
> > StreamsConfig > > - StreamsConfig values: > > application.id = pe-v1 > > application.server = > > bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092] > > buffered.records.per.partition = 1000 > > cache.max.bytes.buffering = 2147483648 > > client.id = I2 > > commit.interval.ms = 3 > > connections.max.idle.ms = 54 > > default.key.serde = class > > org.apache.kafka.common.serialization.Serdes$ByteArraySerde > > default.timestamp.extractor = class > > org.apache.kafka.streams.processor.FailOnInvalidTimestamp > > default.value.serde = class > > org.apache.kafka.common.serialization.Serdes$ByteArraySerde > > key.serde = null > > metadata.max.age.ms = 30 > > metric.reporters = [] > > metrics.num.samples = 2 > > metrics.recording.level = DEBUG > > metrics.sample.window.ms = 3 > > num.standby.replicas = 1 > > num.stream.threads = 3 > > partition.grouper = class > > org.apache.kafka.streams.processor.DefaultPartitionGrouper > > poll.ms = 100 > > processing.guarantee = at_least_once > > receive.buffer.bytes = 32768 > > reconnect.backoff.max.ms = 1000 > > reconnect.backoff.ms = 50 > > replication.factor = 2 > > request.timeout.ms = 4 > > retry.backoff.ms = 100 > > rocksdb.config.setter = null > > security.protocol = PLAINTEXT > > send.buffer.bytes = 131072 > > state.cleanup.delay.ms = 60 > > state.dir = /mnt/store/kafka-streams > > timestamp.extractor = null > > value.serde = null > > windowstore.changelog.additional.retention.ms = 8640 > > zookeeper.connect = > > > > Please note there are 2 streams applicatio
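Not from the thread, but a small defensive Kotlin sketch of the kind of check that would have surfaced the root cause Tony describes, where a missing properties file silently fell back to the loopback address; the file path and property keys are made up for illustration:

```kotlin
import java.io.FileInputStream
import java.util.Properties

// Load broker settings from an external file and fail fast if it is missing or
// points at loopback, instead of silently letting clients dial 127.0.0.1.
fun loadBrokerConfig(path: String): Properties {
    val props = Properties()
    FileInputStream(path).use { props.load(it) }   // throws FileNotFoundException if the file is absent
    val servers = props.getProperty("bootstrap.servers")
        ?: error("bootstrap.servers missing in $path")
    require("localhost" !in servers && "127.0.0.1" !in servers) {
        "bootstrap.servers resolves to loopback ($servers); check $path"
    }
    return props
}
```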
Re: Kafka Streams : Connection to node -1 could not be established. Broker may not be available.
d - stream-thread [I2-StreamThread-7] processing latency 2698 < commit time 3 for 6880 records. Adjusting up recordsProcessedBeforeCommit=76501 Thanks, Tony On Tue, Feb 6, 2018 at 3:21 AM, Guozhang Wang wrote: > Hello Tony, > > > Could you share your Streams config values so that people can help further > investigating your issue? > > > Guozhang > > > On Mon, Feb 5, 2018 at 12:00 AM, Tony John > wrote: > > > Hi All, > > > > I have been running a streams application for sometime. The application > > runs fine for sometime but after a day or two I see the below log getting > > printed continuously on to the console. > > > > WARN 2018-02-05 02:50:04.060 [kafka-producer-network-thread | > producer-1] > > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not > > be > > established. Broker may not be available. > > > > WARN 2018-02-05 02:50:04.160 [kafka-producer-network-thread | > producer-1] > > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not > > be > > established. Broker may not be available. > > > > WARN 2018-02-05 02:50:04.261 [kafka-producer-network-thread | > producer-1] > > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not > > be > > established. Broker may not be available. > > > > WARN 2018-02-05 02:50:04.311 [kafka-producer-network-thread | > producer-1] > > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not > > be > > established. Broker may not be available. > > > > WARN 2018-02-05 02:50:04.361 [kafka-producer-network-thread | > producer-1] > > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not > > be > > established. Broker may not be available. > > > > WARN 2018-02-05 02:50:04.411 [kafka-producer-network-thread | > producer-1] > > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not > > be > > established. Broker may not be available. > > > > At this time, though the application is able to process the messages, I > > could also see lag building up in the consumers and the processing time > for > > a batch has increased 15 folds. > > > > I am using a single zoo-keeper instance with 2 brokers and 4 application > > instances. I checked the broker and zoo-keeper status, they are all > running > > fine as I could see. I have also verified the connectivity between the > > application and broker instances using telnet and it seems intact. The > > kafka broker and streams/client versions are 0.11.0.2. Results of broker > > status results from zoo-keeper below > > > > > > [root@app100 kafka]# echo dump | nc localhost 2181 > > > > SessionTracker dump: > > > > Session Sets (3): > > > > 0 expire at Mon Feb 05 06:16:39 UTC 2018: > > > > 1 expire at Mon Feb 05 06:16:42 UTC 2018: > > > > 0x161562860970001 > > > > 1 expire at Mon Feb 05 06:16:45 UTC 2018: > > > > 0x16156286097 > > > > ephemeral nodes dump: > > > > Sessions with Ephemerals (2): > > > > 0x16156286097: > > > > /brokers/ids/0 > > > > /controller > > > > 0x161562860970001: > > > > /brokers/ids/1 > > > > [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh > > localhost:2181 <<< "get /brokers/ids/0" > > > > Connecting to localhost:2181 > > > > Welcome to ZooKeeper! 
> > > > JLine support is disabled > > > > > > WATCHER:: > > > > > > WatchedEvent state:SyncConnected type:None path:null > > > > {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"} > > ,"endpoints":["PLAINTEXT://172.31.10.35:9092"],"jmx_port": > > 5,"host":"172.31.10.35","timestamp":"1517569007467"," > > port":9092,"version":4} > > > > cZxid = 0x1c > > > > ctime = Fri Feb 02 10:56:47 UTC 2018 > > > > mZxid = 0x1c > > > > mtime = Fri Feb 02 10:56:47 UTC 2018 > > > > pZxid = 0x1c > > > > cversion = 0 > > > > dataVersion = 0 > > > > aclVersion = 0 > > > > ephemeralOwner = 0x16156286097 > > > > dataLength = 197 > > > > numChildren = 0 > > > > [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh > > localhost:2181 <<< "get /brokers/ids/1" > > > > Connecting to localhost:2181 > > > > Welcome to ZooKeeper! > > > > JLine support is disabled > > > > > > WATCHER:: > > > > > > WatchedEvent state:SyncConnected type:None path:null > > > > {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"} > > ,"endpoints":["PLAINTEXT://172.31.14.8:9092"],"jmx_port": > > 5,"host":"172.31.14.8","timestamp":"1517569016562"," > > port":9092,"version":4} > > > > cZxid = 0x21 > > > > ctime = Fri Feb 02 10:56:56 UTC 2018 > > > > mZxid = 0x21 > > > > mtime = Fri Feb 02 10:56:56 UTC 2018 > > > > pZxid = 0x21 > > > > cversion = 0 > > > > dataVersion = 0 > > > > aclVersion = 0 > > > > ephemeralOwner = 0x161562860970001 > > > > dataLength = 195 > > > > numChildren = 0 > > > > Could you please throw some light on this as to what could be going wrong > > here? > > > > Thanks, > > Tony > > > > > > -- > -- Guozhang >
Kafka Streams : Connection to node -1 could not be established. Broker may not be available.
Hi All, I have been running a streams application for sometime. The application runs fine for sometime but after a day or two I see the below log getting printed continuously on to the console. WARN 2018-02-05 02:50:04.060 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available. WARN 2018-02-05 02:50:04.160 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available. WARN 2018-02-05 02:50:04.261 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available. WARN 2018-02-05 02:50:04.311 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available. WARN 2018-02-05 02:50:04.361 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available. WARN 2018-02-05 02:50:04.411 [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be established. Broker may not be available. At this time, though the application is able to process the messages, I could also see lag building up in the consumers and the processing time for a batch has increased 15 folds. I am using a single zoo-keeper instance with 2 brokers and 4 application instances. I checked the broker and zoo-keeper status, they are all running fine as I could see. I have also verified the connectivity between the application and broker instances using telnet and it seems intact. The kafka broker and streams/client versions are 0.11.0.2. Results of broker status results from zoo-keeper below [root@app100 kafka]# echo dump | nc localhost 2181 SessionTracker dump: Session Sets (3): 0 expire at Mon Feb 05 06:16:39 UTC 2018: 1 expire at Mon Feb 05 06:16:42 UTC 2018: 0x161562860970001 1 expire at Mon Feb 05 06:16:45 UTC 2018: 0x16156286097 ephemeral nodes dump: Sessions with Ephemerals (2): 0x16156286097: /brokers/ids/0 /controller 0x161562860970001: /brokers/ids/1 [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0" Connecting to localhost:2181 Welcome to ZooKeeper! JLine support is disabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"} ,"endpoints":["PLAINTEXT://172.31.10.35:9092"],"jmx_port": 5,"host":"172.31.10.35","timestamp":"1517569007467"," port":9092,"version":4} cZxid = 0x1c ctime = Fri Feb 02 10:56:47 UTC 2018 mZxid = 0x1c mtime = Fri Feb 02 10:56:47 UTC 2018 pZxid = 0x1c cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x16156286097 dataLength = 197 numChildren = 0 [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/1" Connecting to localhost:2181 Welcome to ZooKeeper! 
JLine support is disabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"} ,"endpoints":["PLAINTEXT://172.31.14.8:9092"],"jmx_port": 5,"host":"172.31.14.8","timestamp":"1517569016562"," port":9092,"version":4} cZxid = 0x21 ctime = Fri Feb 02 10:56:56 UTC 2018 mZxid = 0x21 mtime = Fri Feb 02 10:56:56 UTC 2018 pZxid = 0x21 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x161562860970001 dataLength = 195 numChildren = 0 Could you please throw some light on this as to what could be going wrong here? Thanks, Tony
Re: Kafka Streams : CommitFailedException
Hello Guozhang, Thanks for the analysis. Figured out the reason for the OOM and it was actually caused by an in memory queue in the app itself. I have fixed it and right now it all looks good. Sorry for the inconvenience and thanks for helping out. Thanks, Tony On Wed, Nov 8, 2017 at 1:20 AM, Guozhang Wang wrote: > Hello Tony, > > You mentioned in 0.11.0.0 the > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, > 1) while in 0.11.0.1 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, > 2). > But from your logs it seems you set this config as 2 in both versions. > Right? > > Anyways, I took a look into your logs and I think you are hitting a known > issue (https://issues.apache.org/jira/browse/KAFKA-5152) that has been > fixed in 0.11.0.0; that is why you only see the WARN log entry in 0.11.0.1 > but the app is not dying out. The running out of memory issues seems not > related to the CommitFailed error. Do you have any stateful operations in > your app that use an iterator? Did you close the iterator after complete > using it? > > > Guozhang > > > On Tue, Nov 7, 2017 at 12:42 AM, Tony John > wrote: > > > Hi Guozang, > > > > Thanks for looking into this. I was using 0.11.0.0 version of the library > > earlier when I was getting the CommitFailed exception and the tasks were > > terminating. The application config then was Replication Factor = 2, Num > > Stream Threads = 1, Consumer Max Poll Records = 1000 & Consumer Max Poll > > Interval = 2147483647. The streams config code (*Streams Config While > > Using > > 0.11.0.0*) is given below and the logs of the application while using > > 0.11.0.0 can be downloaded from > > https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0 > > > > I have upgraded the libraries to 0.11.0.1 and ran into some other issues. > > Though the CommitFailed error logs are still showing up with 0.11.0.1 the > > tasks are not getting terminated, but the app quickly runs out of memory > > (GC overhead limit exceeded) and the CPU is choked, which was not the > case > > earlier. The logs are available @ > > https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and > > application config is also given below (*Streams Config While Using > > 0.11.0.1*). Since I am not sure what configuration helps reduce the > > CommitFailed error, which I think could be one of the reasons for the CPU > > choke and eventually cause an OOM, I have gone ahead and used all > possible > > configuration parameters, but still no luck. > > > > It would be great if you could shed some light on this as to what could > be > > causing this problem. 
> > > > *Streams Config While Using 0.11.0.0 * > > > > val props = Properties() > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, > > EngineConfig.APPLICATION_ID) > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > > EngineConfig.KAFKA_SERVERS) > > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR) > > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2) > > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) > > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO") > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_ > > POLL_RECORDS_CONFIG), > > 1000) > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_ > > POLL_INTERVAL_MS_CONFIG), > > Int.MAX_VALUE) > > > > streams = KafkaStreams(builder, StreamsConfig(props)) > > streams.start() > > > > > > *Streams Config While Using 0.11.0.1* > > > > val props = Properties() > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, > > EngineConfig.APPLICATION_ID) > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > > EngineConfig.KAFKA_SERVERS) > > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR) > > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2) > > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2) > > > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTB > > EAT_INTERVAL_MS_CONFIG), > > 1) > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_ > > COMMIT_INTERVAL_MS_CONFIG), > > 1) > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE > > _AUTO_COMMIT_CONFIG), > > true) > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_ > > POLL_RECORDS_CONFIG), > > 1) > > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_ > > POLL_INTERVAL_MS_CONFIG), > > Int.MAX_VALUE) > > props.put(StreamsConfig.consumerPrefix(Consum
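For context on Guozhang's question about iterators: state-store iterators over RocksDB hold native resources until closed, so leaking them in a process/punctuate loop can surface as memory or CPU problems over time. A small Kotlin sketch of the pattern (store name and types are placeholders, not from the thread):

```kotlin
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// KeyValueIterator is Closeable; Kotlin's `use` guarantees it is closed even if
// the loop throws, which avoids leaking RocksDB resources on every invocation.
fun countEntries(context: ProcessorContext, storeName: String): Long {
    @Suppress("UNCHECKED_CAST")
    val store = context.getStateStore(storeName) as KeyValueStore<String, String>
    return store.all().use { iter ->
        var count = 0L
        while (iter.hasNext()) {
            iter.next()
            count++
        }
        count   // value returned by `use` after the iterator is closed
    }
}
```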
Re: Kafka Streams : CommitFailedException
Hi Guozang, Thanks for looking into this. I was using 0.11.0.0 version of the library earlier when I was getting the CommitFailed exception and the tasks were terminating. The application config then was Replication Factor = 2, Num Stream Threads = 1, Consumer Max Poll Records = 1000 & Consumer Max Poll Interval = 2147483647. The streams config code (*Streams Config While Using 0.11.0.0*) is given below and the logs of the application while using 0.11.0.0 can be downloaded from https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0 I have upgraded the libraries to 0.11.0.1 and ran into some other issues. Though the CommitFailed error logs are still showing up with 0.11.0.1 the tasks are not getting terminated, but the app quickly runs out of memory (GC overhead limit exceeded) and the CPU is choked, which was not the case earlier. The logs are available @ https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and application config is also given below (*Streams Config While Using 0.11.0.1*). Since I am not sure what configuration helps reduce the CommitFailed error, which I think could be one of the reasons for the CPU choke and eventually cause an OOM, I have gone ahead and used all possible configuration parameters, but still no luck. It would be great if you could shed some light on this as to what could be causing this problem. *Streams Config While Using 0.11.0.0 * val props = Properties() props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID) props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS) props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR) props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2) props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO") props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000) props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE) streams = KafkaStreams(builder, StreamsConfig(props)) streams.start() *Streams Config While Using 0.11.0.1* val props = Properties() props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID) props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS) props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR) props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2) props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2) props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 1) props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 1) props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true) props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1) props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE) props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 3) streams = KafkaStreams(builder, StreamsConfig(props)) streams.start() Thanks, Tony On Thu, Nov 2, 2017 at 4:39 PM, Tony John wrote: > Hi All, > > I am facing CommitFailedException in my streams application. As per the > log I tried changing the max.poll.interval.ms and max.poll.records. But > both didn't help. PFA the full stack trace of the exception and below is > the streams configuration used. What else could be wrong? 
> > val props = Properties() > props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID) > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS) > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR) > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2) > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO") > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), > 1000) > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), > Int.MAX_VALUE) > > streams = KafkaStreams(builder, StreamsConfig(props)) > streams.start() > > > Thanks, > Tony >
Kafka Streams : CommitFailedException
Hi All,

I am facing CommitFailedException in my streams application. As per the log message, I tried changing max.poll.interval.ms and max.poll.records, but neither helped. PFA the full stack trace of the exception; below is the streams configuration used. What else could be wrong?

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()

Thanks,
Tony
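CommitFailedException is thrown when an offset commit cannot complete because the group has already rebalanced, which typically points at the poll loop taking longer than max.poll.interval.ms. One way to narrow down whether user processing is the slow part is to time it directly. A rough diagnostic sketch; TimedProcessor is a hypothetical helper that is not part of the application above and could be returned from the existing ProcessorSupplier:

import org.apache.kafka.streams.processor.Processor
import org.apache.kafka.streams.processor.ProcessorContext

// Diagnostic wrapper: delegates to the real processor and logs records whose processing
// time is suspiciously long, to see whether slow processing is delaying the next poll.
class TimedProcessor<K, V>(private val delegate: Processor<K, V>) : Processor<K, V> {

    override fun init(context: ProcessorContext) = delegate.init(context)

    override fun process(key: K, value: V) {
        val start = System.currentTimeMillis()
        delegate.process(key, value)
        val elapsedMs = System.currentTimeMillis() - start
        if (elapsedMs > 1_000) {  // illustrative threshold
            println("Slow record: key=$key took $elapsedMs ms")
        }
    }

    override fun punctuate(timestamp: Long) = delegate.punctuate(timestamp)

    override fun close() = delegate.close()
}

Hooking it in would just mean returning TimedProcessor(ActualProcessor()) from the ProcessorSupplier already registered in the topology.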
Re: Kafka Streams : Problem with Global State Restoration
Hi Damian,

Thanks a lot for the response. I just saw your reply when I visited the mailing-list archive <http://mail-archives.apache.org/mod_mbox/kafka-users/201710.mbox/browser>. Unfortunately I haven't received it in my inbox, and I didn't even see the update in the archive when I checked earlier today. Anyway, once again thanks a lot for the response. I will raise a JIRA as you suggested, and I hope this isn't the case with local state stores. (A small pass-through sketch follows the quoted message below.)

Thanks,
Tony

On Wed, Oct 18, 2017 at 9:21 PM, Tony John wrote:

> Hello All,
>
> I have been trying to create an application on top of Kafka Streams. I am a
> newbie to Kafka & Kafka Streams, so please excuse me if my understanding is
> wrong.
>
> I got the application running fine on a single EC2 instance in AWS. Now I am
> looking at scaling and ran into some issues. The application has a global
> state store and a couple of other local ones backed by RocksDB. It uses the
> Processor API, and the stream is built using the TopologyBuilder. The global
> state store is fed by a topic which sends key-value pairs (both are protobuf
> objects) and is connected to a processor which transforms the value by
> applying some logic and finally stores the key and the modified data in the
> store. Similarly, the local stores are connected via processors which are fed
> by different topics. Now the issue is that when I launch a new instance of
> the app, task re-allocation and state restoration happen, and the stores get
> replicated on to the new instance. But the global store which is replicated
> on to the new instance has some other data (I guess that's the raw data) as
> opposed to the processed data.
>
> *Application Topology*
>
> *Global Store*
>
> Source Topic (Partition Count = 1, Replication Factor = 2, Compacted = false)
> -> GlobalStoreProcessor (Persistent, Caching enabled, Logging disabled)
> -> Global Store
>
> *Local Store*
>
> Source Topic (Partition Count = 16, Replication Factor = 2, Compacted = true)
> -> LocalStoreProcessor (Persistent, Caching enabled, Logging enabled)
> -> Local state stores on different partitions
>
> *Sample Code (Written in Kotlin)*
>
> val streams: KafkaStreams
> init {
>     val builder = KStreamBuilder().apply {
>
>         val globalStore = Stores.create(Config.DICTIONARY)
>             .withKeys(Serdes.String())
>             .withValues(Serdes.String())
>             .persistent()
>             .enableCaching()
>             .disableLogging()
>             .build() as StateStoreSupplier<KeyValueStore<String, String>>
>
>         addGlobalStore(globalStore, "dictionary-words-source",
>             Serdes.String().deserializer(), Serdes.String().deserializer(),
>             Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor",
>             DictionaryWordsProcessor.Supplier)
>
>         addSource("query-source", Serdes.String().deserializer(),
>             Serdes.String().deserializer(), Config.QUERIES_TOPIC)
>         addProcessor("query-processor", QueryProcessor.Supplier, "query-source")
>     }
>
>     val config = StreamsConfig(mapOf(
>         StreamsConfig.APPLICATION_ID_CONFIG to Config.APPLICATION_ID,
>         StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
>         StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
>     ))
>     streams = KafkaStreams(builder, config)
>
>     Runtime.getRuntime().addShutdownHook(Thread {
>         println("Shutting down Kafka Streams...")
>         streams.close()
>         println("Shut down successfully")
>     })
> }
>
> fun run() {
>     Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1, Config.REPLICATION_FACTOR, true)
>     Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT, Config.REPLICATION_FACTOR, false)
>     streams.start()
> }
>
> *Environment Details:* 1 ZooKeeper, 2 Brokers, and 1/2 application instances.
>
> So I just wanted to know the process of state store restoration while scaling
> up and down. How does Streams manage to restore the data? I was expecting
> that when the new instance gets launched, the data would flow through the
> same processor so that it gets modified using the same logic which was
> applied when it was stored on instance 1. Could you please help me understand
> this a little better. Please let me know if there is any way to get the
> restoration process to route the data via the same processor.
>
> Thanks,
> Tony
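For context on the behaviour described above: during restoration a global store is rebuilt by reading its source topic directly, bypassing the attached processor, so whatever sits in the topic is exactly what ends up in the store. A common way to keep the restored contents consistent with live processing is to publish the already-transformed value to the topic that feeds the global store and keep that store's processor a pure pass-through. A minimal sketch against the 0.11 Processor API; PassThroughProcessor is a hypothetical name, not something from this thread:

import org.apache.kafka.streams.processor.Processor
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Pass-through state update processor: the value written to the source topic is already
// the final, transformed value, so restoring the store straight from the topic yields the
// same contents as normal processing.
class PassThroughProcessor(private val storeName: String) : Processor<String, String> {
    private lateinit var store: KeyValueStore<String, String>

    override fun init(context: ProcessorContext) {
        @Suppress("UNCHECKED_CAST")
        store = context.getStateStore(storeName) as KeyValueStore<String, String>
    }

    override fun process(key: String?, value: String?) {
        if (key != null) store.put(key, value)  // no transformation here
    }

    override fun punctuate(timestamp: Long) {}

    override fun close() {}
}

Under this layout, the transformation that currently lives in DictionaryWordsProcessor would run before the record is produced to Config.DICTIONARY_WORDS_TOPIC, so restoration and live processing see identical data.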
Kafka Streams : Problem with Global State Restoration
Hello All,

I have been trying to create an application on top of Kafka Streams. I am a newbie to Kafka & Kafka Streams, so please excuse me if my understanding is wrong.

I got the application running fine on a single EC2 instance in AWS. Now I am looking at scaling and ran into some issues. The application has a global state store and a couple of other local ones backed by RocksDB. It uses the Processor API, and the stream is built using the TopologyBuilder. The global state store is fed by a topic which sends key-value pairs (both are protobuf objects) and is connected to a processor which transforms the value by applying some logic and finally stores the key and the modified data in the store. Similarly, the local stores are connected via processors which are fed by different topics. Now the issue is that when I launch a new instance of the app, task re-allocation and state restoration happen, and the stores get replicated on to the new instance. But the global store which is replicated on to the new instance has some other data (I guess that's the raw data) as opposed to the processed data.

*Application Topology*

*Global Store*

Source Topic (Partition Count = 1, Replication Factor = 2, Compacted = false) -> GlobalStoreProcessor (Persistent, Caching enabled, Logging disabled) -> Global Store

*Local Store*

Source Topic (Partition Count = 16, Replication Factor = 2, Compacted = true) -> LocalStoreProcessor (Persistent, Caching enabled, Logging enabled) -> Local state stores on different partitions

*Sample Code (Written in Kotlin)*

val streams: KafkaStreams
init {
    val builder = KStreamBuilder().apply {

        val globalStore = Stores.create(Config.DICTIONARY)
            .withKeys(Serdes.String())
            .withValues(Serdes.String())
            .persistent()
            .enableCaching()
            .disableLogging()
            .build() as StateStoreSupplier<KeyValueStore<String, String>>

        addGlobalStore(globalStore, "dictionary-words-source",
            Serdes.String().deserializer(), Serdes.String().deserializer(),
            Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor",
            DictionaryWordsProcessor.Supplier)

        addSource("query-source", Serdes.String().deserializer(),
            Serdes.String().deserializer(), Config.QUERIES_TOPIC)
        addProcessor("query-processor", QueryProcessor.Supplier, "query-source")
    }

    val config = StreamsConfig(mapOf(
        StreamsConfig.APPLICATION_ID_CONFIG to Config.APPLICATION_ID,
        StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
        StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
    ))
    streams = KafkaStreams(builder, config)

    Runtime.getRuntime().addShutdownHook(Thread {
        println("Shutting down Kafka Streams...")
        streams.close()
        println("Shut down successfully")
    })
}

fun run() {
    Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1, Config.REPLICATION_FACTOR, true)
    Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT, Config.REPLICATION_FACTOR, false)
    streams.start()
}

*Environment Details:* 1 ZooKeeper, 2 Brokers, and 1/2 application instances.

So I just wanted to know the process of state store restoration while scaling up and down. How does Streams manage to restore the data? I was expecting that when the new instance gets launched, the data would flow through the same processor so that it gets modified using the same logic which was applied when it was stored on instance 1. Could you please help me understand this a little better. Please let me know if there is any way to get the restoration process to route the data via the same processor.

Thanks,
Tony
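One way to confirm what the restored global store actually contains on a freshly launched instance (raw vs. processed values) is to query it through the interactive-queries API once the application reaches the RUNNING state. A minimal sketch, assuming Config.DICTIONARY is the store name used above and that both keys and values are strings; dumpStore is a hypothetical helper:

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreTypes

// Print a handful of entries from a store so the restored contents on a new instance can
// be compared with what the processor writes during live processing.
fun dumpStore(streams: KafkaStreams, storeName: String, limit: Int = 10) {
    val store = streams.store(storeName, QueryableStoreTypes.keyValueStore<String, String>())
    store.all().use { iter ->
        var printed = 0
        while (iter.hasNext() && printed < limit) {
            val entry = iter.next()
            println("key=${entry.key} value=${entry.value}")
            printed++
        }
    }
}

Calling dumpStore(streams, Config.DICTIONARY) on both the original and the newly launched instance after a scale-out would show whether the new instance ended up with raw or processed values.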