[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-07-13 Thread subhankarb
Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
Hi @kl0u, IMO that is the expected behavior. The sink cannot know whether 
Redis is down until it tries to send the next record to Redis. 
Whenever a new message reaches the sink, it tries to use the connection pool; 
then, and only then, can it throw an exception saying that it cannot send the data to Redis.
You can build a heartbeat mechanism that periodically checks whether the Redis 
server is up, and cancel the job if Redis is down. 
@mjsax please correct me if my understanding is wrong.  
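
A minimal sketch of such a heartbeat, in case it helps the discussion. Everything here is an assumption and not part of this PR: the class name `RedisHeartbeat`, the `probe` callback (in practice it could wrap a PING round trip) and the `onDown` callback (which would hold whatever job-cancelling logic the deployment uses).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the connector in this PR.
class RedisHeartbeat implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private final BooleanSupplier probe; // assumed: returns true when Redis answers
    private final int maxFailures;       // misses tolerated before reacting
    private final Runnable onDown;       // assumed: e.g. code that cancels the job

    RedisHeartbeat(BooleanSupplier probe, int maxFailures, Runnable onDown) {
        this.probe = probe;
        this.maxFailures = maxFailures;
        this.onDown = onDown;
    }

    /** Runs one probe and returns true if Redis is now considered down. */
    boolean checkOnce() {
        boolean alive;
        try {
            alive = probe.getAsBoolean();
        } catch (RuntimeException e) {
            alive = false; // a throwing probe counts as a missed heartbeat
        }
        if (alive) {
            consecutiveFailures.set(0);
            return false;
        }
        int failures = consecutiveFailures.incrementAndGet();
        if (failures == maxFailures) {
            onDown.run(); // fire once, when the threshold is first reached
        }
        return failures >= maxFailures;
    }

    /** Schedules checkOnce() at a fixed period on a background thread. */
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(
                this::checkOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Requiring several consecutive misses before calling `onDown` is a design choice to avoid cancelling the job on a single slow reply; a threshold of 1 gives the strict behavior.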


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-07-06 Thread subhankarb
Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax, @rmetzger please review. IMO it is finally ready to be merged :) 




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread subhankarb
Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax done. 




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69444781
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int maxRedirections;
+
+
+   /**
+    * Jedis cluster configuration.
+    * The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
+    *
+    * @param nodes list of node information for JedisCluster
+    * @param connectionTimeout socket / connection timeout. The default is 2000
+    * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
+    * @param maxTotal the maximum number of objects that can be allocated by the pool
+    * @param maxIdle the cap on the number of "idle" instances in the pool
+    * @param minIdle the minimum number of idle objects to maintain in the pool
+    * @throws NullPointerException if parameter {@code nodes} is {@code null}
+    */
+   private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
+                                   int maxTotal, int maxIdle, int minIdle) {
+       super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+       Preconditions.checkNotNull(nodes, "Node information should be presented");
--- End diff --

done.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69444799
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
--- End diff --

done.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69444815
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+/**
+ * The builder for {@link RedisCommandsContainer}.
+ */
+public class RedisCommandsContainerBuilder {
+
+   /**
+* Builds container for single Redis environment.
+*
+* @param jedisPoolConfig configuration for JedisPool
+* @return container for single Redis environment
+* @throws NullPointerException if jedisPoolConfig is null
+*/
+   public static RedisCommandsContainer build(FlinkJedisPoolConfig 
jedisPoolConfig) {
--- End diff --

done.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69444769
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+
+   protected int maxTotal;
+   protected int maxIdle;
+   protected int minIdle;
+   protected int connectionTimeout;
+
+   protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle){
+   this.connectionTimeout = connectionTimeout;
+   this.maxTotal = maxTotal;
+   this.maxIdle = maxIdle;
+   this.minIdle = minIdle;
+   }
--- End diff --

done




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-04 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69442146
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int maxRedirections;
+
+
+   /**
+    * Jedis cluster configuration.
+    * The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
+    *
+    * @param nodes list of node information for JedisCluster
+    * @param connectionTimeout socket / connection timeout. The default is 2000
+    * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
+    * @param maxTotal the maximum number of objects that can be allocated by the pool
+    * @param maxIdle the cap on the number of "idle" instances in the pool
+    * @param minIdle the minimum number of idle objects to maintain in the pool
+    * @throws NullPointerException if parameter {@code nodes} is {@code null}
+    */
+   private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
+                                   int maxTotal, int maxIdle, int minIdle) {
+       super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+       Preconditions.checkNotNull(nodes, "Node information should be presented");
+       this.nodes = nodes;
+       this.maxRedirections = maxRedirections;
+   }
+
+
+
+   /**
+    * Returns nodes.
+    *
+    * @return list of node information
+    */
+   public Set<HostAndPort> getNodes() {
+       Set<HostAndPort> ret = new HashSet<>();
+       for (InetSocketAddress node : nodes) {
+           ret.add(new HostAndPort(node.getHostName(), node.getPort()));
+       }
+       return ret;
+   }
+
+   /**
+* Returns limit of redirection.
+*
+* @return limit of redirection
+*/
+   public int getMaxRedirections() {
+   return maxRedirections;
+   }
+
+
+   /**
+* Builder for initializing  {@link FlinkJedisClusterConfig}.
+*/
+   public static class Builder {
--- End diff --

It is a static nested class. IMO we should keep it that way. 




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-01 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69372418
  
--- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
@@ -0,0 +1,86 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-redis_2.10
+   flink-connector-redis
+
+   jar
+
+   
+   2.8.0
--- End diff --

added in doc.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-07-01 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69372416
  
--- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
@@ -0,0 +1,86 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-redis_2.10
+   flink-connector-redis
+
+   jar
+
+   
+   2.8.0
+   
+
+   
+   
+   org.apache.flink
+   flink-streaming-java_2.10
+   ${project.version}
--- End diff --

done.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-30 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69247416
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not 
be null");
+
+   this.jedisCluster = jedisCluster;
+   }
+
+   @Override
+   public void hset(final String key, final String hashField, final String 
value) {
+   try {
+   jedisCluster.hset(key, hashField, value);
+   } catch (Exception e) {
--- End diff --

I am quite unsure what to do here. The Jedis methods do not declare any checked 
or unchecked exceptions. Should we follow the same approach, or log the exception 
and re-throw it? @mjsax, @tzulitai what do you think?
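
For reference, a minimal sketch of the "log it and re-throw" option, assuming the client reports failures as unchecked exceptions. `HashWriter` and `RethrowingHashContainer` are hypothetical names used only for illustration, and `java.util.logging` stands in for slf4j so the snippet has no dependencies.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the single client call used in this sketch.
interface HashWriter {
    void hset(String key, String hashField, String value);
}

// Hypothetical container that logs a failure and then propagates it.
class RethrowingHashContainer {
    private static final Logger LOG =
            Logger.getLogger(RethrowingHashContainer.class.getName());

    private final HashWriter client;

    RethrowingHashContainer(HashWriter client) {
        this.client = client;
    }

    void hset(String key, String hashField, String value) {
        try {
            client.hset(key, hashField, value);
        } catch (RuntimeException e) {
            // Log with context, then rethrow the same exception so the caller still fails.
            LOG.log(Level.SEVERE,
                    "Cannot send HSET to key " + key + " and field " + hashField, e);
            throw e;
        }
    }
}
```

With this shape the sink fails on the record that could not be written, rather than dropping it silently after logging.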




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-30 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69246112
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class FlinkJedisPoolConfig implements Serializable {
--- End diff --

@tzulitai `soTimeout` is an extra config for `JedisSentinel`. I am moving the 
other common configs to a base class. I really like the idea, thank you.




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-28 Thread subhankarb
Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax, @rmetzger please review. The changed model is described in the PR 
description.

thanks,
subhankar




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-27 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68551999
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be 
null");
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with 
sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis 
instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool 
can not be null");
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   @Override
+   public void hset(final String hashName, final String key, final String 
value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
--- End diff --

`RedisDataTypeDescription.additionalKey` is used for the key. Yes, the 
parameter names are confusing; I am changing them to follow the same naming as 
the client. Thank you very much. :) 




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-27 Thread subhankarb
Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@tzulitai, @mjsax thank you very much for your valuable feedback.
1) `additionalKey` is meant to be a one-time declaration for `SORTED_SET` 
and `HASH`. For `HASH` it is the name of the hash, and for `SORTED_SET` it is 
the name of the set. I assume it does not change once it is declared. 

2) `JedisPoolConfig`/ `JedisClusterConfig` are not `Serializable`, so I 
needed a wrapper class for them. @mjsax there is a story for a Redis source, 
so `RedisSinkConfig` would not be a valid name, as the config would be the 
same for sink and source. That is why I made the `JedisPoolConfig`/ 
`JedisClusterConfig` ctor private, so that the user is forced to use the 
builder class, which avoids confusion. 

3) When I started on this, I thought it was obvious that PUBSUB in a sink 
would always be used to publish, and PUBSUB in a source would always be used 
to subscribe :) .  

4 + 5 ) We can group the command and data type like this:
`public enum RedisCommand {
    LPUSH(RedisDataType.LIST),
    RPUSH(RedisDataType.LIST);

    private final RedisDataType redisDataType;

    RedisCommand(RedisDataType redisDataType) {
        this.redisDataType = redisDataType;
    }

    public boolean isInRedisDataType(RedisDataType redisDataType) {
        return this.redisDataType == redisDataType;
    }
}`
And in `RedisDataTypeDescription` we can extract the command, so that new 
commands can be added in the future. 
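
To make points 1 and 4 + 5 concrete, here is a rough sketch of how a description could derive the data type from the command and insist on the additional key only where it is needed. It is an illustration under assumptions, not the code in this PR: `CommandDescription`, `getRedisDataType()`, and the `HSET` and `ZADD` constants are hypothetical additions.

```java
import java.util.Objects;

enum RedisDataType { LIST, HASH, SORTED_SET }

enum RedisCommand {
    LPUSH(RedisDataType.LIST),
    RPUSH(RedisDataType.LIST),
    HSET(RedisDataType.HASH),        // assumed extra constant for illustration
    ZADD(RedisDataType.SORTED_SET);  // assumed extra constant for illustration

    private final RedisDataType redisDataType;

    RedisCommand(RedisDataType redisDataType) {
        this.redisDataType = redisDataType;
    }

    RedisDataType getRedisDataType() {
        return redisDataType;
    }
}

// Hypothetical description: the command alone decides the data type.
final class CommandDescription {
    private final RedisCommand command;
    private final String additionalKey;

    CommandDescription(RedisCommand command, String additionalKey) {
        this.command = Objects.requireNonNull(command, "command must not be null");
        RedisDataType type = command.getRedisDataType();
        // Only HASH and SORTED_SET need the name of the hash or the set.
        boolean needsKey = type == RedisDataType.HASH || type == RedisDataType.SORTED_SET;
        if (needsKey && additionalKey == null) {
            throw new IllegalArgumentException(type + " requires an additional key");
        }
        this.additionalKey = additionalKey;
    }

    RedisCommand getCommand() {
        return command;
    }

    RedisDataType getDataType() {
        return command.getRedisDataType();
    }

    String getAdditionalKey() {
        return additionalKey;
    }
}
```

Adding a command later would then mean adding one enum constant, without touching the description class.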





[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-24 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68484062
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor to connect to a single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null");
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if the Redis environment is clustered with sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null");
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   @Override
+   public void hset(final String hashName, final String key, final String value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with command HSET to key {} and field {} error message {}",
+   hashName, key, e.getMessage());
+   }
+   } finally {
+   releaseInstance(jedis);
--- End diff --

`releaseInstance(jedis)` uses the Jedis `close()` method, which returns the 
resource to the connection pool.
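
To illustrate the point, here is a minimal, self-contained sketch of a connection whose `close()` hands it back to its pool instead of destroying it. `PooledConnection`, `SimplePool` and `PoolReturnDemo` are hypothetical stand-ins written for this example; they are not the Jedis API and not code from this pull request. The `sendOnce` method only mirrors the try/catch/finally shape of the `hset` method in the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a pooled connection; not the Jedis API. */
final class PooledConnection implements AutoCloseable {
    private final SimplePool owner;

    PooledConnection(SimplePool owner) {
        this.owner = owner;
    }

    /** Pretends to send a command; throws when the simulated server is down. */
    void send(String command, boolean serverUp) {
        if (!serverUp) {
            throw new IllegalStateException("cannot send " + command);
        }
    }

    /** close() does not destroy the connection, it hands it back to the pool. */
    @Override
    public void close() {
        owner.giveBack(this);
    }
}

/** Hypothetical minimal pool that only tracks idle connections. */
final class SimplePool {
    private final Deque<PooledConnection> idle = new ArrayDeque<>();

    SimplePool(int size) {
        for (int i = 0; i < size; i++) {
            idle.push(new PooledConnection(this));
        }
    }

    PooledConnection borrow() {
        return idle.pop();
    }

    void giveBack(PooledConnection connection) {
        idle.push(connection);
    }

    int idleCount() {
        return idle.size();
    }
}

final class PoolReturnDemo {
    /** Borrows a connection, tries to send, and always releases it in finally. */
    static boolean sendOnce(SimplePool pool, boolean serverUp) {
        PooledConnection connection = null;
        try {
            connection = pool.borrow();
            connection.send("HSET", serverUp);
            return true;
        } catch (IllegalStateException e) {
            return false;
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) {
        SimplePool pool = new SimplePool(2);
        System.out.println(sendOnce(pool, true) + " idle=" + pool.idleCount());
        System.out.println(sendOnce(pool, false) + " idle=" + pool.idleCount());
    }
}
```

In this sketch the idle count is the same after a successful and a failed send, because the release happens in `finally` in both cases.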




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-24 Thread subhankarb
Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
done.




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-22 Thread subhankarb
Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax plz review.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-22 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68008039
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
+
--- End diff --

replaced with {@inheritDoc}




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-22 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68007965
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class JedisSentinelConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int connectionTimeout;
+   private int soTimeout;
+   private String password;
+   private int database;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The master name and sentinels are mandatory; if either is not set, a NullPointerException is thrown.
+*
+* @param masterName master name of the replica set
+* @param sentinels set of sentinel hosts
+* @param connectionTimeout connection timeout
+* @param soTimeout socket timeout
+* @param password password, if any
+* @param database database index
+* @param maxTotal the maximum number of objects that can be allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the pool
+*
--- End diff --

fixed




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-21 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67994813
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   this.jedisCluster = jedisCluster;
+   }
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param hashName   Hash name
+* @param key Hash field name
+* @param value Hash value
+*/
+   @Override
+   public void hset(final String hashName, final String key, final String value) {
+   try {
+   jedisCluster.hset(hashName, key, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with command HSET to hash {} and field {} error message {}",
+   hashName, key, e.getMessage());
+   }
+   }
+   }
+
+   /**
+* Insert all the specified values at the tail of the list stored at key.
+* If key does not exist, it is created as empty list before performing the push operation.
+*
+* @param listName Name of the List
+* @param value  Value to be added
+*/
+   @Override
+   public void rpush(final String listName, final String value) {
+   try {
+   jedisCluster.rpush(listName, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}",
+   listName, e.getMessage());
+   }
+   }
+   }
+
+   /**
+* Add the specified members to the set stored at key.
+* Specified members that are already a member of this set are ignored.
+* If key does not exist, a new set is created before adding the specified members.
+*
+* @param setName Name of the Set
+* @param value   Value to be added
+*/
+   @Override
+   public void sadd(final String setName, final String value) {
+   try {
+   jedisCluster.sadd(setName, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with command SADD to set {} error message {}",
+   setName, e.getMessage());
+   }
+   }
+   }
+
+   /**
+* Posts a message to the given channel.
+*
+* @param channelName Name of the channel to which data will be published
+* @param message the message
+*/
+   

[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-21 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67905768
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   this.jedisCluster = jedisCluster;
--- End diff --

This is already checked in the `RedisCommandsContainerBuilder.build` method.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-21 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67904127
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class JedisClusterConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private Set nodes;
+   private int timeout;
+   private int maxRedirections;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The list of nodes is mandatory; when nodes is not set, a NullPointerException is thrown.
--- End diff --

The constructor is private, so the user has to set everything through the 
builder class except nodes.
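
A minimal sketch of that pattern, under one reading of the comment above: every optional setting has a default in the builder, and `nodes` is the one value the private constructor insists on. `ClusterConfigSketch` is a hypothetical, simplified class; the field names follow the diff, but the default values and the use of `String` for nodes are assumptions made for this example.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, simplified config; not the JedisClusterConfig from the pull request. */
final class ClusterConfigSketch {
    private final Set<String> nodes;
    private final int timeout;
    private final int maxRedirections;

    // Private: callers cannot bypass the builder, so the null check runs for every instance.
    private ClusterConfigSketch(Set<String> nodes, int timeout, int maxRedirections) {
        Objects.requireNonNull(nodes, "nodes can not be null");
        this.nodes = new HashSet<>(nodes);
        this.timeout = timeout;
        this.maxRedirections = maxRedirections;
    }

    Set<String> getNodes() {
        return nodes;
    }

    int getTimeout() {
        return timeout;
    }

    int getMaxRedirections() {
        return maxRedirections;
    }

    static final class Builder {
        private Set<String> nodes;            // mandatory, no default
        private int timeout = 2000;           // assumed default, for illustration only
        private int maxRedirections = 5;      // assumed default, for illustration only

        Builder setNodes(Set<String> nodes) {
            this.nodes = nodes;
            return this;
        }

        Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        Builder setMaxRedirections(int maxRedirections) {
            this.maxRedirections = maxRedirections;
            return this;
        }

        /** Throws NullPointerException if nodes was never set. */
        ClusterConfigSketch build() {
            return new ClusterConfigSketch(nodes, timeout, maxRedirections);
        }
    }

    public static void main(String[] args) {
        Set<String> nodes = new HashSet<>();
        nodes.add("127.0.0.1:7000");
        ClusterConfigSketch config = new Builder().setNodes(nodes).setTimeout(500).build();
        System.out.println(config.getNodes() + " timeout=" + config.getTimeout());
    }
}
```

With this shape the Javadoc on the constructor describes behavior that users only ever see through `build()`.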




[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-21 Thread subhankarb
Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax wow... that's a lot. Thank you very, very much for your time. I am 
fixing it ASAP.




[GitHub] flink issue #2054: [FLINK-3763] RabbitMQ Source/Sink standardize connection ...

2016-06-03 Thread subhankarb
Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/2054
  
Hi @rmetzger ,
would you please review the changes?





[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...

2016-05-31 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65303646
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection; otherwise
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection.
+ */
+public class RMQConnectionConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+   private String host;
+   private int port;
+   private String virtualHost;
+   private String username;
+   private String password;
+   private String uri;
+
+   private int networkRecoveryInterval;
+   private boolean automaticRecovery;
+   private boolean topologyRecovery;
+
+   private int connectionTimeout;
+   private int requestedChannelMax;
+   private int requestedFrameMax;
+   private int requestedHeartbeat;
+
+   /**
+*
+* @param host host name
+* @param port port
+* @param virtualHost virtual host
+* @param username username
+* @param password password
+
+* @param networkRecoveryInterval connection recovery interval in milliseconds
+* @param automaticRecovery if automatic connection recovery
+* @param topologyRecovery if topology recovery
+* @param connectionTimeout connection timeout
+* @param requestedChannelMax requested maximum channel number
+* @param requestedFrameMax requested maximum frame size
+* @param requestedHeartbeat requested heartbeat interval
+* @throws NullPointerException if host or virtual host or username or password is null
+ */
+   private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password,
+   int networkRecoveryInterval, boolean automaticRecovery,
+   boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax,
+   int requestedHeartbeat){
+   Preconditions.checkNotNull(host, "host can not be null");
+   Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
+   Preconditions.checkNotNull(username, "username can not be null");
+   Preconditions.checkNotNull(password, "password can not be null");
+   this.host = host;
+   this.port = port;
+   this.virtualHost = virtualHost;
+   this.username = username;
+   this.password = password;
+
+   this.networkRecoveryInterval = networkRecoveryInterval;
+   this.automaticRecovery = automaticRecovery;
+   this.topologyRecovery = topologyRecovery;
+   this.connectionTimeout = connectionTimeout;
+   this.requestedChannelMax = requestedChannelMax;
+   this.requestedFrameMax = requestedFrameMax;
+   this.requestedHeartbeat = requestedHeartbeat;

[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize ...

2016-05-31 Thread subhankarb
GitHub user subhankarb opened a pull request:

https://github.com/apache/flink/pull/2054

[FLINK-3763] RabbitMQ Source/Sink standardize connection parameters

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/subhankarb/flink FLINK-3763

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2054.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2054


commit 0894679aa7dfbb8e0d5f6edd51f5000a08707769
Author: subhankar <subhankar.bis...@target.com>
Date:   2016-05-31T10:38:27Z

[FLINK-3763] RabbitMQ Source/Sink standardize connection parameters






[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-05-30 Thread subhankarb
Github user subhankarb commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-222546613
  
@rmetzger, @mjsax plz review.




[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

2016-03-19 Thread subhankarb
Github user subhankarb closed the pull request at:

https://github.com/apache/flink/pull/1580




[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-19 Thread subhankarb
GitHub user subhankarb opened a pull request:

https://github.com/apache/flink/pull/1813

[FLINK-3034] Redis Sink Connector

Flink streaming connector for Redis. This pull request covers only the sink 
part of the connector.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/subhankarb/flink FLINK-3034

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1813.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1813


commit 49ac527c7bffa72d979d058f0af41acddaba2c30
Author: subhankar <subhankar.bis...@target.com>
Date:   2016-03-17T12:10:07Z

[FLINK-3034] Redis Sink Connector






[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

2016-02-22 Thread subhankarb
Github user subhankarb commented on the pull request:

https://github.com/apache/flink/pull/1580#issuecomment-187546037
  
Hi Matthias,
I was on vacation for a week. I'll update the PR once I get some input
from Robert about my design.
On 22-Feb-2016 7:21 PM, "Robert Metzger" <notificati...@github.com> wrote:

> Sorry, I'm currently very busy with the 1.0 release. I hope I'll find some
> time to look into this PR again later this week.
>
> —
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/flink/pull/1580#issuecomment-187184409>.
>





[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

2016-02-08 Thread subhankarb
Github user subhankarb commented on the pull request:

https://github.com/apache/flink/pull/1580#issuecomment-181340467
  
@rmetzger sorry for the late reply. 
Please take a look at the gist 
https://gist.github.com/subhankarb/6a503378063819eb47e9
Users can choose which one they want to use when creating the sink and 
source (while the source can contain only PUBSUB (subscribe) and list (pop)).





[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

2016-02-05 Thread subhankarb
Github user subhankarb commented on the pull request:

https://github.com/apache/flink/pull/1580#issuecomment-180679993
  
@rmetzger when I started I had only pubsub in mind. Today I thought 
about adding list and sorted set for sink+source, and hash and set for sink only. 
I'll update the pull request.
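
One way to picture that split is an enum that records, per Redis data type, whether it can back a source as well as a sink. This is only a sketch: `RedisDataTypeSketch` and its method names are hypothetical and not taken from the pull request, and treating pubsub as usable on both sides is an assumption.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical enum; names are illustrative and not from the pull request. */
enum RedisDataTypeSketch {
    PUBSUB(true),       // assumption: publish in the sink, subscribe in the source
    LIST(true),
    SORTED_SET(true),
    HASH(false),        // sink only
    SET(false);         // sink only

    private final boolean sourceSupported;

    RedisDataTypeSketch(boolean sourceSupported) {
        this.sourceSupported = sourceSupported;
    }

    /** Every type listed in the comment is usable in a sink. */
    boolean isSinkSupported() {
        return true;
    }

    boolean isSourceSupported() {
        return sourceSupported;
    }

    /** Collects the types that may be chosen when creating a source. */
    static Set<RedisDataTypeSketch> sourceTypes() {
        Set<RedisDataTypeSketch> result = EnumSet.noneOf(RedisDataTypeSketch.class);
        for (RedisDataTypeSketch type : values()) {
            if (type.sourceSupported) {
                result.add(type);
            }
        }
        return result;
    }
}

final class DataTypeDemo {
    public static void main(String[] args) {
        System.out.println("source types: " + RedisDataTypeSketch.sourceTypes());
    }
}
```

A sink or source constructor could then reject an unsupported type up front instead of failing on the first record.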




[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...

2016-02-02 Thread subhankarb
GitHub user subhankarb opened a pull request:

https://github.com/apache/flink/pull/1580

[FLINK-3034][Streaming Connectors] Redis Sink Connector



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/subhankarb/flink FLINK-3034

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1580.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1580


commit 668bf80ca2a07a29310d62e7628de28d26ed09aa
Author: subhankar <subhankar.bis...@target.com>
Date:   2016-02-03T03:27:38Z

[FLINK-3034][Streaming Connectors] Redis Sink Connector



