Repository: flink
Updated Branches:
  refs/heads/master 47acdeadf -> e9a067229


[hotfix] Fix Redis Sink to fail at opening if Redis is not initialized.

This closes #2245


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9a06722
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9a06722
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9a06722

Branch: refs/heads/master
Commit: e9a067229d787c3e874dc40303ac8661045cc54f
Parents: 47acdea
Author: kl0u <kklou...@gmail.com>
Authored: Wed Jul 13 18:40:17 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Thu Aug 18 16:41:53 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/redis/RedisSink.java   |  8 +-
 .../common/container/RedisClusterContainer.java | 12 ++-
 .../container/RedisCommandsContainer.java       |  7 ++
 .../redis/common/container/RedisContainer.java  | 17 ++++-
 .../connectors/redis/RedisSinkITCase.java       |  2 -
 .../connectors/redis/RedisSinkTest.java         | 79 ++++++++++++++++++++
 6 files changed, 117 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 43518e8..f6b0fd7 100644
--- 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -166,7 +166,13 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
      */
        @Override
        public void open(Configuration parameters) throws Exception {
-               this.redisCommandsContainer = 
RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+               try {
+                       this.redisCommandsContainer = 
RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+                       this.redisCommandsContainer.open();
+               } catch (Exception e) {
+                       LOG.error("Redis has not been properly initialized: ", 
e);
+                       throw e;
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 7551c9e..d6621d6 100644
--- 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -33,7 +33,7 @@ public class RedisClusterContainer implements 
RedisCommandsContainer, Closeable
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
 
-       private JedisCluster jedisCluster;
+       private transient JedisCluster jedisCluster;
 
        /**
         * Initialize Redis command container for Redis cluster.
@@ -47,6 +47,16 @@ public class RedisClusterContainer implements 
RedisCommandsContainer, Closeable
        }
 
        @Override
+       public void open() throws Exception {
+
+               // echo() tries to open a connection and echos back the
+               // message passed as argument. Here we use it to monitor
+               // if we can communicate with the cluster.
+
+               jedisCluster.echo("Test");
+       }
+
+       @Override
        public void hset(final String key, final String hashField, final String 
value) {
                try {
                        jedisCluster.hset(key, hashField, value);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 1b92c2e..55dbfc2 100644
--- 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -25,6 +25,13 @@ import java.io.Serializable;
 public interface RedisCommandsContainer extends Serializable {
 
        /**
+        * Open the Jedis container.
+        *
+        * @throws Exception if the instance can not be opened properly
+        */
+       void open() throws Exception;
+
+       /**
         * Sets field in the hash stored at key to value.
         * If key does not exist, a new key holding a hash is created.
         * If field already exists in the hash, it is overwritten.

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index 8684e9a..ba4bbda 100644
--- 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 /**
  * Redis command container if we want to connect to a single Redis server or 
to Redis sentinels
  * If want to connect to a single Redis server, please use the first 
constructor {@link #RedisContainer(JedisPool)}.
- * If want to connect to a Redis sentinels, Please use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
+ * If want to connect to a Redis sentinels, please use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
  */
 public class RedisContainer implements RedisCommandsContainer, Closeable {
 
@@ -37,9 +37,8 @@ public class RedisContainer implements 
RedisCommandsContainer, Closeable {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
 
-       private final JedisPool jedisPool;
-       private final JedisSentinelPool jedisSentinelPool;
-
+       private transient JedisPool jedisPool;
+       private transient JedisSentinelPool jedisSentinelPool;
 
        /**
         * Use this constructor if to connect with single Redis server.
@@ -77,6 +76,16 @@ public class RedisContainer implements 
RedisCommandsContainer, Closeable {
        }
 
        @Override
+       public void open() throws Exception {
+
+               // echo() tries to open a connection and echos back the
+               // message passed as argument. Here we use it to monitor
+               // if we can communicate with the cluster.
+
+               getInstance().echo("Test");
+       }
+
+       @Override
        public void hset(final String key, final String hashField, final String 
value) {
                Jedis jedis = null;
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
 
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 237d9e5..21f3cca 100644
--- 
a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -20,7 +20,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
 import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
 import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
@@ -31,7 +30,6 @@ import org.junit.Test;
 import redis.clients.jedis.Jedis;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class RedisSinkITCase extends RedisITCaseBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
index 848af57..59f59f2 100644
--- 
a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -17,12 +17,23 @@
 package org.apache.flink.streaming.connectors.redis;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
 import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
 
 public class RedisSinkTest extends TestLogger {
 
@@ -41,6 +52,74 @@ public class RedisSinkTest extends TestLogger {
                new RedisSink<>(null, new TestMapper(new 
RedisCommandDescription(RedisCommand.LPUSH)));
        }
 
+       @Test
+       public void testRedisDownBehavior() throws Exception {
+
+               // create a wrong configuration so that open() fails.
+
+               FlinkJedisPoolConfig wrongJedisPoolConfig = new 
FlinkJedisPoolConfig.Builder()
+                       .setHost("127.0.0.1")
+                       .setPort(1234).build();
+
+               testDownBehavior(wrongJedisPoolConfig);
+       }
+
+       @Test
+       public void testRedisClusterDownBehavior() throws Exception {
+
+               Set<InetSocketAddress> hosts = new HashSet<>();
+               hosts.add(new InetSocketAddress("127.0.0.1", 1234));
+
+               // create a wrong configuration so that open() fails.
+
+               FlinkJedisClusterConfig wrongJedisClusterConfig = new 
FlinkJedisClusterConfig.Builder()
+                       .setNodes(hosts)
+                       .setTimeout(100)
+                       .setMaxIdle(1)
+                       .setMaxTotal(1)
+                       .setMinIdle(1).build();
+
+               testDownBehavior(wrongJedisClusterConfig);
+       }
+
+       @Test
+       public void testRedisSentinelDownBehavior() throws Exception {
+
+               Set<String> hosts = new HashSet<>();
+               hosts.add("localhost:55095");
+
+               // create a wrong configuration so that open() fails.
+
+               FlinkJedisSentinelConfig wrongJedisSentinelConfig = new 
FlinkJedisSentinelConfig.Builder()
+                       .setMasterName("master")
+                       .setSentinels(hosts)
+                       .build();
+
+               testDownBehavior(wrongJedisSentinelConfig);
+       }
+
+       private void testDownBehavior(FlinkJedisConfigBase config) throws 
Exception {
+               RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(config,
+                       new 
RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
+
+               try {
+                       redisSink.open(new Configuration());
+               } catch (Exception e) {
+
+                       // search for nested JedisConnectionExceptions
+                       // because this is the expected behavior
+
+                       Throwable t = e;
+                       int depth = 0;
+                       while (!(t instanceof JedisConnectionException)) {
+                               t = t.getCause();
+                               if (t == null || depth++ == 20) {
+                                       throw e;
+                               }
+                       }
+               }
+       }
+
        private class TestMapper implements RedisMapper<Tuple2<String, String>>{
                private RedisCommandDescription redisCommandDescription;
 

Reply via email to