[GitHub] flink pull request #4675: [FLINK-7386] Fix Elasticsearch 5 connector is not ...

2018-02-21 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169568752
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be converted to {@link DocWriteRequest}
+ * and will be buffered before sending a bulk request to the Elasticsearch cluster.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+
+    private final BulkProcessor bulkProcessor;
+    private final boolean flushOnCheckpoint;
+    private final AtomicLong numPendingRequestsRef;
+
+    public BulkProcessorIndexer(BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequests) {
+        this.bulkProcessor = bulkProcessor;
+        this.flushOnCheckpoint = flushOnCheckpoint;
+        this.numPendingRequestsRef = numPendingRequests;
+    }
+
+    @Override
+    public void add(ActionRequest... actionRequests) {
+        for (ActionRequest actionRequest : actionRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

Note that, to keep the interfaces identical, another cast is involved in the 6.x (REST) route. See: 
https://github.com/cjolif/flink/blob/cf86ab787709baf46455f540c738cfabcaee7203/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java#L72

I guess this one is less annoying, since the class explicitly instantiates RestHighLevelClient, but I still wanted to make sure you had seen it.


---


2018-02-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169564689
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

We can't resolve this separately for 5.3 and 6.0. Making 6.0 fully self-contained could mean that we end up having to maintain three separate copies if we decide to go the same route for 5.3.

We either accept the cast or create an `elasticsearch-base-5.3` module that both connectors depend on. It would be great if @tzulitai could weigh in here.


---


2018-02-20 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169490640
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

Hmm, @zentol, I thought I was commenting on my PR #5374, since it is based on this one... but I guess your comment applies to my PR as well.


---


2018-02-19 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169144082
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

@zentol I'm not sure I have the time in the short term to fully rework *both* the 5.3 and the 6.0 support in that direction.

Let me instead propose something I should have time for: what about adding an elasticsearch-rest module that would *not* inherit from elasticsearch-base, and so would not need any "strange" cast, while still providing an ElasticsearchSink implementation based on the Elasticsearch Java High-Level REST Client? This should work with any Elasticsearch version whose Java High-Level REST Client supports BulkProcessor. I'm afraid this would cover 6.x but not 5.3+. But at least Flink would be in "good" shape going forward, as Elasticsearch 8.x intends to remove the API that is used today, and the Java High-Level REST Client is the newly advertised way of doing things. Let me know what you think.
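A minimal sketch of why a self-contained module avoids the cast, assuming hypothetical stand-in types: `DocWriteRequest`, `IndexRequest`, `DeleteRequest`, `FakeBulkProcessor`, `RestRequestIndexer` and `RestBulkProcessorIndexer` below are illustrative only, not the real Elasticsearch or Flink classes (the real `DocWriteRequest` is generic, for instance). The point is that if the indexer interface is declared over the request type the client actually accepts, no downcast is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the Elasticsearch request types; these are NOT
// the real org.elasticsearch classes (the real DocWriteRequest is generic).
interface DocWriteRequest {}

class IndexRequest implements DocWriteRequest {}

class DeleteRequest implements DocWriteRequest {}

// Stand-in for a bulk processor that simply buffers write requests.
class FakeBulkProcessor {
    final List<DocWriteRequest> buffered = new ArrayList<>();

    void add(DocWriteRequest request) {
        buffered.add(request);
    }
}

// Hypothetical indexer interface for a self-contained REST module: it is
// declared over the request type the client accepts, not a broader supertype.
interface RestRequestIndexer {
    void add(DocWriteRequest... requests);
}

class RestBulkProcessorIndexer implements RestRequestIndexer {
    private final FakeBulkProcessor processor;

    RestBulkProcessorIndexer(FakeBulkProcessor processor) {
        this.processor = processor;
    }

    @Override
    public void add(DocWriteRequest... requests) {
        for (DocWriteRequest request : requests) {
            // No cast needed: the parameter type already matches the processor.
            processor.add(request);
        }
    }
}
```

With this shape, passing a request that is not a `DocWriteRequest` to `add(...)` is rejected by the compiler rather than failing at runtime.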


---


2018-02-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169139984
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

This unchecked cast makes me quite uncomfortable and contradicts the actual 
API. This is exactly the kind of thing that should not even be possible to fail 
at runtime.

`elasticsearch-base` was created on the assumption that the interfaces it works with are compatible across all versions in use, but 5.3 violates this assumption. In fact, if you were to update the version to 5.3, things wouldn't even compile due to another incompatible change in the constructor of `BulkItemResponse`.

I believe this is a case where duplicating the entire elasticsearch-base 
module and properly modifying the interfaces would be the appropriate solution 
(as either a new 5.3-base module or a completely self-contained one).
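The runtime failure described above can be reproduced in a small sketch, assuming hypothetical stand-in types (`ActionRequest`, `DocWriteRequest`, `IndexRequest`, `SearchRequest`, `FakeBulkProcessor`, `CastingIndexer` are illustrative, not the real classes). The indexer's signature accepts any `ActionRequest`, so the downcast compiles for every argument but throws `ClassCastException` for one that is not a write request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, NOT the real Elasticsearch classes.
abstract class ActionRequest {}

interface DocWriteRequest {}

// A write request: both an ActionRequest and a DocWriteRequest.
class IndexRequest extends ActionRequest implements DocWriteRequest {}

// An ActionRequest that is not a write request.
class SearchRequest extends ActionRequest {}

// Stand-in for a bulk processor that only accepts write requests.
class FakeBulkProcessor {
    final List<DocWriteRequest> buffered = new ArrayList<>();

    void add(DocWriteRequest request) {
        buffered.add(request);
    }
}

// Same shape as the indexer under review: the signature promises to accept
// any ActionRequest, so reaching the processor requires a downcast.
class CastingIndexer {
    private final FakeBulkProcessor processor;

    CastingIndexer(FakeBulkProcessor processor) {
        this.processor = processor;
    }

    void add(ActionRequest... requests) {
        for (ActionRequest request : requests) {
            // Compiles for every ActionRequest, but throws ClassCastException
            // at runtime when the request does not implement DocWriteRequest.
            processor.add((DocWriteRequest) request);
        }
    }
}
```

Calling `indexer.add(new SearchRequest())` type-checks but fails only when executed, which is the kind of API contradiction the comment objects to.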


---


[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...

2017-09-14 Thread zjureel
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4675

[FLINK-7386] Fix Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

## What is the purpose of the change

Add flink-connector-elasticsearch5.3 to support Elasticsearch 5.3 and later versions.

## Brief change log
  - *Add createRequestIndex method in ElasticsearchApiCallBridge*
  - *Add flink-connector-elasticsearch5.3 project*
  - *Add BulkProcessorIndexer in connector-elasticsearch5.3 to convert ActionRequest to DocWriteRequest*
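A minimal sketch of the change-log idea, assuming simplified hypothetical types: `Request`, `SketchRequestIndexer`, `SketchApiCallBridge`, `BufferingIndexer` and `Es53ApiCallBridgeSketch` are illustrative names, and the `createRequestIndex` parameters are a guess based on the `BulkProcessorIndexer` constructor in the diff, not the PR's actual signature. The bridge becomes an abstract class with a factory method so each connector version can return its own indexer. The indexer mirrors the diff's counting logic: when `flushOnCheckpoint` is enabled, each added request increments the shared pending counter, which presumably lets the sink wait for outstanding requests at checkpoint time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-ins; the real Flink and Elasticsearch types
// (RequestIndexer, ElasticsearchApiCallBridge, BulkProcessor) differ.
class Request {}

interface SketchRequestIndexer {
    void add(Request... requests);
}

// Version-neutral bridge as an abstract class: each connector version
// overrides the factory method to supply an indexer matching its client.
abstract class SketchApiCallBridge {
    abstract SketchRequestIndexer createRequestIndex(
            boolean flushOnCheckpoint, AtomicLong numPendingRequests);
}

// Mirrors the counting logic in the diff: when flushOnCheckpoint is enabled,
// every added request increments the shared pending counter.
class BufferingIndexer implements SketchRequestIndexer {
    final List<Request> buffered = new ArrayList<>();
    private final boolean flushOnCheckpoint;
    private final AtomicLong numPendingRequests;

    BufferingIndexer(boolean flushOnCheckpoint, AtomicLong numPendingRequests) {
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.numPendingRequests = numPendingRequests;
    }

    @Override
    public void add(Request... requests) {
        for (Request request : requests) {
            if (flushOnCheckpoint) {
                numPendingRequests.getAndIncrement();
            }
            buffered.add(request);
        }
    }
}

// Hypothetical 5.3-specific bridge returning its own indexer implementation.
class Es53ApiCallBridgeSketch extends SketchApiCallBridge {
    @Override
    SketchRequestIndexer createRequestIndex(
            boolean flushOnCheckpoint, AtomicLong numPendingRequests) {
        return new BufferingIndexer(flushOnCheckpoint, numPendingRequests);
    }
}
```

The sink code would only ever talk to `SketchApiCallBridge`, so version-specific request handling stays inside each connector's bridge subclass.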

## Verifying this change
This change added tests and can be verified as follows:
  - *Add ElasticsearchSinkITCase test case*
  - *Add ElasticsearchSinkExample in connector-elasticsearch5.3 to send requests to Elasticsearch 5.3 and later versions*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7386

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4675


commit 9dd30ccb5f5bd9940c8f1cfea4ffeb256d564862
Author: zjureel 
Date:   2017-09-15T03:51:35Z

[FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add createRequestIndex method

commit 5936bc7734557f75dc4b0c06cfc31b0b0e49a91a
Author: zjureel 
Date:   2017-09-15T03:55:16Z

[FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and later versions

commit dbc87bb1dd361a2e840bd382814a780aa96a45c2
Author: zjureel 
Date:   2017-09-15T04:42:44Z

[FLINK-7386] add test case for ES53

commit 74972ab9798e4e9173ad4cc4a6ec6bdf1390f98a
Author: zjureel 
Date:   2017-09-15T05:33:43Z

[FLINK-7386] add document for ES5.3




---