Try replacing:
    basicOutputCollector.emit(new Values(pushMessageResponseDTO));
with:
    basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
    collector.ack(tuple);



On 10 March 2015 at 13:43, Haralds Ulmanis <[email protected]> wrote:

> And where do you ACK/FAIL tuples?
>
> On 10 March 2015 at 13:39, Idan Fridman <[email protected]> wrote:
>
>> By the metrics I can see some errors, yes.
>>
>> But if I use try and catch, why would they time out in a loop? Once they
>> time out I am catching them, logging them, and that's it.
>>
>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>:
>>
>>> Do you have a large number of errored tuples in this topology? You might
>>> run into a situation where tuples time out in a loop.
>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:
>>>
>>>> My topology includes a bolt which opens an HTTP request to a web service.
>>>> The average response time is 500 milliseconds (however, sometimes it takes
>>>> longer).
>>>>
>>>> * I added timeout functionality, and I am using KafkaSpout.
>>>>
>>>> When I send messages one by one everything works fine, but
>>>>
>>>> under high throughput *that bolt gets stuck and nothing gets into
>>>> it anymore,* and the worst thing is that I am getting a "replay" of the
>>>> messages.
>>>>
>>>> The only way to get through this is to reset Kafka's offset; otherwise
>>>> ZooKeeper is still logging Kafka's offset and messages are still being replayed.
>>>>
>>>>
>>>> 1. *Why are messages being replayed? I don't need that.*
>>>> 2. Here is my code example of the "ExternalServiceOutputBolt":
>>>>
>>>> package com.mycompany.push.topology;
>>>>
>>>> import backtype.storm.metric.api.CountMetric;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.BasicOutputCollector;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>> import org.apache.http.NameValuePair;
>>>> import org.apache.http.client.config.RequestConfig;
>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>> import org.apache.http.client.methods.HttpPost;
>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>> import org.apache.http.message.BasicNameValuePair;
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>>
>>>> import java.io.IOException;
>>>> import java.net.SocketTimeoutException;
>>>> import java.util.ArrayList;
>>>> import java.util.List;
>>>> import java.util.Map;
>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>
>>>>
>>>>
>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>
>>>>     private CloseableHttpClient httpClient;
>>>>
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>     }
>>>>
>>>>
>>>>     @Override
>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>         externalServiceGraphUrl = (String) 
>>>> stormConf.get("externalServiceGraphUrl");
>>>>         initMetrics(context);
>>>>         httpClient = getHttpClientInstance();
>>>>     }
>>>>
>>>>
>>>>     @Override
>>>>     public void execute(Tuple tuple, BasicOutputCollector 
>>>> basicOutputCollector) {
>>>>         try {
>>>>             received_message_counter.incr();
>>>>             final PushMessageRequestDTO pushMessageRequestDTO = 
>>>> (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>             if (pushMessageRequestDTO != null) {
>>>>                 PushMessageResponseDTO pushMessageResponseDTO = 
>>>> executePushNotificationRequest(pushMessageRequestDTO);
>>>>                 returned_from_externalService_counter.incr();
>>>>                 System.out.println("externalServiceOutputBolt,emit tupple 
>>>> with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + 
>>>> pushMessageRequestDTO.getRefId());
>>>>                 basicOutputCollector.emit(new 
>>>> Values(pushMessageResponseDTO));
>>>>             }
>>>>         } catch (Exception e) {
>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>         }
>>>>     }
>>>>
>>>>     private PushMessageResponseDTO 
>>>> executePushNotificationRequest(PushMessageRequestDTO 
>>>> pushMessageRequestDTO) throws IOException {
>>>>         PushMessageResponseDTO pushMessageResponseDTO = new 
>>>> PushMessageResponseDTO(pushMessageRequestDTO);
>>>>         CloseableHttpResponse response = null;
>>>>         try {
>>>>
>>>>             HttpPost post = new HttpPost("external.url");
>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>             response = httpClient.execute(post);
>>>>             response.getEntity();
>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>                 received_not_status_200_counter.incr();
>>>>             } else {
>>>>                 received_status_200_counter.incr();
>>>>             }
>>>>             log.debug("externalServiceOutputBolt.onCompleted,  
>>>> pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", 
>>>> responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>             return pushMessageResponseDTO;
>>>>         } catch (SocketTimeoutException e) {
>>>>             received_time_out_counter.incr();
>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>
>>>>         } catch (Throwable t) {
>>>>             received_fail_status_counter.incr();
>>>>             
>>>> pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>             if (t.getMessage() != null) {
>>>>                 log.error("externalServiceOutputBolt, error executing 
>>>> externalService API. errorMsg=" + t.getMessage(), t);
>>>>             }
>>>>         } finally {
>>>>             if (response != null) {
>>>>                 response.close();
>>>>             }
>>>>         }
>>>>         return pushMessageResponseDTO;
>>>>     }
>>>>
>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>         PoolingHttpClientConnectionManager cm = new 
>>>> PoolingHttpClientConnectionManager();
>>>>         cm.setDefaultMaxPerRoute(100);
>>>>         cm.setMaxTotal(500);
>>>>         int timeout = 4;
>>>>         RequestConfig config = RequestConfig.custom()
>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>         return HttpClientBuilder.create().
>>>>                 setDefaultRequestConfig(config).
>>>>                 setConnectionManager(cm).
>>>>                 build();
>>>>     }
>>>> }
>>>>
>>>> Thank you.
>>>>
>>>
>>
>

Reply via email to