Do you have a large number of errored tuples in this topology? You might
run into a situation where tuples timeout in a loop
On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:

> My Topology including a bolt which opening Http Request to webservice.
> The average response is 500 milliseconds (how-ever sometimes it takes
> longer)
>
> * I added timeout functionality. and I am using KafkaSpout
>
> When I send messages one by one everything working fine but
>
> Under High throughput *that bolt is getting stuck and nothing get into
> there anymore.* and the worst thing I am having a "reply" of the messages
>
> The only way to get thru this is to reset kafka's offset. else the
> zookeeper still logging kafka's offset and messages are still replying
>
>
> 1. *Why Messages being replied? I dont need that*
> 2. Here is  my code example of the"ExternalServiceOutputBolt
>
> package com.mycompany.push.topology;
>
> import backtype.storm.metric.api.CountMetric;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.BasicOutputCollector;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseBasicBolt;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
> import com.mycompany.push.dto.PushMessageRequestDTO;
> import com.mycompany.push.dto.PushMessageResponseDTO;
> import org.apache.http.NameValuePair;
> import org.apache.http.client.config.RequestConfig;
> import org.apache.http.client.entity.UrlEncodedFormEntity;
> import org.apache.http.client.methods.CloseableHttpResponse;
> import org.apache.http.client.methods.HttpPost;
> import org.apache.http.impl.client.CloseableHttpClient;
> import org.apache.http.impl.client.HttpClientBuilder;
> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
> import org.apache.http.message.BasicNameValuePair;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.IOException;
> import java.net.SocketTimeoutException;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.LinkedBlockingQueue;
>
>
>
> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>
>     private CloseableHttpClient httpClient;
>
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("pushMessageResponse"));
>     }
>
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context) {
>         externalServiceGraphUrl = (String) 
> stormConf.get("externalServiceGraphUrl");
>         initMetrics(context);
>         httpClient = getHttpClientInstance();
>     }
>
>
>     @Override
>     public void execute(Tuple tuple, BasicOutputCollector 
> basicOutputCollector) {
>         try {
>             received_message_counter.incr();
>             final PushMessageRequestDTO pushMessageRequestDTO = 
> (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>             if (pushMessageRequestDTO != null) {
>                 PushMessageResponseDTO pushMessageResponseDTO = 
> executePushNotificationRequest(pushMessageRequestDTO);
>                 returned_from_externalService_counter.incr();
>                 System.out.println("externalServiceOutputBolt,emit tupple 
> with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + 
> pushMessageRequestDTO.getRefId());
>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>             }
>         } catch (Exception e) {
>             log.error("externalServiceOutputBolt. Error", e);
>         }
>     }
>
>     private PushMessageResponseDTO 
> executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) 
> throws IOException {
>         PushMessageResponseDTO pushMessageResponseDTO = new 
> PushMessageResponseDTO(pushMessageRequestDTO);
>         CloseableHttpResponse response = null;
>         try {
>
>             HttpPost post = new HttpPost("external.url");
>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>             response = httpClient.execute(post);
>             response.getEntity();
>             if (response.getStatusLine().getStatusCode() != 200) {
>                 received_not_status_200_counter.incr();
>             } else {
>                 received_status_200_counter.incr();
>             }
>             log.debug("externalServiceOutputBolt.onCompleted,  
> pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", 
> responseBody=" + response.getStatusLine().getReasonPhrase());
>             return pushMessageResponseDTO;
>         } catch (SocketTimeoutException e) {
>             received_time_out_counter.incr();
>             log.error("externalServiceOutputBolt, TimeoutException", e);
>
>         } catch (Throwable t) {
>             received_fail_status_counter.incr();
>             
> pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>             if (t.getMessage() != null) {
>                 log.error("externalServiceOutputBolt, error executing 
> externalService API. errorMsg=" + t.getMessage(), t);
>             }
>         } finally {
>             if (response != null) {
>                 response.close();
>             }
>         }
>         return pushMessageResponseDTO;
>     }
>
>     private CloseableHttpClient getHttpClientInstance() {
>         PoolingHttpClientConnectionManager cm = new 
> PoolingHttpClientConnectionManager();
>         cm.setDefaultMaxPerRoute(100);
>         cm.setMaxTotal(500);
>         int timeout = 4;
>         RequestConfig config = RequestConfig.custom()
>                 .setConnectTimeout(timeout * 1000) //in millis
>                 .setConnectionRequestTimeout(timeout * 1000)
>                 .setSocketTimeout(timeout * 1000).build();
>         return HttpClientBuilder.create().
>                 setDefaultRequestConfig(config).
>                 setConnectionManager(cm).
>                 build();
>     }
> }
>
> Thank you.
>

Reply via email to