[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169568752

--- Diff: flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be converted to {@link DocWriteRequest}
+ * and will be buffered before sending a bulk request to the Elasticsearch cluster.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+
+	private final BulkProcessor bulkProcessor;
+	private final boolean flushOnCheckpoint;
+	private final AtomicLong numPendingRequestsRef;
+
+	public BulkProcessorIndexer(BulkProcessor bulkProcessor,
+								boolean flushOnCheckpoint,
+								AtomicLong numPendingRequests) {
+		this.bulkProcessor = bulkProcessor;
+		this.flushOnCheckpoint = flushOnCheckpoint;
+		this.numPendingRequestsRef = numPendingRequests;
+	}
+
+	@Override
+	public void add(ActionRequest... actionRequests) {
+		for (ActionRequest actionRequest : actionRequests) {
+			if (flushOnCheckpoint) {
+				numPendingRequestsRef.getAndIncrement();
+			}
+			this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

Note that, to keep the interfaces identical, the 6.x (REST) route also needs a cast. See: https://github.com/cjolif/flink/blob/cf86ab787709baf46455f540c738cfabcaee7203/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java#L72

I guess this is less of a problem, since that class explicitly instantiates RestHighLevelClient, but I still wanted to make sure you had seen it.

---
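In the diff above, `add` increments a shared pending-request counter only when `flushOnCheckpoint` is set. This lets a checkpoint wait until every buffered request has been acknowledged. The sketch below models that counting under stated assumptions. `FakeBulkProcessor` and `CountingIndexer` are hypothetical stand-ins, and strings replace `ActionRequest`. It also assumes that the sink's bulk listener subtracts the number of actions once a bulk request completes. The test simulates that subtraction by hand; it is not Flink's actual listener code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Elasticsearch's BulkProcessor: it only buffers requests in memory.
class FakeBulkProcessor {
    private final List<String> buffered = new ArrayList<>();

    void add(String request) {
        buffered.add(request);
    }

    int bufferedCount() {
        return buffered.size();
    }

    // Simulates sending one bulk request: returns how many actions were sent and clears the buffer.
    int flush() {
        int sent = buffered.size();
        buffered.clear();
        return sent;
    }
}

// Mirrors the shape of BulkProcessorIndexer.add(...) from the diff, with String in place of ActionRequest.
class CountingIndexer {
    private final FakeBulkProcessor bulkProcessor;
    private final boolean flushOnCheckpoint;
    private final AtomicLong numPendingRequestsRef;

    CountingIndexer(FakeBulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequests) {
        this.bulkProcessor = bulkProcessor;
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.numPendingRequestsRef = numPendingRequests;
    }

    void add(String... requests) {
        for (String request : requests) {
            if (flushOnCheckpoint) {
                // Only tracked when a checkpoint has to wait for in-flight requests.
                numPendingRequestsRef.getAndIncrement();
            }
            bulkProcessor.add(request);
        }
    }
}
```

The counter is shared through an `AtomicLong` reference rather than owned by the indexer. The code that waits at checkpoint time and the code that acknowledges bulk responses can then see the same value.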
[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169564689

--- Diff: flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java ---
+			this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

We can't resolve this separately for 5.3 and 6.0. Making 6.0 fully self-contained could mean that we end up having to maintain 3 separate copies if we decide to go the same route for 5.3. We either accept the cast or create an `elasticsearch-base-5.3` module that both connectors depend on. It would be great for @tzulitai to weigh in here.

---
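One way to picture the shared-module option is to parameterize the indexer on the request type it accepts. Each version-specific connector then binds that type once, and the shared buffering code never needs a cast. The sketch below is hypothetical. `TypedRequestIndexer`, `BufferingIndexerBase`, `WriteRequest` and `WriteRequestIndexer` are illustrative names, not Flink or Elasticsearch API. `WriteRequest` only stands in for a `DocWriteRequest`-like type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: an indexer interface parameterized on the request type it accepts.
interface TypedRequestIndexer<R> {
    void add(R request);
}

// Hypothetical shared base (the kind of code a module like elasticsearch-base-5.3 could hold):
// the buffering logic is written once and never needs to know, or cast to, the concrete request type.
abstract class BufferingIndexerBase<R> implements TypedRequestIndexer<R> {
    private final List<R> buffer = new ArrayList<>();

    @Override
    public void add(R request) {
        buffer.add(request);
    }

    // Returns everything buffered so far and empties the buffer.
    List<R> drain() {
        List<R> drained = new ArrayList<>(buffer);
        buffer.clear();
        return drained;
    }
}

// Hypothetical stand-in for the request type a 5.3+/6.x client accepts (not the real DocWriteRequest).
interface WriteRequest {
    String index();
}

// A version-specific connector binds R once; passing any other request type is a compile error.
final class WriteRequestIndexer extends BufferingIndexerBase<WriteRequest> {
}
```

The trade-off is that the type parameter spreads to every class that touches the indexer. That is presumably part of why a separate base module, rather than a change to the existing `elasticsearch-base`, was being weighed.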
[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169490640

--- Diff: flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java ---
+			this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

Hmm, @zentol, I thought I was commenting on my PR #5374, since it is based on this one... But I guess your comment applies to my PR as well.

---
[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169144082

--- Diff: flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java ---
+			this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

@zentol I'm not sure I have the time in the short term to fully rework *both* the 5.3 and the 6.0 support in that direction. Let me, however, propose something I should have time for. What about adding an elasticsearch-rest module that would *not* inherit from elasticsearch-base? It would need no "strange" cast, while still providing an ElasticsearchSink implementation based on the Elasticsearch Java High-Level REST Client.

This should work with any Elasticsearch version that provides the Java High-Level REST Client with BulkProcessor support. I'm afraid this would cover 6.x but not 5.3+. But at least Flink would be in good shape going forward. Elasticsearch 8.x intends to remove the API that is used today, and the Java High-Level REST Client is the newly advertised way of doing things.

Let me know what you think.

---
[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169139984

--- Diff: flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java ---
+			this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

This unchecked cast makes me quite uncomfortable and contradicts the actual API. This is exactly the kind of thing that should not even be able to fail at runtime.

`elasticsearch-base` was created under the assumption that the interfaces it works on are compatible across all versions used, but 5.3 violates this assumption. In fact, if you were to update the version to 5.3, things wouldn't even compile, because of another incompatible change in the constructor of `BulkItemResponse`.

I believe this is a case where duplicating the entire elasticsearch-base module and properly modifying the interfaces would be the appropriate solution, either as a new 5.3-base module or as a completely self-contained one.

---
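The runtime failure described above can be reproduced with a few stand-in types. In this sketch, `StubActionRequest`, `StubDocWriteRequest`, `StubIndexRequest`, `StubSearchRequest` and `CastingIndexer` are hypothetical classes, not the real Elasticsearch ones. They only mimic the relationship the diff relies on: the shared interface accepts a broad request type, while the 5.3+ `BulkProcessor` needs a narrower one. The compiler accepts the narrowing cast because the abstract class could have a subclass implementing the interface. A request that does not implement it is only rejected when `add` actually runs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, not the real Elasticsearch classes:
// StubActionRequest plays the broad type the shared RequestIndexer interface accepts,
// StubDocWriteRequest plays the narrower type the 5.3+ BulkProcessor.add(...) requires.
abstract class StubActionRequest {
}

interface StubDocWriteRequest {
}

// An index-style request is both, so the cast succeeds.
final class StubIndexRequest extends StubActionRequest implements StubDocWriteRequest {
}

// A search-style request is an action request but not a write request.
final class StubSearchRequest extends StubActionRequest {
}

final class CastingIndexer {
    final List<StubDocWriteRequest> accepted = new ArrayList<>();

    // Same pattern as the diff: the signature promises any action request, the body narrows it.
    // The compiler accepts the cast; a mismatch only surfaces as a ClassCastException at runtime.
    void add(StubActionRequest... requests) {
        for (StubActionRequest request : requests) {
            accepted.add((StubDocWriteRequest) request);
        }
    }
}
```

Declaring the parameter as `StubDocWriteRequest...` instead would turn the bad call into a compile error. This is the kind of interface change the comment argues for.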
[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4675

[FLINK-7386] FIx Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

## What is the purpose of the change

Add flink-connector-elasticsearch5.3 to support Elasticsearch 5.3 and later versions.

## Brief change log

  - *Add createRequestIndex method in ElasticsearchApiCallBridge*
  - *Add flink-connector-elasticsearch5.3 project*
  - *Add BulkProcessorIndexer in connector-elasticsearch5.3 to convert ActionRequest to DocWriteRequest*

## Verifying this change

This change added tests and can be verified as follows:

  - *Add ElasticsearchSinkITCase test case*
  - *Add ElasticsearchSinkExample in connector-elasticsearch5.3 to send requests to Elasticsearch 5.3 and later versions*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-7386

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4675

commit 9dd30ccb5f5bd9940c8f1cfea4ffeb256d564862
Author: zjureel
Date:   2017-09-15T03:51:35Z

    [FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add createRequestIndex method

commit 5936bc7734557f75dc4b0c06cfc31b0b0e49a91a
Author: zjureel
Date:   2017-09-15T03:55:16Z

    [FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and later versions

commit dbc87bb1dd361a2e840bd382814a780aa96a45c2
Author: zjureel
Date:   2017-09-15T04:42:44Z

    [FLINK-7386] add test case for ES53

commit 74972ab9798e4e9173ad4cc4a6ec6bdf1390f98a
Author: zjureel
Date:   2017-09-15T05:33:43Z

    [FLINK-7386] add document for ES5.3

---
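The change log turns ElasticsearchApiCallBridge into an abstract class with a createRequestIndex method. This lets the version-agnostic sink obtain a version-specific indexer without constructing one itself. The sketch below shows that factory-method shape under stated assumptions. `SimpleRequestIndexer`, `ApiCallBridgeSketch`, `LegacyBridgeSketch` and `Es53BridgeSketch` are hypothetical names, and strings replace `ActionRequest`. The real method presumably also receives the BulkProcessor, the `flushOnCheckpoint` flag and the pending-request counter that the BulkProcessorIndexer constructor in the diff expects. Those are omitted here.

```java
import java.util.List;

// Hypothetical, simplified stand-in for Flink's RequestIndexer (String replaces ActionRequest).
interface SimpleRequestIndexer {
    void add(String... requests);
}

// Sketch of the factory-method idea: the sink asks the version-specific bridge for an indexer.
// Class names and the method signature are assumptions, not the PR's actual code.
abstract class ApiCallBridgeSketch {
    abstract SimpleRequestIndexer createRequestIndex(List<String> bulkBuffer);
}

// Pre-5.3 style: requests are buffered as they are.
final class LegacyBridgeSketch extends ApiCallBridgeSketch {
    @Override
    SimpleRequestIndexer createRequestIndex(List<String> bulkBuffer) {
        return requests -> {
            for (String request : requests) {
                bulkBuffer.add(request);
            }
        };
    }
}

// 5.3+ style: each request is converted first (here just tagged) before buffering,
// standing in for the ActionRequest -> DocWriteRequest conversion done by BulkProcessorIndexer.
final class Es53BridgeSketch extends ApiCallBridgeSketch {
    @Override
    SimpleRequestIndexer createRequestIndex(List<String> bulkBuffer) {
        return requests -> {
            for (String request : requests) {
                bulkBuffer.add("converted:" + request);
            }
        };
    }
}
```

With this shape, the version-specific conversion, and therefore the cast debated in the review, lives behind the bridge rather than in the shared sink.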