My topology includes a bolt that opens an HTTP request to a web service.
The average response time is 500 milliseconds (however, sometimes it takes
longer).

* I added timeout functionality, and I am using KafkaSpout.

When I send messages one by one, everything works fine, but

under high throughput *that bolt gets stuck and nothing gets into
it anymore*, and, worst of all, I am seeing a "replay" of the messages.

The only way to get through this is to reset Kafka's offset; otherwise
ZooKeeper still holds Kafka's old offset and the messages keep being replayed.


1. *Why are messages being replayed? I don't need that.*
2. Here is my code example of the "ExternalServiceOutputBolt":

package com.mycompany.push.topology;

import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.mycompany.push.dto.PushMessageRequestDTO;
import com.mycompany.push.dto.PushMessageResponseDTO;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;



public class ExternalServiceOutputBolt extends BaseBasicBolt {

    private CloseableHttpClient httpClient;


    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("pushMessageResponse"));
    }


    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        externalServiceGraphUrl = (String)
stormConf.get("externalServiceGraphUrl");
        initMetrics(context);
        httpClient = getHttpClientInstance();
    }


    @Override
    public void execute(Tuple tuple, BasicOutputCollector
basicOutputCollector) {
        try {
            received_message_counter.incr();
            final PushMessageRequestDTO pushMessageRequestDTO =
(PushMessageRequestDTO) tuple.getValueByField("pushMessage");
            if (pushMessageRequestDTO != null) {
                PushMessageResponseDTO pushMessageResponseDTO =
executePushNotificationRequest(pushMessageRequestDTO);
                returned_from_externalService_counter.incr();
                System.out.println("externalServiceOutputBolt,emit
tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" +
pushMessageRequestDTO.getRefId());
                basicOutputCollector.emit(new Values(pushMessageResponseDTO));
            }
        } catch (Exception e) {
            log.error("externalServiceOutputBolt. Error", e);
        }
    }

    private PushMessageResponseDTO
executePushNotificationRequest(PushMessageRequestDTO
pushMessageRequestDTO) throws IOException {
        PushMessageResponseDTO pushMessageResponseDTO = new
PushMessageResponseDTO(pushMessageRequestDTO);
        CloseableHttpResponse response = null;
        try {

            HttpPost post = new HttpPost("external.url");
            post.setEntity(new UrlEncodedFormEntity(urlParameters));
            response = httpClient.execute(post);
            response.getEntity();
            if (response.getStatusLine().getStatusCode() != 200) {
                received_not_status_200_counter.incr();
            } else {
                received_status_200_counter.incr();
            }
            log.debug("externalServiceOutputBolt.onCompleted,
pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ",
responseBody=" + response.getStatusLine().getReasonPhrase());
            return pushMessageResponseDTO;
        } catch (SocketTimeoutException e) {
            received_time_out_counter.incr();
            log.error("externalServiceOutputBolt, TimeoutException", e);

        } catch (Throwable t) {
            received_fail_status_counter.incr();
            
pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
            if (t.getMessage() != null) {
                log.error("externalServiceOutputBolt, error executing
externalService API. errorMsg=" + t.getMessage(), t);
            }
        } finally {
            if (response != null) {
                response.close();
            }
        }
        return pushMessageResponseDTO;
    }

    private CloseableHttpClient getHttpClientInstance() {
        PoolingHttpClientConnectionManager cm = new
PoolingHttpClientConnectionManager();
        cm.setDefaultMaxPerRoute(100);
        cm.setMaxTotal(500);
        int timeout = 4;
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(timeout * 1000) //in millis
                .setConnectionRequestTimeout(timeout * 1000)
                .setSocketTimeout(timeout * 1000).build();
        return HttpClientBuilder.create().
                setDefaultRequestConfig(config).
                setConnectionManager(cm).
                build();
    }
}

Thank you.

Reply via email to