Can you find correlated events between the Storm Worker and the ZooKeeper server? i.e., which logs occurs first? The 2 log dumps you pasted are not from the same time frame.
On Tue, Aug 9, 2016 at 4:01 AM, Rama Nallamilli <[email protected]> wrote: > Hi all, > > We are seeing an issue on one of our environments with a topology that has > a Kafka Spout, when nextTuple is called, we end up getting a > java.nio.ClosedChannelException. > This happens when the worker attempts to ask zookeeper for the kafka > message offset inside KafkaUtils.getOffsetsBefore(…). The stack trace is > below. > > In the zookeeper log, we are seeing zookeeper clean up its open > connections with the reason being “EndOfStreamException: Unable to read > additional data from client sessionid 0x1566ed464ec03ad, likely client has > closed socket” (the client terminated the connection) > > What we have tried: > > * we can telnet from storm worker -> zookeeper 2181 > > * we can ping from zookeeper -> storm worker > > * we have noticed another environment where we have this code deployed > with same setup and it works as expected with no issues. > > Is there some kind of zookeeper connection timeout that is potentially > causing the KafkaUtils to drop the connection or is there some netty > timeout settings we can change that could cause this error? What else can > cause a ClosedChannelException. > > > *Storm Worker Stacktrace:* > > 09.08.2016 10:50:00.405 o.a.s.util [ERROR] Async loop died! > > java.lang.RuntimeException: java.nio.channels.ClosedChannelException > > at > org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) > ~[stormjar.jar:1.22.0] > > at > org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) > ~[stormjar.jar:1.22.0] > > at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) > ~[stormjar.jar:1.22.0] > > at > org.apache.storm.daemon.executor$fn__4954$fn__4969$fn__5000.invoke(executor.clj:645) > ~[storm-core-1.0.1.jar:1.0.1] > > at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) > [storm-core-1.0.1.jar:1.0.1] > > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65] > > Caused by: java.nio.channels.ClosedChannelException > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) > ~[stormjar.jar:1.22.0] > > at > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) > ~[stormjar.jar:1.22.0] > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$ > $sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:1.22.0] > > at > kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) > ~[stormjar.jar:1.22.0] > > at > kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) > ~[stormjar.jar:1.22.0] > > at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) > ~[stormjar.jar:1.22.0] > > at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) > ~[stormjar.jar:1.22.0] > > at > org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) > ~[stormjar.jar:1.22.0] > > at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) > ~[stormjar.jar:1.22.0] > > ... 6 more > > 09.08.2016 10:50:00.410 o.a.s.d.executor [ERROR] > > java.lang.RuntimeException: java.nio.channels.ClosedChannelException > > at > org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) > ~[stormjar.jar:1.22.0] > > at > org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) > ~[stormjar.jar:1.22.0] > > at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) > ~[stormjar.jar:1.22.0] > > at > org.apache.storm.daemon.executor$fn__4954$fn__4969$fn__5000.invoke(executor.clj:645) > ~[storm-core-1.0.1.jar:1.0.1] > > at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) > [storm-core-1.0.1.jar:1.0.1] > > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65] > > Caused by: java.nio.channels.ClosedChannelException > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) > ~[stormjar.jar:1.22.0] > > at > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) > ~[stormjar.jar:1.22.0] > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$ > $sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:1.22.0] > > at > kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) > ~[stormjar.jar:1.22.0] > > at > kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) > ~[stormjar.jar:1.22.0] > > at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) > ~[stormjar.jar:1.22.0] > > at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) > ~[stormjar.jar:1.22.0] > > at > org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) > ~[stormjar.jar:1.22.0] > > at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) > ~[stormjar.jar:1.22.0] > > ... 6 more > > *Zookeeper stack trace:* > > 8/9/2016 11:52:07 AM2016-08-09 10:52:07,438 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@900] - Client > attempting to establish new session at /10.42.31.113:50348 > > 8/9/2016 11:52:07 AM2016-08-09 10:52:07,439 [myid:] - INFO > [SyncThread:0:ZooKeeperServer@645] - Established session > 0x1566ed464ec03b0 with negotiated timeout 20000 for client / > 10.42.31.113:50348 > > 8/9/2016 11:52:09 AM2016-08-09 10:52:09,036 [myid:] - WARN > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught > end of stream exception > > 8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data > from client sessionid 0x1566ed464ec03ad, likely client has closed socket > > 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO( > NIOServerCnxn.java:230) > > 8/9/2016 11:52:09 AM at org.apache.zookeeper.server. > NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203) > > 8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745) > > 8/9/2016 11:52:09 AM2016-08-09 10:52:09,036 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed > socket connection for client /10.42.31.113:50274 which had sessionid > 0x1566ed464ec03ad > > 8/9/2016 11:52:09 AM2016-08-09 10:52:09,036 [myid:] - WARN > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught > end of stream exception > > 8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data > from client sessionid 0x1566ed464ec03af, likely client has closed socket > > 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO( > NIOServerCnxn.java:230) > > 8/9/2016 11:52:09 AM at org.apache.zookeeper.server. > NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203) > > 8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745) > > 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed > socket connection for client /10.42.31.113:50344 which had sessionid > 0x1566ed464ec03af > > 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - WARN > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught > end of stream exception > > 8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data > from client sessionid 0x1566ed464ec03ae, likely client has closed socket > > 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO( > NIOServerCnxn.java:230) > > 8/9/2016 11:52:09 AM at org.apache.zookeeper.server. > NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203) > > 8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745) > > 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed > socket connection for client /10.42.31.113:50346 which had sessionid > 0x1566ed464ec03ae > > 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - WARN > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught > end of stream exception > > 8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data > from client sessionid 0x1566ed464ec03b0, likely client has closed socket > > 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO( > NIOServerCnxn.java:230) > > 8/9/2016 11:52:09 AM at org.apache.zookeeper.server. > NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203) > > 8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745) > > 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - INFO > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed > socket connection for client /10.42.31.113:50348 which had sessionid > 0x1566ed464ec03b0 > > > Regards > > Rama >
