Out of curiosity, have you tried just emitting the response as a string/byte array vs. the whole response object?
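
A rough sketch of that idea: copy what the next bolt needs out of the response into plain values (an Integer status and a String body) and emit those, rather than the client's Response object. The reasoning is that tuple values may have to be serialized on the way to the next bolt, and plain values serialize without any extra registration. That is a guess at the cause, not a diagnosis. HttpResponse and Emitter below are hypothetical stand-ins so the snippet compiles without Storm or AsyncHttpClient on the classpath, and PlainValueEmit and its methods are made-up names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the HTTP client's response type.
// Only the two accessors used below are modelled.
interface HttpResponse {
    int getStatusCode();
    byte[] getResponseBodyAsBytes();
}

// Hypothetical stand-in for the collector, reduced to a single emit call.
interface Emitter {
    void emit(List<Object> values);
}

final class PlainValueEmit {
    private PlainValueEmit() {
    }

    // Copy the status code and body out of the response so that the emitted
    // values are a plain Integer and a plain String.
    // Assumes the body is UTF-8 text; emit the raw byte[] instead if it is not.
    static List<Object> toValues(HttpResponse response) {
        String body = new String(response.getResponseBodyAsBytes(), StandardCharsets.UTF_8);
        List<Object> values = new ArrayList<>();
        values.add(response.getStatusCode());
        values.add(body);
        return values;
    }

    // Emit the plain values instead of the response object itself.
    static void emitPlain(HttpResponse response, Emitter emitter) {
        emitter.emit(toValues(response));
    }
}
```

In the bolt from the quoted message this would amount to building the Values from the body string (and status, if needed) inside emitTuple, with matching fields declared in declareOutputFields.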

*Michael Rose*
Senior Platform Engineer
*Full*Contact | fullcontact.com
t: @xorlev

On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <[email protected]> wrote:

> One note: it's not BasicOutputCollector, but OutputCollector.
>
> 2015-02-10 18:29 GMT+02:00 Idan Fridman <[email protected]>:
>
>> I am making an async call to a web service from a bolt.
>>
>> I am opening a socket and retrieving the result asynchronously (using the
>> external AsyncHttpClient library), and after that I am emitting to the
>> next bolt.
>>
>> I asked and read that if I synchronize on the outputCollector, it will
>> make sure that all acks and callbacks are called from the same thread.
>>
>> However, after a load test I started to get this:
>>
>> java.lang.RuntimeException: java.lang.NullPointerException
>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>     at clojure.lang.AFn.run(AFn.java:24)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>>     at clojure.lang.RT.intCast(RT.java:1087)
>>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>     at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>
>> That's my bolt:
>>
>> public class AsyncBolt extends BaseRichBolt {
>>
>>     ...
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>     }
>>
>>     @Override
>>     public void prepare(Map stormConf, TopologyContext context,
>>                         OutputCollector collector) {
>>         asyncHttpClient = new AsyncHttpClient();
>>         outputCollector = collector;
>>     }
>>
>>     @Override
>>     public void execute(final Tuple tuple) {
>>         asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
>>             @Override
>>             public Response onCompleted(Response response) throws Exception {
>>                 ...
>>                 emitTuple(response, tuple);
>>                 return response;
>>             }
>>         });
>>     }
>>
>>     // we are synchronizing basicOutputCollector
>>     // because we have callbacks and we need to make sure all acks are
>>     // called from the same thread
>>     private void emitTuple(Response response, Tuple tuple) {
>>         synchronized (outputCollector) {
>>             outputCollector.emit(tuple, new Values(response));
>>             outputCollector.ack(tuple);
>>         }
>>     }
>> }
>>
>> Any leads on this, please?
>>
>> Thank you.
>>
>
