I would look at the other parts of your code and also analyze the heap dump. It looks like you have a memory leak:

2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC overhead limit exceeded
2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump ...
2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump: File exists
2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to authenticate using SASL (unknown error)
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session
2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received event :disconnected::none: with disconnected Writer Zookeeper.
2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181, sessionid = 0x2558b61a1be0057, negotiated timeout = 20000
2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: RECONNECTED
2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors in handling the request
java.lang.OutOfMemoryError: GC overhead limit exceeded

java.lang.OutOfMemoryError: GC overhead limit exceeded
This error happens "if too much time is being spent in garbage collection: if more than 98% of the total time is spent in garbage collection and less than 2% of the heap is recovered, an OutOfMemoryError will be thrown."

Try increasing the heap size via "-Xmx1024m" (or more).

Thanks,
Michael Kazansky

From: Yang Ruoxue [mailto:[email protected]]
Sent: Tuesday, June 28, 2016 10:51 AM
To: [email protected]
Subject: Re: Bolt keep shutting down

Michael, thanks a lot. I changed the code to use try-with-resources, but the problem continues. Any other ideas?

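For reference, the try-with-resources form mentioned in this thread can be sketched as follows. This is only a minimal illustration, not the poster's actual change: the class name WriterSketch, the helper appendLine and the temporary file are hypothetical, and only the open options are taken from the code quoted in this thread.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

public class WriterSketch {

    // Appends one line to the file. The writer is closed automatically when
    // the try block exits, whether normally or because of an exception.
    static void appendLine(Path file, String line) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(
                file, StandardCharsets.US_ASCII,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            writer.write(line);
            writer.newLine();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical temporary file, used only for this demonstration.
        Path file = Files.createTempFile("ref-sketch", ".txt");
        appendLine(file, "name executor elapsed(ms)");
        appendLine(file, "simple 0 1000");

        List<String> lines = Files.readAllLines(file, StandardCharsets.US_ASCII);
        System.out.println("lines written: " + lines.size());
        Files.delete(file);
    }
}
```

Note that try-with-resources only fits when the file is opened and closed within one method. A bolt that keeps one writer open across many execute() calls would still have to open it in prepare() and close it in cleanup(), and as far as I know Storm does not guarantee that cleanup() is called when a worker is killed on a cluster.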
________________________________
From: Kazansky, Michael <[email protected]<mailto:[email protected]>>
Sent: June 28, 2016 21:47:48
To: [email protected]<mailto:[email protected]>
Subject: RE: Bolt keep shutting down

    try {
        writer = Files.newBufferedWriter(file, charset, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        writer.write(firstline, 0, firstline.length());
        writer.flush();
    } catch (IOException x) {
        System.err.format("prepare() - IOException: %s%n", x);
    }

You forgot to close your writer. Alternatively, use try-with-resources so that the writer is closed automatically.

Thanks,
Michael Kazansky

From: Yang Ruoxue [mailto:[email protected]]
Sent: Tuesday, June 28, 2016 9:44 AM
To: [email protected]<mailto:[email protected]>
Subject: Bolt keep shutting down

Hi all,

Can anyone help me with my problem? I set up a 3-worker cluster on 4 nodes with Storm 1.0.0. My job generates messages in a spout, each with an incrementing id and the timestamp at which it was generated, then calculates the delay and throughput and writes the result to a local file in a bolt. However, once the topology is submitted to the cluster, my bolt keeps shutting down and restarting on another node (from node A to B to A to B to …).

log here:

2016-06-28 19:10:16.352 o.a.s.d.worker [INFO] Worker 3f9fa92d-e386-4f55-aed6-4044400c47f4 for storm ref-2-1467110596 on 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 has finished loading
2016-06-28 19:10:17.001 o.a.s.d.worker [INFO] All connections are ready for worker 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 with id 3f9fa92d-e386-4f55-aed6-4044400c47f4
2016-06-28 19:10:17.029 o.a.s.d.executor [INFO] Preparing bolt simple:(2)
2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Prepared bolt simple:(2)
2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Preparing bolt __system:(-1)
2016-06-28 19:10:17.048 o.a.s.d.executor [INFO] Prepared bolt __system:(-1)
2016-06-28 19:10:55.211 o.a.s.s.o.a.z.ClientCnxn [INFO] Client session timed out, have not heard from server in 13702ms for sessionid 0x2558b61a1be0057, closing socket connection and attempting reconnect
2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC overhead limit exceeded
2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump ...
2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump: File exists
2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to authenticate using SASL (unknown error)
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session
2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received event :disconnected::none: with disconnected Writer Zookeeper.
2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181, sessionid = 0x2558b61a1be0057, negotiated timeout = 20000
2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: RECONNECTED
2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors in handling the request
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:2219) ~[?:1.7.0_40]
    at java.util.ArrayList.grow(ArrayList.java:242) ~[?:1.7.0_40]
    at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) ~[?:1.7.0_40]
    at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) ~[?:1.7.0_40]
    at java.util.ArrayList.add(ArrayList.java:440) ~[?:1.7.0_40]
    at org.apache.storm.utils.ListDelegate.add(ListDelegate.java:73) ~[storm-core-1.0.0.jar:1.0.0]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:689) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:37) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:50) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.messaging.DeserializingConnectionCallback.recv(DeserializingConnectionCallback.java:56) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.messaging.netty.Server.enqueue(Server.java:133) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.messaging.netty.Server.received(Server.java:254) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.messaging.netty.StormServerHandler.messageReceived(StormServerHandler.java:61) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[storm-core-1.0.0.jar:1.0.0]
2016-06-28 19:11:01.557 STDIO [ERROR] Halting due to Out Of Memory Error...Netty-server-localhost-6703-worker-1

And the code is here:

    public static class BaseBolt extends BaseRichBolt {

        OutputCollector collector;

        TopologyContext context;
        String name; // id
        int executor;

        BufferedWriter writer = null;
        long filestamp;

        private long count = 0;
        long begin;
        long last;
        float delay_sum = 0;

        public BaseBolt(long stp) {
            filestamp = stp;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("msgid", "stamp"));
        } // declareOutputFields

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

            this.collector = collector;

            this.context = context;
            name = context.getThisComponentId();
            executor = context.getThisTaskIndex();

            Path file = Paths.get(System.getProperty("user.home"), "ref" + filestamp + name + executor);
            Charset charset = Charset.forName("US-ASCII");
            String firstline = "--------------------------------------------------\nname executor elapsed(ms) elapsed_sec(ms) count(tuples) delay(ms) avg_delay rate(tuple/s)\n";
            try {
                writer = Files.newBufferedWriter(file, charset, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
                writer.write(firstline, 0, firstline.length());
                writer.flush();
            } catch (IOException x) {
                System.err.format("prepare() - IOException: %s%n", x);
            }

            begin = System.currentTimeMillis();
            last = begin;
        } // prepare

        @Override
        public void cleanup() {
            long end = System.currentTimeMillis();
            System.out.println("whole_elapased_time: " + (end - begin) / ARY);
            try {
                writer.close();
            } catch (IOException x) {
                System.err.format("cleanup() - IOException: %s%n", x);
            }
        } // cleanup

        public long getMsgId(Tuple tuple) {
            return tuple.getLongByField("msgid").longValue();
        }

        public void baseExecution(long msgid, long stamp) {
            collector.emit(new Values(msgid, stamp));

            // count
            count++;

            // delay
            long crt = System.currentTimeMillis();
            float delay = (crt - stamp);
            // avg_delay
            delay_sum += delay;
            float avg_delay = delay_sum / count;

            long interval = (crt - last);
            float interval_sec = (crt - last) / ARY;
            if (interval - INTERVAL * ARY >= 0) {

                // elapsed
                long elapsed = (crt - begin); // ms
                float elapsed_sec = (crt - begin) / ARY;
                // rate
                float rate = count / elapsed_sec; // tuple/s

                // output
                String s = name + " " + executor + " " + elapsed + " " + elapsed_sec + " " + count + " " + delay + " " + avg_delay + " " + rate + "\n";
                try {
                    writer.write(s, 0, s.length());
                    writer.flush(); // XXX !!!
                } catch (IOException x) {
                    System.err.format("IOException: %s%n", x);
                }

                last = crt;
            } // if
        } // baseExecution

        @Override
        public void execute(Tuple tuple) {

            long msgid = getMsgId(tuple);
            long stamp = tuple.getLongByField("stamp").longValue();
            baseExecution(msgid, stamp);

        } // execute

    } // BaseBolt

Thanks for any help!
yesimsure

This communication is for informational purposes only. It is not intended as an offer or solicitation for the purchase or sale of any financial instrument or as an official confirmation of any transaction. All market prices, data and other information are not warranted as to completeness or accuracy and are subject to change without notice. Any comments or statements made herein do not necessarily reflect those of JPMorgan Chase & Co., its subsidiaries and affiliates (collectively, "JPMC"). This transmission may contain information that is proprietary, privileged, confidential and/or exempt from disclosure under applicable law. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution, or use of the information contained herein (including any reliance thereon) is STRICTLY PROHIBITED. If you received this transmission in error, please immediately contact the sender and destroy the material in its entirety, whether in electronic or hard copy format. Although this transmission and any attachments are believed to be free of any virus or other defect that might affect any computer system into which it is received and opened, it is the responsibility of the recipient to ensure that it is virus free and no responsibility is accepted by JPMC for any loss or damage arising in any way from its use.
Please note that any electronic communication that is conducted within or through JPMC's systems is subject to interception, monitoring, review, retention and external production in accordance with JPMC's policy and local laws, rules and regulations; may be stored or otherwise processed in countries other than the country in which you are located; and will be treated in accordance with JPMC policies and applicable laws and regulations. Please refer to http://www.jpmorgan.com/pages/disclosures for disclosures relating to European legal entities.
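
On the suggestion in this thread to raise the heap with "-Xmx1024m": a rough sketch of how that option might be passed to Storm workers and then verified from inside the JVM. The class HeapSettingSketch and its helpers are hypothetical; the sketch uses a plain map rather than Storm's Config class so that it runs without Storm on the classpath, and it assumes the per-topology key "topology.worker.childopts" is the one used to hand JVM options to workers.

```java
import java.util.HashMap;
import java.util.Map;

public class HeapSettingSketch {

    // Builds a topology configuration map that carries the worker JVM options.
    // Assumption: Storm reads worker JVM flags from "topology.worker.childopts".
    static Map<String, Object> workerConf(String jvmOpts) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.worker.childopts", jvmOpts);
        return conf;
    }

    // Maximum heap the current JVM will attempt to use, in MiB.
    // Logging this from prepare() would show whether -Xmx took effect.
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = workerConf("-Xmx1024m");
        System.out.println("worker options: " + conf.get("topology.worker.childopts"));
        System.out.println("max heap of this JVM (MiB): " + maxHeapMiB());
    }
}
```

A larger heap only postpones the error if tuples arrive faster than the bolt can process them, so the heap dump analysis suggested in the thread is still worth doing. The log above also shows "Unable to create artifacts/heapdump: File exists", so the old dump file would probably need to be removed before a new one can be written.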
