Re: Review Request 25995: Patch for KAFKA-1650

2015-02-28 Thread Jiangjie Qin


 On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
  Sorry for the late review. A few more comments below.

Thanks a lot for the review, Jun. We actually have a new design for mirror 
maker based on the producer's flush() call (KAFKA-1865). The design is updated 
in KIP-3, which is currently in the voting process; it would be great if you 
could take a look at that. We plan to refactor mirror maker after the flush() 
call is checked in, but we really appreciate the review. Please see the 
answers below.


 On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 608-616
  https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line608
 
  I don't quite follow the comment on synchronization. Once producer.send 
  is called, the callback can be called anytime. In the callback, we will 
  remove the offset from unackedOffsets. However, the removal is not 
  synchronized on unackedOffsets.

Yes... That's my bad. I somehow missed putting synchronization on the 
removal... I realized this after we decided to move to the new design using 
flush(), so I did not submit a follow-up patch.
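
For reference, a minimal sketch of the synchronization Jun is asking about 
(in Java with hypothetical names; the actual MirrorMaker code is Scala): the 
insert before send() and the removal in the producer callback must lock the 
same shared structure, since the callback can fire on the producer's I/O 
thread at any time after send() returns.

```java
import java.util.TreeSet;

// Hypothetical sketch: both the insert (before send) and the removal (in the
// producer callback) synchronize on the same shared offset set.
class UnackedOffsets {
    private final TreeSet<Long> unacked = new TreeSet<>();

    // Called on the mirror-maker producer thread before producer.send().
    void beforeSend(long offset) {
        synchronized (unacked) {
            unacked.add(offset);
        }
    }

    // Called from the (hypothetical) producer callback when the send is acked.
    void onAck(long offset) {
        synchronized (unacked) {
            unacked.remove(offset);
        }
    }

    // Smallest offset not yet acked, i.e. the safe offset to commit up to.
    Long smallestUnacked() {
        synchronized (unacked) {
            return unacked.isEmpty() ? null : unacked.first();
        }
    }
}
```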


 On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 729-731
  https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line729
 
  Should we synchronize on the removal as well?

Please see above.


 On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 580-582
  https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line580
 
  It's kind of weird that we only catch OOME here. OOME can be thrown in 
  other places as well.

We used to catch all the throwables and exit, but later we thought it would be 
better to exit only on fatal exceptions and ignore the others, letting the 
offset commit thread move on. That way, if one commit fails, the next commit 
might still succeed.
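
A hedged sketch of that policy (hypothetical helper, not the actual 
MirrorMaker code): the periodic commit task swallows ordinary exceptions, 
since the next scheduled commit may succeed, but rethrows OutOfMemoryError as 
fatal.

```java
// Sketch: exit only on fatal errors; tolerate transient commit failures.
class CommitTask {
    // Returns true if the commit succeeded, false on a non-fatal failure.
    static boolean runOnce(Runnable commit) {
        try {
            commit.run();
            return true;
        } catch (OutOfMemoryError fatal) {
            throw fatal;   // fatal: propagate so the tool shuts down
        } catch (Throwable t) {
            return false;  // non-fatal: log and retry on the next interval
        }
    }
}
```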


 On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 81-92
  https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line81
 
  Isn't NumUnackedMessages the same as UnackedOffsetListSize since they 
  are always modified together?

Good point! We should probably just use UnackedOffsetListSize; it is clearer.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review74677
---


Re: Review Request 25995: Patch for KAFKA-1650

2015-02-28 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review74677
---


Sorry for the late review. A few more comments below.


core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment121347

Isn't NumUnackedMessages the same as UnackedOffsetListSize since they are 
always modified together?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment121348

It's kind of weird that we only catch OOME here. OOME can be thrown in 
other places as well.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment121349

I don't quite follow the comment on synchronization. Once producer.send is 
called, the callback can be called anytime. In the callback, we will remove the 
offset from unackedOffsets. However, the removal is not synchronized on 
unackedOffsets.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment121350

Should we synchronize on the removal as well?


- Jun Rao


On Dec. 24, 2014, 12:44 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 24, 2014, 12:44 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 numMessageUnacked should be decremented no matter the send was successful or 
 not.
 
 
 Addressed Jun's comments.
 
 
 Incorporated Jun's comments
 
 
 Incorporated Jun's comments and rebased on trunk
 
 
 Rebased on current trunk
 
 
 Addressed Joel's comments.
 
 
 Addressed Joel's comments.
 
 
 Incorporated Joel's comments
 
 
 Incorporated Joel's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Joel's comments
 
 
 Fix a bug in metric.
 
 
 Missed some change in the previous patch submission, submit patch again.
 
 
 change offset commit thread to use scheduler.
 
 
 Addressed Joel's comments.
 
 
 Incorporated Joel's comments
 
 
 set acks=-1 if --no.data.loss is specified.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
 62c0686e816d2888772d5a911becf625eedee397 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 e991d2187d03241f639eeaf6769fb59c8c99664c 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9baad34a9793e5067d11289ece2154ba87b388af 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
   core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/UtilsTest.scala 
 066553cad290c3d3821537a964c7d713c122d9fc 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65874
---


Looks great. Just a few more minor comments + a question that you may have 
missed from the previous round.


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment109127

I now remember what the reasoning behind this was. We originally decided 
that during rebalances, offset commits _need to_ succeed to reduce duplicates. 
i.e., retry indefinitely if there are failures in offset commits while 
rebalancing. We did not want it to hold up shutdown though. This is why we 
reduced retriesRemaining only if not shutting down.

However, in retrospect I think this change is better. i.e., retry always up 
to retry count. If a user wishes to reduce the probability of duplicates just 
bump up offset commit retry count. Do you agree?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109156

Use valuesIterator instead of values - the reason is that values 
materializes, but the iterator will not; map over the iterator will give 
another iterator. So I'm pretty sure that with it, the sum is computed without 
materializing an entire collection of sizes.
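
A Java-stream analogue of the suggestion (illustrative only; the patch itself 
is Scala, where the equivalent would be `valuesIterator.map(_.size).sum`): the 
per-list sizes are consumed lazily by the pipeline rather than collected into 
an intermediate list.

```java
import java.util.List;
import java.util.Map;

// Sum per-partition unacked-offset list sizes without materializing an
// intermediate collection of sizes.
class OffsetListSizes {
    static int totalSize(Map<String, List<Long>> unackedOffsetsByPartition) {
        return unackedOffsetsByPartition.values().stream()
                .mapToInt(List::size)  // each size is consumed as it is produced
                .sum();
    }
}
```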



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109129

As mentioned in the previous RB: do we need this given that it should be 
almost equivalent to the producer's dropped messages metric?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109130

Configure



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109133

Repeating question from last round:

If block on buffer exhaustion is not turned on, do we still want to shut 
down the mirror maker? i.e., if the user really wants zero data loss they would 
set that to true right?

If it is set to false and the MM exits what purpose does it serve?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109136

offsets



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109135

Sorry I wasn't clear on this. I meant we should probably catch all 
throwables and only exit if OOME.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109137

Capital U.
"for %s to %d"
(Generally limit to alphanumerics, comma, colon, and dash for easier greps.)



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109138

info("Ignoring interrupt while waiting.")



core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109143

This should ideally be a nested static class of DoublyLinkedList, named 
Node.



core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109165

Add @threadsafe annotation above



core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109162

private



core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109163

private



core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109164

private



core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala
https://reviews.apache.org/r/25995/#comment109139

Can you move this to UtilsTest?



core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala
https://reviews.apache.org/r/25995/#comment109147

Should probably also do a removal from a single-entry list.


- Joel Koshy


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Joel Koshy


 On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
  https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614
 
  Should this be fatal? i.e., fatal is normally used before exiting 
  (abnormally). WARN would be more suitable.
  
  I don't think it makes sense to not advance the offset here, 
  especially if you will still keep sending messages. I think you need to 
  remove it from the unacked offset list. E.g., you may configure your mirror 
  maker producer with only a few retries (in which case you are okay with data 
  loss). In this scenario you should just let the error go and allow the 
  mirror maker to proceed normally.
  
  If someone wants zero data loss the MM should be configured with 
  required acks -1 and infinite retries.
  
  Maybe I'm misunderstanding what zero data loss really means - can you 
  clarify? (Especially if someone configures the producer with acks (say) one 
  and limited retries)
 
 Jiangjie Qin wrote:
 That makes sense. So I've changed the code to work in the following way:
 
 1. If retries is set to infinite, the producer will keep retrying and the 
 entire pipeline will eventually be blocked. (This is strictly data-loss 
 free.)
 2. If retries is not set to infinite, then after the retries are exhausted, 
 it will remove the offset from the unacked list and record it as a 
 skippedUnackedMessage, which is an exposed metric.
 
 Joel Koshy wrote:
 For (1) will the pipeline be blocked though? i.e., the consumer will 
 continue to send messages to the data channel for all partitions. Also, we 
 will be susceptible to reordering for the partition that is being retried 
 even if maxInFlightRequests is set to one. (check wiki)
 
 It would be useful to very clearly outline the guarantees that the mirror 
 maker provides and under what configurations. e.g., zero data loss, whether 
 there will be duplicates, whether reordering is possible.
 
 Also, it may make sense to add a --lossless flag to the mirror maker that 
 automatically ensures the configs are set correctly:
 * acks to -1 (or all)
 * retries to infinity
 * use new producer
 * block on buffer exhaustion
 * consumer autocommit off
 * anything else that may be needed
 
 This makes it much easier to document and use. What do you think?
 
 Jiangjie Qin wrote:
 The --lossless option is a really good suggestion. I'll add that and 
 document it.
 
 For 1), yes, the pipeline will eventually be blocked: one of the data 
 channel queues will eventually be full, so the consumer thread will block on 
 queue.offer(); the ZookeeperConsumerConnector data chunk queue will then 
 stop draining, and the fetcher thread will block on putting data chunks into 
 it. Eventually the entire pipeline is blocked.
 I don't quite get why we could still have reordering if MaxInflightRequests 
 is set to 1. I understand that when a batch is retried, the previous try may 
 actually have succeeded, so the batch is duplicated as a whole on the 
 broker, and there might be some reordering across batches, but within a 
 batch the order should be preserved. Or do you mean the broker could ack 
 only some of the messages in a batch, causing the batch to be resent and the 
 messages in the same batch to end up out of order?
 
 By reordering I mean:
 For 2 messages, M1 with offset OS1 and M2 with offset OS2, assuming they 
 are going to the same partition: if OS2 > OS1, then M2 can only be acked by 
 the target broker after M1 is acked.
 
 Anyway, the current offset commit solution does not rely on in-order 
 delivery anymore. But I think the problem still matters for some other use 
 cases, so I would like to fully understand it.

Sorry - you are right. There won't be reordering with pipelining turned off. 
And I agree with the point about the pipeline getting blocked (i.e., 
eventually).
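
The --lossless preset Joel proposes above could be sketched as a small config 
builder. The property names below follow the 0.8.2-era producer/consumer 
configs and should be treated as illustrative, not authoritative:

```java
import java.util.Properties;

// Illustrative preset for a zero-data-loss mirror maker configuration.
class LosslessPreset {
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("acks", "-1");                                  // wait for all in-sync replicas
        p.setProperty("retries", String.valueOf(Integer.MAX_VALUE));  // effectively infinite retries
        p.setProperty("block.on.buffer.full", "true");                // block instead of dropping
        return p;
    }

    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("auto.commit.enable", "false");  // mirror maker commits offsets itself
        return p;
    }
}
```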


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
---


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Jiangjie Qin


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
  394
  https://reviews.apache.org/r/25995/diff/18/?file=797121#file797121line394
 
  if isAutoCommit is true then we will not retry anyway. I think this 
  condition can be removed altogether. i.e., if we are shutting down, then we 
  should probably allow committing offsets up to retryCount. I don't 
  recollect why this was written this way, but I think retrying up to the 
  configured retry count is reasonable on shutdown. Do you agree?

Agree. So we set the max retries based on the isAutoCommit value and only use 
the retry count to control how many times we actually retry. It's clearer.
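
A sketch of that retry policy (hypothetical helper, not the actual 
ZookeeperConsumerConnector code): auto-commits get a single attempt, since the 
next scheduled commit runs soon anyway, while explicit commits retry up to the 
configured count.

```java
import java.util.function.BooleanSupplier;

// Sketch: retry behavior for offset commits, keyed off isAutoCommit.
class OffsetCommitter {
    static boolean commitWithRetries(BooleanSupplier tryCommit,
                                     boolean isAutoCommit,
                                     int retryCount) {
        // Auto-commits are attempted once; explicit commits (shutdown,
        // rebalance) get 1 + retryCount attempts.
        int maxAttempts = isAutoCommit ? 1 : 1 + retryCount;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (tryCommit.getAsBoolean()) {
                return true;   // commit succeeded
            }
        }
        return false;          // exhausted all attempts
    }
}
```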


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 85
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line85
 
  NumUnackedOffsets is a weird metric especially with the presence of 
  NumUnackedMessages. Can you think of a better name? My attempt would be 
  NumPendingSourceOffsets - but I don't like that either.

How about UnackedOffsetListsSize?


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line90
 
  Add comment on what this metric is. Actually do we need this since this 
  will be covered by the producer's dropped metric? As above, this is also a 
  weird mbean to see. Not sure if we can come up with a better name.

I added comments to this metric. I'm kind of reluctant to use the producer's 
dropped metric. The reason is that the intention of this metric is different 
from dropped messages, although they have some connection. Also, we could 
potentially have many producers; it would be better to have a single number 
instead of having to go through multiple metrics. What do you think?


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 198
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line198
 
  Can you move this to the if (useNewProducer) block further down (line 
  230)

This block is here because the consumer property has to be set before the 
consumer is instantiated, and the offset commit thread can only start after 
the consumer connector is instantiated. It also seems better for the offset 
commit thread to start after the producer threads start. That's why we have 
those spread-out if statements...


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 295
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line295
 
  I think the final commitOffsets (on shutdown) should be a reliable 
  commit - i.e., retry up to configured retries if there are commit errors.

Yes, it will; isAutoCommit is set to false. This commitOffsets, defined in 
mirror maker, wraps ZookeeperConsumerConnector.commitOffsets with 
isAutoCommit == false. Or do you mean something else?


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 449
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line449
 
  If block on buffer exhaustion is not turned on, do we still want to 
  shut down the mirror maker? i.e., if the user really wants zero data loss 
  they would set that to true right?
  
  If it is set to false and the MM exits what purpose does it serve?

I kind of think that we should let the user know that something is wrong in 
the producer once it occurs. Users who don't care about zero data loss 
probably still want at least a working mirror maker. If we just drop the 
message and let the producer move on, we could end up with a running mirror 
maker that only drops messages. In that case, it's probably better to let the 
mirror maker die to indicate that something went wrong. So I'm thinking that 
exiting on BufferExhaustedException is more from a normal-operations point of 
view than a zero-data-loss point of view.


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 560
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line560
 
  The commit thread has not exited, i.e., this should just be "Shutting 
  down mirror maker due to error when committing offsets." I think OOME is 
  the only exception on which we should really shut down, right? i.e., we 
  should probably let everything else go, right?

Agreed.


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 720
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line720
 
  What does node validation skipped mean?

I meant something like checking whether the node is really in the list... But 
it probably doesn't matter in this case. I'll just remove it.


- Jiangjie



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Jiangjie Qin


 On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90
  https://reviews.apache.org/r/25995/diff/19/?file=799170#file799170line90
 
  Use valuesIterator instead of values - the reason is that values 
  materializes, but the iterator will not; map over the iterator will give 
  another iterator. So I'm pretty sure with that sum is computed without 
  materializing an entire collection of sizes.

There seems to be no valuesIterator for Pool... Maybe
def value = unackedOffsetsMap.iterator.map(unackedOffsets => 
unackedOffsets._2.size).sum ?


 On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 96
  https://reviews.apache.org/r/25995/diff/19/?file=799170#file799170line96
 
  As mentioned in the previous RB: do we need this given that it should 
  be almost equivalent to the producer's dropped messages metric?

Sorry, I just found that I forgot to publish the comments on the previous 
question. I just published them and will paste the comments here:

I added comments to this metric. I'm kind of reluctant to use the producer's 
dropped metric. The reason is that the intention of this metric is different 
from dropped messages, although they have some connection. Also, we could 
potentially have many producers; it would be better to have a single number 
instead of having to go through multiple metrics. What do you think?


 On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
  395
  https://reviews.apache.org/r/25995/diff/19/?file=799168#file799168line395
 
  I now remember what the reasoning behind this was. We originally 
  decided that during rebalances, offset commits _need to_ succeed to reduce 
  duplicates. i.e., retry indefinitely if there are failures in offset 
  commits while rebalancing. We did not want it to hold up shutdown though. 
  This is why we reduced retriesRemaining only if not shutting down.
  
  However, in retrospect I think this change is better. i.e., retry 
  always up to retry count. If a user wishes to reduce the probability of 
  duplicates just bump up offset commit retry count. Do you agree?

Agreed. Thanks for elaborating on this.


 On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 474
  https://reviews.apache.org/r/25995/diff/19/?file=799170#file799170line474
 
  Repeating question from last round:
  
  If block on buffer exhaustion is not turned on, do we still want to 
  shut down the mirror maker? i.e., if the user really wants zero data loss 
  they would set that to true right?
  
  If it is set to false and the MM exits what purpose does it serve?

Copying the comments from the previous review round:

I kind of think that we should let the user know that something is wrong in 
the producer once it occurs. Users who don't care about zero data loss 
probably still want at least a working mirror maker. If we just drop the 
message and let the producer move on, we could end up with a running mirror 
maker that only drops messages. In that case, it's probably better to let the 
mirror maker die to indicate that something went wrong. So I'm thinking that 
exiting on BufferExhaustedException is more from a normal-operations point of 
view than a zero-data-loss point of view.


 On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/DoublyLinkedList.scala, line 25
  https://reviews.apache.org/r/25995/diff/19/?file=799171#file799171line25
 
  This should ideally be nested static class of DoublyLinkedList and 
  named Node

But I need to instantiate the node outside of the list, and it seems I 
cannot access the Node class if it is nested.
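
For what it's worth, a public static nested class is still accessible from 
outside the enclosing class, which is what Joel's suggestion relies on. A 
minimal Java illustration (a hypothetical shape, not the patch's actual 
DoublyLinkedList.scala) of a list whose Node type callers can instantiate as 
DoublyLinkedList.Node before inserting it:

```java
// A doubly linked list whose Node is a public static nested class, so
// callers can create nodes outside the list: new DoublyLinkedList.Node(...).
class DoublyLinkedList {
    public static class Node {
        public final long offset;
        Node prev, next;
        public Node(long offset) { this.offset = offset; }
    }

    private Node head, tail;
    private int size = 0;

    void add(Node n) {
        if (tail == null) { head = tail = n; }
        else { tail.next = n; n.prev = tail; tail = n; }
        size++;
    }

    // O(1) removal given a node reference, the property the patch needs.
    void remove(Node n) {
        if (n.prev != null) n.prev.next = n.next; else head = n.next;
        if (n.next != null) n.next.prev = n.prev; else tail = n.prev;
        n.prev = n.next = null;
        size--;
    }

    int size() { return size; }
    Node peek() { return head; }
}
```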


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65874
---


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Joel Koshy


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 85
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line85
 
  NumUnackedOffsets is a weird metric especially with the presence of 
  NumUnackedMessages. Can you think of a better name? My attempt would be 
  NumPendingSourceOffsets - but I don't like that either.
 
 Jiangjie Qin wrote:
 How about UnackedOffsetListsSize?

Yes that sounds better.


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 198
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line198
 
  Can you move this to the if (useNewProducer) block further down (line 
  230)
 
 Jiangjie Qin wrote:
 This block is here because the consumer property has to be set before the 
 consumer is instantiated, and the offset commit thread can only start after 
 the consumer connector is instantiated. It also seems better for the offset 
 commit thread to start after the producer threads start. That's why we have 
 those spread-out if statements...

I thought this was before the instantiation, but maybe I misread. Either way, 
I think your most recent patch looks fine in this regard, with the 
no.data.loss option being inspected when instantiating the consumer and 
producer separately.


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 295
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line295
 
  I think the final commitOffsets (on shutdown) should be a reliable 
  commit - i.e., retry up to configured retries if there are commit errors.
 
 Jiangjie Qin wrote:
 Yes, it will; isAutoCommit is set to false. This commitOffsets, defined in 
 mirror maker, wraps ZookeeperConsumerConnector.commitOffsets with 
 isAutoCommit == false. Or do you mean something else?

Yes you are right. I missed that it is a wrapper.


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 449
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line449
 
  If block on buffer exhaustion is not turned on, do we still want to 
  shut down the mirror maker? i.e., if the user really wants zero data loss 
  they would set that to true right?
  
  If it is set to false and the MM exits what purpose does it serve?
 
 Jiangjie Qin wrote:
 I kind of think that we should let the user know that something is wrong 
 in the producer once it occurs. Users who don't care about zero data loss 
 probably still want at least a working mirror maker. If we just drop the 
 message and let the producer move on, we could end up with a running mirror 
 maker that only drops messages. In that case, it's probably better to let 
 the mirror maker die to indicate that something went wrong. So I'm thinking 
 that exiting on BufferExhaustedException is more from a normal-operations 
 point of view than a zero-data-loss point of view.

Ok - we can leave it as is and revisit if necessary.

If the MM exits on buffer exhaustion then the user will know and will bump up 
memory. OTOH some users may be okay with dropping messages at peak traffic. I'm 
not sure which is better.


 On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90
  https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line90
 
  Add comment on what this metric is. Actually do we need this since this 
  will be covered by the producer's dropped metric? As above, this is also a 
  weird mbean to see. Not sure if we can come up with a better name.
 
 Jiangjie Qin wrote:
 I added comments to this metric. I'm somewhat reluctant to use the 
 producer's dropped metric. The reason is that the intention of this metric 
 is different from dropped messages, although the two are related. Also, 
 since we could potentially have many producers, it would be better to have a 
 single number instead of having to go through multiple metrics. What do you 
 think?

Ok sounds good.


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65759
---



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Joel Koshy


 On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90
  https://reviews.apache.org/r/25995/diff/19/?file=799170#file799170line90
 
  Use valuesIterator instead of values - the reason is that values 
  materializes, but the iterator will not; map over the iterator will give 
  another iterator. So I'm pretty sure with that sum is computed without 
  materializing an entire collection of sizes.
 
 Jiangjie Qin wrote:
 There seems no valueIterator for Pool... Maybe
 def value = unackedOffsetsMap.iterator.map(unackedOffsets = 
 unackedOffsets._2.size).sum ?

Ah yes - forgot that this is a pool. Your edit should work, although can you 
use case inside the map? i.e., `...iterator.map { case (topicPartition, 
unackedOffsets) => ...`
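For readers less familiar with the Scala idiom: the point is to sum the per-partition sizes without materializing an intermediate collection of sizes. The same computation in Java stream form (the map type here is a simplified stand-in for the mirror maker's Pool of per-partition offset lists, not the actual class):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnackedOffsetsGauge {
    // Total number of unacked offsets across all partitions; the stream
    // pipeline is lazily evaluated, so no intermediate collection of
    // per-partition sizes is built.
    static int totalUnacked(Map<String, List<Long>> unackedOffsetsMap) {
        return unackedOffsetsMap.values().stream()
                .mapToInt(List::size)
                .sum();
    }

    public static void main(String[] args) {
        Map<String, List<Long>> m = new HashMap<>();
        m.put("topicA-0", List.of(1L, 2L, 3L));
        m.put("topicA-1", List.of(10L));
        System.out.println(totalUnacked(m)); // prints 4
    }
}
```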


 On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/DoublyLinkedList.scala, line 25
  https://reviews.apache.org/r/25995/diff/19/?file=799171#file799171line25
 
  This should ideally be nested static class of DoublyLinkedList and 
  named Node
 
 Jiangjie Qin wrote:
 But I need to instantiate the node outside of the list. I don't seem to be 
 able to access the Node class if it is nested.

Yes you are right - I was in Java world for a minute. You can accomplish 
something similar by defining an object DoublyLinkedList and defining a Node 
class inside that, but that is almost equivalent to what you have so ignore my 
comment here.
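Joel's "Java world" remark refers to the static nested class idiom, where a public nested Node is still instantiable from outside. For comparison, a minimal Java doubly linked list in that style; this is a sketch of the idea, not the actual kafka.utils.DoublyLinkedList.

```java
public class DoublyLinkedList<T> {
    // Static nested class: callers can create DoublyLinkedList.Node directly.
    public static class Node<T> {
        public final T item;
        Node<T> prev, next;
        public Node(T item) { this.item = item; }
    }

    private Node<T> head, tail;
    private int size;

    public void add(Node<T> node) {
        if (tail == null) { head = node; }
        else { tail.next = node; node.prev = tail; }
        tail = node;
        size++;
    }

    // O(1) removal given a direct node reference, which is why the caller
    // holds on to nodes instead of scanning the list.
    public void remove(Node<T> node) {
        if (node.prev != null) node.prev.next = node.next; else head = node.next;
        if (node.next != null) node.next.prev = node.prev; else tail = node.prev;
        node.prev = node.next = null;
        size--;
    }

    public int size() { return size; }
    public Node<T> peek() { return head; }
}
```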


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65874
---


On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 23, 2014, 3:07 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 numMessageUnacked should be decremented no matter the send was successful or 
 not.
 
 
 Addressed Jun's comments.
 
 
 Incorporated Jun's comments
 
 
 Incorporated Jun's comments and rebased on trunk
 
 
 Rebased on current trunk
 
 
 Addressed Joel's comments.
 
 
 Addressed Joel's comments.
 
 
 Incorporated Joel's comments
 
 
 Incorporated Joel's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Joel's comments
 
 
 Fix a bug in metric.
 
 
 Missed some change in the prvevious patch submission, submit patch again.
 
 
 change offset commit thread to use scheduler.
 
 
 Addressed Joel's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
 62c0686e816d2888772d5a911becf625eedee397 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 e991d2187d03241f639eeaf6769fb59c8c99664c 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9baad34a9793e5067d11289ece2154ba87b388af 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
   core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala 
 PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 23, 2014, 3:04 p.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter the send was successful or 
not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Missed some change in the prvevious patch submission, submit patch again.


change offset commit thread to use scheduler.


Addressed Joel's comments.


Incorporated Joel's comments


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
  core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/UtilsTest.scala 
066553cad290c3d3821537a964c7d713c122d9fc 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65981
---

Ship it!


Minor observation below.

Also, mirror maker has become a bloated file. I think we can do some 
refactoring and split it up - will need a new package for that. I can file a 
jira.


core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109286

I think this might be confusing. i.e., if we have an explicit argument for 
no data loss then it should do everything needed to ensure that. I'll proceed 
with checking in this as is but we should discuss this further.


- Joel Koshy






Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Jiangjie Qin


 On Dec. 23, 2014, 11:53 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 58
  https://reviews.apache.org/r/25995/diff/20/?file=799494#file799494line58
 
  I think this might be confusing. i.e., if we have an explicit argument 
  for no data loss then it should do everything needed to ensure that. I'll 
  proceed with checking in this as is but we should discuss this further.

Or maybe we can do this: if the user did not specify acks in their own 
settings, we use acks=all; otherwise, we use whatever the user has provided. 
That way, users who want to override acks for performance can still do so. 
What do you think?
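The proposal amounts to treating the user's producer config as authoritative and only filling in a default, whereas the later patch forces the value when the flag is set. A sketch of both variants in Java; the property key matches the producer's acks config, but the surrounding option handling is hypothetical.

```java
import java.util.Properties;

public class MirrorMakerProducerConfig {
    // If the no-data-loss flag is set, force acks=all (what the later patch
    // does); under the earlier proposal we would only default acks when the
    // user has not specified it themselves.
    static Properties applyNoDataLossDefaults(Properties userProps, boolean forceNoDataLoss) {
        Properties props = new Properties();
        props.putAll(userProps);
        if (forceNoDataLoss) {
            props.put("acks", "all");          // override unconditionally
        } else {
            props.putIfAbsent("acks", "all");  // respect a user-provided value
        }
        return props;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.put("acks", "1");
        System.out.println(applyNoDataLossDefaults(user, false).getProperty("acks")); // 1
        System.out.println(applyNoDataLossDefaults(user, true).getProperty("acks"));  // all
    }
}
```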


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65981
---






Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Jiangjie Qin


 On Dec. 23, 2014, 11:53 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 58
  https://reviews.apache.org/r/25995/diff/20/?file=799494#file799494line58
 
  I think this might be confusing. i.e., if we have an explicit argument 
  for no data loss then it should do everything needed to ensure that. I'll 
  proceed with checking in this as is but we should discuss this further.
 
 Jiangjie Qin wrote:
 Or maybe we can do this: if the user did not specify acks in their own 
 settings, we use acks=all; otherwise, we use whatever the user has provided. 
 That way, users who want to override acks for performance can still do so. 
 What do you think?

Thought about it again. You are right: if users do not want to use 
--no.data.loss, they can specify the settings themselves. I'll submit 
another patch that overrides acks to all when the flag is set.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65981
---






Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Joel Koshy


 On Dec. 23, 2014, 11:53 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 58
  https://reviews.apache.org/r/25995/diff/20/?file=799494#file799494line58
 
  I think this might be confusing. i.e., if we have an explicit argument 
  for no data loss then it should do everything needed to ensure that. I'll 
  proceed with checking in this as is but we should discuss this further.
 
 Jiangjie Qin wrote:
 Or maybe we can do this: if the user did not specify acks in their own 
 settings, we use acks=all; otherwise, we use whatever the user has provided. 
 That way, users who want to override acks for performance can still do so. 
 What do you think?
 
 Jiangjie Qin wrote:
 Thought about it again. You are right: if users do not want to use 
 --no.data.loss, they can specify the settings themselves. I'll submit 
 another patch that overrides acks to all when the flag is set.

ok


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65981
---






Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 24, 2014, 12:44 a.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter the send was successful or 
not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Missed some change in the prvevious patch submission, submit patch again.


change offset commit thread to use scheduler.


Addressed Joel's comments.


Incorporated Joel's comments


set acks=-1 if --no.data.loss is specified.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
  core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/UtilsTest.scala 
066553cad290c3d3821537a964c7d713c122d9fc 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65993
---

Ship it!


Ship It!

- Joel Koshy






Re: Review Request 25995: Patch for KAFKA-1650

2014-12-22 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65759
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment109001

if isAutoCommit is true then we will not retry anyway. I think this 
condition can be removed altogether. i.e., if we are shutting down, then we 
should probably allow committing offsets up to retryCount. I don't recollect 
why this was written this way, but I think retrying up to the configured retry 
count is reasonable on shutdown. Do you agree?



core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment109002

Since this is a Java compatibility wrapper you need to use java.util.Map and 
convert that to a Scala map.

Alternatively we can remove this altogether since it is only used 
semi-internally (by the mirror maker). However, I think it would be good to add 
this here with the fix.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109003

val scheduler = new KafkaScheduler

and later on

scheduler.startup

(since the scheduler does nothing until it is started up)



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109004

NumUnackedMessages



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109006

NumUnackedOffsets is a weird metric especially with the presence of 
NumUnackedMessages. Can you think of a better name? My attempt would be 
NumPendingSourceOffsets - but I don't like that either.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109005

`unackedOffsetMap.valuesIterator.map(unackedOffsets => unackedOffsets.size).sum`



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109015

Add comment on what this metric is. Actually do we need this since this 
will be covered by the producer's dropped metric? As above, this is also a 
weird mbean to see. Not sure if we can come up with a better name.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109016

Can you move this to the if (useNewProducer) block further down (line 230)



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109017

I think the final commitOffsets (on shutdown) should be a reliable commit - 
i.e., retry up to configured retries if there are commit errors.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109029

If block on buffer exhaustion is not turned on, do we still want to shut 
down the mirror maker? i.e., if the user really wants zero data loss they would 
set that to true right?

If it is set to false and the MM exits what purpose does it serve?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109018

Capital C



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109019

Committing offsets: (also, should we just do this trace message in consumer 
connector instead? Right now it is not there, but I think it should)



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109020

The commit thread has not exited. i.e., this should just be "Shutting 
down mirror maker due to error when committing offsets." I think OOME is the 
only exception on which we should really shut down, right? i.e., we should 
probably let everything else go, right?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109021

Line 522: Minor nit - how about naming this just unackedOffset (instead of 
offsetNode)?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109023

"Updating offset to commit for %s to %d." (Note that [%s] will give you 
[[topic, partition]])



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109024

Should probably also catch interrupted exception



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109025

if (customRebalanceListener.isDefined)



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109027

Should (ideally) be volatile since it is reported by the gauge.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109026

What does "node validation skipped" mean?


- Joel Koshy


On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote:
 
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-22 Thread Joel Koshy


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489
 
  Why not use KafkaScheduler for the offset commit task?
 
 Jiangjie Qin wrote:
 Haven't thought of that before... But it looks like we need to do some more 
 handling when something goes wrong in the offset commit threads. The 
 KafkaScheduler code does not seem to do so.
 
 Joel Koshy wrote:
 So you can make the task itself catch throwables. So it would look 
 something like this:
 
 scheduler.schedule(mirrorMakerOffsetsCommiter, commitTask, ...)
 
 And in commitTask:
 try {
   commitOffsets()
 }
 catch {
  case t: Throwable =>
 // handle
 }
 
 That said, I don't think connector.commitOffsets will throw anything - 
 since we catch all throwables there.
 
 The only additional detail is that after you shutdown the scheduler you 
 will need to call commitOffsets() manually one last time.
 
 Jiangjie Qin wrote:
 I changed the code to use the scheduler. It seems that the try/catch block 
 only handles the Kafka-based offset commit and did not include 
 ensureOffsetsManagerConnected. Also, theoretically an OOME could be thrown when 
 creating a very big offset map, so I kept the catch block to make the mirror 
 maker exit in that case.

Not sure what you mean by "only handles the Kafka based offset commit and it 
did not include ensureOffsetsManagerConnected".


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65306
---


On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 19, 2014, 7:41 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 numMessageUnacked should be decremented no matter the send was successful or 
 not.
 
 
 Addressed Jun's comments.
 
 
 Incorporated Jun's comments
 
 
 Incorporated Jun's comments and rebased on trunk
 
 
 Rebased on current trunk
 
 
 Addressed Joel's comments.
 
 
 Addressed Joel's comments.
 
 
 Incorporated Joel's comments
 
 
 Incorporated Joel's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Joel's comments
 
 
 Fix a bug in metric.
 
 
 Missed some change in the previous patch submission, submit patch again.
 
 
 change offset commit thread to use scheduler.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
 62c0686e816d2888772d5a911becf625eedee397 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 e991d2187d03241f639eeaf6769fb59c8c99664c 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9baad34a9793e5067d11289ece2154ba87b388af 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-22 Thread Joel Koshy


 On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
  https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614
 
  Should this be fatal? i.e., fatal is normally used before exiting 
  (abnormally). WARN would be more suitable.
  
  I don't think it makes sense to not advance the offset here 
  especially if you will still keep sending messages. I think you need to 
  remove it from the unacked offset list. E.g., you may configure your mirror 
  maker producer with only a few retries (in which case you are okay with data 
  loss). In this scenario you should just let the error go and allow the 
  mirror maker to proceed normally.
  
  If someone wants zero data loss the MM should be configured with 
  required acks -1 and infinite retries.
  
  Maybe I'm misunderstanding what zero data loss really means - can you 
  clarify? (Especially if someone configures the producer with acks (say) one 
  and limited retries)
 
 Jiangjie Qin wrote:
 That makes sense. So I've changed the code to work in the following way:
 
 1. If retries is set to infinite, the producer will keep retrying and the 
 entire pipeline will eventually be blocked. (This is strictly data-loss free.)
 2. If retries are not set to infinite, then after the retries are exhausted 
 it will remove the offset from the unacked list and record it as a 
 skippedUnackedMessage, which is an exposed metric.

For (1) will the pipeline be blocked though? i.e., the consumer will continue 
to send messages to the data channel for all partitions. Also, we will be 
susceptible to reordering for the partition that is being retried even if 
maxInFlightRequests is set to one. (check wiki)

It would be useful to very clearly outline the guarantees that the mirror maker 
provides and under what configurations. e.g., zero data loss, whether there 
will be duplicates, whether reordering is possible.

Also, it may make sense to add a --lossless flag to the mirror maker that 
automatically ensures the configs are set correctly:
* acks to -1 (or all)
* retries to infinity
* use new producer
* block on buffer exhaustion
* consumer autocommit off
* anything else that may be needed

This makes it much easier to document and use. What do you think?
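The preset above could be sketched as follows. This is only an illustration: the helper name and config keys (acks, retries, blocking on buffer exhaustion) are assumptions here, not the mirror maker's actual option handling.

```scala
import java.util.Properties

// Hypothetical sketch of what a --lossless preset could apply on the
// producer side. The key names are assumptions, not the patch's parsing.
def losslessProducerProps(base: Properties = new Properties): Properties = {
  val props = new Properties
  props.putAll(base)
  props.setProperty("request.required.acks", "-1")    // wait for all replicas
  props.setProperty("retries", Int.MaxValue.toString) // effectively infinite retries
  props.setProperty("block.on.buffer.full", "true")   // block rather than drop
  props
}
```

Pinning these in one place would make the lossless mode documentable as a single flag instead of a checklist of five configs.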


 On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 668
  https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line668
 
  Should we make this a generic DoublyLinkedList data structure in utils 
  or some other suitable place and unit test it as well?
 
 Jiangjie Qin wrote:
 I'm not sure if this is generic enough to put into utils. This raw 
 linked list seems to only serve the purpose of removing/inserting a node in 
 the middle in O(1), which cannot be achieved with java.util.LinkedList. Maybe 
 we can keep it here for now. And if later on there are some other use cases, 
 we can refactor the code to create a raw LinkedList in utils and use that 
 one. What do you think?

Sort of agree, but only to some degree. The main issue is that you cannot unit 
test this - unless we come up with mirror maker unit tests that have 100 
percent code coverage of this utility. That's why it's generally a sign that 
the utility needs to be moved out so that it can be tested thoroughly.
http://stackoverflow.com/questions/3353318/how-do-i-test-local-inner-class-methods-in-java
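To illustrate the point, here is a simplified stand-in for such a list (not the patch's actual DoublyLinkedList): a raw doubly linked list where the caller holds node references and can unlink any node in O(1). Once pulled out of MirrorMaker, it is trivially unit-testable.

```scala
// Simplified sketch of a raw doubly linked list supporting O(1) removal of a
// node in the middle, given a reference to the node itself. java.util.LinkedList
// cannot do this without an iterator positioned at that node.
class Node[T](val value: T) {
  var prev: Node[T] = _
  var next: Node[T] = _
}

class RawLinkedList[T] {
  private var head: Node[T] = _
  private var tail: Node[T] = _
  private var count = 0

  def size: Int = count

  // Append at the tail in O(1).
  def add(node: Node[T]): Unit = {
    if (tail == null) head = node
    else { tail.next = node; node.prev = tail }
    tail = node
    count += 1
  }

  // Unlink any node in O(1); the caller holds the node reference.
  def remove(node: Node[T]): Unit = {
    if (node.prev == null) head = node.next else node.prev.next = node.next
    if (node.next == null) tail = node.prev else node.next.prev = node.prev
    node.prev = null
    node.next = null
    count -= 1
  }

  // First element, e.g. the smallest unacked offset if nodes are added in order.
  def peek: Option[T] = Option(head).map(_.value)
}
```

The size counter is what a gauge would aggregate over all per-partition lists to detect forgotten offsets.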


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
---


On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 19, 2014, 7:41 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Joel Koshy


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
  338
  https://reviews.apache.org/r/25995/diff/13/?file=793041#file793041line338
 
  Can we make this a reliable commit - i.e., with retries up to the 
  configured retry count? The policy is retry on commit errors during 
  rebalance or shutdown, no need to retry on commit errors during 
  auto-commits.
  
  So for e.g., if a mirror maker rebalances and there is simultaneously 
  offset manager movement we would need to retry the commit.
  
  This is the motivation for the isAutoCommit flag - however, there seems 
  to be a bug right now which maybe you can fix. i.e., if this is not an 
  auto-commit then set retries to configured retries else no retries.
 
 Jiangjie Qin wrote:
 Changed the code based on your suggestion. My original thinking was that 
 in the mirror maker one commit failure actually does not matter too much 
 because the next commit will succeed if the failure is due to offset topic 
 leader migration, etc. But for a more general purpose, it probably should 
 retry if it is not an auto commit.

I was thinking more about the shutdown and rebalance cases. We ideally want the 
commits to be reliable for those cases.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 192
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line192
 
  Why do you need a dummy param?
 
 Jiangjie Qin wrote:
 Because Utils.createObject needs an args parameter and if we pass in 
 null it will give an NPE...
 I've changed the code in Utils to allow us to pass in null, which uses 
 the no-arg constructor.

See comment in latest RB.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489
 
  Why not use KafkaScheduler for the offset commit task?
 
 Jiangjie Qin wrote:
 Haven't thought of that before... But it looks like we need to do some more 
 handling when something goes wrong in the offset commit threads. The 
 KafkaScheduler code does not seem to do so.

So you can make the task itself catch throwables. So it would look something 
like this:

scheduler.schedule(mirrorMakerOffsetsCommiter, commitTask, ...)

And in commitTask:
try {
  commitOffsets()
}
catch {
  case t: Throwable =>
// handle
}

That said, I don't think connector.commitOffsets will throw anything - since we 
catch all throwables there.

The only additional detail is that after you shutdown the scheduler you will 
need to call commitOffsets() manually one last time.
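The pattern described above can be sketched as follows. A plain ScheduledExecutorService stands in for KafkaScheduler (an assumption to keep the sketch self-contained), and commitOffsets is a placeholder for connector.commitOffsets.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch of the suggested pattern: a scheduled commit task that swallows
// Throwables so one failed commit does not kill the commit thread, plus a
// final manual commit after the scheduler is shut down.
class OffsetCommitter(commitOffsets: () => Unit, intervalMs: Long) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  private val commitTask = new Runnable {
    override def run(): Unit =
      try commitOffsets()
      catch {
        case _: Throwable => () // log and let the next scheduled commit retry
      }
  }

  def startup(): Unit =
    scheduler.scheduleAtFixedRate(commitTask, intervalMs, intervalMs,
      TimeUnit.MILLISECONDS)

  def shutdown(): Unit = {
    scheduler.shutdown()
    scheduler.awaitTermination(intervalMs * 2, TimeUnit.MILLISECONDS)
    commitOffsets() // final commit once no more scheduled runs can fire
  }
}
```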


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line614
 
  There might be a small memory leak here: if there was an error though 
  (in the branch above) it seems no one removes the offset from the list.
 
 Jiangjie Qin wrote:
 Yes, that is a memory leak, but in this case we should not commit the 
 offset of the message that was not sent successfully either. If any exception 
 occurs then the offsets will not advance anymore. We probably should have an 
 alert on the mirror consumer lags.

Hmm.. not sure if I'm on the same page here. See comments on new RB.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 705
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line705
 
  You need to also set tail.next = null (or None)
 
 Jiangjie Qin wrote:
 tail.next = null will be handled in the previous if...else... the old 
 tail.prev.next will become new tail.next, which will be null.

Makes sense.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 666
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line666
 
  Should we expose a size() method - i.e., increment on add and decrement 
  on remove. We can aggregate the size of all the offset lists outside and 
  emit a gauge. That will give us some assurance that there are no 
  forgotten offsets. Re: the potential leak mentioned above.
  
  In fact, I'm a bit nervous about correctness since this is a custom 
  implementation of a semi-non-trivial data structure. We should probably 
  even assert that it is empty when numMessageUnacked goes to zero as part of 
  the rebalance.
  
  Ideally, these custom implementations need a full-fledged unit test.
 
 Jiangjie Qin wrote:
 That's a good point, we probably need a metric to see if we have some 
 stuck offsets. But those stuck offsets should also not be committed anyways. 
 We need to be alerted on that situation once it happens. Maybe add an 
 assertion on the exception block where stuck 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment108686

This should be like the previous code - i.e., 1 + ...

E.g., if config.offsetsCommitMaxRetries is one, then we can have two 
attempts. In this version at most one attempt will be made.
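The intended semantics can be sketched as below (the helper name is illustrative): offsetsCommitMaxRetries counts *retries*, so the total number of attempts is 1 + maxRetries.

```scala
// Sketch of the intended retry semantics: with maxRetries = 1 there
// should be up to two attempts in total, not one.
def commitWithRetries(maxRetries: Int)(attempt: () => Boolean): Boolean = {
  var triesLeft = 1 + maxRetries // initial attempt plus the retries
  var done = false
  while (!done && triesLeft > 0) {
    done = attempt()
    triesLeft -= 1
  }
  done
}
```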



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment108687

Actually we should fix this. If this is the final commit during shutting 
down (and NOT autocommit) then we need to retry on error up to retries 
remaining.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108691

Can you use a named parameter here? i.e., `valueFactory = Some(...`



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108690

Should this be fatal? i.e., fatal is normally used before exiting 
(abnormally). WARN would be more suitable.

I don't think it makes sense to not advance the offset here especially if 
you will still keep sending messages. I think you need to remove it from the 
unacked offset list. E.g., you may configure your mirror maker producer with only 
a few retries (in which case you are okay with data loss). In this scenario you 
should just let the error go and allow the mirror maker to proceed normally.

If someone wants zero data loss the MM should be configured with required 
acks -1 and infinite retries.

Maybe I'm misunderstanding what zero data loss really means - can you 
clarify? (Especially if someone configures the producer with acks (say) one and 
limited retries)



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108689

Should we make this a generic DoublyLinkedList data structure in utils or 
some other suitable place and unit test it as well?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108688

Does not seem to be used. (but it probably should be used)



core/src/main/scala/kafka/utils/Utils.scala
https://reviews.apache.org/r/25995/#comment108692

I don't think this is necessary right? i.e., args.map won't throw an NPE if 
you don't provide any additional arguments.

scala> def f(args: Int*) {println(args.size)}
f: (args: Int*)Unit

scala> f(1,2)
2

scala> f()
0


- Joel Koshy


On Dec. 17, 2014, 8:29 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 17, 2014, 8:29 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 numMessageUnacked should be decremented no matter the send was successful or 
 not.
 
 
 Addressed Jun's comments.
 
 
 Incorporated Jun's comments
 
 
 Incorporated Jun's comments and rebased on trunk
 
 
 Rebased on current trunk
 
 
 Addressed Joel's comments.
 
 
 Addressed Joel's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 2:48 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter the send was successful or 
not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin


 On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
  https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614
 
  Should this be fatal? i.e., fatal is normally used before exiting 
  (abnormally). WARN would be more suitable.
  
  I don't think it makes sense to not advance the offset here 
  especially if you will still keep sending messages. I think you need to 
  remove it from the unacked offset list. E.g., you may configure your mirror 
  maker producer with only a few retries (in which case you are okay with data 
  loss). In this scenario you should just let the error go and allow the 
  mirror maker to proceed normally.
  
  If someone wants zero data loss the MM should be configured with 
  required acks -1 and infinite retries.
  
  Maybe I'm misunderstanding what zero data loss really means - can you 
  clarify? (Especially if someone configures the producer with acks (say) one 
  and limited retries)

That makes sense. So I've changed the code to work in the following way:

1. If retries is set to infinite, the producer will keep retrying and the 
entire pipeline will eventually be blocked. (This is strictly data-loss free.)
2. If retries are not set to infinite, then after the retries are exhausted it will 
remove the offset from the unacked list and record it as a skippedUnackedMessage, 
which is an exposed metric.


 On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 668
  https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line668
 
  Should we make this a generic DoublyLinkedList data structure in utils 
  or some other suitable place and unit test it as well?

I'm not sure if this is generic enough to put into utils. This raw linked 
list seems to only serve the purpose of removing/inserting a node in the middle 
in O(1), which cannot be achieved with java.util.LinkedList. Maybe we can keep it 
here for now. And if later on there are some other use cases, we can refactor the 
code to create a raw LinkedList in utils and use that one. What do you think?


 On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/Utils.scala, line 441
  https://reviews.apache.org/r/25995/diff/14/?file=795053#file795053line441
 
  I don't think this is necessary right? i.e., args.map won't throw an 
  NPE if you don't provide any additional arguments.
  
  scala> def f(args: Int*) {println(args.size)}
  f: (args: Int*)Unit
  
  scala> f(1,2)
  2
  
  scala> f()
  0

Yes, it seems to be working.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
---


On Dec. 19, 2014, 2:48 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 19, 2014, 2:48 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 numMessageUnacked should be decremented no matter the send was successful or 
 not.
 
 
 Addressed Jun's comments.
 
 
 Incorporated Jun's comments
 
 
 Incorporated Jun's comments and rebased on trunk
 
 
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 6:17 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter the send was successful or 
not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 6:53 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter the send was successful or 
not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Missed some change in the previous patch submission, submit patch again.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 7:41 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter the send was successful or 
not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Missed some change in the previous patch submission, submit patch again.


change offset commit thread to use scheduler.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489
 
  Why not use KafkaScheduler for the offset commit task?
 
 Jiangjie Qin wrote:
 Haven't thought of that before... But it looks like we need to do some more 
 handling when something goes wrong in the offset commit threads. The 
 KafkaScheduler code does not seem to do so.
 
 Joel Koshy wrote:
 So you can make the task itself catch throwables. So it would look 
 something like this:
 
 scheduler.schedule(mirrorMakerOffsetsCommiter, commitTask, ...)
 
 And in commitTask:
 try {
   commitOffsets()
 }
 catch {
   case t: Throwable =>
     // handle
 }
 
 That said, I don't think connector.commitOffsets will throw anything - 
 since we catch all throwables there.
 
 The only additional detail is that after you shutdown the scheduler you 
 will need to call commitOffsets() manually one last time.

I changed the code to use the scheduler. It seems that the try/catch block only 
handles the Kafka-based offset commit and does not cover ensuring that the 
offsets manager is connected. Also, an OOM could theoretically be thrown when 
creating a very large offset map, so I kept the catch block to make mirror 
maker exit in that case.
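
The scheduling-plus-catch approach discussed above can be sketched as follows. This is a minimal, self-contained illustration using Java's ScheduledExecutorService in place of KafkaScheduler, with a hypothetical commitOffsets() standing in for the connector call; names and periods are assumptions, not the patch's actual code:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object OffsetCommitSketch {
  val commits = new AtomicInteger(0)

  // Hypothetical stand-in for connector.commitOffsets().
  def commitOffsets(): Unit = commits.incrementAndGet()

  // The commit task exits only on fatal errors; other throwables are swallowed
  // so that one failed commit does not kill the thread and the next scheduled
  // commit can still succeed.
  val commitTask: Runnable = new Runnable {
    override def run(): Unit =
      try commitOffsets()
      catch {
        case e: OutOfMemoryError => System.exit(-1) // fatal: exit mirror maker
        case t: Throwable        => ()              // non-fatal: log and move on
      }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(commitTask, 0, 100, TimeUnit.MILLISECONDS)
    Thread.sleep(350)
    scheduler.shutdown()
    scheduler.awaitTermination(1, TimeUnit.SECONDS)
    // After the scheduler is shut down, commit one last time manually so the
    // final consumed offsets are not lost.
    commitOffsets()
  }
}
```

The final manual commit mirrors the "one last time" detail Joel mentions: a periodic task gives no guarantee that a commit runs between the last message and shutdown.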


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65306
---


On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 19, 2014, 7:41 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 numMessageUnacked should be decremented no matter the send was successful or 
 not.
 
 
 Addressed Jun's comments.
 
 
 Incorporated Jun's comments
 
 
 Incorporated Jun's comments and rebased on trunk
 
 
 Rebased on current trunk
 
 
 Addressed Joel's comments.
 
 
 Addressed Joel's comments.
 
 
 Incorporated Joel's comments
 
 
 Incorporated Joel's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Joel's comments
 
 
 Fix a bug in metric.
 
 
 Missed some change in the previous patch submission, submit patch again.
 
 
 change offset commit thread to use scheduler.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
 62c0686e816d2888772d5a911becf625eedee397 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 e991d2187d03241f639eeaf6769fb59c8c99664c 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9baad34a9793e5067d11289ece2154ba87b388af 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-17 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65306
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment108396

Can we make this a reliable commit - i.e., with retries up to the 
configured retry count? The policy is retry on commit errors during rebalance 
or shutdown, no need to retry on commit errors during auto-commits.

So for e.g., if a mirror maker rebalances and there is simultaneously 
offset manager movement we would need to retry the commit.

This is the motivation for the isAutoCommit flag - however, there seems to 
be a bug right now which maybe you can fix. i.e., if this is not an auto-commit 
then set retries to configured retries else no retries.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108400

Let us inline the valueFactory. i.e.,

`new Pool[TopicAndPartition, OffsetList](valueFactory = Some((k: 
TopicAndPartition) => new OffsetList))`



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108417

numUnackedMessages



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108421

Can you update the comment? I don't see any rebalance latch



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108410

This is a slightly misleading comment. i.e., we do allow a single message 
that may exceed this right? It is hard to document succinctly though.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108403

Why do you need a dummy param?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108404

This is a matter of taste, but personally I prefer using Options in these 
scenarios, i.e., instead of null use None. It is less error prone when writing 
code since you cannot invoke the value directly without doing a get first, so 
it forces you to remember to check whether it is defined or not.
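
As a small illustration of the null-vs-Option point (hypothetical names, not code from the patch):

```scala
object OptionVsNull {
  // With a nullable field, a forgotten check still compiles and then fails at
  // runtime with an NPE.
  var rebalanceListenerOrNull: String = null

  // With Option, the type forces the caller to handle the absent case before
  // the value can be used.
  var rebalanceListener: Option[String] = None

  // The caller must decide what "absent" means (here: length 0).
  def listenerNameLength: Int =
    rebalanceListener.map(_.length).getOrElse(0)
}
```

For example, `OptionVsNull.listenerNameLength` is safe to call whether or not the listener was ever set, whereas `rebalanceListenerOrNull.length` would throw if it wasn't.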



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108405

We should probably just say periodically commits consumed offsets, since 
it could commit to ZooKeeper as well.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108406

We should probably just say periodically commits consumed offsets, since 
it could commit to ZooKeeper as well.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108409

You _must_ also override the auto.commit.enable to false in the consumer 
config. The default is true.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108407

isCleanShutdown is not defined



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108408

should only be..



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108411

private



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108414

Do we need this counter? i.e., there is already a commit meter in the 
consumer connector.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108412

Why not use KafkaScheduler for the offset commit task?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108413

commitOffsets



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108415

For clarity and consistency (with MMRecord) can we call it 
sourceTopicPartition; sourceOffset?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108423

sourceTopicPartition...



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108416

// synchronize to ensure that addOffset precedes removeOffset



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108419

There might be a small memory leak here: if there was an error (in the branch 
above), it seems no one removes the offset from the list.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108420

one message (not 1 message).
Rule in writing: 0-9 should be spelled out, >= 10 can be numeric.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108422

Shouldn't this be further up? Specifically, in the synchronized block after 
the while loop.

Also, this is not named very suitably 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-17 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 17, 2014, 8:29 p.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter the send was successful or 
not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
77d951d13b8d8ad00af40257fe51623cc2caa61a 
  core/src/main/scala/kafka/utils/Utils.scala 
738c1af9ef5de16fdf5130daab69757a14c48b5c 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-17 Thread Jiangjie Qin


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
  338
  https://reviews.apache.org/r/25995/diff/13/?file=793041#file793041line338
 
  Can we make this a reliable commit - i.e., with retries up to the 
  configured retry count? The policy is retry on commit errors during 
  rebalance or shutdown, no need to retry on commit errors during 
  auto-commits.
  
  So for e.g., if a mirror maker rebalances and there is simultaneously 
  offset manager movement we would need to retry the commit.
  
  This is the motivation for the isAutoCommit flag - however, there seems 
  to be a bug right now which maybe you can fix. i.e., if this is not an 
  auto-commit then set retries to configured retries else no retries.

Changed the code based on your suggestion. My original thinking was that in 
mirror maker one commit failure does not actually matter too much, because the 
next commit will succeed if the failure is due to offset topic leader 
migration, etc. But for a more general purpose, it probably should retry if it 
is not an auto-commit.
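
The retry policy being discussed (retry up to the configured count on rebalance/shutdown commits, no retries on auto-commits) could look roughly like this sketch. The helper name and shape are assumptions for illustration; the real change lives inside ZookeeperConsumerConnector.commitOffsets:

```scala
object CommitRetrySketch {
  // commit() returns true on success. Retry only when this is NOT an
  // auto-commit, i.e., a commit issued during rebalance or shutdown.
  def commitWithRetries(commit: () => Boolean,
                        isAutoCommit: Boolean,
                        configuredRetries: Int): Boolean = {
    val retries = if (isAutoCommit) 0 else configuredRetries
    var attempt = 0
    var committed = false
    while (!committed && attempt <= retries) {
      committed = commit()
      attempt += 1
    }
    committed
  }
}
```

For example, a commit that fails twice due to offset-manager movement and then succeeds would be retried through to success during a rebalance, but would simply fail once (and be picked up by the next periodic commit) in the auto-commit case.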


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 124
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line124
 
  This is a slightly misleading comment. i.e., we do allow a single 
  message that may exceed this right? It is hard to document succinctly 
  though.

I think the description seems clear enough for mirror maker. We do have 
comments in ByteBoundedBlockingQueue about the possibility of one more message 
exceeding the capacity.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 192
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line192
 
  Why do you need a dummy param?

Because Utils.createObject needs an args parameter, and if we pass in a null 
it will throw an NPE...
I've changed the code in Utils to allow us to pass in null, which uses the 
no-arg constructor.
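
The change described here can be sketched as follows. This is a simplified stand-in, not the actual Utils.createObject signature, and the constructor-matching by arity is an assumption for illustration:

```scala
object CreateObjectSketch {
  // Fall back to the no-arg constructor when no args are supplied, instead of
  // NPE-ing on a null args array.
  def createObject[T](className: String, args: AnyRef*): T = {
    val clazz = Class.forName(className)
    if (args == null || args.isEmpty)
      clazz.getDeclaredConstructor().newInstance().asInstanceOf[T]
    else {
      // Naive match on arity only; a real implementation would also check
      // parameter types.
      val ctor = clazz.getConstructors
        .find(_.getParameterTypes.length == args.length)
        .getOrElse(throw new IllegalArgumentException(
          s"no $className constructor taking ${args.length} argument(s)"))
      ctor.newInstance(args: _*).asInstanceOf[T]
    }
  }
}
```

With this shape, a caller configuring a rebalance listener with no custom arguments no longer needs to pass a dummy parameter just to avoid the NPE.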


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 477
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line477
 
  Do we need this counter? i.e., there is already a commit meter in the 
  consumer connector.

Right, maybe we don't need it.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489
 
  Why not use KafkaScheduler for the offset commit task?

Haven't thought of that before... But it looks like we need to do some more 
handling when something goes wrong in the offset commit threads. The 
KafkaScheduler code does not seem to do so.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line614
 
  There might be a small memory leak here: if there was an error though 
  (in the branch above) it seems no one removes the offset from the list.

Yes, that is a memory leak, but in this case we should not commit the offset of 
the message that was not sent successfully either. If any exception occurs, the 
offsets will not advance anymore. We should probably have an alert on the 
mirror maker consumer lag.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 658
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line658
 
  Note earlier comment on null vs None

As LinkedList is a fairly standard data structure, sticking with null here 
probably makes the code cleaner.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 666
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line666
 
  Should we expose a size() method - i.e., increment on add and decrement 
  on remove. We can aggregate the size of all the offset lists outside and 
  emit a gauge. That will give us some assurance that there are no 
  forgotten offsets. Re: the potential leak mentioned above.
  
  In fact, I'm a bit nervous about correctness since this is a custom 
  implementation of a semi-non-trivial data structure. We should probably 
  even assert that it is empty when numMessageUnacked goes to zero as part of 
  the rebalance.
  
  Ideally, these custom implementations need a full-fledged unit test.

That's a good point; we probably need a metric to see if we have stuck 
offsets. But those stuck offsets should not be committed anyway, and we need 
to be alerted once that situation happens. Maybe adding an assertion in the 
exception block where a stuck offset occurs would be better?
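
A size-tracked offset list along the lines Joel suggests might look like the sketch below. The class and method names are hypothetical, not the patch's actual unacked-offset structure:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

class TrackedOffsetList {
  private val offsets = mutable.SortedSet.empty[Long]
  private val count = new AtomicInteger(0)

  def add(offset: Long): Unit = this.synchronized {
    if (offsets.add(offset)) count.incrementAndGet()
  }

  def remove(offset: Long): Unit = this.synchronized {
    if (offsets.remove(offset)) count.decrementAndGet()
  }

  // Cheap, lock-free read suitable for a metrics gauge; aggregating this
  // across all partitions makes forgotten (leaked) offsets visible.
  def size: Int = count.get()

  // Smallest unacked offset, i.e., the safe offset to commit.
  def smallest: Option[Long] = this.synchronized { offsets.headOption }
}
```

Emitting the aggregated size as a gauge, and asserting it is zero when numUnackedMessages drops to zero during a rebalance, would surface exactly the kind of stuck-offset leak discussed above.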


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-16 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 16, 2014, 4:03 p.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter the send was successful or 
not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
77d951d13b8d8ad00af40257fe51623cc2caa61a 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-09 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64505
---


Haven't looked at your new patch yet. It seems that the following tests start 
to fail after your commit. They seem to pass if run individually, but always 
fail if you run all unit tests. Could you take a look and at least understand 
the cause?

kafka.server.ServerShutdownTest  testCleanShutdown FAILED
java.lang.NullPointerException
at 
kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
at 
kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
at 
scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114)
at 
scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113)
at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108)
at 
kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147)
at 
kafka.server.ServerShutdownTest.testCleanShutdown(ServerShutdownTest.scala:101)

kafka.server.ServerShutdownTest  testConsecutiveShutdown PASSED

kafka.server.ServerShutdownTest  testCleanShutdownWithDeleteTopicEnabled FAILED
java.lang.NullPointerException
at 
kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
at 
kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
at 
scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114)
at 
scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113)
at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108)
at 
kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147)
at 
kafka.server.ServerShutdownTest.testCleanShutdownWithDeleteTopicEnabled(ServerShutdownTest.scala:114)

kafka.server.ServerShutdownTest  testCleanShutdownAfterFailedStartup FAILED
java.lang.NullPointerException
at 
kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
at 
kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
at 
scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114)
at 
scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113)
at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108)
at 
kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147)
at 
kafka.server.ServerShutdownTest.testCleanShutdownAfterFailedStartup(ServerShutdownTest.scala:141)

- Jun Rao


On Dec. 8, 2014, 9:36 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 8, 2014, 9:36 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-09 Thread Jiangjie Qin


 On Dec. 10, 2014, 5:09 a.m., Jun Rao wrote:
  Haven't looked at your new patch yet. It seems that the following tests 
  start to fail after your commit. They seem to pass if run individually, but 
  always fail if you run all unit tests. Could you take a look and at least 
  understand the cause?
  
  kafka.server.ServerShutdownTest > testCleanShutdown FAILED
  java.lang.NullPointerException
  at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
  at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
  at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114)
  at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113)
  at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108)
  at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147)
  at kafka.server.ServerShutdownTest.testCleanShutdown(ServerShutdownTest.scala:101)
  
  kafka.server.ServerShutdownTest > testConsecutiveShutdown PASSED
  
  kafka.server.ServerShutdownTest > testCleanShutdownWithDeleteTopicEnabled FAILED
  java.lang.NullPointerException
  at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
  at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
  at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114)
  at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113)
  at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108)
  at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147)
  at kafka.server.ServerShutdownTest.testCleanShutdownWithDeleteTopicEnabled(ServerShutdownTest.scala:114)
  
  kafka.server.ServerShutdownTest > testCleanShutdownAfterFailedStartup FAILED
  java.lang.NullPointerException
  at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
  at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
  at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114)
  at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113)
  at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108)
  at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147)
  at kafka.server.ServerShutdownTest.testCleanShutdownAfterFailedStartup(ServerShutdownTest.scala:141)

KAFKA-1815 has been opened for the failures. It turns out I forgot to shut down 
the consumer's ZookeeperConsumerConnector in the rebalance listener test, which 
caused the later server shutdown tests to fail. Really sorry for this... I 
could not reproduce the issue on my desktop, probably because it is fast, but I 
was able to reproduce it on my laptop.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64505
---


On Dec. 8, 2014, 9:36 a.m., Jiangjie Qin wrote:
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-08 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 8, 2014, 9:36 a.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter the send was successful or 
not.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
b06ff6000183b257005b5ac3ccc7ba8976f1ab8d 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-07 Thread Jun Rao


 On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530
  https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529
 
  Does that imply there is always an offset? Is that always true?
  
  I don't quite follow how the logic works. Since the offset for each 
  target partition is updated independently, I am not sure why you can rely 
  on checking that those offsets are consecutive. Also, does this logic work 
  when there is a partitioning key?
  
  It would be useful to add some comments to describe why a two-level 
  offset map is needed.
 
 Jiangjie Qin wrote:
 The iterator only iterates over the keys that exist in the map, i.e. 
 keys for which an offset exists.
 
 It does seem confusing without explanation... Will it be clearer if the 
 following comments are added?
 
 Following is the offset commit logic:
 We know that:
 1. Messages from the same source partition end up in the same data channel 
 queue in order and will be sent by the same producer.
 2. Messages sent by the same producer could go to different target 
 partitions even if the messages are from the same source partition. 
 3. The order is only guaranteed for messages sent to the same target 
 partition. That means a (SourceTopicPartition, TargetPartition) combination 
 is needed.
 4. For each (SourceTopicPartition, TargetPartition), keeping track of a 
 single offset is sufficient, because if an offset is acked, all the offsets 
 smaller than that offset going to the same target partition must have been 
 sent successfully (MaxInFlightRequests=1). That said, if we have multiple 
 producers, after sorting all the last acked offsets of the target partitions 
 corresponding to the same source partition, we can commit the offsets from 
 the smallest until the acked offsets are no longer consecutive. (But we do 
 need to set send retries to be infinite in the producer config, otherwise 
 this won't work. I'll add it to the comments.)
 
 Based on the above logic, we could use a Map<(SourceTopicPartition, 
 TargetPartition), offset> to track the offsets. But because the offset commit 
 is based on source topic partitions, it is easier to have a 
 Map<SourceTopicPartition, Map<TargetPartition, offset>> to find the offsets to 
 commit. That's why there is a two-level map.
 
 The logic above does not rely on any message key, so it works in general 
 for both keyed and non-keyed messages.

The part that's not clear to me is "until the acked offsets are no longer 
consecutive". Suppose there are 2 target partitions. Offsets 1, 2, 4, 5 can be 
routed to partition 1 and offset 3 can be routed to partition 2. When all 
messages are sent, you will see partition 1 with offset 5 and partition 2 with 
offset 3. Since offsets 3 and 5 are not consecutive, are you just going to 
commit offset 3? If so, you may be stuck at offset 3 forever if no new messages 
are sent to partition 2.

Also, does this work with a log configured with compaction? Such a log may not 
have consecutive offsets to begin with.


 On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 614-618
  https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line614
 
  This kind of coordination doesn't quite work. Suppose that we set 
  inRebalance to true and entered the while loop. However, just before we 
  call inRebalance.wait(), the producer could have finished sending all data 
  and called inRebalance.notify(). Then we will be stuck in 
  inRebalance.wait() forever since we missed the notification.
  
  One way to fix that is to create a lock that protects the read/write of 
  numMessageUnacked. Then we use a condition created from the lock to do the 
  coordination. This way, both the wait/notification and the update/check of 
  numMessageUnacked are protected by the same lock.
 
 Jiangjie Qin wrote:
 Yes, it is clearer to just use a single lock. I'll change the code.
 
 But the current code itself seems to work; please correct me if I'm wrong.
 In this case we essentially guarantee that [enable notify; if 
 numMessagesUnacked > 0 then wait] is atomic with respect to notify. So either 
 notify is not enabled yet, or it occurs after the rebalance listener starts 
 to wait. So the notify will not be missed.
 If the rebalance listener grabs the inRebalance lock first and is in the 
 while loop, then it won't release the inRebalance lock until it enters wait. 
 Because producers have to grab the inRebalance lock before calling 
 inRebalance.notify(), the producer will not be able to call 
 inRebalance.notify() until the offset commit thread starts to wait. 
 There are some more complicated sequences when producers call notify 
 multiple times, but as long as 
 1. [if numMessagesUnacked > 0 then wait] is atomic with respect to notify, and
 2. [numMessagesUnacked == 0] happens before notify,
 the notify should not be missed.

Got it. I missed the part that we 
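The monitor-based argument above can be sketched in a few lines (Java used here for a compact standalone example; `RebalanceGate` and its method names are illustrative, not the actual MirrorMaker code). The point it demonstrates: the [count > 0 then wait] check and the notify both run while holding the same monitor, so the notification cannot slip in between the check and the wait.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the coordination pattern discussed above.
public class RebalanceGate {
    private final Object inRebalance = new Object();
    private final AtomicInteger numMessagesUnacked = new AtomicInteger(0);

    // Called when a message is handed to the producer.
    public void onSend() { numMessagesUnacked.incrementAndGet(); }

    // Called from the producer callback when a send completes.
    public void onAck() {
        if (numMessagesUnacked.decrementAndGet() == 0) {
            // Notify runs under the same monitor the waiter holds,
            // so it cannot fire between the waiter's check and wait().
            synchronized (inRebalance) { inRebalance.notifyAll(); }
        }
    }

    // Called by the rebalance listener before committing offsets.
    public void awaitAllAcked() throws InterruptedException {
        synchronized (inRebalance) {
            while (numMessagesUnacked.get() > 0) {
                inRebalance.wait();
            }
        }
    }

    public int unacked() { return numMessagesUnacked.get(); }
}
```

If the ack lands before the listener checks, the check simply sees zero and never waits; if it lands after, the notify blocks on the monitor until `wait()` releases it.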

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-07 Thread Jiangjie Qin


 
 Jun Rao wrote:
 The part that's not clear to me is "until the acked offsets are no longer 
 consecutive". Suppose there are 2 target partitions. Offsets 1, 2, 4, 5 can 
 be routed to partition 1 and offset 3 can be routed to partition 2. When all 
 messages are sent, you will see partition 1 with offset 5 and partition 2 
 with offset 3. Since offsets 3 and 5 are not consecutive, are you just going 
 to commit offset 3? If so, you may be stuck at offset 3 forever if no new 
 messages are sent to partition 2.
 
 Also, does this work with a log configured with compaction? Such a log may 
 not have consecutive offsets to begin with.

I see, you are right. In that case it won't work. How about the following 
algorithm:
1. We keep a Map<SourceTopicPartition, (Set<offset>, maxOffsetInSet)>.
2. When a message is sent, the producer puts its offset into the map based on 
the source topic partition, and updates maxOffsetInSet if necessary.
3. In the producer callback, remove the offset from the Set<offset> if the 
message was sent successfully.
4. When the offset commit thread comes, it gets the smallest offset in the 
Set<offset> and commits that one. If the set is empty, it commits 
maxOffsetInSet + 1.

This algorithm should be able to handle compacted partitions as well.
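Steps 1-4 above, for a single source partition, can be sketched as follows (a hedged illustration, not the actual patch; names like `PartitionOffsetTracker` are assumed, and a sorted set stands in for the per-partition `Set<offset>`):

```java
import java.util.TreeSet;

// Per-source-partition bookkeeping for the proposed commit algorithm:
// in-flight offsets live in a sorted set; the committable offset is the
// smallest unacked offset, or maxOffsetSeen + 1 once the set drains.
public class PartitionOffsetTracker {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long maxOffsetSeen = -1L;

    public synchronized void onSend(long offset) {   // step 2
        unacked.add(offset);
        maxOffsetSeen = Math.max(maxOffsetSeen, offset);
    }

    public synchronized void onAck(long offset) {    // step 3
        unacked.remove(offset);
    }

    public synchronized long offsetToCommit() {      // step 4
        return unacked.isEmpty() ? maxOffsetSeen + 1 : unacked.first();
    }
}
```

Because the committed value is the smallest still-unacked offset (not a consecutiveness scan), gaps in the offset sequence, as in a compacted log, do not stall the commit.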


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
---


On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote:
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-07 Thread Jun Rao


 
 Jiangjie Qin wrote:
 I see, you are right. In that case it won't work. How about the following 
 algorithm:
 1. We keep a Map<SourceTopicPartition, (Set<offset>, maxOffsetInSet)>.
 2. When a message is sent, the producer puts its offset into the map based 
 on the source topic partition, and updates maxOffsetInSet if necessary.
 3. In the producer callback, remove the offset from the Set<offset> if 
 the message was sent successfully.
 4. When the offset commit thread comes, it gets the smallest offset in the 
 Set<offset> and commits that one. If the set is empty, it commits 
 maxOffsetInSet + 1.
 
 This algorithm should be able to handle compacted partitions as well.

Yes, but getting the smallest offset from a set can be expensive. I was 
thinking that we can put the offset of all outstanding messages in a linked 
list in source offset order. Every produced message will be taken out of the 
linked list. We also maintain a maxOffsetSeen. The commit offset will be the 
offset in the first link if the linked list is not empty. Otherwise, it will be 
maxOffsetSeen.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
---


On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote:
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-07 Thread Jiangjie Qin


 
 Jun Rao wrote:
 Yes, but getting the smallest offset from a set can be expensive. I was 
 thinking that we can put the offset of all outstanding messages in a linked 
 list in source offset order. Every produced message will be taken out of the 
 linked list. We also maintain a maxOffsetSeen. The commit offset will be the 
 offset in the first link if the linked list is not empty. Otherwise, it will 
 be maxOffsetSeen.

Yes, this solution should have no bottleneck. I'll write a raw linked list. I 
was thinking that offset commits are infrequent and hopefully we do not have 
many pending messages for a partition at a given point.
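The linked-list variant can be sketched as below (a hypothetical illustration, not the actual patch). A `LinkedHashSet` stands in for the raw doubly linked list, since it also preserves insertion (source offset) order while allowing O(1) removal of an arbitrary element from the producer callback. Following the description above, the commit offset is the first in-flight offset if any remain, otherwise maxOffsetSeen:

```java
import java.util.LinkedHashSet;

// Linked-list-style tracker: offsets are appended in source order; each
// ack unlinks its offset in O(1); the head of the list is the first
// still-unacked offset.
public class LinkedOffsetTracker {
    private final LinkedHashSet<Long> inFlight = new LinkedHashSet<>();
    private long maxOffsetSeen = -1L;

    public synchronized void onSend(long offset) {
        inFlight.add(offset);                 // appended in source-offset order
        maxOffsetSeen = Math.max(maxOffsetSeen, offset);
    }

    public synchronized void onAck(long offset) {
        inFlight.remove(offset);              // O(1) unlink, order preserved
    }

    public synchronized long offsetToCommit() {
        return inFlight.isEmpty() ? maxOffsetSeen
                                  : inFlight.iterator().next();
    }
}
```

Compared with the sorted-set version, finding the commit offset is O(1) rather than a lookup in an ordered structure, at the cost of requiring sends to arrive in source-offset order.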


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
---


On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote:
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-06 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
---


Thanks for the patch. Some comments below.


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment106601

Should this api be in ConsumerConnector?



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment106604

Should this be part of the public api in ConsumerConnector?

Also, could we restructure the API a bit? It seems that the public API 
should just be commitOffsets(offsetMap), since we know this is a manual offset 
commit. We can let the other API, commitOffsets(isAutoCommit), get the internal 
offsets and pass them to commitOffsets(offsetMap).
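A rough sketch of the suggested split (the names and signatures here are assumptions for illustration, not the actual ConsumerConnector API; topic-partition keys are plain strings to keep the example self-contained): the map-taking overload is the single commit path, and the boolean overload merely gathers the internally tracked offsets and delegates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical shape of the restructured commit API.
class ConnectorSketch {
    private final Map<String, Long> internalOffsets = new HashMap<>();
    Map<String, Long> lastCommitted = new HashMap<>();

    // Called as messages are consumed.
    void updateFetchOffset(String topicPartition, long offset) {
        internalOffsets.put(topicPartition, offset);
    }

    // Public entry point: manual commit of an explicit offset map.
    void commitOffsets(Map<String, Long> offsetsToCommit) {
        // The real connector would persist these to offset storage.
        lastCommitted = new HashMap<>(offsetsToCommit);
    }

    // Auto-commit path: gather internal offsets and delegate.
    void commitOffsets(boolean isAutoCommit) {
        commitOffsets(internalOffsets);
    }
}
```

Funneling both paths through one method keeps the persistence logic in a single place, so a caller supplying its own map (as MirrorMaker does) gets identical behavior to auto-commit.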



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment106606

If we expect consumerConfig to be a singleton, we should just use 
options.valueOf(), instead of valuesOf().head.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment106605

This is really weird. We get a customized listener from the consumer 
config, but expect the listener to implement a special class, 
MirrorMakerConsumerRebalanceListener, instead of the standard 
ConsumerRebalanceListener. It's probably better to get this from a MirrorMaker 
input param that defaults to MirrorMakerConsumerRebalanceListener. We can then 
add a description of which class a customized implementation needs to extend 
from, if customization is needed.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment106623

Does that imply there is always an offset? Is that always true?

I don't quite follow how the logic works. Since the offset for each target 
partition is updated independently, I am not sure why you can rely on checking 
that those offsets are consecutive. Also, does this logic work when there is a 
partitioning key?

It would be useful to add some comments to describe why a two-level offset 
map is needed.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment106622

It's weird to extend from NewShinyProducer but not use its methods. 
Perhaps it will be clearer if we just let MirrorMakerNewProducer implement 
MirrorMakerBaseProducer.

Ditto for MirrorMakerOldProducer.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment106621

This kind of coordination doesn't quite work. Suppose that we set 
inRebalance to true and entered the while loop. However, just before we call 
inRebalance.wait(), the producer could have finished sending all data and 
called inRebalance.notify(). Then we will be stuck in inRebalance.wait() 
forever since we missed the notification.

One way to fix that is to create a lock that protects the read/write of 
numMessageUnacked. Then we use a condition created from the lock to do the 
coordination. This way, both the wait/notification and the update/check of 
numMessageUnacked are protected by the same lock.
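The lock-plus-condition fix described above might look like this (a hedged sketch with assumed names, not the actual patch): the counter update and the wait/signal all run under one ReentrantLock, so a signal can never fire between the waiter's check and its await.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Counter whose reads, writes, and wait/signal share a single lock,
// avoiding the lost-notification race described above.
public class UnackedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition allAcked = lock.newCondition();
    private int numMessageUnacked = 0;

    public void increment() {
        lock.lock();
        try { numMessageUnacked++; } finally { lock.unlock(); }
    }

    public void decrement() {
        lock.lock();
        try {
            if (--numMessageUnacked == 0) allAcked.signalAll();
        } finally { lock.unlock(); }
    }

    // Blocks until every outstanding message has been acked.
    public void awaitZero() throws InterruptedException {
        lock.lock();
        try {
            while (numMessageUnacked > 0) allAcked.await();
        } finally { lock.unlock(); }
    }

    public int count() {
        lock.lock();
        try { return numMessageUnacked; } finally { lock.unlock(); }
    }
}
```

The while-loop around `await()` also guards against spurious wakeups, which plain wait/notify code must handle the same way.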


- Jun Rao


On Dec. 4, 2014, 7:59 p.m., Jiangjie Qin wrote:
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-06 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 7, 2014, 2:59 a.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter whether the send was 
successful or not.


Addressed Jun's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-06 Thread Jiangjie Qin


 On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
  Thanks for the patch. Some comments below.

Thank you very much for the review.


 On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530
  https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529
 
  Does that imply there is always an offset? Is that always true?
  
  I don't quite follow how the logic works. Since the offset for each 
  target partition is updated independently, I am not sure why you can rely 
  on checking that those offsets are consecutive. Also, does this logic work 
  when there is a partitioning key?
  
  It would be useful to add some comments to describe why a two-level 
  offset map is needed.

The iterator only iterates over the keys that exist in the map, i.e. keys for 
which an offset exists.

It does seem confusing without explanation... Would it be clearer if the 
following comments were added?

Following is the offset commit logic.
We know that:
1. Messages from the same source partition end up in the same data channel 
queue in order and will be sent by the same producer.
2. Messages sent by the same producer could go to different target partitions 
even if the messages are from the same source partition.
3. Order is only guaranteed for messages sent to the same target partition. 
That means a (SourceTopicPartition, TargetPartition) combination is needed.
4. For each (SourceTopicPartition, TargetPartition), keeping track of a single 
offset is sufficient, because if an offset is acked, all smaller offsets going 
to the same target partition must have been sent successfully 
(MaxInFlightRequests=1). Therefore, if we have multiple producers, after 
sorting all the last acked offsets of the target partitions corresponding to 
the same source partition, we can commit offsets from the smallest upward 
until the acked offsets are no longer consecutive. (But we do need to set send 
retries to infinite in the producer config, otherwise this won't work. I'll 
add that to the comments.)

Based on the above logic, we could use a Map[(SourceTopicPartition, 
TargetPartition), Offset] to track the offsets. But because offsets are 
committed per source topic partition, it is easier to find the offset to 
commit with a Map[SourceTopicPartition, Map[TargetPartition, Offset]]. That's 
why there is a two-level map.

The logic above does not rely on any message key, so it works in general for 
both keyed and non-keyed messages.
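If it helps, the scheme in points 1-4 above could be sketched roughly like 
this (a hypothetical Java sketch, not the actual MirrorMaker.scala code; the 
class and method names are made up):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the two-level offset map described above.
// For each source partition we track, per target partition, the last acked
// source offset (safe because MaxInFlightRequests=1 preserves order per
// target partition).
class OffsetCommitTracker {
    // source partition -> (target partition -> last acked source offset)
    private final Map<String, Map<Integer, Long>> lastAcked = new HashMap<>();

    synchronized void onAck(String sourcePartition, int targetPartition, long sourceOffset) {
        lastAcked.computeIfAbsent(sourcePartition, k -> new HashMap<>())
                 .merge(targetPartition, sourceOffset, Math::max);
    }

    // Sort the last acked offsets across target partitions and walk upward
    // from the smallest while they stay consecutive; everything up to the
    // last consecutive offset is safe to commit.
    synchronized long committableOffset(String sourcePartition) {
        Map<Integer, Long> perTarget =
            lastAcked.getOrDefault(sourcePartition, new HashMap<>());
        List<Long> offsets = new ArrayList<>(perTarget.values());
        if (offsets.isEmpty()) return -1L;   // nothing acked yet
        offsets.sort(null);
        long last = offsets.get(0);
        for (int i = 1; i < offsets.size(); i++) {
            if (offsets.get(i) != last + 1) break;
            last = offsets.get(i);
        }
        return last + 1;  // next offset to commit
    }
}
```

Usage-wise, the offset commit thread would call committableOffset() per source 
partition; producer callbacks would call onAck().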


 On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 558-559
  https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line558
 
  It's weird to extend NewShinyProducer without using its methods. 
  Perhaps it will be clearer if we just let MirrorMakerNewProducer implement 
  MirrorMakerBaseProducer.
  
  Ditto for MirrorMakerOldProducer.

I was trying to avoid duplicating code, but it doesn't seem to save much. I'll 
get rid of the extension.


 On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 614-618
  https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line614
 
  This kind of coordination doesn't quite work. Suppose that we set 
  inRebalance to true and entered the while loop. However, just before we 
  call inRebalance.wait(), the producer could have finished sending all data 
  and called inRebalance.notify(). Then we will be stuck in 
  inRebalance.wait() forever since we missed the notification.
  
  One way to do that is to create a lock that protects the read/write of 
  numMessageUnacked. Then we use a condition created from the lock to do the 
  coordination. This way, both the wait/notification and the update/check of 
  numMessageUnacked are protected by the same lock.

Yes, it is clearer to just use a single lock. I'll change the code.

But the current code itself seems to work; please correct me if I'm wrong.
In this case we essentially guarantee that [enable notify; if 
numMessagesUnacked > 0 then wait] is atomic with respect to notify. So either 
notify is not enabled yet, or it occurs after the rebalance listener starts to 
wait. Either way the notify will not be missed.
If the rebalance listener grabs the inRebalance lock first and is in the while 
loop, then it won't release the inRebalance lock until it enters wait. Because 
producers have to grab the inRebalance lock before calling 
inRebalance.notify(), the producer will not be able to call 
inRebalance.notify() until the offset commit thread starts to wait.
There are some more complicated sequences when producers call notify multiple 
times, but as long as
1. [numMessageUnacked > 0 then wait] is atomic with respect to notify, and
2. [numMessageUnacked == 0] happens before notify,
notify should not be missed.
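For concreteness, the lock-plus-condition scheme Jun describes could look 
roughly like this (a hypothetical Java sketch; the class and method names are 
made up and this is not the patch code):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the single-lock coordination: both the update/check of
// numMessageUnacked and the wait/signal go through the same lock, so a
// signal can never slip in between the check and the wait.
class UnackedCoordinator {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition allAcked = lock.newCondition();
    private long numMessageUnacked = 0;

    void onSend() {
        lock.lock();
        try { numMessageUnacked++; } finally { lock.unlock(); }
    }

    void onAck() {
        lock.lock();
        try {
            if (--numMessageUnacked == 0) allAcked.signalAll();
        } finally { lock.unlock(); }
    }

    void awaitAllAcked() {
        lock.lock();
        try {
            // while, not if: also guards against spurious wakeups
            while (numMessageUnacked > 0) allAcked.awaitUninterruptibly();
        } finally { lock.unlock(); }
    }

    long unacked() {
        lock.lock();
        try { return numMessageUnacked; } finally { lock.unlock(); }
    }
}
```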


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 3, 2014, 11:02 p.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 4, 2014, 3:02 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin


 On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
  https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613
 
  I think there may be a race condition here, for example consider this 
  sequence:
  
  1. data channel only contain one message.
  2. producer take the message from channel.
  3. dataChannel.clear() called.
  4. numMessageUnacked.get() == 0, offsets committed.
  5. producer.send() called, increment numMessageUnacked.
  6. data duplicate happens when the rebalance finished.
  
  I think on line 599 we should use while instead of if, but this 
  alone does not fix this.
 
 Jiangjie Qin wrote:
 Yes, I actually have a comment on this race condition at line 581. The 
 reason I'm not handling it here is:
 1. The chance of this situation is very slight.
 2. A single duplicate message does not really hurt.
 3. The fix increases the complexity of the code (looking into the producer 
 thread status) and I'm not sure if it's worth doing.
 4. Even if we fix this, from the producer side, duplicates could still 
 happen.
 
 Guozhang Wang wrote:
 Shall we change line 691 to while (numMessageUnacked.get()  0) at 
 least?

Yes, it should be a while loop; I forgot to change it...


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
---


On Dec. 4, 2014, 3:02 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 4, 2014, 3:02 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 da29a8cb461099eb675161db2f11a9937424a5c6 
   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
 PRE-CREATION 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9d5a47fb8e04d0055cce820afde7f73affc0a984 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 f399105087588946987bbc84e3759935d9498b6a 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
 6a85d7e494f6c88798133a17f6180b61029dff58 
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin


 On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
  https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613
 
  I think there may be a race condition here, for example consider this 
  sequence:
  
  1. data channel only contain one message.
  2. producer take the message from channel.
  3. dataChannel.clear() called.
  4. numMessageUnacked.get() == 0, offsets committed.
  5. producer.send() called, increment numMessageUnacked.
  6. data duplicate happens when the rebalance finished.
  
  I think on line 599 we should use while instead of if, but this 
  alone does not fix this.

Yes, I actually have a comment on this race condition at line 581. The reason 
I'm not handling it here is:
1. The chance of this situation is very slight.
2. A single duplicate message does not really hurt.
3. The fix increases the complexity of the code (looking into the producer 
thread status) and I'm not sure if it's worth doing.
4. Even if we fix this, from the producer side, duplicates could still happen.


 On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 96
  https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line96
 
  Could you add a comment here about the in-flight-request config and its 
  effects?

The comment was put at the very beginning as a note. Maybe we can add a 
comment here referring to that note.


 On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 186
  https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line186
 
  I am wondering whether there are some scenarios where we want to allow a 
  customized rebalance listener? Also, if we decide to make this customizable, 
  we need to make it clear that the customized listener is expected to take 
  the data channel in its constructor, since this is not checked at compile 
  time.

Yes, we do foresee some use cases for this customized rebalance listener. I'll 
add the following comment:
A customized consumer rebalance listener should extend 
MirrorMakerConsumerRebalanceListener and take the data channel as a 
constructor argument.
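To illustrate why the constructor contract matters, instantiating such a 
configurable listener by class name might look like this (a hypothetical Java 
sketch; ListenerLoader and createListener are made-up names, not the patch 
code). The constructor signature is only discoverable at runtime, which is why 
the expected data channel argument has to be documented:

```java
// Hypothetical sketch of loading a custom rebalance listener by class name.
// A listener class without the expected public constructor fails here at
// runtime rather than at compile time.
class ListenerLoader {
    static Object createListener(String className, Class<?> channelType, Object dataChannel) {
        try {
            return Class.forName(className)
                        .getConstructor(channelType)   // looked up at runtime
                        .newInstance(dataChannel);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Listener " + className + " must declare a public constructor taking "
                + channelType, e);
        }
    }
}
```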


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
---


On Nov. 24, 2014, 4:15 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Nov. 24, 2014, 4:15 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
 PRE-CREATION 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9d5a47fb8e04d0055cce820afde7f73affc0a984 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 f399105087588946987bbc84e3759935d9498b6a 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
 6a85d7e494f6c88798133a17f6180b61029dff58 
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Guozhang Wang


 On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
  https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613
 
  I think there may be a race condition here, for example consider this 
  sequence:
  
  1. data channel only contain one message.
  2. producer take the message from channel.
  3. dataChannel.clear() called.
  4. numMessageUnacked.get() == 0, offsets committed.
  5. producer.send() called, increment numMessageUnacked.
  6. data duplicate happens when the rebalance finished.
  
  I think on line 599 we should use while instead of if, but this 
  alone does not fix this.
 
 Jiangjie Qin wrote:
 Yes, I actually have a comment on this race condition at line 581. The 
 reason I'm not handling it here is:
 1. The chance of this situation is very slight.
 2. A single duplicate message does not really hurt.
 3. The fix increases the complexity of the code (looking into the producer 
 thread status) and I'm not sure if it's worth doing.
 4. Even if we fix this, from the producer side, duplicates could still 
 happen.

Shall we change line 691 to while (numMessageUnacked.get() > 0) at least?


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
---


On Dec. 3, 2014, 11:02 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 3, 2014, 11:02 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 da29a8cb461099eb675161db2f11a9937424a5c6 
   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
 PRE-CREATION 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9d5a47fb8e04d0055cce820afde7f73affc0a984 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 f399105087588946987bbc84e3759935d9498b6a 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
 6a85d7e494f6c88798133a17f6180b61029dff58 
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
---



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment105980

Could you add a comment here about the in-flight-request config and its 
effects?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment105982

I am wondering whether there are some scenarios where we want to allow a 
customized rebalance listener? Also, if we decide to make this customizable, 
we need to make it clear that the customized listener is expected to take the 
data channel in its constructor, since this is not checked at compile time.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment105984

I think there may be a race condition here, for example consider this 
sequence:

1. The data channel only contains one message.
2. The producer takes the message from the channel.
3. dataChannel.clear() is called.
4. numMessageUnacked.get() == 0, offsets committed.
5. producer.send() is called, incrementing numMessageUnacked.
6. Data duplication happens when the rebalance finishes.

I think on line 599 we should use "while" instead of "if", but this alone 
does not fix this.


- Guozhang Wang


On Nov. 24, 2014, 4:15 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Nov. 24, 2014, 4:15 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
 PRE-CREATION 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9d5a47fb8e04d0055cce820afde7f73affc0a984 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 f399105087588946987bbc84e3759935d9498b6a 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
 6a85d7e494f6c88798133a17f6180b61029dff58 
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63878
---

Ship it!


Ship It!

- Guozhang Wang


On Dec. 4, 2014, 3:02 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 4, 2014, 3:02 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 da29a8cb461099eb675161db2f11a9937424a5c6 
   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
 PRE-CREATION 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9d5a47fb8e04d0055cce820afde7f73affc0a984 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 f399105087588946987bbc84e3759935d9498b6a 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
 6a85d7e494f6c88798133a17f6180b61029dff58 
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 4, 2014, 7:59 p.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented no matter whether the send was 
successful or not.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-11-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Nov. 24, 2014, 4:15 p.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-11-20 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Nov. 20, 2014, 8 p.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-11-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review61528
---



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103712

based on the hash value of the source topic-partitionId string



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103221

Could you make this consistent with the others, as offset.commit.interval.ms?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103713

In this case, for MaxInFlightRequests > 1 we are not only at the risk of 
message reordering but also at the risk of message loss upon failures, right?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103714

Actually, can we move this comment into the top comment of the MM class as 
a NOTE under the producer paragraph?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103715

Is it possible that the current chunk has been consumed completely and the 
fetcher thread has yet to put in a new chunk, and hence hasNext() will return 
false? In this case shall we stop the consumer or just let it block?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103716

Since we already have the logIdent as threadName, I think we do not need 
this.getName here?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103718

With the logIdent it will become:

FATAL [mirrormaker-offset-commit-thread] Offset commit thread exits due to 
...

which is a bit verbose?


- Guozhang Wang


On Nov. 12, 2014, 5:51 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Nov. 12, 2014, 5:51 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 f399105087588946987bbc84e3759935d9498b6a 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-11-17 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Nov. 18, 2014, 2:44 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
f476973eeff653473a60c3ecf36e870e386536bc 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-11-17 Thread Jiangjie Qin


 On Nov. 17, 2014, 9:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 338
  https://reviews.apache.org/r/25995/diff/4/?file=760097#file760097line338
 
  Is it possible that the current chunk has been consumed completely and 
  the fetcher thread has yet to put in a new chunk, and hence hasNext() will 
  return false? In this case shall we stop the consumer or just let it block?

I don't think it is possible. If the consumer timeout is set to -1, hasNext() 
seems to return false only when:
1. A shutdown message is received.
2. An incomplete message is encountered (in which case we should probably exit).
Otherwise it will block until the next data chunk is put into the queue.
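
The blocking behavior discussed here can be illustrated with a minimal Java sketch (hypothetical names, not Kafka's actual ConsumerIterator): with no timeout, hasNext() blocks on the underlying queue and returns false only once a shutdown marker is dequeued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of a blocking iterator: with no timeout (analogous to
// consumer.timeout.ms = -1), hasNext() blocks until data arrives and only
// returns false when a special shutdown marker is dequeued.
class BlockingChunkIterator {
    static final String SHUTDOWN_MARKER = "SHUTDOWN";

    private final BlockingQueue<String> queue;
    private String pending;  // element peeked by hasNext() but not yet returned

    BlockingChunkIterator(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    boolean hasNext() throws InterruptedException {
        if (pending == null) {
            String item = queue.take();  // blocks indefinitely
            if (item.equals(SHUTDOWN_MARKER)) {
                return false;
            }
            pending = item;
        }
        return true;
    }

    String next() {
        String item = pending;
        pending = null;
        return item;
    }

    // Convenience for the usage example: consume until shutdown.
    static List<String> drain(BlockingQueue<String> q) throws InterruptedException {
        BlockingChunkIterator it = new BlockingChunkIterator(q);
        List<String> consumed = new ArrayList<>();
        while (it.hasNext()) {
            consumed.add(it.next());
        }
        return consumed;
    }
}
```

drain() returns normally only because a shutdown marker is eventually enqueued; without one, hasNext() simply blocks forever, which matches the "let it block" behavior described.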


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review61528
---


On Nov. 18, 2014, 2:44 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Nov. 18, 2014, 2:44 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650 and KAKFA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 https://issues.apache.org/jira/browse/KAKFA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 f476973eeff653473a60c3ecf36e870e386536bc 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 f399105087588946987bbc84e3759935d9498b6a 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-11-12 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Nov. 12, 2014, 5:51 p.m.)


Review request for kafka.


Summary (updated)
-

Patch for KAFKA-1650


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
fbc680fde21b02f11285a4f4b442987356abd17b 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-10-07 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review55612
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment95974

Do we need to add = here?



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/25995/#comment95975

We should keep the changes of KAFKA-1647 in its own RB and not merge 
them here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95978

Could we add some introductory comments here on:

1. The architecture of the MM: producer / consumer thread, data channel per 
producer thread, offset commit thread, and how different modules interact with 
each other.
2. Why we need a separate offset commit thread, and how it works.
3. The startup / shutdown process, like which modules to start / shutdown 
first (this could be moved to the head of the corresponding functions also).



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95979

Embedded consumer config for consuming from the source cluster.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95980

Embedded producer config for producing to the target cluster.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95981

The offset commit thread periodically commits consumed offsets to the 
source cluster. With the new producer, the offsets are updated upon the 
returned future metadata of the send() call; with the old producer, the offsets 
are updated as the consumer's iterator advances. This guarantees no data loss 
even when the mirror maker is uncleanly shut down with the new producer, while 
with the old producer messages inside the data channel could be lost upon an 
unclean shutdown of the mirror maker.
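
The bookkeeping described in this comment could look roughly like the following Java sketch (hypothetical names; the actual MirrorMaker code is Scala): the producer callback records the highest acked offset per source partition, and the commit thread snapshots offset + 1.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of callback-driven offset tracking: keep the highest
// acked offset per source topic-partition, commit one past it.
class OffsetTracker {
    private final ConcurrentHashMap<String, Long> ackedOffsets =
        new ConcurrentHashMap<>();

    // Would run inside the producer callback when a send() completes.
    void onSendComplete(String topicPartition, long offset) {
        ackedOffsets.merge(topicPartition, offset, Math::max);  // keep highest ack
    }

    // What the offset commit thread would commit on each tick: offset + 1,
    // i.e. the first offset of the not-yet-mirrored message.
    Map<String, Long> offsetsToCommit() {
        Map<String, Long> snapshot = new HashMap<>();
        ackedOffsets.forEach((tp, off) -> snapshot.put(tp, off + 1));
        return snapshot;
    }
}
```

A late ack for an already-superseded offset is simply ignored by the max-merge, so the snapshot only ever moves forward.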



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96019

numMessageCapacity and byteCapacity? numGetters and numPutters 
(since the producer is the consumer of this buffer and vice versa)?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96021

How about MirrorMaker-DataChannel-queue%d-NumMessages and 
MirrorMaker-DataChannel-queue%d-Bytes? and variable name 
channelNumMessageHists and channelByteHists?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96020

Can we define put(record, queueId) and put(record), where the latter includes 
the logic of determining the queueId and then calls the former?
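
The two overloads suggested here might look like this Java sketch (hypothetical names; the real tool is Scala). The single-argument put() derives the queueId from the source topic-partition hash, so records from one source partition always land on the same queue and per-partition ordering is preserved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical record carrying its source topic and partition.
class MirrorRecord {
    final String sourceTopic;
    final int sourcePartition;
    final byte[] value;

    MirrorRecord(String sourceTopic, int sourcePartition, byte[] value) {
        this.sourceTopic = sourceTopic;
        this.sourcePartition = sourcePartition;
        this.value = value;
    }
}

// Data channel with one queue per producer thread.
class DataChannel {
    private final BlockingQueue<MirrorRecord>[] queues;

    @SuppressWarnings("unchecked")
    DataChannel(int numQueues) {
        queues = new BlockingQueue[numQueues];
        for (int i = 0; i < numQueues; i++) {
            queues[i] = new LinkedBlockingQueue<>();
        }
    }

    void put(MirrorRecord record, int queueId) throws InterruptedException {
        queues[queueId].put(record);
    }

    // Determine the queueId from the topic-partitionId hash, then delegate.
    void put(MirrorRecord record) throws InterruptedException {
        String key = record.sourceTopic + "-" + record.sourcePartition;
        int queueId = (key.hashCode() % queues.length + queues.length) % queues.length;
        put(record, queueId);
    }

    int sizeOf(int queueId) {
        return queues[queueId].size();
    }
}
```

The double-modulo keeps queueId non-negative even when hashCode() is negative, a common pitfall with plain Math.abs.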



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96022

comment on why we use the hashCode of source topic / partition here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96026

Instead of letting the consumer check the global shutdown flag, could we just 
add a shutdown function which sets its own flag, like the producer thread and 
the commit thread? Then the shutdown process becomes

consumers.shutdown
consumers.awaitShutdown
producers.shutdown
producers.awaitShutdown
committer.shutdown
committer.awaitShutdown
connector.shutdown
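
The per-thread shutdown/awaitShutdown pattern suggested above could be implemented roughly as follows (a Java sketch with hypothetical names; the real threads are Scala): each worker owns its own flag, shutdown() flips it, and awaitShutdown() blocks on a latch until the run loop has actually exited.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal per-thread shutdown pattern: a private flag plus a latch that is
// released only once the run loop has really finished.
class Worker extends Thread {
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    volatile long iterations = 0;

    Worker(String name) {
        super(name);
    }

    @Override
    public void run() {
        try {
            while (isRunning.get()) {
                iterations++;        // stand-in for moving one record
                Thread.sleep(1);
            }
        } catch (InterruptedException ignored) {
            // fall through to release the latch
        } finally {
            shutdownLatch.countDown();
        }
    }

    void shutdown() {
        isRunning.set(false);
    }

    void awaitShutdown() throws InterruptedException {
        shutdownLatch.await();
    }
}
```

Shutting down then follows the sequence in the comment: call shutdown() on every worker of one kind, then awaitShutdown() on each, before moving to the next group.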



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96023

Maybe just // if it exits accidentally, stop the entire mirror maker as 
we did below?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96024

// if it exits accidentally, stop the entire mirror maker



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96025

// the committed offset will be the first offset of the un-consumed 
message, hence we need to increment by one.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/25995/#comment96027

queueNumItemCapacity and queueByteCapacity?


- Guozhang Wang


On Oct. 6, 2014, 5:20 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Oct. 6, 2014, 5:20 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
Talked with Joel and decided to remove multi connector support as people can 
always create multiple mirror maker instances if they want to consume from 
multiple clusters.
 
 
 Diffs
 -
 
   

Re: Review Request 25995: Patch for KAFKA-1650

2014-10-06 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Oct. 6, 2014, 5:17 p.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
fbc680fde21b02f11285a4f4b442987356abd17b 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
b8698ee1469c8fbc92ccc176d916eb3e28b87867 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-10-06 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Oct. 6, 2014, 5:20 p.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.

Talked with Joel and decided to remove multi connector support as people can 
always create multiple mirror maker instances if they want to consume from 
multiple clusters.


Diffs
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
fbc680fde21b02f11285a4f4b442987356abd17b 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
b8698ee1469c8fbc92ccc176d916eb3e28b87867 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-09-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review54620
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment94830

Do we need to do this check every time in the loop?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94831

no need empty line here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94832

No need bracket



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94833

No need bracket



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94835

Maximum bytes that can be buffered in the data channels



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94834

in terms of bytes



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94836

Inconsistent indentation.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94838

Capitalize: Offset commit interval in ms



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94841

Do you need to turn off auto commit on the consumer threads here?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94840

We can add some more comments here, explaining:

1) why we add the offset commit thread for new producer, but not old 
producer;

2) what risks does the old producer have (for not having offset commit 
thread).



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94842

For clean shutdown, you need to

1) halt consumer threads first.

2) wait for producer to drain all the messages in data channel.

3) manually commit offsets on consumer threads.

4) shut down consumer threads.

Otherwise we will have data duplicates as we commit offsets based on min.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94844

queueId



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94846

How about having a histogram for each queue instead of getting the sum? The 
update call would be a bit less expensive and we can monitor if some queues are 
empty while others get all the data.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94847

Ditto above.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94849

Add comments explaining why we force an unclean shutdown with System.exit 
here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94857

Unfortunately this may not be the case, as we can have multiple connectors 
which are using different consumer configs with different group ids. We need to 
either 1) change the config settings to enforce this to be true, or 2) use a 
separate offset client that remembers which topics belongs to which groups.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94858

Capitalize first word



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94859

Capitalize first word



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94863

Add a comment explaining how this logic works. Also a few questions:

1) is the map() call synchronized with other threads putting new offsets 
into the map?

2) after the sorting, the logic may be clearer as

var commitableOffsetIndex = 0
while (offsets(commitableOffsetIndex) - offsets.head == commitableOffsetIndex)
  commitableOffsetIndex += 1

offsetToCommit = offsets(commitableOffsetIndex - 1) + 1



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94855

The send().get() call is missing.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/25995/#comment94853

Apache header missing.


- Guozhang Wang


On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Sept. 24, 2014, 4:26 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 

Re: Review Request 25995: Patch for KAFKA-1650

2014-09-25 Thread Jiangjie Qin


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
  299
  https://reviews.apache.org/r/25995/diff/1/?file=704523#file704523line299
 
  Do we need to do this check every time in the loop?

Maybe we can put this check outside the while loop, but that would probably 
introduce more duplicated code. Since offset commits are not that frequent and 
retries are hopefully rare, it should not create much overhead.


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 168
  https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line168
 
  Do you need to turn off auto commit on the consumer threads here?

I thought offset commit should be turned off in the consumer config. Is that 
the case?


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 207
  https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line207
 
  For clean shutdown, you need to
  
  1) halt consumer threads first.
  
  2) wait for producer to drain all the messages in data channel.
  
  3) manually commit offsets on consumer threads.
  
  4) shut down consumer threads.
  
  Otherwise we will have data duplicates as we commit offsets based on 
  min.

Talked to Guozhang, changed the process to be as below:
1. shutdown consumer threads.
2. shutdown producer
3. commit offsets
4. shutdown consumer connectors


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 447
  https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line447
 
  Adding comment to the logic of how this works. Also a few questions:
  
  1) is the map() call synchronized with other threads putting new 
  offsets into the map?
  
  2) after the sorting, the logic may be clearer as
  
  var commitableOffsetIndex = 0
  while (offsets(commitableOffsetIndex) - offsets.head == commitableOffsetIndex)
    commitableOffsetIndex += 1
  
  offsetToCommit = offsets(commitableOffsetIndex - 1) + 1

We are using a concurrent map, which guarantees that a single put/get operation 
is atomic. It is possible that the offsets we get for different partitions 
reflect values from different points in time, but that should not matter much 
because later commits will pick up the updated values. The offsets we commit 
when exiting are guaranteed to be taken after the producer has shut down, so 
the commits during normal running time do not need to be 100% accurate.
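
For the commit computation itself, one safe formulation (a sketch with hypothetical names, not the patch's exact code) is to advance through the consecutive run of acked offsets starting at the commit point and commit one past its end, so a message still in flight is never skipped:

```java
import java.util.Set;

// Given the offset we last committed from and the set of acked offsets,
// find the first offset that is not yet safely mirrored; that is the
// offset to commit (the first offset of the un-mirrored message).
class OffsetMath {
    static long committableOffset(long startOffset, Set<Long> acked) {
        long next = startOffset;
        while (acked.contains(next)) {
            next++;
        }
        return next;
    }
}
```

For example, with acked offsets {100, 101, 102, 104} and start offset 100, the committable offset is 103: offset 103 is still unacked, so committing past it could lose data.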


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 483
  https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line483
 
  The send().get() call is missing.

I put it inside the put().


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review54620
---


On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Sept. 24, 2014, 4:26 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 mirror maker redesign; adding byte bounded blocking queue.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Review Request 25995: Patch for KAFKA-1650

2014-09-24 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description
---

mirror maker redesign; adding byte bounded blocking queue.


Diffs
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
fbc680fde21b02f11285a4f4b442987356abd17b 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
b8698ee1469c8fbc92ccc176d916eb3e28b87867 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin