I actually created my own POJO and emitted that; I simplified the code here for the sake of brevity. So you think I should try converting the object into a byte array or string and then emit that? And what about synchronizing on the OutputCollector? Does that make sense?

On Feb 10, 2015 8:00 PM, "Michael Rose" <[email protected]> wrote:
> Out of curiosity, have you tried just emitting the response as a
> string/byte array vs. the whole response object?
>
> Michael Rose
> Senior Platform Engineer
> FullContact | fullcontact.com
>
> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <[email protected]> wrote:
>
>> One note: it's not BasicOutputCollector, but OutputCollector.
>>
>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <[email protected]>:
>>
>>> I am opening an async call to a web service from a bolt.
>>>
>>> I am opening a socket and retrieving the result asynchronously (using the
>>> external AsyncHttpClient library), and after that I am emitting to the
>>> next bolt.
>>>
>>> I asked and read that if I synchronize on the outputCollector, it will
>>> make sure that all acks and callbacks are called from the same thread.
>>>
>>> However, after a load test I started to get this:
>>>
>>> java.lang.RuntimeException: java.lang.NullPointerException
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>>     at clojure.lang.AFn.run(AFn.java:24)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.NullPointerException
>>>     at clojure.lang.RT.intCast(RT.java:1087)
>>>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>>     at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>
>>> That's my bolt:
>>>
>>> public class AsyncBolt extends BaseRichBolt {
>>>
>>>     ...
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>     }
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context,
>>>             OutputCollector collector) {
>>>         asyncHttpClient = new AsyncHttpClient();
>>>         outputCollector = collector;
>>>     }
>>>
>>>     @Override
>>>     public void execute(final Tuple tuple) {
>>>         asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
>>>             @Override
>>>             public Response onCompleted(Response response) throws Exception {
>>>                 ...
>>>                 emitTuple(response, tuple);
>>>                 return response;
>>>             }
>>>         });
>>>     }
>>>
>>>     // we are synchronizing basicOutputCollector
>>>     // because we have callbacks and we need to make sure all acks are
>>>     // called from the same thread
>>>     private void emitTuple(Response response, Tuple tuple) {
>>>         synchronized (outputCollector) {
>>>             outputCollector.emit(tuple, new Values(response));
>>>             outputCollector.ack(tuple);
>>>         }
>>>     }
>>> }
>>>
>>> Any leads on this, please?
>>>
>>> Thank you.
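
For reference, the two ideas raised in this thread (emitting a plain string instead of the whole response object, and making sure only one thread ever touches the collector) can be sketched without Storm at all. This is only a rough illustration under stated assumptions, not a confirmed fix for the NullPointerException above: FakeResponse, FakeCollector, onCompleted and drainAndEmit are hypothetical stand-ins and are not part of Storm or AsyncHttpClient. Callback threads copy the body out as a String and hand it over through a thread-safe queue; a single owning thread drains the queue and emits.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: none of these types come from Storm or AsyncHttpClient.
class AsyncEmitSketch {

    // Stand-in for the HTTP response object returned to the callback.
    static final class FakeResponse {
        private final byte[] body;
        FakeResponse(String body) { this.body = body.getBytes(StandardCharsets.UTF_8); }
        String bodyAsString() { return new String(body, StandardCharsets.UTF_8); }
    }

    // Stand-in for the collector; deliberately NOT thread-safe.
    static final class FakeCollector {
        final List<String> emitted = new ArrayList<>();
        void emit(String payload) { emitted.add(payload); }
    }

    // Thread-safe handoff between callback threads and the emitting thread.
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final FakeCollector collector = new FakeCollector();

    // Called from callback threads: copy the body out as a String and enqueue it.
    // The collector is never touched here.
    void onCompleted(FakeResponse response) {
        pending.add(response.bodyAsString());
    }

    // Called only from the single owning thread: drain whatever has arrived and emit it.
    int drainAndEmit() {
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch);
        for (String payload : batch) {
            collector.emit(payload);
        }
        return batch.size();
    }

    List<String> emitted() { return collector.emitted; }

    // Simulates many async callbacks completing on a pool, then one drain on the caller's thread.
    static AsyncEmitSketch runDemo(int requests) throws InterruptedException {
        AsyncEmitSketch bolt = new AsyncEmitSketch();
        ExecutorService callbacks = Executors.newFixedThreadPool(4);
        for (int i = 0; i < requests; i++) {
            final int id = i;
            callbacks.submit(() -> bolt.onCompleted(new FakeResponse("response-" + id)));
        }
        callbacks.shutdown();
        callbacks.awaitTermination(10, TimeUnit.SECONDS);
        bolt.drainAndEmit();
        return bolt;
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncEmitSketch bolt = runDemo(1000);
        System.out.println("emitted " + bolt.emitted().size() + " payloads");
    }
}
```

In a real bolt, the drain step would presumably run at the start of execute() or on a tick tuple, and the input tuple would have to travel through the queue together with the payload so that it can be anchored and acked on the same thread. Both points are assumptions on my side and are not shown in the sketch.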
