Repository: flink Updated Branches: refs/heads/master 47acdeadf -> e9a067229
[hotfix] Fix Redis Sink to fail at opening if Redis is not initialized. This closes #2245 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9a06722 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9a06722 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9a06722 Branch: refs/heads/master Commit: e9a067229d787c3e874dc40303ac8661045cc54f Parents: 47acdea Author: kl0u <kklou...@gmail.com> Authored: Wed Jul 13 18:40:17 2016 +0200 Committer: Robert Metzger <rmetz...@apache.org> Committed: Thu Aug 18 16:41:53 2016 +0200 ---------------------------------------------------------------------- .../streaming/connectors/redis/RedisSink.java | 8 +- .../common/container/RedisClusterContainer.java | 12 ++- .../container/RedisCommandsContainer.java | 7 ++ .../redis/common/container/RedisContainer.java | 17 ++++- .../connectors/redis/RedisSinkITCase.java | 2 - .../connectors/redis/RedisSinkTest.java | 79 ++++++++++++++++++++ 6 files changed, 117 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java index 43518e8..f6b0fd7 100644 --- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java @@ -166,7 +166,13 @@ public class RedisSink<IN> extends RichSinkFunction<IN> { */ @Override public void open(Configuration parameters) throws Exception { - this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase); + try { + this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase); + this.redisCommandsContainer.open(); + } catch (Exception e) { + LOG.error("Redis has not been properly initialized: ", e); + throw e; + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java index 7551c9e..d6621d6 100644 --- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -33,7 +33,7 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class); - private JedisCluster jedisCluster; + private transient JedisCluster jedisCluster; /** * Initialize Redis command container for Redis cluster. @@ -47,6 +47,16 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable } @Override + public void open() throws Exception { + + // echo() tries to open a connection and echos back the + // message passed as argument. Here we use it to monitor + // if we can communicate with the cluster. + + jedisCluster.echo("Test"); + } + + @Override public void hset(final String key, final String hashField, final String value) { try { jedisCluster.hset(key, hashField, value); http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java index 1b92c2e..55dbfc2 100644 --- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java @@ -25,6 +25,13 @@ import java.io.Serializable; public interface RedisCommandsContainer extends Serializable { /** + * Open the Jedis container. + * + * @throws Exception if the instance can not be opened properly + */ + void open() throws Exception; + + /** * Sets field in the hash stored at key to value. * If key does not exist, a new key holding a hash is created. * If field already exists in the hash, it is overwritten. http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java index 8684e9a..ba4bbda 100644 --- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java +++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java @@ -29,7 +29,7 @@ import java.io.IOException; /** * Redis command container if we want to connect to a single Redis server or to Redis sentinels * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}. - * If want to connect to a Redis sentinels, Please use the second constructor {@link #RedisContainer(JedisSentinelPool)} + * If want to connect to a Redis sentinels, please use the second constructor {@link #RedisContainer(JedisSentinelPool)} */ public class RedisContainer implements RedisCommandsContainer, Closeable { @@ -37,9 +37,8 @@ public class RedisContainer implements RedisCommandsContainer, Closeable { private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); - private final JedisPool jedisPool; - private final JedisSentinelPool jedisSentinelPool; - + private transient JedisPool jedisPool; + private transient JedisSentinelPool jedisSentinelPool; /** * Use this constructor if to connect with single Redis server. @@ -77,6 +76,16 @@ public class RedisContainer implements RedisCommandsContainer, Closeable { } @Override + public void open() throws Exception { + + // echo() tries to open a connection and echos back the + // message passed as argument. Here we use it to monitor + // if we can communicate with the cluster. + + getInstance().echo("Test"); + } + + @Override public void hset(final String key, final String hashField, final String value) { Jedis jedis = null; try { http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java index 237d9e5..21f3cca 100644 --- a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java +++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java @@ -20,7 +20,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; @@ -31,7 +30,6 @@ import org.junit.Test; import redis.clients.jedis.Jedis; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; public class RedisSinkITCase extends RedisITCaseBase { http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java index 848af57..59f59f2 100644 --- a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java +++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java @@ -17,12 +17,23 @@ package org.apache.flink.streaming.connectors.redis; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; import org.apache.flink.util.TestLogger; import org.junit.Test; +import redis.clients.jedis.exceptions.JedisConnectionException; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.fail; public class RedisSinkTest extends TestLogger { @@ -41,6 +52,74 @@ public class RedisSinkTest extends TestLogger { new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH))); } + @Test + public void testRedisDownBehavior() throws Exception { + + // create a wrong configuration so that open() fails. + + FlinkJedisPoolConfig wrongJedisPoolConfig = new FlinkJedisPoolConfig.Builder() + .setHost("127.0.0.1") + .setPort(1234).build(); + + testDownBehavior(wrongJedisPoolConfig); + } + + @Test + public void testRedisClusterDownBehavior() throws Exception { + + Set<InetSocketAddress> hosts = new HashSet<>(); + hosts.add(new InetSocketAddress("127.0.0.1", 1234)); + + // create a wrong configuration so that open() fails. + + FlinkJedisClusterConfig wrongJedisClusterConfig = new FlinkJedisClusterConfig.Builder() + .setNodes(hosts) + .setTimeout(100) + .setMaxIdle(1) + .setMaxTotal(1) + .setMinIdle(1).build(); + + testDownBehavior(wrongJedisClusterConfig); + } + + @Test + public void testRedisSentinelDownBehavior() throws Exception { + + Set<String> hosts = new HashSet<>(); + hosts.add("localhost:55095"); + + // create a wrong configuration so that open() fails. + + FlinkJedisSentinelConfig wrongJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder() + .setMasterName("master") + .setSentinels(hosts) + .build(); + + testDownBehavior(wrongJedisSentinelConfig); + } + + private void testDownBehavior(FlinkJedisConfigBase config) throws Exception { + RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config, + new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD)); + + try { + redisSink.open(new Configuration()); + } catch (Exception e) { + + // search for nested JedisConnectionExceptions + // because this is the expected behavior + + Throwable t = e; + int depth = 0; + while (!(t instanceof JedisConnectionException)) { + t = t.getCause(); + if (t == null || depth++ == 20) { + throw e; + } + } + } + } + private class TestMapper implements RedisMapper<Tuple2<String, String>>{ private RedisCommandDescription redisCommandDescription;