Hi All, I wonder if anyone could take a look into it? this exception is keep occurring to me. any leads please?
2015-02-11 10:18 GMT+02:00 Idan Fridman <[email protected]>: > Hi, > Here is the almost full bolt I am using which causing that exception: > > > public class AsyncBolt extends BaseRichBolt { > > > private AsyncHttpClient asyncHttpClient; > private OutputCollector outputCollector; > > > > @Override > public void declareOutputFields(OutputFieldsDeclarer declarer) { > declarer.declare(new Fields("pushMessageResponse")); > } > > > > @Override > public void prepare(Map stormConf, TopologyContext context, > OutputCollector collector) { > asyncHttpClient = new AsyncHttpClient(); > outputCollector = collector; > > } > > > @Override > public void execute(final Tuple tuple) { > final PushMessageRequestDTO pushMessageRequestDTO = > (PushMessageRequestDTO) tuple.getValueByField("pushMessage"); > String url = "some.url"; > asyncHttpClient.preparePost(url).execute(new > AsyncCompletionHandler<Response>() { > @Override > public Response onCompleted(Response response) throws > Exception { > PushMessageResponseDTO pushMessageResponseDTO = new > PushMessageResponseDTO(pushMessageRequestDTO); > emitTuple(pushMessageResponseDTO, tuple); > return response; > } > }); > } > //we are synchronizing basicOutputCollector > // because we have callbacks and we need to make sure all acks are > called from the same thread > private void emitTuple(PushMessageResponseDTO pushMessageResponseDTO, > Tuple tuple) { > synchronized (outputCollector) { > outputCollector.emit(tuple, new > Values(pushMessageResponseDTO)); > outputCollector.ack(tuple); > } > } > } > > > > 2015-02-10 20:27 GMT+02:00 Idan Fridman <[email protected]>: > >> Yes I cutted those from this post for the sake simplicity aswell. I can >> past the whole bolt it just has some intern logic >> On Feb 10, 2015 8:14 PM, "Michael Rose" <[email protected]> wrote: >> >>> Your output fields declarer isn't invoked. Is it empty in your actual >>> implementation as well? >>> >>> Perhaps you can post a gist with your full file? >>> >>> *Michael Rose* >>> Senior Platform Engineer >>> *Full*Contact | fullcontact.com >>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures> >>> m: +1.720.837.1357 | t: @xorlev >>> >>> >>> All Your Contacts, Updated and In One Place. >>> Try FullContact for Free >>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures> >>> >>> On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <[email protected]> >>> wrote: >>> >>>> I actually created pojo of mine and emitted it. I edited here the code >>>> for the sake of simplicity. So you think I should try convert the Object >>>> into byte/string and then emmit? What about the outputcollector >>>> synchronizing ? Does it make sense? >>>> On Feb 10, 2015 8:00 PM, "Michael Rose" <[email protected]> >>>> wrote: >>>> >>>>> Out of curiosity, have you tried just emitting the response as a >>>>> string/byte array vs. the whole response object? >>>>> >>>>> *Michael Rose* >>>>> Senior Platform Engineer >>>>> *Full*Contact | fullcontact.com >>>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures> >>>>> m: +1.720.837.1357 | t: @xorlev >>>>> >>>>> >>>>> All Your Contacts, Updated and In One Place. >>>>> Try FullContact for Free >>>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures> >>>>> >>>>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <[email protected]> >>>>> wrote: >>>>> >>>>>> One note: it's not BasicOutputCollector. but OutputCollector >>>>>> >>>>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <[email protected]>: >>>>>> >>>>>>> I am opening async call to a webservice from a bolt. >>>>>>> >>>>>>> I'am opening socket and retrieving the result asynchronous(using >>>>>>> external AsycHttpClient library) and after that I am emitting to the >>>>>>> next >>>>>>> bolt >>>>>>> >>>>>>> I asked and read that if I synchronized the outputCollector it will >>>>>>> make sure all that all acks and callbacks will be called from the same >>>>>>> Thread. >>>>>>> >>>>>>> However after load-test I started to get this: >>>>>>> >>>>>>> >>>>>>> java.lang.RuntimeException: java.lang.NullPointerException at >>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) >>>>>>> at >>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) >>>>>>> at >>>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) >>>>>>> at >>>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) >>>>>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at >>>>>>> clojure.lang.AFn.run(AFn.java:24) at >>>>>>> java.lang.Thread.run(Thread.java:745) >>>>>>> Caused by: java.lang.NullPointerException at >>>>>>> clojure.lang.RT.intCast(RT.java:1087) at >>>>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129) >>>>>>> at >>>>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258) >>>>>>> at >>>>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) >>>>>>> >>>>>>> Thats my bolt: >>>>>>> >>>>>>> public class AsyncBolt extends BaseRichBolt { >>>>>>> >>>>>>> ... >>>>>>> >>>>>>> @Override >>>>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>>>>>> } >>>>>>> >>>>>>> >>>>>>> @Override >>>>>>> public void prepare(Map stormConf, TopologyContext context, >>>>>>> OutputCollector collector) { >>>>>>> asyncHttpClient = new AsyncHttpClient(); >>>>>>> outputCollector = collector; >>>>>>> } >>>>>>> >>>>>>> >>>>>>> @Override >>>>>>> public void execute(final Tuple tuple) { >>>>>>> >>>>>>> asyncHttpClient.preparePost(url).execute(new >>>>>>> AsyncCompletionHandler<Response>() { >>>>>>> @Override >>>>>>> public Response onCompleted(Response response) throws >>>>>>> Exception { >>>>>>> ... >>>>>>> emitTuple(response, tuple); >>>>>>> return response; >>>>>>> } >>>>>>> }); >>>>>>> } >>>>>>> //we are synchronizing basicOutputCollector >>>>>>> // because we have callbacks and we need to make sure all acks >>>>>>> are called from the same thread >>>>>>> private void emitTuple(Response response, Tuple tuple) { >>>>>>> synchronized (outputCollector) { >>>>>>> outputCollector.emit(tuple, new Values(response)); >>>>>>> outputCollector.ack(tuple); >>>>>>> } >>>>>>> } >>>>>>> } >>>>>>> >>>>>>> Please any leads about that? >>>>>>> >>>>>>> Thank you. >>>>>>> >>>>>> >>>>> >>> >
