try {
221 writer = Files.newBufferedWriter(file, charset,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
223 writer.write(firstline, 0, firstline.length());
224 writer.flush();
225 } catch (IOException x) {
226 System.err.format("prepare() - IOException: %s%n", x);
227 }
You forgot to close your writer. Or use try-catch with resources for writer to
be closed automatically.
Thanks,
Michael Kazansky
From: Yang Ruoxue [mailto:[email protected]]
Sent: Tuesday, June 28, 2016 9:44 AM
To: [email protected]
Subject: Bolt keep shutting down
Hi all,
Any one can help me on my problem?
I setup a 3 workers cluster on 4 nodes with Storm 1.0.0. My jobs is
generating messages with a increment id and the timestamp when it was generated
in spout and calculating the delay and throughput , and then push the result
to local file in a bolt. However, my bolt is keeping shut down and restarting
on another node(from node A to B to A to B to ...) once submitted to the
cluster.
log here:
562 2016-06-28 19:10:16.352 o.a.s.d.worker [INFO] Worker
3f9fa92d-e386-4f55-aed6-4044400c47f4 for storm ref-2-1467110596 on
2d6dc803-4242-4de3-82d6-902d36f47d98:6703 has finished l oading
563 2016-06-28 19:10:17.001 o.a.s.d.worker [INFO] All connections are ready
for worker 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 with id
3f9fa92d-e386-4f55-aed6-4044400c47f4
564 2016-06-28 19:10:17.029 o.a.s.d.executor [INFO] Preparing bolt simple:(2)
565 2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Prepared bolt simple:(2)
566 2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Preparing bolt
__system:(-1)
567 2016-06-28 19:10:17.048 o.a.s.d.executor [INFO] Prepared bolt __system:(-1)
568 2016-06-28 19:10:55.211 o.a.s.s.o.a.z.ClientCnxn [INFO] Client session
timed out, have not heard from server in 13702ms for sessionid
0x2558b61a1be0057, closing socket connecti on and attempting reconnect
569 2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC
overhead limit exceeded
570 2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump
...
571 2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump:
File exists
572 2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO]
State change: SUSPENDED
573 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket
connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to
authenticate using SASL ( unknown error)
574 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection
established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session
575 2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received
event :disconnected::none: with disconnected Writer Zookeeper.
576 2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session
establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181,
sessionid = 0x2558b61a1be0057, negotiate d timeout = 20000
577 2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO]
State change: RECONNECTED
578 2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors
in handling the request
579 java.lang.OutOfMemoryError: GC overhead limit exceeded
580 at java.util.Arrays.copyOf(Arrays.java:2219) ~[?:1.7.0_40]
581 at java.util.ArrayList.grow(ArrayList.java:242) ~[?:1.7.0_40]
582 at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
~[?:1.7.0_40]
583 at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
~[?:1.7.0_40]
584 at java.util.ArrayList.add(ArrayList.java:440) ~[?:1.7.0_40]
585 at org.apache.storm.utils.ListDelegate.add(ListDelegate.java:73)
~[storm-core-1.0.0.jar:1.0.0]
586 at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
~[kryo-3.0.3.jar:?]
587 at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
~[kryo-3.0.3.jar:?]
588 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:689)
~[kryo-3.0.3.jar:?]
589 at
org.apache.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:37)
~[storm-core-1.0.0.jar:1.0.0]
590 at
org.apache.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:50)
~[storm-core-1.0.0.jar:1.0.0]
591 at
org.apache.storm.messaging.DeserializingConnectionCallback.recv(DeserializingConnectionCallback.java:56)
~[storm-core-1.0.0.jar:1.0.0]
592 at
org.apache.storm.messaging.netty.Server.enqueue(Server.java:133)
~[storm-core-1.0.0.jar:1.0.0]
593 at
org.apache.storm.messaging.netty.Server.received(Server.java:254)
~[storm-core-1.0.0.jar:1.0.0]
594 at
org.apache.storm.messaging.netty.StormServerHandler.messageReceived(StormServerHandler.java:61)
~[storm-core-1.0.0.jar:1.0.0]
595 at
org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
~[storm-core-1.0.0.jar:1.0.0]
596 at
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
~[storm-core-1.0.0.jar:1.0.0]
597 at
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
~[storm-core-1.0.0.j ar:1.0.0]
598 at
org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
~[storm-core-1.0.0.jar:1.0.0]
599 at
org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
~[storm-core-1.0.0.jar:1.0.0]
600 at
org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
~[storm-core-1.0.0.jar:1.0.0]
601
602 at
org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
~[storm-core-1.0.0.jar:1.0.0]
603 at
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
~[storm-core-1.0.0.jar:1.0.0]
604 at
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
~[storm-core-1.0.0.jar:1.0.0]
605 at
org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
~[storm-core-1.0.0.jar:1.0.0]
606 at
org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
~[storm-core-1.0.0.jar:1.0.0]
607 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
~[storm-core-1.0.0.jar:1.0.0]
608 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
~[storm-core-1.0.0.jar:1.0.0]
609 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
~[storm-core-1.0.0.jar:1.0.0]
610 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
~[storm-core-1.0.0.jar:1.0.0]
611 at
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
~[storm-core-1.0.0.jar:1.0.0]
612 2016-06-28 19:11:01.557 STDIO [ERROR] Halting due to Out Of Memory
Error...Netty-server-localhost-6703-worker-1
And codes here:
83 public static class BaseBolt extends BaseRichBolt {
184
185 OutputCollector collector;
186
187 TopologyContext context;
188 String name; // id
189 int executor;
190
191 BufferedWriter writer = null;
192 long filestamp;
193
194 private long count = 0;
195 long begin;
196 long last;
197 float delay_sum = 0;
198
199 public BaseBolt(long stp){
200 filestamp = stp;
201 }
202
203 @Override
204 public void declareOutputFields(OutputFieldsDeclarer declarer) {
205 declarer.declare(new Fields("msgid", "stamp"));
206 } // declareOutputFields
207
208 @Override
209 public void prepare(Map conf, TopologyContext context, OutputCollector
collector) {
210
211 this.collector = collector;
212
213 this.context = context;
214 name = context.getThisComponentId();
215 executor = context.getThisTaskIndex();
216
217 Path file = Paths.get(System.getProperty("user.home"), "ref" +
filestamp + name + executor);
218 Charset charset = Charset.forName("US-ASCII");
219 String firstline =
"--------------------------------------------------\nname executor elapsed(ms)
elapsed_sec(ms) count(tuples) delay(ms) avg_delay rate(tuple/s)\n";
220 try {
221 writer = Files.newBufferedWriter(file, charset,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
223 writer.write(firstline, 0, firstline.length());
224 writer.flush();
225 } catch (IOException x) {
226 System.err.format("prepare() - IOException: %s%n", x);
227 }
228
229 begin = System.currentTimeMillis();
230 last = begin;
231 } // prepare
232
233 @Override
234 public void cleanup() {
235 long end = System.currentTimeMillis();
236 System.out.println("whole_elapased_time: "+(end - begin)/ARY);
237 try{
238 writer.close();
239 } catch (IOException x) {
240 System.err.format("cleanup() - IOException: %s%n", x);
241 }
242 } // cleanup
243
244 public long getMsgId(Tuple tuple) {
245 return tuple.getLongByField("msgid").longValue();
246 }
247
248 public void baseExecution(long msgid, long stamp){
249 collector.emit(new Values(msgid, stamp));
250
251 // count;
252 count++;
253
254 // dealy;
255 long crt = System.currentTimeMillis();
256 float delay = (crt - stamp) ;
257 // avg_delay;
258 delay_sum += delay ;
259 float avg_delay = delay_sum / count;
260
261 long interval = (crt - last);
262 float interval_sec = (crt - last) / ARY;
263 if(interval - INTERVAL*ARY >= 0){
264
265 // elapsed
266 long elapsed = (crt - begin); // ms
267 float elapsed_sec = (crt - begin)/ARY;
268 // rate
269 float rate = count / elapsed_sec; // tuple/s
270
271 // output
272 String s = name + " " + executor + " " + elapsed + " " +
elapsed_sec + " " + count + " " + delay + " " + avg_delay + " " + rate + "\n";
273 try {
274 writer.write(s, 0, s.length());
275 writer.flush(); // XXX !!!
276 } catch (IOException x) {
277 System.err.format("IOException: %s%n", x);
278 }
279
280 last = crt;
281 } // if
282 } // baseExecution
283
284 @Override
285 public void execute(Tuple tuple) {
286
287 long msgid = getMsgId(tuple);
288 long stamp = tuple.getLongByField("stamp").longValue();
289 baseExecution(msgid,stamp);
290
291 } // execute
292
293 } // BaseBolt
Thanks for any help!
yesimsure
This communication is for informational purposes only. It is not intended as
an offer or solicitation for the purchase or sale of any financial instrument
or as an official confirmation of any transaction. All market prices, data and
other information are not warranted as to completeness or accuracy and are
subject to change without notice. Any comments or statements made herein do
not necessarily reflect those of JPMorgan Chase & Co., its subsidiaries and
affiliates (collectively, "JPMC").
This transmission may contain information that is proprietary, privileged,
confidential and/or exempt from disclosure under applicable law. If you are
not the intended recipient, you are hereby notified that any disclosure,
copying, distribution, or use of the information contained herein (including
any reliance thereon) is STRICTLY PROHIBITED. If you received this
transmission in error, please immediately contact the sender and destroy the
material in its entirety, whether in electronic or hard copy format. Although
this transmission and any attachments are believed to be free of any virus or
other defect that might affect any computer system into which it is received
and opened, it is the responsibility of the recipient to ensure that it is
virus free and no responsibility is accepted by JPMC for any loss or damage
arising in any way from its use. Please note that any electronic communication
that is conducted within or through JPMC's systems is subject to interception,
monitoring, review, retention and external production in accordance with JPMC's
policy and local laws, rules and regulations; may be stored or otherwise
processed in countries other than the country in which you are located; and
will be treated in accordance with JPMC policies and applicable laws and
regulations.
Please refer to http://www.jpmorgan.com/pages/disclosures for disclosures
relating to European legal entities.