And where do you ACK/FAIL the tuples?

On 10 March 2015 at 13:39, Idan Fridman <[email protected]> wrote:

> From the metrics I can see some errors, yes.
>
> But if I use try/catch, why would they time out in a loop? Once they
> time out, I catch them, log them, and that's it.
>
> 2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>:
>
>> Do you have a large number of errored tuples in this topology? You might
>> run into a situation where tuples time out in a loop.
>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:
>>
>>> My topology includes a bolt which opens an HTTP request to a web service.
>>> The average response time is 500 milliseconds (however, sometimes it takes
>>> longer).
>>>
>>> * I added timeout functionality, and I am using KafkaSpout.
>>>
>>> When I send messages one by one, everything works fine, but
>>>
>>> Under high throughput *that bolt gets stuck and nothing gets into
>>> it anymore,* and worst of all, I am getting a "replay" of the
>>> messages.
>>>
>>> The only way to get through this is to reset Kafka's offset; otherwise
>>> ZooKeeper still has Kafka's offset recorded and messages are still being replayed.
>>>
>>>
>>> 1. *Why are messages being replayed? I don't need that.*
>>> 2. Here is my code example of the "ExternalServiceOutputBolt":
>>>
>>> package com.mycompany.push.topology;
>>>
>>> import backtype.storm.metric.api.CountMetric;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.BasicOutputCollector;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>> import org.apache.http.NameValuePair;
>>> import org.apache.http.client.config.RequestConfig;
>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>> import org.apache.http.client.methods.HttpPost;
>>> import org.apache.http.impl.client.CloseableHttpClient;
>>> import org.apache.http.impl.client.HttpClientBuilder;
>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>> import org.apache.http.message.BasicNameValuePair;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.io.IOException;
>>> import java.net.SocketTimeoutException;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> import java.util.Map;
>>> import java.util.concurrent.LinkedBlockingQueue;
>>>
>>>
>>>
>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>
>>>     private CloseableHttpClient httpClient;
>>>
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>     }
>>>
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>         externalServiceGraphUrl = (String) 
>>> stormConf.get("externalServiceGraphUrl");
>>>         initMetrics(context);
>>>         httpClient = getHttpClientInstance();
>>>     }
>>>
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector 
>>> basicOutputCollector) {
>>>         try {
>>>             received_message_counter.incr();
>>>             final PushMessageRequestDTO pushMessageRequestDTO = 
>>> (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>             if (pushMessageRequestDTO != null) {
>>>                 PushMessageResponseDTO pushMessageResponseDTO = 
>>> executePushNotificationRequest(pushMessageRequestDTO);
>>>                 returned_from_externalService_counter.incr();
>>>                 System.out.println("externalServiceOutputBolt,emit tupple 
>>> with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + 
>>> pushMessageRequestDTO.getRefId());
>>>                 basicOutputCollector.emit(new 
>>> Values(pushMessageResponseDTO));
>>>             }
>>>         } catch (Exception e) {
>>>             log.error("externalServiceOutputBolt. Error", e);
>>>         }
>>>     }
>>>
>>>     private PushMessageResponseDTO 
>>> executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) 
>>> throws IOException {
>>>         PushMessageResponseDTO pushMessageResponseDTO = new 
>>> PushMessageResponseDTO(pushMessageRequestDTO);
>>>         CloseableHttpResponse response = null;
>>>         try {
>>>
>>>             HttpPost post = new HttpPost("external.url");
>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>             response = httpClient.execute(post);
>>>             response.getEntity();
>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>                 received_not_status_200_counter.incr();
>>>             } else {
>>>                 received_status_200_counter.incr();
>>>             }
>>>             log.debug("externalServiceOutputBolt.onCompleted,  
>>> pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", 
>>> responseBody=" + response.getStatusLine().getReasonPhrase());
>>>             return pushMessageResponseDTO;
>>>         } catch (SocketTimeoutException e) {
>>>             received_time_out_counter.incr();
>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>
>>>         } catch (Throwable t) {
>>>             received_fail_status_counter.incr();
>>>             
>>> pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>             if (t.getMessage() != null) {
>>>                 log.error("externalServiceOutputBolt, error executing 
>>> externalService API. errorMsg=" + t.getMessage(), t);
>>>             }
>>>         } finally {
>>>             if (response != null) {
>>>                 response.close();
>>>             }
>>>         }
>>>         return pushMessageResponseDTO;
>>>     }
>>>
>>>     private CloseableHttpClient getHttpClientInstance() {
>>>         PoolingHttpClientConnectionManager cm = new 
>>> PoolingHttpClientConnectionManager();
>>>         cm.setDefaultMaxPerRoute(100);
>>>         cm.setMaxTotal(500);
>>>         int timeout = 4;
>>>         RequestConfig config = RequestConfig.custom()
>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>                 .setSocketTimeout(timeout * 1000).build();
>>>         return HttpClientBuilder.create().
>>>                 setDefaultRequestConfig(config).
>>>                 setConnectionManager(cm).
>>>                 build();
>>>     }
>>> }
>>>
>>> Thank you.
>>>
>>
>

Reply via email to