[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-11-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4605


---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-20 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139950435
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-19 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139685528
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-19 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139685362
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139660015
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ---
@@ -37,29 +41,49 @@
  *
  * @param  Type of the elements emitted by this sink
  */
-public abstract class CassandraSinkBase extends 
RichSinkFunction {
+public abstract class CassandraSinkBase extends 
RichSinkFunction implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected transient Cluster cluster;
protected transient Session session;
 
-   protected transient volatile Throwable exception;
+   protected transient volatile Throwable asyncError;
protected transient FutureCallback callback;
 
-   private final ClusterBuilder builder;
+   protected final ClusterBuilder builder;
 
private final AtomicInteger updatesPending = new AtomicInteger();
 
+   /**
+* If true, the producer will wait until all outstanding action 
requests have been sent to C*.
+*/
+   private boolean flushOnCheckpoint = true;
--- End diff --

Ideally all sinks should behave the same way, but that would be out of 
scope for PR in particular.


---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-19 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139658146
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ---
@@ -37,29 +41,49 @@
  *
  * @param  Type of the elements emitted by this sink
  */
-public abstract class CassandraSinkBase extends 
RichSinkFunction {
+public abstract class CassandraSinkBase extends 
RichSinkFunction implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected transient Cluster cluster;
protected transient Session session;
 
-   protected transient volatile Throwable exception;
+   protected transient volatile Throwable asyncError;
protected transient FutureCallback callback;
 
-   private final ClusterBuilder builder;
+   protected final ClusterBuilder builder;
 
private final AtomicInteger updatesPending = new AtomicInteger();
 
+   /**
+* If true, the producer will wait until all outstanding action 
requests have been sent to C*.
+*/
+   private boolean flushOnCheckpoint = true;
--- End diff --

Got your points on the behaviors to subclasses to CassandraSinkBase, but I 
was wondering if some of these 'common behavior' should be or would be unified 
across different data sink, KafKa ES, etc.  They all currently have slightly 
different implantation though. 


---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139632915
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ---
@@ -37,29 +41,49 @@
  *
  * @param  Type of the elements emitted by this sink
  */
-public abstract class CassandraSinkBase extends 
RichSinkFunction {
+public abstract class CassandraSinkBase extends 
RichSinkFunction implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected transient Cluster cluster;
protected transient Session session;
 
-   protected transient volatile Throwable exception;
+   protected transient volatile Throwable asyncError;
protected transient FutureCallback callback;
 
-   private final ClusterBuilder builder;
+   protected final ClusterBuilder builder;
 
private final AtomicInteger updatesPending = new AtomicInteger();
 
+   /**
+* If true, the producer will wait until all outstanding action 
requests have been sent to C*.
+*/
+   private boolean flushOnCheckpoint = true;
--- End diff --

The default should be true. Every CassandraSinkBase subclass should accept 
a constructor argument that is passed on to the CassandraSinkBase constructor. 
Additionally, the CassandraSinkBuilder should be extended to allow setting this 
flag to false if desired.




---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-19 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139630504
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ---
@@ -37,29 +41,49 @@
  *
  * @param  Type of the elements emitted by this sink
  */
-public abstract class CassandraSinkBase extends 
RichSinkFunction {
+public abstract class CassandraSinkBase extends 
RichSinkFunction implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected transient Cluster cluster;
protected transient Session session;
 
-   protected transient volatile Throwable exception;
+   protected transient volatile Throwable asyncError;
protected transient FutureCallback callback;
 
-   private final ClusterBuilder builder;
+   protected final ClusterBuilder builder;
 
private final AtomicInteger updatesPending = new AtomicInteger();
 
+   /**
+* If true, the producer will wait until all outstanding action 
requests have been sent to C*.
+*/
+   private boolean flushOnCheckpoint = true;
--- End diff --

Should we default this setting as true then? Making it final does not allow 
to reset the flag after obj is constructed. Further, should this behavior be 
applied to all other SinkBase / Producer classes in the future?


---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-19 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139615174
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139485608
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
--- End diff --

remove this line for clarity purposes as close() should never be called


---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139484760
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ---
@@ -37,29 +41,49 @@
  *
  * @param  Type of the elements emitted by this sink
  */
-public abstract class CassandraSinkBase extends 
RichSinkFunction {
+public abstract class CassandraSinkBase extends 
RichSinkFunction implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected transient Cluster cluster;
protected transient Session session;
 
-   protected transient volatile Throwable exception;
+   protected transient volatile Throwable asyncError;
protected transient FutureCallback callback;
 
-   private final ClusterBuilder builder;
+   protected final ClusterBuilder builder;
 
private final AtomicInteger updatesPending = new AtomicInteger();
 
+   /**
+* If true, the producer will wait until all outstanding action 
requests have been sent to C*.
+*/
+   private boolean flushOnCheckpoint = true;
--- End diff --

this should be a constructor argument and final.


---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139485057
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139483818
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139482771
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139484838
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139482708
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
--- End diff --

unnecessary comment


---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139482639
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
--- End diff --

empty method that can be removed


---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139482886
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139482849
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139483098
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139482897
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139482615
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
--- End diff --

empty method that can be removed


---


[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-09-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4605#discussion_r139484328
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+   private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+   private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+   private static final int MAX_THREAD_NUM = 3;
+
+   private static final int MAX_THREAD_POOL_SIZE = 2;
+
+   @BeforeClass
+   public static void doSetUp() throws Exception {
+
+   }
+
+   @BeforeClass
+   public static void doTearDown() throws Exception {
+
+   }
+
+   /**
+* Test ensures a NoHostAvailableException would be thrown if a contact 
point added does not exist.
+*/
+   @Test(expected = NoHostAvailableException.class)
+   public void testCasHostNotFoundErrorHandling() throws Exception {
+   CassandraSinkBase base = new DummyCassandraSinkBase<>(new 
ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder
+   .addContactPoint("127.0.0.1")
+   .withoutJMXReporting()
+   .withoutMetrics().build();
+   }
+   });
+
+   base.open(new Configuration());
+   base.close();
+   }
+
+
+   ///
+   // Single Thread Test
+   ///
+
+   /**
+* Test ensures the message could be delivered successfully to sink.
+*/
+   @Test
+   public void testSimpleSuccessfulPath() throws Exception {
+   ClusterBuilder builder = mock(ClusterBuilder.class);
+   MockCassandraSinkBase casSinkFunc = new 
MockCassandraSinkBase<>(builder);
+   casSinkFunc.setFlushOnCheckpoint(true);
+   casSinkFunc.open(new Configuration());
+
+   casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, 
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+   casSinkFunc.close();
+
+   //Final pending updates should be zero
+   Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+   }
+
+   /**
+* Test ensures that an asyncError would be thrown on close() if 
previously message delivery failed.
+*/
+   @Test
+   public void 

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

2017-08-26 Thread mcfongtw
GitHub user mcfongtw opened a pull request:

https://github.com/apache/flink/pull/4605

[FLINK-4500] [C* Connector] CassandraSinkBase implements 
CheckpointedFunction

## What is the purpose of the change
Have CassandraSinkBase to implement CheckpointedFunction so that all 
in-flight mutation message could be sent to C* sink before a checkpoint 
performs. As a result, the checkpoint would be complete. 

## Brief change log

* Implement CheckpointedFunction to (optionally) wait on all pending 
records being flushed to the C* sink before checkpoint performs (or closing 
connection).
* Add debugging message in CassandraSinkBase.
* Add unit tests for simple / multi-threaded message dispatching for 
successful / failed scenarios
* Add unit tests for failure handling logics on errors thrown at different 
stages.
* Add unit tests for flushing pending records when checkpoint performs.
* Provide a Immediate / Delayed type of ResultSetFuture for testing 
purposes.
* Add CassandraBaseTest in suppression list to use guava imports
* In log4j-test settings, change root log level to INFO and enable ALL 
level against some test classes.


## Verifying this change

This change is already covered by existing tests, such as 
*CassandraBaseTest*.

This change added tests and can be verified as follows:

* Add unit tests for simple / multi-threaded message dispatching for 
successful / failed scenarios
* Add unit tests for failure handling logics on errors thrown at different 
stages.
* Add unit tests for flushing pending records when checkpoint performs.
* Provide a Immediate / Delayed type of ResultSetFuture for testing 
purposes.
* Add CassandraBaseTest in suppression list to use guava imports

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no** (maybe) )
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mcfongtw/flink FLINK-4500

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4605.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4605


commit caefe390bf2aaa22d996cc24a31a3ba76241fb23
Author: Michael Fong 
Date:   2017-08-14T12:57:06Z

[FLINK-4500] CassandraSinkBase implements CheckpointedFunction

* Implement CheckpointedFunction to (optionally) wait on all pending 
records being flushed to the C* sink before taking a snapshot (or closing 
connection).

* Add debugging message in CassandraSinkBase.

* Add unit tests for simple / multi-threaded message dispatching for 
successful / failed scenarios

* Add unit tests for failure handling logics on errors thrown at different 
stages.

* Add unit tests for flushing pending records when checkpoint performs.

* Provide a Immediate / Delayed type of ResultSetFuture for testing 
purposes.

* Add CassandraBaseTest in suppression list to use guava imports

* In log4j-test settings, change root log level to INFO and enable ALL 
level against some test classes.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---