[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user cjolif commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190364900

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
     /**
      * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
      *
      * @param actionRequests The multiple {@link ActionRequest} to add.
+     * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up}
      */
-    void add(ActionRequest... actionRequests);
+    @Deprecated
+    public void add(ActionRequest... actionRequests) {
+        for (ActionRequest actionRequest : actionRequests) {
+            if (actionRequest instanceof IndexRequest) {
+                add((IndexRequest) actionRequest);
+            } else if (actionRequest instanceof DeleteRequest) {
+                add((DeleteRequest) actionRequest);
+            } else if (actionRequest instanceof UpdateRequest) {
+                add((UpdateRequest) actionRequest);
+            } else {
+                throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
+            }
+        }
+    }
+
+    /**
+     * Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to Elasticsearch.
+     *
+     * @param deleteRequests The multiple {@link DeleteRequest} to add.
+     */
+    public abstract void add(DeleteRequest... deleteRequests);
--- End diff --

I would say that, now that Elasticsearch is promoting the new REST API, we can hope it is done breaking its APIs for quite some time. Also, having our own API would obviously be one more thing for users to learn (instead of just using the raw ES API), and one more object to create for each request (probably negligible, but if there is no big gain, why do it?). Finally, from my point of view, fewer intermediate layers mean less risk of bugs (and, as you said, less maintenance work). More generally, I always prefer less code over more code unless more code is clearly the way to go, and here I'm not convinced it is. That said, I indeed can't promise Elasticsearch won't break its API again... So if people want that intermediate object, I can look into it. Does anyone else have an opinion? :)
---
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user cjolif commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190359388

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java ---
@@ -45,12 +48,34 @@
     }
     @Override
-    public void add(ActionRequest... actionRequests) {
-        for (ActionRequest actionRequest : actionRequests) {
+    public void add(DeleteRequest... deleteRequests) {
+        for (DeleteRequest deleteRequest : deleteRequests) {
             if (flushOnCheckpoint) {
                 numPendingRequestsRef.getAndIncrement();
             }
-            this.bulkProcessor.add(actionRequest);
+            this.bulkProcessor.add(deleteRequest);
+        }
+    }
+
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        for (IndexRequest indexRequest : indexRequests) {
+            System.out.println("ir: " + indexRequest);
--- End diff --

Oops. Fixed.
---
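For illustration, a minimal sketch of what the typed overloads in `BulkProcessorIndexer` might look like with the debug print removed: each request only updates the pending-request counter (when `flushOnCheckpoint` is set) and is forwarded to the bulk processor. The request classes and `FakeBulkProcessor` are hypothetical stand-ins for the Elasticsearch types, and using an `AtomicLong` for `numPendingRequestsRef` is an assumption; only the field names come from the diff above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class BulkIndexerSketch {

    // Hypothetical stand-ins for the Elasticsearch request types.
    static class IndexRequest {
        final String id;
        IndexRequest(String id) { this.id = id; }
    }

    static class DeleteRequest {
        final String id;
        DeleteRequest(String id) { this.id = id; }
    }

    // Hypothetical stand-in for BulkProcessor: it only buffers what it receives.
    static class FakeBulkProcessor {
        final List<Object> buffered = new ArrayList<>();
        void add(Object request) { buffered.add(request); }
    }

    static class BulkProcessorIndexer {
        private final FakeBulkProcessor bulkProcessor;
        private final boolean flushOnCheckpoint;
        private final AtomicLong numPendingRequestsRef; // assumed type

        BulkProcessorIndexer(FakeBulkProcessor bulkProcessor, boolean flushOnCheckpoint,
                             AtomicLong numPendingRequestsRef) {
            this.bulkProcessor = bulkProcessor;
            this.flushOnCheckpoint = flushOnCheckpoint;
            this.numPendingRequestsRef = numPendingRequestsRef;
        }

        void add(DeleteRequest... deleteRequests) {
            for (DeleteRequest deleteRequest : deleteRequests) {
                // Count the request only if checkpoints must wait for it to be flushed.
                if (flushOnCheckpoint) {
                    numPendingRequestsRef.getAndIncrement();
                }
                bulkProcessor.add(deleteRequest);
            }
        }

        void add(IndexRequest... indexRequests) {
            for (IndexRequest indexRequest : indexRequests) {
                // Same bookkeeping as for deletes; no debug output.
                if (flushOnCheckpoint) {
                    numPendingRequestsRef.getAndIncrement();
                }
                bulkProcessor.add(indexRequest);
            }
        }
    }
}
```

The two overloads are deliberately identical apart from their parameter type; the point of the change is that each Elasticsearch request type reaches the bulk processor through its own statically typed method.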
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user cjolif commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190359413

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
--- End diff --

done.
---
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user cjolif commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190359344

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
     /**
      * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
      *
      * @param actionRequests The multiple {@link ActionRequest} to add.
+     * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up}
--- End diff --

fixed
---
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190127059

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
--- End diff --

I think we can leave `RequestIndexer` as an interface, and make `add(ActionRequest...)` a [default method](https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html). This would lessen the friction of this breaking change.
---
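A minimal sketch of the default-method variant suggested above: `RequestIndexer` stays an interface, the typed `add` overloads are abstract, and the deprecated `add(ActionRequest...)` becomes a default method with the same dispatch logic as the abstract-class version in the diff. The request classes here are hypothetical stand-ins so the sketch compiles without Elasticsearch; `RecordingIndexer` is an illustrative implementation, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultMethodSketch {

    // Hypothetical stand-ins for the Elasticsearch request classes.
    static class ActionRequest {}
    static class IndexRequest extends ActionRequest {}
    static class DeleteRequest extends ActionRequest {}
    static class UpdateRequest extends ActionRequest {}
    static class OtherRequest extends ActionRequest {}

    // Still an interface: existing implementers only need to add the typed overloads.
    interface RequestIndexer {
        void add(DeleteRequest... deleteRequests);
        void add(IndexRequest... indexRequests);
        void add(UpdateRequest... updateRequests);

        // Legacy entry point, dispatching on each request's runtime type.
        @Deprecated
        default void add(ActionRequest... actionRequests) {
            for (ActionRequest actionRequest : actionRequests) {
                if (actionRequest instanceof IndexRequest) {
                    add((IndexRequest) actionRequest);
                } else if (actionRequest instanceof DeleteRequest) {
                    add((DeleteRequest) actionRequest);
                } else if (actionRequest instanceof UpdateRequest) {
                    add((UpdateRequest) actionRequest);
                } else {
                    throw new IllegalArgumentException(
                        "RequestIndexer only supports Index, Delete and Update requests");
                }
            }
        }
    }

    // Illustrative implementation that records which typed overload was called.
    static class RecordingIndexer implements RequestIndexer {
        final List<String> calls = new ArrayList<>();

        @Override
        public void add(DeleteRequest... deleteRequests) {
            for (DeleteRequest ignored : deleteRequests) calls.add("delete");
        }

        @Override
        public void add(IndexRequest... indexRequests) {
            for (IndexRequest ignored : indexRequests) calls.add("index");
        }

        @Override
        public void add(UpdateRequest... updateRequests) {
            for (UpdateRequest ignored : updateRequests) calls.add("update");
        }
    }

    public static void main(String[] args) {
        RecordingIndexer indexer = new RecordingIndexer();
        // Mixed argument types only match the ActionRequest... overload.
        indexer.add(new IndexRequest(), new DeleteRequest(), new UpdateRequest());
        System.out.println(indexer.calls); // prints [index, delete, update]
    }
}
```

Inside the default method, a cast such as `(IndexRequest) actionRequest` makes `add(IndexRequest...)` the most specific applicable overload, so the call does not recurse into the deprecated method.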
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190127406

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
     /**
      * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
      *
      * @param actionRequests The multiple {@link ActionRequest} to add.
+     * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up}
      */
-    void add(ActionRequest... actionRequests);
+    @Deprecated
+    public void add(ActionRequest... actionRequests) {
+        for (ActionRequest actionRequest : actionRequests) {
+            if (actionRequest instanceof IndexRequest) {
+                add((IndexRequest) actionRequest);
+            } else if (actionRequest instanceof DeleteRequest) {
+                add((DeleteRequest) actionRequest);
+            } else if (actionRequest instanceof UpdateRequest) {
+                add((UpdateRequest) actionRequest);
+            } else {
+                throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
+            }
+        }
+    }
+
+    /**
+     * Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to Elasticsearch.
+     *
+     * @param deleteRequests The multiple {@link DeleteRequest} to add.
+     */
+    public abstract void add(DeleteRequest... deleteRequests);
--- End diff --

What would be your feeling on not exposing `DeleteRequest`, `IndexRequest`, `UpdateRequest` directly through the user API? We could maintain our own way to specify requests, and only create the actual ES request instances internally. It would be more maintenance work for us, but might be safer from a future-proof API perspective.
---
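To make the alternative concrete, here is a minimal sketch of a Flink-owned request description that users would build instead of Elasticsearch request objects, with only a version-specific translator ever touching ES types. Every name in it (`SinkRequest`, `Kind`, `VersionSpecificTranslator`, `DESCRIBING_TRANSLATOR`) is hypothetical; the translator just returns a description of the ES request it would build, where a real bridge would construct the actual `IndexRequest`, `DeleteRequest` or `UpdateRequest`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class OwnRequestSketch {

    enum Kind { INDEX, DELETE, UPDATE }

    // Hypothetical user-facing request type, independent of any ES version.
    static final class SinkRequest {
        final Kind kind;
        final String index;
        final String id;
        final Map<String, Object> document;

        private SinkRequest(Kind kind, String index, String id, Map<String, Object> document) {
            this.kind = kind;
            this.index = index;
            this.id = id;
            this.document = document;
        }

        static SinkRequest index(String index, String id, Map<String, Object> document) {
            return new SinkRequest(Kind.INDEX, index, id, new HashMap<>(document));
        }

        static SinkRequest delete(String index, String id) {
            return new SinkRequest(Kind.DELETE, index, id, Collections.emptyMap());
        }

        static SinkRequest update(String index, String id, Map<String, Object> partialDocument) {
            return new SinkRequest(Kind.UPDATE, index, id, new HashMap<>(partialDocument));
        }
    }

    // Only this layer would change when Elasticsearch breaks its request API.
    interface VersionSpecificTranslator {
        String translate(SinkRequest request);
    }

    // Placeholder translator: describes the ES request it would create.
    static final VersionSpecificTranslator DESCRIBING_TRANSLATOR = request -> {
        switch (request.kind) {
            case INDEX:
                return "IndexRequest(" + request.index + "/" + request.id + ")";
            case DELETE:
                return "DeleteRequest(" + request.index + "/" + request.id + ")";
            case UPDATE:
                return "UpdateRequest(" + request.index + "/" + request.id + ")";
            default:
                throw new IllegalStateException("unknown kind " + request.kind);
        }
    };
}
```

The trade-off discussed in the thread is visible here: users learn one more type and every record allocates one more object, in exchange for user code that does not reference Elasticsearch classes at all.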
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190126862

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java ---
@@ -45,12 +48,34 @@
     }
     @Override
-    public void add(ActionRequest... actionRequests) {
-        for (ActionRequest actionRequest : actionRequests) {
+    public void add(DeleteRequest... deleteRequests) {
+        for (DeleteRequest deleteRequest : deleteRequests) {
             if (flushOnCheckpoint) {
                 numPendingRequestsRef.getAndIncrement();
             }
-            this.bulkProcessor.add(actionRequest);
+            this.bulkProcessor.add(deleteRequest);
+        }
+    }
+
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        for (IndexRequest indexRequest : indexRequests) {
+            System.out.println("ir: " + indexRequest);
--- End diff --

Leftover print.
---
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6043#discussion_r190126707

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
     /**
      * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
      *
      * @param actionRequests The multiple {@link ActionRequest} to add.
+     * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@Up}
--- End diff --

typo at the end.
---
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
GitHub user cjolif opened a pull request:

https://github.com/apache/flink/pull/6043

[FLINK-7386] evolve RequestIndexer API to make it working with Elastic 5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a possible RestHighLevelClient implementation

## What is the purpose of the change

*The purpose of this PR is to make the current Elasticsearch connector compatible with Elasticsearch 5.3+, fixing [FLINK-7386], while keeping it open to a future RestHighLevelClient implementation that could be used to provide Elasticsearch 6 compatibility [FLINK-8101].*

## Brief change log

* Add specific `add` methods for IndexRequest, UpdateRequest and DeleteRequest on RequestIndexer, so that it is compatible with both the 5.2- and the 5.3+ APIs (as of 5.3, Elasticsearch's BulkProcessor no longer accepts a generic ActionRequest).
* Make the existing ActionRequest method on RequestIndexer call the new specific method based on the request's actual type.
* Throw an exception for any other request type.
* Change the return type of the createClient method in ElasticsearchApiCallBridge. Since TransportClient and RestHighLevelClient have only the AutoCloseable interface in common, the method now returns AutoCloseable.
* Make ElasticsearchSinkBase agnostic to whether it uses a TransportClient or a RestClient by adding a createBulkProcessorBuilder method to ElasticsearchApiCallBridge, which ElasticsearchSinkBase calls. This method is implemented on all bridges.

## Verifying this change

This change added tests and can be verified as follows:

* The Elasticsearch test base has also been reworked slightly to make it compatible with the changes.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: a `@PublicEvolving` interface is now an abstract class. However, users typically do not extend or implement it; they only call methods on it.
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs & javadocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cjolif/flink es-5.3-apis

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6043.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6043

commit b1f2abc1d33b39c1fed4f370e5b21cbf477e0aa8
Author: Christophe Jolif
Date: 2018-05-17T22:17:04Z

    [FLINK-7386] evolve RequestIndexer API to make it working with Elastic 5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a possible RestHighLevelClient implementation.
---
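The last two change-log items can be sketched as follows: the bridge hands the sink an `AutoCloseable` client and builds the bulk-processor builder itself, so the sink never sees a concrete client type. This is a simplified, hypothetical model: `ApiCallBridge`, `RestBridge`, `TransportBridge`, `SinkBase`, the fake clients and `BulkBuilderStub` are illustrative stand-ins, and the real `ElasticsearchApiCallBridge` and `ElasticsearchSinkBase` signatures differ.

```java
import java.util.Collections;
import java.util.Map;

public class CallBridgeSketch {

    // Stand-ins for TransportClient / RestHighLevelClient: AutoCloseable is their only common type.
    static final class FakeTransportClient implements AutoCloseable {
        boolean closed;
        @Override public void close() { closed = true; }
    }

    static final class FakeRestClient implements AutoCloseable {
        boolean closed;
        @Override public void close() { closed = true; }
    }

    // Stand-in for a BulkProcessor.Builder bound to a specific client.
    static final class BulkBuilderStub {
        final Object client;
        BulkBuilderStub(Object client) { this.client = client; }
    }

    // Hypothetical, simplified bridge.
    interface ApiCallBridge {
        AutoCloseable createClient(Map<String, String> clientConfig);
        BulkBuilderStub createBulkProcessorBuilder(AutoCloseable client);
    }

    static final class TransportBridge implements ApiCallBridge {
        @Override public AutoCloseable createClient(Map<String, String> clientConfig) {
            return new FakeTransportClient();
        }
        @Override public BulkBuilderStub createBulkProcessorBuilder(AutoCloseable client) {
            // The bridge knows its own client type, so the cast stays inside the bridge.
            return new BulkBuilderStub((FakeTransportClient) client);
        }
    }

    static final class RestBridge implements ApiCallBridge {
        @Override public AutoCloseable createClient(Map<String, String> clientConfig) {
            return new FakeRestClient();
        }
        @Override public BulkBuilderStub createBulkProcessorBuilder(AutoCloseable client) {
            return new BulkBuilderStub((FakeRestClient) client);
        }
    }

    // The sink only handles AutoCloseable and delegates builder creation to the bridge.
    static final class SinkBase {
        final ApiCallBridge bridge;
        AutoCloseable client;
        BulkBuilderStub builder;

        SinkBase(ApiCallBridge bridge) { this.bridge = bridge; }

        void open() {
            client = bridge.createClient(Collections.emptyMap());
            builder = bridge.createBulkProcessorBuilder(client);
        }

        void close() {
            try {
                client.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
```

With this split, adding a REST-based bridge for Elasticsearch 6 would not require touching the sink, which is the openness to a future RestHighLevelClient implementation that the PR description aims for.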