Out of curiosity, have you tried just emitting the response as a
string/byte array vs. the whole response object?
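
A rough sketch of what I mean, not a drop-in fix. `HttpResult` here is a hypothetical stand-in for the client's response type, and plain Java serialization is only an approximation of whatever serializer your topology is configured with. The assumption is that a rich response object may not survive being shipped between workers, while its body as a String or byte array will:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

public class PayloadDemo {

    // Hypothetical stand-in for an HTTP client's response object.
    // Like many such objects, it does not implement Serializable.
    static final class HttpResult {
        final int status;
        final String body;

        HttpResult(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    // Copy the body out of the response as bytes; this is the value
    // that would go into the emitted tuple instead of the whole object.
    static byte[] toPayload(HttpResult result) {
        return result.body.getBytes(StandardCharsets.UTF_8);
    }

    // Returns true if the value can be written with plain Java serialization.
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        HttpResult result = new HttpResult(200, "{\"ok\":true}");
        System.out.println(isJavaSerializable(result));            // prints "false"
        System.out.println(isJavaSerializable(toPayload(result))); // prints "true"
    }
}
```

In your bolt that would mean pulling the body off the response inside onCompleted (AsyncHttpClient's Response has getResponseBody()) and passing that to new Values(...) rather than the Response itself.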

*Michael Rose*
Senior Platform Engineer
*Full*Contact | fullcontact.com
m: +1.720.837.1357 | t: @xorlev



On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <[email protected]> wrote:

> One note: it's not BasicOutputCollector, but OutputCollector.
>
> 2015-02-10 18:29 GMT+02:00 Idan Fridman <[email protected]>:
>
>> I am making an async call to a web service from a bolt.
>>
>> I am opening a socket and retrieving the result asynchronously (using the
>> external AsyncHttpClient library), and after that I am emitting to the next bolt.
>>
>> I asked and read that if I synchronize on the outputCollector, it will make
>> sure that all acks and callbacks are called from the same thread.
>>
>> However, after a load test I started to get this:
>>
>>
>> java.lang.RuntimeException: java.lang.NullPointerException
>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>     at clojure.lang.AFn.run(AFn.java:24)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>>     at clojure.lang.RT.intCast(RT.java:1087)
>>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>     at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>
>> That's my bolt:
>>
>> public class AsyncBolt extends BaseRichBolt {
>>
>> ...
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>     }
>>
>>
>>     @Override
>>     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>         asyncHttpClient = new AsyncHttpClient();
>>         outputCollector = collector;
>>     }
>>
>>
>>     @Override
>>     public void execute(final Tuple tuple) {
>>
>>         asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
>>             @Override
>>             public Response onCompleted(Response response) throws Exception {
>>                ...
>>                 emitTuple(response, tuple);
>>                 return response;
>>             }
>>         });
>>     }
>>     // we are synchronizing on outputCollector because we have callbacks
>>     // and we need to make sure all acks are called from the same thread
>>     private void emitTuple(Response response, Tuple tuple) {
>>         synchronized (outputCollector) {
>>             outputCollector.emit(tuple, new Values(response));
>>             outputCollector.ack(tuple);
>>         }
>>     }
>> }
>>
>> Any leads on this, please?
>>
>> Thank you.
>>
>
