[jira] [Commented] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
[ https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17175147#comment-17175147 ]

Yogesh BG commented on KAFKA-10381:
---

One more observation: when I restart the leader node for that partition, it picks up the change and the issue is resolved. But we cannot restart nodes in a real deployment, because we would have data loss during restarts.

> Add broker to a cluster not rebalancing partitions
> --
>
> Key: KAFKA-10381
> URL: https://issues.apache.org/jira/browse/KAFKA-10381
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.3.0
> Reporter: Yogesh BG
> Priority: Major
>
> Hi,
> I have a 3-node cluster and a topic with one partition. When a node is deleted and another node is added, the topic goes into an unknown state and we are not able to write or read anything. The exception below is seen:
>
> {code:java}
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition C-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition B-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
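Kafka does not move existing partition replicas onto a newly added broker by itself; replicas stay where they were assigned until a reassignment is run, which on 2.3.0 means kafka-reassign-partitions.sh with a JSON plan. As an alternative to restarting the leader, the sketch below builds such a plan by swapping a removed broker id for the new one. The ids used (1003 removed, 1005 added) and the "topic-partition" input keys are assumptions read off the log above, not confirmed by the reporter, and whether a reassignment clears this particular stale-follower state is untested.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

final class ReassignmentPlan {

    /** Replaces {@code removed} with {@code added} in a replica list, keeping the order. */
    static List<Integer> swapReplica(List<Integer> replicas, int removed, int added) {
        List<Integer> result = new ArrayList<>();
        for (int id : replicas) {
            result.add(id == removed ? added : id);
        }
        return result;
    }

    /**
     * Renders a {"version":1,"partitions":[...]} plan for --reassignment-json-file.
     * Keys are hypothetical "topic-partition" strings such as "A-0".
     */
    static String toJson(Map<String, List<Integer>> assignment) {
        StringJoiner partitions = new StringJoiner(",");
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            String key = e.getKey();
            int dash = key.lastIndexOf('-'); // topic names may themselves contain '-'
            String topic = key.substring(0, dash);
            int partition = Integer.parseInt(key.substring(dash + 1));
            StringJoiner replicas = new StringJoiner(",", "[", "]");
            for (int id : e.getValue()) {
                replicas.add(Integer.toString(id));
            }
            partitions.add("{\"topic\":\"" + topic + "\",\"partition\":" + partition
                    + ",\"replicas\":" + replicas + "}");
        }
        return "{\"version\":1,\"partitions\":[" + partitions + "]}";
    }

    public static void main(String[] args) {
        // Current assignments as reported in the WARN lines (assumed to be one cluster).
        Map<String, List<Integer>> current = new LinkedHashMap<>();
        current.put("A-0", List.of(1003, 1004));
        current.put("C-0", List.of(1003, 1004));

        // Hypothetical: 1003 was the deleted node and 1005 is its replacement.
        Map<String, List<Integer>> target = new LinkedHashMap<>();
        current.forEach((tp, replicas) -> target.put(tp, swapReplica(replicas, 1003, 1005)));

        System.out.println(toJson(target));
    }
}
```

The first id in each replica list is the preferred leader, so the order chosen here decides where leadership settles after a preferred-leader election. The printed JSON would be saved to a file and passed to the tool's --execute step (as far as we know, the 2.3.0 tool still connects via --zookeeper).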
[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
[ https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yogesh BG updated KAFKA-10381:
--
Affects Version/s: 2.3.0
[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
[ https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yogesh BG updated KAFKA-10381:
--
Priority: Major (was: Trivial)
[jira] [Created] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
Yogesh BG created KAFKA-10381:
-

Summary: Add broker to a cluster not rebalancing partitions
Key: KAFKA-10381
URL: https://issues.apache.org/jira/browse/KAFKA-10381
Project: Kafka
Issue Type: Bug
Reporter: Yogesh BG

Hi,
I have a 3-node cluster and a topic with one partition. When a node is deleted and another node is added, the topic goes into an unknown state and we are not able to write or read anything. The exception below is seen:

{code:java}
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition C-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition B-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)
{code}
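To see at a glance which partitions the unassigned follower is trying to fetch, the six WARN lines above can be grouped per partition. The sketch below is a hypothetical helper, not part of Kafka; its regular expression is written against the exact message text shown in this log, and other Kafka versions may word the message differently.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class UnassignedFollowerScan {

    // Matches the ReplicaManager WARN text quoted in KAFKA-10381; group 1 = follower, group 2 = partition.
    private static final Pattern WARN = Pattern.compile(
            "Leader \\d+ failed to record follower (\\d+)'s position \\d+ since the replica is not "
            + "recognized to be one of the assigned replicas [\\d,]+ for partition (\\S+)\\.");

    /** Returns partition -> follower ids that fetched without being in the assignment. */
    static Map<String, Set<Integer>> scan(Iterable<String> lines) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String line : lines) {
            Matcher m = WARN.matcher(line);
            if (m.find()) {
                int follower = Integer.parseInt(m.group(1));
                result.computeIfAbsent(m.group(2), k -> new TreeSet<>()).add(follower);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)",
                "[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition C-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager)");
        System.out.println(scan(sample));
    }
}
```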
[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
[ https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yogesh BG updated KAFKA-10381:
--
Priority: Trivial (was: Major)
[jira] [Commented] (KAFKA-10303) kafka producer says connect failed in cluster mode
[ https://issues.apache.org/jira/browse/KAFKA-10303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169288#comment-17169288 ]

Yogesh BG commented on KAFKA-10303:
---

Hi, this issue is resolved now. We had a problem with node IP assignment, which was causing these connection failures. Thanks, we can close this issue.

> kafka producer says connect failed in cluster mode
> --
>
> Key: KAFKA-10303
> URL: https://issues.apache.org/jira/browse/KAFKA-10303
> Project: Kafka
> Issue Type: Bug
> Reporter: Yogesh BG
> Priority: Major
>
> Hi,
>
> I am using Kafka broker version 2.3.0.
> We have two setups: standalone (one node) and a 3-node cluster.
> We pump a large volume of data, ~25MBPS, ~80K messages per second.
> It all works well in one-node mode, but in the cluster the producer (librdkafka) starts throwing "connect failed"; after some time it is able to connect again and starts sending traffic.
> What could be the issue? Some of the configurations are:
>
> replica.fetch.max.bytes=10485760
> num.network.threads=12
> num.replica.fetchers=6
> queued.max.requests=5
> # The number of threads doing disk I/O
> num.io.threads=12
> replica.socket.receive.buffer.bytes=1
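The reporter traced the intermittent "connect failed" errors to node IP assignment. Clients connect to whatever address each broker advertises in metadata, so one cheap sanity check along those lines is to confirm that no two broker ids advertise the same host:port. The helper below is hypothetical; the input shape (broker id mapped to an advertised.listeners value such as "PLAINTEXT://10.0.0.11:9092") is an assumption, and the check does not test reachability.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class AdvertisedListenerCheck {

    /** Returns each host:port that is advertised by two or more distinct broker ids. */
    static Map<String, Set<Integer>> duplicateEndpoints(Map<Integer, String> advertised) {
        Map<String, Set<Integer>> owners = new TreeMap<>();
        for (Map.Entry<Integer, String> e : advertised.entrySet()) {
            for (String listener : e.getValue().split(",")) {
                String trimmed = listener.trim();
                int sep = trimmed.indexOf("://");
                if (sep < 0) {
                    throw new IllegalArgumentException("Not a NAME://host:port listener: " + trimmed);
                }
                // Keep host:port; host names are compared case-insensitively.
                String endpoint = trimmed.substring(sep + 3).toLowerCase(Locale.ROOT);
                owners.computeIfAbsent(endpoint, k -> new TreeSet<>()).add(e.getKey());
            }
        }
        // The same broker listing an endpoint twice is not a conflict; two brokers are.
        owners.values().removeIf(ids -> ids.size() < 2);
        return owners;
    }

    public static void main(String[] args) {
        Map<Integer, String> advertised = new TreeMap<>();
        advertised.put(1, "PLAINTEXT://10.0.0.11:9092");
        advertised.put(2, "PLAINTEXT://10.0.0.12:9092");
        advertised.put(3, "PLAINTEXT://10.0.0.12:9092"); // hypothetical mis-assigned IP
        System.out.println(duplicateEndpoints(advertised));
    }
}
```

Several brokers on one host with different ports are legitimate, which is why the comparison uses host:port rather than host alone.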
[jira] [Created] (KAFKA-10303) kafka producer says connect failed in cluster mode
Yogesh BG created KAFKA-10303:
-

Summary: kafka producer says connect failed in cluster mode
Key: KAFKA-10303
URL: https://issues.apache.org/jira/browse/KAFKA-10303
Project: Kafka
Issue Type: Bug
Reporter: Yogesh BG

Hi,

I am using Kafka broker version 2.3.0.
We have two setups: standalone (one node) and a 3-node cluster.
We pump a large volume of data, ~25MBPS, ~80K messages per second.
It all works well in one-node mode, but in the cluster the producer (librdkafka) starts throwing "connect failed"; after some time it is able to connect again and starts sending traffic.
What could be the issue? Some of the configurations are:

replica.fetch.max.bytes=10485760
num.network.threads=12
num.replica.fetchers=6
queued.max.requests=5
# The number of threads doing disk I/O
num.io.threads=12
replica.socket.receive.buffer.bytes=1
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561447#comment-16561447 ]

Yogesh BG commented on KAFKA-5998:
--

Is there any update on this? I know it's not an issue, but I would like to suppress these logs, as the log file is getting filled with these messages.

> /.checkpoint.tmp Not found exception
>
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0, 0.11.0.1
> Reporter: Yogesh BG
> Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt
>
>
> I have one Kafka broker and one Kafka Streams application running. I have been running it for two days under a load of around 2500 msgs per second. On the third day I am getting the exception below for some of the partitions: I have 16 partitions, and only 0_0 and 0_1 give this error.
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
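The trace shows OffsetCheckpoint.write failing to create <task dir>/.checkpoint.tmp with "No such file or directory". When a new file is being created, that error points at the task directory itself (for example /data/kstreams/rtp-kafkastreams/0_1) having disappeared, not at the file; what removes it is not visible from this trace. The sketch below is not Kafka's code. It illustrates the write-to-temp-then-rename pattern that the .tmp name suggests, plus a guard that recreates the directory first. The one "key offset" line per entry format is a simplification for illustration, not Kafka's checkpoint file format.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CheckpointWriter {

    /** Writes offsets to taskDir/.checkpoint via taskDir/.checkpoint.tmp and a rename. */
    static Path writeCheckpoint(Path taskDir, Map<String, Long> offsets) {
        try {
            // Recreate the task directory if something removed it; a no-op if it exists.
            Files.createDirectories(taskDir);
            Path tmp = taskDir.resolve(".checkpoint.tmp");
            Path target = taskDir.resolve(".checkpoint");
            StringBuilder body = new StringBuilder();
            for (Map.Entry<String, Long> e : new TreeMap<>(offsets).entrySet()) {
                body.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
            }
            Files.write(tmp, body.toString().getBytes(StandardCharsets.UTF_8));
            // Atomic rename: readers see the old file or the complete new one, never a partial write.
            // On POSIX file systems this replaces an existing .checkpoint.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<String> readCheckpoint(Path taskDir) {
        try {
            return Files.readAllLines(taskDir.resolve(".checkpoint"), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical task directory that does not exist yet, mimicking the vanished 0_1 dir.
        Path taskDir = Path.of(System.getProperty("java.io.tmpdir"),
                "kstreams-demo-" + System.nanoTime(), "0_1");
        writeCheckpoint(taskDir, Map.of("input 1", 42L)); // "input 1" is a made-up topic/partition key
        System.out.println(readCheckpoint(taskDir));
    }
}
```

If the goal is only to quiet the log, raising the level of the org.apache.kafka.streams.processor.internals.ProcessorStateManager logger in the logging backend would hide this WARN, at the cost of also hiding genuine checkpoint failures.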
[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560143#comment-16560143 ]

Yogesh BG commented on KAFKA-7209:
--

Even with offsets.topic.replication.factor set to 1, I receive something like the following:

{{ Received GroupCoordinator response ClientResponse(receivedTimeMs=1532716985393, latencyMs=15, disconnected=false, requestHeader=\{api_key=10,api_version=1,correlation_id=6157,client_id=ks_0_inst_THUNDER_METRICS-StreamThread-37-consumer}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) for group aggregation-framework030_THUNDER_METRICS}}
{{18:43:05.394 [ks_0_inst_THUNDER_LOG_L4-StreamThread-70] DEBUG o.a.k.c.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 0 for partition THUNDER_LOG_L4_PC-17 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=0)}}

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.1
> Reporter: Yogesh BG
> Priority: Critical
>
> I use Kafka Streams 0.11.0.1 with session timeout 6, retries set to Integer.MAX_VALUE, and the default backoff time.
>
> I have 3 nodes running a Kafka cluster of 3 brokers, and I am running 3 Kafka Streams instances with the same application.id. Each node has one broker and one Kafka Streams application. Everything works fine during setup.
> I bring down one node, so one Kafka broker and one streaming app are down. Now I see exceptions in the other two streaming apps, and they never get rebalanced: I waited for hours and they never came back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup and restart; this also doesn't help.
> Can anyone help me?
>
> One thing I observed lately is that topics with one partition get reassigned, but I have topics with 16 partitions and replication factor 3, and those never settle.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
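The COORDINATOR_NOT_AVAILABLE response fits how a group's coordinator is chosen: it is the broker leading one specific partition of __consumer_offsets, picked by hashing the group id. With a single replica per partition, if that partition lived on the stopped broker, the group has no coordinator until the broker returns, however long the apps wait. Note also that offsets.topic.replication.factor only takes effect when __consumer_offsets is first created; changing it later leaves the existing topic's replication unchanged. The sketch mirrors the mapping as we understand it (abs(groupId.hashCode()) % offsets.topic.num.partitions, where Kafka's Utils.abs maps Integer.MIN_VALUE to 0); the partition count of 50 is the broker default and an assumption about this cluster.

```java
final class CoordinatorPartition {

    /** Same convention as Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative. */
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /** Partition of __consumer_offsets whose leader acts as the coordinator for groupId. */
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partitions = 50; // assumed default offsets.topic.num.partitions
        String[] groups = {
                "aggregation-framework030_THUNDER_METRICS",
                "aggregation-framework03_THUNDER_LOG_L7"};
        for (String group : groups) {
            System.out.println(group + " -> __consumer_offsets-" + partitionFor(group, partitions));
        }
    }
}
```

Describing __consumer_offsets with kafka-topics.sh would then show which broker leads that partition; a leader of -1 for it would be consistent with this explanation. It may also explain why an app started with a different application.id (a different group, hence likely a different partition) streams fine.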
[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559935#comment-16559935 ]

Yogesh BG commented on KAFKA-7209:
--

Hi, can you suggest anything I am missing? We are blocked on our product release due to this bug. Is there any way I can safely clean up the Kafka Streams instances and restart them with the same application.id? Some data loss during this process is acceptable. Alternatively, a confirmation that this is a bug in the streaming app would help me decide what alternative restart process I can use.
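On safely cleaning up and restarting with the same application.id: with the library itself, KafkaStreams#cleanUp() is the supported call, and it may only be invoked while the instance is not running (before start() or after close()). It deletes the instance's local state under state.dir/application.id, so changelog-backed stores are rebuilt from their changelog topics on restart; committed offsets are not touched (resetting those is the job of the kafka-streams-application-reset tool). The dependency-free sketch below only illustrates what that local wipe amounts to; it is not a replacement for cleanUp(), and the paths it takes are supplied by the caller.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class LocalStateWipe {

    /** Recursively deletes stateDir/applicationId; returns false if it did not exist. */
    static boolean wipe(Path stateDir, String applicationId) {
        Path appDir = stateDir.resolve(applicationId);
        if (!Files.exists(appDir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(appDir)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }

    public static void main(String[] args) {
        // Only run this while every instance using this application.id on the host is stopped.
        if (args.length != 2) {
            System.err.println("usage: LocalStateWipe <state.dir> <application.id>");
            return;
        }
        System.out.println("deleted: " + wipe(Path.of(args[0]), args[1]));
    }
}
```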
[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559492#comment-16559492 ]

Yogesh BG commented on KAFKA-7209:
--

I tried setting these configurations, but no luck:

{{conf.put("retries",Integer.MAX_VALUE);}}
{{conf.put("rebalance.max.retries",Integer.MAX_VALUE);}}
{{conf.put("zookeeper.session.timeout.ms",1000);}}
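As far as we can tell, rebalance.max.retries and zookeeper.session.timeout.ms belong to the old ZooKeeper-based consumer (the latter is also a broker setting), so the Java clients inside Kafka Streams do not read them. For surviving a broker loss, the Streams-level setting that matters more is replication.factor: it controls the replication of the internal changelog and repartition topics Streams creates, and in 0.11 it defaults to 1, so every internal partition hosted on the stopped broker becomes unavailable. The sketch below assembles a java.util.Properties with plain string keys so it runs without Kafka on the classpath; real code would use the StreamsConfig constants named in the comments. The values 3 and 1 and the broker host names are illustrative assumptions, and existing internal topics keep the replication they were created with.

```java
import java.util.Properties;

final class StreamsResilienceConfig {

    /** Streams settings aimed at tolerating the loss of one of three brokers. */
    static Properties build(String applicationId, String bootstrapServers) {
        Properties conf = new Properties();
        conf.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        conf.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // Internal changelog/repartition topics get 3 replicas, so one broker can fail (default 1).
        conf.put("replication.factor", 3);               // StreamsConfig.REPLICATION_FACTOR_CONFIG
        // Keep a warm copy of each task's state on another instance to shorten failover (default 0).
        conf.put("num.standby.replicas", 1);             // StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG
        return conf;
    }

    public static void main(String[] args) {
        // Hypothetical broker host names.
        Properties conf = build("aggregation-framework03_THUNDER_LOG_L7",
                "broker-0:9092,broker-1:9092,broker-2:9092");
        conf.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```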
[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558874#comment-16558874 ]

Yogesh BG commented on KAFKA-7209:
--

Here are my observations, with 3 brokers and 3 stream apps, all initially working fine.

Kill one app: the group gets rebalanced and starts streaming without loss of data. I can see the logs below:

{{20:15:26.627 [ks_0_inst_CSV_LOG-StreamThread-22] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [PR-35, PR_vThunder-27, PR-27, PR_vThunder-35] for group aggregation-framework03_CSV_LOG}}
{{20:15:26.627 [ks_0_inst_CSV_LOG-StreamThread-20] INFO o.a.k.s.p.internals.StreamThread - stream-thread [ks_0_inst_CSV_LOG-StreamThread-20] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED.}}
{{20:16:32.174 [ks_0_inst_THUNDER_LOG_L7-StreamThread-90] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [THUNDER_LOG_L7_PR-15, THUNDER_LOG_L7_PE-15] for group aggregation-framework03_THUNDER_LOG_L7}}
{{20:16:32.175 [ks_0_inst_THUNDER_LOG_L7-StreamThread-86] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [THUNDER_LOG_L7_PR-9, THUNDER_LOG_L7_PE-9] for group aggregation-framework03_THUNDER_LOG_L7}}
{{20:16:32.175 [ks_0_inst_THUNDER_LOG_L7-StreamThread-85] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [THUNDER_LOG_L7_PE-35, THUNDER_LOG_L7_PR-27, THUNDER_LOG_L7_PE-27, THUNDER_LOG_L7_PR-35] for group aggregation-framework03_THUNDER_LOG_L7}}

But what I don't understand is that when I look into the state dir, I don't see partition folders being created for the newly assigned partitions. Below is the initial state before I kill the first one [rtp-worker-2]; for the other two it remains the same and does not change at all:

{{[root@rtp-worker-2 /]# ls /tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7/}}
{{0_0 0_10 0_12 0_14 0_16 0_18 0_2 0_21 0_23 0_25 0_27 0_29 0_30 0_32 0_34 0_4 0_6 0_8}}
{{0_1 0_11 0_13 0_15 0_17 0_19 0_20 0_22 0_24 0_26 0_28 0_3 0_31 0_33 0_35 0_5 0_7 0_9}}
{{[root@rtp-worker-0 /]# ls /tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7/}}
{{0_0 0_1 0_10 0_11 0_2 0_3 0_4 0_5 0_6 0_7 0_8 0_9}}
{{[root@rtp-worker-1 /]# ls /tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7/}}
{{0_11 0_12 0_13 0_14 0_15 0_16 0_17 0_18 0_19 0_20 0_21 0_22 0_23}}

Another case: all 3 apps are running successfully and I bring down one broker; the brokers rebalance themselves. The apps also get rebalanced along with the brokers and start streaming data, *but data loss is observed while the brokers are rebalancing. Is there a way to avoid this? Do the other two brokers become unresponsive while the cluster is rebalancing?*

*The next case is when a broker and a stream app go down at the same time. Then I can see the brokers get rebalanced and some communication messages being received by the apps, but the apps never get back to streaming, especially when multiple partitions are there. Topics which have one partition get back to streaming after some time.*
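To compare the "Setting newly assigned partitions" lines with what is on disk, the listings above can be checked mechanically. The sketch assumes, as the listings suggest, that each task directory is named <subtopology>_<partition> and that this application has a single sub-topology 0, so partition 15 corresponds to directory 0_15; both are assumptions about this topology. Directories of tasks a host no longer owns are expected to linger until the periodic state cleaner (state.cleanup.delay.ms) removes them, so extra directories are not by themselves a sign of trouble.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

final class TaskDirAudit {

    private static final Pattern TASK_DIR = Pattern.compile("(\\d+)_(\\d+)");

    /** Partitions of the given sub-topology that have a task directory under appStateDir. */
    static Set<Integer> partitionsOnDisk(Path appStateDir, int subtopology) {
        Set<Integer> result = new TreeSet<>();
        try (Stream<Path> children = Files.list(appStateDir)) {
            children.filter(Files::isDirectory).forEach(dir -> {
                Matcher m = TASK_DIR.matcher(dir.getFileName().toString());
                if (m.matches() && Integer.parseInt(m.group(1)) == subtopology) {
                    result.add(Integer.parseInt(m.group(2)));
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    /** Assigned partitions that have no matching directory on disk. */
    static Set<Integer> missing(Collection<Integer> assigned, Set<Integer> onDisk) {
        Set<Integer> result = new TreeSet<>(assigned);
        result.removeAll(onDisk);
        return result;
    }

    public static void main(String[] args) {
        Path appDir = Path.of(args.length > 0 ? args[0]
                : "/tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7");
        if (!Files.isDirectory(appDir)) {
            System.err.println("no such state directory: " + appDir);
            return;
        }
        // Partitions from the THUNDER_LOG_L7 "Setting newly assigned partitions" lines.
        Set<Integer> assigned = Set.of(9, 15, 27, 35);
        System.out.println("assigned but no task dir: " + missing(assigned, partitionsOnDisk(appDir, 0)));
    }
}
```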
[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558772#comment-16558772 ]

Yogesh BG commented on KAFKA-7209:
--

In one of the forums I saw the statement below. Does this impact anything? We keep the default value of 3 for these configurations.

"What is your offsets.topic.replication.factor and transaction.state.log.replication.factor? If those are set to 3 and you have fewer than 3 brokers, I don't imagine things will go well."
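The forum remark can be made concrete. offsets.topic.replication.factor and transaction.state.log.replication.factor are broker settings applied when those internal topics are first created. Creation needs at least that many live brokers, and a partition with replication factor 1 has no other copy to fail over to. The hypothetical check below flags both situations; it encodes only those two rules and ignores min.insync.replicas and other settings that also affect availability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class InternalTopicRfCheck {

    /** Returns human-readable problems for the given broker settings and live broker count at creation. */
    static List<String> check(Map<String, Integer> replicationFactors, int brokersAtCreation) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Integer> e : new TreeMap<>(replicationFactors).entrySet()) {
            int rf = e.getValue();
            if (rf > brokersAtCreation) {
                problems.add(e.getKey() + "=" + rf + " exceeds " + brokersAtCreation
                        + " live broker(s); the topic cannot be created");
            }
            if (rf < 2) {
                problems.add(e.getKey() + "=" + rf + " leaves no replica to fail over to when a broker stops");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, Integer> defaults = Map.of(
                "offsets.topic.replication.factor", 3,
                "transaction.state.log.replication.factor", 3);
        System.out.println(check(defaults, 3)); // all three brokers up when the topics are created
        System.out.println(check(Map.of("offsets.topic.replication.factor", 1), 3)); // value tried in this issue
    }
}
```

With three brokers up at creation time, the defaults of 3 pass both rules; after creation, losing one of the three brokers still leaves two replicas of every partition of those topics.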
[jira] [Comment Edited] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558752#comment-16558752 ] Yogesh BG edited comment on KAFKA-7209 at 7/26/18 6:51 PM: --- When I kill one of the brokers, its partitions get rebalanced among the other two brokers, and when I launch a new app with a different name from the previous one, it streams successfully. I am a bit confused about the existing streaming apps: do they pick up the rebalanced topic info and work accordingly? I am doing the test and will update you soon. was (Author: yogeshbelur): When I kill one of the brokers, its partitions get rebalanced among the other two ingestors, and when I launch a new app with a different name from the previous one, it streams successfully. I am a bit confused about the existing streaming apps: do they pick up the rebalanced topic info and work accordingly? I am doing the test and will update you soon.
[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
[ https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558731#comment-16558731 ] Yogesh BG commented on KAFKA-7209: -- Is there any manual workaround to resolve this? I think killing only the broker, or only the application, works.
[jira] [Created] (KAFKA-7209) Kafka stream does not rebalance when one node gets down
Yogesh BG created KAFKA-7209: Summary: Kafka stream does not rebalance when one node gets down Key: KAFKA-7209 URL: https://issues.apache.org/jira/browse/KAFKA-7209 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.11.0.1 Reporter: Yogesh BG I use Kafka Streams 0.11.0.1 with session timeout 6, retries set to Integer.MAX_VALUE, and the default backoff time. I have 3 nodes running a Kafka cluster of 3 brokers, and I am running 3 Kafka Streams instances with the same application.id. Each node has one broker and one Kafka Streams application. Everything works fine during setup. When I bring down one node, one Kafka broker and one streaming app go down. I then see exceptions in the other two streaming apps, and the group never gets rebalanced: I waited for hours and it never came back to normal. Is there anything I am missing? I also tried calling stream.close(), cleaning up and restarting while one broker is down, and this doesn't help either. Can anyone help me? One thing I observed lately is that Kafka topics with one partition get reassigned, but my topics with 16 partitions and replication factor 3 never settle.
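The setup described in the report can be written down as a configuration sketch. It uses plain `java.util.Properties` with the string keys Kafka Streams accepts, so it compiles without the Kafka jars. The application id and bootstrap servers are hypothetical placeholders. The session timeout is left out because the value in the report ("6") appears truncated. The `replication.factor` entry is an assumption worth checking, not something the reporter mentioned. It sets the replication factor of the changelog and repartition topics that Kafka Streams creates. In these versions it defaults to 1, so those partitions become unavailable when the single broker hosting them goes down.

```java
import java.util.Properties;

/** Hypothetical helper that assembles the Streams settings described in the report. */
final class StreamsSettings {

    private StreamsSettings() {
    }

    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // All instances share one application.id, so they form one consumer group.
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // "retries to be int.max": passed through to the internal producer.
        props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        // "backoff time default": retry.backoff.ms is simply left unset.
        // Session timeout not set: the value in the report looks truncated.
        // Assumption: keep 3 copies of Streams' internal topics instead of the default 1.
        props.setProperty("replication.factor", "3");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build("my-streams-app", "node1:9092,node2:9092,node3:9092");
        p.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a real application these properties would be passed to the `KafkaStreams` constructor. The sketch only shows which keys correspond to the settings named in the report.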
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16187301#comment-16187301 ] Yogesh BG commented on KAFKA-5998: -- "Patch v1 is one way to not leave source behind. Another option is to delete the source." Do you have the patch? Can you share it? I will update my setup. We cannot delete the source manually. What is the impact of this issue? Is it going to drop some messages, or does it violate any consumer semantics such as exactly-once? > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Yogesh BG > Attachments: 5998.v1.txt > > > I have one kafka broker and one kafka stream running... I am running its > since two days under load of around 2500 msgs per second.. On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) >
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > 
/data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) >
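According to the stack trace, the checkpoint is written to `.checkpoint.tmp` inside the task directory and then moved over `.checkpoint`. A "No such file or directory" error on the `.tmp` path therefore suggests that the task directory itself was missing at that moment. This is an interpretation, not a confirmed diagnosis. The sketch below shows the write-then-rename pattern with `java.nio.file` and deletes the temporary "source" file if anything fails. The class name is hypothetical. The file layout (a version line, an entry count, then `topic partition offset` lines) is modelled on Kafka's `OffsetCheckpoint` and should be treated as an assumption.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

/** Hypothetical write-then-rename checkpoint writer; not Kafka's OffsetCheckpoint class. */
final class CheckpointWriter {

    private CheckpointWriter() {
    }

    /**
     * Writes a version line, an entry count, then one "topic partition offset" line per
     * entry to checkpoint + ".tmp", and moves the temporary file over the checkpoint.
     */
    static void write(Path checkpoint, String topic, Map<Integer, Long> offsetsByPartition) throws IOException {
        Path tmp = checkpoint.resolveSibling(checkpoint.getFileName() + ".tmp");
        try {
            // Throws NoSuchFileException if the task directory no longer exists.
            try (BufferedWriter w = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
                w.write("0");
                w.newLine();
                w.write(Integer.toString(offsetsByPartition.size()));
                w.newLine();
                for (Map.Entry<Integer, Long> e : offsetsByPartition.entrySet()) {
                    w.write(topic + " " + e.getKey() + " " + e.getValue());
                    w.newLine();
                }
            }
            try {
                Files.move(tmp, checkpoint, StandardCopyOption.ATOMIC_MOVE);
            } catch (IOException atomicMoveFailed) {
                // Fall back to a plain replacing move, e.g. where atomic moves are unsupported.
                Files.move(tmp, checkpoint, StandardCopyOption.REPLACE_EXISTING);
            }
        } finally {
            // Do not leave the temporary "source" file behind if anything above failed.
            Files.deleteIfExists(tmp);
        }
    }
}
```

The sketch deliberately does not recreate a missing task directory. Silently recreating it could hide whatever removed the directory in the first place.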
[jira] [Updated] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yogesh BG updated KAFKA-5998: - Description: I have one Kafka broker and one Kafka Streams application running. I have been running them for two days under a load of around 2500 messages per second. On the third day I am getting the exception below for some of the partitions: I have 16 partitions, and only 0_0 and 0_1 give this error. {{ 09:43:25.955 [ks_0_inst-StreamThread-6] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or directory) at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] at java.io.FileOutputStream.(FileOutputStream.java:221) ~[na:1.7.0_111] at java.io.FileOutputStream.(FileOutputStream.java:171) ~[na:1.7.0_111] at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) 
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] 09:43:25.974 [ks_0_inst-StreamThread-15] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or directory) at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] at java.io.FileOutputStream.(FileOutputStream.java:221) ~[na:1.7.0_111] at java.io.FileOutputStream.(FileOutputStream.java:171) ~[na:1.7.0_111] at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
[jira] [Commented] (KAFKA-5545) Kafka Streams not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16162903#comment-16162903 ] Yogesh BG commented on KAFKA-5545: -- I have taken the source code and verified that the issue is fixed; it works fine. I am waiting for this release. Please let me know when I can expect it to be released. > Kafka Streams not able to successfully restart over new broker ip > - > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions.
> 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. 
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) >
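The `LockException` above means the new task could not lock its state directory. The most likely cause is that a thread from the previous streams instance still held the lock. The sketch below imitates the "Will retry" behaviour using only the standard library: `FileChannel.tryLock` on a `.lock` file, retried with a growing backoff. The file name, retry count and backoff are illustrative assumptions; the sketch does not reproduce Kafka's state-directory code. Within one JVM, `tryLock` behaves differently depending on who holds the lock. If the region is already locked through another channel in the same JVM, `tryLock` throws `OverlappingFileLockException`; if another process holds the lock, `tryLock` returns `null`. The sketch treats both outcomes as "still locked".

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical task-directory locker with retry and linearly growing backoff. */
final class TaskDirLock {

    private TaskDirLock() {
    }

    /**
     * Returns the acquired lock, or null if the directory stayed locked for all attempts.
     * The caller releases the lock by closing lock.channel().
     */
    static FileLock lockWithRetry(Path taskDir, int attempts, long backoffMs)
            throws IOException, InterruptedException {
        Path lockFile = taskDir.resolve(".lock");
        for (int i = 1; i <= attempts; i++) {
            FileChannel channel = FileChannel.open(lockFile,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            FileLock lock = null;
            try {
                lock = channel.tryLock(); // null if another process holds the lock
            } catch (OverlappingFileLockException heldInThisJvm) {
                // Another channel in this JVM (e.g. a lingering thread) still holds it.
            } catch (IOException e) {
                channel.close();
                throw e;
            }
            if (lock != null) {
                return lock;
            }
            channel.close();
            if (i < attempts) {
                Thread.sleep(backoffMs * i); // wait a little longer before each retry
            }
        }
        return null;
    }
}
```

If the old instance's threads never release the lock, retrying alone cannot succeed. The retry only helps when the previous owner shuts down within the backoff window.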
[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16091115#comment-16091115 ] Yogesh BG edited comment on KAFKA-5545 at 8/29/17 5:00 AM: --- OK. Thanks for looking into the issue; I will wait for the 0.10.2.2 release. On Mon, Jul 17, 2017 at 10:49 PM, Guozhang Wang (JIRA) > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Fix For: 0.11.0.1, 1.0.0 > > Attachments: kafkastreams.log
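The reporter's workaround can be sketched with a `ScheduledExecutorService`. In that workaround, a thread checks every five minutes whether the broker's IP address changed and rebuilds the stream if it did. The resolver and restart action are passed in as functions, and all names here are hypothetical. In a real application the restart action would close the old `KafkaStreams` instance and then optionally call `cleanUp()`, which is only allowed while the instance is not running. Finally it would create and `start()` a new instance, because a closed `KafkaStreams` object cannot be started again.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical watcher that runs a restart action when the broker host resolves to a new IP. */
final class BrokerIpWatcher {
    private final Supplier<String> resolver;
    private final Runnable restart;
    private final AtomicReference<String> lastIp = new AtomicReference<>();

    BrokerIpWatcher(Supplier<String> resolver, Runnable restart) {
        this.resolver = resolver;
        this.restart = restart;
    }

    /** One check; returns true if the address changed and the restart action ran. */
    boolean checkOnce() {
        String current = resolver.get();
        if (current == null) {
            return false; // unresolvable right now; keep the last known address
        }
        String previous = lastIp.getAndSet(current);
        if (previous != null && !previous.equals(current)) {
            restart.run();
            return true;
        }
        return false;
    }

    /**
     * Runs checkOnce() on a fixed delay, e.g. start(5, TimeUnit.MINUTES). An exception thrown
     * by the restart action cancels further runs, so the action should handle its own errors.
     */
    ScheduledExecutorService start(long period, TimeUnit unit) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        exec.scheduleWithFixedDelay(this::checkOnce, 0, period, unit);
        return exec;
    }

    /** DNS-based resolver; returns null if the host cannot be resolved at the moment. */
    static Supplier<String> dns(String host) {
        return () -> {
            try {
                return InetAddress.getByName(host).getHostAddress();
            } catch (UnknownHostException e) {
                return null;
            }
        };
    }
}
```

`InetAddress` lookups are cached by the JVM according to the `networkaddress.cache.ttl` security property. A long cache lifetime can therefore delay detection of a changed address.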
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144147#comment-16144147 ] Yogesh BG commented on KAFKA-5545: -- I have one question. As part of this release, when I call close() on the streams instance, will it shut down completely? In the previous version I see threads that keep issuing connection requests indefinitely. Please confirm.
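One way to answer this question empirically is to look for leftover stream threads after `close()` returns. The helper below is a hypothetical diagnostic that uses only `Thread.getAllStackTraces()`. It assumes that stream threads can be recognised by a name fragment such as `StreamThread`, which is how they appear in the quoted log lines of this issue.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical diagnostic: lists live threads whose names contain a given fragment. */
final class LingeringThreads {

    private LingeringThreads() {
    }

    static List<String> find(String nameFragment) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.contains(nameFragment))
                .sorted()
                .collect(Collectors.toList());
    }
}
```

For example, after `streams.close()` an application could log `LingeringThreads.find("StreamThread")`. A non-empty list suggests the shutdown did not complete.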
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143568#comment-16143568 ] Yogesh BG commented on KAFKA-5545: -- [~guozhang] When this feature is expected to release?I mean date?? > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Fix For: 0.11.0.1, 1.0.0 > > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. 
> 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. 
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) >
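The LockException in the stack trace above means that, when the restarted application tried to create task 0_5, the lock on that task's state directory was still held. One reading consistent with the later comments on this issue (threads of the previous stream instance still alive after close()) can be shown with the minimal in-memory model below. It is a hypothetical illustration only. It is not Kafka's StateDirectory code, and the names TaskLockRegistry, "old-instance" and "new-instance" are made up for the sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of per-task state-directory locks.
// NOT the real Kafka Streams StateDirectory implementation.
final class TaskLockRegistry {
    // taskId (e.g. "0_5") -> name of the instance currently holding its lock
    private final ConcurrentMap<String, String> owners = new ConcurrentHashMap<>();

    // Returns true if `owner` holds the lock for `taskId` after the call.
    boolean tryLock(String taskId, String owner) {
        String current = owners.putIfAbsent(taskId, owner);
        return current == null || current.equals(owner);
    }

    // Releases every lock held by `owner`. In a real application this would only
    // happen once that instance's threads have actually finished shutting down.
    void releaseAll(String owner) {
        owners.values().removeIf(owner::equals);
    }

    public static void main(String[] args) {
        TaskLockRegistry locks = new TaskLockRegistry();
        System.out.println(locks.tryLock("0_5", "old-instance")); // true
        // New instance created while the old one still holds the lock:
        System.out.println(locks.tryLock("0_5", "new-instance")); // false ("Failed to lock the state directory")
        locks.releaseAll("old-instance");
        System.out.println(locks.tryLock("0_5", "new-instance")); // true
    }
}
```

In this model the "Will retry" in the log only succeeds once the old owner lets go. If the old threads never exit, the retries keep failing.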
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130171#comment-16130171 ] Yogesh BG commented on KAFKA-5545: -- ok > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Fix For: 0.11.0.1, 1.0.0 > > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] 
INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > 
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
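The issue description quoted above mentions a thread that runs every 5 minutes and checks whether the broker IP has changed. The report does not show how that check is implemented. The sketch below is one possibility and assumes the broker is reached through a host name whose DNS record changes. BrokerIpWatcher and its methods are hypothetical names. The sketch compares the set of resolved addresses between runs.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for a periodic "has the broker IP changed?" check.
// ASSUMPTION: the broker is addressed by a host name whose DNS record changes.
final class BrokerIpWatcher {
    private final String host;
    private Set<String> lastSeen; // null until the first successful lookup

    BrokerIpWatcher(String host) {
        this.host = host;
    }

    // Resolves every address currently returned for the host, sorted.
    static Set<String> resolve(String host) throws UnknownHostException {
        Set<String> ips = new TreeSet<>();
        for (InetAddress address : InetAddress.getAllByName(host)) {
            ips.add(address.getHostAddress());
        }
        return ips;
    }

    // True only when there is a previous result and it differs from the current one.
    static boolean differs(Set<String> previous, Set<String> current) {
        return previous != null && !previous.equals(current);
    }

    // Returns true if the resolved addresses changed since the last call.
    // A failed lookup is treated as "no change" and keeps the previous result.
    synchronized boolean changedSinceLastCheck() {
        Set<String> current;
        try {
            current = resolve(host);
        } catch (UnknownHostException e) {
            return false;
        }
        boolean changed = differs(lastSeen, current);
        lastSeen = current;
        return changed;
    }
}
```

A check like this could be driven by ScheduledExecutorService.scheduleAtFixedRate every 5 minutes. Be aware that the JVM caches DNS lookups (the networkaddress.cache.ttl security property), so a changed record may not be visible immediately.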
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16129229#comment-16129229 ] Yogesh BG commented on KAFKA-5545: -- thank you
[jira] [Commented] (KAFKA-5618) Kafka stream not receive any topic/partitions/records info
[ https://issues.apache.org/jira/browse/KAFKA-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096112#comment-16096112 ] Yogesh BG commented on KAFKA-5618: -- I enabled debug logging, and that is all the log output I got; there are no records present. It's a fresh deployment with no data sent yet. > Kafka stream not receive any topic/partitions/records info > -- > > Key: KAFKA-5618 > URL: https://issues.apache.org/jira/browse/KAFKA-5618 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: rtp-kafkastreams2.log, rtp-kafkastreams3.log, > rtp-kafkastreams.log > > > I have 3 brokers and 3 stream consumers. > I have there are 360 partitions and not able to bring up streams successfully > even after several retry. > I have attached the logs. > There are other topics which are having around 16 partitions and they are > able to successfully be consumed by kafka client > when tried getting thread dump using jstack the process is not responding > Attaching to process ID 10663, please wait... > Debugger attached successfully. > Server compiler detected. > JVM version is 24.141-b02 > Deadlock Detection: > java.lang.RuntimeException: Unable to deduce type of thread from address > 0x7fdac4009000 (expected type JavaThread, CompilerThread, ServiceThread, > JvmtiAgentThread, or SurrogateLockerThread) > at > sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:162) > at sun.jvm.hotspot.runtime.Threads.first(Threads.java:150) > at > sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(DeadlockDetector.java:149) > at > sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:56) > at > sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:39) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5618) Kafka stream not receive any topic/partitions/records info
[ https://issues.apache.org/jira/browse/KAFKA-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094920#comment-16094920 ] Yogesh BG edited comment on KAFKA-5618 at 7/20/17 4:29 PM: --- You can see it periodically sending metadata requests to other nodes with an empty topic list. This is one thing I observed. When I had 64 partitions, it used to work well. I wonder whether the start() call is not returning, because after start() is called I log and start some other threads, and I see that those threads are not started. For a successful scenario, see the logs attached to https://issues.apache.org/jira/browse/KAFKA-5545 and https://issues.apache.org/jira/browse/KAFKA-5593. When I increased the partition count to 360 on bigger-capacity machines, it hung and was never able to recover, and there was no error log, or any other log, from Kafka Streams. was (Author: yogeshbelur): You can see it periodically sending metadata requests to other nodes with an empty topic list. This is one thing I observed. When I had 64 partitions, it used to work well. I wonder whether the start() call is not returning, because after start() is called I log and start some other threads, and I see that those threads are not started. For a successful scenario you can see the log attached in the issue
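The comment above suspects that start() never returns. A small watchdog can confirm or rule that out without attaching a debugger. It runs the call on a background thread and reports whether the call came back within a time limit. This is a generic, hypothetical sketch. StartWatchdog is a made-up name, and it does not cancel a call that overruns; it only reports that the call is still blocked.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical watchdog: reports whether a possibly-blocking call returns in time.
final class StartWatchdog {
    static boolean returnsWithin(Runnable call, long timeoutMs) {
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "start-watchdog");
            t.setDaemon(true); // a hung call must not keep the JVM alive
            return t;
        });
        try {
            Future<?> result = exec.submit(call);
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;                 // the call returned in time
        } catch (TimeoutException e) {
            return false;                // still blocked after timeoutMs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("call threw", e.getCause());
        } finally {
            exec.shutdown();             // worker thread ends once the call returns
        }
    }
}
```

In the application this might be invoked as `StartWatchdog.returnsWithin(streams::start, 60_000)`, where `streams` is the application's KafkaStreams instance. A `false` result is the point at which a thread dump is most informative.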
[jira] [Commented] (KAFKA-5618) Kafka stream not receive any topic/partitions/records info
[ https://issues.apache.org/jira/browse/KAFKA-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094910#comment-16094910 ] Yogesh BG commented on KAFKA-5618: -- It hung. Initially I had those exceptions, but after I restarted I no longer get them. I waited more than half an hour.
[jira] [Created] (KAFKA-5618) Kafka stream not receive any topic/partitions/records info
Yogesh BG created KAFKA-5618: Summary: Kafka stream not receive any topic/partitions/records info Key: KAFKA-5618 URL: https://issues.apache.org/jira/browse/KAFKA-5618 Project: Kafka Issue Type: Bug Components: streams Reporter: Yogesh BG Priority: Critical Attachments: rtp-kafkastreams2.log, rtp-kafkastreams3.log, rtp-kafkastreams.log I have 3 brokers and 3 stream consumers. There are 360 partitions, and I am not able to bring up the streams successfully even after several retries. I have attached the logs. There are other topics with around 16 partitions, and those are consumed successfully by the Kafka client. When I tried getting a thread dump using jstack, the process did not respond: Attaching to process ID 10663, please wait... Debugger attached successfully. Server compiler detected. JVM version is 24.141-b02 Deadlock Detection: java.lang.RuntimeException: Unable to deduce type of thread from address 0x7fdac4009000 (expected type JavaThread, CompilerThread, ServiceThread, JvmtiAgentThread, or SurrogateLockerThread) at sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:162) at sun.jvm.hotspot.runtime.Threads.first(Threads.java:150) at sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(DeadlockDetector.java:149) at sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:56) at sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:39)
[jira] [Commented] (KAFKA-5593) Kafka streams not re-balancing when 3 consumer streams are there
[ https://issues.apache.org/jira/browse/KAFKA-5593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16091135#comment-16091135 ] Yogesh BG commented on KAFKA-5593: -- Hey, thanks for pointing that out. Yes, you are correct. I changed the configuration and verified that it works. > Kafka streams not re-balancing when 3 consumer streams are there > > > Key: KAFKA-5593 > URL: https://issues.apache.org/jira/browse/KAFKA-5593 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: log1.txt, log2.txt, log3.txt > > > I have 3 broker nodes, 3 kafka streams > I observe that all 3 consumer streams are part of the group named > rtp-kafkastreams. but when i see the data is processed only by one node. > > DEBUG n.a.a.k.a.AccessLogMetricEnrichmentProcessor - > AccessLogMetricEnrichmentProcessor.process > when i do check the partition information shared by each of them i see first > node has all partitions like all 8. but in other streams the folder is empty. > > [root@ip-172-31-11-139 ~]# ls /data/kstreams/rtp-kafkastreams > 0_0 0_1 0_2 0_3 0_4 0_5 0_6 0_7 > and this folder is empty > I tried restarting the other two consumer streams still they won't become the > part of the group and re-balance. > I have attached the logs. > Configurations are inside the log file.
[jira] [Created] (KAFKA-5593) Kafka streams not re-balancing when 3 consumer streams are there
Yogesh BG created KAFKA-5593: Summary: Kafka streams not re-balancing when 3 consumer streams are there Key: KAFKA-5593 URL: https://issues.apache.org/jira/browse/KAFKA-5593 Project: Kafka Issue Type: Bug Components: streams Reporter: Yogesh BG Priority: Critical Attachments: log1.txt, log2.txt, log3.txt I have 3 broker nodes and 3 Kafka Streams instances. I observe that all 3 consumer streams are part of the group named rtp-kafkastreams, but the data is processed by only one node: DEBUG n.a.a.k.a.AccessLogMetricEnrichmentProcessor - AccessLogMetricEnrichmentProcessor.process When I check the partition information held by each of them, I see that the first node has all 8 partitions, while on the other streams instances the folder is empty: [root@ip-172-31-11-139 ~]# ls /data/kstreams/rtp-kafkastreams 0_0 0_1 0_2 0_3 0_4 0_5 0_6 0_7 On the other nodes, this folder is empty. I tried restarting the other two consumer streams, but they still do not become part of the group and rebalance. I have attached the logs. The configurations are inside the log file.
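For comparison with the ls output above, where one node holds all eight task directories 0_0 to 0_7, the sketch below shows what an even spread of those tasks over three instances would look like. It uses plain round-robin dealing. This is only an illustration of "balanced". Kafka Streams' real partition assignor is more elaborate (for example, it takes existing state into account), and RoundRobinTasks is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: deal tasks "0_0".."0_{n-1}" over instances round-robin.
// Kafka Streams' actual assignor is more elaborate than this.
final class RoundRobinTasks {
    static List<List<String>> assign(int partitions, int instances) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % instances).add("0_" + p); // task id = topicGroup_partition
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [[0_0, 0_3, 0_6], [0_1, 0_4, 0_7], [0_2, 0_5]]
        System.out.println(assign(8, 3));
    }
}
```

With a working rebalance, each of the three instances would therefore own two or three task directories rather than one instance owning all eight.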
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087204#comment-16087204 ] Yogesh BG commented on KAFKA-5545: -- Any updates on this issue?
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081860#comment-16081860 ] Yogesh BG commented on KAFKA-5545: -- It's not the timeout in close(); that time is always fixed at one minute. I meant that if the IP changes, the close and restart happen within the session timeout.
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079842#comment-16079842 ]

Yogesh BG commented on KAFKA-5545:
----------------------------------

Sorry, it's the session timeout: session.timeout.ms. We have set it to 30.

> Kafka Stream not able to successfully restart over new broker ip
> ----------------------------------------------------------------
>
>                 Key: KAFKA-5545
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5545
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Yogesh BG
>            Priority: Critical
>         Attachments: kafkastreams.log
>
>
> Hi,
> I have one Kafka broker and one Kafka Streams application.
> Initially, the Kafka Streams application connects and starts processing data. Then I restart the broker. When the broker restarts, a new IP is assigned.
> In the Kafka Streams application I have a thread that runs every 5 minutes and checks whether the broker IP has changed. If it has, we clean up the stream, rebuild the topology (we also tried reusing the topology) and start the stream again. I end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-38] Creating active task 0_5 with assigned partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-41] Creating active task 0_1 with assigned partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-34] Creating active task 0_7 with assigned partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-37] Creating active task 0_3 with assigned partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-45] Creating active task 0_0 with assigned partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-36] Creating active task 0_4 with assigned partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-43] Creating active task 0_6 with assigned partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] Creating active task 0_2 with assigned partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the state directory for task 0_5
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at
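If session.timeout.ms is literally 30, keep in mind that the unit is milliseconds. The group coordinator rejects a member whose session timeout falls outside [group.min.session.timeout.ms, group.max.session.timeout.ms], where the minimum defaults to 6000. The consumer documentation also recommends that heartbeat.interval.ms be no more than one third of session.timeout.ms. Below is a minimal JDK-only sketch of that sanity check. checkSessionTimeout is a hypothetical helper, not a Kafka API, and the broker bounds are supplied by the caller rather than read from a broker.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Kafka). All values are in milliseconds. */
public class SessionTimeoutCheck {

    static List<String> checkSessionTimeout(long sessionTimeoutMs, long heartbeatIntervalMs,
                                            long brokerMinSessionMs, long brokerMaxSessionMs) {
        List<String> problems = new ArrayList<>();
        // The coordinator only accepts session timeouts inside the broker-configured range.
        if (sessionTimeoutMs < brokerMinSessionMs || sessionTimeoutMs > brokerMaxSessionMs) {
            problems.add("session.timeout.ms=" + sessionTimeoutMs + " is outside the broker range ["
                    + brokerMinSessionMs + ", " + brokerMaxSessionMs + "]");
        }
        // Documented guideline: heartbeat interval no higher than 1/3 of the session timeout.
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms=" + heartbeatIntervalMs
                    + " is more than one third of session.timeout.ms=" + sessionTimeoutMs);
        }
        return problems;
    }

    public static void main(String[] args) {
        long brokerMin = 6000;   // default group.min.session.timeout.ms
        long brokerMax = 300000; // example upper bound; use the broker's actual setting
        System.out.println(checkSessionTimeout(30, 6, brokerMin, brokerMax));        // one problem: below the minimum
        System.out.println(checkSessionTimeout(30000, 10000, brokerMin, brokerMax)); // prints []
    }
}
```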
[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075945#comment-16075945 ]

Yogesh BG edited comment on KAFKA-5545 at 7/6/17 5:16 AM:
----------------------------------------------------------

Hey, setupDiscovery is a scheduled thread with the logic that checks whether the broker IPs have changed. As you can see in the code, I then call close(), which internally calls stream.close(). You can also see in the logs that the close was triggered. If it were not called, how would the shutdown be initiated?

_But from your attached logs it does seem the thread was notified to shut down but never exited the main loop:_ You should check why the shutdown didn't happen.

Why are some threads that were part of the previous stream instance still alive after close has been invoked? Is there any way I can shut down the stream completely without restarting the app? BTW, restarting the application has its own problem: when I restart with the new broker IP, the threads hang and never come back to process the data.
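One way to check the question raised here, whether threads of the previous instance are still alive after close(), is to list live threads by name right after close returns. The JDK-only sketch below is a hypothetical diagnostic, not a Kafka API. It assumes the stream threads are named like those in the log (StreamThread-38 and so on). Thread naming may differ in other Kafka versions, so it matches on a name fragment.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic (not a Kafka API): lists live JVM threads whose name contains a fragment. */
public class LingeringThreads {

    static List<String> liveThreadsMatching(String nameFragment) {
        List<String> matches = new ArrayList<>();
        // Thread.getAllStackTraces() returns a snapshot of the live threads in this JVM.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && t.getName().contains(nameFragment)) {
                matches.add(t.getName() + " (" + t.getState() + ")");
            }
        }
        return matches;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates a stream thread from the old instance that has not exited yet.
        Thread leftover = new Thread(() -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "StreamThread-99");
        leftover.setDaemon(true);
        leftover.start();

        // In the application, this check would run right after the old instance's close() returns.
        System.out.println("after close: " + liveThreadsMatching("StreamThread-"));

        leftover.interrupt();
        leftover.join();
        System.out.println("after exit: " + liveThreadsMatching("StreamThread-"));
    }
}
```

If the list is non-empty after close() returns, the old instance is still running. A thread dump (for example with jstack) of those threads would show where they are blocked.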
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075061#comment-16075061 ]

Yogesh BG commented on KAFKA-5545:
----------------------------------

I see. But what could be the problem with closing the streams? I don't think restarting the application is a good idea. From the log we can see some threads still polling to connect to the old IP. We should try closing those threads, right? One more thing: if I call close within the connection timeout, all goes well, but if I call close after the connection timeout, the threads are stuck.
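close() apparently hangs only when it is called after the connection timeout. One way to guard against this, sketched here as an assumption rather than a fix agreed in this thread, is to bound the wait for close() so the scheduler never rebuilds the topology while the old instance is still shutting down. The JDK-only sketch below assumes close() is passed in as a Runnable. runWithDeadline is a hypothetical helper. Newer Kafka Streams releases also provide close(Duration), which returns false when the threads do not stop in time.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not a Kafka API). */
public class BoundedClose {

    /** Runs closeAction on a helper thread; returns true only if it finished within the timeout. */
    static boolean runWithDeadline(Runnable closeAction, long timeout, TimeUnit unit) {
        ExecutorService closer = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-close");
            t.setDaemon(true); // a close() that never returns will not keep the JVM alive
            return t;
        });
        try {
            Future<?> done = closer.submit(closeAction);
            done.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false; // close() is still running; do not rebuild the topology yet
        } catch (ExecutionException e) {
            return false; // close() threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            closer.shutdown(); // the helper thread exits once closeAction returns
        }
    }

    public static void main(String[] args) {
        boolean quick = runWithDeadline(() -> { }, 1, TimeUnit.SECONDS);
        boolean hung = runWithDeadline(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 200, TimeUnit.MILLISECONDS);
        System.out.println(quick + " " + hung); // prints "true false"
    }
}
```

If this returns false, the caller could skip creating the new KafkaStreams instance on this cycle and retry on the next scheduled run. The old threads may still be holding resources such as the task state directories.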
[jira] [Updated] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yogesh BG updated KAFKA-5545:
-----------------------------
    Attachment: kafkastreams.log

I have attached the file. Whenever I get these exceptions in the debug log, KStream is not able to process any further.
[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072228#comment-16072228 ]

Yogesh BG edited comment on KAFKA-5545 at 7/3/17 10:27 AM:
-----------------------------------------------------------

Here is what I found: if the stream is closed successfully, it is able to re-establish the connection with the new broker IP and continue processing data. But sometimes the previous stream is not closed properly, because some threads keep trying to re-establish the connection to the old broker IP, which is no longer available, and keep logging DEBUG exceptions. I have attached the debug log. In this situation the stream does not process any further data.

Here is the logic used to re-establish the connection. The close timeout is 60 sec.

{code:java}
private ScheduledFuture setupDiscovery(final AbstractConfiguration configInstance, int refreshInterval,
        final String vipAddress, final boolean useSecurePort, final boolean useHostNames) {
    return executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                List bootstrapServers = getBootstrapServer(configInstance, vipAddress, useSecurePort, useHostNames);
                String oldBootstrapServerString = config.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
                logger.info("New bootstrap servers obtained from registry server are " + bootstrapServers
                        + ", old bootstrap server are " + oldBootstrapServerString);
                boolean isChanged = checkForChangeInBootstrapServers(bootstrapServers, oldBootstrapServerString);
                if (isChanged) {
                    String bootstrapServerString = bootstrapServersStr(bootstrapServers);
                    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerString);
                    logger.info("Closing connection to oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    close();
                    streams = new KafkaStreams(buildTopology(config), config);
                    logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    streams.cleanUp();
                    start();
                    logger.info("Completed restart of kafka streams connection to new broker with configuration " + config);
                }
            } catch (Throwable ex) {
                logger.error("discovery of kafka broker instances failed with reason : " + ex.getMessage()
                        + ", will retry again", ex);
            }
        }
    }, 0, refreshInterval, TimeUnit.MINUTES);
}
{code}
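The snippet calls checkForChangeInBootstrapServers and bootstrapServersStr, but their bodies are not shown. The sketch below guesses at what they might look like. It assumes the discovered servers arrive as host:port strings, and both bodies are hypothetical. The comparison is order-insensitive, so a registry that returns the same brokers in a different order does not trigger a restart. An empty discovery result is treated as "no change" rather than as a reason to reconnect to nothing.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

/** Hypothetical versions of the helpers referenced in setupDiscovery; the originals are not shown. */
public class BootstrapServers {

    /** Joins host:port entries into the comma-separated form used by bootstrap.servers. */
    static String bootstrapServersStr(Collection<String> servers) {
        return servers.stream()
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.joining(","));
    }

    /** Order-insensitive comparison of the discovered servers with the current config value. */
    static boolean checkForChangeInBootstrapServers(Collection<String> discovered, String current) {
        Set<String> now = normalize(discovered);
        if (now.isEmpty()) {
            return false; // nothing discovered: keep the current connection instead of restarting
        }
        Set<String> before = current == null
                ? new TreeSet<String>()
                : normalize(Arrays.asList(current.split(",")));
        return !now.equals(before);
    }

    private static Set<String> normalize(Collection<String> servers) {
        return servers.stream()
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        List<String> discovered = Arrays.asList("10.16.0.39:9092");
        System.out.println(checkForChangeInBootstrapServers(discovered, "10.16.0.38:9092")); // true
        System.out.println(bootstrapServersStr(discovered)); // 10.16.0.39:9092
    }
}
```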
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070481#comment-16070481 ]

Yogesh BG commented on KAFKA-5545:
----------------------------------

My application doesn't receive messages from the broker. After some time this exception stops, but the consumer is stuck.
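The exception referred to here is the LockException from the issue description ("Failed to lock the state directory for task 0_5"). As I understand Kafka Streams (an assumption, not something established in this thread), each task directory under state.dir is protected by a file lock. If so, a new instance in the same JVM cannot create a task while a thread of the old instance still holds that lock. The JDK-only sketch below shows the underlying java.nio behavior: inside one JVM, a second tryLock on an already-locked file throws OverlappingFileLockException until the first lock is released. The .lock file name and the temporary directory are illustrative stand-ins, not Kafka's actual code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrates JDK file-lock semantics; this is not Kafka's state-directory code. */
public class TaskDirLockDemo {

    /** Tries to lock the file and releases the lock immediately if it was obtained. */
    static boolean canLock(Path lockFile) {
        try (FileChannel ch = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock = ch.tryLock();
            if (lock == null) {
                return false; // held by another process
            }
            lock.release();
            return true;
        } catch (OverlappingFileLockException e) {
            return false; // already held inside this JVM, e.g. by a thread of the old instance
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String runDemo() {
        try {
            Path taskDir = Files.createTempDirectory("task-0_5-"); // stand-in for a task directory
            Path lockFile = taskDir.resolve(".lock");
            boolean whileHeld;
            try (FileChannel oldOwner = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                FileLock held = oldOwner.lock(); // the "old instance" still holds the task lock
                whileHeld = canLock(lockFile);
                held.release();
            }
            boolean afterRelease = canLock(lockFile);
            Files.delete(lockFile);
            Files.delete(taskDir);
            return "while held: " + whileHeld + ", after release: " + afterRelease;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "while held: false, after release: true"
    }
}
```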
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069923#comment-16069923 ] Yogesh BG commented on KAFKA-5545: --
11:03:08.216 [pool-1-thread-1] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
11:03:08.216 [pool-1-thread-1] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
11:03:08.216 [pool-1-thread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] State transition from NOT_RUNNING to RUNNING.
11:03:08.216 [pool-1-thread-1] INFO n.a.a.k.t.KStreamTopologyBase - cleanup the kafka streams : oldBootstrap[10.16.0.38:9092] newBootStrap[10.16.0.39:9092].CREATED
11:03:08.218 [pool-1-thread-1] INFO n.a.a.k.t.KStreamTopologyBase - cleanedup the kafka streams : oldBootstrap[10.16.0.38:9092] newBootStrap[10.16.0.39:9092].CREATED
11:03:08.218 [pool-1-thread-1] INFO n.a.a.k.t.KStreamTopologyBase - Completed restart of kafka streams connection to new broker with configuration {pr-metric-enrichment-proc.schedule_interval=5000, metrics-store-proc.trackFailed=true, commit.interval.ms=1, metrics-store-proc.accumulateBatchRequests=false, pr-metric-kstream-source.useSecurePort=false, metrics-store-proc.maxBatchSizeBytes=5242880, poll.ms=100, pr-metric-kstream-source.vipAddress=metrics-ingestor.analytics.default.16012014.appcito.net, metrics-store-proc.schedule_interval=5000, application.id=rtp-kafkastreams, metrics-store-proc.useSecurePort=false, pr-metric-enrichment-proc.skipKStreamProcess=false, value.serde=class org.apache.kafka.common.serialization.Serdes$ByteBufferSerde, metrics-store-proc.shutdownAwaitSecs=300, state.dir=/data/kstreams, metrics-store-proc.numExecuteConcurrency=16, metrics-store-proc.separateDataStoreThread=true, metrics-store-proc.maxBatchPendingFactor=12, auto.offset.reset=latest, metrics-store-proc.retryFailed=false, metrics-store-proc.maxBatchSize=5000, bootstrap.servers=10.16.0.39:9092, max.poll.records=100, session.timeout.ms=30, pr-metric-kstream-source.kafka.queue=PR, client.id=ks_0_inst, metrics-store-proc.elastic.search.cluster=metrics-datastore, heartbeat.interval.ms=6, num.stream.threads=16, metrics-store-proc.writeTimeoutSecs=30, key.serde=class org.apache.kafka.common.serialization.Serdes$StringSerde, metrics-store-proc.vipAddress=metrics-store.analytics.default.16012014.appcito.net, num.standby.replicas=1, metrics-store-proc.maxBatchInterval=5, metrics-store-proc.logFailed=true}
11:03:08.325 [Thread-6] INFO o.apache.kafka.streams.KafkaStreams - stream-client [ks_0_inst] State transition from CREATED to RUNNING.
11:03:08.333 [StreamThread-33] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-33] Starting
11:03:08.333 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-34] Starting
11:03:08.350 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-36] Starting
11:03:08.351 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-37] Starting
11:03:08.351 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-38] Starting
11:03:08.351 [StreamThread-39] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-39] Starting
11:03:08.351 [StreamThread-40] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-40] Starting
11:03:08.351 [StreamThread-35] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-35] Starting
11:03:08.354 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-41] Starting
11:03:08.354 [StreamThread-42] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-42] Starting
11:03:08.355 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-43] Starting
11:03:08.355 [StreamThread-44] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-44] Starting
11:03:08.355 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-45] Starting
11:03:08.355 [StreamThread-46] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-46] Starting
11:03:08.355 [StreamThread-47] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-47] Starting
11:03:08.355 [Thread-6] INFO o.apache.kafka.streams.KafkaStreams - stream-client [ks_0_inst] Started Kafka Stream process
11:03:08.360 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] Starting
11:03:08.363 [StreamThread-33] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator 10.16.0.39:9092 (id: 2147483647 rack: null) for group rtp-kafkastreams.
11:03:08.375 [StreamThread-33] INFO o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [] for group rtp-kafkastreams
11:03:08.375
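The comment log above shows the application's own restart hook (`n.a.a.k.t.KStreamTopologyBase`) noticing that the bootstrap address moved from 10.16.0.38:9092 to 10.16.0.39:9092, cleaning up, and restarting. As a rough illustration of that periodic change-detection step only, here is a minimal, stdlib-only Java sketch. `BootstrapWatcher`, `checkOnce`, and the supplier/callback parameters are hypothetical names, not Kafka APIs. The actual restart work (closing the old `KafkaStreams` instance, rebuilding the topology, starting a new one) is assumed to live in the caller-supplied callback.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka: detects a change in the broker's
// bootstrap address and runs a caller-supplied restart action.
final class BootstrapWatcher {
    private final Supplier<String> currentBootstrap; // assumption: returns "host:port", or null if unknown
    private final Consumer<String> restart;          // assumption: cleans up and restarts the streams app
    private String lastBootstrap;

    BootstrapWatcher(String initialBootstrap, Supplier<String> currentBootstrap, Consumer<String> restart) {
        this.lastBootstrap = initialBootstrap;
        this.currentBootstrap = currentBootstrap;
        this.restart = restart;
    }

    // Runs one check. Returns true only if the address changed and the restart
    // action completed. If the action throws, lastBootstrap is left unchanged
    // so the next check retries the restart.
    synchronized boolean checkOnce() {
        String now = currentBootstrap.get();
        if (now == null || Objects.equals(now, lastBootstrap)) {
            return false;
        }
        restart.accept(now);
        lastBootstrap = now;
        return true;
    }

    synchronized String lastBootstrap() {
        return lastBootstrap;
    }

    // Runs checkOnce() periodically (the report uses a 5-minute interval).
    // Exceptions are caught because an uncaught exception would cancel all
    // further runs of a scheduleWithFixedDelay task.
    ScheduledExecutorService start(long period, TimeUnit unit) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleWithFixedDelay(() -> {
            try {
                checkOnce();
            } catch (RuntimeException e) {
                System.err.println("restart failed, will retry: " + e);
            }
        }, period, period, unit);
        return ses;
    }

    public static void main(String[] args) {
        String[] broker = {"10.16.0.38:9092"}; // simulated broker address
        BootstrapWatcher w = new BootstrapWatcher(broker[0], () -> broker[0],
                addr -> System.out.println("restarting against " + addr));
        System.out.println(w.checkOnce()); // false: address unchanged
        broker[0] = "10.16.0.39:9092";      // broker came back with a new IP
        System.out.println(w.checkOnce()); // true, after printing "restarting against 10.16.0.39:9092"
        System.out.println(w.lastBootstrap());
    }
}
```

Updating `lastBootstrap` only after the callback returns means a failed restart is retried on the next tick rather than silently skipped.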
[jira] [Created] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
Yogesh BG created KAFKA-5545:
Summary: Kafka Stream not able to successfully restart over new broker ip
Key: KAFKA-5545
URL: https://issues.apache.org/jira/browse/KAFKA-5545
Project: Kafka
Issue Type: Bug
Reporter: Yogesh BG
Priority: Critical

Hi, I have one Kafka broker and one Kafka Streams application. Initially, the Kafka Streams application connects and starts processing data. Then I restart the broker, and on restart it is assigned a new IP. In the Kafka Streams application, a thread runs every 5 minutes to check whether the broker IP has changed; if it has, we clean up the stream, rebuild the topology (I also tried reusing the existing topology), and start the stream again. I end up with the following exceptions:

11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-38] Creating active task 0_5 with assigned partitions [PR-5]
11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-41] Creating active task 0_1 with assigned partitions [PR-1]
11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-34] Creating active task 0_7 with assigned partitions [PR-7]
11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-37] Creating active task 0_3 with assigned partitions [PR-3]
11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-45] Creating active task 0_0 with assigned partitions [PR-0]
11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-36] Creating active task 0_4 with assigned partitions [PR-4]
11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-43] Creating active task 0_6 with assigned partitions [PR-6]
11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] Creating active task 0_2 with assigned partitions [PR-2]
11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could not create task 0_5. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the state directory for task 0_5
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
	at