@Michael, @Nathan, @Itai, and anyone else:
I have modified the code following your suggestions, and I am posting it here so
that anyone else dealing with this case can refer to it.
Please send me your comments on it, if any:
public class MyClass extends BaseRichBolt {
    ..
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        asyncHttpClient = new AsyncHttpClient();
        outputCollector = collector;
    }

    @Override
    public void execute(final Tuple tuple) {
        final PushMessage pushMessage = (PushMessage)
                tuple.getValueByField("pushMessage");
        String template = pushMessage.getMessageBody();
        String url = "https://service_url/";
        synchronized (outputCollector) {
            asyncHttpClient.preparePost(url).execute(new
                    AsyncCompletionHandler<Response>() {
                @Override
                public Response onCompleted(Response response) throws Exception {
                    outputCollector.emit(tuple, new Values(pushMessage));
                    outputCollector.ack(tuple);
                    return response;
                }

                @Override
                public void onThrowable(Throwable t) {
                    t.printStackTrace();
                    outputCollector.emit(tuple, new Values(pushMessage));
                    outputCollector.ack(tuple);
                }
            });
        }
    }
}
Please check the synchronized block and let me know if I should pay
attention to anything else.
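
One thing I am not sure about: in the code above the synchronized block only
covers dispatching the request, while emit and ack are called later from the
HTTP client's callback threads. Following Nathan's earlier advice to
"synchronize access to emit and ack only", a variant that takes the lock
inside the callbacks might look like the sketch below. It is a self-contained
simulation, not my actual bolt: Collector is a hypothetical stand-in for
OutputCollector, onResponse stands in for onCompleted/onThrowable, and a
thread pool plays the role of the client's worker threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SyncCollectorSketch {

    // Hypothetical stand-in for Storm's OutputCollector: deliberately not thread safe.
    static class Collector {
        final List<String> emitted = new ArrayList<>();
        final List<String> acked = new ArrayList<>();

        void emit(String value) { emitted.add(value); }
        void ack(String value) { acked.add(value); }
    }

    // Stands in for onCompleted/onThrowable. The lock is taken here, around
    // emit and ack only, because this is the code that runs on worker threads.
    static void onResponse(Collector collector, String message) {
        synchronized (collector) {
            collector.emit(message);
            collector.ack(message);
        }
    }

    // Runs `requests` simulated callbacks on `threads` worker threads and
    // returns the collector once every callback has finished.
    static Collector run(int threads, int requests) throws InterruptedException {
        final Collector collector = new Collector();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < requests; i++) {
            final String message = "msg-" + i;
            // No lock here: submitting the work does not touch the collector.
            pool.execute(() -> onResponse(collector, message));
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("callbacks did not finish in time");
        }
        return collector;
    }

    public static void main(String[] args) throws InterruptedException {
        Collector c = run(8, 10000);
        System.out.println("emitted=" + c.emitted.size() + " acked=" + c.acked.size());
    }
}
```

With the lock inside onResponse, both counts should equal the number of
requests. Holding the lock only around emit and ack also keeps the critical
section short, so the callbacks are serialized only for those two calls.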
Thank you.
2014-12-18 11:56 GMT+02:00 Idan Fridman <[email protected]>:
>
> @Michael,
> Why do you think that synchronizing will avoid NPEs? (Is it because Storm won't
> release the OutputCollector instance until all acks have been received?)
>
> 2014-12-17 19:34 GMT+02:00 Michael Rose <[email protected]>:
>>
>> 1) Could you give me an example of any side effects that might occur
>> while using multiple threads in case I won’t synchronise OutputCollector?
>>
>> You'll get NPEs.
>>
>> 2) Timeouts
>>
>> AsyncHttpClient has timeouts built into the client config.
>> https://github.com/AsyncHttpClient/async-http-client/blob/036bc733ba8527d4700df048ba304f01241488ca/api/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java#L626-L636
>>
>>
>>
>>
>>
> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> [email protected]
>
> On Wed, Dec 17, 2014 at 10:14 AM, Nathan Leung <[email protected]> wrote:
>>
>> 1. You will get an exception. I forgot the exact type, but a call to
>> OutputCollector will typically be on the stack trace.
>>
>> 2. I'm not familiar enough with AsyncHttpClient to say. I know with
>> Apache HttpClient you can set a timeout which puts an upper bound on the
>> amount of time an HTTP request will take. Maybe you can do something
>> similar in AsyncHttpClient.
>>
>> On Wed, Dec 17, 2014 at 12:10 PM, Idan Fridman <[email protected]>
>> wrote:
>>>
>>> Hi,
>>> 1. OutputCollector access must always be synchronized if you are using
>>> it from multiple threads
>>> Could you give me an example of any side effects that might occur
>>> while using multiple threads in case I won’t synchronise OutputCollector?
>>>
>>> 2. I guess I didn’t formulate my question right.
>>> I do need to emit after I get any response (success or failure),
>>> but what about the “never-got-a-response” case? Nothing will ever be emitted
>>> to the next bolt, and I will never be able to tell what happened to that
>>> request. Of course I can manage state before and after execute, but then it
>>> becomes ugly, doesn’t it?
>>>
>>> thanks.
>>>
>>>
>>>
>>> On Dec 17, 2014, at 7:02 PM, Nathan Leung <[email protected]> wrote:
>>>
>>> 1. Yes OutputCollector access must always be synchronized if you are
>>> using it from multiple threads
>>>
>>> 2. I guess it depends. If you need the web server response before the
>>> tuple gets sent to the next bolt, then you have to put something in your
>>> error handling to emit it. If you don't need the response before sending
>>> to the next bolt, then instead of putting emit in onCompleted you can put
>>> it in execute after you setup the HTTP request.
>>>
>>> On Wed, Dec 17, 2014 at 11:57 AM, Idan Fridman <[email protected]>
>>> wrote:
>>>>
>>>> Hi Nathan,
>>>> First of all, thank you for responding. It's really helpful
>>>> and is increasing my knowledge of this technology.
>>>>
>>>> If you prefer, I can reply to you via the group list; I just didn't
>>>> want to annoy the user list.
>>>>
>>>> I hope it's OK if I keep asking a few more questions about my
>>>> scenario:
>>>>
>>>>
>>>> From your answers I understand that I should add the emit line within the
>>>> response callbacks.
>>>>
>>>> Check the following code (I tried to keep it short):
>>>>
>>>> @Override
>>>> public Response onCompleted(Response response) throws Exception {
>>>> basicOutputCollector.emit(new Values(pushMessage));
>>>>
>>>> return response;
>>>> }
>>>>
>>>> @Override
>>>> public void onThrowable(Throwable t) {
>>>> basicOutputCollector.emit(new Values(pushMessage));
>>>> }
>>>> });
>>>>
>>>> 1. If I use BaseBasicBolt, I don't need to care about acking, only
>>>> about synchronizing the basicOutputCollector.
>>>>
>>>> Is that right?
>>>>
>>>> 2. How would I handle a scenario where my web service never responds
>>>> to a request? How could I "time out" and emit to my next bolt? I mean, if I
>>>> never get a response, the emit line will never be executed.
>>>>
>>>> Thanks again,
>>>> Idan
>>>>
>>>>
>>>> 2014-12-17 18:02 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>
>>>>> 1. My examples were assuming BaseRichBolt, not BaseBasicBolt
>>>>>
>>>>> 2. If you are using BaseBasicBolt, then you wouldn't ack manually
>>>>>
>>>>> 3. If you use BaseBasicBolt then your tuple will be acked immediately
>>>>> after execute and you won't have to worry about timeouts due to HTTP
>>>>> response time.
>>>>>
>>>>
>>>> On Wed, Dec 17, 2014 at 2:36 AM, Idan Fridman <[email protected]>
>>>> wrote:
>>>>>
>>>>> @Nathan:
>>>>>
>>>>> Right now the requirement is to emit the tuple on failure and on completion
>>>>> without retrying (as I wrote, we are emitting into a Persister bolt).
>>>>>
>>>>> 1. If I am using BaseBasicBolt, can I still ack manually?
>>>>> 2. Why should I ack manually in the final response callbacks while using
>>>>> BaseBasicBolt?
>>>>>
>>>>>
>>>>> * "..This means that if you don't get a response your tuple will get
>>>>> timed out.."
>>>>>
>>>>>
>>>>> 3. I didn't get how I can set a timeout for a specific bolt (so that in
>>>>> my case, if I never get a response, I'll be able to handle this scenario
>>>>> without necessarily retrying, maybe just logging).
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 2014-12-16 20:06 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>
>>>>>> It's required that you synchronize on the output collector. If you
>>>>>> need higher throughput through this bolt and are limited by synchronizing
>>>>>> on the output collector, you will need to add more tasks.
>>>>>>
>>>>>> I would emit / ack on the completion of the HTTP request. This means
>>>>>> that if you don't get a response your tuple will get timed out (and if
>>>>>> you
>>>>>> have reliable message handling the fail method will be called on the
>>>>>> spout). If you catch the error condition yourself you can always
>>>>>> explicitly fail the tuple using the output collector, or retry manually.
>>>>>>
>>>>>> I would reiterate that how you handle the failure states depends on
>>>>>> the requirements of your system. I'm assuming that you want to make use
>>>>>> of
>>>>>> storm's at least once semantics, but as mentioned if your requirements
>>>>>> don't demand these semantics, you can simplify your logic (and possibly
>>>>>> even use BaseBasicBolt as you were before).
>>>>>>
>>>>>
>>>>> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman <[email protected]>
>>>>> wrote:
>>>>>>
>>>>>> 1. I'm afraid that synchronizing on basicOutputCollector will hurt
>>>>>> performance. What do you think?
>>>>>>
>>>>>> 2. What do you think about emitting to the next bolt in onCompleted of
>>>>>> the async response, as shown below? That means the topology will
>>>>>> continue to the next bolt only after I get a response from the external
>>>>>> service (what happens if I never get a response? I need to solve this).
>>>>>> On Dec 16, 2014 7:50 PM, "Nathan Leung" <[email protected]> wrote:
>>>>>>
>>>>>>> You would care if you wanted to replay the tuple tree in case of
>>>>>>> failure. Consider what happens if your bolt thread fails, or if the
>>>>>>> worker
>>>>>>> process fails. Will you recover properly? Is it a requirement to
>>>>>>> recover
>>>>>>> properly in this scenario? (it might not be, sometimes it's ok to lose
>>>>>>> data).
>>>>>>>
>>>>>>> Regarding synchronization and acking, I would ack after emit, and
>>>>>>> synchronize access to emit and ack only, not the entire execute method.
>>>>>>> Probably something like synchronized(basicOutputCollector) { ... }.
>>>>>>>
>>>>>>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> " It is better to manually control the acking so that it is not
>>>>>>>> done until the tuple is fully processed"
>>>>>>>> Why would I care about that? I get the response in onCompleted or
>>>>>>>> onThrowable; if anything happens, I'll take care of it there.
>>>>>>>>
>>>>>>>> BTW: I do have another bolt after this call. It's a
>>>>>>>> 'Persister' bolt, which is responsible for persisting the answer into a
>>>>>>>> datasource.
>>>>>>>> I guess that makes things even more complicated.
>>>>>>>>
>>>>>>>> Are you talking about something like this?
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public synchronized void execute(Tuple tuple, BasicOutputCollector
>>>>>>>> basicOutputCollector) {
>>>>>>>>
>>>>>>>>
>>>>>>>> PushMessage pushMessage = (PushMessage)
>>>>>>>> tuple.getValueByField("pushMessage");
>>>>>>>> final String messageId = pushMessage.getMessageId();
>>>>>>>> asyncHttpClient.preparePost("some_url").execute(new
>>>>>>>> AsyncCompletionHandler<Response>() {
>>>>>>>> @Override
>>>>>>>> public Response onCompleted(Response response) throws
>>>>>>>> Exception {
>>>>>>>> String innerMessageId = messageId;
>>>>>>>> System.out.printf("\n messageId=" + innerMessageId +
>>>>>>>> "responseBody=" + response.getResponseBody());
>>>>>>>>
>>>>>>>> basicOutputCollector.emit(new Values(pushMessage));
>>>>>>>>
>>>>>>>> return response;
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void onThrowable(Throwable t) {
>>>>>>>> t.printStackTrace();
>>>>>>>> }
>>>>>>>> });
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Note the synchronized keyword and the basicOutputCollector.emit(new
>>>>>>>> Values(pushMessage)); additions.
>>>>>>>>
>>>>>>>> Where would you add the acking (if needed)?
>>>>>>>> Thank you.
>>>>>>>>
>>>>>>>>
>>>>>>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>
>>>>>>>>> Sorry, I missed that. I would recommend you use IRichBolt (or
>>>>>>>>> BaseRichBolt). In BaseBasicBolt, you will ack once execute is
>>>>>>>>> finished,
>>>>>>>>> but this is no guarantee that your HTTP Request has actually
>>>>>>>>> completed. It
>>>>>>>>> is better to manually control the acking so that it is not done until
>>>>>>>>> the
>>>>>>>>> tuple is fully processed. This will allow you to catch scenarios
>>>>>>>>> where the
>>>>>>>>> request fails and replay accordingly, and will also allow for proper
>>>>>>>>> "back
>>>>>>>>> pressure" by preventing too many simultaneous HTTP requests. To
>>>>>>>>> elaborate,
>>>>>>>>> in BaseBasicBolt, since you ack the tuple immediately, you will
>>>>>>>>> indicate to
>>>>>>>>> the spout that you are done processing this tree (since your bolt is
>>>>>>>>> at the
>>>>>>>>> end of the topology), and the spout will be able to emit additional
>>>>>>>>> tuples. However, you may still be processing tuples that you have
>>>>>>>>> acked,
>>>>>>>>> since you are doing HTTP requests asynchronously. This may cause you
>>>>>>>>> to
>>>>>>>>> have more requests in flight than you anticipated.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hi Nathan,
>>>>>>>>>
>>>>>>>>> Excuse me if I'm missing something here. I don't understand why I need
>>>>>>>>> to ack at all. I am extending BaseBasicBolt, and we know that it
>>>>>>>>> automatically provides anchoring
>>>>>>>>> and acking for us.
>>>>>>>>>
>>>>>>>>> So you are saying that although I am using BaseBasicBolt, I should take
>>>>>>>>> care of the acking (and synchronize it) because I am using async
>>>>>>>>> response logic?
>>>>>>>>>
>>>>>>>>> thanks,
>>>>>>>>> Idan.
>>>>>>>>>
>>>>>>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>>
>>>>>>>>>> OK. :) If you use a separate thread, just make sure to wrap all
>>>>>>>>>> accesses to the OutputCollector object with synchronized. From what
>>>>>>>>>> I see
>>>>>>>>>> it doesn't look like you use OutputCollector, but maybe, for
>>>>>>>>>> example, it's
>>>>>>>>>> best to ack the messages in your response handler. If many response
>>>>>>>>>> handlers can be running simultaneously, then you would need to
>>>>>>>>>> synchronize
>>>>>>>>>> the calls to ack() as they are made on the OutputCollector.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Oh, I know what synchronized is all about. I just wasn't sure whether
>>>>>>>>>> the synchronization in my Storm bolt applies only to the
>>>>>>>>>> execute method.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>
>>>>>>>>>>> you should look up the synchronized keyword in java:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>>>>>>
>>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for your response, Itai. I understand.
>>>>>>>>>>> How would you modify the above code to make sure I am
>>>>>>>>>>> synchronizing, or making
>>>>>>>>>>> calls from the same thread?
>>>>>>>>>>>
>>>>>>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>>>>>>>>
>>>>>>>>>>>> Idan,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Consider you have 1000 concurrent tuples ... and the spout does
>>>>>>>>>>>> not throttle traffic. It means that the last bolt would be
>>>>>>>>>>>> handling 1000 concurrent requests. Now consider you have 100,000
>>>>>>>>>>>> concurrent
>>>>>>>>>>>> tuples.... Eventually the operating system or the NIO buffer would
>>>>>>>>>>>> exhaust
>>>>>>>>>>>> its resources. You would have been better off with throttling.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The output collector is the object on which you "ack" or
>>>>>>>>>>>> "fail" the tuple. You probably call them from a future callback.
>>>>>>>>>>>> Make sure
>>>>>>>>>>>> that all of these callbacks are called from the same thread, or are
>>>>>>>>>>>> synchronized.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Itai
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> ------------------------------
>>>>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>>> if it is out of resources. So you should consider using
>>>>>>>>>>>> max-spout-pending
>>>>>>>>>>>> for spout level throttling.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> @Itai,
>>>>>>>>>>>> My async bolt is the last bolt in the chain, so I guess I don't
>>>>>>>>>>>> have this problem?
>>>>>>>>>>>>
>>>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>>
>>>>>>>>>>>> @Michael,
>>>>>>>>>>>> I am not sure how the OutputCollector relates to my issue.
>>>>>>>>>>>>
>>>>>>>>>>>> This is my execute code. Could it cause any
>>>>>>>>>>>> side effects in my topology?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> @Override
>>>>>>>>>>>> public void execute(Tuple tuple, BasicOutputCollector
>>>>>>>>>>>> basicOutputCollector) {
>>>>>>>>>>>>
>>>>>>>>>>>> PushMessage pushMessage = (PushMessage)
>>>>>>>>>>>> tuple.getValueByField("pushMessage");
>>>>>>>>>>>> final String messageId = pushMessage.getMessageId();
>>>>>>>>>>>> asyncHttpClient.preparePost("some_url").execute(new
>>>>>>>>>>>> AsyncCompletionHandler<Response>() {
>>>>>>>>>>>> @Override
>>>>>>>>>>>> public Response onCompleted(Response response) throws
>>>>>>>>>>>> Exception {
>>>>>>>>>>>> String innerMessageId = messageId;
>>>>>>>>>>>> System.out.printf("\n messageId=" + innerMessageId +
>>>>>>>>>>>> "responseBody=" + response.getResponseBody());
>>>>>>>>>>>> return response;
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> @Override
>>>>>>>>>>>> public void onThrowable(Throwable t) {
>>>>>>>>>>>> t.printStackTrace();
>>>>>>>>>>>> }
>>>>>>>>>>>> });
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <
>>>>>>>>>>>> [email protected]>:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>>>>>>>> Senior Platform Engineer, FullContact
>>>>>>>>>>>> <http://www.fullcontact.com/>
>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>>>> if it is out of resources. So you should consider using
>>>>>>>>>>>>> max-spout-pending
>>>>>>>>>>>>> for spout level throttling.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Itai
>>>>>>>>>>>>> ------------------------------
>>>>>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using the AsyncHttpClient library (
>>>>>>>>>>>>> https://github.com/AsyncHttpClient/async-http-client), which is
>>>>>>>>>>>>> based on NIO channels, to get the response asynchronously without
>>>>>>>>>>>>> allocating a thread for each request.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was wondering whether this implementation could cause any
>>>>>>>>>>>>> side effects within a Storm bolt?
>>>>>>>>>>>>>
>>>>>>>>>>>>> thank you.
>>>>>>>>>>>>>