Yes, by the metrics I can see some errors.

But if I use try and catch, why would they time out in a loop? Once they
time out, I catch them, log them, and that's it.
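
For what it's worth, here is a rough sketch of how a timeout loop could arise even when the bolt catches every exception. It assumes Storm's tuple timeout (topology.message.timeout.secs, 30 seconds by default) is counted from the moment the spout emits the tuple, so time spent queued behind slow HTTP calls counts too. The class name, method names, and the pending/executor numbers are hypothetical; only the 500 ms average comes from the original post.

```java
public class TupleTimeoutEstimate {

    /**
     * Rough time (in seconds) until the last queued tuple finishes,
     * assuming each bolt executor handles one tuple at a time.
     */
    static double worstCaseLatencySeconds(int pendingTuples, int boltExecutors, double secondsPerCall) {
        // Split the backlog evenly across executors, rounding up.
        int tuplesPerExecutor = (pendingTuples + boltExecutors - 1) / boltExecutors;
        return tuplesPerExecutor * secondsPerCall;
    }

    /** True if the last queued tuple would exceed the tuple timeout. */
    static boolean wouldTimeOut(int pendingTuples, int boltExecutors,
                                double secondsPerCall, int messageTimeoutSecs) {
        return worstCaseLatencySeconds(pendingTuples, boltExecutors, secondsPerCall) > messageTimeoutSecs;
    }

    public static void main(String[] args) {
        int messageTimeoutSecs = 30;   // Storm default for topology.message.timeout.secs
        double secondsPerCall = 0.5;   // average web service response from the original post
        int boltExecutors = 4;         // hypothetical parallelism

        for (int pending : new int[] {100, 1000, 10000}) {  // hypothetical backlog sizes
            System.out.println(pending + " pending tuples: "
                    + worstCaseLatencySeconds(pending, boltExecutors, secondsPerCall) + " s, times out = "
                    + wouldTimeOut(pending, boltExecutors, secondsPerCall, messageTimeoutSecs));
        }
    }
}
```

If the estimate exceeds the timeout, the spout fails the tuple and KafkaSpout emits it again, which adds to the backlog. The try/catch in the bolt never sees this, because the timeout is tracked on the spout side. Capping in-flight tuples with topology.max.spout.pending is one common way to keep the backlog bounded.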

2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>:

> Do you have a large number of errored tuples in this topology? You might
> run into a situation where tuples time out in a loop.
> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:
>
>> My topology includes a bolt which opens an HTTP request to a web service.
>> The average response time is 500 milliseconds (however, sometimes it takes
>> longer).
>>
>> * I added timeout functionality, and I am using KafkaSpout.
>>
>> When I send messages one by one, everything works fine, but
>>
>> under high throughput *that bolt gets stuck and nothing gets into
>> it anymore*, and worst of all I am getting a "replay" of the
>> messages.
>>
>> The only way to get through this is to reset Kafka's offset; otherwise
>> ZooKeeper still holds Kafka's offset and the messages keep replaying.
>>
>>
>> 1. *Why are messages being replayed? I don't need that.*
>> 2. Here is my code example of the "ExternalServiceOutputBolt":
>>
>> package com.mycompany.push.topology;
>>
>> import backtype.storm.metric.api.CountMetric;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.BasicOutputCollector;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseBasicBolt;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Tuple;
>> import backtype.storm.tuple.Values;
>> import com.mycompany.push.dto.PushMessageRequestDTO;
>> import com.mycompany.push.dto.PushMessageResponseDTO;
>> import org.apache.http.NameValuePair;
>> import org.apache.http.client.config.RequestConfig;
>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>> import org.apache.http.client.methods.CloseableHttpResponse;
>> import org.apache.http.client.methods.HttpPost;
>> import org.apache.http.impl.client.CloseableHttpClient;
>> import org.apache.http.impl.client.HttpClientBuilder;
>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>> import org.apache.http.message.BasicNameValuePair;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.IOException;
>> import java.net.SocketTimeoutException;
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.Map;
>> import java.util.concurrent.LinkedBlockingQueue;
>>
>>
>>
>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>
>>     private CloseableHttpClient httpClient;
>>
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         declarer.declare(new Fields("pushMessageResponse"));
>>     }
>>
>>
>>     @Override
>>     public void prepare(Map stormConf, TopologyContext context) {
>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>         initMetrics(context);
>>         httpClient = getHttpClientInstance();
>>     }
>>
>>
>>     @Override
>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>         try {
>>             received_message_counter.incr();
>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>             if (pushMessageRequestDTO != null) {
>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>                 returned_from_externalService_counter.incr();
>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>             }
>>         } catch (Exception e) {
>>             log.error("externalServiceOutputBolt. Error", e);
>>         }
>>     }
>>
>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>         CloseableHttpResponse response = null;
>>         try {
>>
>>             HttpPost post = new HttpPost("external.url");
>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>             response = httpClient.execute(post);
>>             response.getEntity();
>>             if (response.getStatusLine().getStatusCode() != 200) {
>>                 received_not_status_200_counter.incr();
>>             } else {
>>                 received_status_200_counter.incr();
>>             }
>>             log.debug("externalServiceOutputBolt.onCompleted, pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>             return pushMessageResponseDTO;
>>         } catch (SocketTimeoutException e) {
>>             received_time_out_counter.incr();
>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>
>>         } catch (Throwable t) {
>>             received_fail_status_counter.incr();
>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>             if (t.getMessage() != null) {
>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>             }
>>         } finally {
>>             if (response != null) {
>>                 response.close();
>>             }
>>         }
>>         return pushMessageResponseDTO;
>>     }
>>
>>     private CloseableHttpClient getHttpClientInstance() {
>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>         cm.setDefaultMaxPerRoute(100);
>>         cm.setMaxTotal(500);
>>         int timeout = 4;
>>         RequestConfig config = RequestConfig.custom()
>>                 .setConnectTimeout(timeout * 1000) //in millis
>>                 .setConnectionRequestTimeout(timeout * 1000)
>>                 .setSocketTimeout(timeout * 1000).build();
>>         return HttpClientBuilder.create().
>>                 setDefaultRequestConfig(config).
>>                 setConnectionManager(cm).
>>                 build();
>>     }
>> }
>>
>> Thank you.
>>
>
