[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942835#comment-15942835 ] Eno Thereska commented on KAFKA-3758: [~guozhang] Reopening, since I'm seeing this again as part of the new system tests: https://github.com/apache/kafka/pull/2719/files

> KStream job fails to recover after Kafka broker stopped
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Greg Fodor
> Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
> We've been testing a fairly complex KStreams job, and under load it fails to rebalance and recover when we shut down one of the Kafka brokers. The test ran against a 3-node Kafka cluster where every topic had a replication factor of at least 2, and we terminated one of the nodes.
> Attached is the full log; the root exception appears to be contention on the state-directory lock. The job keeps trying to recover but throws lock-related errors over and over. Restarting the job itself resolves the problem.
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
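The "Error while creating the state manager" failures are consistent with state-directory lock contention: Kafka Streams serializes access to each task's state directory with an OS-level file lock, so a second acquisition attempt fails until the previous holder releases it. A minimal sketch of that failure mode using plain java.nio file locking (class, method, and directory names here are illustrative, not Streams internals):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

public class StateDirLockDemo {
    // Try to acquire an exclusive lock on a lock file inside the state
    // directory. Returns null when the lock is already held, either by
    // another process (tryLock returns null) or by another thread/channel
    // in this JVM (tryLock throws OverlappingFileLockException).
    static FileLock tryLockStateDir(File stateDir) throws Exception {
        File lockFile = new File(stateDir, ".lock");
        FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
        try {
            return channel.tryLock();
        } catch (OverlappingFileLockException e) {
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        File dir = new File(System.getProperty("java.io.tmpdir"), "kstreams-task-0_1");
        dir.mkdirs();
        FileLock first = tryLockStateDir(dir);   // succeeds
        FileLock second = tryLockStateDir(dir);  // fails: first not yet released
        System.out.println("first acquired: " + (first != null));
        System.out.println("second acquired: " + (second != null));
        first.release();
    }
}
```

During a rebalance, a task being re-created before its previous owner has released the directory produces exactly this "second acquisition fails" outcome, over and over, until the holder lets go.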
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15542984#comment-15542984 ] Guozhang Wang commented on KAFKA-3758: This issue should have been fixed as part of KIP-62. If Greg, Strong, or anyone else still sees this happening, we can re-open it.
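For context on why KIP-62 is relevant here: it moved consumer heartbeats to a background thread and introduced max.poll.interval.ms, so long task initialization or state restoration no longer trips the session timeout by itself. A minimal sketch of the two consumer-level timeouts involved (the property names are real consumer configs; the values shown are illustrative, not recommendations):

```java
import java.util.Properties;

public class Kip62Settings {
    // After KIP-62, session.timeout.ms bounds heartbeat liveness while
    // max.poll.interval.ms separately bounds the time between poll() calls.
    // Slow startup therefore only risks exceeding the (much larger) poll
    // interval, not the heartbeat session timeout.
    static Properties consumerTimeouts() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "10000");    // heartbeat liveness
        props.put("max.poll.interval.ms", "300000"); // processing liveness
        return props;
    }
}
```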
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433738#comment-15433738 ] Greg Fodor commented on KAFKA-3758: Excited to try this. As our state stores have grown, it's become more and more difficult to get jobs to start up without timeouts.
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433652#comment-15433652 ] Guozhang Wang commented on KAFKA-3758: [~stliu] [~gfodor] With KAFKA-3888 merged into trunk, this issue should be fixed automatically. Could you help verify whether that is the case?
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15364085#comment-15364085 ] Strong Liu commented on KAFKA-3758: I ran into the same problem with the _io.confluent.examples.streams.AnomalyDetectionLambdaExample_. Some details to reproduce:
* change stream threads to 3
* start two processes
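The reproduction above amounts to raising the thread count in the example's configuration and launching the same application twice, so that two processes in one consumer group rebalance against each other. A sketch of the relevant settings (the property keys are real Streams/consumer config names; the application id and broker address are illustrative):

```java
import java.util.Properties;

public class ReproConfig {
    // Properties used to reproduce the lock contention: three stream
    // threads per process, with two processes sharing the same app id.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "anomaly-detection-lambda-example"); // illustrative
        props.put("bootstrap.servers", "localhost:9092");                // illustrative
        props.put("num.stream.threads", "3"); // step 1: change stream threads to 3
        return props;
    }
    // Step 2: start two JVM processes with these same properties. Both join
    // the same consumer group, triggering the rebalance that contends on
    // the task state-directory locks.
}
```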
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312913#comment-15312913 ] Greg Fodor commented on KAFKA-3758: Also, if we did not run with an elevated number of threads, we hit that issue because the timeout fired before all tasks had initialized.
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312912#comment-15312912 ] Greg Fodor commented on KAFKA-3758: Hey, we're running 16 threads. For this job we have 25 topics and roughly 350 topic-partitions involved, but for most of the job there isn't much I/O against most of them: we take in a very small percentage of the incoming data at the top of the job, process it, and discard most of it early.
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312888#comment-15312888 ] Guozhang Wang commented on KAFKA-3758: -- Hi Greg, I have a few more questions that will help me investigate this issue. I would appreciate it if you can remember the answers: 1. How many threads (i.e. the {{num.stream.threads}} config) did you use per KafkaStreams instance? 2. From your logs it seems you have at least two topic groups, with at least 82 partitions for the largest topic. How many topic-partitions do you have in total?
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308306#comment-15308306 ] Greg Fodor commented on KAFKA-3758: --- No, the KStreams job was running across 2 servers, and the Kafka cluster was a 3-node cluster running on separate machines.
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308047#comment-15308047 ] Guozhang Wang commented on KAFKA-3758: -- I think there may be an issue with directory locking that results in both this and KAFKA-3752. I will investigate further and update these tickets later. One question: are your Kafka Streams processes running on the same nodes as the Kafka servers, i.e. if you shut down a Kafka broker, does that also kill a running Kafka Streams process?
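The directory-locking theory is consistent with how `java.nio` file locks behave: the OS holds them on behalf of the whole process, so if a rebalance hands a task to another thread of the same instance before the previous holder releases the state-directory lock, the re-acquisition fails inside the same JVM. A minimal sketch of that failure mode, not Streams' actual locking code (the lock-file name below is made up):

```java
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for a task state-directory lock file; the name is hypothetical.
        Path lockFile = Files.createTempFile("task-0_1", ".lock");
        boolean contended = false;
        try (FileChannel c1 = FileChannel.open(lockFile, StandardOpenOption.WRITE);
             FileChannel c2 = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
            FileLock held = c1.tryLock();   // first acquisition succeeds
            try {
                c2.tryLock();               // second channel, same JVM, same region
            } catch (OverlappingFileLockException e) {
                // File locks are per-process: a second attempt from the same JVM
                // fails even though no other process holds the lock.
                contended = true;
            } finally {
                held.release();
            }
        } finally {
            Files.deleteIfExists(lockFile);
        }
        if (!contended) throw new AssertionError("expected same-JVM lock contention");
        System.out.println("lock contention reproduced");
    }
}
```

In the attached log the failure surfaces as a {{ProcessorStateException}} from the state manager constructor during {{onPartitionsAssigned}}, which would fit a lock that was never released after the previous assignment.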
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305503#comment-15305503 ] Greg Fodor commented on KAFKA-3758: --- Oh, actually, I'm not so sure. This was not during an unclean shutdown, but during a broker rebalance.
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305501#comment-15305501 ] Greg Fodor commented on KAFKA-3758: --- Ah yes, this looks like the same issue, thanks!
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305108#comment-15305108 ] Guozhang Wang commented on KAFKA-3758: -- Thanks for reporting; we are aware of this issue: https://issues.apache.org/jira/browse/KAFKA-3752 Is this the same as your issue?
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302542#comment-15302542 ] Greg Fodor commented on KAFKA-3758: --- Also, the log is truncated at the top to the point where we shut the broker off. If there's additional useful information in the log before that which you think we could share, happy to attach.