This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 6039e11 [FLINK-14976][cassandra] Release semaphore on all Throwable's in send() 6039e11 is described below commit 6039e11c1cad20fe3468715ff594a49cbdc8d95e Author: Mads Chr. Olesen <m...@trackunit.com> AuthorDate: Wed Nov 27 14:56:16 2019 +0100 [FLINK-14976][cassandra] Release semaphore on all Throwable's in send() --- .../connectors/cassandra/CassandraSinkBase.java | 2 +- .../cassandra/CassandraSinkBaseTest.java | 51 ++++++++++++++-------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index 0e7eb6f..76ed9c7 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -132,7 +132,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl final ListenableFuture<V> result; try { result = send(value); - } catch (Exception e) { + } catch (Throwable e) { semaphore.release(); throw e; } diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java index 3ce9742..5c0a431 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java @@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; -import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.util.concurrent.ListenableFuture; import org.junit.Assert; @@ -42,9 +41,13 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.number.OrderingComparison.greaterThan; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.when; @@ -81,7 +84,7 @@ public class CassandraSinkBaseTest { casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null)); final int originalPermits = casSinkFunc.getAvailablePermits(); - Assert.assertThat(originalPermits, greaterThan(0)); + assertThat(originalPermits, greaterThan(0)); Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); casSinkFunc.invoke("hello"); @@ -277,21 +280,29 @@ public class CassandraSinkBaseTest { } @Test(timeout = DEFAULT_TEST_TIMEOUT) - public void testReleaseOnSendException() throws Exception { + public void testReleaseOnThrowingSend() throws Exception { final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder() .setMaxConcurrentRequests(1) .build(); - try (TestCassandraSink testCassandraSink = createOpenedSendExceptionTestCassandraSink(config)) { - Assert.assertEquals(1, testCassandraSink.getAvailablePermits()); - Assert.assertEquals(0, testCassandraSink.getAcquiredPermits()); + Function<String, ListenableFuture<ResultSet>> failingSendFunction = ignoredMessage -> { + throwCheckedAsUnchecked(new Throwable("expected")); + //noinspection ReturnOfNull + return null; + }; + try (TestCassandraSink testCassandraSink = new MockCassandraSink(config, failingSendFunction)) { + testCassandraSink.open(new Configuration()); + assertThat(testCassandraSink.getAvailablePermits(), is(1)); + assertThat(testCassandraSink.getAcquiredPermits(), is(0)); + + //noinspection OverlyBroadCatchBlock,NestedTryStatement try { - testCassandraSink.invoke("N/A"); - } catch (Exception e) { - Assert.assertTrue(e instanceof InvalidQueryException); - Assert.assertEquals(1, testCassandraSink.getAvailablePermits()); - Assert.assertEquals(0, testCassandraSink.getAcquiredPermits()); + testCassandraSink.invoke("none"); + } catch (Throwable e) { + assertThat(e, instanceOf(Throwable.class)); + assertThat(testCassandraSink.getAvailablePermits(), is(1)); + assertThat(testCassandraSink.getAcquiredPermits(), is(0)); } } } @@ -355,10 +366,9 @@ public class CassandraSinkBaseTest { return testHarness; } - private TestCassandraSink createOpenedSendExceptionTestCassandraSink(CassandraSinkBaseConfig config) { - final TestCassandraSink testCassandraSink = new SendExceptionTestCassandraSink(config); - testCassandraSink.open(new Configuration()); - return testCassandraSink; + private static <T extends Throwable> void throwCheckedAsUnchecked(Throwable ex) throws T { + //noinspection unchecked + throw (T) ex; } private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> implements AutoCloseable { @@ -410,14 +420,19 @@ public class CassandraSinkBaseTest { } } - private static class SendExceptionTestCassandraSink extends TestCassandraSink { - SendExceptionTestCassandraSink(CassandraSinkBaseConfig config) { + private static class MockCassandraSink extends TestCassandraSink { + private static final long serialVersionUID = -3363195776692829911L; + + private final Function<String, ListenableFuture<ResultSet>> sendFunction; + + MockCassandraSink(CassandraSinkBaseConfig config, Function<String, ListenableFuture<ResultSet>> sendFunction) { super(config, new NoOpCassandraFailureHandler()); + this.sendFunction = sendFunction; } @Override public ListenableFuture<ResultSet> send(String value) { - throw new InvalidQueryException("For test purposes"); + return this.sendFunction.apply(value); } } }