[GitHub] flink issue #3615: [FLINK-2720] support flink-storm metrics

2017-04-02 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/3615
  
@RalphSu Do you have a JIRA ID so we can assign the ticket to you?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-04-02 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1157
  
@HuangWHWHW Can you please close this PR? @RalphSu is working on this now
(cf. #3615). Thanks.




[GitHub] flink pull request #3615: [FLINK-2720] support flink-storm metrics

2017-04-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/3615#discussion_r109326550
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/CounterMetricAdapter.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+
+import backtype.storm.metric.api.CountMetric;
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * An adapter to compose the counter metric between flink counter and 
storm counter metric.
+ * 
+ * @since Mar 5, 2017
+ *
+ */
+public class CounterMetricAdapter extends CountMetric implements IMetric, Counter, IMetricAdapter {
--- End diff --

Why does this class inherit from `Counter`? It seems to be sufficient that
it has a `Counter` member.
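
A minimal sketch of the composition approach suggested here, not the PR's actual code: the adapter stays a Storm `CountMetric` and forwards to a Flink `Counter` that it holds as a member. The `Counter` and `CountMetric` types below are reduced stand-ins so the example compiles on its own; `SimpleCounter` and `CompositionDemo` are hypothetical names, and reporting the running total from `getValueAndReset()` without resetting it is an assumption as well.

```java
// Stand-in for org.apache.flink.metrics.Counter, reduced to the methods used here.
interface Counter {
    void inc(long n);
    long getCount();
}

// Stand-in for Storm's CountMetric, reduced to the methods used here.
class CountMetric {
    private long value;

    public void incrBy(long n) {
        value += n;
    }

    public Object getValueAndReset() {
        long v = value;
        value = 0;
        return v;
    }
}

// Hypothetical Counter implementation, only needed to run the sketch.
class SimpleCounter implements Counter {
    private long count;

    @Override
    public void inc(long n) {
        count += n;
    }

    @Override
    public long getCount() {
        return count;
    }
}

// The adapter is a Storm metric; the Flink counter is a member, not a supertype.
class CounterMetricAdapter extends CountMetric {
    private final Counter delegate;

    CounterMetricAdapter(final Counter delegate) {
        this.delegate = delegate;
    }

    @Override
    public void incrBy(final long n) {
        // Every Storm-side increment is forwarded to the Flink counter.
        delegate.inc(n);
    }

    @Override
    public Object getValueAndReset() {
        // Assumption: the Flink counter is not reset; the running total is reported.
        return delegate.getCount();
    }
}

class CompositionDemo {
    public static void main(String[] args) {
        Counter flinkCounter = new SimpleCounter();
        CountMetric stormMetric = new CounterMetricAdapter(flinkCounter);
        stormMetric.incrBy(3);
        stormMetric.incrBy(2);
        System.out.println(flinkCounter.getCount());
    }
}
```

With this shape, Storm code keeps using the `CountMetric` API while Flink only ever sees the plain `Counter` that was registered with its metric group.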




[GitHub] flink pull request #3615: [FLINK-2720] support flink-storm metrics

2017-04-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/3615#discussion_r109326571
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/CounterMetricAdapter.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+
+import backtype.storm.metric.api.CountMetric;
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * An adapter to compose the counter metric between flink counter and 
storm counter metric.
+ * 
+ * @since Mar 5, 2017
+ *
+ */
+public class CounterMetricAdapter extends CountMetric implements IMetric, Counter, IMetricAdapter {
+
+/** the current count */
+// private long count;
--- End diff --

Nit: remove this.




[GitHub] flink pull request #3615: [FLINK-2720] support flink-storm metrics

2017-04-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/3615#discussion_r109326653
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/IMetricAdapter.java
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.metric;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.storm.metric.api.IMetric;
+
+
+/**
+ * An adapter interface that returns storm and flink metric instance that 
back eath other.
+ * 
+ * @since Mar 25, 2017
+ *
+ */
+public interface IMetricAdapter {
--- End diff --

I don't understand the purpose of this interface. IMHO, we don't need it.




[GitHub] flink pull request #3615: [FLINK-2720] support flink-storm metrics

2017-04-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/3615#discussion_r109326559
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/CounterMetricAdapter.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+
+import backtype.storm.metric.api.CountMetric;
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * An adapter to compose the counter metric between flink counter and 
storm counter metric.
+ * 
+ * @since Mar 5, 2017
--- End diff --

Nit: remove this line. (also in other classes)




[GitHub] flink pull request #3615: [FLINK-2720] support flink-storm metrics

2017-04-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/3615#discussion_r109326772
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/MultiCountMetricAdapter.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.metric;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import backtype.storm.metric.api.MultiCountMetric;
+
+/**
+ * @since Mar 5, 2017
+ *
+ */
+public class MultiCountMetricAdapter extends MultiCountMetric implements Metric, IMetricAdapter {
+
+private Map<String, CounterMetricAdapter> _value = new HashMap<>();
--- End diff --

Nit: rename `_value` -> `counterPerKey`




[GitHub] flink pull request #3615: [FLINK-2720] support flink-storm metrics

2017-04-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/3615#discussion_r109326583
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/CounterMetricAdapter.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+
+import backtype.storm.metric.api.CountMetric;
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * An adapter to compose the counter metric between flink counter and 
storm counter metric.
+ * 
+ * @since Mar 5, 2017
+ *
+ */
+public class CounterMetricAdapter extends CountMetric implements IMetric, Counter, IMetricAdapter {
+
+/** the current count */
+// private long count;
+
+private Counter delegate;
+
+public CounterMetricAdapter(Counter counter) {
--- End diff --

Nit: add `final` to all parameters (do this for all methods).




[GitHub] flink pull request #3615: [FLINK-2720] support flink-storm metrics

2017-04-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/3615#discussion_r109326716
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/metric/MetricConvert.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.storm.metric.api.CountMetric;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.MultiCountMetric;
+
+/**
+ * @since Mar 25, 2017
+ *
+ */
+public class MetricConvert {
+
+   /**
+* 
+* @param name
+* @param originalMetric
+* @param context
+* @return
+*/
+   @SuppressWarnings("unchecked")
+   public static  IMetricAdapter convertFlinkAdapter(String name, T originalMetric,
+   StreamingRuntimeContext context) {
+   if (originalMetric instanceof CountMetric) {
+   Counter flinkc = context.getMetricGroup().counter(name);
+   return (IMetricAdapter) new CounterMetricAdapter(flinkc);
+   } else if (originalMetric instanceof MultiCountMetric) {
+   return (IMetricAdapter) new MultiCountMetricAdapter(context);
+   }
+   return null;
--- End diff --

I guess it's better to throw an exception here.
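
One way this suggestion could look, as a sketch under stated assumptions rather than the PR's code: the fall-through case throws instead of returning `null`, so an unsupported metric type fails at registration time rather than as a later `NullPointerException`. The metric types below are empty stand-ins for the Storm classes, `MetricConvertSketch` and `adapterNameFor` are hypothetical names, the method returns the adapter's name instead of an adapter to stay short, and `UnsupportedOperationException` is only one plausible choice since the comment does not name an exception type.

```java
import java.util.Objects;

// Empty stand-ins for the Storm metric types; the real ones live in Storm's metric API.
interface IMetric {}

class CountMetric implements IMetric {}

class MultiCountMetric implements IMetric {}

// Stand-in for a metric type the converter does not handle.
class ReducedMetric implements IMetric {}

class MetricConvertSketch {

    // Returns the name of the adapter that would wrap the given metric.
    static String adapterNameFor(final IMetric originalMetric) {
        Objects.requireNonNull(originalMetric, "originalMetric");
        if (originalMetric instanceof CountMetric) {
            return "CounterMetricAdapter";
        } else if (originalMetric instanceof MultiCountMetric) {
            return "MultiCountMetricAdapter";
        }
        // Fail loudly instead of handing null back to the caller.
        throw new UnsupportedOperationException(
                "Unsupported Storm metric type: " + originalMetric.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(adapterNameFor(new CountMetric()));
        try {
            adapterNameFor(new ReducedMetric());
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```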




[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-03-06 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1157
  
@RalphSu We should wait for @HuangWHWHW to reply... If no response comes 
within 3 days you can take over. Also, the JIRA should get assigned to you for 
this case: https://issues.apache.org/jira/browse/FLINK-2720




[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-03-05 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1157
  
@HuangWHWHW Are you still working on this? @RalphSu Are you interested in
taking this over?




[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-01-13 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1157
  
Did not have a chance to work on this either -- I am in a new job, too :)

You can pick this up again if you are still interested.




[GitHub] flink issue #1617: [FLINK-3035] Redis as State Backend

2016-10-22 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1617
  
If nobody is interested in the feature and Redis is not a good fit anyway,
we can close. If the situation changes, we can always reopen or create a new
JIRA.




[GitHub] flink issue #2246: [hotfix] [doc] fixed example

2016-07-15 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/2246
  
@rmetzger yes




[GitHub] flink pull request #2246: [hotfix] [doc] fixed example

2016-07-14 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/flink/pull/2246

[hotfix] [doc] fixed example

Should also be merged back to `1.0` documentation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/flink hotfix-doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2246.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2246


commit 3587abf4b7465080ce43411f869c00ccf3fc4898
Author: mjsax <mj...@apache.org>
Date:   2016-07-14T09:24:28Z

[hotfix] [doc] fixed example






[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-07-07 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
Thanks @subhankarb ! Great work.
Thanks @tzulitai for helping with reviewing!




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-07-06 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@rmetzger LGTM.





[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@subhankarb two tiny comments
@tzulitai @rmetzger Any more comments?




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69464742
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.junit.Test;
+
+public class FlinkJedisConfigBaseTest {
--- End diff --

extends TestLogger




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69464673
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.Test;
+
+public class RedisSinkTest {
--- End diff --

extends TestLogger




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@subhankarb I think one more pass and we are good to go!




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69432240
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+public class JedisClusterConfigTest extends TestLogger {
+
+   @Test(expected = NullPointerException.class)
+   public void shouldThrowNullPointExceptionIfNodeValueIsNull(){
+   FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+   builder.build();
+   }
--- End diff --

Add test for new check `isEmpty()`.
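
A rough sketch of what such a test could check, written without JUnit or the connector classes so it runs on its own. `ClusterConfigBuilderSketch` is a hypothetical stand-in for `FlinkJedisClusterConfig.Builder` that models only the node validation, and the assumption that an empty node set is rejected with an `IllegalArgumentException` (and its message) is not confirmed by the diff shown here.

```java
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for FlinkJedisClusterConfig.Builder; only the node check is modeled.
class ClusterConfigBuilderSketch {
    private Set<InetSocketAddress> nodes;

    ClusterConfigBuilderSketch setNodes(final Set<InetSocketAddress> nodes) {
        this.nodes = nodes;
        return this;
    }

    Set<InetSocketAddress> build() {
        if (nodes == null) {
            // Existing behavior covered by the test in the diff above.
            throw new NullPointerException("nodes must not be null");
        }
        if (nodes.isEmpty()) {
            // Assumed behavior of the new isEmpty() check.
            throw new IllegalArgumentException("nodes must not be empty");
        }
        return Collections.unmodifiableSet(new HashSet<>(nodes));
    }
}

class EmptyNodesCheck {

    // Returns true if building with an empty node set is rejected.
    static boolean throwsOnEmptyNodes() {
        try {
            new ClusterConfigBuilderSketch()
                    .setNodes(new HashSet<InetSocketAddress>())
                    .build();
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsOnEmptyNodes());
    }
}
```

In the PR's JUnit 4 style this would presumably become a second `@Test(expected = ...)` method next to the existing null check, with the expected exception matching whatever the real `isEmpty()` check throws.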




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69431563
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
 ---
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkITCase extends RedisITCaseBase {
+
+   private FlinkJedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = 
"TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisCommandMapper(RedisCommand.LPUSH));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisCommandMapper(RedisCommand.SADD));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisCommandMapper(RedisCommand.PFADD));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void 

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69430812
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server or to Redis sentinels
+ * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Please use the second constructor {@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null");
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null");
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   @Override
+   public void hset(final String key, final String hashField, final String value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(key, hashField, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}",
+   key, hashField, e.getMessage());
+   }
+   throw e;
+   } finally {
+   releaseInstance(jedis);
+   }
+   }
+
+   @Override
+   public void rpush(final String listName, final String value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.rpush(listName, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}",
+   listName, e.getMessage());
+   }
+   throw e;
+   } finally {
+   releaseInstance(jedis);
+ 

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69430592
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server or to Redis sentinels
+ * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Please use the second constructor {@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
--- End diff --

Make both fields `final` and set the unused one to `null` in each constructor.
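
A minimal sketch of how this suggestion could look; it is not the PR's actual code. `PoolStub` and `SentinelPoolStub` are hypothetical stand-ins for `JedisPool` and `JedisSentinelPool` so that the example compiles without Jedis on the classpath, and `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`.

```java
import java.io.Closeable;
import java.util.Objects;

// Hypothetical stand-in for JedisPool (assumption, not the Jedis class).
class PoolStub implements Closeable {
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical stand-in for JedisSentinelPool (assumption, not the Jedis class).
class SentinelPoolStub implements Closeable {
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

class FinalFieldContainer implements Closeable {

    // Both fields are final; exactly one of them is non-null per instance.
    private final PoolStub pool;
    private final SentinelPoolStub sentinelPool;

    FinalFieldContainer(PoolStub pool) {
        this.pool = Objects.requireNonNull(pool, "Jedis Pool can not be null");
        this.sentinelPool = null; // unused in single-server mode
    }

    FinalFieldContainer(SentinelPoolStub sentinelPool) {
        this.pool = null; // unused in sentinel mode
        this.sentinelPool = Objects.requireNonNull(sentinelPool, "Jedis Sentinel Pool can not be null");
    }

    boolean usesSentinel() {
        return sentinelPool != null;
    }

    @Override
    public void close() {
        if (pool != null) {
            pool.close();
        }
        if (sentinelPool != null) {
            sentinelPool.close();
        }
    }
}
```

Because the compiler requires every `final` field to be assigned in every constructor, the explicit `null` assignment documents which mode the instance is in.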




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69430279
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The container for all available Redis Commands.
--- End diff --

Nit: [c]ommands




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69430042
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 ---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int soTimeout;
+   private String password;
+   private int database;
+
+   /**
+* Jedis Sentinels config.
+* The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException.
+*
+* @param masterName master name of the replica set
+* @param sentinels set of sentinel hosts
+* @param connectionTimeout timeout connection timeout
+* @param soTimeout timeout socket timeout
+* @param password password, if any
+* @param database database database index
+* @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the pool
+*
+* @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
+* @throws IllegalArgumentException if {@code sentinels} are empty
+*/
+   private FlinkJedisSentinelConfig(String masterName, Set sentinels,
+   int connectionTimeout, int soTimeout,
+   String password, int database,
+   int maxTotal, int maxIdle, int minIdle) {
+   super(connectionTimeout, maxTotal, maxIdle, minIdle);
+   Preconditions.checkNotNull(masterName, "Master name should be presented");
+   Preconditions.checkNotNull(sentinels, "Sentinels information should be presented");
+   Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
+
+   this.masterName = masterName;
+   this.sentinels = sentinels;
--- End diff --

Should we copy this `Set` to guard against external modifications?
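
A minimal sketch of such a defensive copy, under stated assumptions: the class name `SentinelConfigSketch` is hypothetical, the element type `String` is assumed (the quoted diff shows a raw `Set`), and plain JDK checks stand in for Flink's `Preconditions`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

class SentinelConfigSketch {

    private final String masterName;
    private final Set<String> sentinels;

    SentinelConfigSketch(String masterName, Set<String> sentinels) {
        this.masterName = Objects.requireNonNull(masterName, "Master name should be presented");
        Objects.requireNonNull(sentinels, "Sentinels information should be presented");
        if (sentinels.isEmpty()) {
            throw new IllegalArgumentException("Sentinel hosts should not be empty");
        }
        // Copy first, then wrap: later changes to the caller's set are not visible here,
        // and callers of getSentinels() cannot modify the internal set either.
        this.sentinels = Collections.unmodifiableSet(new HashSet<>(sentinels));
    }

    String getMasterName() {
        return masterName;
    }

    Set<String> getSentinels() {
        return sentinels;
    }
}
```

With the copy in place, adding a host to the caller's set after construction does not change what `getSentinels()` returns.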




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429926
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 ---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int soTimeout;
+   private String password;
+   private int database;
--- End diff --

Make all of these fields `final`.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429896
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 ---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
--- End diff --

Nit: [p]ool




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429657
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
+
+   private static final long serialVersionUID = 1L;
+
+   private String host;
+   private int port;
+   private int database;
+   private String password;
--- End diff --

Make all of these fields `final`.
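
A minimal sketch showing that `final` fields work together with a builder, assuming the config is only ever created through `Builder.build()`. The class name `PoolConfigSketch` is hypothetical, and the default values in the builder (port 6379, database 0) are assumptions for illustration.

```java
import java.util.Objects;

class PoolConfigSketch {

    // All fields are final: the config cannot change after build().
    private final String host;
    private final int port;
    private final int database;
    private final String password;

    private PoolConfigSketch(String host, int port, int database, String password) {
        this.host = Objects.requireNonNull(host, "Host information should be presented");
        this.port = port;
        this.database = database;
        this.password = password; // may be null if no password is set
    }

    String getHost() {
        return host;
    }

    int getPort() {
        return port;
    }

    int getDatabase() {
        return database;
    }

    String getPassword() {
        return password;
    }

    // The mutable state lives only in the builder.
    static class Builder {
        private String host;
        private int port = 6379;  // assumed default
        private int database = 0; // assumed default
        private String password;

        Builder setHost(String host) {
            this.host = host;
            return this;
        }

        Builder setPort(int port) {
            this.port = port;
            return this;
        }

        Builder setDatabase(int database) {
            this.database = database;
            return this;
        }

        Builder setPassword(String password) {
            this.password = password;
            return this;
        }

        PoolConfigSketch build() {
            return new PoolConfigSketch(host, port, database, password);
        }
    }
}
```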




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429555
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+/**
+ * Configuration for Jedis Pool.
--- End diff --

Nit: [p]ool




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429234
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+
+   protected int maxTotal;
+   protected int maxIdle;
+   protected int minIdle;
+   protected int connectionTimeout;
+
+   protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
+   this.connectionTimeout = connectionTimeout;
+   this.maxTotal = maxTotal;
+   this.maxIdle = maxIdle;
+   this.minIdle = minIdle;
+   }
--- End diff --

Should we check for negative values?
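
A minimal sketch of what such a check could look like, assuming all four values should be rejected when negative. The class name `ValidatedConfigBase` and the helper `requireNonNegative` are hypothetical, and a plain `IllegalArgumentException` stands in for Flink's `Preconditions.checkArgument`.

```java
import java.io.Serializable;

abstract class ValidatedConfigBase implements Serializable {

    private static final long serialVersionUID = 1L;

    protected final int maxTotal;
    protected final int maxIdle;
    protected final int minIdle;
    protected final int connectionTimeout;

    protected ValidatedConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
        // Fail fast at construction time instead of when the pool is created.
        this.connectionTimeout = requireNonNegative(connectionTimeout, "connectionTimeout");
        this.maxTotal = requireNonNegative(maxTotal, "maxTotal");
        this.maxIdle = requireNonNegative(maxIdle, "maxIdle");
        this.minIdle = requireNonNegative(minIdle, "minIdle");
    }

    // Hypothetical helper: returns the value unchanged if it is >= 0.
    private static int requireNonNegative(int value, String name) {
        if (value < 0) {
            throw new IllegalArgumentException(name + " can not be negative, but was " + value);
        }
        return value;
    }
}
```

One caveat: as far as I know, commons-pool2 interprets a negative `maxTotal` or `maxIdle` as "no limit", so the checks on those two would have to be relaxed if an unbounded pool should remain configurable.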




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428684
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set nodes;
+   private int maxRedirections;
+
+
+   /**
+* Jedis cluster configuration.
+* The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
+*
+* @param nodes list of node information for JedisCluster
+* @param connectionTimeout socket / connection timeout. The default is 2000
+* @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
+* @param maxTotal the maximum number of objects that can be allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the pool
+* @throws NullPointerException if parameter {@code nodes} is {@code null}
+*/
+   private FlinkJedisClusterConfig(Set nodes, int connectionTimeout, int maxRedirections,
+   int maxTotal, int maxIdle, int minIdle) {
+   super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+   Preconditions.checkNotNull(nodes, "Node information should be presented");
--- End diff --

Should we throw on `nodes.isEmpty()`?
Maybe also make a copy of the `Set` to ensure it cannot be altered from outside?
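
A minimal sketch of both checks together, under stated assumptions: the class name `ClusterConfigSketch` is hypothetical, the element type `InetSocketAddress` is assumed from the file's imports (the quoted diff shows a raw `Set`), and the exception message for the empty case is made up for illustration.

```java
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

class ClusterConfigSketch {

    private final Set<InetSocketAddress> nodes;

    ClusterConfigSketch(Set<InetSocketAddress> nodes) {
        // null -> NullPointerException, empty -> IllegalArgumentException
        Objects.requireNonNull(nodes, "Node information should be presented");
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("Redis cluster hosts should not be empty");
        }
        // Defensive copy so that the caller cannot alter the node set afterwards.
        this.nodes = Collections.unmodifiableSet(new HashSet<>(nodes));
    }

    Set<InetSocketAddress> getNodes() {
        return nodes;
    }
}
```

Validating before copying keeps the two failure modes distinct: a missing set and an empty set are reported with different exception types.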




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428463
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set nodes;
+   private int maxRedirections;
--- End diff --

`final`




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428451
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set nodes;
--- End diff --

`final`




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428276
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
--- End diff --

Nit: [c]luster




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428038
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *  The sink takes two arguments {@link FlinkJedisConfigBase} and 
{@link RedisMapper}.
+ *  When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ *  When {@link FlinkJedisSentinelConfig} is passed as the first 
argument, the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this 
when you want to connect to Sentinel.
+ *  Please use {@link FlinkJedisClusterConfig} as the first argument if 
you want to connect to
+ * a Redis Cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements 
RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisCommandDescription getCommandDescription() {
+ * return new RedisCommandDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+* {@code

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69427880
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,193 @@

[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-29 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@rmetzger What about this failing test?
```
JMXReporterTest.testJMXAvailability:148 » Runtime Could not start JMX 
server o...
```
There seems to be no JIRA for it. Is this a known issue, or not an issue at all?




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-29 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69023616
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
 ---
@@ -87,13 +87,15 @@ public void testRedisSentinelOperation() {
 
@After
public void tearDown() throws IOException {
-   if (jedisSentinelPool != null)
+   if (jedisSentinelPool != null){
--- End diff --

Missing blank between `)` and `{`.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-29 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69022373
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ */
+public enum RedisCommand {
+
+   /**
+* Insert the specified value at the head of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operations.
+*/
+   LPUSH(RedisDataType.LIST),
+   /**
+* Insert the specified value at the tail of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*/
+   RPUSH(RedisDataType.LIST),
+
+   /**
+* Add the specified member to the set stored at key.
+* Specified member that is already a member of this set is ignored.
+*/
+   SADD(RedisDataType.SET),
+
+   /**
+* Set key to hold the string value. If key already holds a value,
+* it is overwritten, regardless of its type.
+*/
+   SET(RedisDataType.STRING),
+
+   /**
+* Adds the element to the HyperLogLog data structure stored at the 
variable name specified as first argument.
+*/
+   PFADD(RedisDataType.HYPER_LOG_LOG),
+
+   /**
+* Posts a message to the given channel.
+*/
+   PUBLISH(RedisDataType.PUBSUB),
+
+   /**
+* Adds the specified members with the specified score to the sorted 
set stored at key.
+*/
+   ZADD(RedisDataType.SORTED_SET),
+
+   /**
+* Sets field in the hash stored at key to value. If key does not exist,
+* a new key holding a hash is created. If field already exists in the 
hash, it is overwritten.
+*/
+   HSET(RedisDataType.HASH);
+
+   /**
+* The {@link RedisDataType} this command belongs to
+*/
+   private RedisDataType redisDataType;
+
+   RedisCommand(RedisDataType redisDataType) {
+   this.redisDataType = redisDataType;
--- End diff --

`null` check missing
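
A minimal sketch of the missing check, using trimmed-down, hypothetical copies of the two enums (`CommandSketch`, `DataTypeSketch`). `Objects.requireNonNull` is used here only to keep the sketch free of Flink dependencies; in the PR, `Preconditions.checkNotNull` from `org.apache.flink.util` would be the natural choice.

```java
import java.util.Objects;

// Hypothetical, trimmed-down stand-in for RedisDataType.
enum DataTypeSketch { LIST, SET }

// Hypothetical, trimmed-down stand-in for RedisCommand.
enum CommandSketch {
    LPUSH(DataTypeSketch.LIST),
    SADD(DataTypeSketch.SET);

    private final DataTypeSketch dataType;

    CommandSketch(DataTypeSketch dataType) {
        // fail fast if a constant is ever declared with a null data type
        this.dataType = Objects.requireNonNull(dataType, "dataType must not be null");
    }

    DataTypeSketch getDataType() {
        return dataType;
    }
}
```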




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-29 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69022016
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ */
+public enum RedisCommand {
+
+   /**
+* Insert the specified value at the head of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operations.
+*/
+   LPUSH(RedisDataType.LIST),
+   /**
--- End diff --

nit: missing empty line.




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-27 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
My two cents:
1) Seems to have been sorted out (thx @tzulitai for the input!)
2) I personally do not care too much about the name conflict. Reusing the 
same class for sink and source sounds reasonable. Maybe `FlinkRedisConfig` as 
the name?
3) Agreed. `PUBSUB` as a data type is fine IMHO (so we can use the same type for 
source and sink, which makes it clear whether PUB or SUB is used)
4 + 5) Your suggestions sound good to me. Please update the PR so we can get a 
better look and feel for it :)

@tzulitai @subhankarb Thanks a lot!




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-25 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@tzulitai Thanks a lot for testing this! Your feedback is really great!

1) I am not a Redis user either. From my understanding, the second key 
determines the field that would be updated. It makes sense to me that this 
field is the same for all updates (i.e., independent of the data itself). Thus, 
it would not make sense to extract it from the record via `RedisMapper`.
2) Might be confusing for Redis users (I did not stumble over it as a 
non-user ;)). Renaming seems reasonable, maybe `RedisSinkConfig`?
3+4) Well, the data type is the same, no matter if you want to read or 
write it. But I agree that using the data type itself in `RedisSink` might not 
be sufficient. I think the idea to support `RPUSH` was that Flink as a 
streaming system might do best to append at the tail, not at the head... 
5) Similar to my answer to (4): the data type is the same, 
independent of which operation you perform on it. And you state yourself that 
`RedisMapper` maps the type to the command/action.
6) See my answer to (1)
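
To illustrate point (1), here is a rough sketch of a sink that takes the secondary key once, at construction time, and only the key and value from each record. Everything in it is hypothetical: `FixedSecondaryKeySketch` is an in-memory stand-in, not Jedis or the PR's `RedisSink`, and it assumes the reading from (1) that the secondary key names the hash field.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a hash-writing sink; no Redis involved.
class FixedSecondaryKeySketch {

    // hash name -> (field -> value)
    private final Map<String, Map<String, String>> hashes = new HashMap<>();

    // configured once, identical for every record (assumption from point 1)
    private final String field;

    FixedSecondaryKeySketch(String field) {
        this.field = field;
    }

    // Mirrors "HSET key field value" with the field taken from configuration
    // and only key and value extracted from the record.
    void invoke(String keyFromRecord, String valueFromRecord) {
        hashes.computeIfAbsent(keyFromRecord, k -> new HashMap<>())
            .put(field, valueFromRecord);
    }

    // Returns the value stored under the fixed field, or null if absent.
    String get(String key) {
        Map<String, String> hash = hashes.get(key);
        return hash == null ? null : hash.get(field);
    }
}
```

If the secondary key had to vary per record, it would instead need a third extractor method on the mapper, which is the flexibility question raised in this thread.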

As a non-Redis user, it is hard for me to judge what flexibility is 
required/expected by users (i.e., secondary key per record or not?). It also 
seems that not all available actions (LPUSH vs RPUSH) are implemented. How 
important is it to support all actions? Do you think that the current ones 
cover the most basic subset to get started with? It might be good to just 
start with a subset of commands and add new commands later on (just a 
suggestion). And do you think the implemented actions for each type are the 
most useful? If we want to support multiple different actions per data type, it 
would be a larger refactoring of this code, I think.
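
For reference, a small sketch of why the two list actions are not interchangeable. It models a Redis list with a `java.util.ArrayDeque`, with LPUSH inserting at the head and RPUSH at the tail, as documented for Redis. The class `PushOrderSketch` and its method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a Redis list; no Redis client involved.
class PushOrderSketch {

    // LPUSH semantics: every value is inserted at the head of the list.
    static List<String> lpushAll(String... values) {
        Deque<String> list = new ArrayDeque<>();
        for (String v : values) {
            list.addFirst(v);
        }
        return new ArrayList<>(list);
    }

    // RPUSH semantics: every value is appended at the tail of the list.
    static List<String> rpushAll(String... values) {
        Deque<String> list = new ArrayDeque<>();
        for (String v : values) {
            list.addLast(v);
        }
        return new ArrayList<>(list);
    }
}
```

With the stream elements `a`, `b`, `c` arriving in that order, `rpushAll` keeps arrival order while `lpushAll` reverses it, so the choice of action is visible to anyone reading the list later.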

@subhankarb @rmetzger What is your opinion on this?




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-24 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@rmetzger Thanks :)
I did not test with a Redis cluster or similar.




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-24 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
Please address last comment. Otherwise, LGTM.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-24 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68412669
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
--- End diff --

You can actually remove the whole JavaDoc comment (it will then be inherited 
automatically). If you specify a new JavaDoc, it replaces the old one; using 
the `inheritDoc` tag, you can include the original JavaDoc and extend it.




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
Most comments are nits or minor. Please fix them. Otherwise, looks good.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622740
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkTest extends RedisTestBase {
+
+   private JedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = 
"TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.LIST));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.SET));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.HYPER_LOG_LOG));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, 
Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void test

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622731
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622738
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkTest extends RedisTestBase {
+
+   private JedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = 
"TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.LIST));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.SET));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.HYPER_LOG_LOG));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, 
Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void test

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622734
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkTest extends RedisTestBase {
+
+   private JedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = 
"TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.LIST));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.SET));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.HYPER_LOG_LOG));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, 
Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void test

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622724
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkTest extends RedisTestBase {
+
+   private JedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = 
"TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.LIST));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.SET));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.HYPER_LOG_LOG));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, 
Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void test

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622681
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. Use this if redis is
+ * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleDataMapper implements RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY);
+ *}
+ *public String getKeyFromData(Tuple2 data) {
+ *return String.valueOf(data.f0);
+ *}
+ *public String getValueFromData(Tuple2 data) {
+ *return String.valueOf(data.f1);
--- End diff --

return data.f0;
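
A minimal sketch of what this suggestion appears to imply, assuming the mapper methods are declared with the parameterized type `Tuple2<String, String>` rather than the raw `Tuple2` shown in the Javadoc example. With the raw type the fields are seen as `Object`, which is presumably why `String.valueOf(...)` was used; with the typed parameter the fields are already `String`. The `Tuple2` and `RedisMapper` types below are simplified, hypothetical stand-ins so the snippet compiles on its own; they are not the Flink or connector classes.

```java
public class TypedMapperSketch {

    // Hypothetical stand-in for Flink's Tuple2, reduced to two public fields.
    static final class Tuple2<A, B> {
        public final A f0;
        public final B f1;

        Tuple2(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Hypothetical stand-in for the connector's RedisMapper interface,
    // limited to the two accessor methods discussed in the review.
    interface RedisMapper<T> {
        String getKeyFromData(T data);

        String getValueFromData(T data);
    }

    // With the type arguments spelled out on the parameter, f0 and f1 are
    // Strings, so no String.valueOf(...) conversion is needed.
    static final class ExampleMapper implements RedisMapper<Tuple2<String, String>> {
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }

    public static void main(String[] args) {
        ExampleMapper mapper = new ExampleMapper();
        Tuple2<String, String> record = new Tuple2<>("TEST_KEY", "message #0");
        System.out.println(mapper.getKeyFromData(record) + " -> " + mapper.getValueFromData(record));
    }
}
```

In this sketch the value accessor returns `data.f1`, mirroring the original method body; only the `String.valueOf` wrapper is dropped.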




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622668
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates build the description how the input data should be mapped to redis type
+ *Example:
+ *{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.PUBSUB);
+ *}
+ *public String getKeyFromData(Tuple2 data) {
+ *return String.valueOf(data.f0);
--- End diff --

return data.f0;




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622673
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates build the description how the input data should be mapped to redis type
+ *Example:
+ *{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.PUBSUB);
+ *}
+ *public String getKeyFromData(Tuple2 data) {
+ *return String.valueOf(data.f0);
+ *}
+ *public String getValueFromData(Tuple2 data) {
+ *return String.valueOf(data.f1);
--- End diff --

return data.f0;




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622678
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. Use this if redis is
+ * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleDataMapper implements RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY);
+ *}
+ *public String getKeyFromData(Tuple2 data) {
+ *return String.valueOf(data.f0);
--- End diff --

return data.f0;




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622660
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkTest extends RedisTestBase {
+
+   private JedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = 
"TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.LIST));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.SET));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.HYPER_LOG_LOG));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, 
Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void test

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622665
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishTest.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkPublishTest extends RedisTestBase {
+
+   private static final int NUM_ELEMENTS = 20;
+   private static final String REDIS_CHANNEL = "CHANNEL";
+
+   private static final List sourceList = new ArrayList<>();
+   private Thread sinkThread;
+   private PubSub pubSub;
+
+@Before
+public void before() throws Exception {
+pubSub = new PubSub();
+sinkThread = new Thread(new Subscribe(pubSub));
+}
+
+@Test
+public void redisSinkTest() throws Exception {
+sinkThread.start();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
+
+source.addSink(redisSink);
+
+env.execute("Redis Sink Test");
+
+assertEquals(NUM_ELEMENTS, sourceList.size());
+}
+
+@After
+public void after() throws Exception {
+pubSub.unsubscribe();
+sinkThread.join();
+sourceList.clear();
+}
+
+private class Subscribe implements Runnable {
+private PubSub localPubSub;
+private Subscribe(PubSub pubSub){
+this.localPubSub = pubSub;
+}
+
+@Override
+public void run() {
+JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
+pool.getResource().subscribe(localPubSub, REDIS_CHANNEL);
+}
+}
+
+private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+private static final long serialVersionUID = 1L;
+
+private volatile boolean running = true;
+
+@Override
+public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
+}
+}
+
+@Override
+public void cancel() {
+running = false;
+}
+}
+
+public static class PubSub extends JedisPubSub {
+

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622663
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishTest.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkPublishTest extends RedisTestBase {
+
+   private static final int NUM_ELEMENTS = 20;
+   private static final String REDIS_CHANNEL = "CHANNEL";
+
+   private static final List sourceList = new ArrayList<>();
+   private Thread sinkThread;
+   private PubSub pubSub;
+
+@Before
+public void before() throws Exception {
+pubSub = new PubSub();
+sinkThread = new Thread(new Subscribe(pubSub));
+}
+
+@Test
+public void redisSinkTest() throws Exception {
+sinkThread.start();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
+
+source.addSink(redisSink);
+
+env.execute("Redis Sink Test");
+
+assertEquals(NUM_ELEMENTS, sourceList.size());
+}
+
+@After
+public void after() throws Exception {
+pubSub.unsubscribe();
+sinkThread.join();
+sourceList.clear();
+}
+
+private class Subscribe implements Runnable {
+private PubSub localPubSub;
+private Subscribe(PubSub pubSub){
+this.localPubSub = pubSub;
+}
+
+@Override
+public void run() {
+JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
+pool.getResource().subscribe(localPubSub, REDIS_CHANNEL);
+}
+}
+
+private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+private static final long serialVersionUID = 1L;
+
+private volatile boolean running = true;
+
+@Override
+public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
+}
+}
+
+@Override
+public void cancel() {
+running = false;
+}
+}
+
+public static class PubSub extends JedisPubSub {
+

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622657
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkTest extends RedisTestBase {
+
+   private JedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.LIST));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.SET));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.HYPER_LOG_LOG));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void test

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622655
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkTest extends RedisTestBase {
+
+   private JedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.LIST));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.SET));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.HYPER_LOG_LOG));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void test

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622605
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishTest.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkPublishTest extends RedisTestBase {
+
+   private static final int NUM_ELEMENTS = 20;
+   private static final String REDIS_CHANNEL = "CHANNEL";
+
+   private static final List sourceList = new ArrayList<>();
+   private Thread sinkThread;
+   private PubSub pubSub;
+
+@Before
+public void before() throws Exception {
+pubSub = new PubSub();
+sinkThread = new Thread(new Subscribe(pubSub));
+}
+
+@Test
+public void redisSinkTest() throws Exception {
+sinkThread.start();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
+
+source.addSink(redisSink);
+
+env.execute("Redis Sink Test");
+
+assertEquals(NUM_ELEMENTS, sourceList.size());
+}
+
+@After
+public void after() throws Exception {
+pubSub.unsubscribe();
+sinkThread.join();
+sourceList.clear();
+}
+
+private class Subscribe implements Runnable {
+private PubSub localPubSub;
+private Subscribe(PubSub pubSub){
+this.localPubSub = pubSub;
+}
+
+@Override
+public void run() {
+JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
+pool.getResource().subscribe(localPubSub, REDIS_CHANNEL);
+}
+}
+
+private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+private static final long serialVersionUID = 1L;
+
+private volatile boolean running = true;
+
+@Override
+public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
+}
+}
+
+@Override
+public void cancel() {
+running = false;
+}
+}
+
+public static class PubSub extends JedisPubSub {
+

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622604
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishTest.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkPublishTest extends RedisTestBase {
+
+   private static final int NUM_ELEMENTS = 20;
+   private static final String REDIS_CHANNEL = "CHANNEL";
+
+   private static final List sourceList = new ArrayList<>();
+   private Thread sinkThread;
+   private PubSub pubSub;
+
+@Before
+public void before() throws Exception {
+pubSub = new PubSub();
+sinkThread = new Thread(new Subscribe(pubSub));
+}
+
+@Test
+public void redisSinkTest() throws Exception {
+sinkThread.start();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
+
+source.addSink(redisSink);
+
+env.execute("Redis Sink Test");
+
+assertEquals(NUM_ELEMENTS, sourceList.size());
+}
+
+@After
+public void after() throws Exception {
+pubSub.unsubscribe();
+sinkThread.join();
+sourceList.clear();
+}
+
+private class Subscribe implements Runnable {
+private PubSub localPubSub;
+private Subscribe(PubSub pubSub){
+this.localPubSub = pubSub;
+}
+
+@Override
+public void run() {
+JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
+pool.getResource().subscribe(localPubSub, REDIS_CHANNEL);
+}
+}
+
+private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+private static final long serialVersionUID = 1L;
+
+private volatile boolean running = true;
+
+@Override
+public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
+}
+}
+
+@Override
+public void cancel() {
+running = false;
+}
+}
+
+public static class PubSub extends JedisPubSub {
+

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622590
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishTest.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkPublishTest extends RedisTestBase {
+
+   private static final int NUM_ELEMENTS = 20;
+   private static final String REDIS_CHANNEL = "CHANNEL";
+
+   private static final List sourceList = new ArrayList<>();
+   private Thread sinkThread;
+   private PubSub pubSub;
+
+@Before
+public void before() throws Exception {
+pubSub = new PubSub();
+sinkThread = new Thread(new Subscribe(pubSub));
+}
+
+@Test
+public void redisSinkTest() throws Exception {
--- End diff --

nit: fix indentation below


[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622557
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates build the description how the input data should be mapped to redis type
+ *Example:
+ *{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.PUBSUB);
+ *}
+ *public String getKeyFromData(Tuple2 data) {
+ *return String.valueOf(data.f0);
+ *}
+ *public String getValueFromData(Tuple2 data) {
--- End diff --

Generics for `Tuple2` are missing.
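
For illustration, a parameterized version of the Javadoc example might look like the sketch below. `Tuple2` and `RedisMapper` are simplified stand-ins declared locally so that the snippet compiles on its own; they are assumptions for this sketch, not the real Flink or connector classes, and `getDataTypeDescription()` is left out for brevity.

```java
import java.io.Serializable;

public class TypedMapperSketch {

    // Stand-in for org.apache.flink.api.java.tuple.Tuple2 (assumption: only f0/f1 are needed).
    static class Tuple2<T0, T1> implements Serializable {
        private static final long serialVersionUID = 1L;
        final T0 f0;
        final T1 f1;

        Tuple2(T0 f0, T1 f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Stand-in for the RedisMapper interface under review, reduced to the two accessors.
    interface RedisMapper<T> extends Serializable {
        String getKeyFromData(T data);

        String getValueFromData(T data);
    }

    // With the type arguments spelled out, f0 and f1 are already Strings,
    // so no String.valueOf(...) conversion or cast is needed.
    static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }

    public static void main(String[] args) {
        RedisTestMapper mapper = new RedisTestMapper();
        Tuple2<String, String> record = new Tuple2<>("CHANNEL", "message #0");
        System.out.println(mapper.getKeyFromData(record) + " -> " + mapper.getValueFromData(record));
    }
}
```

The parameter type then matches the `RedisMapper<Tuple2<String, String>>` type argument, and the compiler can check the field types instead of issuing raw-type warnings.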


[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622555
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates build the description how the input data should be mapped to redis type
+ *Example:
+ *{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.PUBSUB);
+ *}
+ *public String getKeyFromData(Tuple2 data) {
--- End diff --

Generics for `Tuple2` are missing.


[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622548
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates build the description how the input data should be mapped to redis type
--- End diff --

"creates build" is redundant.


[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622529
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import java.io.Serializable;
+
+/**
+ * The description of the data type. This must be passed while creating 
new {@link RedisMapper}.
+ * When creating descriptor for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET},
+ * plz use the first constructor {@link 
#RedisDataTypeDescription(RedisDataType, String)}.
+ * If the additional key is null it will throw IllegalArgumentException
+ *
+ * When {@link RedisDataType} is not {@link RedisDataType#HASH} and 
{@link RedisDataType#SORTED_SET}
+ * plz use the second constructor {@link 
#RedisDataTypeDescription(RedisDataType)}
+ */
+public class RedisDataTypeDescription implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private RedisDataType dataType;
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+* {@link #getAdditionalKey()} used as hash name for {@link 
RedisDataType#HASH}
+* For {@link RedisDataType#SORTED_SET} we need set name, the 
element and it's score.
+* {@link #getAdditionalKey()} used as set name for {@link 
RedisDataType#SORTED_SET}
+*/
+   private String additionalKey;
+
+   /**
+* Use this constructor when data type is HASH or SORTED_SET.
+*
+* @param dataType the redis data type {@link RedisDataType}
+* @param additionalKey additional key for Hash and Sorted set data type
+*/
+   public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) {
+   this.dataType = dataType;
--- End diff --

Missing null check.
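
A minimal sketch of the kind of check meant here, assuming the constructor should reject a null data type and, for HASH and SORTED_SET, a null additional key (the class Javadoc already promises an IllegalArgumentException for that case). The enum is a cut-down stand-in for the PR's RedisDataType, with LIST as an assumed placeholder for "any other type", and the class name DataTypeDescriptionSketch is hypothetical. Dropping the key for other types is only one possible reading of "ignored".

```java
import java.io.Serializable;
import java.util.Objects;

// Cut-down stand-in for the PR's RedisDataType; LIST is an assumed placeholder
// for "a type that needs no additional key".
enum RedisDataType { HASH, SORTED_SET, LIST }

// Hypothetical class name, so this is not mistaken for the PR's actual code.
class DataTypeDescriptionSketch implements Serializable {

	private static final long serialVersionUID = 1L;

	private final RedisDataType dataType;
	private final String additionalKey;

	DataTypeDescriptionSketch(RedisDataType dataType, String additionalKey) {
		// Fail fast on a missing data type instead of silently storing null.
		this.dataType = Objects.requireNonNull(dataType, "Redis data type must not be null");

		boolean needsKey = dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET;
		if (needsKey && additionalKey == null) {
			throw new IllegalArgumentException("Additional key is required for " + dataType);
		}
		// Assumption: for all other types the additional key is dropped.
		this.additionalKey = needsKey ? additionalKey : null;
	}

	RedisDataType getDataType() {
		return dataType;
	}

	String getAdditionalKey() {
		return additionalKey;
	}
}
```

With this shape, `new DataTypeDescriptionSketch(RedisDataType.HASH, null)` fails at construction time rather than later inside the sink. Guava's Preconditions, which RedisSink in this PR already imports, would be an equally reasonable way to write the same checks.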




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622515
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import java.io.Serializable;
+
+/**
+ * The description of the data type. This must be passed while creating 
new {@link RedisMapper}.
+ * When creating descriptor for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET},
+ * plz use the first constructor {@link 
#RedisDataTypeDescription(RedisDataType, String)}.
+ * If the additional key is null it will throw IllegalArgumentException
+ *
+ * When {@link RedisDataType} is not {@link RedisDataType#HASH} and 
{@link RedisDataType#SORTED_SET}
+ * plz use the second constructor {@link 
#RedisDataTypeDescription(RedisDataType)}
+ */
+public class RedisDataTypeDescription implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private RedisDataType dataType;
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+* {@link #getAdditionalKey()} used as hash name for {@link 
RedisDataType#HASH}
+* For {@link RedisDataType#SORTED_SET} we need set name, the 
element and it's score.
+* {@link #getAdditionalKey()} used as set name for {@link 
RedisDataType#SORTED_SET}
+*/
+   private String additionalKey;
+
+   /**
+* Use this constructor when data type is HASH or SORTED_SET.
+*
--- End diff --

If a different data type is specified, {@code additionalKey} is ignored.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622479
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import java.io.Serializable;
+
+/**
+ * The description of the data type. This must be passed while creating 
new {@link RedisMapper}.
+ * When creating descriptor for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET},
+ * plz use the first constructor {@link 
#RedisDataTypeDescription(RedisDataType, String)}.
+ * If the additional key is null it will throw IllegalArgumentException
+ *
+ * When {@link RedisDataType} is not {@link RedisDataType#HASH} and 
{@link RedisDataType#SORTED_SET}
+ * plz use the second constructor {@link #RedisDataTypeDescription(RedisDataType)}
--- End diff --

"you need to use second constructor"




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622482
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import java.io.Serializable;
+
+/**
+ * The description of the data type. This must be passed while creating 
new {@link RedisMapper}.
+ * When creating descriptor for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET},
+ * plz use the first constructor {@link 
#RedisDataTypeDescription(RedisDataType, String)}.
+ * If the additional key is null it will throw IllegalArgumentException
+ *
+ * When {@link RedisDataType} is not {@link RedisDataType#HASH} and 
{@link RedisDataType#SORTED_SET}
+ * plz use the second constructor {@link 
#RedisDataTypeDescription(RedisDataType)}
+ */
+public class RedisDataTypeDescription implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private RedisDataType dataType;
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
--- End diff --

key [is] needed




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622473
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import java.io.Serializable;
+
+/**
+ * The description of the data type. This must be passed while creating 
new {@link RedisMapper}.
+ * When creating descriptor for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET},
+ * plz use the first constructor {@link 
#RedisDataTypeDescription(RedisDataType, String)}.
+ * If the additional key is null it will throw IllegalArgumentException
--- End diff --

Nit: {@code null} and {@code IllegalArgumentException}




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622464
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import java.io.Serializable;
+
+/**
+ * The description of the data type. This must be passed while creating 
new {@link RedisMapper}.
+ * When creating descriptor for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET},
+ * plz use the first constructor {@link #RedisDataTypeDescription(RedisDataType, String)}.
--- End diff --

"you need to use first constructor"




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622440
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available data type for Redis
--- End diff --

Nit: missing `.` at the end.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622392
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
${@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
--- End diff --

No need to copy JavaDoc for RedisCommandsContainer methods. (see above)
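
A minimal sketch of what this amounts to, assuming the usual Javadoc behaviour that an implementing method without its own doc comment inherits the one from the interface method. CommandsSketch and InMemoryContainerSketch are hypothetical stand-ins for RedisCommandsContainer and RedisContainer; the in-memory map only exists so the example runs without a Redis server.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the interface; the documentation lives here only.
interface CommandsSketch {

	/**
	 * Sets field in the hash stored at key to value.
	 *
	 * @param hashName hash name
	 * @param key hash field name
	 * @param value hash value
	 */
	void hset(String hashName, String key, String value);
}

// Hypothetical stand-in for the implementation class.
class InMemoryContainerSketch implements CommandsSketch {

	private final Map<String, Map<String, String>> hashes = new HashMap<>();

	// No doc comment: the Javadoc tool reuses the comment from CommandsSketch#hset.
	@Override
	public void hset(String hashName, String key, String value) {
		hashes.computeIfAbsent(hashName, name -> new HashMap<>()).put(key, value);
	}

	// Helper for the example only, not part of the interface.
	String hget(String hashName, String key) {
		Map<String, String> hash = hashes.get(hashName);
		return hash == null ? null : hash.get(key);
	}
}
```

If an implementation needs to add a remark of its own, a short comment containing `{@inheritDoc}` plus the extra sentence keeps the two in sync without copying the text.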




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622383
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The container for all available Redis Commands.
+ */
+public interface RedisCommandsContainer extends Serializable {
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param hashName Hash name
+* @param key Hash key name
+* @param value Hash value
+*/
+   void hset(String hashName, String key, String value);
+
+   /**
+* Insert all the specified values at the tail of the list stored at key.
--- End diff --

"values", i.e., multiple?




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622377
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis 
cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleDataMapper implements 
RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ *}
+ *public String getKeyFromData(Tuple2 data) {
+ *return String.valueOf(data.f0);
+ *}
+ *public String getValueFromData(Tuple2 data) {
+ *return String.valueOf(data.f1);
+ *}
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper());
+ *}
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+* additionalKey used as hash name for {@link RedisDataType#HASH}
+* For {@link RedisDataType#SORTED_SET} we need set name, the 
element and it's score.
+* additionalKey used as set name for {@link RedisDataType#SORTED_SET}
+*/
+   private String additionalKey;
+   private RedisMapper redisSinkMapper;
+   private RedisDataType redisDataType;
+
+   private JedisPoolConfig jedisPoolConfig;
+   private JedisSe

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622350
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
${@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with 
sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis 
instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param hashName   Hash name
+* @param key Hash field name
+* @param value Hash value
+*/
+   @Override
+   public void hset(final String hashName, final String key, final String 
value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command HSET to key {} and field {} error message {}",
+   key, key, e.getMessage());
+   }
+   } finally {
+   returnInstance(jedis);
+   }
+   }
+
+   /**
+* Insert all the specified values at the tail of the list stored at key.
--- End diff --

One value or multiple values? The value parameter is only a single String.
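
A minimal sketch of the two ways to make the Javadoc and the signature agree, assuming either the wording is changed to a single value or the method takes varargs. ListCommandsSketch, rpushAll and the in-memory map are hypothetical and only there so the example runs without Redis; which option fits the PR is for the author to decide.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the list commands of the container.
class ListCommandsSketch {

	private final Map<String, List<String>> lists = new HashMap<>();

	/**
	 * Option 1: keep the single String parameter and document exactly that.
	 * Inserts the specified value at the tail of the list stored at listName.
	 */
	void rpush(String listName, String value) {
		lists.computeIfAbsent(listName, name -> new ArrayList<>()).add(value);
	}

	/**
	 * Option 2: accept several values, so that "all the specified values" is accurate.
	 * Inserts the values at the tail of the list in the order given.
	 */
	void rpushAll(String listName, String... values) {
		for (String value : values) {
			rpush(listName, value);
		}
	}

	// Helper for the example only: returns a copy of the list, empty if absent.
	List<String> get(String listName) {
		return new ArrayList<>(lists.getOrDefault(listName, new ArrayList<>()));
	}
}
```

Since the sink writes one record at a time, option 1 with corrected wording is probably the smaller change.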




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622319
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
${@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with 
sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis 
instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param hashName   Hash name
+* @param key Hash field name
+* @param value Hash value
+*/
+   @Override
+   public void hset(final String hashName, final String key, final String 
value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command HSET to key {} and field {} error message {}",
+   key, key, e.getMessage());
+   }
+   } finally {
+   returnInstance(jedis);
+   }
+   }
+
+   /**
+* Insert all the specified values at the tail of the list stored at 
key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*
+* @param listName Name of the List
+* @param valueValue to be added
+*/
+   @Override
+   public void rpush(final String listName, final String value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.rpush(listName, value);
+   } catch (Exception e) {
+   

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622293
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
${@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with 
sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis 
instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   this.jedisSentinelPool = sentinelPool;
--- End diff --

Missing null check.
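
A minimal sketch of the requested check, assuming `java.util.Objects.requireNonNull` is acceptable here (the PR uses Guava's `Preconditions.checkNotNull` in its config classes, which would work equally well). `PoolStandIn` and `ContainerSketch` are hypothetical stand-ins so the example compiles without Jedis; they are not part of the PR.

```java
import java.util.Objects;

/** Hypothetical stand-in for JedisSentinelPool so the sketch compiles without Jedis. */
class PoolStandIn {
}

/** Hypothetical container that rejects a null pool at construction time. */
class ContainerSketch {
    private final PoolStandIn sentinelPool;

    ContainerSketch(final PoolStandIn sentinelPool) {
        // Fail fast here rather than with an obscure NullPointerException on first use.
        this.sentinelPool = Objects.requireNonNull(sentinelPool, "Jedis sentinel pool must not be null");
    }

    PoolStandIn getPool() {
        return sentinelPool;
    }
}
```

The same pattern would apply to the `JedisPool` constructor.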




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622280
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
${@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   this.jedisPool = jedisPool;
--- End diff --

Missing null check.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622261
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   this.jedisCluster = jedisCluster;
+   }
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param hashName   Hash name
+* @param key Hash field name
+* @param value Hash value
+*/
+   @Override
+   public void hset(final String hashName, final String key, final String 
value) {
+   try {
+   jedisCluster.hset(hashName, key, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command HSET to hash {} error message {}",
+   hashName, key, e.getMessage());
+   }
+   }
+   }
+
+   /**
+* Insert all the specified values at the tail of the list stored at 
key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*
+* @param listName Name of the List
+* @param value  Value to be added
+*/
+   @Override
+   public void rpush(final String listName, final String value) {
+   try {
+   jedisCluster.rpush(listName, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command RPUSH to list {} error message: {}",
+   listName, e.getMessage());
+   }
+   }
+   }
+
+   /**
+* Add the specified members to the set stored at key.
+* Specified members that are already a member of this set are ignored.
+* If key does not exist, a new set is created before adding the 
specified members.
+*
+* @param setName Name of the Set
+* @param value   Value to be added
+*/
+   @Override
+   public void sadd(final String setName, final String value) {
+   try {
+   jedisCluster.sadd(setName, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command RPUSH to set {} error message {}",
+   setName, e.getMessage());
+   }
+   }
+   }
+
+   /**
+* Posts a message to the given channel
+*
+* @param channelName Name of the channel to which data will be 
published
+* @param message the message
+*/
+   @Override
  

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622255
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
--- End diff --

For inherited methods, you do not need to specify a JavaDoc (unless you 
want a JavaDoc that differs from the one in the interface or base class). 
JavaDoc is inherited by default.
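
A small sketch of what this looks like in practice. `CommandsSketch` and `ClusterSketch` are hypothetical stand-ins for `RedisCommandsContainer` and `RedisClusterContainer`, backed by an in-memory set instead of Redis. The override carries no doc comment, so the javadoc tool reuses the interface's comment for it.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the RedisCommandsContainer interface. */
interface CommandsSketch {
    /**
     * Adds the given value to the set stored under the given name.
     *
     * @param setName name of the set
     * @param value value to add
     * @return true if the value was not yet a member of the set
     */
    boolean sadd(String setName, String value);
}

/** Hypothetical implementation; an in-memory set replaces the Redis cluster. */
class ClusterSketch implements CommandsSketch {
    private final Set<String> members = new HashSet<>();

    // No doc comment here: the interface comment is inherited for this override.
    @Override
    public boolean sadd(final String setName, final String value) {
        return members.add(setName + ":" + value);
    }
}
```

If an implementation needs to add detail, `{@inheritDoc}` can pull in the inherited text and extend it.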




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r6769
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
--- End diff --

All data types are `String` -- does Redis only support String? Or am I 
missing something?
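
For context, Redis stores keys and values as binary-safe strings, so a `String`-only interface can still carry other types if callers serialize them first. The following is only a sketch of that idea; `RedisValueEncodingSketch` and its methods are hypothetical and not part of the PR.

```java
import java.util.Base64;

/** Hypothetical helper that turns non-String values into Strings before they are sent. */
final class RedisValueEncodingSketch {

    private RedisValueEncodingSketch() {
    }

    /** Numbers can be written in their decimal text form. */
    static String encodeLong(final long value) {
        return Long.toString(value);
    }

    /** Arbitrary bytes are Base64-encoded so they survive a String-only interface. */
    static String encodeBytes(final byte[] payload) {
        return Base64.getEncoder().encodeToString(payload);
    }

    /** Reverses encodeBytes. */
    static byte[] decodeBytes(final String encoded) {
        return Base64.getDecoder().decode(encoded);
    }
}
```

An alternative design would expose `byte[]` overloads on the container and leave serialization to the sink's data mapper.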




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622177
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   this.jedisCluster = jedisCluster;
+   }
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param hashName   Hash name
+* @param key Hash field name
+* @param value Hash value
+*/
+   @Override
+   public void hset(final String hashName, final String key, final String 
value) {
+   try {
+   jedisCluster.hset(hashName, key, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command HSET to hash {} error message {}",
+   hashName, key, e.getMessage());
+   }
+   }
+   }
+
+   /**
+* Insert all the specified values at the tail of the list stored at key.
--- End diff --

Why "all ... values"? The method takes a single string. Or am I missing something?
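
The wording appears to be copied from the description of the Redis RPUSH command, which accepts several values, while the wrapper under review forwards exactly one. A sketch of the distinction, using a hypothetical in-memory `ListStoreSketch` in place of `JedisCluster`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a Redis list store. */
class ListStoreSketch {
    private final Map<String, List<String>> lists = new HashMap<>();

    /** Variadic form: appends all given values to the tail of the list. */
    long rpushAll(final String listName, final String... values) {
        List<String> list = lists.computeIfAbsent(listName, k -> new ArrayList<>());
        list.addAll(Arrays.asList(values));
        return list.size();
    }

    /**
     * Inserts the specified value at the tail of the list stored at listName.
     * If the list does not exist, it is created as an empty list first.
     */
    long rpush(final String listName, final String value) {
        // Single-value wrapper: the JavaDoc above describes one value, not "all values".
        return rpushAll(listName, value);
    }
}
```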




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622149
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   this.jedisCluster = jedisCluster;
--- End diff --

null check required?




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622134
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class JedisSentinelConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(JedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int connectionTimeout;
+   private int soTimeout;
+   private String password;
+   private int database;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException.
--- End diff --

What about default values?
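
One way the defaults could be documented and applied is through a builder whose optional fields are pre-initialized. This is a hedged sketch, not the PR's code: `SentinelConfigSketch` is hypothetical, and the literal defaults are assumptions chosen to mirror Jedis's `Protocol.DEFAULT_TIMEOUT` (2000 ms) and `Protocol.DEFAULT_DATABASE` (0) and commons-pool2's `GenericObjectPoolConfig` defaults (8 / 8 / 0).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical sentinel configuration with documented defaults. */
final class SentinelConfigSketch {
    final String masterName;
    final Set<String> sentinels;
    final int connectionTimeout;
    final int database;
    final int maxTotal;
    final int maxIdle;
    final int minIdle;

    private SentinelConfigSketch(final Builder b) {
        this.masterName = Objects.requireNonNull(b.masterName, "masterName must not be null");
        Objects.requireNonNull(b.sentinels, "sentinels must not be null");
        if (b.sentinels.isEmpty()) {
            throw new IllegalArgumentException("sentinels must not be empty");
        }
        this.sentinels = Collections.unmodifiableSet(new HashSet<>(b.sentinels));
        this.connectionTimeout = b.connectionTimeout;
        this.database = b.database;
        this.maxTotal = b.maxTotal;
        this.maxIdle = b.maxIdle;
        this.minIdle = b.minIdle;
    }

    /** Builder: masterName and sentinels are mandatory, everything else has a default. */
    static final class Builder {
        private String masterName;            // mandatory, no default
        private Set<String> sentinels;        // mandatory, no default
        private int connectionTimeout = 2000; // assumed default, in milliseconds
        private int database = 0;             // assumed default database index
        private int maxTotal = 8;             // assumed pool defaults
        private int maxIdle = 8;
        private int minIdle = 0;

        Builder setMasterName(final String masterName) {
            this.masterName = masterName;
            return this;
        }

        Builder setSentinels(final Set<String> sentinels) {
            this.sentinels = sentinels;
            return this;
        }

        Builder setConnectionTimeout(final int connectionTimeout) {
            this.connectionTimeout = connectionTimeout;
            return this;
        }

        SentinelConfigSketch build() {
            return new SentinelConfigSketch(this);
        }
    }
}
```

With this layout, the class-level JavaDoc can list each optional setting next to its default value.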




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622131
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class JedisSentinelConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(JedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int connectionTimeout;
+   private int soTimeout;
+   private String password;
+   private int database;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The master name and sentinels are mandatory, and when you didn't set 
these, it throws NullPointerException.
+*
+* @param masterName master name of the replica set
+* @param sentinels set of sentinel hosts
+* @param connectionTimeout timeout connection timeout
+* @param soTimeout timeout socket timeout
+* @param password password password, if any
+* @param database database database index
+* @param maxTotal maxTotal the maximum number of objects that can be 
allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the 
pool
+*
+* @throws NullPointerException if do not see master name or sentinels
+* @throws IllegalArgumentException if sentinels are empty
+*/
+   private JedisSentinelConfig(String masterName, Set sentinels,
+   int 
connectionTimeout, int soTimeout,
+   String 
password, int database,
+   int maxTotal, 
int maxIdle, int minIdle) {
+   Preconditions.checkNotNull(masterName, "Master name should be 
presented");
+   Preconditions.checkNotNull(sentinels, "Sentinels information 
should be presented");
+   Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel 
hosts should not be empty");
+
+   this.masterName = masterName;
+   this.sentinels = sentinels;
+   this.connectionTimeout = connectionTimeout;
+   this.soTimeout = soTimeout;
+   this.password = password;
+   this.database = database;
+   this.maxTotal = maxTotal;
+   this.maxIdle = maxIdle;
+   this.minIdle = minIdle;
+   }
+
+   /**
+* Returns master name of the replica set.
+*
+* @return master name of the replica set.
+*/
+   public String getMasterName() {
+   return masterName;
+   }
+
+   /**
+* Returns Sentinels host addresses.
+*
+* @return Set of Sentinels host addresses
+*/
+   public Set getSentinels() {
+   return sentinels;
+   }
+
+   /**
+* Returns timeout.
+*
+* @return connection timeout
+*/
+   public int getConnectionTimeout() {
+   return connectionTimeout;
+   }
+
+   /**
+* Returns socket timeout.
+*
+* @return socket timeout
+*/
+   

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622112
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class JedisSentinelConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(JedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int connectionTimeout;
+   private int soTimeout;
+   private String password;
+   private int database;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The master name and sentinels are mandatory, and when you didn't set 
these, it throws NullPointerException.
+*
+* @param masterName master name of the replica set
+* @param sentinels set of sentinel hosts
+* @param connectionTimeout timeout connection timeout
+* @param soTimeout timeout socket timeout
+* @param password password password, if any
+* @param database database database index
+* @param maxTotal maxTotal the maximum number of objects that can be 
allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the 
pool
+*
--- End diff --

Remove the duplicated words, e.g., "password password password".




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622101
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class JedisSentinelConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int connectionTimeout;
+   private int soTimeout;
+   private String password;
+   private int database;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException.
+*
+* @param masterName master name of the replica set
+* @param sentinels set of sentinel hosts
+* @param connectionTimeout timeout connection timeout
+* @param soTimeout timeout socket timeout
+* @param password password password, if any
+* @param database database database index
+* @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the pool
+*
+* @throws NullPointerException if do not see master name or sentinels
--- End diff --

if {@code masterName} or {@code sentinels} is {@code null}
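
For illustration, a minimal sketch of how the constructor Javadoc could read with this `@throws` wording. `SentinelConfigSketch` is a hypothetical, reduced stand-in for `JedisSentinelConfig` (only the two mandatory parameters, with `Set<String>` assumed as the sentinel type), and `Objects.requireNonNull` stands in for the Guava `Preconditions.checkNotNull` that the PR imports:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, reduced stand-in for JedisSentinelConfig (not the class from the PR). */
final class SentinelConfigSketch {
    private final String masterName;
    private final Set<String> sentinels;

    /**
     * Jedis Sentinels config.
     *
     * @param masterName master name of the replica set
     * @param sentinels set of sentinel hosts
     * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
     */
    SentinelConfigSketch(String masterName, Set<String> sentinels) {
        // JDK stand-in for Guava's Preconditions.checkNotNull; both throw NullPointerException.
        this.masterName = Objects.requireNonNull(masterName, "masterName must not be null");
        Objects.requireNonNull(sentinels, "sentinels must not be null");
        // Defensive copy so later changes to the caller's set do not leak into the config.
        this.sentinels = Collections.unmodifiableSet(new HashSet<>(sentinels));
    }

    String getMasterName() {
        return masterName;
    }

    Set<String> getSentinels() {
        return sentinels;
    }

    public static void main(String[] args) {
        SentinelConfigSketch config =
            new SentinelConfigSketch("mymaster", Collections.singleton("127.0.0.1:26379"));
        System.out.println(config.getMasterName() + " " + config.getSentinels());
        try {
            new SentinelConfigSketch(null, Collections.singleton("127.0.0.1:26379"));
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Naming the parameters in the `@throws` clause ties the documented contract directly to the two null checks in the constructor.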




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622093
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class JedisSentinelConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int connectionTimeout;
+   private int soTimeout;
+   private String password;
+   private int database;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException.
--- End diff --

Jedis Sentinels config.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622080
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class JedisPoolConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private String host;
+   private int port;
+   private int timeout;
+   private int database;
+   private String password;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The host is mandatory, and when host is not set, it throws NullPointerException.
+*
+* @param host host hostname or IP
+* @param port port port
+* @param timeout timeout socket / connection timeout
+* @param password password password, if any
+* @param database database database index
+* @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+ * @param minIdle the minimum number of idle objects to maintain in the pool
+* @throws NullPointerException if do not see host
+ */
+   private JedisPoolConfig(String host, int port, int timeout, String password, int database,
+   int maxTotal, int maxIdle, int minIdle) {
+   Preconditions.checkNotNull(host, "Host information should be presented");
+   this.host = host;
+   this.port = port;
+   this.timeout = timeout;
+   this.database = database;
+   this.password = password;
+   this.maxTotal = maxTotal;
+   this.maxIdle = maxIdle;
+   this.minIdle = minIdle;
+   }
+
+   /**
+* Get the value for the {@code maxTotal} configuration attribute
+* for pools to be created with this configuration instance.
+*
+* @return  The current setting of {@code maxTotal} for this
+*  configuration instance
+* @see GenericObjectPoolConfig#getMaxTotal()
+*/
+   public int getMaxTotal() {
+   return maxTotal;
+   }
+
+   /**
+* Get the value for the {@code maxIdle} configuration attribute
+* for pools to be created with this configuration instance.
+*
+* @return  The current setting of {@code maxIdle} for this
+*  configuration instance
+* @see GenericObjectPoolConfig#getMaxIdle()
+*/
+   public int getMaxIdle() {
+   return maxIdle;
+   }
+
+   /**
+* Get the value for the {@code minIdle} configuration attribute
+* for pools to be created with this configuration instance.
+*
+* @return  The current setting of {@code minIdle} for this
+*  configuration instance
+* @see GenericObjectPoolConfig#getMinIdle()
+*/
+   public int getMinIdle() {
+   return minIdle;
+   }
+
+   /**
+* Returns host.
+*
+* @return hostname or IP
+*/
+   public String getHost() {
+   return host;
+   }
+
+   /**
+* Returns port.
+*
+* @return port
+*/
+   public int getPort() {
+   return port;
+   }
+
+   /**
+* Returns timeout.
+*
+* @return socket / connection timeout
+*/
+   public i

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622070
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class JedisClusterConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int timeout;
+   private int maxRedirections;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
+*
+* @param nodes nodes list of node information for JedisCluster
+* @param timeout timeout socket / connection timeout
+* @param maxRedirections maxRedirections limit of redirections - how much we'll follow MOVED or ASK
+* @param maxTotal the maximum number of objects that can be allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+ * @param minIdle the minimum number of idle objects to maintain in the pool
+* @throws NullPointerException if nodes are null
+*/
+   private JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections,
+   int maxTotal, int maxIdle, int minIdle) {
+
+   Preconditions.checkNotNull(nodes, "Node information should be presented");
+
+   this.nodes = nodes;
+   this.timeout = timeout;
+   this.maxRedirections = maxRedirections;
+   this.maxTotal = maxTotal;
+   this.maxIdle = maxIdle;
+   this.minIdle = minIdle;
+   }
+
+   /**
+* Get the value for the {@code maxTotal} configuration attribute
+* for pools to be created with this configuration instance.
+*
+* @return  The current setting of {@code maxTotal} for this
+*  configuration instance
+* @see GenericObjectPoolConfig#getMaxTotal()
+*/
+   public int getMaxTotal() {
+   return maxTotal;
+   }
+
+   /**
+* Get the value for the {@code maxIdle} configuration attribute
+* for pools to be created with this configuration instance.
+*
+* @return  The current setting of {@code maxIdle} for this
+*  configuration instance
+* @see GenericObjectPoolConfig#getMaxIdle()
+*/
+   public int getMaxIdle() {
+   return maxIdle;
+   }
+
+   /**
+* Get the value for the {@code minIdle} configuration attribute
+* for pools to be created with this configuration instance.
+*
+* @return  The current setting of {@code minIdle} for this
+*  configuration instance
+* @see GenericObjectPoolConfig#getMinIdle()
+*/
+   public int getMinIdle() {
+   return minIdle;
+   }
+
+   /**
+* Returns nodes.
+*
+* @return list of node information
+*/
+   public Set<HostAndPort> getNodes() {
+   Set<HostAndPort> ret = new HashSet<>();
+   for (InetSocketAddress node : nodes) {
+   ret.add(new HostAndPort(node.getHostName(), node.getPort()));
+   }
+   return ret;
+   }
+
+   /**

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622038
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class JedisPoolConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private String host;
+   private int port;
+   private int timeout;
+   private int database;
+   private String password;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The host is mandatory, and when host is not set, it throws NullPointerException.
--- End diff --

What are the default values for parameters that are set to `null`?




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622027
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class JedisPoolConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private String host;
+   private int port;
+   private int timeout;
+   private int database;
+   private String password;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The host is mandatory, and when host is not set, it throws NullPointerException.
--- End diff --

"Jedis pool configuration."




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67621999
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class JedisPoolConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private String host;
+   private int port;
+   private int timeout;
+   private int database;
+   private String password;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The host is mandatory, and when host is not set, it throws NullPointerException.
+*
+* @param host host hostname or IP
+* @param port port port
+* @param timeout timeout socket / connection timeout
+* @param password password password, if any
+* @param database database database index
+* @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+ * @param minIdle the minimum number of idle objects to maintain in the pool
+* @throws NullPointerException if do not see host
--- End diff --

Remove the duplicated "host", "port", etc.
Align the "minIdle" line correctly.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67621988
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class JedisPoolConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private String host;
+   private int port;
+   private int timeout;
+   private int database;
+   private String password;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The host is mandatory, and when host is not set, it throws NullPointerException.
+*
+* @param host host hostname or IP
+* @param port port port
+* @param timeout timeout socket / connection timeout
+* @param password password password, if any
+* @param database database database index
+* @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+ * @param minIdle the minimum number of idle objects to maintain in the pool
+* @throws NullPointerException if do not see host
--- End diff --

@throws NullPointerException if parameter {@code host} is {@code null}
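
As a rough sketch, the whole Javadoc block could read as follows once the repeated parameter names in the quoted diff are dropped, the `minIdle` line is aligned, and this `@throws` wording is used. `PoolConfigSketch` is a hypothetical stand-in for `JedisPoolConfig`, and `Objects.requireNonNull` replaces the Guava `Preconditions.checkNotNull` from the diff so the example needs only the JDK:

```java
import java.util.Objects;

/** Hypothetical stand-in for JedisPoolConfig (not the class from the PR). */
final class PoolConfigSketch {
    private final String host;
    private final int port;
    private final int timeout;
    private final String password;
    private final int database;
    private final int maxTotal;
    private final int maxIdle;
    private final int minIdle;

    /**
     * Jedis pool configuration.
     *
     * @param host hostname or IP
     * @param port port
     * @param timeout socket / connection timeout
     * @param password password, if any
     * @param database database index
     * @param maxTotal the maximum number of objects that can be allocated by the pool
     * @param maxIdle the cap on the number of "idle" instances in the pool
     * @param minIdle the minimum number of idle objects to maintain in the pool
     * @throws NullPointerException if parameter {@code host} is {@code null}
     */
    PoolConfigSketch(String host, int port, int timeout, String password, int database,
                     int maxTotal, int maxIdle, int minIdle) {
        // Only host is mandatory; password may be null ("if any").
        this.host = Objects.requireNonNull(host, "host must not be null");
        this.port = port;
        this.timeout = timeout;
        this.password = password;
        this.database = database;
        this.maxTotal = maxTotal;
        this.maxIdle = maxIdle;
        this.minIdle = minIdle;
    }

    String getHost() {
        return host;
    }

    int getPort() {
        return port;
    }

    String getPassword() {
        return password;
    }

    public static void main(String[] args) {
        // The numeric arguments are arbitrary illustration values, not the connector's defaults.
        PoolConfigSketch config = new PoolConfigSketch("localhost", 6379, 2000, null, 0, 8, 8, 0);
        System.out.println(config.getHost() + ":" + config.getPort());
    }
}
```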




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67621945
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class JedisClusterConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int timeout;
+   private int maxRedirections;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
--- End diff --

What is the default behavior for parameters that are set to `null`?
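
One way the answer could be made visible in the code is to document the default next to each builder setter. The sketch below is only an assumption about how that might look: `ClusterConfigSketch`, its `Builder`, and both `DEFAULT_*` constants (including their values) are hypothetical placeholders, since the quoted diff does not show the connector's actual defaults.

```java
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for JedisClusterConfig (not the class from the PR). */
final class ClusterConfigSketch {
    // Placeholder values for illustration only; not taken from the PR.
    static final int DEFAULT_TIMEOUT_MS = 2000;
    static final int DEFAULT_MAX_REDIRECTIONS = 5;

    private final Set<InetSocketAddress> nodes;
    private final int timeout;
    private final int maxRedirections;

    private ClusterConfigSketch(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
        Objects.requireNonNull(nodes, "nodes must not be null");
        this.nodes = Collections.unmodifiableSet(new HashSet<>(nodes));
        this.timeout = timeout;
        this.maxRedirections = maxRedirections;
    }

    Set<InetSocketAddress> getNodes() {
        return nodes;
    }

    int getTimeout() {
        return timeout;
    }

    int getMaxRedirections() {
        return maxRedirections;
    }

    static final class Builder {
        private Set<InetSocketAddress> nodes; // mandatory, no default
        private int timeout = DEFAULT_TIMEOUT_MS;
        private int maxRedirections = DEFAULT_MAX_REDIRECTIONS;

        /** Sets the cluster nodes. Mandatory; there is no default. */
        Builder setNodes(Set<InetSocketAddress> nodes) {
            this.nodes = nodes;
            return this;
        }

        /** Sets the socket / connection timeout. Default: {@code DEFAULT_TIMEOUT_MS}. */
        Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        /** Sets the redirection limit. Default: {@code DEFAULT_MAX_REDIRECTIONS}. */
        Builder setMaxRedirections(int maxRedirections) {
            this.maxRedirections = maxRedirections;
            return this;
        }

        /**
         * Builds the config.
         *
         * @throws NullPointerException if the nodes were never set or were set to {@code null}
         */
        ClusterConfigSketch build() {
            return new ClusterConfigSketch(nodes, timeout, maxRedirections);
        }
    }
}
```

With this shape, a caller who sets only the nodes gets the documented defaults for everything else, and the only `null`-sensitive parameter fails fast in `build()`.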




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-19 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67621900
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class JedisClusterConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int timeout;
+   private int maxRedirections;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
+*
+* @param nodes nodes list of node information for JedisCluster
+* @param timeout timeout socket / connection timeout
--- End diff --

@param timeout socket / connection timeout

remove double "timeout"



