[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145986#comment-16145986 ] Guozhang Wang commented on KAFKA-5545: -- That is right. Note that in {{KafkaStreams}} you have two close functions, one without parameters and one with a timeout. The previous will wait for all the stream threads to shutdown and complete, and in 0.11.0 we have a deadlock issue that is causing this call to stall forever, in 0.11.0.1 we have fixed the deadlock issue so that with the {{close()}} call it should return eventually, but may take long depending on your application's state. I'd encourage you to just download the current 0.11.0 branch and compile locally, and see if it has fixed your issue before the bug-fix is packaged and released. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Fix For: 0.11.0.1, 1.0.0 > > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144147#comment-16144147 ] Yogesh BG commented on KAFKA-5545: -- I have one qn. As part of this release when I call close on streams.it will be shutdown completely right. Because I see in previous version it has got threads invoking connection requests infinitely. Pls confirm me this. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Fix For: 0.11.0.1, 1.0.0 > > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143568#comment-16143568 ] Yogesh BG commented on KAFKA-5545: -- [~guozhang] When this feature is expected to release?I mean date?? > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Fix For: 0.11.0.1, 1.0.0 > > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130171#comment-16130171 ] Yogesh BG commented on KAFKA-5545: -- ok > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Fix For: 0.11.0.1, 1.0.0 > > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16129229#comment-16129229 ] Yogesh BG commented on KAFKA-5545: -- thank you > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Fix For: 0.11.0.1, 1.0.0 > > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16129204#comment-16129204 ] Guozhang Wang commented on KAFKA-5545: -- [~yogeshbelur] We have KAFKA-5152 which summarizes the root cause of this issue, and we already have a fix in the 0.11 branch. It will be included in the 0.11.0.1 bug fix release which is expected to be out soon. Note that upgrading Streams client from 0.10.2.x to 0.11.0.x should be smooth as there is no backward incompatibility changes. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16090111#comment-16090111 ] Guozhang Wang commented on KAFKA-5545: -- Hello [~yogeshbelur] sorry for the late reply. I think I have found the root cause of the issue, which is indeed a deadlock that cause the stream thread to be not able to exit its current loop call and hence not be able to shutdown. More details are the following: 1. When {{stream.close()}} is called, it does nothing but just set a flag in the stream threads and let the shutdown thread to wait until the stream threads have seen this flag and end the while loop and shutdown themselves. 2. There is an edge case, that the stream thread is still in the {{onPartitionAssigned}} callback of the current loop, and is trying to grab the state directory locks in order to create the assigned tasks; however the state directory locks are not released by other threads who have seen a disconnect / request timeout from the brokers since the broker IP has changed within the same JVM, they will retry indefinitely as the rebalance timeout is default to {{Integer.MAX_VALUE}}. So the thread will never complete its current iteration and will not see the shutdown flag and will not shutdown itself. I have started proposing a fix for the state directory deadlock now, but this will probably only in 0.11.0.1 release and backported if we are having an 0.10.2.2 release. As for the current version (0.10.2.1) version you are on. One walkaround is to set the {{ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG}} to a smaller value instead of the default {{Integer.MAX_VALUE}} so that it will not cause a dead lock, and making sure that for whatever this config value is set to, the corresponding {{REQUEST_TIMEOUT_MS_CONFIG}} is set to be larger than it. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087204#comment-16087204 ] Yogesh BG commented on KAFKA-5545: -- Any updates on this issue? > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081860#comment-16081860 ] Yogesh BG commented on KAFKA-5545: -- its not the time out in close. this time is fixed always to one minute. i meant if ip change, close and restart happs with in session timeout. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16081698#comment-16081698 ] Guozhang Wang commented on KAFKA-5545: -- Thanks for the update. So when you mentioned {code} One more thing is if I do close with in connction timeout all goes well. But if I issue close after connection timeout the threads are stuck {code} Did you mean that if you call `close(timeout)` with a timeout parameter you set it to the session timeout value, or mean that the broker ip changes event happens within the session timeout right after the streams app was started that triggers the close / restart of the streams app? > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079842#comment-16079842 ] Yogesh BG commented on KAFKA-5545: -- Sorry its session timeout. session.timeout.ms. we have set it to 30. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075945#comment-16075945 ] Yogesh BG commented on KAFKA-5545: -- Hey setupDiscovery is scheduled thread, having logic to check the ip's of broker has changed or not and then u can see the code i am calling close(), which internally calls stream.close(); You can also see the logs that the close has been triggered. If not called how shutdowns will be initiated? <> _ But from your attached logs it does seems the thread was notified to shutdown but never existed the main loop:_ You should check why shutdown didn't happen. why there are some threads still alive which were part of the previous stream instance once the close has been invoked??? Is there any way i can shutdown the stream completely without restarting the app. BTW restarting application is having its own problem, when i do restart with new broker ip threads are hung, never coming back to process the data. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075061#comment-16075061 ] Yogesh BG commented on KAFKA-5545: -- I see. But what could be the problems in closing the strems. I don't see restarting application a good idea. From log we can see some threads still polling to connect to old ip. We should try closing those threads right. One more thing is if I do close with in connction timeout all goes well. But if I issue close after connection timeout the threads are stuck > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075053#comment-16075053 ] Damian Guy commented on KAFKA-5545: --- As you suggested, it would see that the first KafkaStreams instance hasn't closed successfully as some threads are stuck. You should probably try calling {{boolean KafkaStreams.close(timeout, timeunit)}} and check the return value. If the result is {{false}} then you should probably terminate the application and restart. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072587#comment-16072587 ] Matthias J. Sax commented on KAFKA-5545: Can you do me a favor and attach the log file instead of c into the comments? I quite hard to keep an overview. Thanks a lot! > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072228#comment-16072228 ] Yogesh BG commented on KAFKA-5545: -- Here I found out something that if the stream got closed successfully then its able to re-establish the connection with new ip of the broker and process the data further. But some times what happening is. Previously stream is not getting closed properly. Because some threads are trying to re-establish the connection to the old ip of broker which is not available. And keeps logging DEBUG exceptions. I have attached the debug log. In this situation stream is not processing the data further. Here is the logic used to reestablish the connection. close timeout is 60sec {code:java} private ScheduledFuture setupDiscovery(final AbstractConfiguration configInstance, int refreshInterval, final String vipAddress, final boolean useSecurePort, final boolean useHostNames) { return executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { List bootstrapServers = getBootstrapServer(configInstance, vipAddress, useSecurePort, useHostNames); String oldBootstrapServerString = config.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); logger.info("New bootstrap servers obtained from registry server are " + bootstrapServers + ", old bootstrap server are " + oldBootstrapServerString); boolean isChanged = checkForChangeInBootstrapServers(bootstrapServers, oldBootstrapServerString); if (isChanged) { String bootstrapServerString = bootstrapServersStr(bootstrapServers); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerString); logger.info( "Closing connection to oldBootstrapServerString [" + oldBootstrapServerString + "]."); close(); streams = new KafkaStreams(buildTopology(config), config); logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "]."); streams.cleanUp(); start(); logger.info("Completed restart of kafka streams connection to new broker with configuration " + config); } } catch (Throwable ex) { logger.error("discovery of kafka broker instances failed with reason : " + ex.getMessage() + ", will retry again", ex); } } }, 0, refreshInterval, TimeUnit.MINUTES); } {code} > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070481#comment-16070481 ] Yogesh BG commented on KAFKA-5545: -- My application doesn't recieve msgs from broker. After sometime this exception stops but consumer is stuck > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug >Reporter: Yogesh BG >Priority: Critical > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069923#comment-16069923 ] Yogesh BG commented on KAFKA-5545: -- 11:03:08.216 [pool-1-thread-1] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1 11:03:08.216 [pool-1-thread-1] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799 11:03:08.216 [pool-1-thread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] State transition from NOT_RUNNING to RUNNING. 11:03:08.216 [pool-1-thread-1] INFO n.a.a.k.t.KStreamTopologyBase - cleanup the kafka streams : oldBootstrap[10.16.0.38:9092] newBootStrap[10.16.0.39:9092].CREATED 11:03:08.218 [pool-1-thread-1] INFO n.a.a.k.t.KStreamTopologyBase - cleanedup the kafka streams : oldBootstrap[10.16.0.38:9092] newBootStrap[10.16.0.39:9092].CREATED 11:03:08.218 [pool-1-thread-1] INFO n.a.a.k.t.KStreamTopologyBase - Completed restart of kafka streams connection to new broker with configuration {pr-metric-enrichment-proc.schedule_interval=5000, metrics-store-proc.trackFailed=true, commit.interval.ms=1, metrics-store-proc.accumulateBatchRequests=false, pr-metric-kstream-source.useSecurePort=false, metrics-store-proc.maxBatchSizeBytes=5242880, poll.ms=100, pr-metric-kstream-source.vipAddress=metrics-ingestor.analytics.default.16012014.appcito.net, metrics-store-proc.schedule_interval=5000, application.id=rtp-kafkastreams, metrics-store-proc.useSecurePort=false, pr-metric-enrichment-proc.skipKStreamProcess=false, value.serde=class org.apache.kafka.common.serialization.Serdes$ByteBufferSerde, metrics-store-proc.shutdownAwaitSecs=300, state.dir=/data/kstreams, metrics-store-proc.numExecuteConcurrency=16, metrics-store-proc.separateDataStoreThread=true, metrics-store-proc.maxBatchPendingFactor=12, auto.offset.reset=latest, metrics-store-proc.retryFailed=false, metrics-store-proc.maxBatchSize=5000, bootstrap.servers=10.16.0.39:9092, max.poll.records=100, session.timeout.ms=30, pr-metric-kstream-source.kafka.queue=PR, client.id=ks_0_inst, metrics-store-proc.elastic.search.cluster=metrics-datastore, heartbeat.interval.ms=6, num.stream.threads=16, metrics-store-proc.writeTimeoutSecs=30, key.serde=class org.apache.kafka.common.serialization.Serdes$StringSerde, metrics-store-proc.vipAddress=metrics-store.analytics.default.16012014.appcito.net, num.standby.replicas=1, metrics-store-proc.maxBatchInterval=5, metrics-store-proc.logFailed=true} 11:03:08.325 [Thread-6] INFO o.apache.kafka.streams.KafkaStreams - stream-client [ks_0_inst] State transition from CREATED to RUNNING. 11:03:08.333 [StreamThread-33] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-33] Starting 11:03:08.333 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-34] Starting 11:03:08.350 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-36] Starting 11:03:08.351 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-37] Starting 11:03:08.351 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-38] Starting 11:03:08.351 [StreamThread-39] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-39] Starting 11:03:08.351 [StreamThread-40] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-40] Starting 11:03:08.351 [StreamThread-35] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-35] Starting 11:03:08.354 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-41] Starting 11:03:08.354 [StreamThread-42] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-42] Starting 11:03:08.355 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-43] Starting 11:03:08.355 [StreamThread-44] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-44] Starting 11:03:08.355 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-45] Starting 11:03:08.355 [StreamThread-46] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-46] Starting 11:03:08.355 [StreamThread-47] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-47] Starting 11:03:08.355 [Thread-6] INFO o.apache.kafka.streams.KafkaStreams - stream-client [ks_0_inst] Started Kafka Stream process 11:03:08.360 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] Starting 11:03:08.363 [StreamThread-33] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator 10.16.0.39:9092 (id: 2147483647 rack: null) for group rtp-kafkastreams. 11:03:08.375 [StreamThread-33] INFO o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [] for group rtp-kafkastreams 11:03:08.375