[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4605 ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139950435 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139685528 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139685362 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139660015 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java --- @@ -37,29 +41,49 @@ * * @param Type of the elements emitted by this sink */ -public abstract class CassandraSinkBaseextends RichSinkFunction { +public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); protected transient Cluster cluster; protected transient Session session; - protected transient volatile Throwable exception; + protected transient volatile Throwable asyncError; protected transient FutureCallback callback; - private final ClusterBuilder builder; + protected final ClusterBuilder builder; private final AtomicInteger updatesPending = new AtomicInteger(); + /** +* If true, the producer will wait until all outstanding action requests have been sent to C*. +*/ + private boolean flushOnCheckpoint = true; --- End diff -- Ideally all sinks should behave the same way, but that would be out of scope for PR in particular. ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139658146 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java --- @@ -37,29 +41,49 @@ * * @param Type of the elements emitted by this sink */ -public abstract class CassandraSinkBaseextends RichSinkFunction { +public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); protected transient Cluster cluster; protected transient Session session; - protected transient volatile Throwable exception; + protected transient volatile Throwable asyncError; protected transient FutureCallback callback; - private final ClusterBuilder builder; + protected final ClusterBuilder builder; private final AtomicInteger updatesPending = new AtomicInteger(); + /** +* If true, the producer will wait until all outstanding action requests have been sent to C*. +*/ + private boolean flushOnCheckpoint = true; --- End diff -- Got your points on the behaviors to subclasses to CassandraSinkBase, but I was wondering if some of these 'common behavior' should be or would be unified across different data sink, KafKa ES, etc. They all currently have slightly different implantation though. ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139632915 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java --- @@ -37,29 +41,49 @@ * * @param Type of the elements emitted by this sink */ -public abstract class CassandraSinkBaseextends RichSinkFunction { +public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); protected transient Cluster cluster; protected transient Session session; - protected transient volatile Throwable exception; + protected transient volatile Throwable asyncError; protected transient FutureCallback callback; - private final ClusterBuilder builder; + protected final ClusterBuilder builder; private final AtomicInteger updatesPending = new AtomicInteger(); + /** +* If true, the producer will wait until all outstanding action requests have been sent to C*. +*/ + private boolean flushOnCheckpoint = true; --- End diff -- The default should be true. Every CassandraSinkBase subclass should accept a constructor argument that is passed on to the CassandraSinkBase constructor. Additionally, the CassandraSinkBuilder should be extended to allow setting this flag to false if desired. ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139630504 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java --- @@ -37,29 +41,49 @@ * * @param Type of the elements emitted by this sink */ -public abstract class CassandraSinkBaseextends RichSinkFunction { +public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); protected transient Cluster cluster; protected transient Session session; - protected transient volatile Throwable exception; + protected transient volatile Throwable asyncError; protected transient FutureCallback callback; - private final ClusterBuilder builder; + protected final ClusterBuilder builder; private final AtomicInteger updatesPending = new AtomicInteger(); + /** +* If true, the producer will wait until all outstanding action requests have been sent to C*. +*/ + private boolean flushOnCheckpoint = true; --- End diff -- Should we default this setting as true then? Making it final does not allow to reset the flag after obj is constructed. Further, should this behavior be applied to all other SinkBase / Producer classes in the future? ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139615174 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139485608 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); --- End diff -- remove this line for clarity purposes as close() should never be called ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139484760 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java --- @@ -37,29 +41,49 @@ * * @param Type of the elements emitted by this sink */ -public abstract class CassandraSinkBaseextends RichSinkFunction { +public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); protected transient Cluster cluster; protected transient Session session; - protected transient volatile Throwable exception; + protected transient volatile Throwable asyncError; protected transient FutureCallback callback; - private final ClusterBuilder builder; + protected final ClusterBuilder builder; private final AtomicInteger updatesPending = new AtomicInteger(); + /** +* If true, the producer will wait until all outstanding action requests have been sent to C*. +*/ + private boolean flushOnCheckpoint = true; --- End diff -- this should be a constructor argument and final. ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139485057 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139483818 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139482771 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139484838 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139482708 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero --- End diff -- unnecessary comment ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139482639 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { --- End diff -- empty method that can be removed ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139482886 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139482849 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139483098 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139482897 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139482615 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { --- End diff -- empty method that can be removed ---
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4605#discussion_r139484328 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); + + private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; + + private static final String DUMMY_MESSAGE = "Dummy_msg"; + + private static final int MAX_THREAD_NUM = 3; + + private static final int MAX_THREAD_POOL_SIZE = 2; + + @BeforeClass + public static void doSetUp() throws Exception { + + } + + @BeforeClass + public static void doTearDown() throws Exception { + + } + + /** +* Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist. +*/ + @Test(expected = NoHostAvailableException.class) + public void testCasHostNotFoundErrorHandling() throws Exception { + CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withoutJMXReporting() + .withoutMetrics().build(); + } + }); + + base.open(new Configuration()); + base.close(); + } + + + /// + // Single Thread Test + /// + + /** +* Test ensures the message could be delivered successfully to sink. +*/ + @Test + public void testSimpleSuccessfulPath() throws Exception { + ClusterBuilder builder = mock(ClusterBuilder.class); + MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder); + casSinkFunc.setFlushOnCheckpoint(true); + casSinkFunc.open(new Configuration()); + + casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS); + + casSinkFunc.close(); + + //Final pending updates should be zero + Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords()); + } + + /** +* Test ensures that an asyncError would be thrown on close() if previously message delivery failed. +*/ + @Test + public void
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
GitHub user mcfongtw opened a pull request: https://github.com/apache/flink/pull/4605 [FLINK-4500] [C* Connector] CassandraSinkBase implements CheckpointedFunction ## What is the purpose of the change Have CassandraSinkBase to implement CheckpointedFunction so that all in-flight mutation message could be sent to C* sink before a checkpoint performs. As a result, the checkpoint would be complete. ## Brief change log * Implement CheckpointedFunction to (optionally) wait on all pending records being flushed to the C* sink before checkpoint performs (or closing connection). * Add debugging message in CassandraSinkBase. * Add unit tests for simple / multi-threaded message dispatching for successful / failed scenarios * Add unit tests for failure handling logics on errors thrown at different stages. * Add unit tests for flushing pending records when checkpoint performs. * Provide a Immediate / Delayed type of ResultSetFuture for testing purposes. * Add CassandraBaseTest in suppression list to use guava imports * In log4j-test settings, change root log level to INFO and enable ALL level against some test classes. ## Verifying this change This change is already covered by existing tests, such as *CassandraBaseTest*. This change added tests and can be verified as follows: * Add unit tests for simple / multi-threaded message dispatching for successful / failed scenarios * Add unit tests for failure handling logics on errors thrown at different stages. * Add unit tests for flushing pending records when checkpoint performs. * Provide a Immediate / Delayed type of ResultSetFuture for testing purposes. * Add CassandraBaseTest in suppression list to use guava imports ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no** (maybe) ) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/mcfongtw/flink FLINK-4500 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4605.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4605 commit caefe390bf2aaa22d996cc24a31a3ba76241fb23 Author: Michael FongDate: 2017-08-14T12:57:06Z [FLINK-4500] CassandraSinkBase implements CheckpointedFunction * Implement CheckpointedFunction to (optionally) wait on all pending records being flushed to the C* sink before taking a snapshot (or closing connection). * Add debugging message in CassandraSinkBase. * Add unit tests for simple / multi-threaded message dispatching for successful / failed scenarios * Add unit tests for failure handling logics on errors thrown at different stages. * Add unit tests for flushing pending records when checkpoint performs. * Provide a Immediate / Delayed type of ResultSetFuture for testing purposes. * Add CassandraBaseTest in suppression list to use guava imports * In log4j-test settings, change root log level to INFO and enable ALL level against some test classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---