Yes I cutted those from this post for the sake simplicity aswell. I can past the whole bolt it just has some intern logic On Feb 10, 2015 8:14 PM, "Michael Rose" <[email protected]> wrote:
> Your output fields declarer isn't invoked. Is it empty in your actual > implementation as well? > > Perhaps you can post a gist with your full file? > > *Michael Rose* > Senior Platform Engineer > *Full*Contact | fullcontact.com > <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures> > m: +1.720.837.1357 | t: @xorlev > > > All Your Contacts, Updated and In One Place. > Try FullContact for Free > <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures> > > On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <[email protected]> > wrote: > >> I actually created pojo of mine and emitted it. I edited here the code >> for the sake of simplicity. So you think I should try convert the Object >> into byte/string and then emmit? What about the outputcollector >> synchronizing ? Does it make sense? >> On Feb 10, 2015 8:00 PM, "Michael Rose" <[email protected]> wrote: >> >>> Out of curiosity, have you tried just emitting the response as a >>> string/byte array vs. the whole response object? >>> >>> *Michael Rose* >>> Senior Platform Engineer >>> *Full*Contact | fullcontact.com >>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures> >>> m: +1.720.837.1357 | t: @xorlev >>> >>> >>> All Your Contacts, Updated and In One Place. >>> Try FullContact for Free >>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures> >>> >>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <[email protected]> >>> wrote: >>> >>>> One note: it's not BasicOutputCollector. but OutputCollector >>>> >>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <[email protected]>: >>>> >>>>> I am opening async call to a webservice from a bolt. >>>>> >>>>> I'am opening socket and retrieving the result asynchronous(using >>>>> external AsycHttpClient library) and after that I am emitting to the next >>>>> bolt >>>>> >>>>> I asked and read that if I synchronized the outputCollector it will >>>>> make sure all that all acks and callbacks will be called from the same >>>>> Thread. >>>>> >>>>> However after load-test I started to get this: >>>>> >>>>> >>>>> java.lang.RuntimeException: java.lang.NullPointerException at >>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) >>>>> at >>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) >>>>> at >>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) >>>>> at >>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) >>>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at >>>>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) >>>>> Caused by: java.lang.NullPointerException at >>>>> clojure.lang.RT.intCast(RT.java:1087) at >>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129) >>>>> at >>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258) >>>>> at >>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) >>>>> >>>>> Thats my bolt: >>>>> >>>>> public class AsyncBolt extends BaseRichBolt { >>>>> >>>>> ... >>>>> >>>>> @Override >>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>>>> } >>>>> >>>>> >>>>> @Override >>>>> public void prepare(Map stormConf, TopologyContext context, >>>>> OutputCollector collector) { >>>>> asyncHttpClient = new AsyncHttpClient(); >>>>> outputCollector = collector; >>>>> } >>>>> >>>>> >>>>> @Override >>>>> public void execute(final Tuple tuple) { >>>>> >>>>> asyncHttpClient.preparePost(url).execute(new >>>>> AsyncCompletionHandler<Response>() { >>>>> @Override >>>>> public Response onCompleted(Response response) throws >>>>> Exception { >>>>> ... >>>>> emitTuple(response, tuple); >>>>> return response; >>>>> } >>>>> }); >>>>> } >>>>> //we are synchronizing basicOutputCollector >>>>> // because we have callbacks and we need to make sure all acks are >>>>> called from the same thread >>>>> private void emitTuple(Response response, Tuple tuple) { >>>>> synchronized (outputCollector) { >>>>> outputCollector.emit(tuple, new Values(response)); >>>>> outputCollector.ack(tuple); >>>>> } >>>>> } >>>>> } >>>>> >>>>> Please any leads about that? >>>>> >>>>> Thank you. >>>>> >>>> >>> >
