[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292235#comment-16292235 ]

ASF GitHub Bot commented on FLINK-4822:
---

Github user taizilongxu commented on the issue:

    https://github.com/apache/flink/pull/5050

    Hi, is there anything to edit or change?

> Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all data structures in Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268403#comment-16268403 ]

ASF GitHub Bot commented on FLINK-4822:
---

Github user taizilongxu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5050#discussion_r153426741

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---
    @@ -148,4 +171,17 @@ public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String
             }
         }
     }
    +
    + public static void registerPartitionOwnership(CuratorFramework curatorClient, String groupId, String consumerId, String topic, int partition, String taskId) throws Exception {
    +     String path = "/consumers/" + groupId + "/owners/" + topic + "/" + Integer.toString(partition);
    +     // register with task info that we can read from zookeeper which taskmanager consume the right topic
    +     String info = consumerId + "_" + taskId;
    +     try {
    +         if (curatorClient.checkExists().forPath(path) == null) {
    +             curatorClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, info.getBytes());
    +         }
    +     } catch (KeeperException.NodeExistsException e) {
    +         LOG.warn("NodeExists for {}", e);
    --- End diff --

    Thanks for the review. I will fix these.
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266560#comment-16266560 ]

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5050#discussion_r153141966

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---
    @@ -148,4 +171,17 @@ public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String
             }
         }
     }
    +
    + public static void registerPartitionOwnership(CuratorFramework curatorClient, String groupId, String consumerId, String topic, int partition, String taskId) throws Exception {
    +     String path = "/consumers/" + groupId + "/owners/" + topic + "/" + Integer.toString(partition);
    +     // register with task info that we can read from zookeeper which taskmanager consume the right topic
    +     String info = consumerId + "_" + taskId;
    +     try {
    +         if (curatorClient.checkExists().forPath(path) == null) {
    +             curatorClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, info.getBytes());
    +         }
    +     } catch (KeeperException.NodeExistsException e) {
    +         LOG.warn("NodeExists for {}", e);
    --- End diff --

    Also, shouldn't the log arguments be `("Node exists for {}", consumerId, e)`?
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266557#comment-16266557 ]

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5050#discussion_r153141396

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---
    @@ -64,6 +71,20 @@ public ZookeeperOffsetHandler(Properties props) {
         int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
         int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
    +    // set consumerId to register ownership in zookeeper, just like kafka high level API
    +    UUID uuid = UUID.randomUUID();
    +    String hostName = "Unkonw";
    --- End diff --

    typo: `Unknown`
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266559#comment-16266559 ]

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5050#discussion_r153141629

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---
    @@ -148,4 +171,17 @@ public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String
             }
         }
     }
    +
    + public static void registerPartitionOwnership(CuratorFramework curatorClient, String groupId, String consumerId, String topic, int partition, String taskId) throws Exception {
    +     String path = "/consumers/" + groupId + "/owners/" + topic + "/" + Integer.toString(partition);
    +     // register with task info that we can read from zookeeper which taskmanager consume the right topic
    +     String info = consumerId + "_" + taskId;
    +     try {
    +         if (curatorClient.checkExists().forPath(path) == null) {
    +             curatorClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, info.getBytes());
    +         }
    +     } catch (KeeperException.NodeExistsException e) {
    +         LOG.warn("NodeExists for {}", e);
    --- End diff --

    How about simply: `Node exists for {}`, instead of the camel case?
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266558#comment-16266558 ]

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5050#discussion_r153141260

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java ---
    @@ -45,14 +45,18 @@
         /** Flag to mark the periodic committer as running. */
         private volatile boolean running = true;
    +    /** The task info to register in zk ownership path. */
    +    private final String taskId;
    +
         PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
                 ListpartitionStates,
                 ExceptionProxy errorHandler,
    -            long commitInterval) {
    +            long commitInterval, String taskId) {
    --- End diff --

    Please put this parameter on its own line for consistent code style.
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266556#comment-16266556 ]

ASF GitHub Bot commented on FLINK-4822:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5050#discussion_r153141352

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---
    @@ -64,6 +71,20 @@ public ZookeeperOffsetHandler(Properties props) {
         int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
         int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
    +    // set consumerId to register ownership in zookeeper, just like kafka high level API
    +    UUID uuid = UUID.randomUUID();
    +    String hostName = "Unkonw";
    +    try {
    +        hostName = InetAddress.getLocalHost().getHostName();
    +    } catch (UnknownHostException e) {
    +        LOG.error("Can not get host name!");
    --- End diff --

    I would also add the exception to the log.
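For readers following the review, here is a minimal sketch of a host name lookup that falls back to a default and logs the caught exception, as suggested above. It is not the code from the pull request: the class name `HostNameSketch` and the method `localHostNameOr` are hypothetical, and it uses `java.util.logging` only to stay dependency-free. With an SLF4J logger, the corresponding call would be `LOG.error("Cannot get host name", e)`.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper for illustration; not part of the Flink code base.
final class HostNameSketch {
    private static final Logger LOG = Logger.getLogger(HostNameSketch.class.getName());

    /** Returns the local host name, or the given fallback if it cannot be resolved. */
    static String localHostNameOr(String fallback) {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // Passing the exception keeps its message and stack trace in the log.
            LOG.log(Level.WARNING, "Cannot get host name, falling back to " + fallback, e);
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(localHostNameOr("Unknown"));
    }
}
```

Logging the exception object rather than only a fixed message makes it possible to see why the lookup failed, for example a missing entry in the hosts file.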
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262841#comment-16262841 ]

ASF GitHub Bot commented on FLINK-4822:
---

Github user taizilongxu commented on the issue:

    https://github.com/apache/flink/pull/5050

    The prefix is the same as in the Kafka high-level API and contains the UUID string, so it should be unique.
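To make the uniqueness argument concrete, the sketch below builds a consumer ID from a group ID, a host name, a timestamp, and a UUID prefix. This is an assumption about the layout, loosely modeled on the Kafka 0.8 high-level consumer; the exact format used in the pull request is not shown in this thread, and `ConsumerIdSketch` and `newConsumerId` are hypothetical names.

```java
import java.util.UUID;

// Hypothetical helper for illustration; the ID layout is an assumption.
final class ConsumerIdSketch {

    /** Builds an ID of the form groupId_hostName-timestampMillis-uuidPrefix. */
    static String newConsumerId(String groupId, String hostName) {
        // The first 8 characters of a UUID string are always hex digits.
        String uuidPrefix = UUID.randomUUID().toString().substring(0, 8);
        return groupId + "_" + hostName + "-" + System.currentTimeMillis() + "-" + uuidPrefix;
    }

    public static void main(String[] args) {
        System.out.println(newConsumerId("my-group", "host1"));
    }
}
```

Because the random UUID part differs per call, two tasks on the same host that start in the same millisecond would still get different IDs with very high probability, which is the property asked about in the review.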
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262720#comment-16262720 ]

ASF GitHub Bot commented on FLINK-4822:
---

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5050

    Don't you need a unique ID for this to work properly? (The task name is not unique)
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262328#comment-16262328 ]

ASF GitHub Bot commented on FLINK-4822:
---

GitHub user taizilongxu opened a pull request:

    https://github.com/apache/flink/pull/5050

    [FLINK-4822] Ensure that the Kafka 0.8 connector is compatible with k…

    ## What is the purpose of the change

    When we deploy TaskManagers in Docker on our cluster, it is hard to tell which TaskManager consumes a given Kafka partition without looking through the logs in Docker. This change therefore registers the owner in the ZooKeeper path when PeriodicOffsetCommitter commits the offset.

    ## Brief change log

    - Add registerPartitionOwnership, called when committing the offset to ZooKeeper, which stores the owner info under a path like: /consumers/[group_id]/owners/[topic]/[partition_id]

    ## Verifying this change

    This change is a trivial rework / code cleanup without any test coverage.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): no
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
    - The serializers: no
    - The runtime per-record code paths (performance sensitive): no
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    - The S3 file system connector: no

    ## Documentation

    - Does this pull request introduce a new feature? no
    - If yes, how is the feature documented? not documented

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/taizilongxu/flink flink-4822

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5050.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5050

commit 1c7a345e671010cf07b02cfdff9968ced38e2632
Author: xuxiao.xu
Date: 2017-11-22T09:36:12Z

    [FLINK-4822] Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
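As an illustration of the ownership layout described in the change log, the following sketch builds the ZooKeeper path and the node payload the same way the `registerPartitionOwnership` diff quoted elsewhere in this thread does. It only constructs strings and does not talk to ZooKeeper. `OwnerPathSketch`, `ownerPath`, and `ownerData` are hypothetical names, and the explicit UTF-8 charset is an assumption (the diff calls `getBytes()` without a charset).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; mirrors the path layout from the diff.
final class OwnerPathSketch {

    /** Path of the owner node: /consumers/[group_id]/owners/[topic]/[partition_id]. */
    static String ownerPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/owners/" + topic + "/" + partition;
    }

    /** Node payload: consumer ID and task ID joined by an underscore. */
    static byte[] ownerData(String consumerId, String taskId) {
        // Explicit charset is an assumption; it avoids depending on the platform default.
        return (consumerId + "_" + taskId).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(ownerPath("my-group", "my-topic", 3));
        System.out.println(new String(ownerData("consumer-1", "task-0"), StandardCharsets.UTF_8));
    }
}
```

In the pull request, a node with this path and payload is created as an ephemeral node through Curator, so it disappears when the client session ends.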