I attached storm-ui screenshot which I took in peak-time. I replaced the bolts names with numbers in order to be authentic
I can see some failed bolts within the spout. Any idea? [image: תמונה מוטבעת 1] 2015-03-10 15:49 GMT+02:00 Nathan Leung <[email protected]>: > You don't need to ack when extending BaseBasicBolt > > On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <[email protected]> > wrote: > >> try to replace: basicOutputCollector.emit(new >> Values(pushMessageResponseDTO)); >> with >> basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO)); >> collector.ack(tuple); >> >> >> >> On 10 March 2015 at 13:43, Haralds Ulmanis <[email protected]> wrote: >> >>> And where do you ACK/FAIL tuples ? >>> >>> On 10 March 2015 at 13:39, Idan Fridman <[email protected]> wrote: >>> >>>> By the metrics I can see some errors yes. >>>> >>>> but if I use try and catch why would they timout in a loop? once they >>>> timeout i am catching them logging them and thats it >>>> >>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>: >>>> >>>>> Do you have a large number of errored tuples in this topology? You >>>>> might run into a situation where tuples timeout in a loop >>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote: >>>>> >>>>>> My Topology including a bolt which opening Http Request to >>>>>> webservice. >>>>>> The average response is 500 milliseconds (how-ever sometimes it takes >>>>>> longer) >>>>>> >>>>>> * I added timeout functionality. and I am using KafkaSpout >>>>>> >>>>>> When I send messages one by one everything working fine but >>>>>> >>>>>> Under High throughput *that bolt is getting stuck and nothing get >>>>>> into there anymore.* and the worst thing I am having a "reply" of >>>>>> the messages >>>>>> >>>>>> The only way to get thru this is to reset kafka's offset. else the >>>>>> zookeeper still logging kafka's offset and messages are still replying >>>>>> >>>>>> >>>>>> 1. *Why Messages being replied? I dont need that* >>>>>> 2. Here is my code example of the"ExternalServiceOutputBolt >>>>>> >>>>>> package com.mycompany.push.topology; >>>>>> >>>>>> import backtype.storm.metric.api.CountMetric; >>>>>> import backtype.storm.task.TopologyContext; >>>>>> import backtype.storm.topology.BasicOutputCollector; >>>>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>>>> import backtype.storm.topology.base.BaseBasicBolt; >>>>>> import backtype.storm.tuple.Fields; >>>>>> import backtype.storm.tuple.Tuple; >>>>>> import backtype.storm.tuple.Values; >>>>>> import com.mycompany.push.dto.PushMessageRequestDTO; >>>>>> import com.mycompany.push.dto.PushMessageResponseDTO; >>>>>> import org.apache.http.NameValuePair; >>>>>> import org.apache.http.client.config.RequestConfig; >>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity; >>>>>> import org.apache.http.client.methods.CloseableHttpResponse; >>>>>> import org.apache.http.client.methods.HttpPost; >>>>>> import org.apache.http.impl.client.CloseableHttpClient; >>>>>> import org.apache.http.impl.client.HttpClientBuilder; >>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; >>>>>> import org.apache.http.message.BasicNameValuePair; >>>>>> import org.slf4j.Logger; >>>>>> import org.slf4j.LoggerFactory; >>>>>> >>>>>> import java.io.IOException; >>>>>> import java.net.SocketTimeoutException; >>>>>> import java.util.ArrayList; >>>>>> import java.util.List; >>>>>> import java.util.Map; >>>>>> import java.util.concurrent.LinkedBlockingQueue; >>>>>> >>>>>> >>>>>> >>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt { >>>>>> >>>>>> private CloseableHttpClient httpClient; >>>>>> >>>>>> >>>>>> @Override >>>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>>>>> declarer.declare(new Fields("pushMessageResponse")); >>>>>> } >>>>>> >>>>>> >>>>>> @Override >>>>>> public void prepare(Map stormConf, TopologyContext context) { >>>>>> externalServiceGraphUrl = (String) >>>>>> stormConf.get("externalServiceGraphUrl"); >>>>>> initMetrics(context); >>>>>> httpClient = getHttpClientInstance(); >>>>>> } >>>>>> >>>>>> >>>>>> @Override >>>>>> public void execute(Tuple tuple, BasicOutputCollector >>>>>> basicOutputCollector) { >>>>>> try { >>>>>> received_message_counter.incr(); >>>>>> final PushMessageRequestDTO pushMessageRequestDTO = >>>>>> (PushMessageRequestDTO) tuple.getValueByField("pushMessage"); >>>>>> if (pushMessageRequestDTO != null) { >>>>>> PushMessageResponseDTO pushMessageResponseDTO = >>>>>> executePushNotificationRequest(pushMessageRequestDTO); >>>>>> returned_from_externalService_counter.incr(); >>>>>> System.out.println("externalServiceOutputBolt,emit >>>>>> tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + >>>>>> pushMessageRequestDTO.getRefId()); >>>>>> basicOutputCollector.emit(new >>>>>> Values(pushMessageResponseDTO)); >>>>>> } >>>>>> } catch (Exception e) { >>>>>> log.error("externalServiceOutputBolt. Error", e); >>>>>> } >>>>>> } >>>>>> >>>>>> private PushMessageResponseDTO >>>>>> executePushNotificationRequest(PushMessageRequestDTO >>>>>> pushMessageRequestDTO) throws IOException { >>>>>> PushMessageResponseDTO pushMessageResponseDTO = new >>>>>> PushMessageResponseDTO(pushMessageRequestDTO); >>>>>> CloseableHttpResponse response = null; >>>>>> try { >>>>>> >>>>>> HttpPost post = new HttpPost("external.url"); >>>>>> post.setEntity(new UrlEncodedFormEntity(urlParameters)); >>>>>> response = httpClient.execute(post); >>>>>> response.getEntity(); >>>>>> if (response.getStatusLine().getStatusCode() != 200) { >>>>>> received_not_status_200_counter.incr(); >>>>>> } else { >>>>>> received_status_200_counter.incr(); >>>>>> } >>>>>> log.debug("externalServiceOutputBolt.onCompleted, >>>>>> pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", >>>>>> responseBody=" + response.getStatusLine().getReasonPhrase()); >>>>>> return pushMessageResponseDTO; >>>>>> } catch (SocketTimeoutException e) { >>>>>> received_time_out_counter.incr(); >>>>>> log.error("externalServiceOutputBolt, TimeoutException", e); >>>>>> >>>>>> } catch (Throwable t) { >>>>>> received_fail_status_counter.incr(); >>>>>> >>>>>> pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL); >>>>>> if (t.getMessage() != null) { >>>>>> log.error("externalServiceOutputBolt, error executing >>>>>> externalService API. errorMsg=" + t.getMessage(), t); >>>>>> } >>>>>> } finally { >>>>>> if (response != null) { >>>>>> response.close(); >>>>>> } >>>>>> } >>>>>> return pushMessageResponseDTO; >>>>>> } >>>>>> >>>>>> private CloseableHttpClient getHttpClientInstance() { >>>>>> PoolingHttpClientConnectionManager cm = new >>>>>> PoolingHttpClientConnectionManager(); >>>>>> cm.setDefaultMaxPerRoute(100); >>>>>> cm.setMaxTotal(500); >>>>>> int timeout = 4; >>>>>> RequestConfig config = RequestConfig.custom() >>>>>> .setConnectTimeout(timeout * 1000) //in millis >>>>>> .setConnectionRequestTimeout(timeout * 1000) >>>>>> .setSocketTimeout(timeout * 1000).build(); >>>>>> return HttpClientBuilder.create(). >>>>>> setDefaultRequestConfig(config). >>>>>> setConnectionManager(cm). >>>>>> build(); >>>>>> } >>>>>> } >>>>>> >>>>>> Thank you. >>>>>> >>>>> >>>> >>> >> >
