[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-12-01 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1037233003


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-30 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1035955510


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -79,7 +82,15 @@ public void setUp() throws IOException {
 return (req, resp, context) -> resp.setStatusCode(200);
 } else if (method.equalsIgnoreCase("POST")) {
 // Bulk responses are configured per test case
-return (req, resp, context) -> 
responses.poll().accept(resp);
+return (req, resp, context) -> {
+lock.lock();
+try {
+responses.poll().accept(resp);
+flushed.signalAll();

Review Comment:
   `awaitForCondition` is not checking the flushing part, it is checking the 
`numPendingRequest`, which will be eventually updated, for the test it is 
sufficient to just know that flush has happened (at least, I don't see the 
reasons to complicate this part with per-response latches).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-30 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1035956928


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-30 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1035955510


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -79,7 +82,15 @@ public void setUp() throws IOException {
 return (req, resp, context) -> resp.setStatusCode(200);
 } else if (method.equalsIgnoreCase("POST")) {
 // Bulk responses are configured per test case
-return (req, resp, context) -> 
responses.poll().accept(resp);
+return (req, resp, context) -> {
+lock.lock();
+try {
+responses.poll().accept(resp);
+flushed.signalAll();

Review Comment:
   `awaitForCondition` is not checking the flushing part, it is checking the 
`numPendingRequest`, which will be eventually updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-29 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034930175


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+private final Lock lock = new ReentrantLock();
+private final Condition flushed = lock.newCondition();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> {
+lock.lock();
+try {
+responses.poll().accept(resp);
+flushed.signalAll();
+} finally {
+lock.unlock();
+}
+};
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-29 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034922738


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-29 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034741956


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-29 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034755652


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-29 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034741956


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-29 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034734061


##
flink-connector-opensearch/src/test/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.java:
##
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.bootstrap.HttpServer;
+import org.apache.http.impl.bootstrap.ServerBootstrap;
+import org.apache.http.protocol.HttpRequestHandlerMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Requests;
+import org.opensearch.common.xcontent.ToXContent;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.index.shard.ShardId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Suite of tests for {@link OpensearchSink}. */
+public class OpensearchSinkTest {
+private HttpServer server;
+private final Deque> responses = new 
ConcurrentLinkedDeque<>();
+
+@BeforeEach
+public void setUp() throws IOException {
+final HttpRequestHandlerMapper handlers =
+(request) -> {
+final String method = request.getRequestLine().getMethod();
+if (method.equalsIgnoreCase("HEAD")) {
+// Connection request always OKed
+return (req, resp, context) -> resp.setStatusCode(200);
+} else if (method.equalsIgnoreCase("POST")) {
+// Bulk responses are configured per test case
+return (req, resp, context) -> 
responses.poll().accept(resp);
+} else {
+return null;
+}
+};
+server = 
ServerBootstrap.bootstrap().setHandlerMapper(handlers).create();
+server.start();
+}
+
+@AfterEach
+public void tearDown() {
+server.stop();
+server = null;
+responses.clear();
+}
+
+/**
+ * Tests that any item failure in the listener callbacks is rethrown on an 
immediately following
+ * invoke call.
+ */
+@Test
+public void testItemFailureRethrownOnInvoke() throws Throwable {
+final OpensearchSink.Builder builder =
+new OpensearchSink.Builder<>(
+Arrays.asList(new HttpHost("localhost", 
server.getLocalPort())),
+new SimpleSinkFunction());
+builder.setBulkFlushMaxActions(1);
+builder.setFailureHandler(new NoOpFailureHandler());
+
+final OpensearchSink sink = builder.build();
+final 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-28 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1034107205


##
flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSink.java:
##
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.common.unit.ByteSizeUnit;
+import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Opensearch Sinks.
+ *
+ * This class implements the common behaviour across Opensearch versions, 
such as the use of an
+ * internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s 
before sending the
+ * requests to the cluster, as well as passing input records to the user 
provided {@link
+ * OpensearchSinkFunction} for processing.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+@Internal
+public class OpensearchSink extends RichSinkFunction implements 
CheckpointedFunction {
+private static final long serialVersionUID = -1007596293618451942L;
+private static final Logger LOG = 
LoggerFactory.getLogger(OpensearchSink.class);
+
+// 
+//  Internal bulk processor configuration
+// 
+
+public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = 
"bulk.flush.backoff.enable";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = 
"bulk.flush.backoff.type";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = 
"bulk.flush.backoff.retries";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = 
"bulk.flush.backoff.delay";
+
+/** Used to control whether the retry delay should increase exponentially 
or remain constant. */
+@PublicEvolving
+public enum FlushBackoffType {
+CONSTANT,
+EXPONENTIAL
+}
+
+/**
+ * Provides a backoff policy for bulk requests. Whenever a bulk request is 
rejected due to
+ * 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-28 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1033604718


##
flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSink.java:
##
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.common.unit.ByteSizeUnit;
+import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Opensearch Sinks.
+ *
+ * This class implements the common behaviour across Opensearch versions, 
such as the use of an
+ * internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s 
before sending the
+ * requests to the cluster, as well as passing input records to the user 
provided {@link
+ * OpensearchSinkFunction} for processing.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+@Internal
+public class OpensearchSink extends RichSinkFunction implements 
CheckpointedFunction {
+private static final long serialVersionUID = -1007596293618451942L;
+private static final Logger LOG = 
LoggerFactory.getLogger(OpensearchSink.class);
+
+// 
+//  Internal bulk processor configuration
+// 
+
+public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = 
"bulk.flush.backoff.enable";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = 
"bulk.flush.backoff.type";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = 
"bulk.flush.backoff.retries";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = 
"bulk.flush.backoff.delay";
+
+/** Used to control whether the retry delay should increase exponentially 
or remain constant. */
+@PublicEvolving
+public enum FlushBackoffType {
+CONSTANT,
+EXPONENTIAL
+}
+
+/**
+ * Provides a backoff policy for bulk requests. Whenever a bulk request is 
rejected due to
+ * 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-28 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1033552733


##
flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSink.java:
##
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.common.unit.ByteSizeUnit;
+import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Opensearch Sinks.
+ *
+ * This class implements the common behaviour across Opensearch versions, 
such as the use of an
+ * internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s 
before sending the
+ * requests to the cluster, as well as passing input records to the user 
provided {@link
+ * OpensearchSinkFunction} for processing.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+@Internal
+public class OpensearchSink extends RichSinkFunction implements 
CheckpointedFunction {

Review Comment:
   Similarly to 
`org.apache.flink.streaming.connectors.elasticsearch*.ElasticsearchSink`? 
Probably makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-23 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1030736561


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java:
##
@@ -88,4 +88,8 @@ public SinkWriter createWriter(InitContext context) 
throws IOException {
 context.metricGroup(),
 context.getMailboxExecutor());
 }
+
+DeliveryGuarantee getDeliveryGuarantee() {

Review Comment:
   I believe in APIs :) I think there are number of ways to approach testing of 
implementation details, but that's certainly very opinionated subjects, thanks 
for sharing your point of view.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-23 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1030438140


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java:
##
@@ -88,4 +88,8 @@ public SinkWriter createWriter(InitContext context) 
throws IOException {
 context.metricGroup(),
 context.getMailboxExecutor());
 }
+
+DeliveryGuarantee getDeliveryGuarantee() {

Review Comment:
   I have never understood the idea of `@VisibleForTesting` to workaround Java 
visibility rules, hopefully it makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-23 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1030434054


##
flink-connector-opensearch/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.core.importer.Location;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+import java.util.regex.Pattern;
+
+/** Architecture tests for test code. */
+@AnalyzeClasses(
+packages = {
+"org.apache.flink.connector.opensearch",
+"org.apache.flink.streaming.connectors.opensearch"
+},
+importOptions = {
+ImportOption.OnlyIncludeTests.class,
+TestCodeArchitectureTest.IncludeES7ImportOption.class,

Review Comment:
   Ah ... definitely the copy/paste leftovers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029752612


##
flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/table/IndexGeneratorTest.java:
##
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.UnsupportedTemporalTypeException;
+import java.util.Arrays;
+import java.util.List;
+
+/** Suite tests for {@link IndexGenerator}. */
+class IndexGeneratorTest {
+
+private static final List fieldNames =
+Arrays.asList(
+"id",
+"item",
+"log_ts",
+"log_date",
+"order_timestamp",
+"log_time",
+"local_datetime",
+"local_date",
+"local_time",
+"note",
+"status");
+
+private static final List dataTypes =
+Arrays.asList(
+DataTypes.INT(),
+DataTypes.STRING(),
+DataTypes.BIGINT(),
+DataTypes.DATE().bridgedTo(Date.class),
+DataTypes.TIMESTAMP().bridgedTo(Timestamp.class),
+DataTypes.TIME().bridgedTo(Time.class),
+DataTypes.TIMESTAMP().bridgedTo(LocalDateTime.class),
+DataTypes.DATE().bridgedTo(LocalDate.class),
+DataTypes.TIME().bridgedTo(LocalTime.class),
+DataTypes.STRING(),
+DataTypes.BOOLEAN());
+
+private static final List rows =
+Arrays.asList(
+GenericRowData.of(
+1,
+StringData.fromString("apple"),
+Timestamp.valueOf("2020-03-18 12:12:14").getTime(),
+(int) 
Date.valueOf("2020-03-18").toLocalDate().toEpochDay(),
+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-18 12:12:14")),
+(int)
+
(Time.valueOf("12:12:14").toLocalTime().toNanoOfDay()
+/ 1_000_000L),
+TimestampData.fromLocalDateTime(
+LocalDateTime.of(2020, 3, 18, 12, 12, 14, 
1000)),
+(int) LocalDate.of(2020, 3, 18).toEpochDay(),
+(int) (LocalTime.of(12, 13, 14, 
2000).toNanoOfDay() / 1_000_000L),
+"test1",
+true),
+GenericRowData.of(
+2,
+StringData.fromString("peanut"),
+Timestamp.valueOf("2020-03-19 12:22:14").getTime(),
+(int) 
Date.valueOf("2020-03-19").toLocalDate().toEpochDay(),
+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-19 12:22:21")),
+(int)
+
(Time.valueOf("12:22:21").toLocalTime().toNanoOfDay()
+/ 1_000_000L),
+TimestampData.fromLocalDateTime(
+LocalDateTime.of(2020, 3, 19, 12, 22, 14, 
1000)),
+(int) LocalDate.of(2020, 3, 19).toEpochDay(),
+(int) (LocalTime.of(12, 13, 14, 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029500963


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.ALLOW_INSECURE;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.FORMAT_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.INDEX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.SOCKET_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.opensearch.common.Strings.capitalize;
+
+/** A {@link DynamicTableSinkFactory} for discovering OpensearchDynamicSink. */
+@Internal
+public class OpensearchDynamicSinkFactory implements DynamicTableSinkFactory {
+private static final String FACTORY_IDENTIFIER = "opensearch";
+
+private final OpensearchSinkBuilderSupplier sinkBuilderSupplier;
+
+public OpensearchDynamicSinkFactory() {
+this.sinkBuilderSupplier = 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029481239


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java:
##
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.ActionListener;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.common.unit.ByteSizeUnit;
+import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.unit.TimeValue;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Builder to construct an Opensearch compatible {@link OpensearchSink}.
+ *
+ * The following example shows the minimal setup to create a OpensearchSink 
that submits actions
+ * on checkpoint or the default number of actions was buffered (1000).
+ *
+ * {@code
+ * OpensearchSink sink = new OpensearchSinkBuilder()
+ * .setHosts(new HttpHost("localhost:9200")
+ * .setEmitter((element, context, indexer) -> {
+ *  indexer.add(
+ *  new IndexRequest("my-index")
+ *  .id(element.f0.toString())
+ *  .source(element.f1)
+ *  );
+ *  })
+ * .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ * .build();
+ * }
+ *
+ * @param  type of the records converted to Opensearch actions
+ */
+@PublicEvolving
+public class OpensearchSinkBuilder {
+
+private int bulkFlushMaxActions = 1000;
+private int bulkFlushMaxMb = -1;
+private long bulkFlushInterval = -1;
+private FlushBackoffType bulkFlushBackoffType = FlushBackoffType.NONE;
+private int bulkFlushBackoffRetries = -1;
+private long bulkFlushBackOffDelay = -1;
+private DeliveryGuarantee deliveryGuarantee = 
DeliveryGuarantee.AT_LEAST_ONCE;
+private List hosts;
+protected OpensearchEmitter emitter;
+private String username;
+private String password;
+private String connectionPathPrefix;
+private Integer connectionTimeout;
+private Integer connectionRequestTimeout;
+private Integer socketTimeout;
+private Boolean allowInsecure;
+
+public OpensearchSinkBuilder() {}
+
+@SuppressWarnings("unchecked")
+protected > S self() {
+return (S) this;
+}
+
+/**
+ * Sets the emitter which is invoked on every record to convert it to 
Opensearch actions.
+ *
+ * @param emitter to process records into Opensearch actions.
+ * @return this builder
+ */
+public  OpensearchSinkBuilder setEmitter(
+OpensearchEmitter emitter) {
+checkNotNull(emitter);
+checkState(
+InstantiationUtil.isSerializable(emitter),
+"The Opensearch emitter must be serializable.");
+
+final OpensearchSinkBuilder self = self();
+self.emitter = emitter;
+return self;
+}
+
+/**
+ * Sets the hosts where the Opensearch cluster nodes are reachable.
+ *
+ * @param hosts http addresses describing the node locations
+ * @return this builder
+ */
+public OpensearchSinkBuilder setHosts(HttpHost... hosts) {
+checkNotNull(hosts);
+checkState(hosts.length > 0, "Hosts cannot be empty.");
+this.hosts = Arrays.asList(hosts);
+return self();
+}
+
+/**
+ * Sets the wanted {@link DeliveryGuarantee}. The default delivery 
guarantee is 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029467392


##
flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/util/IgnoringFailureHandler.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch.util;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.opensearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.opensearch.RequestIndexer;
+
+import org.opensearch.action.ActionRequest;
+
+/** Ignores all kinds of failures and drops the affected {@link 
ActionRequest}. */
+@Internal
+public class IgnoringFailureHandler implements ActionRequestFailureHandler {

Review Comment:
   Sure, thanks, I will drop them



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029431451


##
flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.connector.opensearch.test.DockerImageVersions;
+import 
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
+import 
org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.test.resources.ResourceTestUtils;
+
+import org.opensearch.testcontainers.OpensearchContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/** End to end test for OpensearchSink based on connector testing framework. */
+@SuppressWarnings("unused")
+public class OpensearchSinkE2ECase extends 
SinkTestSuiteBase> {
+private static final Logger LOG = 
LoggerFactory.getLogger(OpensearchSinkE2ECase.class);
+private static final int READER_RETRY_ATTEMPTS = 10;
+private static final int READER_TIMEOUT = -1; // Not used
+
+@TestSemantics
+CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+@TestEnv
+protected FlinkContainerTestEnvironment flink = new 
FlinkContainerTestEnvironment(1, 6);
+
+public OpensearchSinkE2ECase() throws Exception {}
+

Review Comment:
   This is needed, the reason for that is that initialization for fields in 
this class could throw `Exception` (which is not allowed by default), the 
default constructor with `throws Exception` spec hints the compiler



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029422867


##
flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/table/IndexGeneratorTest.java:
##
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.UnsupportedTemporalTypeException;
+import java.util.Arrays;
+import java.util.List;
+
+/** Suite tests for {@link IndexGenerator}. */
+class IndexGeneratorTest {
+
+private static final List fieldNames =
+Arrays.asList(
+"id",
+"item",
+"log_ts",
+"log_date",
+"order_timestamp",
+"log_time",
+"local_datetime",
+"local_date",
+"local_time",
+"note",
+"status");
+
+private static final List dataTypes =
+Arrays.asList(
+DataTypes.INT(),
+DataTypes.STRING(),
+DataTypes.BIGINT(),
+DataTypes.DATE().bridgedTo(Date.class),
+DataTypes.TIMESTAMP().bridgedTo(Timestamp.class),
+DataTypes.TIME().bridgedTo(Time.class),
+DataTypes.TIMESTAMP().bridgedTo(LocalDateTime.class),
+DataTypes.DATE().bridgedTo(LocalDate.class),
+DataTypes.TIME().bridgedTo(LocalTime.class),
+DataTypes.STRING(),
+DataTypes.BOOLEAN());
+
+private static final List rows =
+Arrays.asList(
+GenericRowData.of(
+1,
+StringData.fromString("apple"),
+Timestamp.valueOf("2020-03-18 12:12:14").getTime(),
+(int) 
Date.valueOf("2020-03-18").toLocalDate().toEpochDay(),
+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-18 12:12:14")),
+(int)
+
(Time.valueOf("12:12:14").toLocalTime().toNanoOfDay()
+/ 1_000_000L),
+TimestampData.fromLocalDateTime(
+LocalDateTime.of(2020, 3, 18, 12, 12, 14, 
1000)),
+(int) LocalDate.of(2020, 3, 18).toEpochDay(),
+(int) (LocalTime.of(12, 13, 14, 
2000).toNanoOfDay() / 1_000_000L),
+"test1",
+true),
+GenericRowData.of(
+2,
+StringData.fromString("peanut"),
+Timestamp.valueOf("2020-03-19 12:22:14").getTime(),
+(int) 
Date.valueOf("2020-03-19").toLocalDate().toEpochDay(),
+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-19 12:22:21")),
+(int)
+
(Time.valueOf("12:22:21").toLocalTime().toNanoOfDay()
+/ 1_000_000L),
+TimestampData.fromLocalDateTime(
+LocalDateTime.of(2020, 3, 19, 12, 22, 14, 
1000)),
+(int) LocalDate.of(2020, 3, 19).toEpochDay(),
+(int) (LocalTime.of(12, 13, 14, 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029420381


##
flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/util/RetryRejectedExecutionFailureHandler.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import 
org.apache.flink.streaming.connectors.opensearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.opensearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.opensearch.action.ActionRequest;
+import 
org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed 
due to temporary {@link
+ * OpenSearchRejectedExecutionException}s (which means that Opensearch node 
queues are currently
+ * full), and fails for all other failures.
+ *
+ * @deprecated This hase been deprecated and will be removed in the future.
+ */
+@Deprecated
+@PublicEvolving
+public class RetryRejectedExecutionFailureHandler implements 
ActionRequestFailureHandler {

Review Comment:
   Same as above - for users to use



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029419750


##
flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/util/IgnoringFailureHandler.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch.util;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.opensearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.opensearch.RequestIndexer;
+
+import org.opensearch.action.ActionRequest;
+
+/** Ignores all kinds of failures and drops the affected {@link 
ActionRequest}. */
+@Internal
+public class IgnoringFailureHandler implements ActionRequestFailureHandler {

Review Comment:
   I believe these 2 handlers (IgnoringFailureHandler and 
RetryRejectedExecutionFailureHandler) are provided as an alternatives for 
configuring OpensearchSink::setFailureHandler



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029414934


##
flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSink.java:
##
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.opensearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.common.unit.ByteSizeUnit;
+import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Opensearch Sinks.
+ *
+ * This class implements the common behaviour across Opensearch versions, 
such as the use of an
+ * internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s 
before sending the
+ * requests to the cluster, as well as passing input records to the user 
provided {@link
+ * OpensearchSinkFunction} for processing.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+@Internal
+public class OpensearchSink extends RichSinkFunction implements 
CheckpointedFunction {
+private static final long serialVersionUID = -1007596293618451942L;
+private static final Logger LOG = 
LoggerFactory.getLogger(OpensearchSink.class);
+
+// 
+//  Internal bulk processor configuration
+// 
+
+public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = 
"bulk.flush.backoff.enable";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = 
"bulk.flush.backoff.type";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = 
"bulk.flush.backoff.retries";
+public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = 
"bulk.flush.backoff.delay";
+
+/** Used to control whether the retry delay should increase exponentially 
or remain constant. */
+@PublicEvolving
+public enum FlushBackoffType {
+CONSTANT,
+EXPONENTIAL
+}
+
+/**
+ * Provides a backoff policy for bulk requests. Whenever a bulk request is 
rejected due to
+ * 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029404923


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.ALLOW_INSECURE;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.FORMAT_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.INDEX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.SOCKET_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.opensearch.common.Strings.capitalize;
+
+/** A {@link DynamicTableSinkFactory} for discovering OpensearchDynamicSink. */
+@Internal
+public class OpensearchDynamicSinkFactory implements DynamicTableSinkFactory {
+private static final String FACTORY_IDENTIFIER = "opensearch";
+
+private final OpensearchSinkBuilderSupplier sinkBuilderSupplier;
+
+public OpensearchDynamicSinkFactory() {
+this.sinkBuilderSupplier = 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029395474


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.ALLOW_INSECURE;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.FORMAT_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.INDEX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.SOCKET_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.opensearch.common.Strings.capitalize;
+
+/** A {@link DynamicTableSinkFactory} for discovering OpensearchDynamicSink. */
+@Internal
+public class OpensearchDynamicSinkFactory implements DynamicTableSinkFactory {
+private static final String FACTORY_IDENTIFIER = "opensearch";
+
+private final OpensearchSinkBuilderSupplier sinkBuilderSupplier;
+
+public OpensearchDynamicSinkFactory() {
+this.sinkBuilderSupplier = 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029389149


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.ALLOW_INSECURE;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.FORMAT_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.INDEX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.SOCKET_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.opensearch.common.Strings.capitalize;
+
+/** A {@link DynamicTableSinkFactory} for discovering OpensearchDynamicSink. */
+@Internal
+public class OpensearchDynamicSinkFactory implements DynamicTableSinkFactory {
+private static final String FACTORY_IDENTIFIER = "opensearch";
+
+private final OpensearchSinkBuilderSupplier sinkBuilderSupplier;
+
+public OpensearchDynamicSinkFactory() {
+this.sinkBuilderSupplier = 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029386429


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSink.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.opensearch.sink.FlushBackoffType;
+import org.apache.flink.connector.opensearch.sink.OpensearchSink;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.http.HttpHost;
+import org.opensearch.common.xcontent.XContentType;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link 
OpensearchSink} from a logical
+ * description.
+ */
+@Internal
+class OpensearchDynamicSink implements DynamicTableSink {
+
+final EncodingFormat> format;
+final DataType physicalRowDataType;
+final List primaryKeyLogicalTypesWithIndex;
+final OpensearchConfiguration config;

Review Comment:
   Pull requests shows its age, brought FLINK-24571 in there



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029363404


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConfiguration.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.opensearch.sink.FlushBackoffType;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.ALLOW_INSECURE;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.INDEX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.SOCKET_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Opensearch base configuration. */
+@Internal
+class OpensearchConfiguration {
+protected final ReadableConfig config;
+
+OpensearchConfiguration(ReadableConfig config) {
+this.config = checkNotNull(config);
+}
+
+public int getBulkFlushMaxActions() {
+return config.get(BULK_FLUSH_MAX_ACTIONS_OPTION);
+}
+
+public MemorySize getBulkFlushMaxByteSize() {
+return config.get(BULK_FLUSH_MAX_SIZE_OPTION);
+}
+
+public long getBulkFlushInterval() {
+return config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+}
+
+public DeliveryGuarantee getDeliveryGuarantee() {
+return config.get(DELIVERY_GUARANTEE_OPTION);
+}
+
+public Optional getUsername() {
+return config.getOptional(USERNAME_OPTION);
+}
+
+public Optional getPassword() {
+return config.getOptional(PASSWORD_OPTION);
+}
+
+public Optional getBulkFlushBackoffType() {
+return config.getOptional(BULK_FLUSH_BACKOFF_TYPE_OPTION);
+}
+
+public Optional getBulkFlushBackoffRetries() {
+return config.getOptional(BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION);
+}
+
+public Optional 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029363404


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConfiguration.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.opensearch.sink.FlushBackoffType;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.ALLOW_INSECURE;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.INDEX_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.SOCKET_TIMEOUT;
+import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Opensearch base configuration. */
+@Internal
+class OpensearchConfiguration {
+protected final ReadableConfig config;
+
+OpensearchConfiguration(ReadableConfig config) {
+this.config = checkNotNull(config);
+}
+
+public int getBulkFlushMaxActions() {
+return config.get(BULK_FLUSH_MAX_ACTIONS_OPTION);
+}
+
+public MemorySize getBulkFlushMaxByteSize() {
+return config.get(BULK_FLUSH_MAX_SIZE_OPTION);
+}
+
+public long getBulkFlushInterval() {
+return config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+}
+
+public DeliveryGuarantee getDeliveryGuarantee() {
+return config.get(DELIVERY_GUARANTEE_OPTION);
+}
+
+public Optional getUsername() {
+return config.getOptional(USERNAME_OPTION);
+}
+
+public Optional getPassword() {
+return config.getOptional(PASSWORD_OPTION);
+}
+
+public Optional getBulkFlushBackoffType() {
+return config.getOptional(BULK_FLUSH_BACKOFF_TYPE_OPTION);
+}
+
+public Optional getBulkFlushBackoffRetries() {
+return config.getOptional(BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION);
+}
+
+public Optional 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029343888


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/NetworkClientConfig.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+class NetworkClientConfig implements Serializable {
+
+@Nullable private final String username;
+@Nullable private final String password;
+@Nullable private final String connectionPathPrefix;
+@Nullable private final Integer connectionRequestTimeout;
+@Nullable private final Integer connectionTimeout;
+@Nullable private final Integer socketTimeout;
+@Nullable private final Boolean allowInsecure;
+
+NetworkClientConfig(
+@Nullable String username,
+@Nullable String password,
+@Nullable String connectionPathPrefix,
+@Nullable Integer connectionRequestTimeout,
+@Nullable Integer connectionTimeout,
+@Nullable Integer socketTimeout,
+@Nullable Boolean allowInsecure) {
+this.username = username;
+this.password = password;
+this.connectionPathPrefix = connectionPathPrefix;
+this.connectionRequestTimeout = connectionRequestTimeout;
+this.connectionTimeout = connectionTimeout;
+this.socketTimeout = socketTimeout;
+this.allowInsecure = allowInsecure;
+}
+
+@Nullable
+public String getUsername() {
+return username;
+}
+
+@Nullable
+public String getPassword() {
+return password;
+}
+
+@Nullable
+public Integer getConnectionRequestTimeout() {
+return connectionRequestTimeout;
+}
+
+@Nullable
+public Integer getConnectionTimeout() {
+return connectionTimeout;
+}
+
+@Nullable
+public Integer getSocketTimeout() {
+return socketTimeout;
+}
+
+@Nullable
+public String getConnectionPathPrefix() {
+return connectionPathPrefix;
+}
+
+@Nullable
+public Boolean isAllowInsecure() {
+return allowInsecure;
+}

Review Comment:
   :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029334988


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java:
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import org.apache.http.HttpHost;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to insert or update data in an Opensearch index. The sink 
supports the following
+ * delivery guarantees.
+ *
+ * 
+ *   {@link DeliveryGuarantee#NONE} does not provide any guarantees: 
actions are flushed to
+ *   Opensearch only depending on the configurations of the bulk 
processor. In case of a
+ *   failure, it might happen that actions are lost if the bulk processor 
still has buffered
+ *   actions.
+ *   {@link DeliveryGuarantee#AT_LEAST_ONCE} on a checkpoint the sink will 
wait until all
+ *   buffered actions are flushed to and acknowledged by Opensearch. No 
actions will be lost but
+ *   actions might be sent to Opensearch multiple times when Flink 
restarts. These additional
+ *   requests may cause inconsistent data in Opensearch right after the 
restart, but eventually
+ *   everything will be consistent again.
+ * 
+ *
+ * @param  type of the records converted to Opensearch actions
+ * @see OpensearchSinkBuilder on how to construct a OpensearchSink
+ */
+@PublicEvolving
+public class OpensearchSink implements Sink {

Review Comment:
   If you mean the equivalent from `ElasticseachSink` :
   ```
   @VisibleForTesting
   DeliveryGuarantee getDeliveryGuarantee() {
   return deliveryGuarantee;
   }
   ```
   
   The default for `deliveryGuarantee` was changed recently and the method was 
added to tests specifically for that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-22 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029330763


##
pom.xml:
##
@@ -0,0 +1,463 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+   xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+
+   
+   io.github.zentol.flink
+   flink-connector-parent
+   1.0
+   
+
+   4.0.0
+
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   Flink : Connectors : Opensearch : Parent
+   pom
+   https://flink.apache.org
+   2022
+
+   
+   
+   The Apache Software License, Version 2.0
+   
https://www.apache.org/licenses/LICENSE-2.0.txt
+   repo
+   
+   
+
+   
+   https://github.com/apache/flink-connector-opensearch
+   
g...@github.com:apache/flink-connector-opensearch.git
+   
+   
scm:git:https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
+   
+   
+
+   
+   flink-connector-opensearch
+   flink-connector-opensearch-e2e-tests

Review Comment:
   Sure, thanks for clarification, removing the profile



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-21 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1028262896


##
pom.xml:
##
@@ -0,0 +1,463 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+   xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+
+   
+   io.github.zentol.flink
+   flink-connector-parent
+   1.0
+   
+
+   4.0.0
+
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   Flink : Connectors : Opensearch : Parent
+   pom
+   https://flink.apache.org
+   2022
+
+   
+   
+   The Apache Software License, Version 2.0
+   
https://www.apache.org/licenses/LICENSE-2.0.txt
+   repo
+   
+   
+
+   
+   https://github.com/apache/flink-connector-opensearch
+   
g...@github.com:apache/flink-connector-opensearch.git
+   
+   
scm:git:https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
+   
+   
+
+   
+   flink-connector-opensearch
+   flink-connector-opensearch-e2e-tests

Review Comment:
   It is not missing, driven by profile [1] which is active by default. Reason: 
it comes from Flink itself [2], they way SQL connectors are built
   
   [1] 
https://github.com/apache/flink-connector-opensearch/pull/1/files/41cc7bb681aede94c543c28cc5629802874aac70#diff-9c5fb3d1b7e3b0f54bc5c4182965c4fe1f9023d449017cece3005d3f90e8e4d8R385
   [2] https://github.com/apache/flink/blob/master/flink-connectors/pom.xml#L83



##
pom.xml:
##
@@ -0,0 +1,463 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+   xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+
+   
+   io.github.zentol.flink
+   flink-connector-parent
+   1.0
+   
+
+   4.0.0
+
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   Flink : Connectors : Opensearch : Parent
+   pom
+   https://flink.apache.org
+   2022
+
+   
+   
+   The Apache Software License, Version 2.0
+   
https://www.apache.org/licenses/LICENSE-2.0.txt
+   repo
+   
+   
+
+   
+   https://github.com/apache/flink-connector-opensearch
+   
g...@github.com:apache/flink-connector-opensearch.git
+   
+   
scm:git:https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
+   
+   
+
+   
+   flink-connector-opensearch
+   flink-connector-opensearch-e2e-tests

Review Comment:
   It is not missing, driven by profile [1] which is active by default. Reason: 
it comes from Flink itself [2], the way SQL connectors are built
   
   [1] 
https://github.com/apache/flink-connector-opensearch/pull/1/files/41cc7bb681aede94c543c28cc5629802874aac70#diff-9c5fb3d1b7e3b0f54bc5c4182965c4fe1f9023d449017cece3005d3f90e8e4d8R385
   [2] https://github.com/apache/flink/blob/master/flink-connectors/pom.xml#L83



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-21 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1028262896


##
pom.xml:
##
@@ -0,0 +1,463 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+   xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+
+   
+   io.github.zentol.flink
+   flink-connector-parent
+   1.0
+   
+
+   4.0.0
+
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   Flink : Connectors : Opensearch : Parent
+   pom
+   https://flink.apache.org
+   2022
+
+   
+   
+   The Apache Software License, Version 2.0
+   
https://www.apache.org/licenses/LICENSE-2.0.txt
+   repo
+   
+   
+
+   
+   https://github.com/apache/flink-connector-opensearch
+   
g...@github.com:apache/flink-connector-opensearch.git
+   
+   
scm:git:https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
+   
+   
+
+   
+   flink-connector-opensearch
+   flink-connector-opensearch-e2e-tests

Review Comment:
   It is not missing, driven by profile [1] which is active by default. Reason: 
it comes from Flink itself [2]
   
   [1] 
https://github.com/apache/flink-connector-opensearch/pull/1/files/41cc7bb681aede94c543c28cc5629802874aac70#diff-9c5fb3d1b7e3b0f54bc5c4182965c4fe1f9023d449017cece3005d3f90e8e4d8R385
   [2] https://github.com/apache/flink/blob/master/flink-connectors/pom.xml#L83



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-21 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1028121584


##
pom.xml:
##
@@ -327,6 +327,12 @@ under the License.

 

+   
+   net.java.dev.jna
+   jna

Review Comment:
   > So could we exclude it in general? Or do we need it for some tests to work?
   
   I believe tests may need it, it is still required dependency by and large 
(unless repacked).
   
   > What is the bootstrap licensing issue about?
   
   Not sure this is license, the packaging is mirroring the Elasticsearch 
Connector [1], in this particular case following its conventions (or issues 
discovered in the past?)
   
   [1] 
https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-sql-connector-elasticsearch7/pom.xml#L93



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-21 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1028121584


##
pom.xml:
##
@@ -327,6 +327,12 @@ under the License.

 

+   
+   net.java.dev.jna
+   jna

Review Comment:
   > So could we exclude it in general? Or do we need it for some tests to work?
   
   I believe tests may need it, it is still required dependency by and large 
(unless repacked).
   
   > What is the bootstrap licensing issue about?
   
   Not sure this is license, the packaging is mirroring the Elasticsearch 
Connector [1], in this particular case following the conventions 
   
   [1] 
https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-sql-connector-elasticsearch7/pom.xml#L93



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-18 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1026434221


##
pom.xml:
##
@@ -327,6 +327,12 @@ under the License.

 

+   
+   net.java.dev.jna
+   jna

Review Comment:
   OK, now I got your question, I was confused by the location of this comment 
(in `pom.xml`), so if we look at the JNA for SQL jar, there are 2 pieces there:
   1. Exclusions of the bootstrap classes: 
https://github.com/apache/flink-connector-opensearch/pull/1/files#diff-01d20faddfa10d1695142e0f51f1c3ec81396425359ab688cbc8078a3cdb8712R83
 (they are not needed on the client at all)
   2. Because only bootstrap classes used JNA, this dependency is also 
excluded: 
https://github.com/apache/flink-connector-opensearch/pull/1/files#diff-01d20faddfa10d1695142e0f51f1c3ec81396425359ab688cbc8078a3cdb8712R67
   
   Does it answer your question?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025247156


##
flink-sql-connector-opensearch/pom.xml:
##
@@ -0,0 +1,159 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-sql-connector-opensearch
+   Flink : Connectors : SQL : Opensearch
+
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   shade-flink
+   package
+   
+   shade
+   
+   
+   
+   
+   
*:*
+   
+   
+   
+   
com.tdunning:t-digest
+   
joda-time:joda-time
+   
net.sf.jopt-simple:jopt-simple
+   
net.java.dev.jna:jna
+   
org.hdrhistogram:HdrHistogram
+   
org.yaml:snakeyaml
+   
+   
+   
+   
+   
+   
org.opensearch:opensearch
+   

+   
config/**
+   
modules.txt
+   
plugins.txt
+   
org/joda/**
+   

+   
org/opensearch/bootstrap/**
+   

+   
+   
+   
org.opensearch.client:opensearch-rest-high-level-client
+   

+   
forbidden/**
+   

+   
+   
+   
org.apache.httpcomponents:httpclient
+   

+   
mozilla/**
+   

+   
+   
+   
org.apache.lucene:lucene-analyzers-common
+   

+   
org/tartarus/**
+  

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025247156


##
flink-sql-connector-opensearch/pom.xml:
##
@@ -0,0 +1,159 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-sql-connector-opensearch
+   Flink : Connectors : SQL : Opensearch
+
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   shade-flink
+   package
+   
+   shade
+   
+   
+   
+   
+   
*:*
+   
+   
+   
+   
com.tdunning:t-digest
+   
joda-time:joda-time
+   
net.sf.jopt-simple:jopt-simple
+   
net.java.dev.jna:jna
+   
org.hdrhistogram:HdrHistogram
+   
org.yaml:snakeyaml
+   
+   
+   
+   
+   
+   
org.opensearch:opensearch
+   

+   
config/**
+   
modules.txt
+   
plugins.txt
+   
org/joda/**
+   

+   
org/opensearch/bootstrap/**
+   

+   
+   
+   
org.opensearch.client:opensearch-rest-high-level-client
+   

+   
forbidden/**
+   

+   
+   
+   
org.apache.httpcomponents:httpclient
+   

+   
mozilla/**
+   

+   
+   
+   
org.apache.lucene:lucene-analyzers-common
+   

+   
org/tartarus/**
+  

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025226180


##
pom.xml:
##
@@ -327,6 +327,12 @@ under the License.

 

+   
+   net.java.dev.jna
+   jna

Review Comment:
   Sorry, this is not about bundling but picking the right version to bundle 
(`dependencyManagement`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025222309


##
flink-connector-opensearch-e2e-tests/pom.xml:
##
@@ -0,0 +1,190 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   
+
+   flink-connector-opensearch-e2e-tests
+   Flink : E2E Tests : Opensearch
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-base
+   ${flink.version}
+   
+   
+   org.apache.flink
+   flink-streaming-java
+   ${flink.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   test-jar
+   
+   
+   
+   *
+   *
+   
+   
+   
+   
+   org.apache.logging.log4j
+   log4j-api
+   provided
+   
+   
+   org.apache.flink
+   flink-test-utils
+   test
+   
+   
+   org.apache.flink
+   flink-connector-test-utils
+   ${flink.version}
+   test
+   
+   
+   org.opensearch
+   opensearch-testcontainers
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-deploy-plugin
+   
+   true
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
opensearch-end-to-end-test
+   
dependencies
+   
+   
+   
com.google.code.findbugs:jsr305
+   
org.slf4j:slf4j-api
+   
+   
+   
+   
+   
*:*
+   

+   
META-INF/*.SF
+   
META-INF/*.DSA
+   
META-INF/*.RSA
+   

+   
+   
+   
+   
+   
+   
+   
+   org.apache.maven.plugins
+   maven-dependency-plugin
+   
+   
+   copy
+   
pre-integration-test
+   
+ 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025220389


##
flink-connector-opensearch-e2e-tests/pom.xml:
##
@@ -0,0 +1,190 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   
+
+   flink-connector-opensearch-e2e-tests
+   Flink : E2E Tests : Opensearch
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-base
+   ${flink.version}
+   
+   
+   org.apache.flink
+   flink-streaming-java
+   ${flink.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   test-jar
+   
+   
+   
+   *
+   *
+   
+   
+   
+   
+   org.apache.logging.log4j
+   log4j-api
+   provided
+   
+   
+   org.apache.flink
+   flink-test-utils
+   test
+   
+   
+   org.apache.flink
+   flink-connector-test-utils
+   ${flink.version}
+   test
+   
+   
+   org.opensearch
+   opensearch-testcontainers
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-deploy-plugin
+   
+   true
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
opensearch-end-to-end-test
+   
dependencies
+   
+   
+   
com.google.code.findbugs:jsr305
+   
org.slf4j:slf4j-api
+   
+   
+   
+   
+   
*:*
+   

+   
META-INF/*.SF
+   
META-INF/*.DSA
+   
META-INF/*.RSA
+   

+   
+   

Review Comment:
   It seems like even `shade-flink` is not needed, removed all that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025210263


##
flink-sql-connector-opensearch/pom.xml:
##
@@ -0,0 +1,159 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-sql-connector-opensearch
+   Flink : Connectors : SQL : Opensearch
+
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   shade-flink
+   package
+   
+   shade
+   
+   
+   
+   
+   
*:*
+   
+   
+   
+   
com.tdunning:t-digest
+   
joda-time:joda-time
+   
net.sf.jopt-simple:jopt-simple
+   
net.java.dev.jna:jna
+   
org.hdrhistogram:HdrHistogram
+   
org.yaml:snakeyaml
+   
+   
+   
+   
+   
+   
org.opensearch:opensearch
+   

+   
config/**
+   
modules.txt
+   
plugins.txt
+   
org/joda/**
+   

+   
org/opensearch/bootstrap/**
+   

+   
+   
+   
org.opensearch.client:opensearch-rest-high-level-client
+   

+   
forbidden/**
+   

+   
+   
+   
org.apache.httpcomponents:httpclient
+   

+   
mozilla/**
+   

+   
+   
+   
org.apache.lucene:lucene-analyzers-common
+   

+   
org/tartarus/**
+  

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025147669


##
pom.xml:
##
@@ -327,6 +327,12 @@ under the License.

 

+   
+   net.java.dev.jna
+   jna

Review Comment:
   Dependency convergence, the OpenSearch brings older version 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-17 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1025146467


##
flink-connector-opensearch-e2e-tests/pom.xml:
##
@@ -0,0 +1,190 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   
+
+   flink-connector-opensearch-e2e-tests
+   Flink : E2E Tests : Opensearch
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-base
+   ${flink.version}
+   
+   
+   org.apache.flink
+   flink-streaming-java
+   ${flink.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   test-jar
+   
+   
+   
+   *
+   *
+   
+   
+   
+   
+   org.apache.logging.log4j
+   log4j-api
+   provided
+   
+   
+   org.apache.flink
+   flink-test-utils
+   test
+   
+   
+   org.apache.flink
+   flink-connector-test-utils
+   ${flink.version}
+   test
+   
+   
+   org.opensearch
+   opensearch-testcontainers
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-deploy-plugin
+   
+   true
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
opensearch-end-to-end-test
+   
dependencies
+   
+   
+   
com.google.code.findbugs:jsr305
+   
org.slf4j:slf4j-api
+   
+   
+   
+   
+   
*:*
+   

+   
META-INF/*.SF
+   
META-INF/*.DSA
+   
META-INF/*.RSA
+   

+   
+   
+   
+   
+   
+   
+   
+   org.apache.maven.plugins
+   maven-dependency-plugin
+   
+   
+   copy
+   
pre-integration-test
+   
+ 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-15 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1022824989


##
flink-sql-connector-opensearch/pom.xml:
##
@@ -0,0 +1,159 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-sql-connector-opensearch
+   Flink : Connectors : SQL : Opensearch
+
+   jar
+
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   shade-flink
+   package
+   
+   shade
+   
+   
+   
+   
+   
*:*
+   
+   
+   
+   
com.tdunning:t-digest
+   
joda-time:joda-time
+   
net.sf.jopt-simple:jopt-simple
+   
net.java.dev.jna:jna
+   
org.hdrhistogram:HdrHistogram
+   
org.yaml:snakeyaml
+   
+   
+   
+   
+   
+   
org.opensearch:opensearch
+   

+   
config/**
+   
modules.txt
+   
plugins.txt
+   
org/joda/**
+   

+   
org/opensearch/bootstrap/**
+   

+   
+   
+   
org.opensearch.client:opensearch-rest-high-level-client
+   

+   
forbidden/**
+   

+   
+   
+   
org.apache.httpcomponents:httpclient
+   

+   
mozilla/**
+   

+   
+   
+   
org.apache.lucene:lucene-analyzers-common
+   

+   
org/tartarus/**
+  

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-15 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1022821639


##
flink-connector-opensearch/pom.xml:
##
@@ -0,0 +1,185 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-connector-opensearch
+   Flink : Connectors : Opensearch
+
+   jar
+
+   
+   
+   1.3.0
+   
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-connector-base
+   ${flink.version}
+   

Review Comment:
   > best to set this to provided to avoid compatibility issues between 
connector-base and Flink.
   
   Needs adjustment here 
https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-connector-elasticsearch-base/pom.xml#L47



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-15 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1022819356


##
flink-connector-opensearch/pom.xml:
##
@@ -0,0 +1,185 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-connector-opensearch
+   Flink : Connectors : Opensearch
+
+   jar
+
+   
+   
+   1.3.0

Review Comment:
   > Can you give us some background of the opensearch versioning? I see 2.3.0 
artifacts on maven central.
   
   Sure, so we have 2 maintainable release trains: 1.x and 2.x. The major 
difference between two (leaving all features aside) is that 1.x has JDK-8 
baseline while 2.x has JDK-11 baseline. Since Flink still has JDK-8 baseline, 
we are using 1.x client BUT it will work with 2.x servers. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-11-15 Thread GitBox


reta commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1022816496


##
flink-connector-opensearch-e2e-tests/pom.xml:
##
@@ -0,0 +1,104 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connector-opensearch-parent
+   1.0.0-SNAPSHOT
+   ..
+   
+
+   flink-connector-opensearch-e2e-tests
+   Flink : E2E Tests : Opensearch
+   jar
+
+   
+   
+   org.apache.flink
+   flink-streaming-java
+   ${flink.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-connector-opensearch
+   ${project.version}
+   
+   
+   org.apache.logging.log4j
+   log4j-api
+   ${log4j.version}
+   provided
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
opensearch-end-to-end-test
+   
dependencies
+   
+   
+   
+   
+   
+   org.apache.maven.plugins
+   maven-dependency-plugin
+   
+   
+   copy
+   
pre-integration-test
+   
+   copy
+   
+   
+   
+   

Review Comment:
   > can we move this configuration into the execution?
   
   Sure, may need need same adjustment here 
https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml#L109
 and 
https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml#L108



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org