My Topology including a bolt which opening Http Request to webservice.
The average response is 500 milliseconds (how-ever sometimes it takes
longer)
* I added timeout functionality. and I am using KafkaSpout
When I send messages one by one everything working fine but
Under High throughput *that bolt is getting stuck and nothing get into
there anymore.* and the worst thing I am having a "reply" of the messages
The only way to get thru this is to reset kafka's offset. else the
zookeeper still logging kafka's offset and messages are still replying
1. *Why Messages being replied? I dont need that*
2. Here is my code example of the"ExternalServiceOutputBolt
package com.mycompany.push.topology;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.mycompany.push.dto.PushMessageRequestDTO;
import com.mycompany.push.dto.PushMessageResponseDTO;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
public class ExternalServiceOutputBolt extends BaseBasicBolt {
private CloseableHttpClient httpClient;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("pushMessageResponse"));
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
externalServiceGraphUrl = (String)
stormConf.get("externalServiceGraphUrl");
initMetrics(context);
httpClient = getHttpClientInstance();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector
basicOutputCollector) {
try {
received_message_counter.incr();
final PushMessageRequestDTO pushMessageRequestDTO =
(PushMessageRequestDTO) tuple.getValueByField("pushMessage");
if (pushMessageRequestDTO != null) {
PushMessageResponseDTO pushMessageResponseDTO =
executePushNotificationRequest(pushMessageRequestDTO);
returned_from_externalService_counter.incr();
System.out.println("externalServiceOutputBolt,emit
tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" +
pushMessageRequestDTO.getRefId());
basicOutputCollector.emit(new Values(pushMessageResponseDTO));
}
} catch (Exception e) {
log.error("externalServiceOutputBolt. Error", e);
}
}
private PushMessageResponseDTO
executePushNotificationRequest(PushMessageRequestDTO
pushMessageRequestDTO) throws IOException {
PushMessageResponseDTO pushMessageResponseDTO = new
PushMessageResponseDTO(pushMessageRequestDTO);
CloseableHttpResponse response = null;
try {
HttpPost post = new HttpPost("external.url");
post.setEntity(new UrlEncodedFormEntity(urlParameters));
response = httpClient.execute(post);
response.getEntity();
if (response.getStatusLine().getStatusCode() != 200) {
received_not_status_200_counter.incr();
} else {
received_status_200_counter.incr();
}
log.debug("externalServiceOutputBolt.onCompleted,
pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ",
responseBody=" + response.getStatusLine().getReasonPhrase());
return pushMessageResponseDTO;
} catch (SocketTimeoutException e) {
received_time_out_counter.incr();
log.error("externalServiceOutputBolt, TimeoutException", e);
} catch (Throwable t) {
received_fail_status_counter.incr();
pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
if (t.getMessage() != null) {
log.error("externalServiceOutputBolt, error executing
externalService API. errorMsg=" + t.getMessage(), t);
}
} finally {
if (response != null) {
response.close();
}
}
return pushMessageResponseDTO;
}
private CloseableHttpClient getHttpClientInstance() {
PoolingHttpClientConnectionManager cm = new
PoolingHttpClientConnectionManager();
cm.setDefaultMaxPerRoute(100);
cm.setMaxTotal(500);
int timeout = 4;
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(timeout * 1000) //in millis
.setConnectionRequestTimeout(timeout * 1000)
.setSocketTimeout(timeout * 1000).build();
return HttpClientBuilder.create().
setDefaultRequestConfig(config).
setConnectionManager(cm).
build();
}
}
Thank you.