This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 6039e11  [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
6039e11 is described below

commit 6039e11c1cad20fe3468715ff594a49cbdc8d95e
Author: Mads Chr. Olesen <m...@trackunit.com>
AuthorDate: Wed Nov 27 14:56:16 2019 +0100

    [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
---
 .../connectors/cassandra/CassandraSinkBase.java    |  2 +-
 .../cassandra/CassandraSinkBaseTest.java           | 51 ++++++++++++++--------
 2 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 0e7eb6f..76ed9c7 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -132,7 +132,7 @@ public abstract class CassandraSinkBase<IN, V> extends 
RichSinkFunction<IN> impl
                final ListenableFuture<V> result;
                try {
                        result = send(value);
-               } catch (Exception e) {
+               } catch (Throwable e) {
                        semaphore.release();
                        throw e;
                }
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 3ce9742..5c0a431 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Assert;
@@ -42,9 +41,13 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
@@ -81,7 +84,7 @@ public class CassandraSinkBaseTest {
                        
casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null));
 
                        final int originalPermits = 
casSinkFunc.getAvailablePermits();
-                       Assert.assertThat(originalPermits, greaterThan(0));
+                       assertThat(originalPermits, greaterThan(0));
                        Assert.assertEquals(0, 
casSinkFunc.getAcquiredPermits());
 
                        casSinkFunc.invoke("hello");
@@ -277,21 +280,29 @@ public class CassandraSinkBaseTest {
        }
 
        @Test(timeout = DEFAULT_TEST_TIMEOUT)
-       public void testReleaseOnSendException() throws Exception {
+       public void testReleaseOnThrowingSend() throws Exception {
                final CassandraSinkBaseConfig config = 
CassandraSinkBaseConfig.newBuilder()
                        .setMaxConcurrentRequests(1)
                        .build();
 
-               try (TestCassandraSink testCassandraSink = 
createOpenedSendExceptionTestCassandraSink(config)) {
-                       Assert.assertEquals(1, 
testCassandraSink.getAvailablePermits());
-                       Assert.assertEquals(0, 
testCassandraSink.getAcquiredPermits());
+               Function<String, ListenableFuture<ResultSet>> 
failingSendFunction = ignoredMessage -> {
+                       throwCheckedAsUnchecked(new Throwable("expected"));
+                       //noinspection ReturnOfNull
+                       return null;
+               };
 
+               try (TestCassandraSink testCassandraSink = new 
MockCassandraSink(config, failingSendFunction)) {
+                       testCassandraSink.open(new Configuration());
+                       assertThat(testCassandraSink.getAvailablePermits(), 
is(1));
+                       assertThat(testCassandraSink.getAcquiredPermits(), 
is(0));
+
+                       //noinspection OverlyBroadCatchBlock,NestedTryStatement
                        try {
-                               testCassandraSink.invoke("N/A");
-                       } catch (Exception e) {
-                               Assert.assertTrue(e instanceof 
InvalidQueryException);
-                               Assert.assertEquals(1, 
testCassandraSink.getAvailablePermits());
-                               Assert.assertEquals(0, 
testCassandraSink.getAcquiredPermits());
+                               testCassandraSink.invoke("none");
+                       } catch (Throwable e) {
+                               assertThat(e, instanceOf(Throwable.class));
+                               
assertThat(testCassandraSink.getAvailablePermits(), is(1));
+                               
assertThat(testCassandraSink.getAcquiredPermits(), is(0));
                        }
                }
        }
@@ -355,10 +366,9 @@ public class CassandraSinkBaseTest {
                return testHarness;
        }
 
-       private TestCassandraSink 
createOpenedSendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
-               final TestCassandraSink testCassandraSink = new 
SendExceptionTestCassandraSink(config);
-               testCassandraSink.open(new Configuration());
-               return testCassandraSink;
+       private static <T extends Throwable> void 
throwCheckedAsUnchecked(Throwable ex) throws T {
+               //noinspection unchecked
+               throw (T) ex;
        }
 
        private static class TestCassandraSink extends 
CassandraSinkBase<String, ResultSet> implements AutoCloseable {
@@ -410,14 +420,19 @@ public class CassandraSinkBaseTest {
                }
        }
 
-       private static class SendExceptionTestCassandraSink extends 
TestCassandraSink {
-               SendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+       private static class MockCassandraSink extends TestCassandraSink {
+               private static final long serialVersionUID = 
-3363195776692829911L;
+
+               private final Function<String, ListenableFuture<ResultSet>> 
sendFunction;
+
+               MockCassandraSink(CassandraSinkBaseConfig config, 
Function<String, ListenableFuture<ResultSet>> sendFunction) {
                        super(config, new NoOpCassandraFailureHandler());
+                       this.sendFunction = sendFunction;
                }
 
                @Override
                public ListenableFuture<ResultSet> send(String value) {
-                       throw new InvalidQueryException("For test purposes");
+                       return this.sendFunction.apply(value);
                }
        }
 }

Reply via email to