I attached a storm-ui screenshot which I took at peak time. I replaced the
bolt names with numbers in order to be authentic.

I can see some failed bolts within the spout. Any idea?

[image: Inline image 1]
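
For reference, here is a minimal sketch of the point made further down the thread about BaseBasicBolt. As far as I understand it, Storm acks the tuple for you when execute() returns normally and fails it when execute() throws FailedException. The sketch below does not use Storm at all: FailedException, BasicBolt, AckLog, runBasic and callService are hypothetical stand-ins written only to model that behaviour, and the "bad" prefix is just a way to simulate a failing HTTP call.

```java
import java.util.ArrayList;
import java.util.List;

public class BasicBoltAckSketch {

    /** Hypothetical stand-in for Storm's FailedException (not the real class). */
    public static class FailedException extends RuntimeException {
        public FailedException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a basic bolt; only the execute step is modelled. */
    public interface BasicBolt {
        void execute(String tuple);
    }

    /** Records what would be reported back to the spout for each tuple. */
    public static class AckLog {
        public final List<String> acked = new ArrayList<>();
        public final List<String> failed = new ArrayList<>();
    }

    /** Models the wrapper around a basic bolt: ack on normal return, fail on FailedException. */
    public static void runBasic(BasicBolt bolt, String tuple, AckLog log) {
        try {
            bolt.execute(tuple);
            log.acked.add(tuple);
        } catch (FailedException e) {
            log.failed.add(tuple);
        }
    }

    /** Hypothetical stand-in for the HTTP call; inputs starting with "bad" simulate an error. */
    static void callService(String tuple) {
        if (tuple.startsWith("bad")) {
            throw new IllegalStateException("simulated timeout for " + tuple);
        }
    }

    public static AckLog demo() {
        AckLog log = new AckLog();

        // Swallows every exception, like the bolt posted in this thread.
        BasicBolt swallowing = tuple -> {
            try {
                callService(tuple);
            } catch (Exception e) {
                // logged only; execute() still returns normally
            }
        };

        // Converts the error into FailedException so the tuple is failed instead.
        BasicBolt failing = tuple -> {
            try {
                callService(tuple);
            } catch (Exception e) {
                throw new FailedException(e.getMessage());
            }
        };

        runBasic(swallowing, "ok-1", log);
        runBasic(swallowing, "bad-1", log);
        runBasic(failing, "bad-2", log);
        return log;
    }

    public static void main(String[] args) {
        AckLog log = demo();
        // prints: acked=[ok-1, bad-1] failed=[bad-2]
        System.out.println("acked=" + log.acked + " failed=" + log.failed);
    }
}
```

In this model a bolt that catches and logs every exception still has its tuple acked, so if tuples are failing or being replayed, the cause is probably elsewhere (for example the tuple timing out before the bolt gets to it).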

2015-03-10 15:49 GMT+02:00 Nathan Leung <[email protected]>:

> You don't need to ack when extending BaseBasicBolt
>
> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <[email protected]>
> wrote:
>
>> try to replace: basicOutputCollector.emit(new
>> Values(pushMessageResponseDTO));
>> with
>>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>>             collector.ack(tuple);
>>
>>
>>
>> On 10 March 2015 at 13:43, Haralds Ulmanis <[email protected]> wrote:
>>
>>> And where do you ACK/FAIL tuples ?
>>>
>>> On 10 March 2015 at 13:39, Idan Fridman <[email protected]> wrote:
>>>
>>>> According to the metrics, yes, I can see some errors.
>>>>
>>>> But if I use try and catch, why would they time out in a loop? Once they
>>>> time out I catch them, log them, and that's it.
>>>>
>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>:
>>>>
>>>>> Do you have a large number of errored tuples in this topology? You
>>>>> might run into a situation where tuples timeout in a loop
>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:
>>>>>
>>>>>> My topology includes a bolt which opens an HTTP request to a
>>>>>> web service.
>>>>>> The average response time is 500 milliseconds (however, sometimes it
>>>>>> takes longer).
>>>>>>
>>>>>> * I added timeout functionality, and I am using KafkaSpout.
>>>>>>
>>>>>> When I send messages one by one everything works fine, but
>>>>>>
>>>>>> under high throughput *that bolt gets stuck and nothing gets
>>>>>> into it anymore*, and worst of all I am getting a "replay" of
>>>>>> the messages.
>>>>>>
>>>>>> The only way to get through this is to reset Kafka's offset. Otherwise
>>>>>> ZooKeeper still holds Kafka's offset and the messages keep replaying.
>>>>>>
>>>>>>
>>>>>> 1. *Why are messages being replayed? I don't need that.*
>>>>>> 2. Here is my code example of the "ExternalServiceOutputBolt":
>>>>>>
>>>>>> package com.mycompany.push.topology;
>>>>>>
>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>> import backtype.storm.task.TopologyContext;
>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>> import backtype.storm.tuple.Fields;
>>>>>> import backtype.storm.tuple.Tuple;
>>>>>> import backtype.storm.tuple.Values;
>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>> import org.apache.http.NameValuePair;
>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>> import org.slf4j.Logger;
>>>>>> import org.slf4j.LoggerFactory;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.net.SocketTimeoutException;
>>>>>> import java.util.ArrayList;
>>>>>> import java.util.List;
>>>>>> import java.util.Map;
>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>
>>>>>>
>>>>>>
>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>
>>>>>>     private CloseableHttpClient httpClient;
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>         initMetrics(context);
>>>>>>         httpClient = getHttpClientInstance();
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>         try {
>>>>>>             received_message_counter.incr();
>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO =
>>>>>>                     (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO =
>>>>>>                         executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>                 System.out.println("externalServiceOutputBolt,emit tuple with snid= "
>>>>>>                         + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>             }
>>>>>>         } catch (Exception e) {
>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(
>>>>>>             PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>         PushMessageResponseDTO pushMessageResponseDTO =
>>>>>>                 new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>         CloseableHttpResponse response = null;
>>>>>>         try {
>>>>>>
>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>             response = httpClient.execute(post);
>>>>>>             response.getEntity();
>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>                 received_not_status_200_counter.incr();
>>>>>>             } else {
>>>>>>                 received_status_200_counter.incr();
>>>>>>             }
>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO="
>>>>>>                     + pushMessageResponseDTO.toString()
>>>>>>                     + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>             return pushMessageResponseDTO;
>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>             received_time_out_counter.incr();
>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>
>>>>>>         } catch (Throwable t) {
>>>>>>             received_fail_status_counter.incr();
>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>             if (t.getMessage() != null) {
>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg="
>>>>>>                         + t.getMessage(), t);
>>>>>>             }
>>>>>>         } finally {
>>>>>>             if (response != null) {
>>>>>>                 response.close();
>>>>>>             }
>>>>>>         }
>>>>>>         return pushMessageResponseDTO;
>>>>>>     }
>>>>>>
>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>         cm.setMaxTotal(500);
>>>>>>         int timeout = 4;
>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>         return HttpClientBuilder.create().
>>>>>>                 setDefaultRequestConfig(config).
>>>>>>                 setConnectionManager(cm).
>>>>>>                 build();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>
>>>>
>>>
>>
>
