[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2245 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71869640 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -105,18 +105,13 @@ private void testDownBehavior(FlinkJedisConfigBase config) throws Exception { try { redisSink.open(new Configuration()); } catch (Throwable e) { - Throwable init = e; - // search for nested JedisConnectionExceptions + // search for the JedisConnectionExceptions // because this is the expected behavior - - int depth = 0; - while (!(e instanceof JedisConnectionException)) { - e = e.getCause(); - if (e == null || depth++ == 20) { - init.printStackTrace(); - fail("Test Failed: " + init.getMessage()); - } + + if (!(e instanceof JedisConnectionException)) { + e.printStackTrace(); + fail("Test Failed: " + e.getMessage()); --- End diff -- I also thought so that is why I had it like this but I thought your previous comment was to remove it. Probably i misunderstood. Actually the exception was in the first layer since the beginning. I can change it back ;) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71869391 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -105,18 +105,13 @@ private void testDownBehavior(FlinkJedisConfigBase config) throws Exception { try { redisSink.open(new Configuration()); } catch (Throwable e) { - Throwable init = e; - // search for nested JedisConnectionExceptions + // search for the JedisConnectionExceptions // because this is the expected behavior - - int depth = 0; - while (!(e instanceof JedisConnectionException)) { - e = e.getCause(); - if (e == null || depth++ == 20) { - init.printStackTrace(); - fail("Test Failed: " + init.getMessage()); - } + + if (!(e instanceof JedisConnectionException)) { + e.printStackTrace(); + fail("Test Failed: " + e.getMessage()); --- End diff -- Why did you change the `while` loop into a `if` condition. Did change something that the `JedisConnectionException` will always be the top level exception? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71869449 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -105,18 +105,13 @@ private void testDownBehavior(FlinkJedisConfigBase config) throws Exception { try { redisSink.open(new Configuration()); } catch (Throwable e) { - Throwable init = e; - // search for nested JedisConnectionExceptions + // search for the JedisConnectionExceptions // because this is the expected behavior - - int depth = 0; - while (!(e instanceof JedisConnectionException)) { - e = e.getCause(); - if (e == null || depth++ == 20) { - init.printStackTrace(); - fail("Test Failed: " + init.getMessage()); - } + + if (!(e instanceof JedisConnectionException)) { + e.printStackTrace(); + fail("Test Failed: " + e.getMessage()); --- End diff -- I think it is better to let the exception bubble up instead of printing the stack trace and calling `fail`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71698908 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -103,15 +105,17 @@ private void testDownBehavior(FlinkJedisConfigBase config) throws Exception { try { redisSink.open(new Configuration()); } catch (Throwable e) { + Throwable init = e; - // search for nested ConnectionExceptions + // search for nested JedisConnectionExceptions // because this is the expected behavior int depth = 0; while (!(e instanceof JedisConnectionException)) { - Throwable cause = e.getCause(); - if (cause == null || depth++ == 20) { - throw e; + e = e.getCause(); + if (e == null || depth++ == 20) { + init.printStackTrace(); + fail("Test Failed: " + init.getMessage()); --- End diff -- Why don't we simply throw the initial exception here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71512375 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -41,6 +50,73 @@ public void shouldThrowNullPointerExceptionIfConfigurationIsNull(){ new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH))); } + @Test + public void testRedisDownBehavior() throws Exception { + + // create a wrong configuration so that open() fails. + + FlinkJedisPoolConfig wrongJedisPoolConfig = new FlinkJedisPoolConfig.Builder() + .setHost("127.0.0.1") + .setPort(1234).build(); + + testDownBehavior(wrongJedisPoolConfig); + } + + @Test + public void testRedisClusterDownBehavior() throws Exception { + + Set hosts = new HashSet<>(); + hosts.add(new InetSocketAddress("127.0.0.1", 1234)); + + // create a wrong configuration so that open() fails. + + FlinkJedisClusterConfig wrongJedisClusterConfig = new FlinkJedisClusterConfig.Builder() + .setNodes(hosts) + .setTimeout(100) + .setMaxIdle(1) + .setMaxTotal(1) + .setMinIdle(1).build(); + + testDownBehavior(wrongJedisClusterConfig); + } + + @Test + public void testRedisSentinelDownBehavior() throws Exception { + + Set hosts = new HashSet<>(); + hosts.add("localhost:55095"); + + // create a wrong configuration so that open() fails. + + FlinkJedisSentinelConfig wrongJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder() + .setMasterName("master") + .setSentinels(hosts) + .build(); + + testDownBehavior(wrongJedisSentinelConfig); + } + + private void testDownBehavior(FlinkJedisConfigBase config) throws Exception { + RedisSink> redisSink = new RedisSink<>(config, + new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD)); + + try { + redisSink.open(new Configuration()); + } catch (Throwable e) { + + // search for nested ConnectionExceptions + // because this is the expected behavior + + int depth = 0; + while (!(e instanceof JedisConnectionException)) { --- End diff -- This condition will always be true/false if you don't reset `e = cause` in the while loop body. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71379345 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool sentinelPool) { */ @Override public void close() throws IOException { - if (this.jedisPool != null) { - this.jedisPool.close(); - } - if (this.jedisSentinelPool != null) { - this.jedisSentinelPool.close(); - } + getInstance().close(); --- End diff -- Yes because from the constructors, only one of the `jedisSentinelPool` or `jedisPool` can be non-null value and that is what `getInstance()` returns. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71159044 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool sentinelPool) { */ @Override public void close() throws IOException { - if (this.jedisPool != null) { - this.jedisPool.close(); - } - if (this.jedisSentinelPool != null) { - this.jedisSentinelPool.close(); - } + getInstance().close(); + } + + @Override + public void open() throws Exception { + getInstance().echo("Test"); --- End diff -- Is this the only way to test a connection? If so, then it should be documented properly that this is used to test the connection. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71157963 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool sentinelPool) { */ --- End diff -- The `RedisContainer` class contains fields which are not serializable. The `jedisPool` and the `jedisSentinelPool`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71158368 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -47,6 +47,11 @@ public RedisClusterContainer(JedisCluster jedisCluster) { } --- End diff -- `jedisCluster` is not serializable --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71157753 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool sentinelPool) { */ @Override public void close() throws IOException { - if (this.jedisPool != null) { - this.jedisPool.close(); - } - if (this.jedisSentinelPool != null) { - this.jedisSentinelPool.close(); - } + getInstance().close(); + } + + @Override + public void open() throws Exception { + getInstance().echo("Test"); --- End diff -- Yes. It tries to open a connection and just echo the message "Test". This is to see if there is a way to communicate with the cluster or server (depending on the setup). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71157765 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool sentinelPool) { */ @Override public void close() throws IOException { - if (this.jedisPool != null) { - this.jedisPool.close(); - } - if (this.jedisSentinelPool != null) { - this.jedisSentinelPool.close(); - } + getInstance().close(); --- End diff -- Is this equivalent to the old `close` method definition? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71157239 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool sentinelPool) { */ @Override public void close() throws IOException { - if (this.jedisPool != null) { - this.jedisPool.close(); - } - if (this.jedisSentinelPool != null) { - this.jedisSentinelPool.close(); - } + getInstance().close(); + } + + @Override + public void open() throws Exception { + getInstance().echo("Test"); --- End diff -- `echo("Test")`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71157090 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -47,6 +47,11 @@ public RedisClusterContainer(JedisCluster jedisCluster) { } @Override + public void open() throws Exception { + jedisCluster.echo("Test"); --- End diff -- testing artifact? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2245#discussion_r71156944 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -166,7 +166,13 @@ public void invoke(IN input) throws Exception { */ @Override public void open(Configuration parameters) throws Exception { - this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase); + try { + this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase); + this.redisCommandsContainer.open(); + } catch (Exception e) { + LOG.error("Redis has not been properly initialized."); --- End diff -- The exception should be logged here as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/2245 [hotfix] Fix Redis Sink to fail at opening if Redis is not initialized. Before if Redis was not initialized, the job would start and would wait until the first element arrives in order to realize that Redis is not running. This fixes that by checking the connection to redis during the opening phase. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink redis_check Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2245.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2245 commit 9a31902de93732d2e3c39d3e23ed3e34162795ec Author: kl0uDate: 2016-07-13T16:40:17Z [hotfix] Fix Redis Sink to fail at opening if Redis is not initialized. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---