And where do you ACK/FAIL tuples ? On 10 March 2015 at 13:39, Idan Fridman <[email protected]> wrote:
> By the metrics I can see some errors yes. > > but if I use try and catch why would they timout in a loop? once they > timeout i am catching them logging them and thats it > > 2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>: > >> Do you have a large number of errored tuples in this topology? You might >> run into a situation where tuples timeout in a loop >> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote: >> >>> My Topology including a bolt which opening Http Request to webservice. >>> The average response is 500 milliseconds (how-ever sometimes it takes >>> longer) >>> >>> * I added timeout functionality. and I am using KafkaSpout >>> >>> When I send messages one by one everything working fine but >>> >>> Under High throughput *that bolt is getting stuck and nothing get into >>> there anymore.* and the worst thing I am having a "reply" of the >>> messages >>> >>> The only way to get thru this is to reset kafka's offset. else the >>> zookeeper still logging kafka's offset and messages are still replying >>> >>> >>> 1. *Why Messages being replied? I dont need that* >>> 2. Here is my code example of the"ExternalServiceOutputBolt >>> >>> package com.mycompany.push.topology; >>> >>> import backtype.storm.metric.api.CountMetric; >>> import backtype.storm.task.TopologyContext; >>> import backtype.storm.topology.BasicOutputCollector; >>> import backtype.storm.topology.OutputFieldsDeclarer; >>> import backtype.storm.topology.base.BaseBasicBolt; >>> import backtype.storm.tuple.Fields; >>> import backtype.storm.tuple.Tuple; >>> import backtype.storm.tuple.Values; >>> import com.mycompany.push.dto.PushMessageRequestDTO; >>> import com.mycompany.push.dto.PushMessageResponseDTO; >>> import org.apache.http.NameValuePair; >>> import org.apache.http.client.config.RequestConfig; >>> import org.apache.http.client.entity.UrlEncodedFormEntity; >>> import org.apache.http.client.methods.CloseableHttpResponse; >>> import org.apache.http.client.methods.HttpPost; >>> import org.apache.http.impl.client.CloseableHttpClient; >>> import org.apache.http.impl.client.HttpClientBuilder; >>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; >>> import org.apache.http.message.BasicNameValuePair; >>> import org.slf4j.Logger; >>> import org.slf4j.LoggerFactory; >>> >>> import java.io.IOException; >>> import java.net.SocketTimeoutException; >>> import java.util.ArrayList; >>> import java.util.List; >>> import java.util.Map; >>> import java.util.concurrent.LinkedBlockingQueue; >>> >>> >>> >>> public class ExternalServiceOutputBolt extends BaseBasicBolt { >>> >>> private CloseableHttpClient httpClient; >>> >>> >>> @Override >>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>> declarer.declare(new Fields("pushMessageResponse")); >>> } >>> >>> >>> @Override >>> public void prepare(Map stormConf, TopologyContext context) { >>> externalServiceGraphUrl = (String) >>> stormConf.get("externalServiceGraphUrl"); >>> initMetrics(context); >>> httpClient = getHttpClientInstance(); >>> } >>> >>> >>> @Override >>> public void execute(Tuple tuple, BasicOutputCollector >>> basicOutputCollector) { >>> try { >>> received_message_counter.incr(); >>> final PushMessageRequestDTO pushMessageRequestDTO = >>> (PushMessageRequestDTO) tuple.getValueByField("pushMessage"); >>> if (pushMessageRequestDTO != null) { >>> PushMessageResponseDTO pushMessageResponseDTO = >>> executePushNotificationRequest(pushMessageRequestDTO); >>> returned_from_externalService_counter.incr(); >>> System.out.println("externalServiceOutputBolt,emit tupple >>> with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + >>> pushMessageRequestDTO.getRefId()); >>> basicOutputCollector.emit(new >>> Values(pushMessageResponseDTO)); >>> } >>> } catch (Exception e) { >>> log.error("externalServiceOutputBolt. Error", e); >>> } >>> } >>> >>> private PushMessageResponseDTO >>> executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) >>> throws IOException { >>> PushMessageResponseDTO pushMessageResponseDTO = new >>> PushMessageResponseDTO(pushMessageRequestDTO); >>> CloseableHttpResponse response = null; >>> try { >>> >>> HttpPost post = new HttpPost("external.url"); >>> post.setEntity(new UrlEncodedFormEntity(urlParameters)); >>> response = httpClient.execute(post); >>> response.getEntity(); >>> if (response.getStatusLine().getStatusCode() != 200) { >>> received_not_status_200_counter.incr(); >>> } else { >>> received_status_200_counter.incr(); >>> } >>> log.debug("externalServiceOutputBolt.onCompleted, >>> pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", >>> responseBody=" + response.getStatusLine().getReasonPhrase()); >>> return pushMessageResponseDTO; >>> } catch (SocketTimeoutException e) { >>> received_time_out_counter.incr(); >>> log.error("externalServiceOutputBolt, TimeoutException", e); >>> >>> } catch (Throwable t) { >>> received_fail_status_counter.incr(); >>> >>> pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL); >>> if (t.getMessage() != null) { >>> log.error("externalServiceOutputBolt, error executing >>> externalService API. errorMsg=" + t.getMessage(), t); >>> } >>> } finally { >>> if (response != null) { >>> response.close(); >>> } >>> } >>> return pushMessageResponseDTO; >>> } >>> >>> private CloseableHttpClient getHttpClientInstance() { >>> PoolingHttpClientConnectionManager cm = new >>> PoolingHttpClientConnectionManager(); >>> cm.setDefaultMaxPerRoute(100); >>> cm.setMaxTotal(500); >>> int timeout = 4; >>> RequestConfig config = RequestConfig.custom() >>> .setConnectTimeout(timeout * 1000) //in millis >>> .setConnectionRequestTimeout(timeout * 1000) >>> .setSocketTimeout(timeout * 1000).build(); >>> return HttpClientBuilder.create(). >>> setDefaultRequestConfig(config). >>> setConnectionManager(cm). >>> build(); >>> } >>> } >>> >>> Thank you. >>> >> >
