Try replacing:
basicOutputCollector.emit(new Values(pushMessageResponseDTO));
with:
basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
collector.ack(tuple);
On 10 March 2015 at 13:43, Haralds Ulmanis <[email protected]> wrote:
> And where do you ACK/FAIL tuples?
>
> On 10 March 2015 at 13:39, Idan Fridman <[email protected]> wrote:
>
>> By the metrics I can see some errors, yes.
>>
>> But if I use try and catch, why would they time out in a loop? Once they
>> time out I am catching them, logging them, and that's it.
>>
>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <[email protected]>:
>>
>>> Do you have a large number of errored tuples in this topology? You might
>>> run into a situation where tuples time out in a loop.
>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:
>>>
>>>> My topology includes a bolt which opens an HTTP request to a web service.
>>>> The average response time is 500 milliseconds (however, sometimes it takes
>>>> longer).
>>>>
>>>> * I added timeout functionality, and I am using KafkaSpout.
>>>>
>>>> When I send messages one by one everything works fine, but
>>>>
>>>> under high throughput *that bolt gets stuck and nothing gets into
>>>> it anymore,* and worst of all I am getting a "replay" of the
>>>> messages.
>>>>
>>>> The only way to get through this is to reset Kafka's offset; otherwise
>>>> ZooKeeper is still logging Kafka's offset and messages are still replaying.
>>>>
>>>>
>>>> 1. *Why are messages being replayed? I don't need that.*
>>>> 2. Here is my code example of the "ExternalServiceOutputBolt":
>>>>
>>>> package com.mycompany.push.topology;
>>>>
>>>> import backtype.storm.metric.api.CountMetric;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.BasicOutputCollector;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>> import org.apache.http.NameValuePair;
>>>> import org.apache.http.client.config.RequestConfig;
>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>> import org.apache.http.client.methods.HttpPost;
>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>> import org.apache.http.message.BasicNameValuePair;
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>>
>>>> import java.io.IOException;
>>>> import java.net.SocketTimeoutException;
>>>> import java.util.ArrayList;
>>>> import java.util.List;
>>>> import java.util.Map;
>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>
>>>>
>>>>
>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>
>>>> private CloseableHttpClient httpClient;
>>>>
>>>>
>>>> @Override
>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>> declarer.declare(new Fields("pushMessageResponse"));
>>>> }
>>>>
>>>>
>>>> @Override
>>>> public void prepare(Map stormConf, TopologyContext context) {
>>>> externalServiceGraphUrl = (String)
>>>> stormConf.get("externalServiceGraphUrl");
>>>> initMetrics(context);
>>>> httpClient = getHttpClientInstance();
>>>> }
>>>>
>>>>
>>>> @Override
>>>> public void execute(Tuple tuple, BasicOutputCollector
>>>> basicOutputCollector) {
>>>> try {
>>>> received_message_counter.incr();
>>>> final PushMessageRequestDTO pushMessageRequestDTO =
>>>> (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>> if (pushMessageRequestDTO != null) {
>>>> PushMessageResponseDTO pushMessageResponseDTO =
>>>> executePushNotificationRequest(pushMessageRequestDTO);
>>>> returned_from_externalService_counter.incr();
>>>> System.out.println("externalServiceOutputBolt,emit tupple
>>>> with snid= " + pushMessageRequestDTO.getSnid() + " refId=" +
>>>> pushMessageRequestDTO.getRefId());
>>>> basicOutputCollector.emit(new
>>>> Values(pushMessageResponseDTO));
>>>> }
>>>> } catch (Exception e) {
>>>> log.error("externalServiceOutputBolt. Error", e);
>>>> }
>>>> }
>>>>
>>>> private PushMessageResponseDTO
>>>> executePushNotificationRequest(PushMessageRequestDTO
>>>> pushMessageRequestDTO) throws IOException {
>>>> PushMessageResponseDTO pushMessageResponseDTO = new
>>>> PushMessageResponseDTO(pushMessageRequestDTO);
>>>> CloseableHttpResponse response = null;
>>>> try {
>>>>
>>>> HttpPost post = new HttpPost("external.url");
>>>> post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>> response = httpClient.execute(post);
>>>> response.getEntity();
>>>> if (response.getStatusLine().getStatusCode() != 200) {
>>>> received_not_status_200_counter.incr();
>>>> } else {
>>>> received_status_200_counter.incr();
>>>> }
>>>> log.debug("externalServiceOutputBolt.onCompleted,
>>>> pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ",
>>>> responseBody=" + response.getStatusLine().getReasonPhrase());
>>>> return pushMessageResponseDTO;
>>>> } catch (SocketTimeoutException e) {
>>>> received_time_out_counter.incr();
>>>> log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>
>>>> } catch (Throwable t) {
>>>> received_fail_status_counter.incr();
>>>>
>>>> pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>> if (t.getMessage() != null) {
>>>> log.error("externalServiceOutputBolt, error executing
>>>> externalService API. errorMsg=" + t.getMessage(), t);
>>>> }
>>>> } finally {
>>>> if (response != null) {
>>>> response.close();
>>>> }
>>>> }
>>>> return pushMessageResponseDTO;
>>>> }
>>>>
>>>> private CloseableHttpClient getHttpClientInstance() {
>>>> PoolingHttpClientConnectionManager cm = new
>>>> PoolingHttpClientConnectionManager();
>>>> cm.setDefaultMaxPerRoute(100);
>>>> cm.setMaxTotal(500);
>>>> int timeout = 4;
>>>> RequestConfig config = RequestConfig.custom()
>>>> .setConnectTimeout(timeout * 1000) //in millis
>>>> .setConnectionRequestTimeout(timeout * 1000)
>>>> .setSocketTimeout(timeout * 1000).build();
>>>> return HttpClientBuilder.create().
>>>> setDefaultRequestConfig(config).
>>>> setConnectionManager(cm).
>>>> build();
>>>> }
>>>> }
>>>>
>>>> Thank you.
>>>>
>>>
>>
>