According to the metrics I can see some errors, yes. But if I use try and catch, why would they time out in a loop? Once they time out, I catch them, log them, and that's it.
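To make the "time out in a loop" question concrete, here is a minimal sketch of one possible explanation. It assumes that the timeout meant is Storm's tuple timeout (`topology.message.timeout.secs`, 30 seconds by default) and not the HTTP socket timeout caught in the bolt. A tuple that waits in the queue behind slow HTTP calls can pass that limit before the bolt ever sees it, and the spout then replays it no matter what the bolt catches. The class and method names are hypothetical, and the model (one executor handling tuples strictly one after another) is a simplification, not Storm's actual scheduling.

```java
public class TupleTimeoutSketch {

    // Hypothetical model: a single bolt executor processes queued tuples one
    // after another, and every call takes perCallSecs seconds. The tuple at
    // 0-based position `index` is therefore finished after (index + 1) calls.
    static double completionTimeSecs(int index, double perCallSecs) {
        return (index + 1) * perCallSecs;
    }

    // Counts how many of `pending` queued tuples finish only after the tuple
    // timeout has passed. In this model those are the tuples the spout would
    // fail and replay, even though the bolt caught and logged every exception.
    static int countTimedOut(int pending, double perCallSecs, double messageTimeoutSecs) {
        int timedOut = 0;
        for (int i = 0; i < pending; i++) {
            if (completionTimeSecs(i, perCallSecs) > messageTimeoutSecs) {
                timedOut++;
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        double messageTimeoutSecs = 30.0; // Storm default for topology.message.timeout.secs
        int pending = 100;                // assumed number of in-flight tuples

        // 0.5 s is the average response time from the post, 4 s is the socket
        // timeout configured in getHttpClientInstance().
        System.out.println("timed out at 0.5 s per call: "
                + countTimedOut(pending, 0.5, messageTimeoutSecs));
        System.out.println("timed out at 4 s per call:   "
                + countTimedOut(pending, 4.0, messageTimeoutSecs));
    }
}
```

If this is what is happening, the replayed tuples queue up again behind the same slow calls, which would produce the loop. In that case, capping in-flight tuples with `topology.max.spout.pending`, raising the bolt's parallelism, or raising the tuple timeout are the usual knobs to look at.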
2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>:

> Do you have a large number of errored tuples in this topology? You might
> run into a situation where tuples time out in a loop.
> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:
>
>> My topology includes a bolt that opens an HTTP request to a web service.
>> The average response time is 500 milliseconds (however, sometimes it takes
>> longer).
>>
>> * I added timeout functionality, and I am using KafkaSpout.
>>
>> When I send messages one by one everything works fine, but
>>
>> under high throughput *that bolt gets stuck and nothing gets into
>> it anymore*, and worst of all I am getting a "replay" of the
>> messages.
>>
>> The only way to get through this is to reset Kafka's offset; otherwise
>> ZooKeeper still holds Kafka's offset and the messages are still replayed.
>>
>>
>> 1. *Why are messages being replayed? I don't need that.*
>> 2. Here is my code example of the "ExternalServiceOutputBolt":
>>
>> package com.mycompany.push.topology;
>>
>> import backtype.storm.metric.api.CountMetric;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.BasicOutputCollector;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseBasicBolt;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Tuple;
>> import backtype.storm.tuple.Values;
>> import com.mycompany.push.dto.PushMessageRequestDTO;
>> import com.mycompany.push.dto.PushMessageResponseDTO;
>> import org.apache.http.NameValuePair;
>> import org.apache.http.client.config.RequestConfig;
>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>> import org.apache.http.client.methods.CloseableHttpResponse;
>> import org.apache.http.client.methods.HttpPost;
>> import org.apache.http.impl.client.CloseableHttpClient;
>> import org.apache.http.impl.client.HttpClientBuilder;
>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>> import org.apache.http.message.BasicNameValuePair;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.IOException;
>> import java.net.SocketTimeoutException;
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.Map;
>> import java.util.concurrent.LinkedBlockingQueue;
>>
>>
>>
>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>
>>     private CloseableHttpClient httpClient;
>>
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         declarer.declare(new Fields("pushMessageResponse"));
>>     }
>>
>>
>>     @Override
>>     public void prepare(Map stormConf, TopologyContext context) {
>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>         initMetrics(context);
>>         httpClient = getHttpClientInstance();
>>     }
>>
>>
>>     @Override
>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>         try {
>>             received_message_counter.incr();
>>             final PushMessageRequestDTO pushMessageRequestDTO =
>>                     (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>             if (pushMessageRequestDTO != null) {
>>                 PushMessageResponseDTO pushMessageResponseDTO =
>>                         executePushNotificationRequest(pushMessageRequestDTO);
>>                 returned_from_externalService_counter.incr();
>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= "
>>                         + pushMessageRequestDTO.getSnid() + " refId="
>>                         + pushMessageRequestDTO.getRefId());
>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>             }
>>         } catch (Exception e) {
>>             log.error("externalServiceOutputBolt. Error", e);
>>         }
>>     }
>>
>>     private PushMessageResponseDTO executePushNotificationRequest(
>>             PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>         PushMessageResponseDTO pushMessageResponseDTO =
>>                 new PushMessageResponseDTO(pushMessageRequestDTO);
>>         CloseableHttpResponse response = null;
>>         try {
>>
>>             HttpPost post = new HttpPost("external.url");
>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>             response = httpClient.execute(post);
>>             response.getEntity();
>>             if (response.getStatusLine().getStatusCode() != 200) {
>>                 received_not_status_200_counter.incr();
>>             } else {
>>                 received_status_200_counter.incr();
>>             }
>>             log.debug("externalServiceOutputBolt.onCompleted, pushMessageRequestDTO="
>>                     + pushMessageResponseDTO.toString() + ", responseBody="
>>                     + response.getStatusLine().getReasonPhrase());
>>             return pushMessageResponseDTO;
>>         } catch (SocketTimeoutException e) {
>>             received_time_out_counter.incr();
>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>
>>         } catch (Throwable t) {
>>             received_fail_status_counter.incr();
>>
>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>             if (t.getMessage() != null) {
>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg="
>>                         + t.getMessage(), t);
>>             }
>>         } finally {
>>             if (response != null) {
>>                 response.close();
>>             }
>>         }
>>         return pushMessageResponseDTO;
>>     }
>>
>>     private CloseableHttpClient getHttpClientInstance() {
>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>         cm.setDefaultMaxPerRoute(100);
>>         cm.setMaxTotal(500);
>>         int timeout = 4;
>>         RequestConfig config = RequestConfig.custom()
>>                 .setConnectTimeout(timeout * 1000) //in millis
>>                 .setConnectionRequestTimeout(timeout * 1000)
>>                 .setSocketTimeout(timeout * 1000).build();
>>         return HttpClientBuilder.create().
>>                 setDefaultRequestConfig(config).
>>                 setConnectionManager(cm).
>>                 build();
>>     }
>> }
>>
>> Thank you.
>>
>
