[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292235#comment-16292235
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user taizilongxu commented on the issue:

https://github.com/apache/flink/pull/5050
  
Hi, is there any thing to edit or change? 


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268403#comment-16268403
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user taizilongxu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5050#discussion_r153426741
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 ---
@@ -148,4 +171,17 @@ public static Long 
getOffsetFromZooKeeper(CuratorFramework curatorClient, String
}
}
}
+
+   public static void registerPartitionOwnership(CuratorFramework 
curatorClient, String groupId, String consumerId, String topic, int partition, 
String taskId) throws Exception {
+   String path = "/consumers/" + groupId + "/owners/" + topic + 
"/" + Integer.toString(partition);
+   // register with task info that we can read from zookeeper 
which taskmanager consume the right topic
+   String info = consumerId + "_" + taskId;
+   try {
+   if (curatorClient.checkExists().forPath(path) == null) {
+   
curatorClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,
 info.getBytes());
+   }
+   } catch (KeeperException.NodeExistsException e) {
+   LOG.warn("NodeExists for {}", e);
--- End diff --

Thanks to review, I will edit them


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266560#comment-16266560
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5050#discussion_r153141966
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 ---
@@ -148,4 +171,17 @@ public static Long 
getOffsetFromZooKeeper(CuratorFramework curatorClient, String
}
}
}
+
+   public static void registerPartitionOwnership(CuratorFramework 
curatorClient, String groupId, String consumerId, String topic, int partition, 
String taskId) throws Exception {
+   String path = "/consumers/" + groupId + "/owners/" + topic + 
"/" + Integer.toString(partition);
+   // register with task info that we can read from zookeeper 
which taskmanager consume the right topic
+   String info = consumerId + "_" + taskId;
+   try {
+   if (curatorClient.checkExists().forPath(path) == null) {
+   
curatorClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,
 info.getBytes());
+   }
+   } catch (KeeperException.NodeExistsException e) {
+   LOG.warn("NodeExists for {}", e);
--- End diff --

Also, shouldn't the log arguments be be `("Node exists for {}", consumerId, 
e)`?


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266557#comment-16266557
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5050#discussion_r153141396
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 ---
@@ -64,6 +71,20 @@ public ZookeeperOffsetHandler(Properties props) {
int backoffBaseSleepTime = 
Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
int backoffMaxRetries =  
Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
 
+   // set consumerId to register ownership in zookeeper, just like 
kafka high level API
+   UUID uuid = UUID.randomUUID();
+   String hostName = "Unkonw";
--- End diff --

type: `Unknown`


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266559#comment-16266559
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5050#discussion_r153141629
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 ---
@@ -148,4 +171,17 @@ public static Long 
getOffsetFromZooKeeper(CuratorFramework curatorClient, String
}
}
}
+
+   public static void registerPartitionOwnership(CuratorFramework 
curatorClient, String groupId, String consumerId, String topic, int partition, 
String taskId) throws Exception {
+   String path = "/consumers/" + groupId + "/owners/" + topic + 
"/" + Integer.toString(partition);
+   // register with task info that we can read from zookeeper 
which taskmanager consume the right topic
+   String info = consumerId + "_" + taskId;
+   try {
+   if (curatorClient.checkExists().forPath(path) == null) {
+   
curatorClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,
 info.getBytes());
+   }
+   } catch (KeeperException.NodeExistsException e) {
+   LOG.warn("NodeExists for {}", e);
--- End diff --

How about simply: `Node exists for {}`, instead of the camel case?


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266558#comment-16266558
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5050#discussion_r153141260
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
 ---
@@ -45,14 +45,18 @@
/** Flag to mark the periodic committer as running. */
private volatile boolean running = true;
 
+   /** The task info to register in zk ownership path. */
+   private final String taskId;
+
PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
List 
partitionStates,
ExceptionProxy errorHandler,
-   long commitInterval) {
+   long commitInterval, String taskId) {
--- End diff --

Please list this parameter on a separate new line for consistently code 
style.


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266556#comment-16266556
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5050#discussion_r153141352
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 ---
@@ -64,6 +71,20 @@ public ZookeeperOffsetHandler(Properties props) {
int backoffBaseSleepTime = 
Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
int backoffMaxRetries =  
Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
 
+   // set consumerId to register ownership in zookeeper, just like 
kafka high level API
+   UUID uuid = UUID.randomUUID();
+   String hostName = "Unkonw";
+   try {
+   hostName = InetAddress.getLocalHost().getHostName();
+   } catch (UnknownHostException e) {
+   LOG.error("Can not get host name!");
--- End diff --

I would add the exception to the log also.


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262841#comment-16262841
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user taizilongxu commented on the issue:

https://github.com/apache/flink/pull/5050
  
the prefix is the same as kafka high level api, contain the uuid string, so 
it could be unique


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262720#comment-16262720
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5050
  
Don't you need a unique ID for this to work properly? (The task name is not 
unique)


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262328#comment-16262328
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

GitHub user taizilongxu opened a pull request:

https://github.com/apache/flink/pull/5050

[FLINK-4822] Ensure that the Kafka 0.8 connector is compatible with k…


## What is the purpose of the change

When we deploy the taskmanager in docker of our cluster, it's hard to 
locate which taskmanager cosnume the right partition of kafka except looking up 
the  log in docker, so I just add the owner in zk path when 
PeriodOffsetCommitter the offset.


## Brief change log

  - add the registerPartitionOwnership  when  commit the offset to 
zookeeper, and store the info like :  
/consumers/[group_id]/owner/[topic]/[partition_id] 

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency):  no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:no
  - The serializers:  no 
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/taizilongxu/flink flink-4822

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5050.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5050


commit 1c7a345e671010cf07b02cfdff9968ced38e2632
Author: xuxiao.xu 
Date:   2017-11-22T09:36:12Z

[FLINK-4822] Ensure that the Kafka 0.8 connector is compatible with 
kafka-consumer-groups.sh




> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)