You don't need to ack when extending BaseBasicBolt: the tuple is acked
automatically once execute() returns (throw a FailedException to fail it).

On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <[email protected]>
wrote:

> try to replace: basicOutputCollector.emit(new
> Values(pushMessageResponseDTO));
> with
>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>             collector.ack(tuple);
>
>
>
> On 10 March 2015 at 13:43, Haralds Ulmanis <[email protected]> wrote:
>
>> And where do you ACK/FAIL tuples?
>>
>> On 10 March 2015 at 13:39, Idan Fridman <[email protected]> wrote:
>>
>>> Judging by the metrics, I can see some errors, yes.
>>>
>>> But if I use try and catch, why would they time out in a loop? Once they
>>> time out, I catch them, log them, and that's it.
>>>
>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>:
>>>
>>>> Do you have a large number of errored tuples in this topology? You
>>>> might run into a situation where tuples time out in a loop.
>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:
>>>>
>>>>> My topology includes a bolt which opens an HTTP request to a web
>>>>> service. The average response time is 500 milliseconds (however,
>>>>> sometimes it takes longer).
>>>>>
>>>>> * I added timeout functionality, and I am using KafkaSpout.
>>>>>
>>>>> When I send messages one by one, everything works fine, but
>>>>>
>>>>> under high throughput *that bolt gets stuck and nothing gets
>>>>> into it anymore*, and worst of all, I am getting a "replay" of the
>>>>> messages.
>>>>>
>>>>> The only way to get through this is to reset Kafka's offset; otherwise
>>>>> ZooKeeper still stores Kafka's offset and messages keep being replayed.
>>>>>
>>>>>
>>>>> 1. *Why are messages being replayed? I don't need that.*
>>>>> 2. Here is my code example of the "ExternalServiceOutputBolt":
>>>>>
>>>>> package com.mycompany.push.topology;
>>>>>
>>>>> import backtype.storm.metric.api.CountMetric;
>>>>> import backtype.storm.task.TopologyContext;
>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>> import backtype.storm.tuple.Fields;
>>>>> import backtype.storm.tuple.Tuple;
>>>>> import backtype.storm.tuple.Values;
>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>> import org.apache.http.NameValuePair;
>>>>> import org.apache.http.client.config.RequestConfig;
>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>> import org.apache.http.client.methods.HttpPost;
>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>> import org.slf4j.Logger;
>>>>> import org.slf4j.LoggerFactory;
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.net.SocketTimeoutException;
>>>>> import java.util.ArrayList;
>>>>> import java.util.List;
>>>>> import java.util.Map;
>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>
>>>>>
>>>>>
>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>
>>>>>     private CloseableHttpClient httpClient;
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>     }
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>         externalServiceGraphUrl = (String) 
>>>>> stormConf.get("externalServiceGraphUrl");
>>>>>         initMetrics(context);
>>>>>         httpClient = getHttpClientInstance();
>>>>>     }
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void execute(Tuple tuple, BasicOutputCollector 
>>>>> basicOutputCollector) {
>>>>>         try {
>>>>>             received_message_counter.incr();
>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = 
>>>>> (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>             if (pushMessageRequestDTO != null) {
>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = 
>>>>> executePushNotificationRequest(pushMessageRequestDTO);
>>>>>                 returned_from_externalService_counter.incr();
>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple 
>>>>> with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + 
>>>>> pushMessageRequestDTO.getRefId());
>>>>>                 basicOutputCollector.emit(new 
>>>>> Values(pushMessageResponseDTO));
>>>>>             }
>>>>>         } catch (Exception e) {
>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private PushMessageResponseDTO 
>>>>> executePushNotificationRequest(PushMessageRequestDTO 
>>>>> pushMessageRequestDTO) throws IOException {
>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new 
>>>>> PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>         CloseableHttpResponse response = null;
>>>>>         try {
>>>>>
>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>             response = httpClient.execute(post);
>>>>>             response.getEntity();
>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>                 received_not_status_200_counter.incr();
>>>>>             } else {
>>>>>                 received_status_200_counter.incr();
>>>>>             }
>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  
>>>>> pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", 
>>>>> responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>             return pushMessageResponseDTO;
>>>>>         } catch (SocketTimeoutException e) {
>>>>>             received_time_out_counter.incr();
>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>
>>>>>         } catch (Throwable t) {
>>>>>             received_fail_status_counter.incr();
>>>>>             
>>>>> pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>             if (t.getMessage() != null) {
>>>>>                 log.error("externalServiceOutputBolt, error executing 
>>>>> externalService API. errorMsg=" + t.getMessage(), t);
>>>>>             }
>>>>>         } finally {
>>>>>             if (response != null) {
>>>>>                 response.close();
>>>>>             }
>>>>>         }
>>>>>         return pushMessageResponseDTO;
>>>>>     }
>>>>>
>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>         PoolingHttpClientConnectionManager cm = new 
>>>>> PoolingHttpClientConnectionManager();
>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>         cm.setMaxTotal(500);
>>>>>         int timeout = 4;
>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>         return HttpClientBuilder.create().
>>>>>                 setDefaultRequestConfig(config).
>>>>>                 setConnectionManager(cm).
>>>>>                 build();
>>>>>     }
>>>>> }
>>>>>
>>>>> Thank you.
>>>>>
>>>>
>>>
>>
>

Reply via email to