Hi all,
Can anyone help me with my problem?
I set up a 3-worker cluster on 4 nodes with Storm 1.0.0. My job generates
messages in the spout, each with an incrementing id and the timestamp at which
it was generated; a bolt then computes the delay and throughput and writes the
results to a local file. However, once the topology is submitted to the
cluster, the bolt's worker keeps shutting down and restarting on another node
(bouncing from node A to B to A to B to ...).
log here:
562 2016-06-28 19:10:16.352 o.a.s.d.worker [INFO] Worker
3f9fa92d-e386-4f55-aed6-4044400c47f4 for storm ref-2-1467110596 on
2d6dc803-4242-4de3-82d6-902d36f47d98:6703 has finished loading
563 2016-06-28 19:10:17.001 o.a.s.d.worker [INFO] All connections are ready
for worker 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 with id
3f9fa92d-e386-4f55-aed6-4044400c47f4
564 2016-06-28 19:10:17.029 o.a.s.d.executor [INFO] Preparing bolt simple:(2)
565 2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Prepared bolt simple:(2)
566 2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Preparing bolt
__system:(-1)
567 2016-06-28 19:10:17.048 o.a.s.d.executor [INFO] Prepared bolt __system:(-1)
568 2016-06-28 19:10:55.211 o.a.s.s.o.a.z.ClientCnxn [INFO] Client session
timed out, have not heard from server in 13702ms for sessionid
0x2558b61a1be0057, closing socket connection and attempting reconnect
569 2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC
overhead limit exceeded
570 2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump
...
571 2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump:
File exists
572 2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO]
State change: SUSPENDED
573 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket
connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to
authenticate using SASL (unknown error)
574 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection
established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session
575 2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received
event :disconnected::none: with disconnected Writer Zookeeper.
576 2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session
establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181,
sessionid = 0x2558b61a1be0057, negotiated timeout = 20000
577 2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO]
State change: RECONNECTED
578 2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors
in handling the request
579 java.lang.OutOfMemoryError: GC overhead limit exceeded
580 at java.util.Arrays.copyOf(Arrays.java:2219) ~[?:1.7.0_40]
581 at java.util.ArrayList.grow(ArrayList.java:242) ~[?:1.7.0_40]
582 at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
~[?:1.7.0_40]
583 at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
~[?:1.7.0_40]
584 at java.util.ArrayList.add(ArrayList.java:440) ~[?:1.7.0_40]
585 at org.apache.storm.utils.ListDelegate.add(ListDelegate.java:73)
~[storm-core-1.0.0.jar:1.0.0]
586 at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
~[kryo-3.0.3.jar:?]
587 at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
~[kryo-3.0.3.jar:?]
588 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:689)
~[kryo-3.0.3.jar:?]
589 at
org.apache.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:37)
~[storm-core-1.0.0.jar:1.0.0]
590 at
org.apache.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:50)
~[storm-core-1.0.0.jar:1.0.0]
591 at
org.apache.storm.messaging.DeserializingConnectionCallback.recv(DeserializingConnectionCallback.java:56)
~[storm-core-1.0.0.jar:1.0.0]
592 at
org.apache.storm.messaging.netty.Server.enqueue(Server.java:133)
~[storm-core-1.0.0.jar:1.0.0]
593 at
org.apache.storm.messaging.netty.Server.received(Server.java:254)
~[storm-core-1.0.0.jar:1.0.0]
594 at
org.apache.storm.messaging.netty.StormServerHandler.messageReceived(StormServerHandler.java:61)
~[storm-core-1.0.0.jar:1.0.0]
595 at
org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
~[storm-core-1.0.0.jar:1.0.0]
596 at
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
~[storm-core-1.0.0.jar:1.0.0]
597 at
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
~[storm-core-1.0.0.jar:1.0.0]
598 at
org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
~[storm-core-1.0.0.jar:1.0.0]
599 at
org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
~[storm-core-1.0.0.jar:1.0.0]
600 at
org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
~[storm-core-1.0.0.jar:1.0.0]
601
602 at
org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
~[storm-core-1.0.0.jar:1.0.0]
603 at
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
~[storm-core-1.0.0.jar:1.0.0]
604 at
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
~[storm-core-1.0.0.jar:1.0.0]
605 at
org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
~[storm-core-1.0.0.jar:1.0.0]
606 at
org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
~[storm-core-1.0.0.jar:1.0.0]
607 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
~[storm-core-1.0.0.jar:1.0.0]
608 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
~[storm-core-1.0.0.jar:1.0.0]
609 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
~[storm-core-1.0.0.jar:1.0.0]
610 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
~[storm-core-1.0.0.jar:1.0.0]
611 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
~[storm-core-1.0.0.jar:1.0.0]
612 2016-06-28 19:11:01.557 STDIO [ERROR] Halting due to Out Of Memory
Error...Netty-server-localhost-6703-worker-1
And codes here:
83 public static class BaseBolt extends BaseRichBolt {
184
185 OutputCollector collector;
186
187 TopologyContext context;
188 String name; // id
189 int executor;
190
191 BufferedWriter writer = null;
192 long filestamp;
193
194 private long count = 0;
195 long begin;
196 long last;
197 float delay_sum = 0;
198
199 public BaseBolt(long stp){
200 filestamp = stp;
201 }
202
203 @Override
204 public void declareOutputFields(OutputFieldsDeclarer declarer) {
205 declarer.declare(new Fields("msgid", "stamp"));
206 } // declareOutputFields
207
208 @Override
209 public void prepare(Map conf, TopologyContext context, OutputCollector
collector) {
210
211 this.collector = collector;
212
213 this.context = context;
214 name = context.getThisComponentId();
215 executor = context.getThisTaskIndex();
216
217 Path file = Paths.get(System.getProperty("user.home"), "ref" +
filestamp + name + executor);
218 Charset charset = Charset.forName("US-ASCII");
219 String firstline =
"--------------------------------------------------\nname executor elapsed(ms)
elapsed_sec(ms) count(tuples) delay(ms) avg_delay rate(tuple/s)\n";
220 try {
221 writer = Files.newBufferedWriter(file, charset,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
223 writer.write(firstline, 0, firstline.length());
224 writer.flush();
225 } catch (IOException x) {
226 System.err.format("prepare() - IOException: %s%n", x);
227 }
228
229 begin = System.currentTimeMillis();
230 last = begin;
231 } // prepare
232
233 @Override
234 public void cleanup() {
235 long end = System.currentTimeMillis();
236 System.out.println("whole_elapased_time: "+(end - begin)/ARY);
237 try{
238 writer.close();
239 } catch (IOException x) {
240 System.err.format("cleanup() - IOException: %s%n", x);
241 }
242 } // cleanup
243
244 public long getMsgId(Tuple tuple) {
245 return tuple.getLongByField("msgid").longValue();
246 }
247
248 public void baseExecution(long msgid, long stamp){
249 collector.emit(new Values(msgid, stamp));
250
251 // count;
252 count++;
253
254 // dealy;
255 long crt = System.currentTimeMillis();
256 float delay = (crt - stamp) ;
257 // avg_delay;
258 delay_sum += delay ;
259 float avg_delay = delay_sum / count;
260
261 long interval = (crt - last);
262 float interval_sec = (crt - last) / ARY;
263 if(interval - INTERVAL*ARY >= 0){
264
265 // elapsed
266 long elapsed = (crt - begin); // ms
267 float elapsed_sec = (crt - begin)/ARY;
268 // rate
269 float rate = count / elapsed_sec; // tuple/s
270
271 // output
272 String s = name + " " + executor + " " + elapsed + " " +
elapsed_sec + " " + count + " " + delay + " " + avg_delay + " " + rate + "\n";
273 try {
274 writer.write(s, 0, s.length());
275 writer.flush(); // XXX !!!
276 } catch (IOException x) {
277 System.err.format("IOException: %s%n", x);
278 }
279
280 last = crt;
281 } // if
282 } // baseExecution
283
284 @Override
285 public void execute(Tuple tuple) {
286
287 long msgid = getMsgId(tuple);
288 long stamp = tuple.getLongByField("stamp").longValue();
289 baseExecution(msgid,stamp);
290
291 } // execute
292
293 } // BaseBolt
Thanks for any help!
yesimsure