[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-12-08 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1043255251


##
pom.xml:
##
@@ -0,0 +1,440 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+   xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+
+   
+   io.github.zentol.flink
+   flink-connector-parent
+   1.0
+   
+
+   4.0.0
+
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   Flink : Connectors : Opensearch : Parent
+   pom
+   https://flink.apache.org
+   2022
+
+   
+   
+   The Apache Software License, Version 2.0
+   
https://www.apache.org/licenses/LICENSE-2.0.txt
+   repo
+   
+   
+
+   
+   https://github.com/apache/flink-connector-opensearch
+   
g...@github.com:apache/flink-connector-opensearch.git
+   
+   
scm:git:https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
+   
+   
+
+   
+   flink-connector-opensearch
+   flink-connector-opensearch-e2e-tests
+   flink-sql-connector-opensearch
+   
+
+   
+   
UTF-8
+   
UTF-8
+
+   1.16.0
+   15.0
+   
+   2.13.4.20221013
+   5.8.1
+   3.21.0
+   0.22.0
+   1.17.2
+   2.21.0
+
+   false
+   1.15.0
+
+   1.7.36
+   2.17.2
+
+   
+
+   
flink-connector-opensearch-parent
+   
+
+   
+   
+   org.apache.flink
+   flink-shaded-force-shading
+   ${flink.shaded.version}
+   
+
+   
+
+   
+   
+   org.slf4j
+   slf4j-api
+   provided
+   
+
+   
+   
+   com.google.code.findbugs
+   jsr305
+   provided
+   
+
+   
+   
+   org.junit.jupiter
+   junit-jupiter
+   test
+   
+
+   
+   org.junit.vintage
+   junit-vintage-engine
+   test
+   
+
+   
+   org.assertj
+   assertj-core
+   test
+   
+
+   
+   org.mockito
+   mockito-core
+   ${mockito.version}
+   jar
+   test
+   
+
+   
+   org.testcontainers
+   junit-jupiter
+   test
+   
+
+   
+
+   
+   org.apache.logging.log4j
+   log4j-slf4j-impl
+   test
+   
+
+   
+   org.apache.logging.log4j
+   log4j-api
+   test
+   
+
+   
+   org.apache.logging.log4j
+   log4j-core
+   test
+   
+
+   
+   
+   org.apache.logging.log4j
+   log4j-1.2-api
+   test
+   
+
+   
+   org.apache.flink
+   flink-test-utils-junit
+   test
+   
+
+   
+   
+   org.apache.flink
+   flink-architecture-tests-test
+   test
+   
+   
+   org.apache.flink
+   
flink-architecture-tests-production
+   test
+   
+
+   
+
+   
+
+   
+
+   
+   
+
+   
+   org.apache.flink
+   flink-test-utils
+   ${flink.version}
+   test
+   
+   
+   log4j
+   log4j
+   
+   
+   org.slf4j
+   
slf4j-log4j12
+  

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-12-01 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1037226333


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-30 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1035727508


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -79,7 +82,15 @@ public void setUp() throws IOException {
 return (req, resp, context) -> resp.setStatusCode(200);
 } else if (method.equalsIgnoreCase("POST")) {
 // Bulk responses are configured per test case
-return (req, resp, context) -> 
responses.poll().accept(resp);
+return (req, resp, context) -> {
+lock.lock();
+try {
+responses.poll().accept(resp);
+flushed.signalAll();

Review Comment:
   Or you could just use a OneShotLatch which makes all of this logic here a 
one-liner _and_ gives you more control to wait for a specific message to be 
consumed :shrug: 
   (and you wouldn't need `awaitForCondition`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-30 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1035719128


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-29 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034832065


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-29 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034824729


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-29 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034673047


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-23 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1030595354


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java:
##
@@ -88,4 +88,8 @@ public SinkWriter createWriter(InitContext context) 
throws IOException {
 context.metricGroup(),
 context.getMailboxExecutor());
 }
+
+DeliveryGuarantee getDeliveryGuarantee() {

Review Comment:
   The architecture tests verify that these methods are only called in tests ;) 
It allows us to leak some implementation details into tests without impacting 
production code. Can make your life a lot easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-23 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1030199426


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java:
##
@@ -88,4 +88,8 @@ public SinkWriter createWriter(InitContext context) 
throws IOException {
 context.metricGroup(),
 context.getMailboxExecutor());
 }
+
+DeliveryGuarantee getDeliveryGuarantee() {

Review Comment:
   add `@VisibleForTesting`



##
flink-connector-opensearch/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.core.importer.Location;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+import java.util.regex.Pattern;
+
+/** Architecture tests for test code. */
+@AnalyzeClasses(
+packages = {
+"org.apache.flink.connector.opensearch",
+"org.apache.flink.streaming.connectors.opensearch"
+},
+importOptions = {
+ImportOption.OnlyIncludeTests.class,
+TestCodeArchitectureTest.IncludeES7ImportOption.class,

Review Comment:
   This isn't required; it's only needed when you have a `base` module like the 
ES connector does.



##
pom.xml:
##
@@ -0,0 +1,447 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+   xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+
+   
+   io.github.zentol.flink
+   flink-connector-parent
+   1.0
+   
+
+   4.0.0
+
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   Flink : Connectors : Opensearch : Parent
+   pom
+   https://flink.apache.org
+   2022
+
+   
+   
+   The Apache Software License, Version 2.0
+   
https://www.apache.org/licenses/LICENSE-2.0.txt
+   repo
+   
+   
+
+   
+   https://github.com/apache/flink-connector-opensearch
+   
g...@github.com:apache/flink-connector-opensearch.git
+   
+   
scm:git:https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
+   
+   
+
+   
+   flink-connector-opensearch
+   flink-connector-opensearch-e2e-tests
+   flink-sql-connector-opensearch
+   
+
+   
+   
UTF-8
+   
UTF-8
+
+   1.16.0
+   15.0
+   
+   2.13.4.20221013
+   4.13.2

Review Comment:
   JUnit4 should no longer be required.



##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/RequestIndexer.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.Internal;
+
+import org.opensearch.action.delete.DeleteRequest;
+import 

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029448617


##
flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/util/IgnoringFailureHandler.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch.util;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.opensearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.opensearch.RequestIndexer;
+
+import org.opensearch.action.ActionRequest;
+
+/** Ignores all kinds of failures and drops the affected {@link 
ActionRequest}. */
+@Internal
+public class IgnoringFailureHandler implements ActionRequestFailureHandler {

Review Comment:
   Yes and no. A user _could_ set the failure handler, but this implementation 
here is marked as `@Internal`; presumably because they were targeting the Table 
API connector.
   See `ElasticsearchConfiguration#getFailureHandler`
   
   _If_ we want to be be a drop-in replacement for the ES table api connector, 
then we'd need to pull in all of FLINK-26638, which also includes going back to 
the old ES sink for the Table API.
   
   It's a bit of an unsolved question on the ES connector side as to what will 
happen with these failure handlers :/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029441900


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.ALLOW_INSECURE;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.FORMAT_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.INDEX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.SOCKET_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.opensearch.common.Strings.capitalize;
+
+/** A {@link DynamicTableSinkFactory} for discovering OpensearchDynamicSink. */
+@Internal
+public class OpensearchDynamicSinkFactory implements DynamicTableSinkFactory {
+private static final String FACTORY_IDENTIFIER = "opensearch";
+
+private final OpensearchSinkBuilderSupplier sinkBuilderSupplier;
+
+public OpensearchDynamicSinkFactory() {
+this.sinkBuilderSupplier = 

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029349138


##
flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.opensearch.test.DockerImageVersions;
+import 
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
+import 
org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.test.resources.ResourceTestUtils;
+
+import org.opensearch.testcontainers.OpensearchContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/** End to end test for OpensearchSink based on connector testing framework. */
+@SuppressWarnings("unused")
+public class OpensearchSinkE2ECase extends 
SinkTestSuiteBase> {
+private static final Logger LOG = 
LoggerFactory.getLogger(OpensearchSinkE2ECase.class);
+private static final int READER_RETRY_ATTEMPTS = 10;
+private static final int READER_TIMEOUT = -1; // Not used
+
+@TestSemantics
+CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+@TestEnv
+protected FlinkContainerTestEnvironment flink = new 
FlinkContainerTestEnvironment(1, 6);
+
+public OpensearchSinkE2ECase() throws Exception {}
+

Review Comment:
   ```suggestion
   ```



##
flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.opensearch.test.DockerImageVersions;
+import 
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
+import 
org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import 

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029022182


##
pom.xml:
##
@@ -0,0 +1,463 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+   xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+
+   
+   io.github.zentol.flink
+   flink-connector-parent
+   1.0
+   
+
+   4.0.0
+
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   Flink : Connectors : Opensearch : Parent
+   pom
+   https://flink.apache.org
+   2022
+
+   
+   
+   The Apache Software License, Version 2.0
+   
https://www.apache.org/licenses/LICENSE-2.0.txt
+   repo
+   
+   
+
+   
+   https://github.com/apache/flink-connector-opensearch
+   
g...@github.com:apache/flink-connector-opensearch.git
+   
+   
scm:git:https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
+   
+   
+
+   
+   flink-connector-opensearch
+   flink-connector-opensearch-e2e-tests

Review Comment:
   Right, forgot that peculiarity. We should no longer need it; it was only 
done to save some time when building, but this is less of a problem here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-21 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1028176163


##
pom.xml:
##
@@ -0,0 +1,463 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+   xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+
+   
+   io.github.zentol.flink
+   flink-connector-parent
+   1.0
+   
+
+   4.0.0
+
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   Flink : Connectors : Opensearch : Parent
+   pom
+   https://flink.apache.org
+   2022
+
+   
+   
+   The Apache Software License, Version 2.0
+   
https://www.apache.org/licenses/LICENSE-2.0.txt
+   repo
+   
+   
+
+   
+   https://github.com/apache/flink-connector-opensearch
+   
g...@github.com:apache/flink-connector-opensearch.git
+   
+   
scm:git:https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
+   
+   
+
+   
+   flink-connector-opensearch
+   flink-connector-opensearch-e2e-tests

Review Comment:
   missing the sql connector module



##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java:
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import org.apache.http.HttpHost;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to insert or update data in an Opensearch index. The sink 
supports the following
+ * delivery guarantees.
+ *
+ * 
+ *   {@link DeliveryGuarantee#NONE} does not provide any guarantees: 
actions are flushed to
+ *   Opensearch only depending on the configurations of the bulk 
processor. In case of a
+ *   failure, it might happen that actions are lost if the bulk processor 
still has buffered
+ *   actions.
+ *   {@link DeliveryGuarantee#AT_LEAST_ONCE} on a checkpoint the sink will 
wait until all
+ *   buffered actions are flushed to and acknowledged by Opensearch. No 
actions will be lost but
+ *   actions might be sent to Opensearch multiple times when Flink 
restarts. These additional
+ *   requests may cause inconsistent data in Opensearch right after the 
restart, but eventually
+ *   everything will be consistent again.
+ * 
+ *
+ * @param  type of the records converted to Opensearch actions
+ * @see OpensearchSinkBuilder on how to construct a OpensearchSink
+ */
+@PublicEvolving
+public class OpensearchSink implements Sink {

Review Comment:
   Curious why this class lost the `getDeliveryGuarantee()`



##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the 

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-21 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1028111806


##
pom.xml:
##
@@ -327,6 +327,12 @@ under the License.

 

+   
+   net.java.dev.jna
+   jna

Review Comment:
   So could we exclude it in general? Or do we need it for some tests to work?
   
   What is the bootstrap licensing issue about?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-18 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1026411856


##
pom.xml:
##
@@ -327,6 +327,12 @@ under the License.

 

+   
+   net.java.dev.jna
+   jna

Review Comment:
   The sql-jar is supposed to bundle all dependencies required to interface 
with opensearch. This commonly (but not always!) means to bundled all 
transitive dependencies of whatever opensearch dependencies it has.
   
   One of these transitive dependencies is jna, which is not bundled in the sql 
jar.
   
   The question is, why isn't that bundled? Is it an optional thing? Is there a 
licensing issue that requires the user to provide it at runtime? Is it just not 
required for talking to opensearch?
   
   If it isn't required for our use-case, then we may be able to just exclude 
it instead of having to worry about converging dependencies and keeping it 
compatible with opensearch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-18 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1026404137


##
flink-sql-connector-opensearch/pom.xml:
##
@@ -0,0 +1,159 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-sql-connector-opensearch
+   Flink : Connectors : SQL : Opensearch
+
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   shade-flink
+   package
+   
+   shade
+   
+   
+   
+   
+   
*:*
+   
+   
+   
+   
com.tdunning:t-digest
+   
joda-time:joda-time
+   
net.sf.jopt-simple:jopt-simple
+   
net.java.dev.jna:jna
+   
org.hdrhistogram:HdrHistogram
+   
org.yaml:snakeyaml
+   
+   
+   
+   
+   
+   
org.opensearch:opensearch
+   

+   
config/**
+   
modules.txt
+   
plugins.txt
+   
org/joda/**
+   

+   
org/opensearch/bootstrap/**
+   

+   
+   
+   
org.opensearch.client:opensearch-rest-high-level-client
+   

+   
forbidden/**
+   

+   
+   
+   
org.apache.httpcomponents:httpclient
+   

+   
mozilla/**
+   

+   
+   
+   
org.apache.lucene:lucene-analyzers-common
+   

+   
org/tartarus/**
+

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025211246


##
flink-connector-opensearch-e2e-tests/pom.xml:
##
@@ -0,0 +1,190 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   
+
+   flink-connector-opensearch-e2e-tests
+   Flink : E2E Tests : Opensearch
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-base
+   ${flink.version}
+   
+   
+   org.apache.flink
+   flink-streaming-java
+   ${flink.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   test-jar
+   
+   
+   
+   *
+   *
+   
+   
+   
+   
+   org.apache.logging.log4j
+   log4j-api
+   provided
+   
+   
+   org.apache.flink
+   flink-test-utils
+   test
+   
+   
+   org.apache.flink
+   flink-connector-test-utils
+   ${flink.version}
+   test
+   
+   
+   org.opensearch
+   opensearch-testcontainers
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-deploy-plugin
+   
+   true
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
opensearch-end-to-end-test
+   
dependencies
+   
+   
+   
com.google.code.findbugs:jsr305
+   
org.slf4j:slf4j-api
+   
+   
+   
+   
+   
*:*
+   

+   
META-INF/*.SF
+   
META-INF/*.DSA
+   
META-INF/*.RSA
+   

+   
+   
+   
+   
+   
+   
+   
+   org.apache.maven.plugins
+   maven-dependency-plugin
+   
+   
+   copy
+   
pre-integration-test
+   
+   

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025205947


##
pom.xml:
##
@@ -327,6 +327,12 @@ under the License.

 

+   
+   net.java.dev.jna
+   jna

Review Comment:
   Why isnt it bundled in the sql jar?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025102141


##
flink-connector-opensearch-e2e-tests/pom.xml:
##
@@ -0,0 +1,190 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   
+
+   flink-connector-opensearch-e2e-tests
+   Flink : E2E Tests : Opensearch
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-base
+   ${flink.version}
+   
+   
+   org.apache.flink
+   flink-streaming-java
+   ${flink.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   test-jar
+   
+   
+   
+   *
+   *
+   
+   
+   
+   
+   org.apache.logging.log4j
+   log4j-api
+   provided
+   
+   
+   org.apache.flink
+   flink-test-utils
+   test
+   
+   
+   org.apache.flink
+   flink-connector-test-utils
+   ${flink.version}
+   test
+   
+   
+   org.opensearch
+   opensearch-testcontainers
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-deploy-plugin
+   
+   true
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
opensearch-end-to-end-test
+   
dependencies
+   
+   
+   
com.google.code.findbugs:jsr305
+   
org.slf4j:slf4j-api
+   
+   
+   
+   
+   
*:*
+   

+   
META-INF/*.SF
+   
META-INF/*.DSA
+   
META-INF/*.RSA
+   

+   
+   

Review Comment:
   If you'd set execution id to `shade-flink` this wouldn't be necessary.



##
flink-connector-opensearch-e2e-tests/pom.xml:
##
@@ -0,0 +1,190 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   
+
+  

[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-15 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1022608823


##
flink-connector-opensearch-e2e-tests/pom.xml:
##
@@ -0,0 +1,104 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..

Review Comment:
   ```suggestion
   ```



##
flink-connector-opensearch-e2e-tests/pom.xml:
##
@@ -0,0 +1,104 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-connector-opensearch-e2e-tests
+   Flink : E2E Tests : Opensearch
+   jar
+
+   
+   
+   org.apache.flink
+   flink-streaming-java
+   ${flink.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+   org.apache.logging.log4j
+   log4j-api
+   ${log4j.version}
+   provided
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
opensearch-end-to-end-test
+   
dependencies
+   
+   
+   
+   
+   
+   org.apache.maven.plugins
+   maven-dependency-plugin
+   
+   
+   copy
+   
pre-integration-test
+   
+   copy
+   
+   
+   
+   
+   
+   
+   
org.apache.flink
+   
flink-connector-test-utils
+   
${flink.version}
+   
flink-connector-test-utils.jar
+   jar
+   
${project.build.directory}/dependencies
+   

Review Comment:
   ```suggestion

${project.build.directory}/dependencies
   ```



##
flink-connector-opensearch/pom.xml:
##
@@ -0,0 +1,185 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-connector-opensearch
+   Flink : Connectors : Opensearch
+
+   jar
+
+   
+   
+   1.3.0

Review Comment:
   Can you give us some background of the opensearch versioning?
   I see 2.3.0 artifacts on maven central.



##
flink-connector-opensearch/pom.xml:
##
@@ -0,0 +1,185 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0