[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-13 Thread GitBox
azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify 
clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r209553402
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -123,6 +124,7 @@ public SimpleConsumerThread(
this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
+   this.clientId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
 
 Review comment:
   I think we can just replace this line in the current PR:
   ```
   this.clientId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
   ```
   with these 2 lines:
   ```
   String groupId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
   this.clientId = config.getProperty("client.id", groupId);
   ```
   `Group id` will be default value for `client id` then, as in Kafka docs, but 
user can change both of them independently if user ever needs it. It should 
also help in logs with multiple topics.
   `"flink-kafka-consumer-legacy-" + broker.id()` will be just default value 
for `group.id`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-10 Thread GitBox
azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify 
clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r209180335
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -123,6 +124,7 @@ public SimpleConsumerThread(
this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
+   this.clientId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
 
 Review comment:
   I think it is better, but this PR change is in old consumer, I would make 
`group.id` default value for `client.id`, same as in Kafka docs. @aljoscha?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-09 Thread GitBox
azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify 
clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208965558
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -123,6 +124,7 @@ public SimpleConsumerThread(
this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
+   this.clientId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
 
 Review comment:
   @maqingxiang 
   Now I see your point, I think it makes sense to have `group.id` as a default 
value for `client.id` but keep them separately configurable, e.g.:
   ```
   String groupId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
   this.clientId = config.getProperty("client.id", groupId);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-08 Thread GitBox
azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify 
clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208522780
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -123,6 +124,7 @@ public SimpleConsumerThread(
this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
+   this.clientId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
 
 Review comment:
   Hi @maqingxiang,
   
   I would suggest to name it `client.id` as in its original description, 
because it is used in `SimpleConsumer` eventually in this case and 
`FlinkKafkaConsumer08` is based on the `SimpleConsumer`. As `client.id` 
identifies application, multiple `group.id`'s can have the same `client.id`. It 
should also help in logs and can be set to the same value as `group.id` in the 
user config if needed. 
   
   `group.id` is also used in the commitment of the offsets in zookeeper for 
`FlinkKafkaConsumer08`. It is checked in `ZookeeperOffsetHandler` to be always 
set. So if we use it for the `clientId`, `"flink-kafka-consumer-legacy-" + 
broker.id()` will be ignored and `client.id` will be coupled with `group.id` 
with no way to use it for application name as original documentation suggests.
   
   What do you think?
   
   Cheers


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-06 Thread GitBox
azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify 
clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r207971760
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -123,6 +124,7 @@ public SimpleConsumerThread(
this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
+   this.clientId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
 
 Review comment:
   If it is about client id in Kafka `SimpleConsumer`, why is this option named 
`group.id` and not `client.id`? As I understand, Flink job can be just one of 
consumers in a group.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services