[ 
https://issues.apache.org/jira/browse/FLINK-8489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8489.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.4.1
                   1.5.0

1.4: dfa050c01adc559a3ed4df4c2c3273903a37ff79

The user confirmed that using separate configs for the two sinks resolved the issue.

> Data is not emitted by second ElasticSearch connector
> -----------------------------------------------------
>
>                 Key: FLINK-8489
>                 URL: https://issues.apache.org/jira/browse/FLINK-8489
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Chesnay Schepler
>            Priority: Critical
>             Fix For: 1.5.0, 1.4.1
>
>
> A user reported [this 
> issue|https://lists.apache.org/thread.html/e91c71beb45d6df879effa16c52f2c71aa6ef1a54ef0a8ac4ccc72ee@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list.
> *Setup:*
>  * A program with two pipelines that write to ElasticSearch. The pipelines 
> can be connected or completely separate.
>  * ElasticSearch 5.6.4, connector {{flink-connector-elasticsearch5_2.11}}
> *Problem:*
>  Only one of the ES connectors correctly emits data. The other connector 
> writes a single record and then stops emitting data (or does not write any 
> data at all). The problem does not occur if the second ES connector is 
> replaced by a different connector (for example, Cassandra).
> Below is a program to reproduce the issue:
> {code:java}
> public class ElasticSearchTest1 {
>       public static void main(String[] args) throws Exception {
>               
>               StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>               
>               // set elasticsearch connection details 
>               Map<String, String> config = new HashMap<>();
>               config.put("bulk.flush.max.actions", "1");
>               config.put("cluster.name", "<cluster name>");
>               List<InetSocketAddress> transports = new ArrayList<>();         
>               transports.add(new 
> InetSocketAddress(InetAddress.getByName("<host ip>"), 9300));
>               
>               //Set properties for Kafka Streaming
>               Properties properties = new Properties();
>               properties.setProperty("bootstrap.servers", "<host 
> ip>"+":9092");
>               properties.setProperty("group.id", "testGroup");
>               properties.setProperty("auto.offset.reset", "latest");  
>                               
>               //Create consumer for log records
>               
>               FlinkKafkaConsumer011 inputConsumer1 = new 
> FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), 
> properties);
>                               
>               DataStream<RecordOne> firstStream = env
>                               .addSource(inputConsumer1)
>                               .flatMap(new CreateRecordOne());
>                       
>               firstStream             
>               .addSink(new ElasticsearchSink<RecordOne>(config, 
>                               transports, 
>                               new 
> ElasticSearchOutputRecord("elastic_test_index1","elastic_test_index1")));
>               
>               FlinkKafkaConsumer011 inputConsumer2 = new 
> FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), 
> properties);
>               
>               DataStream<RecordTwo> secondStream = env
>                                       .addSource(inputConsumer2)              
>                                       .flatMap(new CreateRecordTwo());
>                       
>               secondStream            
>               .addSink(new ElasticsearchSink<RecordTwo>(config, 
>                               transports, 
>                               new 
> ElasticSearchOutputRecord2("elastic_test_index2","elastic_test_index2")));
>                               
>               env.execute("Elastic Search Test");
>       }
> }
> public class ElasticSearchOutputRecord implements 
> ElasticsearchSinkFunction<RecordOne> {
>       String index;
>       String type;
>     // Initialize filter function
>     public ElasticSearchOutputRecord(String index, String type) {
>         this.index = index;
>         this.type = type;
>     }
>       // construct index request
>       @Override
>       public void process(
>                       RecordOne record,
>               RuntimeContext ctx,
>               RequestIndexer indexer) {
>               // construct JSON document to index
>               Map<String, String> json = new HashMap<>();
>               
>               json.put("item_one", record.item1);      
>               json.put("item_two", record.item2);      
>                                               
>               IndexRequest rqst = Requests.indexRequest()
>                               .index(index)           // index name
>                               .type(type)     // mapping name
>                               .source(json);
>               indexer.add(rqst);
>       }
> }
> public class ElasticSearchOutputRecord2 implements 
> ElasticsearchSinkFunction<RecordTwo> {
>       String index;
>       String type;
>     // Initialize filter function
>     public ElasticSearchOutputRecord2(String index, String type) {
>         this.index = index;
>         this.type = type;
>     }
>       // construct index request
>       @Override
>       public void process(
>                       RecordTwo record,
>               RuntimeContext ctx,
>               RequestIndexer indexer) {
>               // construct JSON document to index
>               Map<String, String> json = new HashMap<>();
>               
>               json.put("item_three", record.item3);      
>               json.put("item_four", record.item4);      
>                                               
>               IndexRequest rqst = Requests.indexRequest()
>                               .index(index)           // index name
>                               .type(type)     // mapping name
>                               .source(json);
>               indexer.add(rqst);
>       }
> }
> public class CreateRecordOne implements FlatMapFunction<ObjectNode,RecordOne> 
> {
>       
>       static final Logger log = 
> LoggerFactory.getLogger(CreateRecordOne.class);
>       
>       @Override
>       public void flatMap(ObjectNode value, Collector<RecordOne> out) throws 
> Exception {
>               try {
>                       out.collect(new 
> RecordOne(value.get("item1").asText(),value.get("item2").asText()));
>               }
>               catch(Exception e) {
>                       log.error("error while creating RecordOne", e);
>               }
>       }
> }
> public class CreateRecordTwo implements FlatMapFunction<ObjectNode,RecordTwo> 
> {
>       
>       static final Logger log = 
> LoggerFactory.getLogger(CreateRecordTwo.class);
>       
>       @Override
>       public void flatMap(ObjectNode value, Collector<RecordTwo> out) throws 
> Exception {
>               try {
>                       out.collect(new 
> RecordTwo(value.get("item1").asText(),value.get("item2").asText()));
>               }
>               catch(Exception e) {
>                       log.error("error while creating RecordTwo", e);
>               }
>       }
> }
> public class RecordOne {      
>       
>       public String item1;    
>       public String item2;    
>               
>       public RecordOne() {};
>       
>       public RecordOne (
>                       
>                       String item1,   
>                       String item2    
>                                                                       
>                       ) {     
>               
>                                this.item1 =   item1;
>                                this.item2 = item2;    
>                                
>       }               
> }
> public class RecordTwo {      
>       
>       public String item3;    
>       public String item4;    
>               
>       public RecordTwo() {};
>       
>       public RecordTwo (                      
>                       String item3,   
>                       String item4                                            
>                         
>                       ) {             
>                                this.item3 =   item3;
>                                this.item4 = item4;    
>                                
>       }               
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to