Hello.

I would like to ask a question regarding the Elasticsearch client for Storm.

I use the versions below for Storm and the ES client. Below is a snippet from
my pom.xml.

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.2.1</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-elasticsearch</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>

The problem is that nothing is indexed in Elasticsearch, although there are no
signs of an error in the Storm log.

9303 [Thread-18-es-indexer-executor[2 2]] INFO  o.a.s.d.task - Emitting:
es-indexer __ack_ack [-1840814225684307017 -2591074659832280914]
9303 [Thread-18-es-indexer-executor[2 2]] INFO  o.a.s.d.executor -
TRANSFERING tuple [dest: 1 tuple: source: es-indexer:2, stream: __ack_ack,
id: {}, [-1840814225684307017 -2591074659832280914]]
9303 [Thread-18-es-indexer-executor[2 2]] INFO  o.a.s.d.executor - BOLT ack
TASK: 2 TIME: -1 TUPLE: source: kafka-extractor:3, stream: default, id:
{-1840814225684307017=-2591074659832280914},
[4afa40db-37a8-4d3f-b762-ec6e765887da, myfirsttopology, blah,
{"message":"My first message","timestamp":"2018-04-30T12:22:45.463+09:00"}]
9303 [Thread-18-es-indexer-executor[2 2]] INFO  o.a.s.d.executor - Execute
done TUPLE source: kafka-extractor:3, stream: default, id:
{-1840814225684307017=-2591074659832280914},
[4afa40db-37a8-4d3f-b762-ec6e765887da, myfirsttopology, blah,
{"message":"My first message","timestamp":"2018-04-30T12:22:45.463+09:00"}]
TASK: 2 DELTA: -1

My code is below.

public class ElasticsearchIndexer extends BaseBasicBolt {
  private static final String ES_HOST = "http://localhost:9200";

  @Override
  public void declareOutputFields( OutputFieldsDeclarer declarer ) {
    // Nothing to emit. No need for naming FIELD
  }

  @Override
  public void execute( Tuple tuple,
                       BasicOutputCollector collector ) {
    // Prepare elasticsearch connection parameter
    EsConfig esConfig = new EsConfig( new String[]{ ES_HOST } );

    // Transport client compatible with Elasticsearch 5.5.3
    // https://mvnrepository.com/artifact/org.apache.storm/storm-elasticsearch/1.2.1
    EsTupleMapper tupleMapper = new DefaultEsTupleMapper();

    // Fire Index request to elasticsearch data node
    EsIndexBolt indexBolt = new EsIndexBolt( esConfig, tupleMapper );
  }
}

Am I missing a step needed to make EsIndexBolt index the tuples?

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
渡辺 裕

LinkedIn : jp.linkedin.com/in/yuwatanabe1
