[jira] [Commented] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17175147#comment-17175147
 ] 

Yogesh BG commented on KAFKA-10381:
---

One more observation: when I restart the leader node for that partition, it 
picks up and the issue is resolved. But we cannot do a restart in a real 
scenario - we would have data loss during restarts.
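A possible workaround instead of restarting: force the stale assignment onto the live brokers with the partition-reassignment tool. A minimal sketch; the topic name A and broker ids 1004/1005 are taken from the log below, and the ZooKeeper address is an assumption:

```shell
# Build a reassignment plan that places partition A-0 on the two live brokers
# (1004 stays, 1005 replaces the removed 1003).
cat > reassign.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "A", "partition": 0, "replicas": [1004, 1005] }
  ]
}
EOF

# On Kafka 2.3 the tool still talks to ZooKeeper; adjust the address for your
# cluster before running:
# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
#     --reassignment-json-file reassign.json --execute
# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
#     --reassignment-json-file reassign.json --verify
```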

> Add broker to a cluster not rebalancing partitions
> --
>
> Key: KAFKA-10381
> URL: https://issues.apache.org/jira/browse/KAFKA-10381
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: Yogesh BG
>Priority: Major
>
> Hi
> I have a 3-node cluster and a topic with one partition. When a node is 
> deleted and another node is added, the topic goes into an unknown state and 
> we are not able to write/read anything; the exception below is seen:
>  
> {code:java}
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition C-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition B-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogesh BG updated KAFKA-10381:
--
Affects Version/s: 2.3.0

> Add broker to a cluster not rebalancing partitions
> --
>
> Key: KAFKA-10381
> URL: https://issues.apache.org/jira/browse/KAFKA-10381
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: Yogesh BG
>Priority: Major
>
> Hi
> I have a 3-node cluster and a topic with one partition. When a node is 
> deleted and another node is added, the topic goes into an unknown state and 
> we are not able to write/read anything; the exception below is seen:
>  
> {code:java}
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition C-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition B-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> {code}





[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogesh BG updated KAFKA-10381:
--
Priority: Major  (was: Trivial)

> Add broker to a cluster not rebalancing partitions
> --
>
> Key: KAFKA-10381
> URL: https://issues.apache.org/jira/browse/KAFKA-10381
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yogesh BG
>Priority: Major
>
> Hi
> I have a 3-node cluster and a topic with one partition. When a node is 
> deleted and another node is added, the topic goes into an unknown state and 
> we are not able to write/read anything; the exception below is seen:
>  
> {code:java}
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition C-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition B-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> {code}





[jira] [Created] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)
Yogesh BG created KAFKA-10381:
-

 Summary: Add broker to a cluster not rebalancing partitions
 Key: KAFKA-10381
 URL: https://issues.apache.org/jira/browse/KAFKA-10381
 Project: Kafka
  Issue Type: Bug
Reporter: Yogesh BG


Hi

I have a 3-node cluster and a topic with one partition. When a node is deleted 
and another node is added, the topic goes into an unknown state and we are not 
able to write/read anything; the exception below is seen:

 
{code:java}
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed 
to record follower 1005's position 0 since the replica is not recognized to be 
one of the assigned replicas 1003,1004 for partition A-0. Empty records will be 
returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed 
to record follower 1005's position 0 since the replica is not recognized to be 
one of the assigned replicas 1003,1004 for partition C-0. Empty records will be 
returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed 
to record follower 1005's position 0 since the replica is not recognized to be 
one of the assigned replicas 1002,1004 for partition A-0. Empty records will be 
returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed 
to record follower 1005's position 0 since the replica is not recognized to be 
one of the assigned replicas 1002,1004 for partition B-0. Empty records will be 
returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed 
to record follower 1005's position 0 since the replica is not recognized to be 
one of the assigned replicas 1003,1004 for partition A-0. Empty records will be 
returned for this partition. (kafka.server.ReplicaManager)
[2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed 
to record follower 1005's position 0 since the replica is not recognized to be 
one of the assigned replicas 1003,1004 for partition A-0. Empty records will be 
returned for this partition. (kafka.server.ReplicaManager)
{code}





[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogesh BG updated KAFKA-10381:
--
Priority: Trivial  (was: Major)

> Add broker to a cluster not rebalancing partitions
> --
>
> Key: KAFKA-10381
> URL: https://issues.apache.org/jira/browse/KAFKA-10381
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yogesh BG
>Priority: Trivial
>
> Hi
> I have a 3-node cluster and a topic with one partition. When a node is 
> deleted and another node is added, the topic goes into an unknown state and 
> we are not able to write/read anything; the exception below is seen:
>  
> {code:java}
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition C-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition B-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> {code}





[jira] [Commented] (KAFKA-10303) kafka producer says connect failed in cluster mode

2020-08-01 Thread Yogesh BG (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169288#comment-17169288
 ] 

Yogesh BG commented on KAFKA-10303:
---

Hi, this issue is resolved now. We had a problem with node IP assignment, which 
is what caused this issue. Thanks; we can close this issue.

> kafka producer says connect failed in cluster mode
> --
>
> Key: KAFKA-10303
> URL: https://issues.apache.org/jira/browse/KAFKA-10303
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yogesh BG
>Priority: Major
>
> Hi
>  
> I am using Kafka broker version 2.3.0.
> We have two setups: standalone (one node) and a 3-node cluster.
> We pump a huge volume of data, ~25 MBps, ~80K messages per second.
> It all works well in single-node mode, but in the cluster case the producer 
> starts throwing connect failed (librdkafka); after some time it is able to 
> connect again and resumes sending traffic.
> What could be the issue? Some of the configurations are:
>  
> replica.fetch.max.bytes=10485760
> num.network.threads=12
> num.replica.fetchers=6
> queued.max.requests=5
> # The number of threads doing disk I/O
> num.io.threads=12
> replica.socket.receive.buffer.bytes=1
>  
>  





[jira] [Created] (KAFKA-10303) kafka producer says connect failed in cluster mode

2020-07-23 Thread Yogesh BG (Jira)
Yogesh BG created KAFKA-10303:
-

 Summary: kafka producer says connect failed in cluster mode
 Key: KAFKA-10303
 URL: https://issues.apache.org/jira/browse/KAFKA-10303
 Project: Kafka
  Issue Type: Bug
Reporter: Yogesh BG


Hi

I am using Kafka broker version 2.3.0.

We have two setups: standalone (one node) and a 3-node cluster.

We pump a huge volume of data, ~25 MBps, ~80K messages per second.

It all works well in single-node mode, but in the cluster case the producer 
starts throwing connect failed (librdkafka); after some time it is able to 
connect again and resumes sending traffic.

What could be the issue? Some of the configurations are:

 
replica.fetch.max.bytes=10485760
num.network.threads=12

num.replica.fetchers=6

queued.max.requests=5

# The number of threads doing disk I/O

num.io.threads=12

replica.socket.receive.buffer.bytes=1

 

 





[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-07-29 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561447#comment-16561447
 ] 

Yogesh BG commented on KAFKA-5998:
--

Is there any update on this? I know it's not an issue, but I would like to 
suppress the logs, as the log file is getting filled with these messages.
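In the meantime the warning can be silenced from the application's log4j configuration; a sketch, assuming the log4j 1.x properties format (the logger name matches the o.a.k.s.p.i.ProcessorStateManager entries in the trace below):

```properties
# Drop WARN-level checkpoint messages from the state manager; ERRORs still pass.
log4j.logger.org.apache.kafka.streams.processor.internals.ProcessorStateManager=ERROR
```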

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt
>
>
> I have one Kafka broker and one Kafka Streams app running. It has been 
> running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the exception below for some of the partitions; I have 
> 16 partitions, and only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-27 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560143#comment-16560143
 ] 

Yogesh BG commented on KAFKA-7209:
--

With offsets.topic.replication.factor set to 1, I still receive something like 
the below:

 

{{ Received GroupCoordinator response 
ClientResponse(receivedTimeMs=1532716985393, latencyMs=15, disconnected=false, 
requestHeader=\{api_key=10,api_version=1,correlation_id=6157,client_id=ks_0_inst_THUNDER_METRICS-StreamThread-37-consumer},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) for group 
aggregation-framework030_THUNDER_METRICS}}{{18:43:05.394 
[ks_0_inst_THUNDER_LOG_L4-StreamThread-70] DEBUG 
o.a.k.c.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 0 for 
partition THUNDER_LOG_L4_PC-17 returned fetch data (error=NONE, 
highWaterMark=0, lastStableOffset = -1, logStartOffset = 0, abortedTransactions 
= null, recordsSizeInBytes=0)}}

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1, session timeout 6, retries set to int.max, and 
> the default backoff time.
>  
> I have 3 nodes running a Kafka cluster of 3 brokers, and I am running 3 
> Kafka Streams apps with the same [application.id|http://application.id/]. 
> Each node has one broker and one Kafka Streams application. Everything works 
> fine during setup.
> I bring down one node, so one Kafka broker and one streaming app are down. 
> Now I see exceptions in the other two streaming apps and they never get 
> rebalanced; I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup, and 
> restart; this also doesn't help.
> Can anyone help me?
>  
> One thing I observed lately is that topics with one partition do get 
> reassigned, but I have topics with 16 partitions and replication factor 3; 
> it never settles.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-27 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559935#comment-16559935
 ] 

Yogesh BG commented on KAFKA-7209:
--

Hi, can you suggest anything I am missing? We are blocked on our product 
release due to this bug. Is there any way I can safely clean the Kafka Streams 
state and restart the apps with the same application.id? Some amount of data 
loss during this process is also fine.

Alternatively, confirmation that this is a bug in the streaming app would help 
me decide what alternative restart process I can use.

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1, session timeout 6, retries set to int.max, and 
> the default backoff time.
>  
> I have 3 nodes running a Kafka cluster of 3 brokers, and I am running 3 
> Kafka Streams apps with the same [application.id|http://application.id/]. 
> Each node has one broker and one Kafka Streams application. Everything works 
> fine during setup.
> I bring down one node, so one Kafka broker and one streaming app are down. 
> Now I see exceptions in the other two streaming apps and they never get 
> rebalanced; I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup, and 
> restart; this also doesn't help.
> Can anyone help me?
>  
> One thing I observed lately is that topics with one partition do get 
> reassigned, but I have topics with 16 partitions and replication factor 3; 
> it never settles.





[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-27 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559492#comment-16559492
 ] 

Yogesh BG commented on KAFKA-7209:
--

I tried setting these configurations, but no luck:

 

{{conf.put("retries",Integer.MAX_VALUE);}}

{{conf.put("rebalance.max.retries",Integer.MAX_VALUE);}}

{{conf.put("zookeeper.session.timeout.ms",1000);}}
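Note that rebalance.max.retries and zookeeper.session.timeout.ms are settings of the old Scala consumer; Kafka Streams uses the Java clients and ignores them. A hedged sketch of the settings that do reach the embedded clients, written as a properties file (values are illustrative only, and which keys Streams forwards varies by version):

```properties
# Producer-side retry behaviour
retries=2147483647
retry.backoff.ms=100
# Consumer group liveness: heartbeat must be well below the session timeout
session.timeout.ms=60000
heartbeat.interval.ms=20000
```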

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1, session timeout 6, retries set to int.max, and 
> the default backoff time.
>  
> I have 3 nodes running a Kafka cluster of 3 brokers, and I am running 3 
> Kafka Streams apps with the same [application.id|http://application.id/]. 
> Each node has one broker and one Kafka Streams application. Everything works 
> fine during setup.
> I bring down one node, so one Kafka broker and one streaming app are down. 
> Now I see exceptions in the other two streaming apps and they never get 
> rebalanced; I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup, and 
> restart; this also doesn't help.
> Can anyone help me?
>  
> One thing I observed lately is that topics with one partition do get 
> reassigned, but I have topics with 16 partitions and replication factor 3; 
> it never settles.





[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558874#comment-16558874
 ] 

Yogesh BG commented on KAFKA-7209:
--

Here are my observations:

3 brokers and 3 stream apps - initially working fine.

Kill one app: partitions get rebalanced and streaming resumes without data 
loss.

I can see the logs below:

{{20:15:26.627 [ks_0_inst_CSV_LOG-StreamThread-22] INFO  
o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [PR-35, 
PR_vThunder-27, PR-27, PR_vThunder-35] for group 
aggregation-framework03_CSV_LOG}}{{20:15:26.627 
[ks_0_inst_CSV_LOG-StreamThread-20] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [ks_0_inst_CSV_LOG-StreamThread-20] State transition from 
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED.}}{{20:16:32.174 
[ks_0_inst_THUNDER_LOG_L7-StreamThread-90] INFO  
o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions 
[THUNDER_LOG_L7_PR-15, THUNDER_LOG_L7_PE-15] for group 
aggregation-framework03_THUNDER_LOG_L7}}{{20:16:32.175 
[ks_0_inst_THUNDER_LOG_L7-StreamThread-86] INFO  
o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions 
[THUNDER_LOG_L7_PR-9, THUNDER_LOG_L7_PE-9] for group 
aggregation-framework03_THUNDER_LOG_L7}}{{20:16:32.175 
[ks_0_inst_THUNDER_LOG_L7-StreamThread-85] INFO  
o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions 
[THUNDER_LOG_L7_PE-35, THUNDER_LOG_L7_PR-27, THUNDER_LOG_L7_PE-27, 
THUNDER_LOG_L7_PR-35] for group aggregation-framework03_THUNDER_LOG_L7}}

 

But the thing I don't get is that when I look into the state dir, I don't see 
the partition folders being created for the newly assigned partitions.

Below is the initial state before I kill the first node [rtp-worker-2]; for 
the other two nodes the listing stays the same and does not change at all:

 

{{[root@rtp-worker-2 /]# ls 
/tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7/}}{{*0_0*  *0_10*  
*0_12*  *0_14*  *0_16*  *0_18*  *0_2*   *0_21*  *0_23*  *0_25*  *0_27*  *0_29*  
*0_30*  *0_32*  *0_34*  *0_4*  *0_6*  *0_8*}}{{*0_1*  *0_11*  *0_13*  *0_15*  
*0_17*  *0_19*  *0_20*  *0_22*  *0_24*  *0_26*  *0_28*  *0_3*   *0_31*  *0_33*  
*0_35*  *0_5*  *0_7*  *0_9*}}{{[root@rtp-worker-0 /]# ls 
/tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7/}}{{*0_0*  *0_1*  
*0_10*  *0_11*  *0_2*  *0_3*  *0_4*  *0_5*  *0_6*  *0_7*  *0_8*  
*0_9*}}{{[root@rtp-worker-1 /]# ls 
/tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7/}}{{*0_11*  *0_12*  
*0_13*  *0_14*  *0_15*  *0_16*  *0_17*  *0_18*  *0_19*  *0_20*  *0_21*  *0_22*  
*0_23*}}

 

Another case: with all 3 apps running successfully, I bring down one broker, 
and the brokers rebalance among themselves. The apps also rebalance along with 
the brokers and resume streaming data, *but there is data loss observed while 
the broker rebalancing is happening. Is there a way to avoid this? Do the 
other two brokers become unresponsive while the cluster is rebalancing?*

 

{color:#FF}*Next, when a broker and a stream app go down at the same time, I 
can see the brokers rebalance and some communication messages being received 
by the apps, but they never get back to streaming, especially when multiple 
partitions are involved; topics that have only one partition get back to 
streaming after some time.*{color}

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1, session timeout 6, retries set to int.max, and 
> the default backoff time.
>  
> I have 3 nodes running a Kafka cluster of 3 brokers, and I am running 3 
> Kafka Streams apps with the same [application.id|http://application.id/]. 
> Each node has one broker and one Kafka Streams application. Everything works 
> fine during setup.
> I bring down one node, so one Kafka broker and one streaming app are down. 
> Now I see exceptions in the other two streaming apps and they never get 
> rebalanced; I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup, and 
> restart; this also doesn't help.
> Can anyone help me?
>  
> One thing I observed lately is that topics with one partition do get 
> reassigned, but I have topics with 16 partitions and replication factor 3; 
> it never settles.





[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558772#comment-16558772
 ] 

Yogesh BG commented on KAFKA-7209:
--

In one of the forums I saw the statement below; does this impact anything? We 
keep the default value of 3 for these configurations:

 

what is your offsets.topic.replication.factor and 
transaction.state.log.replication.factor? If those are set to 3 and you have 
less than 3 brokers I don't imagine things will go well
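For reference, those two settings are broker-side properties in server.properties. A sketch of values consistent with a 3-broker cluster (the min.isr line is an extra assumption, not something mentioned in this thread):

```properties
# Internal-topic replication: fine with 3 brokers, but these values
# must not exceed the number of live brokers in the cluster.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
```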

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>





[jira] [Comment Edited] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558752#comment-16558752
 ] 

Yogesh BG edited comment on KAFKA-7209 at 7/26/18 6:51 PM:
---

When I kill one of the brokers, it rebalances its partitions among the
other two brokers.

And when I launch a new app with a different name than the previous one, it
streams successfully.

I am a bit confused about whether the existing streaming apps get the
rebalanced topic info and work accordingly; I am running the test and will
update you soon.


was (Author: yogeshbelur):
when i kill the one of the broker it get rebalances its partitions among the 
other two ingestors

and i launch a new app with different name than previous one then its 
successfully streams

i am bit confused abt existing streaming app whether they get rebalnced topic 
info and work accordingly, i am doing the test will update u soon

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>





[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558731#comment-16558731
 ] 

Yogesh BG commented on KAFKA-7209:
--

Is there any manual workaround to resolve this? I think killing only the
broker, or killing only the application, works.

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>





[jira] [Created] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)
Yogesh BG created KAFKA-7209:


 Summary: Kafka stream does not rebalance when one node gets down
 Key: KAFKA-7209
 URL: https://issues.apache.org/jira/browse/KAFKA-7209
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.1
Reporter: Yogesh BG


I use Kafka Streams 0.11.0.1 with a session timeout of 6, retries set to
Integer.MAX_VALUE, and the default backoff time.

I have a 3-node cluster of 3 Kafka brokers,

and I am running 3 Kafka Streams instances with the same
[application.id|http://application.id/].
Each node hosts one broker and one Kafka Streams application.
Everything works fine during setup.
When I bring down one node, so one broker and one streaming app are down,
I now see exceptions in the other two streaming apps and the group never gets
rebalanced; I waited for hours and it never came back to normal.
Is there anything I am missing?
I also tried, when one broker is down, calling stream.close(), cleaning up,
and restarting, but this doesn't help either.
Can anyone help me?

One thing I observed lately is that topics with one partition get
reassigned, but I have topics with 16 partitions and replication factor 3,
and they never settle.





[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2017-10-01 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16187301#comment-16187301
 ] 

Yogesh BG commented on KAFKA-5998:
--

Patch v1 is one way to not leave source behind. Another option is to delete the 
source.

Do you have the patch? Can you share it? I will update my setup... We cannot
delete the source manually. What is the impact of this issue? Will it drop
some messages, or does it violate any consumer semantics such as exactly-once?
 

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Yogesh BG
> Attachments: 5998.v1.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been
> running for two days under a load of around 2500 msgs per second. On the
> third day I am getting the exception below for some of the partitions; I have
> 16 partitions and only 0_0 and 0_1 give this error.
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  
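
The warning above comes from OffsetCheckpoint.write failing to open the .checkpoint.tmp temp file. As a self-contained illustration of the underlying write-to-temp-then-atomically-rename pattern (class and method names here are illustrative, not Kafka's actual code), note that the FileNotFoundException above is exactly what this pattern produces when the parent state directory has vanished before the temp file is opened:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class CheckpointWriter {
    // Write to a ".tmp" sibling, then rename atomically over the target, so
    // readers never observe a half-written checkpoint. If the parent directory
    // is missing (e.g. the state dir was cleaned up concurrently), opening the
    // temp file fails just like the WARN above.
    public static void write(Path target, String content) {
        try {
            Files.createDirectories(target.getParent()); // guard against a vanished state dir
            Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
            Files.write(tmp, content.getBytes());
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Small read helper so callers don't juggle checked IOExceptions.
    public static String read(Path p) {
        try {
            return Files.readString(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path t = Paths.get(System.getProperty("java.io.tmpdir"),
                           "ckpt-demo-" + System.nanoTime(), "0_1", ".checkpoint");
        write(t, "0\n1\nmy-topic 0 42\n");
        System.out.println(read(t));
    }
}
```

Creating the parent directories before opening the temp file is the step that would avoid this particular exception; whether that is the right fix here depends on why the state directory disappeared in the first place.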

[jira] [Updated] (KAFKA-5998) /.checkpoint.tmp Not found exception

2017-09-30 Thread Yogesh BG (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogesh BG updated KAFKA-5998:
-
Description: 
I have one Kafka broker and one Kafka Streams instance running... It has been
running for two days under a load of around 2500 msgs per second. On the third
day I am getting the exception below for some of the partitions; I have 16
partitions and only 0_0 and 0_1 give this error.

{{
09:43:25.955 [ks_0_inst-StreamThread-6] WARN  o.a.k.s.p.i.ProcessorStateManager 
- Failed to write checkpoint file to 
/data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
java.io.FileNotFoundException: 
/data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or directory)
at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
at java.io.FileOutputStream.(FileOutputStream.java:221) 
~[na:1.7.0_111]
at java.io.FileOutputStream.(FileOutputStream.java:171) 
~[na:1.7.0_111]
at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
 ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
 ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
/data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
java.io.FileNotFoundException: 
/data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or directory)
at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
at java.io.FileOutputStream.(FileOutputStream.java:221) 
~[na:1.7.0_111]
at java.io.FileOutputStream.(FileOutputStream.java:171) 
~[na:1.7.0_111]
at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
 ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
 ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]

[jira] [Commented] (KAFKA-5545) Kafka Streams not able to successfully restart over new broker ip

2017-09-12 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16162903#comment-16162903
 ] 

Yogesh BG commented on KAFKA-5545:
--

I have taken the source code and verified the issue; it works fine. I am
waiting for this release, please let me know when I can expect it to be
released.

> Kafka Streams not able to successfully restart over new broker ip
> -
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams app connects and starts processing data. Then I
> restart the broker. When the broker restarts, a new IP will be assigned.
> In the Kafka Streams app I have a thread on a 5-minute interval which checks
> whether the broker IP changed; if it changed, we clean up the stream, rebuild
> the topology (I also tried reusing the topology), and start the stream again.
> I end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  
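
The WARN line above ("Could not create task 0_5. Will retry.") comes from a retry-with-backoff loop, visible in the trace as StreamThread$AbstractTaskCreator.retryWithBackoff. A minimal, self-contained sketch of that pattern, with all names illustrative rather than Kafka's actual implementation:

```java
import java.util.function.Supplier;

public class RetryWithBackoff {
    // Retry an attempt that may throw (e.g. a LockException while the previous
    // owner of the state directory has not yet released its lock), doubling
    // the wait each time, until it succeeds or attempts run out.
    public static <T> T retry(Supplier<T> attempt, int maxAttempts, long initialBackoffMs) {
        long backoff = initialBackoffMs;
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                last = e;
                try {
                    Thread.sleep(backoff); // back off before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted during backoff", ie);
                }
                backoff *= 2;
            }
        }
        throw last; // all attempts exhausted
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice (simulating a state-directory lock not yet released), then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("state dir locked");
            return "task created";
        }, 5, 1);
        System.out.println(result); // prints "task created"
    }
}
```

The failure mode described in this issue is when the lock is never released, so every attempt in a loop like this keeps failing and the "Will retry" warning repeats indefinitely.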

[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-08-28 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16091115#comment-16091115
 ] 

Yogesh BG edited comment on KAFKA-5545 at 8/29/17 5:00 AM:
---

OK. Thanks for looking into the issue; I will wait for the 0.10.2.2 release.



was (Author: yogeshbelur):
ok. Thanks for looking into the issue will wait till 0.10.2.2 release.

On Mon, Jul 17, 2017 at 10:49 PM, Guozhang Wang (JIRA) 






> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Yogesh BG
>Priority: Critical
> Fix For: 0.11.0.1, 1.0.0
>
> Attachments: kafkastreams.log
>
>

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-08-28 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144147#comment-16144147
 ] 

Yogesh BG commented on KAFKA-5545:
--

I have one question. As part of this release, when I call close on the
streams, will it be shut down completely? Because in the previous version I
see it has threads invoking connection requests infinitely. Please confirm
this.




> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Yogesh BG
>Priority: Critical
> Fix For: 0.11.0.1, 1.0.0
>
> Attachments: kafkastreams.log
>
>

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-08-28 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143568#comment-16143568
 ] 

Yogesh BG commented on KAFKA-5545:
--

[~guozhang] When is this fix expected to be released? I mean the date.

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Yogesh BG
>Priority: Critical
> Fix For: 0.11.0.1, 1.0.0
>
> Attachments: kafkastreams.log
>
>
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-08-17 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130171#comment-16130171
 ] 

Yogesh BG commented on KAFKA-5545:
--

ok

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Yogesh BG
>Priority: Critical
> Fix For: 0.11.0.1, 1.0.0
>
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-08-16 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16129229#comment-16129229
 ] 

Yogesh BG commented on KAFKA-5545:
--

thank you




> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Yogesh BG
>Priority: Critical
> Fix For: 0.11.0.1, 1.0.0
>
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-5618) Kafka stream not receive any topic/partitions/records info

2017-07-21 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096112#comment-16096112
 ] 

Yogesh BG commented on KAFKA-5618:
--

I enabled debug logging and that is all the output I got; there are no records
present. It is a fresh deployment, without any data being sent.




> Kafka stream not receive any topic/partitions/records info
> --
>
> Key: KAFKA-5618
> URL: https://issues.apache.org/jira/browse/KAFKA-5618
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: rtp-kafkastreams2.log, rtp-kafkastreams3.log, 
> rtp-kafkastreams.log
>
>
> I have 3 brokers and 3 stream consumers.
> I have there are 360 partitions and not able to bring up streams successfully 
> even after several retry.
> I have attached the logs. 
> There are other topics which are having around 16 partitions and they are 
> able to successfully be consumed by kafka client
> when tried getting thread dump using jstack the process is not responding
> Attaching to process ID 10663, please wait...
> Debugger attached successfully.
> Server compiler detected.
> JVM version is 24.141-b02
> Deadlock Detection:
> java.lang.RuntimeException: Unable to deduce type of thread from address 
> 0x7fdac4009000 (expected type JavaThread, CompilerThread, ServiceThread, 
> JvmtiAgentThread, or SurrogateLockerThread)
> at 
> sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:162)
> at sun.jvm.hotspot.runtime.Threads.first(Threads.java:150)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(DeadlockDetector.java:149)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:56)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:39)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5618) Kafka stream not receive any topic/partitions/records info

2017-07-20 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094920#comment-16094920
 ] 

Yogesh BG edited comment on KAFKA-5618 at 7/20/17 4:29 PM:
---

You can see it periodically sending metadata requests to the other nodes with empty 
topic lists; that is one thing I observed. When I had 64 partitions it used to work 
well. I suspect the start() call never returns, because after start() is called I 
log and start some other threads, and I see that those threads never start...

For a successful scenario you can see the logs attached to these issues:

https://issues.apache.org/jira/browse/KAFKA-5545

https://issues.apache.org/jira/browse/KAFKA-5593


When I increased the partition count to 360 with bigger-capacity machines, it hung 
and was never able to recover, and there were no error logs or any logs at all from 
Kafka Streams.


was (Author: yogeshbelur):
You can see it periodically sending metadata request to other nodes with empty 
topics requests.. This one thing i observed. When i had 64 partitions it used 
to work well... I wonder start call is not returned because after start is 
called i log and do stat some other threads i see those threads are not 
started...

For successful scenario u can see the log attached in the issue 


> Kafka stream not receive any topic/partitions/records info
> --
>
> Key: KAFKA-5618
> URL: https://issues.apache.org/jira/browse/KAFKA-5618
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: rtp-kafkastreams2.log, rtp-kafkastreams3.log, 
> rtp-kafkastreams.log
>
>
> I have 3 brokers and 3 stream consumers.
> I have there are 360 partitions and not able to bring up streams successfully 
> even after several retry.
> I have attached the logs. 
> There are other topics which are having around 16 partitions and they are 
> able to successfully be consumed by kafka client
> when tried getting thread dump using jstack the process is not responding
> Attaching to process ID 10663, please wait...
> Debugger attached successfully.
> Server compiler detected.
> JVM version is 24.141-b02
> Deadlock Detection:
> java.lang.RuntimeException: Unable to deduce type of thread from address 
> 0x7fdac4009000 (expected type JavaThread, CompilerThread, ServiceThread, 
> JvmtiAgentThread, or SurrogateLockerThread)
> at 
> sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:162)
> at sun.jvm.hotspot.runtime.Threads.first(Threads.java:150)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(DeadlockDetector.java:149)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:56)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:39)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5618) Kafka stream not receive any topic/partitions/records info

2017-07-20 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094910#comment-16094910
 ] 

Yogesh BG commented on KAFKA-5618:
--

It hung. Initially I had those exceptions, but after I restarted I no longer get 
them.

I waited more than half an hour.

> Kafka stream not receive any topic/partitions/records info
> --
>
> Key: KAFKA-5618
> URL: https://issues.apache.org/jira/browse/KAFKA-5618
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: rtp-kafkastreams2.log, rtp-kafkastreams3.log, 
> rtp-kafkastreams.log
>
>
> I have 3 brokers and 3 stream consumers.
> I have there are 360 partitions and not able to bring up streams successfully 
> even after several retry.
> I have attached the logs. 
> There are other topics which are having around 16 partitions and they are 
> able to successfully be consumed by kafka client
> when tried getting thread dump using jstack the process is not responding
> Attaching to process ID 10663, please wait...
> Debugger attached successfully.
> Server compiler detected.
> JVM version is 24.141-b02
> Deadlock Detection:
> java.lang.RuntimeException: Unable to deduce type of thread from address 
> 0x7fdac4009000 (expected type JavaThread, CompilerThread, ServiceThread, 
> JvmtiAgentThread, or SurrogateLockerThread)
> at 
> sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:162)
> at sun.jvm.hotspot.runtime.Threads.first(Threads.java:150)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(DeadlockDetector.java:149)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:56)
> at 
> sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:39)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5618) Kafka stream not receive any topic/partitions/records info

2017-07-20 Thread Yogesh BG (JIRA)
Yogesh BG created KAFKA-5618:


 Summary: Kafka stream not receive any topic/partitions/records info
 Key: KAFKA-5618
 URL: https://issues.apache.org/jira/browse/KAFKA-5618
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Yogesh BG
Priority: Critical
 Attachments: rtp-kafkastreams2.log, rtp-kafkastreams3.log, 
rtp-kafkastreams.log

I have 3 brokers and 3 stream consumers.

There are 360 partitions, and I am not able to bring up the streams successfully 
even after several retries.

I have attached the logs. 

There are other topics with around 16 partitions, and they are consumed 
successfully by the Kafka client.

When I tried to get a thread dump using jstack, the process did not respond:

Attaching to process ID 10663, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 24.141-b02
Deadlock Detection:

java.lang.RuntimeException: Unable to deduce type of thread from address 
0x7fdac4009000 (expected type JavaThread, CompilerThread, ServiceThread, 
JvmtiAgentThread, or SurrogateLockerThread)
at 
sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:162)
at sun.jvm.hotspot.runtime.Threads.first(Threads.java:150)
at 
sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(DeadlockDetector.java:149)
at 
sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:56)
at 
sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:39)
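
The trace above is from jstack's forced (serviceability-agent) mode, which failed 
to attach. For reference, a common diagnostic sequence in such cases is sketched 
below; the PID is the example value from the report and is assumed to still be 
running, so this is a hedged illustration rather than a reproduction:

```shell
# PID of the unresponsive Kafka Streams JVM (example value from the report above)
PID=10663

# Try the normal attach first; fall back to the forced serviceability-agent mode
jstack "$PID" > threads.txt || jstack -F "$PID" > threads.txt

# Last resort: SIGQUIT makes the JVM print a full thread dump to its own stdout/stderr
kill -3 "$PID"
```

Note that `kill -3` writes the dump to the target JVM's console/log, not to the 
shell that sent the signal.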



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5593) Kafka streams not re-balancing when 3 consumer streams are there

2017-07-17 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16091135#comment-16091135
 ] 

Yogesh BG commented on KAFKA-5593:
--

Thanks for pointing that out. Yes, you are correct. I changed the configuration 
and verified that it works.

> Kafka streams not re-balancing when 3 consumer streams are there
> 
>
> Key: KAFKA-5593
> URL: https://issues.apache.org/jira/browse/KAFKA-5593
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: log1.txt, log2.txt, log3.txt
>
>
> I have 3 broker nodes, 3 kafka streams
> I observe that all 3 consumer streams are part of the group named 
> rtp-kafkastreams. but when i see the data is processed only by one node. 
> 
> DEBUG n.a.a.k.a.AccessLogMetricEnrichmentProcessor - 
> AccessLogMetricEnrichmentProcessor.process
> when i do check the partition information shared by each of them i see first 
> node has all partitions like all 8. but in other streams the folder is empty.
> 
> [root@ip-172-31-11-139 ~]# ls /data/kstreams/rtp-kafkastreams
> 0_0  0_1  0_2  0_3  0_4  0_5  0_6  0_7
>  and  this folder is empty
> I tried restarting the other two consumer streams still they won't become the 
> part of the group and re-balance.
> I have attached the logs.
> Configurations are inside the log file.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5593) Kafka streams not re-balancing when 3 consumer streams are there

2017-07-14 Thread Yogesh BG (JIRA)
Yogesh BG created KAFKA-5593:


 Summary: Kafka streams not re-balancing when 3 consumer streams 
are there
 Key: KAFKA-5593
 URL: https://issues.apache.org/jira/browse/KAFKA-5593
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Yogesh BG
Priority: Critical
 Attachments: log1.txt, log2.txt, log3.txt

I have 3 broker nodes and 3 Kafka Streams instances.

I observe that all 3 consumer streams are part of the group named 
rtp-kafkastreams, but the data is processed by only one node. 

DEBUG n.a.a.k.a.AccessLogMetricEnrichmentProcessor - 
AccessLogMetricEnrichmentProcessor.process

When I check the partition information held by each of them, I see that the first 
node has all 8 partitions, while on the other stream instances the folder is empty.


[root@ip-172-31-11-139 ~]# ls /data/kstreams/rtp-kafkastreams
0_0  0_1  0_2  0_3  0_4  0_5  0_6  0_7

On the other two nodes this folder is empty.

I tried restarting the other two consumer streams, but they still won't become 
part of the group and rebalance.

I have attached the logs.

The configurations are inside the log files.
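
Group membership and partition ownership like this can also be inspected from the 
broker side; a sketch using the stock CLI (the group name is taken from the report, 
the broker address is a placeholder, and the exact flags vary slightly across 
Kafka versions of this era):

```shell
# Describe the Streams consumer group to see which members own which partitions
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group rtp-kafkastreams
```

If all partitions show the same member/client id, only one instance has joined the 
group, matching the state-directory observation above.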




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-14 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087204#comment-16087204
 ] 

Yogesh BG commented on KAFKA-5545:
--

Any updates on this issue?


> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-11 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081860#comment-16081860
 ] 

Yogesh BG commented on KAFKA-5545:
--

It is not the timeout in close(); that time is always fixed at one minute.

I meant that when the IP changes, the close and restart happen within the session 
timeout.
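
For reference, the close-and-restart cycle being discussed can be sketched against 
the 0.10.2-era Streams API. This is a hedged sketch only: `builder` and `props` 
stand in for the application's own topology and configuration (rebuilt by its 
5-minute IP-check thread), and close() is given an explicit bound instead of the 
default behaviour described above:

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// Sketch of the restart cycle (assumes kafka-streams 0.10.2.x on the classpath).
// The caller supplies a freshly rebuilt topology and configuration.
KafkaStreams restart(KafkaStreams old, KStreamBuilder builder, Properties props) {
    old.close(60, TimeUnit.SECONDS);          // bound the shutdown explicitly
    KafkaStreams next = new KafkaStreams(builder, props);
    next.start();                             // rejoin the group with the new config
    return next;
}
```

If the old instance has not fully released its state-directory locks when the new 
one starts, the LockException shown in the issue description is the expected 
symptom.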




> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-09 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079842#comment-16079842
 ] 

Yogesh BG commented on KAFKA-5545:
--

Sorry, it's the session timeout (session.timeout.ms); we have set it to 30.
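For context, consumer-level keys such as session.timeout.ms are forwarded by Kafka Streams to the consumers it creates internally. A minimal JDK-only sketch of building such a config follows; the broker address, application id, and the 30000 ms value are placeholders for illustration, not taken from the reporter's setup:

```java
import java.util.Properties;

public class StreamsSessionTimeout {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address and application id (hypothetical).
        props.put("bootstrap.servers", "10.0.0.1:9092");
        props.put("application.id", "my-streams-app");
        // session.timeout.ms controls how long the group coordinator waits for
        // a heartbeat before evicting the consumer from the group.
        props.put("session.timeout.ms", "30000"); // placeholder value, in milliseconds
        System.out.println(props.getProperty("session.timeout.ms"));
    }
}
```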

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075945#comment-16075945
 ] 

Yogesh BG edited comment on KAFKA-5545 at 7/6/17 5:16 AM:
--

Hey

setupDiscovery is a scheduled thread whose logic checks whether the broker's IPs have changed; as you can see in the code, I am calling close(), which internally calls streams.close(). You can also see in the logs that the close has been triggered. If it were not called, how would the shutdown have been initiated?

_But from your attached logs it does seem the thread was notified to shut down but never exited the main loop:_

You should check why the shutdown didn't happen, i.e. why some threads that were part of the previous stream instance are still alive after close() has been invoked. Is there any way I can shut down the stream completely without restarting the app?

BTW, restarting the application has its own problem: when I restart with the new broker IP, the threads hang and never come back to process the data.


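Whether a shutdown actually completed can be checked generically by waiting on the worker with a bounded timeout. The following JDK-only sketch shows that pattern in isolation; it is illustrative and does not use the Kafka Streams API (a false result from the timed wait would correspond to stream threads that never exit their main loop):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedShutdown {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            // Simulate the worker finishing its loop, then signal completion.
            done.countDown();
        });
        worker.start();
        // Wait at most 60 seconds (the close timeout mentioned in this thread).
        boolean finished = done.await(60, TimeUnit.SECONDS);
        System.out.println(finished ? "closed" : "still running");
    }
}
```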




[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075945#comment-16075945
 ] 

Yogesh BG commented on KAFKA-5545:
--

Hey

setupDiscovery is a scheduled thread whose logic checks whether the broker's IPs have changed; as you can see in the code, I am calling close(), which internally calls streams.close(). You can also see in the logs that the close has been triggered. If it were not called, how would the shutdown have been initiated?

_But from your attached logs it does seem the thread was notified to shut down but never exited the main loop:_

You should check why the shutdown didn't happen, i.e. why some threads that were part of the previous stream instance are still alive after close() has been invoked. Is there any way I can shut down the stream completely without restarting the app?

BTW, restarting the application has its own problem: when I restart with the new broker IP, the threads hang and never come back to process the data.



[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075061#comment-16075061
 ] 

Yogesh BG commented on KAFKA-5545:
--

I see. But what could be the problem in closing the streams? I don't think restarting the application is a good idea. From the log we can see some threads still polling to connect to the old IP; we should try closing those threads, right?

One more thing: if I call close() within the connection timeout, all goes well, but if I issue close() after the connection timeout, the threads are stuck.





[jira] [Updated] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-03 Thread Yogesh BG (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogesh BG updated KAFKA-5545:
-
Attachment: kafkastreams.log

I have attached the file.

Whenever I get these debug-log exceptions, the Kafka Stream is not able to process any further.


[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-03 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072228#comment-16072228
 ] 

Yogesh BG edited comment on KAFKA-5545 at 7/3/17 10:27 AM:
---

Here I found out that if the stream is closed successfully, then it is able to re-establish the connection to the broker's new IP and continue processing data.

But sometimes the previous stream does not get closed properly, because some threads keep trying to re-establish the connection to the broker's old IP, which is no longer available, and keep logging DEBUG exceptions. I have attached the debug log. In this situation the stream does not process any further data.

Here is the logic used to re-establish the connection; the close timeout is 60 seconds:


{code:java}
private ScheduledFuture setupDiscovery(final AbstractConfiguration configInstance, int refreshInterval,
        final String vipAddress, final boolean useSecurePort, final boolean useHostNames) {
    return executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                List bootstrapServers = getBootstrapServer(configInstance, vipAddress, useSecurePort,
                        useHostNames);
                String oldBootstrapServerString = config.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
                logger.info("New bootstrap servers obtained from registry server are " + bootstrapServers
                        + ", old bootstrap server are " + oldBootstrapServerString);
                boolean isChanged = checkForChangeInBootstrapServers(bootstrapServers, oldBootstrapServerString);
                if (isChanged) {
                    String bootstrapServerString = bootstrapServersStr(bootstrapServers);
                    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerString);
                    logger.info("Closing connection to oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    close();
                    streams = new KafkaStreams(buildTopology(config), config);
                    logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    streams.cleanUp();
                    start();
                    logger.info("Completed restart of kafka streams connection to new broker with configuration "
                            + config);
                }
            } catch (Throwable ex) {
                logger.error("discovery of kafka broker instances failed with reason : " + ex.getMessage()
                        + ", will retry again", ex);
            }
        }
    }, 0, refreshInterval, TimeUnit.MINUTES);
}
{code}
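Stripped of the Kafka-specific calls, the scheduling shell of that method can be reproduced with the JDK alone, which may help isolate the timing behaviour. All names in this sketch are illustrative and not from the reporter's code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DiscoveryLoop {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch firstRun = new CountDownLatch(1);
        // Same shape as setupDiscovery: initial delay 0, fixed delay between runs,
        // and a catch-all so one failed iteration does not kill the periodic task.
        ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
            try {
                // Here the real code would compare broker IPs and, on a change,
                // close and restart the streams instance.
                firstRun.countDown();
            } catch (Throwable t) {
                // Swallowed so the schedule keeps running, as in the original.
            }
        }, 0, 5, TimeUnit.MINUTES);
        boolean ran = firstRun.await(10, TimeUnit.SECONDS);
        System.out.println(ran ? "discovery ran" : "no run");
        future.cancel(true);
        executor.shutdownNow();
    }
}
```

Note that scheduleWithFixedDelay only suppresses further runs if the task itself throws, which is why the original wraps the whole body in a catch of Throwable.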




[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-03 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072228#comment-16072228
 ] 

Yogesh BG commented on KAFKA-5545:
--

Here I found out that if the stream is closed successfully, then it is able to re-establish the connection to the broker's new IP and continue processing data.

But sometimes the previous stream does not get closed properly, because some threads keep trying to re-establish the connection to the broker's old IP, which is no longer available, and keep logging DEBUG exceptions. I have attached the debug log. In this situation the stream does not process any further data.

Here is the logic used to re-establish the connection; the close timeout is 60 seconds:


{code:java}
private ScheduledFuture setupDiscovery(final AbstractConfiguration configInstance, int refreshInterval,
        final String vipAddress, final boolean useSecurePort, final boolean useHostNames) {
    return executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                List bootstrapServers = getBootstrapServer(configInstance, vipAddress, useSecurePort,
                        useHostNames);
                String oldBootstrapServerString = config.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
                logger.info("New bootstrap servers obtained from registry server are " + bootstrapServers
                        + ", old bootstrap server are " + oldBootstrapServerString);
                boolean isChanged = checkForChangeInBootstrapServers(bootstrapServers, oldBootstrapServerString);
                if (isChanged) {
                    String bootstrapServerString = bootstrapServersStr(bootstrapServers);
                    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerString);
                    logger.info("Closing connection to oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    close();
                    streams = new KafkaStreams(buildTopology(config), config);
                    logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    streams.cleanUp();
                    start();
                    logger.info("Completed restart of kafka streams connection to new broker with configuration "
                            + config);
                }
            } catch (Throwable ex) {
                logger.error("discovery of kafka broker instances failed with reason : " + ex.getMessage()
                        + ", will retry again", ex);
            }
        }
    }, 0, refreshInterval, TimeUnit.MINUTES);
}
{code}



[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-06-30 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070481#comment-16070481
 ] 

Yogesh BG commented on KAFKA-5545:
--

My application doesn't receive messages from the broker. After some time this
exception stops, but the consumer is stuck.
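The LockException reported in this issue ("Failed to lock the state directory for task 0_5") usually means the old instance's stream threads had not yet released the per-task state-directory locks when the new KafkaStreams instance was started. Kafka Streams guards each task directory with an OS-level file lock; the underlying JDK pattern can be sketched as follows (class and paths hypothetical, not Kafka's actual code), showing why a second acquire fails until the first holder releases:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockSketch {

    private FileChannel channel;
    private FileLock lock;

    // Try to take an exclusive lock on the given ".lock" file, the way a
    // streams task guards its state directory. Returns false (instead of
    // throwing) when another holder, thread or process, still owns it.
    public boolean tryAcquire(Path lockFile) throws IOException {
        channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            lock = channel.tryLock();
        } catch (OverlappingFileLockException e) {
            // Another thread in this same JVM already holds the lock.
            lock = null;
        }
        if (lock == null) {
            channel.close();
            return false;
        }
        return true;
    }

    // Release the lock so a restarted instance can acquire it.
    public void release() throws IOException {
        if (lock != null) {
            lock.release();
        }
        if (channel != null) {
            channel.close();
        }
    }
}
```

This is why closing the previous KafkaStreams instance with a long enough timeout, and only then constructing and starting the new one, matters: creating the new instance while the old threads are still shutting down reproduces the "Could not create task ... Will retry" loop seen in the logs.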





[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-06-30 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069923#comment-16069923
 ] 

Yogesh BG commented on KAFKA-5545:
--

11:03:08.216 [pool-1-thread-1] INFO  o.a.kafka.common.utils.AppInfoParser - 
Kafka version : 0.10.2.1
11:03:08.216 [pool-1-thread-1] INFO  o.a.kafka.common.utils.AppInfoParser - 
Kafka commitId : e89bffd6b2eff799
11:03:08.216 [pool-1-thread-1] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-48] State transition from NOT_RUNNING to RUNNING.
11:03:08.216 [pool-1-thread-1] INFO  n.a.a.k.t.KStreamTopologyBase - cleanup 
the kafka streams : oldBootstrap[10.16.0.38:9092] 
newBootStrap[10.16.0.39:9092].CREATED
11:03:08.218 [pool-1-thread-1] INFO  n.a.a.k.t.KStreamTopologyBase - cleanedup 
the kafka streams : oldBootstrap[10.16.0.38:9092] 
newBootStrap[10.16.0.39:9092].CREATED
11:03:08.218 [pool-1-thread-1] INFO  n.a.a.k.t.KStreamTopologyBase - Completed 
restart of kafka streams connection to new broker with configuration 
{pr-metric-enrichment-proc.schedule_interval=5000, 
metrics-store-proc.trackFailed=true, commit.interval.ms=1, 
metrics-store-proc.accumulateBatchRequests=false, 
pr-metric-kstream-source.useSecurePort=false, 
metrics-store-proc.maxBatchSizeBytes=5242880, poll.ms=100, 
pr-metric-kstream-source.vipAddress=metrics-ingestor.analytics.default.16012014.appcito.net,
 metrics-store-proc.schedule_interval=5000, application.id=rtp-kafkastreams, 
metrics-store-proc.useSecurePort=false, 
pr-metric-enrichment-proc.skipKStreamProcess=false, value.serde=class 
org.apache.kafka.common.serialization.Serdes$ByteBufferSerde, 
metrics-store-proc.shutdownAwaitSecs=300, state.dir=/data/kstreams, 
metrics-store-proc.numExecuteConcurrency=16, 
metrics-store-proc.separateDataStoreThread=true, 
metrics-store-proc.maxBatchPendingFactor=12, auto.offset.reset=latest, 
metrics-store-proc.retryFailed=false, metrics-store-proc.maxBatchSize=5000, 
bootstrap.servers=10.16.0.39:9092, max.poll.records=100, 
session.timeout.ms=30, pr-metric-kstream-source.kafka.queue=PR, 
client.id=ks_0_inst, 
metrics-store-proc.elastic.search.cluster=metrics-datastore, 
heartbeat.interval.ms=6, num.stream.threads=16, 
metrics-store-proc.writeTimeoutSecs=30, key.serde=class 
org.apache.kafka.common.serialization.Serdes$StringSerde, 
metrics-store-proc.vipAddress=metrics-store.analytics.default.16012014.appcito.net,
 num.standby.replicas=1, metrics-store-proc.maxBatchInterval=5, 
metrics-store-proc.logFailed=true}
11:03:08.325 [Thread-6] INFO  o.apache.kafka.streams.KafkaStreams - 
stream-client [ks_0_inst] State transition from CREATED to RUNNING.
11:03:08.333 [StreamThread-33] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-33] Starting
11:03:08.333 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-34] Starting
11:03:08.350 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-36] Starting
11:03:08.351 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-37] Starting
11:03:08.351 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-38] Starting
11:03:08.351 [StreamThread-39] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-39] Starting
11:03:08.351 [StreamThread-40] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-40] Starting
11:03:08.351 [StreamThread-35] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-35] Starting
11:03:08.354 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-41] Starting
11:03:08.354 [StreamThread-42] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-42] Starting
11:03:08.355 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-43] Starting
11:03:08.355 [StreamThread-44] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-44] Starting
11:03:08.355 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-45] Starting
11:03:08.355 [StreamThread-46] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-46] Starting
11:03:08.355 [StreamThread-47] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-47] Starting
11:03:08.355 [Thread-6] INFO  o.apache.kafka.streams.KafkaStreams - 
stream-client [ks_0_inst] Started Kafka Stream process
11:03:08.360 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-48] Starting
11:03:08.363 [StreamThread-33] INFO  o.a.k.c.c.i.AbstractCoordinator - 
Discovered coordinator 10.16.0.39:9092 (id: 2147483647 rack: null) for group 
rtp-kafkastreams.
11:03:08.375 [StreamThread-33] INFO  o.a.k.c.c.i.ConsumerCoordinator - Revoking 
previously assigned partitions [] for group rtp-kafkastreams
11:03:08.375 

[jira] [Created] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-06-30 Thread Yogesh BG (JIRA)
Yogesh BG created KAFKA-5545:


 Summary: Kafka Stream not able to successfully restart over new 
broker ip
 Key: KAFKA-5545
 URL: https://issues.apache.org/jira/browse/KAFKA-5545
 Project: Kafka
  Issue Type: Bug
Reporter: Yogesh BG
Priority: Critical


Hi

I have one Kafka broker and one Kafka Streams application.

Initially the Kafka Streams application connects and starts processing data. Then I 
restart the broker. When the broker restarts, a new IP is assigned.

In the Kafka Streams application I have a 5-minute interval thread which checks 
whether the broker IP changed; if it has, we clean up the stream, rebuild the 
topology (also tried reusing the topology), and start the stream again. I end up 
with the following exceptions.

11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
partitions [PR-5]
11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
partitions [PR-1]
11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
partitions [PR-7]
11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
partitions [PR-3]
11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
partitions [PR-0]
11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
partitions [PR-4]
11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
partitions [PR-6]
11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
partitions [PR-2]
11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
not create task 0_5. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
state directory for task 0_5
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
 ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
 ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
 ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
 ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
 ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
 [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
at