If a tuple times out, it is failed at the spout. Think of it this way: after the spout emits, your tuple has to make it through the output queue and finish processing before the timeout period elapses. Otherwise, with KafkaSpout, it will be re-emitted.
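To make that relationship concrete, here is a minimal sketch of sizing max spout pending against the message timeout. It assumes the stock Storm string config keys ("topology.message.timeout.secs" and "topology.max.spout.pending"); the helper method and class name are illustrative, not part of any Storm API:

```java
import java.util.HashMap;
import java.util.Map;

public class SpoutPendingSizing {
    /**
     * Upper bound on max-spout-pending such that a completely full
     * backlog can still drain within the message timeout, given the
     * bolt's worst-case per-tuple latency. In practice you would set
     * the config well below this bound to leave headroom.
     */
    static int safeMaxSpoutPending(int timeoutSecs, double worstCaseBoltSecs) {
        return (int) Math.floor(timeoutSecs / worstCaseBoltSecs);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<String, Object>();
        // 30s timeout and a 1s worst-case bolt: anything much above
        // 30 pending tuples can queue past the timeout.
        conf.put("topology.message.timeout.secs", 30);
        conf.put("topology.max.spout.pending", safeMaxSpoutPending(30, 1.0));
        System.out.println(conf);
    }
}
```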
1) Spout emits to output queue (max size: max spout pending)
2) Tuple drains through output queue (size n, worst case is max spout pending)
   -- output queue 0 --
   -- output queue 1 --
   -- ... --
   -- output queue n --
3) Other processing?
4) HTTP bolt

For the purpose of illustration, let's assume parallelism is 1 across the board, max spout pending is 100, the timeout is 30 seconds, and your HTTP bolt takes the worst-case 1 second (from your code). If you are getting one input tuple every 2 seconds, you are fine: your spout output queue is empty, your processing takes ~1 second, and you're done.

Now let's assume you suddenly get a flood of 100 tuples. Your spout pulls these off of Kafka and fills the output queue. When it emits the next tuple, that tuple has to wait for the 100 tuples ahead of it in the output queue to drain before it can be processed. In this example, that takes 100 seconds. Not only will this tuple time out, but tuples in the queue ahead of it will as well. These tuples will all need to be replayed, and they will continue to time out because there is too much in the queue ahead of them. You will see that the bolt is very busy, yet your topology is unable to make progress. At least, it sounds like this might be what is happening.

On Tue, Mar 10, 2015 at 9:39 AM, Idan Fridman <[email protected]> wrote:

> By the metrics I can see some errors, yes.
>
> But if I use try/catch, why would they time out in a loop? Once they
> time out, I catch them, log them, and that's it.
>
> 2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>:
>
>> Do you have a large number of errored tuples in this topology? You might
>> run into a situation where tuples time out in a loop.
>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:
>>
>>> My topology includes a bolt which opens an HTTP request to a web
>>> service.
>>> The average response is 500 milliseconds (however, sometimes it takes
>>> longer).
>>>
>>> I added timeout functionality, and I am using KafkaSpout.
>>>
>>> When I send messages one by one, everything works fine, but under high
>>> throughput *that bolt gets stuck and nothing gets into it anymore*, and
>>> worst of all, I am getting a *replay* of the messages.
>>>
>>> The only way to get through this is to reset Kafka's offset; otherwise
>>> ZooKeeper still keeps Kafka's offset and the messages keep being
>>> replayed.
>>>
>>> 1. *Why are messages being replayed? I don't need that.*
>>> 2. Here is the code of the ExternalServiceOutputBolt:
>>>
>>> package com.mycompany.push.topology;
>>>
>>> import backtype.storm.metric.api.CountMetric;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.BasicOutputCollector;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>> import org.apache.http.NameValuePair;
>>> import org.apache.http.client.config.RequestConfig;
>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>> import org.apache.http.client.methods.HttpPost;
>>> import org.apache.http.impl.client.CloseableHttpClient;
>>> import org.apache.http.impl.client.HttpClientBuilder;
>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>> import org.apache.http.message.BasicNameValuePair;
>>> import org.apache.http.util.EntityUtils;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.io.IOException;
>>> import java.net.SocketTimeoutException;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> import java.util.Map;
>>>
>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>
>>>     private static final Logger log =
>>>             LoggerFactory.getLogger(ExternalServiceOutputBolt.class);
>>>
>>>     private CloseableHttpClient httpClient;
>>>     private String externalServiceGraphUrl;
>>>     private List<NameValuePair> urlParameters =
>>>             new ArrayList<NameValuePair>();
>>>
>>>     // counters registered against the metrics system in initMetrics()
>>>     private transient CountMetric received_message_counter;
>>>     private transient CountMetric returned_from_externalService_counter;
>>>     private transient CountMetric received_status_200_counter;
>>>     private transient CountMetric received_not_status_200_counter;
>>>     private transient CountMetric received_time_out_counter;
>>>     private transient CountMetric received_fail_status_counter;
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>     }
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>         externalServiceGraphUrl =
>>>                 (String) stormConf.get("externalServiceGraphUrl");
>>>         initMetrics(context);
>>>         httpClient = getHttpClientInstance();
>>>     }
>>>
>>>     private void initMetrics(TopologyContext context) {
>>>         int bucketSecs = 60;
>>>         received_message_counter = context.registerMetric(
>>>                 "received_message", new CountMetric(), bucketSecs);
>>>         returned_from_externalService_counter = context.registerMetric(
>>>                 "returned_from_externalService", new CountMetric(), bucketSecs);
>>>         received_status_200_counter = context.registerMetric(
>>>                 "received_status_200", new CountMetric(), bucketSecs);
>>>         received_not_status_200_counter = context.registerMetric(
>>>                 "received_not_status_200", new CountMetric(), bucketSecs);
>>>         received_time_out_counter = context.registerMetric(
>>>                 "received_time_out", new CountMetric(), bucketSecs);
>>>         received_fail_status_counter = context.registerMetric(
>>>                 "received_fail_status", new CountMetric(), bucketSecs);
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>         try {
>>>             received_message_counter.incr();
>>>             final PushMessageRequestDTO pushMessageRequestDTO =
>>>                     (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>             if (pushMessageRequestDTO != null) {
>>>                 PushMessageResponseDTO pushMessageResponseDTO =
>>>                         executePushNotificationRequest(pushMessageRequestDTO);
>>>                 returned_from_externalService_counter.incr();
>>>                 log.debug("externalServiceOutputBolt, emit tuple with snid="
>>>                         + pushMessageRequestDTO.getSnid()
>>>                         + " refId=" + pushMessageRequestDTO.getRefId());
>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>             }
>>>         } catch (Exception e) {
>>>             log.error("externalServiceOutputBolt. Error", e);
>>>         }
>>>     }
>>>
>>>     private PushMessageResponseDTO executePushNotificationRequest(
>>>             PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>         PushMessageResponseDTO pushMessageResponseDTO =
>>>                 new PushMessageResponseDTO(pushMessageRequestDTO);
>>>         CloseableHttpResponse response = null;
>>>         try {
>>>             HttpPost post = new HttpPost(externalServiceGraphUrl);
>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>             response = httpClient.execute(post);
>>>             // consume the body so the pooled connection can be reused
>>>             EntityUtils.consume(response.getEntity());
>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>                 received_not_status_200_counter.incr();
>>>             } else {
>>>                 received_status_200_counter.incr();
>>>             }
>>>             log.debug("externalServiceOutputBolt.onCompleted, pushMessageResponseDTO="
>>>                     + pushMessageResponseDTO.toString() + ", responseBody="
>>>                     + response.getStatusLine().getReasonPhrase());
>>>             return pushMessageResponseDTO;
>>>         } catch (SocketTimeoutException e) {
>>>             received_time_out_counter.incr();
>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>         } catch (Throwable t) {
>>>             received_fail_status_counter.incr();
>>>             pushMessageResponseDTO.setFbResponse(
>>>                     PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>             if (t.getMessage() != null) {
>>>                 log.error("externalServiceOutputBolt, error executing externalService"
>>>                         + " API. errorMsg=" + t.getMessage(), t);
>>>             }
>>>         } finally {
>>>             if (response != null) {
>>>                 response.close();
>>>             }
>>>         }
>>>         return pushMessageResponseDTO;
>>>     }
>>>
>>>     private CloseableHttpClient getHttpClientInstance() {
>>>         PoolingHttpClientConnectionManager cm =
>>>                 new PoolingHttpClientConnectionManager();
>>>         cm.setDefaultMaxPerRoute(100);
>>>         cm.setMaxTotal(500);
>>>         int timeout = 4; // seconds
>>>         RequestConfig config = RequestConfig.custom()
>>>                 .setConnectTimeout(timeout * 1000) // in millis
>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>                 .setSocketTimeout(timeout * 1000)
>>>                 .build();
>>>         return HttpClientBuilder.create()
>>>                 .setDefaultRequestConfig(config)
>>>                 .setConnectionManager(cm)
>>>                 .build();
>>>     }
>>> }
>>>
>>> Thank you.
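The flood scenario described at the top of the thread can be sanity-checked in a few lines of plain Java. The class and method names are illustrative; the numbers are taken from the example above (max spout pending 100, 1 s worst-case bolt latency, 30 s timeout):

```java
public class BacklogCheck {
    /**
     * Worst-case end-to-end seconds for the newest tuple: everything
     * queued ahead of it must drain, then it is processed itself.
     */
    static int worstCaseLatencySecs(int tuplesAhead, int boltLatencySecs) {
        return (tuplesAhead + 1) * boltLatencySecs;
    }

    public static void main(String[] args) {
        int timeoutSecs = 30;
        // Steady trickle: empty queue, ~1s end-to-end, well inside the timeout.
        System.out.println(worstCaseLatencySecs(0, 1) <= timeoutSecs);
        // Flood of 100: the last tuple waits ~100s, so it (and many tuples
        // ahead of it) exceed the 30s timeout and get replayed.
        System.out.println(worstCaseLatencySecs(99, 1) <= timeoutSecs);
    }
}
```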
