If your tuple times out, it will be failed at the spout. Think of it this
way: after the spout emits, your tuple has to make it through the output
queue and finish processing before the timeout period elapses. Otherwise,
with KafkaSpout, it will be re-emitted.
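To make the timeout-and-replay cycle concrete, here is a simplified model of the bookkeeping a reliable spout does. It is a sketch under stated assumptions, not Storm's or KafkaSpout's actual code: the class and method names are hypothetical, message ids stand in for Kafka offsets, and a single expire() call stands in for the periodic timeout check.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model (not Storm/KafkaSpout code): a tuple that is not acked
 * before the message timeout is failed and queued for re-emission.
 */
public class PendingTupleModel {
    private final long timeoutMillis;
    // message id (e.g. a Kafka offset) -> emit time, kept in emit order
    private final Map<Long, Long> pending = new LinkedHashMap<Long, Long>();
    // failed message ids waiting to be re-emitted
    private final Deque<Long> replayQueue = new ArrayDeque<Long>();

    public PendingTupleModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** The spout emits a tuple tracked by a message id. */
    public void emit(long msgId, long nowMillis) {
        pending.put(msgId, nowMillis);
    }

    /** The whole tuple tree finished in time. */
    public void ack(long msgId) {
        pending.remove(msgId);
    }

    /** Fails every pending tuple whose age has reached the timeout; returns how many. */
    public int expire(long nowMillis) {
        int failed = 0;
        Iterator<Map.Entry<Long, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (nowMillis - e.getValue() >= timeoutMillis) {
                it.remove();
                replayQueue.add(e.getKey()); // will be emitted again later
                failed++;
            }
        }
        return failed;
    }

    public int pendingCount() { return pending.size(); }

    public Deque<Long> replayQueue() { return replayQueue; }

    public static void main(String[] args) {
        PendingTupleModel spout = new PendingTupleModel(30000); // 30 s timeout
        spout.emit(1L, 0);
        spout.emit(2L, 0);
        spout.ack(1L);                     // tuple 1 finished within 30 s
        int failed = spout.expire(31000);  // tuple 2 was never acked
        System.out.println(failed + " failed, replay " + spout.replayQueue());
        // prints "1 failed, replay [2]"
    }
}
```

Note that in this model nothing your bolt catches or logs changes the outcome: the only thing that removes a tuple from the pending set in time is the ack.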


1) Spout emits to output queue (max size: max spout pending)

2) Tuple drains through the output queue (size n; worst case is max spout
pending)

-- output queue 0 --
-- output queue 1 --
-- ... --
-- output queue n --

3) other processing?

4) http bolt

For the purpose of illustration, let's assume parallelism is 1 across the
board, max spout pending is 100, the message timeout is 30 seconds, and your
HTTP bolt takes a worst case of 1 second per tuple (the timeouts in your
code are actually 4 seconds, which only makes this worse). If you are
getting 1 input tuple every 2 seconds, then you are fine: your spout
output queue is empty, your processing takes ~1 second, and you're done.
Now let's assume you get a flood of 100 tuples all of a sudden.  Your spout
pulls these off of Kafka, and fills the output queue.  Now when it emits
the next tuple, that tuple has to wait for the 100 tuples in the output
queue ahead of it to drain before it can be processed.  In this example,
this will take 100 seconds. Not only will this tuple time out, but tuples
in the queue ahead of it will as well. These tuples will all need to be
replayed, and will continue to time out because there is too much in the
queue ahead of them.  You will see that the bolt is very busy but your
topology is unable to make progress.
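The arithmetic in the burst scenario can be checked with a small model. It assumes a single bolt executor, FIFO queueing, a fixed cost per tuple, and that the whole burst is emitted at t = 0; the class and method names are made up for illustration.

```java
/**
 * Back-of-the-envelope model of the burst scenario: one bolt executor,
 * FIFO queue, fixed processing time per tuple, whole burst emitted at t = 0.
 */
public class BurstTimeoutModel {

    /** Seconds a tuple at 0-based queue position {@code position} waits before it is processed. */
    public static double waitSeconds(int position, double serviceSeconds) {
        return position * serviceSeconds;
    }

    /** How many tuples of the burst finish after the message timeout. */
    public static int timedOut(int burstSize, double serviceSeconds, double timeoutSeconds) {
        int count = 0;
        for (int i = 0; i < burstSize; i++) {
            // finish time = time spent waiting in the queue + its own processing time
            double finish = waitSeconds(i, serviceSeconds) + serviceSeconds;
            if (finish > timeoutSeconds) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Illustration values: 100 tuples queued, 1 s per tuple, 30 s timeout.
        System.out.println("wait for tuple #101: " + waitSeconds(100, 1.0) + " s"); // 100.0 s
        System.out.println("timed out of 100: " + timedOut(100, 1.0, 30.0));       // 70
    }
}
```

With the illustration's numbers, 70 of the 100 queued tuples time out. If each call instead takes about 4 seconds (roughly the socket timeout set in getHttpClientInstance()), 93 of 100 do. The model still charges the bolt for tuples that have already timed out, which fits a bolt that looks busy while the topology makes no progress.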

At least, it sounds like this might be what is happening.
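One way to turn this into a setting is a rough sizing check. This is a rule of thumb derived from the scenario above, not an official Storm formula: it assumes the HTTP bolt is the only bottleneck, tuples are spread evenly across its executors, and every spout task can have max spout pending tuples in flight at once (max spout pending applies per spout task). The names are hypothetical.

```java
/**
 * Rule-of-thumb check (an assumption, not a Storm API): a full burst of
 * spoutTasks * maxSpoutPending tuples, shared evenly by boltExecutors that each
 * process one tuple at a time, must drain before the message timeout.
 */
public class SpoutPendingSizing {

    /** Largest max spout pending for which a full burst still finishes within the timeout. */
    public static int maxSafePending(double timeoutSeconds, double worstCaseSecondsPerTuple,
                                     int boltExecutors, int spoutTasks) {
        // last tuple finishes at (spoutTasks * pending / boltExecutors) * latency <= timeout
        return (int) Math.floor(timeoutSeconds * boltExecutors
                / (worstCaseSecondsPerTuple * spoutTasks));
    }

    public static void main(String[] args) {
        System.out.println(maxSafePending(30, 1.0, 1, 1)); // 30 (1 s per call)
        System.out.println(maxSafePending(30, 4.0, 1, 1)); // 7  (4 s per call)
    }
}
```

Anything above that value means a full burst cannot drain before the 30 second timeout, so the levers are a lower max spout pending, a longer message timeout, or more bolt executors.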

On Tue, Mar 10, 2015 at 9:39 AM, Idan Fridman wrote:

> By the metrics, I can see some errors, yes.
>
> But if I use try/catch, why would they time out in a loop? Once they
> time out, I catch them, log them, and that's it.
>
> 2015-03-10 15:35 GMT+02:00 Nathan Leung:
>
>> Do you have a large number of errored tuples in this topology? You might
>> run into a situation where tuples time out in a loop.
>> On Mar 10, 2015 8:58 AM, "Idan Fridman" wrote:
>>
>>> My topology includes a bolt that opens an HTTP request to a web service.
>>> The average response time is 500 milliseconds (however, sometimes it
>>> takes longer).
>>>
>>> * I added timeout functionality, and I am using KafkaSpout.
>>>
>>> When I send messages one by one, everything works fine, but
>>>
>>> under high throughput *that bolt gets stuck and nothing gets into it
>>> anymore*, and worst of all, the messages are being "replayed".
>>>
>>> The only way to get through this is to reset Kafka's offset; otherwise
>>> ZooKeeper still has Kafka's offset stored and the messages keep being
>>> replayed.
>>>
>>>
>>> 1. *Why are messages being replayed? I don't need that.*
>>> 2. Here is my code for the ExternalServiceOutputBolt:
>>>
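>>> package com.mycompany.push.topology;
>>>
>>> import backtype.storm.metric.api.CountMetric;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.BasicOutputCollector;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>> import org.apache.http.NameValuePair;
>>> import org.apache.http.client.config.RequestConfig;
>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>> import org.apache.http.client.methods.HttpPost;
>>> import org.apache.http.impl.client.CloseableHttpClient;
>>> import org.apache.http.impl.client.HttpClientBuilder;
>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>> import org.apache.http.message.BasicNameValuePair;
>>> import org.apache.http.util.EntityUtils;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.io.IOException;
>>> import java.net.SocketTimeoutException;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> import java.util.Map;
>>>
>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>
>>>     private static final Logger log =
>>>             LoggerFactory.getLogger(ExternalServiceOutputBolt.class);
>>>
>>>     // Metric bucket size in seconds (an assumed value).
>>>     private static final int METRICS_BUCKET_SECS = 60;
>>>
>>>     // Created in prepare() on the worker, so not serialized with the bolt.
>>>     private transient CloseableHttpClient httpClient;
>>>     private String externalServiceGraphUrl;
>>>
>>>     private transient CountMetric received_message_counter;
>>>     private transient CountMetric returned_from_externalService_counter;
>>>     private transient CountMetric received_status_200_counter;
>>>     private transient CountMetric received_not_status_200_counter;
>>>     private transient CountMetric received_time_out_counter;
>>>     private transient CountMetric received_fail_status_counter;
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>     }
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>         initMetrics(context);
>>>         httpClient = getHttpClientInstance();
>>>     }
>>>
>>>     // Metrics must be registered during prepare(); the names are illustrative.
>>>     private void initMetrics(TopologyContext context) {
>>>         received_message_counter = register(context, "received_message");
>>>         returned_from_externalService_counter = register(context, "returned_from_externalService");
>>>         received_status_200_counter = register(context, "received_status_200");
>>>         received_not_status_200_counter = register(context, "received_not_status_200");
>>>         received_time_out_counter = register(context, "received_time_out");
>>>         received_fail_status_counter = register(context, "received_fail_status");
>>>     }
>>>
>>>     private CountMetric register(TopologyContext context, String name) {
>>>         return context.registerMetric(name, new CountMetric(), METRICS_BUCKET_SECS);
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>         // Every exception is caught here, so execute() always returns normally and
>>>         // BaseBasicBolt acks the tuple. A failed HTTP call therefore never fails the
>>>         // tuple; a replay can only come from the Storm message timeout (or a
>>>         // failure elsewhere in the topology).
>>>         try {
>>>             received_message_counter.incr();
>>>             final PushMessageRequestDTO pushMessageRequestDTO =
>>>                     (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>             if (pushMessageRequestDTO != null) {
>>>                 PushMessageResponseDTO pushMessageResponseDTO =
>>>                         executePushNotificationRequest(pushMessageRequestDTO);
>>>                 returned_from_externalService_counter.incr();
>>>                 log.debug("externalServiceOutputBolt, emit tuple with snid="
>>>                         + pushMessageRequestDTO.getSnid()
>>>                         + " refId=" + pushMessageRequestDTO.getRefId());
>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>             }
>>>         } catch (Exception e) {
>>>             log.error("externalServiceOutputBolt. Error", e);
>>>         }
>>>     }
>>>
>>>     private PushMessageResponseDTO executePushNotificationRequest(
>>>             PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>         PushMessageResponseDTO pushMessageResponseDTO =
>>>                 new PushMessageResponseDTO(pushMessageRequestDTO);
>>>         CloseableHttpResponse response = null;
>>>         try {
>>>             // Assumption: the URL read in prepare() is the intended endpoint
>>>             // (the original used the placeholder "external.url").
>>>             HttpPost post = new HttpPost(externalServiceGraphUrl);
>>>             post.setEntity(new UrlEncodedFormEntity(buildUrlParameters(pushMessageRequestDTO)));
>>>             response = httpClient.execute(post);
>>>             // Read the body fully so the pooled connection can be reused.
>>>             EntityUtils.consume(response.getEntity());
>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>                 received_not_status_200_counter.incr();
>>>             } else {
>>>                 received_status_200_counter.incr();
>>>             }
>>>             log.debug("externalServiceOutputBolt.onCompleted, pushMessageResponseDTO="
>>>                     + pushMessageResponseDTO
>>>                     + ", reasonPhrase=" + response.getStatusLine().getReasonPhrase());
>>>             return pushMessageResponseDTO;
>>>         } catch (SocketTimeoutException e) {
>>>             received_time_out_counter.incr();
>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>         } catch (Exception e) {
>>>             received_fail_status_counter.incr();
>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>             log.error("externalServiceOutputBolt, error executing externalService API. errorMsg="
>>>                     + e.getMessage(), e);
>>>         } finally {
>>>             if (response != null) {
>>>                 response.close();
>>>             }
>>>         }
>>>         return pushMessageResponseDTO;
>>>     }
>>>
>>>     // Hypothetical helper: the original referenced an undefined "urlParameters".
>>>     // The parameter names are placeholders; use whatever the service expects.
>>>     private List<NameValuePair> buildUrlParameters(PushMessageRequestDTO pushMessageRequestDTO) {
>>>         List<NameValuePair> params = new ArrayList<NameValuePair>();
>>>         params.add(new BasicNameValuePair("snid", String.valueOf(pushMessageRequestDTO.getSnid())));
>>>         params.add(new BasicNameValuePair("refId", String.valueOf(pushMessageRequestDTO.getRefId())));
>>>         return params;
>>>     }
>>>
>>>     private CloseableHttpClient getHttpClientInstance() {
>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>         cm.setDefaultMaxPerRoute(100);
>>>         cm.setMaxTotal(500);
>>>         int timeout = 4; // seconds, applied to each of the three timeouts below
>>>         RequestConfig config = RequestConfig.custom()
>>>                 .setConnectTimeout(timeout * 1000) // in millis
>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>                 .setSocketTimeout(timeout * 1000)
>>>                 .build();
>>>         return HttpClientBuilder.create()
>>>                 .setDefaultRequestConfig(config)
>>>                 .setConnectionManager(cm)
>>>                 .build();
>>>     }
>>> }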
>>>
>>> Thank you.
>>>
>>
>
