[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205051308
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 
6 and later versions.
+ */
+public class Elasticsearch6ApiCallBridge implements 
ElasticsearchApiCallBridge {
+
+   private static final long serialVersionUID = -5222683870097809633L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+   /**
+* User-provided HTTP Host.
+*/
+   private final List httpHosts;
+
+   Elasticsearch6ApiCallBridge(List httpHosts) {
+   Preconditions.checkArgument(httpHosts != null && 
!httpHosts.isEmpty());
+   this.httpHosts = httpHosts;
+   }
+
+   @Override
+   public RestHighLevelClient createClient(Map 
clientConfig) {
+   RestHighLevelClient rhlClient =
--- End diff --

It might have been good to support:
- a context path / path prefix in addition to the host
- login/password for protected Elasticsearch instances

It is OK not to do it, as long as the user can do it by subclassing. To 
make subclassing easier, maybe there should be two methods:

Keep the public `createClient` method that returns the `RHLClient`, and add a 
protected method `createRestClientBuilder` which returns the 
`RestClientBuilder`. This way one can just redefine the protected method and 
let the public one handle the actual `RHLClient` instantiation from the 
`RestClientBuilder` created by the protected method.
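
To make the two-method idea concrete, here is a minimal sketch. It deliberately uses small stand-in classes (`StubRestClientBuilder`, `StubHighLevelClient`, `BridgeSketch`, all hypothetical names) instead of the real Elasticsearch and Flink types, so it only illustrates the shape of the proposal, not the actual connector code:

```java
import java.util.List;

// Hypothetical, simplified stand-in for org.elasticsearch.client.RestClientBuilder.
class StubRestClientBuilder {
    final List<String> hosts;
    String pathPrefix = "";

    StubRestClientBuilder(List<String> hosts) {
        this.hosts = hosts;
    }

    StubRestClientBuilder setPathPrefix(String pathPrefix) {
        this.pathPrefix = pathPrefix;
        return this;
    }
}

// Hypothetical, simplified stand-in for RestHighLevelClient.
class StubHighLevelClient {
    final String description;

    StubHighLevelClient(StubRestClientBuilder builder) {
        this.description = builder.hosts + builder.pathPrefix;
    }
}

class BridgeSketch {
    private final List<String> httpHosts;

    BridgeSketch(List<String> httpHosts) {
        this.httpHosts = httpHosts;
    }

    // Public entry point: builds the client from whatever the protected hook returns.
    public StubHighLevelClient createClient() {
        return new StubHighLevelClient(createRestClientBuilder());
    }

    // Protected hook: the only method a subclass needs to redefine.
    protected StubRestClientBuilder createRestClientBuilder() {
        return new StubRestClientBuilder(httpHosts);
    }
}

// A user subclass that adds a path prefix without touching createClient.
class PrefixedBridgeSketch extends BridgeSketch {
    PrefixedBridgeSketch(List<String> httpHosts) {
        super(httpHosts);
    }

    @Override
    protected StubRestClientBuilder createRestClientBuilder() {
        return super.createRestClientBuilder().setPathPrefix("/es");
    }
}
```

With this split, credentials or SSL settings would be added in the same overridden method, and the client instantiation stays in one place.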


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-07-19 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/6043
  
@tzulitai I'm unfortunately totally buried under work at the moment, so I 
don't think I will have time within such a short deadline :( Sorry about that. 
If for some reason more delay is added, please let me know and I will see 
what I can do. Otherwise I should have time to do a quick review of whatever 
someone else ends up doing.


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-07-18 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/6043
  
@tzulitai 

Happy to see progress is made on this!

> After merging this, I'll also try cherry-picking your 6.x REST-based ES 
connector on top. If that works well, will also merge that.

Note that since the initial ES PR (#5374) I made a couple of changes in 
our own copy of this. 

1. The Elasticsearch REST API can have a context root in addition to the list 
of httpHosts, so I added the ability to set a pathPrefix by calling:

```java
final RestClientBuilder builder =
    RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));

if (pathPrefix != null && !pathPrefix.isEmpty() && !pathPrefix.equals("/")) {
  builder.setPathPrefix(this.pathPrefix);
}
```

So that is set on the builder.

2. Elasticsearch REST can be protected by login/password so I added the 
ability to set username/password:

```java
private CredentialsProvider getCredentialProvider() {
  CredentialsProvider credentialsProvider = null;
  if (userConfig.containsKey(CONFIG_KEY_ES_USERNAME)
      && userConfig.containsKey(CONFIG_KEY_ES_PASSWORD)) {
    credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(
        AuthScope.ANY,
        new UsernamePasswordCredentials(
            userConfig.get(CONFIG_KEY_ES_USERNAME),
            userConfig.get(CONFIG_KEY_ES_PASSWORD)));
  }
  return credentialsProvider;
}
```
and then:
```java
builder.setHttpClientConfigCallback(httpClientBuilder ->
    httpClientBuilder.setDefaultCredentialsProvider(getCredentialProvider()));
```

More generally, it should be easy for the user to change how the builder is 
configured, so that people can customize it as they want (e.g. to configure 
SSL).
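
One possible shape for such a customization point is a user-supplied callback that receives the builder before the client is created. The sketch below is only an illustration under assumptions: `FakeRestClientBuilder`, `BuilderCustomizer` and `CustomizableBridgeSketch` are hypothetical names, and the builder is a stand-in rather than the real `RestClientBuilder`. The callback extends `Serializable` on the assumption that it would be shipped together with the sink:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RestClientBuilder; it only records what was configured.
class FakeRestClientBuilder {
    final List<String> applied = new ArrayList<>();

    FakeRestClientBuilder setPathPrefix(String pathPrefix) {
        applied.add("pathPrefix=" + pathPrefix);
        return this;
    }

    FakeRestClientBuilder enableSsl() {
        applied.add("ssl");
        return this;
    }
}

// Hypothetical user hook: anything the user wants to set goes through here.
@FunctionalInterface
interface BuilderCustomizer extends Serializable {
    void customize(FakeRestClientBuilder builder);
}

class CustomizableBridgeSketch {
    private final BuilderCustomizer customizer;

    CustomizableBridgeSketch(BuilderCustomizer customizer) {
        this.customizer = customizer;
    }

    // The bridge creates the builder, then hands it to the user before using it.
    FakeRestClientBuilder createBuilder() {
        FakeRestClientBuilder builder = new FakeRestClientBuilder();
        customizer.customize(builder);
        return builder;
    }
}
```

A user would then pass something like `b -> b.setPathPrefix("/es").enableSsl()` when constructing the bridge, without having to subclass anything.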


---


[GitHub] flink issue #3034: [FLINK-5363] Fire timers when window state is currently e...

2018-07-11 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/3034
  
@aljoscha 

We regularly empty the window because we can get lengthy windows with a lot 
of elements, so it is good to get rid of elements we have already handled. But 
then if we have no more elements and the window event time is triggered it does 
fire the window function.

When we get some spare cycles we are going to give it a try, as you 
seem to have fixed that according to the JIRA issue?




---


[GitHub] flink issue #3034: [FLINK-5363] Fire timers when window state is currently e...

2018-06-13 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/3034
  
> It could, but I think we don't want to do this change right now.

We are getting into cases where this feature would be useful. Do you have 
any insights on why you ultimately don't want to make this feasible?


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-06-03 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/6043
  
> @cjolif do you think it would be possible that with a clean cut using a 
REST implementation, we no longer need to have separate modules anymore for ES 
6.x, 7.x, 8.x or so on?
i.e., it would only be a matter for the user of recompiling that REST-based 
implementation with a different ES client version.

@tzulitai I guess, in theory, there is always a risk that Elasticsearch breaks 
compatibility between two major versions again, even on the High Level REST 
Client APIs... My feeling is they are now trying to avoid that. But I did not 
find any wording that would allow us to "rely" on that "for sure".


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-29 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/6043
  
What I could do, if that can help make progress, is create a second PR 
based on this one that introduces the RestHighLevelClient implementation based 
on those APIs. This would give an idea of what we would get.


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-24 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/6043
  
> When planning to switch to REST, are we speaking of an implementation 
that works directly against Elasticsearch's REST API? Or are we thinking of 
using Elasticsearch's RestHighLevelClient?

To me at least, yes, when I say REST I mean RestHighLevelClient.


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-24 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/6043
  
Overall I think the most important thing to do is to do something :) We can't 
leave the Flink Elasticsearch connectors in the broken state they are in today. 
There must either be a purely REST-based solution, or we must make sure the 
current code path works and allows people to build a REST-based solution on top 
of it if they want to.


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-24 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/6043
  
@StephanEwen that is I think more or less what I proposed here: 
https://lists.apache.org/thread.html/e7f584e0df9ce510fa5bee8d3a7e59b6df885f7deee36710a1cbb9b1@%3Cdev.flink.apache.org%3E

"In the hope of moving that forward I would like to propose for 1.6 a new
Elasticsearch 6.x+ sink that would follow the design of the previous ones
BUT only leverage the new REST API and not inherit from existing classes."

But @tzulitai hinted at a different direction, which I followed for this 
PR.

Personally I think both approaches make sense. I don't have a strong 
opinion. That said, for my personal use cases a separate REST 
API-based sink would be enough.


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-23 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190364900
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link 
RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link 
RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
 
/**
 * Add multiple {@link ActionRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
 *
 * @param actionRequests The multiple {@link ActionRequest} to add.
+* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or 
{@Up}
 */
-   void add(ActionRequest... actionRequests);
+   @Deprecated
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (actionRequest instanceof IndexRequest) {
+   add((IndexRequest) actionRequest);
+   } else if (actionRequest instanceof DeleteRequest) {
+   add((DeleteRequest) actionRequest);
+   } else if (actionRequest instanceof UpdateRequest) {
+   add((UpdateRequest) actionRequest);
+   } else {
+   throw new 
IllegalArgumentException("RequestIndexer only supports Index, Delete and Update 
requests");
+   }
+   }
+   }
+
+   /**
+* Add multiple {@link DeleteRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
+*
+* @param deleteRequests The multiple {@link DeleteRequest} to add.
+*/
+   public abstract void add(DeleteRequest... deleteRequests);
--- End diff --

I would say that we can finally hope that Elasticsearch, by promoting the 
new REST API, is done breaking APIs for quite some time. Also, having our own 
API would obviously be one more thing for the user to learn (instead of just 
using the raw ES API) and one more object we would create for each request 
(negligible I guess, but if that is not a big gain, why do it?). And finally, 
from my point of view, fewer intermediary layers means less risk of bugs (and 
so, as you said, less maintenance work).

More generally I tend to always prefer less code over more code, except when 
more code is definitely the way to go, and here I'm not convinced. That said, I 
indeed can't promise Elasticsearch won't break the API again... So if people 
want that intermediary object I can try to look into it. 

Does anyone else have an opinion? :) 



---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-23 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190359388
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
 ---
@@ -45,12 +48,34 @@
}
 
@Override
-   public void add(ActionRequest... actionRequests) {
-   for (ActionRequest actionRequest : actionRequests) {
+   public void add(DeleteRequest... deleteRequests) {
+   for (DeleteRequest deleteRequest : deleteRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
-   this.bulkProcessor.add(actionRequest);
+   this.bulkProcessor.add(deleteRequest);
+   }
+   }
+
+   @Override
+   public void add(IndexRequest... indexRequests) {
+   for (IndexRequest indexRequest : indexRequests) {
+   System.out.println("ir: " + indexRequest);
--- End diff --

Oops, fixed.


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-23 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190359413
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link 
RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link 
RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
--- End diff --

done.


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-23 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190359344
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link 
RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link 
RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
 
/**
 * Add multiple {@link ActionRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
 *
 * @param actionRequests The multiple {@link ActionRequest} to add.
+* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or 
{@Up}
--- End diff --

fixed


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-18 Thread cjolif
GitHub user cjolif opened a pull request:

https://github.com/apache/flink/pull/6043

[FLINK-7386] evolve RequestIndexer API to make it working with Elastic 
5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a 
possible RestHighLevelClient implementation 

## What is the purpose of the change

*The purpose of this PR is to make sure the current Elasticsearch 
implementation is compatible with Elasticsearch 5.3+, fixing [FLINK-7386], and 
is also open to a future RestHighLevelClient implementation that could be used 
to provide Elasticsearch 6 compatibility [FLINK-8101]*

## Brief change log


* Add specific IndexRequest, UpdateRequest and DeleteRequest add methods to 
RequestIndexer so that it is compatible with both the 5.2- and 5.3+ APIs 
(knowing that in 5.3+ Elasticsearch no longer accepts ActionRequest in 
BulkProcessor).
* Make sure the existing ActionRequest method on RequestIndexer calls the 
new specific method based on the actual type.
* Throw an exception for other types.
* Change the return type of the createClient method in 
ElasticsearchApiCallBridge. As TransportClient and RestHighLevelClient have 
only the AutoCloseable interface in common, this is what the method returns now.
* Make ElasticsearchSinkBase agnostic to whether it is using a 
TransportClient or RestClient by adding a createBulkProcessorBuilder method on 
ElasticsearchApiCallBridge that the ElasticsearchSinkBase calls. Implement this 
method on all bridges. 
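
The first three entries can be illustrated with a small self-contained sketch. The request classes below (`SketchActionRequest` and friends) are hypothetical stand-ins for the Elasticsearch types, and `RecordingIndexer` exists only to show which overload ends up being called; the real `RequestIndexer` in the diff operates on the actual Elasticsearch classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the Elasticsearch request hierarchy.
abstract class SketchActionRequest {}
class SketchIndexRequest extends SketchActionRequest {}
class SketchDeleteRequest extends SketchActionRequest {}
class SketchUpdateRequest extends SketchActionRequest {}
class SketchSearchRequest extends SketchActionRequest {}

abstract class SketchRequestIndexer {

    // Legacy entry point: forwards each request to the typed overload
    // matching its runtime type and rejects everything else.
    @Deprecated
    public void add(SketchActionRequest... actionRequests) {
        for (SketchActionRequest actionRequest : actionRequests) {
            if (actionRequest instanceof SketchIndexRequest) {
                add((SketchIndexRequest) actionRequest);
            } else if (actionRequest instanceof SketchDeleteRequest) {
                add((SketchDeleteRequest) actionRequest);
            } else if (actionRequest instanceof SketchUpdateRequest) {
                add((SketchUpdateRequest) actionRequest);
            } else {
                throw new IllegalArgumentException(
                    "RequestIndexer only supports Index, Delete and Update requests");
            }
        }
    }

    public abstract void add(SketchDeleteRequest... deleteRequests);

    public abstract void add(SketchIndexRequest... indexRequests);

    public abstract void add(SketchUpdateRequest... updateRequests);
}

// Test double that records which typed overload was reached.
class RecordingIndexer extends SketchRequestIndexer {
    final List<String> log = new ArrayList<>();

    @Override
    public void add(SketchDeleteRequest... deleteRequests) {
        for (SketchDeleteRequest ignored : deleteRequests) {
            log.add("delete");
        }
    }

    @Override
    public void add(SketchIndexRequest... indexRequests) {
        for (SketchIndexRequest ignored : indexRequests) {
            log.add("index");
        }
    }

    @Override
    public void add(SketchUpdateRequest... updateRequests) {
        for (SketchUpdateRequest ignored : updateRequests) {
            log.add("update");
        }
    }
}
```

Existing user code that still passes plain `ActionRequest` values keeps compiling against the deprecated method, while new code can call the typed overloads directly.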
  
## Verifying this change

This change added tests and can be verified as follows:
* Elasticsearch test base has also been reworked a little bit to make it 
compatible with the changes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  a `@PublicEvolving` interface is now an abstract class. 
However, typically the user does not extend/implement it but just calls methods 
on it.
  - The serializers:  no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs & javadocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cjolif/flink es-5.3-apis

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6043.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6043


commit b1f2abc1d33b39c1fed4f370e5b21cbf477e0aa8
Author: Christophe Jolif <cjolif@...>
Date:   2018-05-17T22:17:04Z

[FLINK-7386] evolve RequestIndexer API to make it working with Elastic
5.3+, evolve ElasticsearchApiCallBridge API to make it compatible
with a possible RestHighLevelClient implementation.




---


[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

2018-05-18 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/5374
  
@tzulitai sure. done.


---


[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

2018-05-18 Thread cjolif
Github user cjolif closed the pull request at:

https://github.com/apache/flink/pull/5374


---


[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

2018-05-09 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/5374
  
Good news: the issue reported by @jdewald is now fixed in Elasticsearch 
master. I'm not sure which version of Elasticsearch will contain the fix, though. 


---


[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...

2018-02-21 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169568752
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link 
BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be converted to {@link 
DocWriteRequest}
+ * and will be buffered before sending a bulk request to the Elasticsearch 
cluster.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+
+   private final BulkProcessor bulkProcessor;
+   private final boolean flushOnCheckpoint;
+   private final AtomicLong numPendingRequestsRef;
+
+   public BulkProcessorIndexer(BulkProcessor bulkProcessor,
+   boolean 
flushOnCheckpoint,
+   AtomicLong 
numPendingRequests) {
+   this.bulkProcessor = bulkProcessor;
+   this.flushOnCheckpoint = flushOnCheckpoint;
+   this.numPendingRequestsRef = numPendingRequests;
+   }
+
+   @Override
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (flushOnCheckpoint) {
+   numPendingRequestsRef.getAndIncrement();
+   }
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

Note that in order to keep the interfaces identical there is another cast 
involved in the 6.x (REST) route. See: 
https://github.com/cjolif/flink/blob/cf86ab787709baf46455f540c738cfabcaee7203/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java#L72

This is, I guess, less annoying as the class explicitly instantiates 
RestHighLevelClient, but I still wanted to make sure you have seen it.


---


[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...

2018-02-20 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169490640
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link 
BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be converted to {@link 
DocWriteRequest}
+ * and will be buffered before sending a bulk request to the Elasticsearch 
cluster.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+
+   private final BulkProcessor bulkProcessor;
+   private final boolean flushOnCheckpoint;
+   private final AtomicLong numPendingRequestsRef;
+
+   public BulkProcessorIndexer(BulkProcessor bulkProcessor,
+   boolean 
flushOnCheckpoint,
+   AtomicLong 
numPendingRequests) {
+   this.bulkProcessor = bulkProcessor;
+   this.flushOnCheckpoint = flushOnCheckpoint;
+   this.numPendingRequestsRef = numPendingRequests;
+   }
+
+   @Override
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (flushOnCheckpoint) {
+   numPendingRequestsRef.getAndIncrement();
+   }
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

Hmmm @zentol, I thought I was commenting on my PR #5374 because it is based 
on this one... But I guess your comment applies to my PR as well.


---


[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...

2018-02-19 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169144082
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link 
BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be converted to {@link 
DocWriteRequest}
+ * and will be buffered before sending a bulk request to the Elasticsearch 
cluster.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+
+   private final BulkProcessor bulkProcessor;
+   private final boolean flushOnCheckpoint;
+   private final AtomicLong numPendingRequestsRef;
+
+   public BulkProcessorIndexer(BulkProcessor bulkProcessor,
+   boolean 
flushOnCheckpoint,
+   AtomicLong 
numPendingRequests) {
+   this.bulkProcessor = bulkProcessor;
+   this.flushOnCheckpoint = flushOnCheckpoint;
+   this.numPendingRequestsRef = numPendingRequests;
+   }
+
+   @Override
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (flushOnCheckpoint) {
+   numPendingRequestsRef.getAndIncrement();
+   }
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

@zentol I'm not sure I have the time in the short term to fully rework *both* 
the 5.3 and the 6.0 support in that direction. 

Let me however propose something I should have time for: what about adding 
an elasticsearch-rest module that would *not* inherit from elasticsearch-base, 
and so would not have any "strange" cast, while still providing an 
ElasticsearchSink implementation based on the Elasticsearch Java High-Level 
REST Client? This should work with any Elasticsearch version that provides the 
Java High-Level REST Client with BulkProcessor support. This would cover 6.x 
but not 5.3+, I'm afraid. But at least Flink would be "good" going forward, as 
Elasticsearch 8.x intends to remove the API that is used today and the Java 
High-Level REST Client is the new advertised way of doing things. Let me know 
what you think.


---


[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

2018-02-05 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/5374
  
@tzulitai did you have a chance to look at this? If you have any questions 
please let me know.


---


[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

2018-01-31 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/5374#discussion_r165186422
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link 
BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be converted to {@link 
DocWriteRequest}
+ * and will be buffered before sending a bulk request to the Elasticsearch 
cluster.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+
+   private final BulkProcessor bulkProcessor;
+   private final boolean flushOnCheckpoint;
+   private final AtomicLong numPendingRequestsRef;
+
+   public BulkProcessorIndexer(BulkProcessor bulkProcessor,
+   boolean flushOnCheckpoint,
+   AtomicLong numPendingRequests) {
+   this.bulkProcessor = bulkProcessor;
+   this.flushOnCheckpoint = flushOnCheckpoint;
+   this.numPendingRequestsRef = numPendingRequests;
+   }
+
+   @Override
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (flushOnCheckpoint) {
+   numPendingRequestsRef.getAndIncrement();
+   }
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

This is actually from the commit I brought into the PR from @zjureel's 
original PR. That said, I think the answer is definitely yes in the case that 
matters for Flink. Indeed:

* The `ActionRequest` values here actually come from the implementation 
of the `ElasticsearchSinkFunction.process` method, which should create 
`ActionRequest` instances and add them to the indexer.
* The idea here is not to create any sort of `ActionRequest` you could 
possibly dream of, but indexing requests.
* The way to create an `ActionRequest` for indexing in Elasticsearch is to use 
`org.elasticsearch.action.index.IndexRequest`.
* Starting with Elasticsearch 5.3, `IndexRequest` inherits from 
`DocWriteRequest`, while it did not before 5.3.

See: 


![image](https://user-images.githubusercontent.com/623171/35646706-5723ab78-06d0-11e8-8d50-5b4545047a1f.png)

vs


![image](https://user-images.githubusercontent.com/623171/35646719-63d7f1b2-06d0-11e8-8308-c330b3c11dad.png)

So the only case I see where this could not be a `DocWriteRequest` would be 
if someone created something other than an index request in the 
`ElasticsearchSinkFunction`. But I don't really see why anyone would.

That said, this raises the question of why the API was not typed against 
`IndexRequest` instead of `ActionRequest` from the start, as this would avoid 
those questions and force the user to return an `IndexRequest`.

In any case there is little choice, because starting with 5.3 the 
Elasticsearch `BulkProcessor` no longer accepts `ActionRequest`, only 
`IndexRequest`/`DocWriteRequest`.

Do you have a suggestion on how to handle this better? Obviously I can add 
documentation saying that starting with 5.3 the sink function MUST return a 
`DocWriteRequest`. But is that enough for you?
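
If not, one option might be to check the type before the cast and fail with an explicit message rather than a bare `ClassCastException`. Below is a minimal sketch of that idea, not code from this PR. The request types are stand-ins declared only so the example compiles on its own (the real ones are `org.elasticsearch.action.ActionRequest`, `DocWriteRequest` and `IndexRequest`), and `DocWriteGuard` / `requireDocWriteRequest` are hypothetical names.

```java
// Stand-ins for the Elasticsearch types, declared only so this sketch is self-contained.
abstract class ActionRequest {}

interface DocWriteRequest {}

// Since 5.3 the real IndexRequest implements DocWriteRequest.
class IndexRequest extends ActionRequest implements DocWriteRequest {}

// Stand-in for any request that is not a document write.
class SearchRequest extends ActionRequest {}

// Hypothetical helper, not part of the PR.
final class DocWriteGuard {

    private DocWriteGuard() {}

    // Returns the request typed as DocWriteRequest, or throws with a descriptive message.
    static DocWriteRequest requireDocWriteRequest(ActionRequest request) {
        if (!(request instanceof DocWriteRequest)) {
            String actual = (request == null) ? "null" : request.getClass().getName();
            throw new IllegalArgumentException(
                "Only DocWriteRequest instances can be added to the BulkProcessor, got: " + actual);
        }
        return (DocWriteRequest) request;
    }
}
```

With something like this, `add` would call `requireDocWriteRequest(actionRequest)` instead of casting directly, so a sink function that returns another kind of request gets an error that names the offending type.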



---


[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

2018-01-31 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/5374#discussion_r165109918
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/Elasticsearch53ApiCallBridge.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.transport.Netty3Plugin;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.3 and later versions.
+ */
+public class Elasticsearch53ApiCallBridge extends ElasticsearchApiCallBridge {
+
+   private static final long serialVersionUID = -5222683870097809633L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch53ApiCallBridge.class);
+
+   /**
+* User-provided transport addresses.
+*
+* We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 5.x.
+*/
+   private final List<InetSocketAddress> transportAddresses;
+
+   Elasticsearch53ApiCallBridge(List<InetSocketAddress> transportAddresses) {
+   Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
+   this.transportAddresses = transportAddresses;
+   }
+
+   @Override
+   public AutoCloseable createClient(Map<String, String> clientConfig) {
+   Settings settings = Settings.builder().put(clientConfig)
+   .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
+   .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
+   .build();
+
+   TransportClient transportClient = new PreBuiltTransportClient(settings);
+   for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
+   transportClient.addTransportAddress(transport);
+   }
+
+   // verify that we actually are connected to a cluster
+   if (transportClient.connectedNodes().isEmpty()) {
+   throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
+   }
+
+   if (LOG.isInfoEnabled()) {
--- End diff --

I think this is just a copy/paste of the practices in the other 
pre-existing bridges ;) But that should be fixed now.


---


[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

2018-01-31 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/5374#discussion_r164992369
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -32,7 +32,6 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
--- End diff --

fixed.


---


[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

2018-01-29 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/5374
  
Thanks @tzulitai. Let me know if you have any questions.


---


[GitHub] flink issue #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not compati...

2018-01-29 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/4675
  
I have included the exact commits of @zjureel in #5374, as I wanted to build 
on top of them. So if #5374 gets in, this PR shouldn't be needed anymore.


---


[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

2018-01-26 Thread cjolif
GitHub user cjolif opened a pull request:

https://github.com/apache/flink/pull/5374

[FLINK-8101][flink-connectors] Elasticsearch 5.3+ (TransportClient) and 6.x 
(RestHighLevelClient) support


 

## What is the purpose of the change

*The purpose of this PR is to add Elasticsearch 6.X support on top of the 
RestHighLevelClient. Indeed, TransportClient is now deprecated and will be 
removed in 8.X. Also, hosted versions of Elasticsearch often forbid the use 
of TransportClient.*

## Brief change log

* First, a set of changes is borrowed from #4675:
  * Add createRequestIndex method in ElasticsearchApiCallBridge
  * Add flink-connector-elasticsearch5.3 project
  * Add BulkProcessorIndexer in connector-elasticsearch5.3 to convert 
ActionRequest to DocWriteRequest
* Then, on top of these changes, to make it possible to create a 
RestHighLevelClient instead of a TransportClient:
  * Add createClient method in ElasticsearchApiCallBridge. As 
TransportClient and RestClient have only the AutoCloseable interface in common, 
this is what the method returns.
  * Make ElasticsearchSinkBase agnostic to whether it is using a 
TransportClient or RestClient by adding a createBulkProcessorBuilder method on 
ElasticsearchApiCallBridge that the ElasticsearchSinkBase calls. Implement this 
method on all bridges.
  * Add flink-connector-elasticsearch6 project leveraging the REST client, while 
all the other ones still use TransportClient.
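
As a rough illustration of the second group of changes, the sketch below shows the general shape of the abstraction: the sink only sees an `AutoCloseable` and delegates anything client-specific to the bridge. It is a simplification under stated assumptions, not the PR's code. `FakeTransportClient`, `FakeRestClient` and every `*Sketch` class are hypothetical stand-ins, and `createBulkProcessorBuilder` returns a `String` here only to keep the example free of Elasticsearch dependencies.

```java
import java.util.Map;

// Hypothetical stand-ins for the two client families; AutoCloseable is all they share.
class FakeTransportClient implements AutoCloseable {
    @Override
    public void close() {}
}

class FakeRestClient implements AutoCloseable {
    @Override
    public void close() {}
}

// Simplified bridge: the client is exposed to the sink only as AutoCloseable.
abstract class ApiCallBridgeSketch {
    abstract AutoCloseable createClient(Map<String, String> clientConfig);

    // A String description stands in for the bulk processor builder in this sketch.
    abstract String createBulkProcessorBuilder(AutoCloseable client);
}

class TransportBridgeSketch extends ApiCallBridgeSketch {
    @Override
    AutoCloseable createClient(Map<String, String> clientConfig) {
        return new FakeTransportClient();
    }

    @Override
    String createBulkProcessorBuilder(AutoCloseable client) {
        // The bridge knows its own client type, so the downcast lives here, not in the sink.
        FakeTransportClient transportClient = (FakeTransportClient) client;
        return "bulk processor over " + transportClient.getClass().getSimpleName();
    }
}

class RestBridgeSketch extends ApiCallBridgeSketch {
    @Override
    AutoCloseable createClient(Map<String, String> clientConfig) {
        return new FakeRestClient();
    }

    @Override
    String createBulkProcessorBuilder(AutoCloseable client) {
        FakeRestClient restClient = (FakeRestClient) client;
        return "bulk processor over " + restClient.getClass().getSimpleName();
    }
}

// Simplified sink base: it never names a concrete client type.
class SinkBaseSketch {
    private final ApiCallBridgeSketch bridge;

    SinkBaseSketch(ApiCallBridgeSketch bridge) {
        this.bridge = bridge;
    }

    String open(Map<String, String> clientConfig) {
        AutoCloseable client = bridge.createClient(clientConfig);
        return bridge.createBulkProcessorBuilder(client);
    }
}
```

Swapping `TransportBridgeSketch` for `RestBridgeSketch` changes the client in use without touching `SinkBaseSketch`, which is the property the change log aims for.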

## Verifying this change

This change added tests and can be verified as follows:
* The Elasticsearch test base has also been reworked a little bit so that it 
can be leveraged for testing the REST-client-based implementation.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): if you use the 
elasticsearch6 project, this adds dependencies on elasticsearch 6
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
  - The serializers:  no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs & javadocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cjolif/flink es53-es6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5374


commit b6a2396b31ade29071c65efa72df9f8f1fab9af4
Author: zjureel <zjureel@...>
Date:   2017-09-15T03:51:35Z

[FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add 
createRequestIndex method

commit 1e5b21a331dfaed50844e89986c313f5fc40bdbe
Author: zjureel <zjureel@...>
Date:   2017-09-15T03:55:16Z

[FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and 
later versions

commit 5a6e840c316095dd4f65f559405b19dcda7a1ca0
Author: zjureel <zjureel@...>
Date:   2017-09-15T04:42:44Z

[FLINK-7386] add test case for ES53

commit 574818f0f56f6a2b644e271a05a0796d90598aef
Author: zjureel <zjureel@...>
Date:   2017-09-15T05:33:43Z

[FLINK-7386] add document for ES5.3

commit 14168825507ad98c49a63be8ceab23dc539ff977
Author: Christophe Jolif <cjolif@...>
Date:   2018-01-25T21:31:57Z

[FLINK-8101] Elasticsearch 6.X REST support




---


[GitHub] flink issue #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not compati...

2018-01-25 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/4675
  
FYI I rebased it and got working results on a sample of mine.


---


[GitHub] flink issue #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not compati...

2018-01-23 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/4675
  
@zjureel do you by any chance have an updated version of the PR against the 
latest code (it looks like the current one conflicts)?


---