Hi all,

    Can anyone help me with my problem?
    I set up a 3-worker cluster on 4 nodes with Storm 1.0.0. My job's spout 
generates messages, each carrying an incrementing id and the timestamp at which 
it was generated; a bolt then calculates the delay and throughput and writes 
the result to a local file. However, my bolt keeps shutting down and restarting 
on another node (from node A to B to A to B to …) as soon as the topology is 
submitted to the cluster.

Here is the log:


 562 2016-06-28 19:10:16.352 o.a.s.d.worker [INFO] Worker 
3f9fa92d-e386-4f55-aed6-4044400c47f4 for storm ref-2-1467110596 on 
2d6dc803-4242-4de3-82d6-902d36f47d98:6703 has finished loading

 563 2016-06-28 19:10:17.001 o.a.s.d.worker [INFO] All connections are ready 
for worker 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 with id 
3f9fa92d-e386-4f55-aed6-4044400c47f4

 564 2016-06-28 19:10:17.029 o.a.s.d.executor [INFO] Preparing bolt simple:(2)

 565 2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Prepared bolt simple:(2)

 566 2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Preparing bolt 
__system:(-1)

 567 2016-06-28 19:10:17.048 o.a.s.d.executor [INFO] Prepared bolt __system:(-1)

 568 2016-06-28 19:10:55.211 o.a.s.s.o.a.z.ClientCnxn [INFO] Client session 
timed out, have not heard from server in 13702ms for sessionid 
0x2558b61a1be0057, closing socket connection and attempting reconnect

 569 2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC 
overhead limit exceeded

 570 2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump 
...

 571 2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump: 
File exists

 572 2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] 
State change: SUSPENDED

 573 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket 
connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to 
authenticate using SASL (unknown error)

 574 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection 
established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session

 575 2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received 
event :disconnected::none: with disconnected Writer Zookeeper.

 576 2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session 
establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181, 
sessionid = 0x2558b61a1be0057, negotiated timeout = 20000

 577 2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] 
State change: RECONNECTED

 578 2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors 
in handling the request

 579 java.lang.OutOfMemoryError: GC overhead limit exceeded

 580         at java.util.Arrays.copyOf(Arrays.java:2219) ~[?:1.7.0_40]

 581         at java.util.ArrayList.grow(ArrayList.java:242) ~[?:1.7.0_40]

 582         at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) 
~[?:1.7.0_40]

 583         at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) 
~[?:1.7.0_40]

 584         at java.util.ArrayList.add(ArrayList.java:440) ~[?:1.7.0_40]

 585         at org.apache.storm.utils.ListDelegate.add(ListDelegate.java:73) 
~[storm-core-1.0.0.jar:1.0.0]

 586         at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
 ~[kryo-3.0.3.jar:?]

 587         at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
 ~[kryo-3.0.3.jar:?]

 588         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:689) 
~[kryo-3.0.3.jar:?]

 589         at 
org.apache.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:37)
 ~[storm-core-1.0.0.jar:1.0.0]

 590         at 
org.apache.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:50)
 ~[storm-core-1.0.0.jar:1.0.0]

 591         at 
org.apache.storm.messaging.DeserializingConnectionCallback.recv(DeserializingConnectionCallback.java:56)
 ~[storm-core-1.0.0.jar:1.0.0]

 592         at 
org.apache.storm.messaging.netty.Server.enqueue(Server.java:133) 
~[storm-core-1.0.0.jar:1.0.0]

 593         at 
org.apache.storm.messaging.netty.Server.received(Server.java:254) 
~[storm-core-1.0.0.jar:1.0.0]

 594         at 
org.apache.storm.messaging.netty.StormServerHandler.messageReceived(StormServerHandler.java:61)
 ~[storm-core-1.0.0.jar:1.0.0]

 595         at 
org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
 ~[storm-core-1.0.0.jar:1.0.0]


 596         at 
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
 ~[storm-core-1.0.0.jar:1.0.0]

 597         at 
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
 ~[storm-core-1.0.0.jar:1.0.0]

 598         at 
org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
 ~[storm-core-1.0.0.jar:1.0.0]

 599         at 
org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
 ~[storm-core-1.0.0.jar:1.0.0]

 600         at 
org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
 ~[storm-core-1.0.0.jar:1.0.0]

 601

 602         at 
