Do you have a large number of errored tuples in this topology? You might
be running into a situation where tuples time out in a loop.
On Mar 10, 2015 8:58 AM, "Idan Fridman" <[email protected]> wrote:
> My topology includes a bolt which opens an HTTP request to a web service.
> The average response time is 500 milliseconds (however, sometimes it takes
> longer).
>
> * I added timeout functionality, and I am using KafkaSpout.
>
> When I send messages one by one, everything works fine, but
>
> under high throughput *that bolt gets stuck and nothing gets into
> it anymore*, and, worst of all, I am getting a "replay" of the messages.
>
> The only way to get through this is to reset Kafka's offset; otherwise
> ZooKeeper is still logging Kafka's offset and messages are still being replayed.
>
>
> 1. *Why are messages being replayed? I don't need that.*
> 2. Here is my code example of the "ExternalServiceOutputBolt":
>
> package com.mycompany.push.topology;
>
> import backtype.storm.metric.api.CountMetric;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.BasicOutputCollector;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseBasicBolt;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
> import com.mycompany.push.dto.PushMessageRequestDTO;
> import com.mycompany.push.dto.PushMessageResponseDTO;
> import org.apache.http.NameValuePair;
> import org.apache.http.client.config.RequestConfig;
> import org.apache.http.client.entity.UrlEncodedFormEntity;
> import org.apache.http.client.methods.CloseableHttpResponse;
> import org.apache.http.client.methods.HttpPost;
> import org.apache.http.impl.client.CloseableHttpClient;
> import org.apache.http.impl.client.HttpClientBuilder;
> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
> import org.apache.http.message.BasicNameValuePair;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.IOException;
> import java.net.SocketTimeoutException;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.LinkedBlockingQueue;
>
>
>
> /**
>  * Storm bolt that posts each incoming push message to an external HTTP
>  * service and emits the resulting {@link PushMessageResponseDTO} on the
>  * {@code pushMessageResponse} field.
>  *
>  * <p>NOTE(review): the declarations of {@code log}, {@code urlParameters},
>  * {@code externalServiceGraphUrl}, the {@code *_counter} metrics and
>  * {@code initMetrics(...)} are not part of this excerpt; they are
>  * referenced here exactly as in the original post.
>  */
> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>
>     /** HTTP client shared by all requests of this bolt; built in prepare(). */
>     private CloseableHttpClient httpClient;
>
>     /** Declares the single output field, {@code pushMessageResponse}. */
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("pushMessageResponse"));
>     }
>
>     /**
>      * Reads {@code externalServiceGraphUrl} from the topology configuration,
>      * calls {@code initMetrics(context)} and creates the HTTP client.
>      *
>      * @param stormConf topology configuration map
>      * @param context   topology context handed on to {@code initMetrics}
>      */
>     @Override
>     public void prepare(Map stormConf, TopologyContext context) {
>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>         initMetrics(context);
>         httpClient = getHttpClientInstance();
>     }
>
>     /**
>      * Sends the tuple's {@code pushMessage} value to the external service
>      * and emits the response DTO. A tuple whose {@code pushMessage} is
>      * null is counted as received and otherwise ignored.
>      *
>      * <p>Every exception is logged and not rethrown.
>      * NOTE(review): presumably the tuple is then still acked by the
>      * BaseBasicBolt machinery, i.e. not replayed because of this bolt;
>      * confirm against the Storm version in use.
>      *
>      * @param tuple                input tuple carrying a {@code pushMessage} field
>      * @param basicOutputCollector collector the response DTO is emitted to
>      */
>     @Override
>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>         try {
>             received_message_counter.incr();
>             final PushMessageRequestDTO pushMessageRequestDTO =
>                     (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>             if (pushMessageRequestDTO != null) {
>                 PushMessageResponseDTO pushMessageResponseDTO =
>                         executePushNotificationRequest(pushMessageRequestDTO);
>                 returned_from_externalService_counter.incr();
>                 // Logger instead of System.out: a per-tuple stdout write
>                 // sits on the hot path and bypasses the log configuration.
>                 log.debug("externalServiceOutputBolt,emit tupple with snid= {} refId={}",
>                         pushMessageRequestDTO.getSnid(),
>                         pushMessageRequestDTO.getRefId());
>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>             }
>         } catch (Exception e) {
>             log.error("externalServiceOutputBolt. Error", e);
>         }
>     }
>
>     /**
>      * Executes one HTTP POST against the external service and updates the
>      * status/timeout/failure counters.
>      *
>      * @param pushMessageRequestDTO request the response DTO is built from
>      * @return the response DTO; marked {@code FAIL} when a non-timeout
>      *         exception occurred
>      * @throws IOException if closing the HTTP response fails
>      */
>     private PushMessageResponseDTO executePushNotificationRequest(
>             PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>         PushMessageResponseDTO pushMessageResponseDTO =
>                 new PushMessageResponseDTO(pushMessageRequestDTO);
>         CloseableHttpResponse response = null;
>         try {
>             HttpPost post = new HttpPost("external.url");
>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>             response = httpClient.execute(post);
>             if (response.getStatusLine().getStatusCode() != 200) {
>                 received_not_status_200_counter.incr();
>             } else {
>                 received_status_200_counter.incr();
>             }
>             log.debug("externalServiceOutputBolt.onCompleted, pushMessageRequestDTO={}, responseBody={}",
>                     pushMessageResponseDTO,
>                     response.getStatusLine().getReasonPhrase());
>             return pushMessageResponseDTO;
>         } catch (SocketTimeoutException e) {
>             // NOTE(review): the DTO status is left untouched on timeout,
>             // unlike the generic failure branch below; confirm intended.
>             received_time_out_counter.incr();
>             log.error("externalServiceOutputBolt, TimeoutException", e);
>         } catch (Exception e) {
>             // Exception rather than Throwable so JVM Errors are not
>             // swallowed; always logged, even when getMessage() is null.
>             received_fail_status_counter.incr();
>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>             log.error("externalServiceOutputBolt, error executing externalService API. errorMsg="
>                     + e.getMessage(), e);
>         } finally {
>             if (response != null) {
>                 // Drain the body before closing: a response closed with
>                 // unread content makes the pooling manager discard the
>                 // connection instead of reusing it.
>                 org.apache.http.util.EntityUtils.consumeQuietly(response.getEntity());
>                 response.close();
>             }
>         }
>         return pushMessageResponseDTO;
>     }
>
>     /**
>      * Builds a pooled HTTP client (max 100 connections per route, 500 in
>      * total) whose connect, connection-request and socket timeouts are all
>      * four seconds.
>      *
>      * @return the configured client
>      */
>     private CloseableHttpClient getHttpClientInstance() {
>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>         cm.setDefaultMaxPerRoute(100);
>         cm.setMaxTotal(500);
>         int timeout = 4; // seconds
>         RequestConfig config = RequestConfig.custom()
>                 .setConnectTimeout(timeout * 1000) //in millis
>                 .setConnectionRequestTimeout(timeout * 1000)
>                 .setSocketTimeout(timeout * 1000)
>                 .build();
>         return HttpClientBuilder.create()
>                 .setDefaultRequestConfig(config)
>                 .setConnectionManager(cm)
>                 .build();
>     }
> }
>
> Thank you.
>