Your output fields declarer isn't invoked. Is it empty in your actual
implementation as well?

Perhaps you can post a gist with your full file?

*Michael Rose*
Senior Platform Engineer
*Full*Contact | fullcontact.com
<https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
m: +1.720.837.1357 | t: @xorlev


All Your Contacts, Updated and In One Place.
Try FullContact for Free
<https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>

On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <[email protected]> wrote:

> I actually created pojo of mine and emitted it. I edited here the code for
> the sake of simplicity. So you think I should try convert the Object into
> byte/string and then emmit? What about the outputcollector synchronizing ?
> Does it make sense?
> On Feb 10, 2015 8:00 PM, "Michael Rose" <[email protected]> wrote:
>
>> Out of curiosity, have you tried just emitting the response as a
>> string/byte array vs. the whole response object?
>>
>> *Michael Rose*
>> Senior Platform Engineer
>> *Full*Contact | fullcontact.com
>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>> m: +1.720.837.1357 | t: @xorlev
>>
>>
>> All Your Contacts, Updated and In One Place.
>> Try FullContact for Free
>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>
>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <[email protected]>
>> wrote:
>>
>>> One note: it's not BasicOutputCollector. but OutputCollector
>>>
>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <[email protected]>:
>>>
>>>> I am opening async call to a webservice from a bolt.
>>>>
>>>> I'am opening socket and retrieving the result asynchronous(using
>>>> external AsycHttpClient library) and after that I am emitting to the next
>>>> bolt
>>>>
>>>> I asked and read that if I synchronized the outputCollector it will
>>>> make sure all that all acks and callbacks will be called from the same
>>>> Thread.
>>>>
>>>> However after load-test I started to get this:
>>>>
>>>>
>>>> java.lang.RuntimeException: java.lang.NullPointerException at
>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>> at
>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>> at
>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>> at
>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>>>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.NullPointerException at
>>>> clojure.lang.RT.intCast(RT.java:1087) at
>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>>> at
>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>>> at
>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>
>>>> Thats my bolt:
>>>>
>>>> public class AsyncBolt extends BaseRichBolt {
>>>>
>>>> ...
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>     }
>>>>
>>>>
>>>>     @Override
>>>>     public void prepare(Map stormConf, TopologyContext context,
>>>> OutputCollector collector) {
>>>>         asyncHttpClient = new AsyncHttpClient();
>>>>         outputCollector = collector;
>>>>     }
>>>>
>>>>
>>>>     @Override
>>>>     public void execute(final Tuple tuple) {
>>>>
>>>>         asyncHttpClient.preparePost(url).execute(new
>>>> AsyncCompletionHandler<Response>() {
>>>>             @Override
>>>>             public Response onCompleted(Response response) throws
>>>> Exception {
>>>>                ...
>>>>                 emitTuple(response, tuple);
>>>>                 return response;
>>>>             }
>>>>         });
>>>>     }
>>>>     //we are synchronizing basicOutputCollector
>>>>     // because we have callbacks and we need to make sure all acks are
>>>> called from the same thread
>>>>     private void emitTuple(Response response, Tuple tuple) {
>>>>         synchronized (outputCollector) {
>>>>             outputCollector.emit(tuple, new Values(response));
>>>>             outputCollector.ack(tuple);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> Please any leads about that?
>>>>
>>>> Thank you.
>>>>
>>>
>>

Reply via email to