One note: it's not BasicOutputCollector, but OutputCollector.
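
For what it's worth, synchronizing on the collector only stops two callbacks from running at the same time; it does not move the emit and ack onto the bolt's executor thread. As far as I understand, the pattern usually suggested is to have the callbacks only enqueue their results and let the single owning thread drain the queue. Below is a minimal, Storm-free sketch of that handoff. All names in it (SingleThreadHandoff, Completed, onCompleted, drain, run) are hypothetical, and the thread pool merely stands in for the HTTP client's callback threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SingleThreadHandoff {

    // Hypothetical stand-in for a finished HTTP response plus the tuple it belongs to.
    static final class Completed {
        final int tupleId;
        final String body;

        Completed(int tupleId, String body) {
            this.tupleId = tupleId;
            this.body = body;
        }
    }

    // Thread-safe queue shared between callback threads and the owning thread.
    private final BlockingQueue<Completed> completed = new LinkedBlockingQueue<>();

    // Called from callback threads: only enqueue, never touch the collector here.
    void onCompleted(int tupleId, String body) {
        completed.add(new Completed(tupleId, body));
    }

    // Called from the single owning thread: this is where emit/ack would happen.
    List<String> drain() {
        List<Completed> batch = new ArrayList<>();
        completed.drainTo(batch);
        List<String> handled = new ArrayList<>();
        for (Completed c : batch) {
            handled.add(c.tupleId + ":" + c.body);
        }
        return handled;
    }

    // Simulates `callbacks` async completions on a pool, then drains on the caller's thread.
    static int run(int callbacks) throws InterruptedException {
        SingleThreadHandoff handoff = new SingleThreadHandoff();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(callbacks);
        for (int i = 0; i < callbacks; i++) {
            final int id = i;
            pool.execute(() -> {
                handoff.onCompleted(id, "response-" + id);
                done.countDown();
            });
        }
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return handoff.drain().size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handled on one thread: " + run(100));
    }
}
```

In a bolt, the drain step would presumably run on the executor thread, for example at the start of execute(), with the emit and ack calls taking the place of the string formatting here. That part is an assumption and is not exercised by the sketch.
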
2015-02-10 18:29 GMT+02:00 Idan Fridman <[email protected]>:
> I am making an async call to a web service from a bolt.
>
> I'm opening a socket and retrieving the result asynchronously (using the
> external AsyncHttpClient library), and after that I emit to the next bolt.
>
> I asked and read that if I synchronize on the outputCollector, it will make
> sure that all acks and callbacks are called from the same thread.
>
> However, after a load test I started to get this:
>
>
> java.lang.RuntimeException: java.lang.NullPointerException
>   at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>   at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>   at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>   at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>   at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>   at clojure.lang.AFn.run(AFn.java:24)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at clojure.lang.RT.intCast(RT.java:1087)
>   at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>   at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>   at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>
> That's my bolt:
>
> public class AsyncBolt extends BaseRichBolt {
>
>     ...
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     }
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context,
>                         OutputCollector collector) {
>         asyncHttpClient = new AsyncHttpClient();
>         outputCollector = collector;
>     }
>
>     @Override
>     public void execute(final Tuple tuple) {
>         asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
>             @Override
>             public Response onCompleted(Response response) throws Exception {
>                 ...
>                 emitTuple(response, tuple);
>                 return response;
>             }
>         });
>     }
>
>     // We are synchronizing basicOutputCollector because we have callbacks
>     // and we need to make sure all acks are called from the same thread.
>     private void emitTuple(Response response, Tuple tuple) {
>         synchronized (outputCollector) {
>             outputCollector.emit(tuple, new Values(response));
>             outputCollector.ack(tuple);
>         }
>     }
> }
>
> Any leads on this, please?
>
> Thank you.
>