[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579743#comment-16579743 ]

ASF GitHub Bot commented on FLINK-8290:
---

azagrebin commented on issue #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#issuecomment-412856931

LGTM

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Modify clientId to groupId in flink-connector-kafka-0.8
> ---
>
>                 Key: FLINK-8290
>                 URL: https://issues.apache.org/jira/browse/FLINK-8290
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: xymaqingxiang
>            Assignee: xymaqingxiang
>            Priority: Major
>              Labels: pull-request-available
>
> Currently the client ID used to consume all topics is a constant
> ("flink-kafka-consumer-legacy-" + broker.id()), which makes Kafka's logs hard
> to read, so I recommend changing it to the group ID.
> We can modify the SimpleConsumerThread.java file, as shown below:
> {code:java}
> private final String clientId;
> ...
> this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
> ...
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578043#comment-16578043 ]

ASF GitHub Bot commented on FLINK-8290:
---

azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r209553402

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
I think we can just replace this line in the current PR:
```
this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
```
with these 2 lines:
```
String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
this.clientId = config.getProperty("client.id", groupId);
```
`Group id` will then be the default value for `client id`, as in the Kafka docs, but the user can still change both of them independently if ever needed. It should also help in logs with multiple topics.
`"flink-kafka-consumer-legacy-" + broker.id()` will just be the default value for `group.id`.
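The two-line lookup proposed in this comment can be tried outside Flink with plain `java.util.Properties`. The following is only a minimal sketch of that lookup order: the class `ClientIdDefaults`, the method `resolveClientId`, and the `int brokerId` parameter (standing in for `broker.id()`) are hypothetical names for illustration and are not part of `SimpleConsumerThread`.

```java
import java.util.Properties;

/** Hypothetical helper, not part of Flink: mirrors the two-line lookup suggested in the review. */
final class ClientIdDefaults {

    /** Prefix of the legacy constant, copied from the diff under review. */
    static final String LEGACY_PREFIX = "flink-kafka-consumer-legacy-";

    /**
     * Resolves the client id in this order: "client.id" if set, otherwise
     * "group.id" if set, otherwise the legacy constant built from the broker id.
     * brokerId is an assumed stand-in for broker.id() in the original code.
     */
    static String resolveClientId(Properties config, int brokerId) {
        String groupId = config.getProperty("group.id", LEGACY_PREFIX + brokerId);
        return config.getProperty("client.id", groupId);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        // Nothing configured: the legacy constant is used.
        System.out.println(resolveClientId(config, 7));

        // Only group.id configured: the client id follows the group id.
        config.setProperty("group.id", "orders-group");
        System.out.println(resolveClientId(config, 7));

        // Both configured: client.id is independent of group.id.
        config.setProperty("client.id", "orders-app");
        System.out.println(resolveClientId(config, 7));
    }
}
```

The property values `orders-group` and `orders-app` are made up for the demonstration. The sketch relies only on `Properties.getProperty(key, defaultValue)`, which returns the default when the key is absent.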
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1651#comment-1651 ]

ASF GitHub Bot commented on FLINK-8290:
---

maqingxiang commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r209480301

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
What @azagrebin is suggesting is really what I initially suggested, but `"flink-kafka-consumer-legacy-" + broker.id()` has not been in effect.
I'm a little confused now about how best to set this up.
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576237#comment-16576237 ]

ASF GitHub Bot commented on FLINK-8290:
---

aljoscha commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r209251376

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
Sorry about all the confusion. I think what @azagrebin suggested is the way to go:
```
String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
this.clientId = config.getProperty("client.id", groupId);
```

@maqingxiang Isn't this also what you initially suggested? I'm afraid we just had a lot of miscommunication.
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575924#comment-16575924 ]

ASF GitHub Bot commented on FLINK-8290:
---

azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r209180335

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
I think it is better, but this PR changes the old consumer, so I would make `group.id` the default value for `client.id`, same as in the Kafka docs. @aljoscha?
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575732#comment-16575732 ]

ASF GitHub Bot commented on FLINK-8290:
---

maqingxiang commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r209138484

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
Thank you for the review @azagrebin and @aljoscha.

I looked at the Kafka documentation again. For the `kafka consumer`, group.id is a required property, while the default value of client.id differs:

- for the old consumer, the default value is "group.id value".
- for the new consumer, the default value is "".

So I agree with @aljoscha; this is clear and concise:
```
this.clientId = config.getProperty("client.id", "flink-kafka-consumer-legacy-" + broker.id());
```
If that is OK, I'll resubmit it, thanks.
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574968#comment-16574968 ]

ASF GitHub Bot commented on FLINK-8290:
---

azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208965558

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
@maqingxiang Now I see your point. I think it makes sense to have `group.id` as a default value for `client.id` but keep them separately configurable, e.g.:
```
String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
this.clientId = config.getProperty("client.id", groupId);
```
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574930#comment-16574930 ]

ASF GitHub Bot commented on FLINK-8290:
---

aljoscha commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208955188

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
Why is it not
```
this.clientId = config.getProperty("client.id", "flink-kafka-consumer-legacy-" + broker.id());
```
That way the client ID would be configurable separately from the group ID. As it is in the PR, you can never have a client ID that is different from the group ID if you set a `group.id`.
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574193#comment-16574193 ]

ASF GitHub Bot commented on FLINK-8290:
---

maqingxiang commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208790526

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
Hi @azagrebin,

group.id and client.id are two separate concepts with different meanings.

What I mean is that naming the client id "flink-kafka-consumer-legacy-" + broker.id() carries no real meaning. Because different consumers reading different topics all get this same value, they are impossible to tell apart, which makes troubleshooting difficult.

I would suggest exposing the client id to the user, with its default value set to the "group.id value".

Thanks.

Cheers
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16572961#comment-16572961 ]

ASF GitHub Bot commented on FLINK-8290:
---

azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208522780

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
Hi @maqingxiang,

I would suggest naming it `client.id`, as in its original description, because in this case it is eventually used in `SimpleConsumer`, and `FlinkKafkaConsumer08` is based on the `SimpleConsumer`. As `client.id` identifies the application, multiple `group.id`'s can have the same `client.id`. It should also help in logs, and it can be set to the same value as `group.id` in the user config if needed.

`group.id` is also used when committing offsets to ZooKeeper for `FlinkKafkaConsumer08`. `ZookeeperOffsetHandler` checks that it is always set. So if we use it for the `clientId`, `"flink-kafka-consumer-legacy-" + broker.id()` will be ignored and `client.id` will be coupled with `group.id`, with no way to use it for the application name as the original documentation suggests.

What do you think?

Cheers
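The coupling described in this comment can be made concrete with a small comparison. This is a sketch under stated assumptions, not Flink code: `ClientIdVariants`, `fromGroupId`, and `fromClientId` are hypothetical names, `int brokerId` stands in for `broker.id()`, and the example assumes `group.id` is always present, as the comment says `ZookeeperOffsetHandler` requires.

```java
import java.util.Properties;

/** Hypothetical comparison of the two single-lookup variants discussed in the review. */
final class ClientIdVariants {

    /** Prefix of the legacy constant, copied from the diff under review. */
    static final String LEGACY_PREFIX = "flink-kafka-consumer-legacy-";

    /** Variant as submitted in the PR: the client id is read from "group.id". */
    static String fromGroupId(Properties config, int brokerId) {
        return config.getProperty("group.id", LEGACY_PREFIX + brokerId);
    }

    /** Variant named after the Kafka option: the client id is read from "client.id". */
    static String fromClientId(Properties config, int brokerId) {
        return config.getProperty("client.id", LEGACY_PREFIX + brokerId);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        // Assumption: group.id is always set for this consumer.
        config.setProperty("group.id", "orders-group");
        config.setProperty("client.id", "orders-app");

        // PR variant: client.id is ignored and the legacy default is never reached.
        System.out.println(fromGroupId(config, 3));

        // client.id variant: the application name is used, independent of the group.
        System.out.println(fromClientId(config, 3));
    }
}
```

With `group.id` always present, `fromGroupId` can only ever return the group id, which is the coupling the comment warns about. `fromClientId` keeps the two settings independent but falls back to the legacy constant whenever `client.id` is not set.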
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16572627#comment-16572627 ]

ASF GitHub Bot commented on FLINK-8290:
---

maqingxiang commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208448741

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
Hi, @azagrebin

group.id is a Kafka concept that describes a property of consumers. Its official description is "A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group".

client.id is a SimpleConsumer concept. Its official description is "The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request". Its default value is "group.id value".

Thanks.
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16572626#comment-16572626 ]

xymaqingxiang commented on FLINK-8290:
--

Hi, *all*

group.id is a Kafka concept that describes a property of consumers. Its official description is "A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group".

client.id is a SimpleConsumer concept. Its official description is "The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.".

See the [kafka documentation|https://kafka.apache.org/documentation/].

Thanks.
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16570529#comment-16570529 ]

ASF GitHub Bot commented on FLINK-8290:
---

azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r207971760

##
File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+    this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
If this is about the client id in Kafka's `SimpleConsumer`, why is this option named `group.id` and not `client.id`? As I understand it, a Flink job can be just one of the consumers in a group.
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327130#comment-16327130 ]

ASF GitHub Bot commented on FLINK-8290:
---

GitHub user maqingxiang opened a pull request:

    https://github.com/apache/flink/pull/5304

    [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

    Currently the client ID used to consume all topics is a constant ("flink-kafka-consumer-legacy-" + broker.id()), which makes Kafka's logs hard to read, so I recommend changing it to the group ID.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maqingxiang/flink FLINK-8290

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5304.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5304

commit e8c19520e9b7500548ad36afb8bd698a35d64053
Author: maqingxiang-it
Date:   2018-01-16T13:42:45Z

    Modify clientId to groupId in flink-connector-kafka-0.8
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327118#comment-16327118 ]

xymaqingxiang commented on FLINK-8290:
--

The description of “client.id” in the [kafka](https://kafka.apache.org/documentation/#consumerconfigs) documentation is “An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.”