org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
 ~[storm-core-1.0.0.jar:1.0.0]

 603         at 
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
 ~[storm-core-1.0.0.jar:1.0.0]

 604         at 
org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
 ~[storm-core-1.0.0.jar:1.0.0]

 605         at 
org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
 ~[storm-core-1.0.0.jar:1.0.0]

 606         at 
org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
 ~[storm-core-1.0.0.jar:1.0.0]

 607         at 
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
 ~[storm-core-1.0.0.jar:1.0.0]

 608         at 
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
 ~[storm-core-1.0.0.jar:1.0.0]

 609         at 
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
 ~[storm-core-1.0.0.jar:1.0.0]

 610         at 
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
 ~[storm-core-1.0.0.jar:1.0.0]

 611         at 
org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
 ~[storm-core-1.0.0.jar:1.0.0]

 612 2016-06-28 19:11:01.557 STDIO [ERROR] Halting due to Out Of Memory 
Error...Netty-server-localhost-6703-worker-1

And here is the code:

83   public static class BaseBolt extends BaseRichBolt {

184

185       OutputCollector collector;

186

187       TopologyContext context;

188       String name; // id

189       int executor;

190

191       BufferedWriter writer = null;

192       long filestamp;

193

194       private long count = 0;

195       long begin;

196       long last;

197       float delay_sum = 0;

198

199     public BaseBolt(long stp){

200       filestamp = stp;

201     }

202

203     @Override

204     public void declareOutputFields(OutputFieldsDeclarer declarer) {

205        declarer.declare(new Fields("msgid", "stamp"));

206     } // declareOutputFields

207

208     @Override

209     public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {

210

211         this.collector = collector;

212

213         this.context = context;

214         name = context.getThisComponentId();

215         executor = context.getThisTaskIndex();

216

217         Path file = Paths.get(System.getProperty("user.home"), "ref" + 
filestamp + name + executor);

218         Charset charset = Charset.forName("US-ASCII");

219         String firstline = 
"--------------------------------------------------\nname executor elapsed(ms) 
elapsed_sec(ms) count(tuples) delay(ms) avg_delay rate(tuple/s)\n";

220         try {

221               writer = Files.newBufferedWriter(file, charset, 
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);

223               writer.write(firstline, 0, firstline.length());

224               writer.flush();

225         } catch (IOException x) {

226               System.err.format("prepare() - IOException: %s%n", x);

227         }

228

229         begin = System.currentTimeMillis();

230         last = begin;

231     } // prepare

232

233     @Override

234     public void cleanup() {

235       long end = System.currentTimeMillis();

236       System.out.println("whole_elapased_time: "+(end - begin)/ARY);

237       try{

238         writer.close();

239       } catch (IOException x) {

240             System.err.format("cleanup() - IOException: %s%n", x);

241       }

242     } // cleanup

243

244     public long getMsgId(Tuple tuple) {

245         return tuple.getLongByField("msgid").longValue();

246     }

247

248     public void baseExecution(long msgid, long stamp){

249         collector.emit(new Values(msgid, stamp));

250

251         // count;

252         count++;

253

254         // dealy;

255         long crt = System.currentTimeMillis();

256         float delay = (crt - stamp) ;

257         // avg_delay;

258         delay_sum += delay ;

259         float avg_delay = delay_sum / count;

260

261         long interval = (crt - last);

262         float interval_sec = (crt - last) / ARY;

263         if(interval - INTERVAL*ARY >= 0){

264

265           // elapsed

266           long elapsed = (crt - begin); // ms

267           float elapsed_sec = (crt - begin)/ARY;

268           // rate

269           float rate = count / elapsed_sec; // tuple/s

270

271           // output

272           String s = name + " " + executor + " " + elapsed + " " + 
elapsed_sec + " "  + count + " " + delay + " " + avg_delay + " " + rate + "\n";

273           try {

274                 writer.write(s, 0, s.length());

275                 writer.flush(); // XXX !!!

276           } catch (IOException x) {

277                 System.err.format("IOException: %s%n", x);

278           }

279

280           last = crt;

281         }  // if

282     } // baseExecution

283

284     @Override

285     public void execute(Tuple tuple) {

286

287         long msgid = getMsgId(tuple);

288         long stamp = tuple.getLongByField("stamp").longValue();

289         baseExecution(msgid,stamp);

290

291     } // execute

292

293   } // BaseBolt


Thanks for any help!


yesimsure

Reply via email to