[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2245


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-22 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71869640
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -105,18 +105,13 @@ private void testDownBehavior(FlinkJedisConfigBase 
config) throws Exception {
try {
redisSink.open(new Configuration());
} catch (Throwable e) {
-   Throwable init = e;
 
-   // search for nested JedisConnectionExceptions
+   // search for the JedisConnectionExceptions
// because this is the expected behavior
-
-   int depth = 0;
-   while (!(e instanceof JedisConnectionException)) {
-   e = e.getCause();
-   if (e == null || depth++ == 20) {
-   init.printStackTrace();
-   fail("Test Failed: " + 
init.getMessage());
-   }
+   
+   if (!(e instanceof JedisConnectionException)) {
+   e.printStackTrace();
+   fail("Test Failed: " + e.getMessage());
--- End diff --

I thought so too, which is why I had it like this originally, but I 
understood your previous comment as a request to remove it.
I probably misunderstood. In fact, the `JedisConnectionException` has been 
the top-level exception from the beginning.
I can change it back ;)





[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71869391
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -105,18 +105,13 @@ private void testDownBehavior(FlinkJedisConfigBase 
config) throws Exception {
try {
redisSink.open(new Configuration());
} catch (Throwable e) {
-   Throwable init = e;
 
-   // search for nested JedisConnectionExceptions
+   // search for the JedisConnectionExceptions
// because this is the expected behavior
-
-   int depth = 0;
-   while (!(e instanceof JedisConnectionException)) {
-   e = e.getCause();
-   if (e == null || depth++ == 20) {
-   init.printStackTrace();
-   fail("Test Failed: " + 
init.getMessage());
-   }
+   
+   if (!(e instanceof JedisConnectionException)) {
+   e.printStackTrace();
+   fail("Test Failed: " + e.getMessage());
--- End diff --

Why did you change the `while` loop into an `if` condition? Did something 
change so that the `JedisConnectionException` will always be the top-level 
exception?




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71869449
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -105,18 +105,13 @@ private void testDownBehavior(FlinkJedisConfigBase 
config) throws Exception {
try {
redisSink.open(new Configuration());
} catch (Throwable e) {
-   Throwable init = e;
 
-   // search for nested JedisConnectionExceptions
+   // search for the JedisConnectionExceptions
// because this is the expected behavior
-
-   int depth = 0;
-   while (!(e instanceof JedisConnectionException)) {
-   e = e.getCause();
-   if (e == null || depth++ == 20) {
-   init.printStackTrace();
-   fail("Test Failed: " + 
init.getMessage());
-   }
+   
+   if (!(e instanceof JedisConnectionException)) {
+   e.printStackTrace();
+   fail("Test Failed: " + e.getMessage());
--- End diff --

I think it is better to let the exception bubble up instead of printing the 
stack trace and calling `fail`.
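
A minimal sketch of that suggestion, not the code from this pull request: catch only the expected exception type and let everything else propagate to the test runner, which then reports the full stack trace. `ConnectionFailure` is a hypothetical stand-in for `JedisConnectionException`, and the helper name is an assumption made for illustration.

```java
class BubbleUp {
    /** Hypothetical stand-in for JedisConnectionException. */
    static class ConnectionFailure extends RuntimeException {
        ConnectionFailure(String message) {
            super(message);
        }
    }

    /**
     * Returns true if the action fails with the expected exception and
     * false if it succeeds. Any other exception is not caught here, so it
     * bubbles up to the caller unchanged.
     */
    static boolean failsWithConnectionFailure(Runnable action) {
        try {
            action.run();
            return false;
        } catch (ConnectionFailure expected) {
            return true;
        }
    }
}
```

With this shape there is no need for `printStackTrace()` or `fail(...)`: an unexpected exception fails the test by itself.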




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71698908
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -103,15 +105,17 @@ private void testDownBehavior(FlinkJedisConfigBase 
config) throws Exception {
try {
redisSink.open(new Configuration());
} catch (Throwable e) {
+   Throwable init = e;
 
-   // search for nested ConnectionExceptions
+   // search for nested JedisConnectionExceptions
// because this is the expected behavior
 
int depth = 0;
while (!(e instanceof JedisConnectionException)) {
-   Throwable cause = e.getCause();
-   if (cause == null || depth++ == 20) {
-   throw e;
+   e = e.getCause();
+   if (e == null || depth++ == 20) {
+   init.printStackTrace();
+   fail("Test Failed: " + 
init.getMessage());
--- End diff --

Why don't we simply throw the initial exception here?




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71512375
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -41,6 +50,73 @@ public void 
shouldThrowNullPointerExceptionIfConfigurationIsNull(){
new RedisSink<>(null, new TestMapper(new 
RedisCommandDescription(RedisCommand.LPUSH)));
}
 
+   @Test
+   public void testRedisDownBehavior() throws Exception {
+
+   // create a wrong configuration so that open() fails.
+
+   FlinkJedisPoolConfig wrongJedisPoolConfig = new 
FlinkJedisPoolConfig.Builder()
+   .setHost("127.0.0.1")
+   .setPort(1234).build();
+
+   testDownBehavior(wrongJedisPoolConfig);
+   }
+
+   @Test
+   public void testRedisClusterDownBehavior() throws Exception {
+
+   Set<InetSocketAddress> hosts = new HashSet<>();
+   hosts.add(new InetSocketAddress("127.0.0.1", 1234));
+
+   // create a wrong configuration so that open() fails.
+
+   FlinkJedisClusterConfig wrongJedisClusterConfig = new 
FlinkJedisClusterConfig.Builder()
+   .setNodes(hosts)
+   .setTimeout(100)
+   .setMaxIdle(1)
+   .setMaxTotal(1)
+   .setMinIdle(1).build();
+
+   testDownBehavior(wrongJedisClusterConfig);
+   }
+
+   @Test
+   public void testRedisSentinelDownBehavior() throws Exception {
+
+   Set<String> hosts = new HashSet<>();
+   hosts.add("localhost:55095");
+
+   // create a wrong configuration so that open() fails.
+
+   FlinkJedisSentinelConfig wrongJedisSentinelConfig = new 
FlinkJedisSentinelConfig.Builder()
+   .setMasterName("master")
+   .setSentinels(hosts)
+   .build();
+
+   testDownBehavior(wrongJedisSentinelConfig);
+   }
+
+   private void testDownBehavior(FlinkJedisConfigBase config) throws 
Exception {
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config,
+   new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
+
+   try {
+   redisSink.open(new Configuration());
+   } catch (Throwable e) {
+
+   // search for nested ConnectionExceptions
+   // because this is the expected behavior
+
+   int depth = 0;
+   while (!(e instanceof JedisConnectionException)) {
--- End diff --

This condition will always be true/false if you don't reset `e = cause` in 
the while loop body.
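
To illustrate the point, here is a small sketch (an assumption about how such a search could look, not the code that was merged) in which the loop variable is advanced on every iteration, so the `instanceof`-style check is evaluated against a different throwable each time. The class and method names are hypothetical.

```java
class CauseChain {
    /**
     * Returns true if the throwable itself, or one of its causes reached by
     * following at most maxDepth cause links, is an instance of type.
     */
    static boolean hasCause(Throwable t, Class<? extends Throwable> type, int maxDepth) {
        Throwable current = t;
        for (int depth = 0; current != null && depth <= maxDepth; depth++) {
            if (type.isInstance(current)) {
                return true;
            }
            // Advance to the cause; without this step the check never changes.
            current = current.getCause();
        }
        return false;
    }
}
```

The depth bound plays the same role as the `depth++ == 20` guard in the diff: it keeps the loop finite even if a cause chain is unexpectedly long.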




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-19 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71379345
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool 
sentinelPool) {
 */
@Override
public void close() throws IOException {
-   if (this.jedisPool != null) {
-   this.jedisPool.close();
-   }
-   if (this.jedisSentinelPool != null) {
-   this.jedisSentinelPool.close();
-   }
+   getInstance().close();
--- End diff --

Yes, because the constructors ensure that only one of `jedisSentinelPool` and 
`jedisPool` can be non-null, and that is the one `getInstance()` returns.
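
The invariant described here can be sketched as follows. This is not the actual `RedisContainer` code: `Pool` and `PoolHolder` are hypothetical stand-ins, and whether the real `getInstance()` returns the pool itself or a connection borrowed from it is not shown in this thread.

```java
class PoolHolder {
    /** Hypothetical stand-in for JedisPool / JedisSentinelPool. */
    interface Pool {
        void close();
    }

    private final Pool pool;
    private final Pool sentinelPool;

    private PoolHolder(Pool pool, Pool sentinelPool) {
        // Invariant: exactly one of the two fields is non-null.
        if ((pool == null) == (sentinelPool == null)) {
            throw new IllegalArgumentException("exactly one pool must be set");
        }
        this.pool = pool;
        this.sentinelPool = sentinelPool;
    }

    static PoolHolder ofPool(Pool pool) {
        return new PoolHolder(pool, null);
    }

    static PoolHolder ofSentinelPool(Pool sentinelPool) {
        return new PoolHolder(null, sentinelPool);
    }

    /** Returns whichever field is non-null. */
    Pool getInstance() {
        return pool != null ? pool : sentinelPool;
    }

    /** Equivalent to the two null-checked close() calls under the invariant above. */
    void close() {
        getInstance().close();
    }
}
```

The equivalence with the old `close()` holds only as long as the constructors keep enforcing the invariant.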




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71159044
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool 
sentinelPool) {
 */
@Override
public void close() throws IOException {
-   if (this.jedisPool != null) {
-   this.jedisPool.close();
-   }
-   if (this.jedisSentinelPool != null) {
-   this.jedisSentinelPool.close();
-   }
+   getInstance().close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   getInstance().echo("Test");
--- End diff --

Is this the only way to test a connection? If so, then it should be 
documented properly that this is used to test the connection.




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71157963
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool 
sentinelPool) {
 */
--- End diff --

The `RedisContainer` class contains fields that are not serializable: the 
`jedisPool` and the `jedisSentinelPool`.




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71158368
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -47,6 +47,11 @@ public RedisClusterContainer(JedisCluster jedisCluster) {
}
--- End diff --

`jedisCluster` is not serializable




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-18 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71157753
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool 
sentinelPool) {
 */
@Override
public void close() throws IOException {
-   if (this.jedisPool != null) {
-   this.jedisPool.close();
-   }
-   if (this.jedisSentinelPool != null) {
-   this.jedisSentinelPool.close();
-   }
+   getInstance().close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   getInstance().echo("Test");
--- End diff --

Yes. It tries to open a connection and just echo the message "Test". 
This is to see if there is a way to communicate with the cluster or server 
(depending on the setup).
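
A rough sketch of the probe described above, using a hypothetical `Client` interface in place of the real Jedis classes (only an `echo` method is modelled, and the reply check is an extra assumption that the diff does not show):

```java
class FailFastOpen {
    /** Hypothetical stand-in for a Redis client; only echo is modelled. */
    interface Client {
        String echo(String message);
    }

    /**
     * Sends one echo so that an unreachable server is detected when the sink
     * is opened rather than when the first element arrives.
     */
    static void open(Client client) {
        String probe = "Test";
        // Throws if the client cannot reach the server.
        String reply = client.echo(probe);
        if (!probe.equals(reply)) {
            throw new IllegalStateException("Unexpected echo reply: " + reply);
        }
    }
}
```

A short comment next to the `echo` call, stating that it exists only to verify connectivity, would address the documentation concern raised in the review.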




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71157765
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool 
sentinelPool) {
 */
@Override
public void close() throws IOException {
-   if (this.jedisPool != null) {
-   this.jedisPool.close();
-   }
-   if (this.jedisSentinelPool != null) {
-   this.jedisSentinelPool.close();
-   }
+   getInstance().close();
--- End diff --

Is this equivalent to the old `close` method definition?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71157239
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -68,12 +68,12 @@ public RedisContainer(final JedisSentinelPool 
sentinelPool) {
 */
@Override
public void close() throws IOException {
-   if (this.jedisPool != null) {
-   this.jedisPool.close();
-   }
-   if (this.jedisSentinelPool != null) {
-   this.jedisSentinelPool.close();
-   }
+   getInstance().close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   getInstance().echo("Test");
--- End diff --

`echo("Test")`?




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71157090
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -47,6 +47,11 @@ public RedisClusterContainer(JedisCluster jedisCluster) {
}
 
@Override
+   public void open() throws Exception {
+   jedisCluster.echo("Test");
--- End diff --

testing artifact?




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2245#discussion_r71156944
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -166,7 +166,13 @@ public void invoke(IN input) throws Exception {
  */
@Override
public void open(Configuration parameters) throws Exception {
-   this.redisCommandsContainer = 
RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+   try {
+   this.redisCommandsContainer = 
RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+   this.redisCommandsContainer.open();
+   } catch (Exception e) {
+   LOG.error("Redis has not been properly initialized.");
--- End diff --

The exception should be logged here as well.
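
A minimal sketch of the request, written against `java.util.logging` so that it is self-contained; with SLF4J the equivalent call would be `LOG.error("...", e)`. Rethrowing after logging is an assumption here, since the rest of the `catch` block is cut off in the diff, and `LoggedOpen` is a hypothetical name.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class LoggedOpen {
    private static final Logger LOG = Logger.getLogger(LoggedOpen.class.getName());

    /** Runs the connect step and logs any failure together with its cause. */
    static void open(Runnable connect) {
        try {
            connect.run();
        } catch (RuntimeException e) {
            // Passing the exception records its message and stack trace in the log.
            LOG.log(Level.SEVERE, "Redis has not been properly initialized.", e);
            // Rethrow so the caller still sees the failure.
            throw e;
        }
    }
}
```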




[GitHub] flink pull request #2245: [hotfix] Fix Redis Sink to fail at opening if Redi...

2016-07-14 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2245

[hotfix] Fix Redis Sink to fail at opening if Redis is not initialized.

Previously, if Redis was not initialized, the job would start and would 
only realize that Redis was not running when the first element arrived. 
This fixes that by checking the connection to Redis during the opening phase.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink redis_check

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2245.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2245


commit 9a31902de93732d2e3c39d3e23ed3e34162795ec
Author: kl0u 
Date:   2016-07-13T16:40:17Z

[hotfix] Fix Redis Sink to fail at opening if Redis is not initialized.



