You don't need to ack when extending BaseBasicBolt On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <[email protected]> wrote:
> try to replace: basicOutputCollector.emit(new > Values(pushMessageResponseDTO)); > with > basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO)); > collector.ack(tuple); > > > > On 10 March 2015 at 13:43, Haralds Ulmanis <[email protected]> wrote: > >> And where do you ACK/FAIL tuples ? >> >> On 10 March 2015 at 13:39, Idan Fridman <[email protected]> wrote: >> >>> By the metrics I can see some errors yes. >>> >>> but if I use try and catch why would they timout in a loop? once they >>> timeout i am catching them logging them and thats it >>> >>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>: >>> >>>> Do you have a large number of errored tuples in this topology? You >>>> might run into a situation where tuples timeout in a loop >>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote: >>>> >>>>> My Topology including a bolt which opening Http Request to webservice. >>>>> The average response is 500 milliseconds (how-ever sometimes it takes >>>>> longer) >>>>> >>>>> * I added timeout functionality. and I am using KafkaSpout >>>>> >>>>> When I send messages one by one everything working fine but >>>>> >>>>> Under High throughput *that bolt is getting stuck and nothing get >>>>> into there anymore.* and the worst thing I am having a "reply" of the >>>>> messages >>>>> >>>>> The only way to get thru this is to reset kafka's offset. else the >>>>> zookeeper still logging kafka's offset and messages are still replying >>>>> >>>>> >>>>> 1. *Why Messages being replied? I dont need that* >>>>> 2. Here is my code example of the"ExternalServiceOutputBolt >>>>> >>>>> package com.mycompany.push.topology; >>>>> >>>>> import backtype.storm.metric.api.CountMetric; >>>>> import backtype.storm.task.TopologyContext; >>>>> import backtype.storm.topology.BasicOutputCollector; >>>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>>> import backtype.storm.topology.base.BaseBasicBolt; >>>>> import backtype.storm.tuple.Fields; >>>>> import backtype.storm.tuple.Tuple; >>>>> import backtype.storm.tuple.Values; >>>>> import com.mycompany.push.dto.PushMessageRequestDTO; >>>>> import com.mycompany.push.dto.PushMessageResponseDTO; >>>>> import org.apache.http.NameValuePair; >>>>> import org.apache.http.client.config.RequestConfig; >>>>> import org.apache.http.client.entity.UrlEncodedFormEntity; >>>>> import org.apache.http.client.methods.CloseableHttpResponse; >>>>> import org.apache.http.client.methods.HttpPost; >>>>> import org.apache.http.impl.client.CloseableHttpClient; >>>>> import org.apache.http.impl.client.HttpClientBuilder; >>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; >>>>> import org.apache.http.message.BasicNameValuePair; >>>>> import org.slf4j.Logger; >>>>> import org.slf4j.LoggerFactory; >>>>> >>>>> import java.io.IOException; >>>>> import java.net.SocketTimeoutException; >>>>> import java.util.ArrayList; >>>>> import java.util.List; >>>>> import java.util.Map; >>>>> import java.util.concurrent.LinkedBlockingQueue; >>>>> >>>>> >>>>> >>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt { >>>>> >>>>> private CloseableHttpClient httpClient; >>>>> >>>>> >>>>> @Override >>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>>>> declarer.declare(new Fields("pushMessageResponse")); >>>>> } >>>>> >>>>> >>>>> @Override >>>>> public void prepare(Map stormConf, TopologyContext context) { >>>>> externalServiceGraphUrl = (String) >>>>> stormConf.get("externalServiceGraphUrl"); >>>>> initMetrics(context); >>>>> httpClient = getHttpClientInstance(); >>>>> } >>>>> >>>>> >>>>> @Override >>>>> public void execute(Tuple tuple, BasicOutputCollector >>>>> basicOutputCollector) { >>>>> try { >>>>> received_message_counter.incr(); >>>>> final PushMessageRequestDTO pushMessageRequestDTO = >>>>> (PushMessageRequestDTO) tuple.getValueByField("pushMessage"); >>>>> if (pushMessageRequestDTO != null) { >>>>> PushMessageResponseDTO pushMessageResponseDTO = >>>>> executePushNotificationRequest(pushMessageRequestDTO); >>>>> returned_from_externalService_counter.incr(); >>>>> System.out.println("externalServiceOutputBolt,emit tupple >>>>> with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + >>>>> pushMessageRequestDTO.getRefId()); >>>>> basicOutputCollector.emit(new >>>>> Values(pushMessageResponseDTO)); >>>>> } >>>>> } catch (Exception e) { >>>>> log.error("externalServiceOutputBolt. Error", e); >>>>> } >>>>> } >>>>> >>>>> private PushMessageResponseDTO >>>>> executePushNotificationRequest(PushMessageRequestDTO >>>>> pushMessageRequestDTO) throws IOException { >>>>> PushMessageResponseDTO pushMessageResponseDTO = new >>>>> PushMessageResponseDTO(pushMessageRequestDTO); >>>>> CloseableHttpResponse response = null; >>>>> try { >>>>> >>>>> HttpPost post = new HttpPost("external.url"); >>>>> post.setEntity(new UrlEncodedFormEntity(urlParameters)); >>>>> response = httpClient.execute(post); >>>>> response.getEntity(); >>>>> if (response.getStatusLine().getStatusCode() != 200) { >>>>> received_not_status_200_counter.incr(); >>>>> } else { >>>>> received_status_200_counter.incr(); >>>>> } >>>>> log.debug("externalServiceOutputBolt.onCompleted, >>>>> pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", >>>>> responseBody=" + response.getStatusLine().getReasonPhrase()); >>>>> return pushMessageResponseDTO; >>>>> } catch (SocketTimeoutException e) { >>>>> received_time_out_counter.incr(); >>>>> log.error("externalServiceOutputBolt, TimeoutException", e); >>>>> >>>>> } catch (Throwable t) { >>>>> received_fail_status_counter.incr(); >>>>> >>>>> pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL); >>>>> if (t.getMessage() != null) { >>>>> log.error("externalServiceOutputBolt, error executing >>>>> externalService API. errorMsg=" + t.getMessage(), t); >>>>> } >>>>> } finally { >>>>> if (response != null) { >>>>> response.close(); >>>>> } >>>>> } >>>>> return pushMessageResponseDTO; >>>>> } >>>>> >>>>> private CloseableHttpClient getHttpClientInstance() { >>>>> PoolingHttpClientConnectionManager cm = new >>>>> PoolingHttpClientConnectionManager(); >>>>> cm.setDefaultMaxPerRoute(100); >>>>> cm.setMaxTotal(500); >>>>> int timeout = 4; >>>>> RequestConfig config = RequestConfig.custom() >>>>> .setConnectTimeout(timeout * 1000) //in millis >>>>> .setConnectionRequestTimeout(timeout * 1000) >>>>> .setSocketTimeout(timeout * 1000).build(); >>>>> return HttpClientBuilder.create(). >>>>> setDefaultRequestConfig(config). >>>>> setConnectionManager(cm). >>>>> build(); >>>>> } >>>>> } >>>>> >>>>> Thank you. >>>>> >>>> >>> >> >
