Hello.
I would like to ask a question about the Elasticsearch client for Storm.
I use the versions below for Storm and the ES client. Here is a snippet from my
pom.xml.
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-elasticsearch</artifactId>
    <version>1.2.1</version>
  </dependency>
</dependencies>
The problem is that nothing is indexed in Elasticsearch, although there are no
signs of errors in the Storm log.
9303 [Thread-18-es-indexer-executor[2 2]] INFO o.a.s.d.task - Emitting:
es-indexer __ack_ack [-1840814225684307017 -2591074659832280914]
9303 [Thread-18-es-indexer-executor[2 2]] INFO o.a.s.d.executor -
TRANSFERING tuple [dest: 1 tuple: source: es-indexer:2, stream: __ack_ack,
id: {}, [-1840814225684307017 -2591074659832280914]]
9303 [Thread-18-es-indexer-executor[2 2]] INFO o.a.s.d.executor - BOLT ack
TASK: 2 TIME: -1 TUPLE: source: kafka-extractor:3, stream: default, id:
{-1840814225684307017=-2591074659832280914},
[4afa40db-37a8-4d3f-b762-ec6e765887da, myfirsttopology, blah,
{"message":"My first message","timestamp":"2018-04-30T12:22:45.463+09:00"}]
9303 [Thread-18-es-indexer-executor[2 2]] INFO o.a.s.d.executor - Execute
done TUPLE source: kafka-extractor:3, stream: default, id:
{-1840814225684307017=-2591074659832280914},
[4afa40db-37a8-4d3f-b762-ec6e765887da, myfirsttopology, blah,
{"message":"My first message","timestamp":"2018-04-30T12:22:45.463+09:00"}]
TASK: 2 DELTA: -1
My code is below.
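import org.apache.storm.elasticsearch.bolt.EsIndexBolt;
import org.apache.storm.elasticsearch.common.DefaultEsTupleMapper;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class ElasticsearchIndexer extends BaseBasicBolt {

    private static final String ES_HOST = "http://localhost:9200";

    @Override
    public void declareOutputFields( OutputFieldsDeclarer declarer ) {
        // Nothing to emit, so no output fields need to be declared
    }

    @Override
    public void execute( Tuple tuple,
                         BasicOutputCollector collector ) {
        // Prepare Elasticsearch connection parameters
        EsConfig EsConfig = new EsConfig( new String[]{ ES_HOST } );
        // Transport client compatible with Elasticsearch 5.5.3
        // https://mvnrepository.com/artifact/org.apache.storm/storm-elasticsearch/1.2.1
        EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
        // Fire index request to Elasticsearch data node
        EsIndexBolt indexBolt = new EsIndexBolt(EsConfig , tupleMapper);
    }
}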
Am I missing a step that is required for EsIndexBolt to actually index the tuple?
Best Regards,
Yu Watanabe
--
Yu Watanabe
渡辺 裕
LinkedIn : jp.linkedin.com/in/yuwatanabe1