[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-23 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190364900
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
 
/**
 * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
 *
 * @param actionRequests The multiple {@link ActionRequest} to add.
+* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up}
 */
-   void add(ActionRequest... actionRequests);
+   @Deprecated
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (actionRequest instanceof IndexRequest) {
+   add((IndexRequest) actionRequest);
+   } else if (actionRequest instanceof DeleteRequest) {
+   add((DeleteRequest) actionRequest);
+   } else if (actionRequest instanceof UpdateRequest) {
+   add((UpdateRequest) actionRequest);
+   } else {
+   throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
+   }
+   }
+   }
+
+   /**
+* Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to Elasticsearch.
+*
+* @param deleteRequests The multiple {@link DeleteRequest} to add.
+*/
+   public abstract void add(DeleteRequest... deleteRequests);
--- End diff --

I would hope that, now that Elasticsearch is promoting the new REST API, it is finally done breaking APIs for quite some time. Also, having our own API would obviously be one more thing for users to learn (instead of just using the raw ES API) and one more object we would create for each request (probably negligible, but if the gain is not significant, why do it?). Finally, from my point of view, fewer intermediary layers mean less risk of bugs (and, as you said, less maintenance work).

More generally, I prefer less code to more code unless more code is clearly the way to go, and here I'm not convinced. That said, I indeed can't promise that Elasticsearch won't break its API again... So if people want that intermediary object, I can look into it.

Does anyone else have an opinion? :)



---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-23 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190359388
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
 ---
@@ -45,12 +48,34 @@
}
 
@Override
-   public void add(ActionRequest... actionRequests) {
-   for (ActionRequest actionRequest : actionRequests) {
+   public void add(DeleteRequest... deleteRequests) {
+   for (DeleteRequest deleteRequest : deleteRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
-   this.bulkProcessor.add(actionRequest);
+   this.bulkProcessor.add(deleteRequest);
+   }
+   }
+
+   @Override
+   public void add(IndexRequest... indexRequests) {
+   for (IndexRequest indexRequest : indexRequests) {
+   System.out.println("ir: " + indexRequest);
--- End diff --

Oops, fixed.
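
For reference, a minimal sketch of how the typed `add` loop reads without the debug print. It is a self-contained approximation, not the actual `BulkProcessorIndexer`: `RecordingBulkProcessor` and `SketchBulkProcessorIndexer` are hypothetical names, the former standing in for Elasticsearch's `BulkProcessor`, and plain `String`s stand in for `IndexRequest` objects. Only the `flushOnCheckpoint` / `numPendingRequestsRef` bookkeeping is taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for org.elasticsearch.action.bulk.BulkProcessor:
// it only records the requests handed to it.
class RecordingBulkProcessor {
    final List<String> added = new ArrayList<>();

    void add(String request) {
        added.add(request);
    }
}

// Approximation of the typed add(...) loop from the diff, minus the debug print.
// Strings stand in for IndexRequest instances (an assumption of this sketch).
class SketchBulkProcessorIndexer {
    private final RecordingBulkProcessor bulkProcessor;
    private final boolean flushOnCheckpoint;
    private final AtomicLong numPendingRequestsRef;

    SketchBulkProcessorIndexer(
            RecordingBulkProcessor bulkProcessor,
            boolean flushOnCheckpoint,
            AtomicLong numPendingRequestsRef) {
        this.bulkProcessor = bulkProcessor;
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.numPendingRequestsRef = numPendingRequestsRef;
    }

    void add(String... indexRequests) {
        for (String indexRequest : indexRequests) {
            if (flushOnCheckpoint) {
                // Count in-flight requests only when the sink must flush on checkpoint.
                numPendingRequestsRef.getAndIncrement();
            }
            this.bulkProcessor.add(indexRequest);
        }
    }
}
```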


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-23 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190359413
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
--- End diff --

done.


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-23 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190359344
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
 
/**
 * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
 *
 * @param actionRequests The multiple {@link ActionRequest} to add.
+* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up}
--- End diff --

fixed


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190127059
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
--- End diff --

I think we can leave `RequestIndexer` as an interface and make `add(ActionRequest...)` a [default method](https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html).

This would lessen the friction of this breaking change.
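
A minimal sketch of the default-method variant, assuming the three typed overloads from the diff stay abstract. To keep it compilable without Elasticsearch on the classpath, `ActionRequest`, `IndexRequest`, `DeleteRequest`, `UpdateRequest` and `SearchRequest` are empty stand-in classes, and `CountingIndexer` is a hypothetical implementation used only for illustration. The type stays an interface, and callers of the legacy `add(ActionRequest...)` still get the same type-based dispatch as in the abstract-class version.

```java
// Empty stand-ins for the Elasticsearch request classes (assumption: the real
// ones live under org.elasticsearch.action), so the sketch compiles on its own.
abstract class ActionRequest { }
class IndexRequest extends ActionRequest { }
class DeleteRequest extends ActionRequest { }
class UpdateRequest extends ActionRequest { }
class SearchRequest extends ActionRequest { } // a request type the indexer rejects

interface RequestIndexer {

    void add(DeleteRequest... deleteRequests);

    void add(IndexRequest... indexRequests);

    void add(UpdateRequest... updateRequests);

    /**
     * @deprecated use {@link #add(DeleteRequest...)}, {@link #add(IndexRequest...)}
     * or {@link #add(UpdateRequest...)} instead.
     */
    @Deprecated
    default void add(ActionRequest... actionRequests) {
        for (ActionRequest actionRequest : actionRequests) {
            // Each cast selects the more specific typed overload, not this method.
            if (actionRequest instanceof IndexRequest) {
                add((IndexRequest) actionRequest);
            } else if (actionRequest instanceof DeleteRequest) {
                add((DeleteRequest) actionRequest);
            } else if (actionRequest instanceof UpdateRequest) {
                add((UpdateRequest) actionRequest);
            } else {
                throw new IllegalArgumentException(
                        "RequestIndexer only supports Index, Delete and Update requests");
            }
        }
    }
}

// Hypothetical implementation that just counts what it receives.
class CountingIndexer implements RequestIndexer {
    int deletes;
    int indexes;
    int updates;

    @Override
    public void add(DeleteRequest... deleteRequests) {
        deletes += deleteRequests.length;
    }

    @Override
    public void add(IndexRequest... indexRequests) {
        indexes += indexRequests.length;
    }

    @Override
    public void add(UpdateRequest... updateRequests) {
        updates += updateRequests.length;
    }
}
```

Because `IndexRequest` is a subtype of `ActionRequest`, overload resolution prefers `add(IndexRequest...)` over `add(ActionRequest...)` for each cast, so the default method never recurses into itself. Default methods require Java 8.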


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190127406
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
 
/**
 * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
 *
 * @param actionRequests The multiple {@link ActionRequest} to add.
+* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up}
 */
-   void add(ActionRequest... actionRequests);
+   @Deprecated
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (actionRequest instanceof IndexRequest) {
+   add((IndexRequest) actionRequest);
+   } else if (actionRequest instanceof DeleteRequest) {
+   add((DeleteRequest) actionRequest);
+   } else if (actionRequest instanceof UpdateRequest) {
+   add((UpdateRequest) actionRequest);
+   } else {
+   throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
+   }
+   }
+   }
+
+   /**
+* Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to Elasticsearch.
+*
+* @param deleteRequests The multiple {@link DeleteRequest} to add.
+*/
+   public abstract void add(DeleteRequest... deleteRequests);
--- End diff --

What would you think about not exposing `DeleteRequest`, `IndexRequest` and `UpdateRequest` directly through the user API?

We could maintain our own way of specifying requests and only create the actual ES request instances internally.
It would be more maintenance work for us, but might be safer from a future-proof API perspective.
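
To make the trade-off concrete, a minimal sketch of what a Flink-owned request description could look like. Every name in it (`DocWriteSpec`, `EsRequestStub`, `RequestTranslator`) is hypothetical, and `EsRequestStub` stands in for the version-specific Elasticsearch request classes; document types and other request options are left out. Only the translator would need to change if Elasticsearch broke its request API again, at the cost of one extra object per request.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical Flink-owned description of a write; users would build these
// instead of Elasticsearch request objects.
final class DocWriteSpec {
    enum OpType { INDEX, UPDATE, DELETE }

    final OpType opType;
    final String index;
    final String id;
    final Map<String, Object> source; // empty for deletes

    private DocWriteSpec(OpType opType, String index, String id, Map<String, Object> source) {
        this.opType = opType;
        this.index = index;
        this.id = id;
        this.source = Collections.unmodifiableMap(new HashMap<>(source));
    }

    static DocWriteSpec index(String index, String id, Map<String, Object> source) {
        return new DocWriteSpec(OpType.INDEX, index, id, source);
    }

    static DocWriteSpec update(String index, String id, Map<String, Object> partialDoc) {
        return new DocWriteSpec(OpType.UPDATE, index, id, partialDoc);
    }

    static DocWriteSpec delete(String index, String id) {
        return new DocWriteSpec(OpType.DELETE, index, id, Collections.<String, Object>emptyMap());
    }
}

// Stand-in for a version-specific Elasticsearch request (IndexRequest, etc.).
final class EsRequestStub {
    final String action;
    final String index;
    final String id;
    final Map<String, Object> source;

    EsRequestStub(String action, String index, String id, Map<String, Object> source) {
        this.action = action;
        this.index = index;
        this.id = id;
        this.source = source;
    }
}

// The only place that knows about the (stand-in) Elasticsearch types; an
// Elasticsearch API break would be absorbed here instead of in user code.
final class RequestTranslator {
    private RequestTranslator() { }

    static EsRequestStub toEs(DocWriteSpec spec) {
        switch (spec.opType) {
            case INDEX:
                return new EsRequestStub("index", spec.index, spec.id, spec.source);
            case UPDATE:
                return new EsRequestStub("update", spec.index, spec.id, spec.source);
            case DELETE:
                return new EsRequestStub("delete", spec.index, spec.id, spec.source);
            default:
                throw new IllegalStateException("Unknown op type: " + spec.opType);
        }
    }
}
```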


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190126862
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
 ---
@@ -45,12 +48,34 @@
}
 
@Override
-   public void add(ActionRequest... actionRequests) {
-   for (ActionRequest actionRequest : actionRequests) {
+   public void add(DeleteRequest... deleteRequests) {
+   for (DeleteRequest deleteRequest : deleteRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
-   this.bulkProcessor.add(actionRequest);
+   this.bulkProcessor.add(deleteRequest);
+   }
+   }
+
+   @Override
+   public void add(IndexRequest... indexRequests) {
+   for (IndexRequest indexRequest : indexRequests) {
+   System.out.println("ir: " + indexRequest);
--- End diff --

Leftover print.


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190126707
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
 
/**
 * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
 *
 * @param actionRequests The multiple {@link ActionRequest} to add.
+* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up}
--- End diff --

typo at the end.


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-18 Thread cjolif
GitHub user cjolif opened a pull request:

https://github.com/apache/flink/pull/6043

[FLINK-7386] evolve RequestIndexer API to make it working with Elastic 
5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a 
possible RestHighLevelClient implementation 

## What is the purpose of the change

*The purpose of this PR is to make the current Elasticsearch implementation compatible with Elasticsearch 5.3+, fixing [FLINK-7386], while keeping it open to a future RestHighLevelClient implementation that could be used to provide Elasticsearch 6 compatibility [FLINK-8101].*

## Brief change log


* Add specific `add` methods for IndexRequest, UpdateRequest and DeleteRequest on RequestIndexer, so that it is compatible with both the 5.2- and 5.3+ APIs (in 5.3+, Elasticsearch's BulkProcessor no longer accepts a generic ActionRequest).
* Make the existing ActionRequest method on RequestIndexer call the new specific method based on the request's actual type.
* Throw an exception for any other request type.
* Change the return type of the createClient method in ElasticsearchApiCallBridge. As TransportClient and RestHighLevelClient only have the AutoCloseable interface in common, the method now returns AutoCloseable.
* Make ElasticsearchSinkBase agnostic to whether it uses a TransportClient or a RestHighLevelClient by adding a createBulkProcessorBuilder method on ElasticsearchApiCallBridge that ElasticsearchSinkBase calls. Implement this method on all bridges.
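
The two call-bridge bullets above can be illustrated with a simplified sketch. It is not Flink's actual `ElasticsearchApiCallBridge`: the bridge here is generic over a client type `C extends AutoCloseable` (a variation on returning a plain `AutoCloseable`), and `ApiCallBridgeSketch`, `BulkSubmitter`, `SinkCoreSketch`, `FakeClient` and `FakeBridge` are all hypothetical names, with `createBulkSubmitter` playing the role of `createBulkProcessorBuilder`. The sink core only calls `close()` on the client, so it never needs to know whether it holds a `TransportClient` or a `RestHighLevelClient`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for whatever the sink submits bulk work through.
interface BulkSubmitter {
    void submit(String request);
}

// Simplified bridge: each Elasticsearch version module picks the concrete
// client type C; all the sink relies on is that C can be closed.
interface ApiCallBridgeSketch<C extends AutoCloseable> {
    C createClient(Map<String, String> clientConfig);

    // Plays the role of createBulkProcessorBuilder in this sketch.
    BulkSubmitter createBulkSubmitter(C client);
}

// Version-agnostic sink core: it never names TransportClient or RestHighLevelClient.
final class SinkCoreSketch<C extends AutoCloseable> implements AutoCloseable {
    private final C client;
    private final BulkSubmitter submitter;

    SinkCoreSketch(ApiCallBridgeSketch<C> bridge, Map<String, String> clientConfig) {
        this.client = bridge.createClient(clientConfig);
        this.submitter = bridge.createBulkSubmitter(client);
    }

    void write(String request) {
        submitter.submit(request);
    }

    C client() {
        return client;
    }

    @Override
    public void close() throws Exception {
        client.close();
    }
}

// Toy client standing in for a concrete Elasticsearch client.
final class FakeClient implements AutoCloseable {
    final List<String> sent = new ArrayList<>();
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

final class FakeBridge implements ApiCallBridgeSketch<FakeClient> {
    @Override
    public FakeClient createClient(Map<String, String> clientConfig) {
        return new FakeClient();
    }

    @Override
    public BulkSubmitter createBulkSubmitter(FakeClient client) {
        return client.sent::add; // List.add's boolean result is discarded
    }
}
```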
  
## Verifying this change

This change added tests and can be verified as follows:
* The Elasticsearch test base has also been reworked slightly to make it compatible with the changes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: a `@PublicEvolving` interface is now an abstract class. However, users typically do not extend or implement it; they only call methods on it.
  - The serializers:  no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs & javadocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cjolif/flink es-5.3-apis

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6043.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6043


commit b1f2abc1d33b39c1fed4f370e5b21cbf477e0aa8
Author: Christophe Jolif 
Date:   2018-05-17T22:17:04Z

[FLINK-7386] evolve RequestIndexer API to make it working with Elastic
5.3+, evolve ElasticsearchApiCallBridge API to make it compatible
with a possible RestHighLevelClient implementation.




---