Re: Review Request 35231: Fix KAFKA-1740

2015-06-17 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
---



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 558)


Did you intend to actually rethrow or just log?
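
For illustration, the two options might look like the sketch below. The class
and method names are hypothetical and are not taken from Coordinator.java; a
plain list stands in for the logger so the example has no dependencies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names come from Coordinator.java.
final class CommitErrorHandling {
    // Stand-in for a logger so the example is self-contained.
    static final List<String> LOG = new ArrayList<>();

    // Option A: record the failure and let the caller carry on.
    static void logOnly(Runnable commit) {
        try {
            commit.run();
        } catch (RuntimeException e) {
            LOG.add("offset commit failed: " + e.getMessage());
        }
    }

    // Option B: record the failure, then propagate it to the caller.
    static void logAndRethrow(Runnable commit) {
        try {
            commit.run();
        } catch (RuntimeException e) {
            LOG.add("offset commit failed: " + e.getMessage());
            throw e;
        }
    }
}
```

With option A the caller never learns that the commit failed; with option B it
has to handle the exception itself.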



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (lines 229 - 236)


In manual group management, would we expect consumerId and generationId to 
be null?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (line 280)


This looks like the wrong ticket.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (line 285)


Same as above, wrong ticket.


- Jason Gustafson


On June 8, 2015, 11:12 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35231/
> ---
> 
> (Updated June 8, 2015, 11:12 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1740
> https://issues.apache.org/jira/browse/KAFKA-1740
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Move offset manager to coordinator, add validation logic for offset commit 
> and fetch
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java deec1fa480d5a5c5884a1c007b076aa64e902472
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb
>   core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b
>   core/src/main/scala/kafka/cluster/Partition.scala 730a232482fdf77be5704cdf5941cfab3828db88
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60
>   core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 51e89c87ee2c20fc7f976536f01fa1055fb8e670
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59
>   core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd
>   core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4
>   core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc
>   core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad
>   core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f
>   core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7
>   core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985
>   core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d
> 
> Diff: https://reviews.apache.org/r/35231/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Re: Review Request 35231: Fix KAFKA-1740

2015-06-11 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review87583
---



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala


This can happen in two ways:
1. An automatic group management (subscribes to topics) consumer that sends 
an OffsetCommitRequest whose groupId hashes to the coordinator but hasn't first 
done a join group.
2. A manual group management (subscribes to partitions) consumer that sends 
an OffsetCommitRequest whose groupId hashes to the coordinator.

Should these be distinguishable? We can do this with an added flag in 
OffsetCommitRequest.
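
To make the proposal concrete, here is a minimal sketch of how a coordinator
might use such a flag. The flag name `manualAssignment`, the `Result` values,
and the method itself are assumptions for illustration; none of them are taken
from the patch or from the existing request format.

```java
// Hypothetical sketch; the flag, enum, and method names are assumptions.
final class OffsetCommitValidation {
    enum Result { ACCEPT, REJECT_NOT_JOINED }

    // groupKnown: the coordinator holds metadata for the group, i.e. some
    //             consumer has already done a join group.
    // manualAssignment: the proposed request flag, set by consumers that
    //                   subscribe to partitions directly.
    static Result validate(boolean groupKnown, boolean manualAssignment) {
        if (groupKnown) {
            // Consumer id and generation id checks would go here; omitted.
            return Result.ACCEPT;
        }
        // Case 2 (manual management) is expected to have no group metadata;
        // case 1 (automatic management without a join) is not.
        return manualAssignment ? Result.ACCEPT : Result.REJECT_NOT_JOINED;
    }
}
```

Without the flag, both calls with `groupKnown == false` look identical to the
coordinator, which is the ambiguity described above.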



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala


Same as the earlier comment but for OffsetFetchRequests.

This can happen in two ways:
1. An automatic group management (subscribes to topics) consumer that sends 
an OffsetFetchRequest whose groupId hashes to the coordinator but hasn't first 
done a join group.
2. A manual group management (subscribes to partitions) consumer that sends 
an OffsetFetchRequest whose groupId hashes to the coordinator.

Should these be distinguishable? We can do this with an added flag in 
OffsetFetchRequest.


- Onur Karaman





Re: Review Request 35231: Fix KAFKA-1740

2015-06-08 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review87107
---


I'll try to do a more detailed review soon. I just want to get this one comment 
out of the way.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala


I don't think the ConsumerCoordinator constructor should change.

The coordinator just needs a KafkaConfig, ZkClient, and OffsetManager in 
order to work. It shouldn't care about ReplicaManagers, KafkaSchedulers, or 
OffsetManagerConfigs.

Passing in these lower-level dependencies makes testing the 
ConsumerCoordinator harder. As an example, think about how 
https://reviews.apache.org/r/35086/diff/ would work with this change. You'd 
have to mock out dependencies (ReplicaManager) that are a level deeper than 
what you actually care about (OffsetManager) and hope that OffsetManager 
reacts nicely to your mock, instead of directly mocking the OffsetManager 
itself.
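
A rough sketch of the testing argument, written in Java for brevity. All types
here are simplified, hypothetical stand-ins; the real Scala classes have
different signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the one collaborator the coordinator cares about.
interface OffsetStore {
    void commit(String group, String partition, long offset);
    Long fetch(String group, String partition);
}

// The coordinator receives its direct dependency, not that dependency's parts.
final class SketchCoordinator {
    private final OffsetStore offsets;

    SketchCoordinator(OffsetStore offsets) {
        this.offsets = offsets;
    }

    void handleCommit(String group, String partition, long offset) {
        offsets.commit(group, partition, offset);
    }

    Long handleFetch(String group, String partition) {
        return offsets.fetch(group, partition);
    }
}

// A test double at the same level as the dependency; nothing deeper is mocked.
final class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> data = new HashMap<>();

    @Override
    public void commit(String group, String partition, long offset) {
        data.put(group + "/" + partition, offset);
    }

    @Override
    public Long fetch(String group, String partition) {
        return data.get(group + "/" + partition);
    }
}
```

A test can then construct `new SketchCoordinator(new InMemoryOffsetStore())`
and never needs to know what the real store is built from.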


- Onur Karaman





Re: Review Request 35231: Fix KAFKA-1740

2015-06-08 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/
---

(Updated June 8, 2015, 11:12 p.m.)


Review request for kafka.


Bugs: KAFKA-1740
https://issues.apache.org/jira/browse/KAFKA-1740


Repository: kafka


Description
---

Move offset manager to coordinator, add validation logic for offset commit and 
fetch


Diffs
-

  
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java deec1fa480d5a5c5884a1c007b076aa64e902472
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3
  clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb
  core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b
  core/src/main/scala/kafka/cluster/Partition.scala 730a232482fdf77be5704cdf5941cfab3828db88
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60
  core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 51e89c87ee2c20fc7f976536f01fa1055fb8e670
  core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59
  core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd
  core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4
  core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc
  core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad
  core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f
  core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7
  core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985
  core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d

Diff: https://reviews.apache.org/r/35231/diff/


Testing
---


Thanks,

Guozhang Wang