Hi Michael,

Actually, I'm confused about where to do the I/O.
I'm thinking of opening the BufferedWriter in the bolt's prepare() method, writing in execute(), 
and then closing it in the bolt's cleanup() method. But the bolt's cleanup() method is not 
guaranteed to be called.
The alternative is to flush and close the BufferedWriter periodically, either inside the execute() 
method or in a separate thread, but neither approach (a time-consuming operation in execute(), or 
spawning my own thread inside a component) is recommended in Storm.
So I'm really not sure where the I/O should go.
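
For example, the "flush periodically in execute()" option could look something like this sketch 
using Storm's built-in tick tuples, so no extra thread is needed. FLUSH_SECS and the abstract 
methods are just placeholders for illustration, not my real code:

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

public abstract class PeriodicFlushBolt extends BaseRichBolt {
    private static final int FLUSH_SECS = 10; // assumed flush interval, placeholder value

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // ask Storm to send this bolt a tick tuple every FLUSH_SECS seconds
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FLUSH_SECS);
        return conf;
    }

    @Override
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            flushWriter();           // flush (or rotate/close) the BufferedWriter here
        } else {
            processDataTuple(tuple); // normal tuple handling
        }
    }

    protected abstract void flushWriter();
    protected abstract void processDataTuple(Tuple tuple);
}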
________________________________
From: Kazansky, Michael <[email protected]>
Sent: June 28, 2016 21:47:48
To: [email protected]
Subject: RE: Bolt keep shutting down


try {
      writer = Files.newBufferedWriter(file, charset,
              StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
      writer.write(firstline, 0, firstline.length());
      writer.flush();
} catch (IOException x) {
      System.err.format("prepare() - IOException: %s%n", x);
}

You forgot to close your writer. Or use try-with-resources so the writer is closed 
automatically.
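
A minimal sketch of what I mean by try-with-resources, assuming the writer only needs to live 
long enough to write that header line (the wrapper class and method names are just for 
illustration):

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class PrepareWriteSketch {
    // writes the header line; close() (which also flushes) happens automatically at block exit
    static void writeHeader(Path file, Charset charset, String firstline) {
        try (BufferedWriter writer = Files.newBufferedWriter(file, charset,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            writer.write(firstline, 0, firstline.length());
        } catch (IOException x) {
            System.err.format("prepare() - IOException: %s%n", x);
        }
    }
}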

Thanks,
Michael Kazansky

From: Yang Ruoxue [mailto:[email protected]]
Sent: Tuesday, June 28, 2016 9:44 AM
To: [email protected]
Subject: Bolt keep shutting down


Hi all,

    Can anyone help me with my problem?
    I set up a 3-worker cluster on 4 nodes with Storm 1.0.0. My job generates messages with an 
incrementing id and the timestamp at which each was generated in a spout, calculates the delay 
and throughput, and then writes the results to a local file in a bolt (the spout itself isn't 
pasted in full; a rough sketch of its shape follows the bolt code below). However, once the 
topology is submitted to the cluster, my bolt keeps shutting down and restarting on another 
node (from node A to B to A to B to …).

log here:


2016-06-28 19:10:16.352 o.a.s.d.worker [INFO] Worker 3f9fa92d-e386-4f55-aed6-4044400c47f4 for storm ref-2-1467110596 on 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 has finished loading
2016-06-28 19:10:17.001 o.a.s.d.worker [INFO] All connections are ready for worker 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 with id 3f9fa92d-e386-4f55-aed6-4044400c47f4
2016-06-28 19:10:17.029 o.a.s.d.executor [INFO] Preparing bolt simple:(2)
2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Prepared bolt simple:(2)
2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Preparing bolt __system:(-1)
2016-06-28 19:10:17.048 o.a.s.d.executor [INFO] Prepared bolt __system:(-1)
2016-06-28 19:10:55.211 o.a.s.s.o.a.z.ClientCnxn [INFO] Client session timed out, have not heard from server in 13702ms for sessionid 0x2558b61a1be0057, closing socket connection and attempting reconnect
2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC overhead limit exceeded
2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump ...
2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump: File exists
2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to authenticate using SASL (unknown error)
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session
2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received event :disconnected::none: with disconnected Writer Zookeeper.
2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181, sessionid = 0x2558b61a1be0057, negotiated timeout = 20000
2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: RECONNECTED
2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors in handling the request
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOf(Arrays.java:2219) ~[?:1.7.0_40]
        at java.util.ArrayList.grow(ArrayList.java:242) ~[?:1.7.0_40]
        at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) ~[?:1.7.0_40]
        at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) ~[?:1.7.0_40]
        at java.util.ArrayList.add(ArrayList.java:440) ~[?:1.7.0_40]
        at org.apache.storm.utils.ListDelegate.add(ListDelegate.java:73) ~[storm-core-1.0.0.jar:1.0.0]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134) ~[kryo-3.0.3.jar:?]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:689) ~[kryo-3.0.3.jar:?]
        at org.apache.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:37) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:50) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.messaging.DeserializingConnectionCallback.recv(DeserializingConnectionCallback.java:56) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.messaging.netty.Server.enqueue(Server.java:133) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.messaging.netty.Server.received(Server.java:254) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.messaging.netty.StormServerHandler.messageReceived(StormServerHandler.java:61) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[storm-core-1.0.0.jar:1.0.0]
2016-06-28 19:11:01.557 STDIO [ERROR] Halting due to Out Of Memory Error...Netty-server-localhost-6703-worker-1

And the code is here:

public static class BaseBolt extends BaseRichBolt {

    OutputCollector collector;
    TopologyContext context;
    String name; // id
    int executor;

    BufferedWriter writer = null;
    long filestamp;

    private long count = 0;
    long begin;
    long last;
    float delay_sum = 0;

    // ARY and INTERVAL are constants defined elsewhere in the enclosing class (not shown here)

    public BaseBolt(long stp) {
        filestamp = stp;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("msgid", "stamp"));
    } // declareOutputFields

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;
        this.context = context;
        name = context.getThisComponentId();
        executor = context.getThisTaskIndex();

        Path file = Paths.get(System.getProperty("user.home"), "ref" + filestamp + name + executor);
        Charset charset = Charset.forName("US-ASCII");
        String firstline = "--------------------------------------------------\nname executor elapsed(ms) elapsed_sec(ms) count(tuples) delay(ms) avg_delay rate(tuple/s)\n";
        try {
            writer = Files.newBufferedWriter(file, charset,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
            writer.write(firstline, 0, firstline.length());
            writer.flush();
        } catch (IOException x) {
            System.err.format("prepare() - IOException: %s%n", x);
        }

        begin = System.currentTimeMillis();
        last = begin;
    } // prepare

    @Override
    public void cleanup() {
        long end = System.currentTimeMillis();
        System.out.println("whole_elapsed_time: " + (end - begin) / ARY);
        try {
            writer.close();
        } catch (IOException x) {
            System.err.format("cleanup() - IOException: %s%n", x);
        }
    } // cleanup

    public long getMsgId(Tuple tuple) {
        return tuple.getLongByField("msgid").longValue();
    }

    public void baseExecution(long msgid, long stamp) {
        collector.emit(new Values(msgid, stamp));

        // count;
        count++;

        // delay;
        long crt = System.currentTimeMillis();
        float delay = (crt - stamp);
        // avg_delay;
        delay_sum += delay;
        float avg_delay = delay_sum / count;

        long interval = (crt - last);
        float interval_sec = (crt - last) / ARY;
        if (interval - INTERVAL * ARY >= 0) {

            // elapsed
            long elapsed = (crt - begin); // ms
            float elapsed_sec = (crt - begin) / ARY;
            // rate
            float rate = count / elapsed_sec; // tuple/s

            // output
            String s = name + " " + executor + " " + elapsed + " " + elapsed_sec + " " + count + " " + delay + " " + avg_delay + " " + rate + "\n";
            try {
                writer.write(s, 0, s.length());
                writer.flush(); // XXX !!!
            } catch (IOException x) {
                System.err.format("IOException: %s%n", x);
            }

            last = crt;
        } // if
    } // baseExecution

    @Override
    public void execute(Tuple tuple) {
        long msgid = getMsgId(tuple);
        long stamp = tuple.getLongByField("stamp").longValue();
        baseExecution(msgid, stamp);
    } // execute

} // BaseBolt
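
For reference, the spout isn't pasted here; a minimal sketch of a spout emitting the
("msgid", "stamp") fields that BaseBolt reads would look roughly like this (all names are
illustrative, not my actual code):

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class IdStampSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private long msgid = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // emit an incrementing id plus the timestamp at which the tuple was generated
        // (the real spout may pace emission; omitted here for brevity)
        collector.emit(new Values(msgid++, System.currentTimeMillis()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("msgid", "stamp"));
    }
}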


Thanks for any help!


yesimsure
