[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r209553402

## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ##

    @@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
    +this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
I think we can just replace this line in the current PR:
```
this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
```
with these two lines:
```
String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
this.clientId = config.getProperty("client.id", groupId);
```
`group.id` will then be the default value for `client.id`, as in the Kafka docs, but users can still change both of them independently if they ever need to. It should also help in logs with multiple topics. `"flink-kafka-consumer-legacy-" + broker.id()` will be just the default value for `group.id`.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
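To make the fallback order in this suggestion concrete, here is a minimal, self-contained sketch using only `java.util.Properties`. The class name `ClientIdFallbackSketch`, the helper `resolveClientId`, and the `int brokerId` parameter (standing in for `broker.id()`) are hypothetical and exist only for illustration; this is not the code in `SimpleConsumerThread`.

```java
import java.util.Properties;

public class ClientIdFallbackSketch {

    // Hypothetical helper mirroring the two suggested lines.
    // brokerId stands in for broker.id() from the original code.
    static String resolveClientId(Properties config, int brokerId) {
        // group.id falls back to the legacy default when it is not configured
        String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + brokerId);
        // client.id falls back to the resolved group id when it is not configured
        return config.getProperty("client.id", groupId);
    }

    public static void main(String[] args) {
        // Neither property set: the legacy default is used
        Properties neither = new Properties();
        System.out.println(resolveClientId(neither, 0)); // flink-kafka-consumer-legacy-0

        // Only group.id set: client id defaults to the group id
        Properties onlyGroup = new Properties();
        onlyGroup.setProperty("group.id", "my-group");
        System.out.println(resolveClientId(onlyGroup, 0)); // my-group

        // Both set: client.id wins and stays independent of group.id
        Properties both = new Properties();
        both.setProperty("group.id", "my-group");
        both.setProperty("client.id", "my-app");
        System.out.println(resolveClientId(both, 0)); // my-app
    }
}
```

The nested defaults keep existing configurations working (a job that only sets `group.id` still gets a sensible client id) while letting users override `client.id` on its own.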
[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r209180335

## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ##

    @@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
    +this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
I think it is better, but this PR changes the old consumer. I would make `group.id` the default value for `client.id`, the same as in the Kafka docs. @aljoscha?
[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208965558

## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ##

    @@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
    +this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
@maqingxiang Now I see your point. I think it makes sense to have `group.id` as the default value for `client.id` but keep them separately configurable, e.g.:
```
String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
this.clientId = config.getProperty("client.id", groupId);
```
[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208522780

## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ##

    @@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
    +this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
Hi @maqingxiang, I would suggest naming it `client.id`, as in its original description, because in this case it is eventually used in `SimpleConsumer`, and `FlinkKafkaConsumer08` is based on `SimpleConsumer`. As `client.id` identifies the application, multiple `group.id`s can have the same `client.id`. It should also help in logs, and it can be set to the same value as `group.id` in the user config if needed.

`group.id` is also used when committing offsets to ZooKeeper for `FlinkKafkaConsumer08`, and `ZookeeperOffsetHandler` checks that it is always set. So if we use it for the `clientId`, the default `"flink-kafka-consumer-legacy-" + broker.id()` will be ignored and `client.id` will be coupled to `group.id`, with no way to use it as the application name, as the original documentation suggests.

What do you think?

Cheers
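The coupling described in this comment can be illustrated with a small sketch. It assumes, as the comment states, that `group.id` is always present in the user config. The class name `CoupledClientIdSketch`, the helper `clientIdFromGroupOnly`, the `int brokerId` parameter (standing in for `broker.id()`), and the property values are hypothetical.

```java
import java.util.Properties;

public class CoupledClientIdSketch {

    // Hypothetical helper reproducing the line from the PR diff:
    // the client id is read from group.id only.
    static String clientIdFromGroupOnly(Properties config, int brokerId) {
        return config.getProperty("group.id", "flink-kafka-consumer-legacy-" + brokerId);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        // Assumed to be always set, per the review comment
        config.setProperty("group.id", "orders-group");
        // The user tries to name the application separately
        config.setProperty("client.id", "billing-app");

        // The legacy default is never reached and client.id is ignored
        System.out.println(clientIdFromGroupOnly(config, 3)); // orders-group
    }
}
```

With this lookup, two jobs in different groups cannot report the same application name, which is the limitation the comment points out.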
[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r207971760

## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ##

    @@ -123,6 +124,7 @@ public SimpleConsumerThread(
     this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
     this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
     this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
    +this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());

Review comment:
If this is about the client id in Kafka's `SimpleConsumer`, why is this option named `group.id` and not `client.id`? As I understand it, a Flink job can be just one of the consumers in a group.