Re: Review Request 25995: Patch for KAFKA-1650
On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> Sorry for the late review. A few more comments below.

Thanks a lot for the review, Jun. We actually have a new design for mirror maker based on the producer's flush() call (KAFKA-1865). The design is written up in KIP-3, which is in the voting process; it would be great if you could take a look at that. We plan to refactor mirror maker after the flush() call is checked in. I really appreciate the review, though. Please see the answers below.

On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 608-616
> https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line608
> I don't quite follow the comment on synchronization. Once producer.send is called, the callback can be called at any time. In the callback, we will remove the offset from unackedOffsets. However, the removal is not synchronized on unackedOffsets.

Yes, that's my bad. I somehow missed putting synchronization on the removal. I realized this after we decided to move to the new design using flush(), so I did not submit a follow-up patch.

On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 729-731
> https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line729
> Should we synchronize on the removal as well?

Please see above.

On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 580-582
> https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line580
> It's kind of weird that we only catch OOME here. OOME can be thrown in other places as well.

We used to catch all throwables and exit, but later on we thought it might be better to exit only on a fatal exception and ignore the other exceptions to let the offset commit thread move on. So one commit may fail, but the next commit might succeed.

On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 81-92
> https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line81
> Isn't NumUnackedMessages the same as UnackedOffsetListSize since they are always modified together?

Good point! We probably should just use UnackedOffsetListSize. It is clearer.

- Jiangjie

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review74677
---

On Dec. 24, 2014, 12:44 a.m., Jiangjie Qin wrote:

(Updated Dec. 24, 2014, 12:44 a.m.)

Review request for kafka.

Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650

Repository: kafka

Description
---
Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later.
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not. Addressed Jun's comments. Incorporated Jun's comments Incorporated Jun's comments and rebased on trunk Rebased on current trunk Addressed Joel's comments. Addressed Joel's comments. Incorporated Joel's comments Incorporated Joel's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Joel's comments Fix a bug in metric. Missed some
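The synchronization bug acknowledged above (the producer callback removing an offset from unackedOffsets without holding the lock, racing with concurrent inserts) can be illustrated with a minimal sketch. The class and method names here are hypothetical stand-ins, not the actual MirrorMaker code:

```scala
import java.util.TreeSet

// Hypothetical sketch: both the insert (done when the message is sent) and the
// removal (done in the producer callback, which may run on the producer's I/O
// thread at any time after send() returns) must synchronize on the same lock.
class UnackedOffsets {
  private val offsets = new TreeSet[java.lang.Long]()

  def add(offset: Long): Unit = offsets.synchronized {
    offsets.add(offset)
  }

  // Called from the producer callback. Without this synchronized block the
  // removal races with concurrent adds and with smallest below.
  def remove(offset: Long): Unit = offsets.synchronized {
    offsets.remove(offset)
  }

  // The offset commit thread commits up to the smallest still-unacked offset.
  def smallest: Option[Long] = offsets.synchronized {
    if (offsets.isEmpty) None else Some(offsets.first())
  }

  def size: Int = offsets.synchronized { offsets.size }
}
```

The point of the review comment is simply that every access path, including the callback, must go through the same monitor; synchronizing only the insert leaves the TreeSet open to corruption.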
Re: Review Request 25995: Patch for KAFKA-1650
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review74677
---

Sorry for the late review. A few more comments below.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment121347
Isn't NumUnackedMessages the same as UnackedOffsetListSize since they are always modified together?

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment121348
It's kind of weird that we only catch OOME here. OOME can be thrown in other places as well.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment121349
I don't quite follow the comment on synchronization. Once producer.send is called, the callback can be called at any time. In the callback, we will remove the offset from unackedOffsets. However, the removal is not synchronized on unackedOffsets.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment121350
Should we synchronize on the removal as well?

- Jun Rao

On Dec. 24, 2014, 12:44 a.m., Jiangjie Qin wrote:

(Updated Dec. 24, 2014, 12:44 a.m.)

Review request for kafka.

Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650

Repository: kafka

Description
---
Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not. Addressed Jun's comments. Incorporated Jun's comments Incorporated Jun's comments and rebased on trunk Rebased on current trunk Addressed Joel's comments. Addressed Joel's comments. Incorporated Joel's comments Incorporated Joel's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Joel's comments Fix a bug in metric. Missed some change in the previous patch submission, submit patch again. change offset commit thread to use scheduler. Addressed Joel's comments. Incorporated Joel's comments set acks=-1 if --no.data.loss is specified.
Diffs
---
core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c
core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af
core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa
core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION
core/src/test/scala/unit/kafka/utils/UtilsTest.scala 066553cad290c3d3821537a964c7d713c122d9fc

Diff: https://reviews.apache.org/r/25995/diff/

Testing
---

Thanks,
Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65874
---

Looks great. Just a few more minor comments, plus a question that you may have missed from the previous round.

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment109127
I now remember what the reasoning behind this was. We originally decided that during rebalances, offset commits _need to_ succeed to reduce duplicates, i.e., retry indefinitely if there are failures in offset commits while rebalancing. We did not want it to hold up shutdown, though, which is why we reduced retriesRemaining only if not shutting down. However, in retrospect I think this change is better: always retry up to the retry count. If a user wishes to reduce the probability of duplicates, just bump up the offset commit retry count. Do you agree?

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109156
Use valuesIterator instead of values - the reason is that values materializes, but the iterator will not; mapping over the iterator gives another iterator. So I'm pretty sure the sum is then computed without materializing an entire collection of sizes.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109129
As mentioned in the previous RB: do we need this given that it should be almost equivalent to the producer's dropped messages metric?

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109130
Configure

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109133
Repeating question from last round: if block on buffer exhaustion is not turned on, do we still want to shut down the mirror maker? I.e., if the user really wants zero data loss they would set that to true, right? If it is set to false and the MM exits, what purpose does it serve?

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109136
offsets

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109135
Sorry I wasn't clear on this. I meant we should probably catch all throwables and only exit if OOME.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109137
Capital U for "%s to %d" (generally limit to alphanumeric, comma, colon and dash for easier greps)

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment109138
info("Ignoring interrupt while waiting.")

core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109143
This should ideally be a nested static class of DoublyLinkedList and named Node

core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109165
Add @threadsafe annotation above

core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109162
private

core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109163
private

core/src/main/scala/kafka/utils/DoublyLinkedList.scala
https://reviews.apache.org/r/25995/#comment109164
private

core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala
https://reviews.apache.org/r/25995/#comment109139
Can you move this to UtilsTest?

core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala
https://reviews.apache.org/r/25995/#comment109147
Should probably also do a removal from a single-entry list.

- Joel Koshy

On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote:

(Updated Dec. 23, 2014, 3:07 a.m.)

Review request for kafka.

Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650

Repository: kafka
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
> https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614
> Should this be fatal? I.e., fatal is normally used before exiting (abnormally); WARN would be more suitable. I don't think it makes sense to not advance the offset here, especially if you will still keep sending messages. I think you need to remove it from the unacked offset list. E.g., you may configure your mirror maker producer with only a few retries (in which case you are okay with data loss). In this scenario you should just let the error go and allow the mirror maker to proceed normally. If someone wants zero data loss, the MM should be configured with required acks -1 and infinite retries. Maybe I'm misunderstanding what zero data loss really means - can you clarify? (Especially if someone configures the producer with acks of, say, one and limited retries.)

Jiangjie Qin wrote:
> That makes sense. So I've changed the code to work in the following way:
> 1. If retries is set to infinite, the producer will keep retrying and the entire pipeline will eventually be blocked. (This is strictly data-loss free.)
> 2. If retries is not set to infinite, then after the retries are exhausted, it will remove the offset from the unacked list and record it as a skippedUnackedMessage, which is an exposed metric.

Joel Koshy wrote:
> For (1), will the pipeline be blocked though? I.e., the consumer will continue to send messages to the data channel for all partitions. Also, we will be susceptible to reordering for the partition that is being retried even if maxInFlightRequests is set to one. (Check the wiki.) It would be useful to very clearly outline the guarantees that the mirror maker provides and under what configurations, e.g., zero data loss, whether there will be duplicates, whether reordering is possible. Also, it may make sense to add a --lossless flag to the mirror maker that automatically ensures the configs are set correctly:
> * acks to -1 (or all)
> * retries to infinity
> * use new producer
> * block on buffer exhaustion
> * consumer autocommit off
> * anything else that may be needed
> This makes it much easier to document and use. What do you think?

Jiangjie Qin wrote:
> The --lossless option is a really good suggestion. I'll add that and document it. For (1), yes, the pipeline will eventually be blocked: one of the data channel queues will eventually fill up, the consumer thread will block on queue.offer(), so the ZookeeperConsumerConnector data chunk queue will not move anymore, and then the fetcher thread will block on putting data chunks into the data chunk queue. Eventually the entire pipeline is blocked. I don't quite get why we could still have reordering if MaxInflightRequests is set to 1. I understand that when a batch is retried, it is possible that the previous try actually succeeded, so the batch is duplicated as a whole on the broker; there might be some reordering across batches, but within a batch the order should be preserved. Or do you mean the broker could ack only some of the messages in a batch, which could cause the batch to be resent and thus the messages in the same batch to end up out of order? By reordering I mean: for message M1 with offset OS1 and message M2 with offset OS2, assuming they are going to the same partition, if OS2 > OS1, then M2 should only be acked by the target broker after M1 is acked. Anyway, the current offset commit solution does not rely on in-order delivery anymore, but I think the problem still matters for some other use cases, so I would like to fully understand it.

Sorry - you are right. There won't be reordering with pipelining turned off. And I agree with the point about the pipeline getting blocked (i.e., eventually).

- Joel

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65477
---

On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote:

(Updated Dec. 23, 2014, 3:07 a.m.)

Review request for kafka.

Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650

Repository: kafka
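The lossless flag discussed above boils down to pinning a handful of configs. A rough sketch of what such a mode would set follows; the property names are those of the new producer and the old high-level consumer of that era, but treat this as an illustration of the discussion, not the exact option set the patch shipped:

```scala
import java.util.Properties

// Producer side: never drop, never reorder, wait for full ISR acknowledgement.
val producerProps = new Properties()
producerProps.setProperty("acks", "-1")                                  // i.e. "all"
producerProps.setProperty("retries", Int.MaxValue.toString)              // effectively infinite retries
producerProps.setProperty("max.in.flight.requests.per.connection", "1")  // no cross-batch reordering
producerProps.setProperty("block.on.buffer.full", "true")                // block instead of dropping

// Consumer side: the mirror maker commits offsets itself, only after the
// producer has acked, so auto-commit must be off.
val consumerProps = new Properties()
consumerProps.setProperty("auto.commit.enable", "false")
```

With these settings a send can only leave the pipeline by being acknowledged by the full ISR, which is why the thread eventually blocks (rather than losing data) when the target cluster is unavailable.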
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 394
> https://reviews.apache.org/r/25995/diff/18/?file=797121#file797121line394
> If isAutoCommit is true then we will not retry anyway. I think this condition can be removed altogether, i.e., if we are shutting down, then we should probably allow committing offsets up to retryCount. I don't recollect why this was written this way, but I think retrying up to the configured retry count is reasonable on shutdown. Do you agree?

Agree, so we set max retries based on the isAutoCommit value and only use the retry count to control how many times we actually retry. It's clearer.

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 85
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line85
> NumUnackedOffsets is a weird metric, especially with the presence of NumUnackedMessages. Can you think of a better name? My attempt would be NumPendingSourceOffsets - but I don't like that either.

How about UnackedOffsetListsSize?

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line90
> Add a comment on what this metric is. Actually, do we need this since it will be covered by the producer's dropped metric? As above, this is also a weird mbean to see. Not sure if we can come up with a better name.

I added comments to this metric. I'm kind of reluctant to use the producer's dropped metric. The reason is that the intention of this metric is different from dropped messages, although they have some connection. Also, we could potentially have many producers; it would be better to have a single number instead of having to go through multiple metrics. What do you think?

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 198
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line198
> Can you move this to the if (useNewProducer) block further down (line 230)?

This block is here because the consumer property has to be set before the consumer instantiation, and the offset commit thread can only start after the consumer connector is instantiated. It also seems better for the offset commit thread to start after the producer threads start. That's why we have those spread-out if statements...

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 295
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line295
> I think the final commitOffsets (on shutdown) should be a reliable commit - i.e., retry up to the configured retries if there are commit errors.

Yes, it will: isAutoCommit is set to false. This commitOffsets is defined in mirror maker and wraps ZookeeperConsumerConnector.commitOffsets, where isAutoCommit == false. Or do you mean something else?

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 449
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line449
> If block on buffer exhaustion is not turned on, do we still want to shut down the mirror maker? I.e., if the user really wants zero data loss they would set that to true, right? If it is set to false and the MM exits, what purpose does it serve?

I kind of think that we should let the user know that there is something wrong in the producer once it occurs. Users who don't care about zero data loss probably still want to at least have a working mirror maker. If we just drop the message and let the producer move on, we can potentially end up with a running mirror maker that only drops messages. In that case, it's probably better to let the mirror maker die to indicate something wrong happened. So I'm thinking exiting on BufferExhaustedException is more from a normal-operations point of view than a zero-data-loss point of view.

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 560
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line560
> The commit thread has not exited, i.e., this should just be "Shutting down mirror maker due to error when committing offsets." I think OOME is the only exception we should really shut down on, right? I.e., we should probably let everything else go?

Agreed.

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 720
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line720
> What does "node validation skipped" mean?

I meant something like whether the node is really in the list... but it probably doesn't matter in this case. I'll just remove it.

- Jiangjie
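The two policies agreed on in this exchange - retry a failed commit up to the configured retry count, and shut the mirror maker down only on OOME while letting other throwables go so the next scheduled commit can still succeed - combine roughly as below. This is a sketch; `commitOffsets` and `maxRetries` are stand-ins for the real connector API, not the patch's code:

```scala
// Hypothetical sketch of the offset commit loop: retry up to maxRetries,
// treat OutOfMemoryError as fatal, and swallow anything else so a later
// commit attempt can still succeed.
def commitWithRetries(commitOffsets: () => Unit, maxRetries: Int): Boolean = {
  var retriesRemaining = maxRetries
  while (retriesRemaining >= 0) {
    try {
      commitOffsets()
      return true // commit succeeded
    } catch {
      case e: OutOfMemoryError =>
        // fatal("Shutting down mirror maker due to error when committing offsets")
        throw e
      case _: Throwable =>
        // warn and retry; one failed commit does not doom the next one
        retriesRemaining -= 1
    }
  }
  false // retries exhausted
}
```

Setting `maxRetries` from the isAutoCommit value, as described above, then makes auto-commit calls best-effort (zero retries) while shutdown commits retry up to the configured count.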
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90
> https://reviews.apache.org/r/25995/diff/19/?file=799170#file799170line90
> Use valuesIterator instead of values - the reason is that values materializes, but the iterator will not; mapping over the iterator gives another iterator. So I'm pretty sure the sum is then computed without materializing an entire collection of sizes.

There seems to be no valuesIterator for Pool... Maybe def value = unackedOffsetsMap.iterator.map(unackedOffsets => unackedOffsets._2.size).sum ?

On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 96
> https://reviews.apache.org/r/25995/diff/19/?file=799170#file799170line96
> As mentioned in the previous RB: do we need this given that it should be almost equivalent to the producer's dropped messages metric?

Sorry, I just found that I forgot to publish the comments on the previous question. I just published them; pasting here: I added comments to this metric. I'm kind of reluctant to use the producer's dropped metric. The reason is that the intention of this metric is different from dropped messages, although they have some connection. Also, we could potentially have many producers; it would be better to have a single number instead of having to go through multiple metrics. What do you think?

On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
> core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 395
> https://reviews.apache.org/r/25995/diff/19/?file=799168#file799168line395
> I now remember what the reasoning behind this was. We originally decided that during rebalances, offset commits _need to_ succeed to reduce duplicates, i.e., retry indefinitely if there are failures in offset commits while rebalancing. We did not want it to hold up shutdown though, which is why we reduced retriesRemaining only if not shutting down. However, in retrospect I think this change is better, i.e., always retry up to the retry count. If a user wishes to reduce the probability of duplicates, just bump up the offset commit retry count. Do you agree?

Agreed. Thanks for elaborating on this.

On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 474
> https://reviews.apache.org/r/25995/diff/19/?file=799170#file799170line474
> Repeating question from last round: if block on buffer exhaustion is not turned on, do we still want to shut down the mirror maker? I.e., if the user really wants zero data loss they would set that to true, right? If it is set to false and the MM exits, what purpose does it serve?

Copy-pasting the comments from the previous review round: I kind of think that we should let the user know that there is something wrong in the producer once it occurs. Users who don't care about zero data loss probably still want to at least have a working mirror maker. If we just drop the message and let the producer move on, we can potentially end up with a running mirror maker that only drops messages. In that case, it's probably better to let the mirror maker die to indicate something wrong happened. So I'm thinking exiting on BufferExhaustedException is more from a normal-operations point of view than a zero-data-loss point of view.

On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote:
> core/src/main/scala/kafka/utils/DoublyLinkedList.scala, line 25
> https://reviews.apache.org/r/25995/diff/19/?file=799171#file799171line25
> This should ideally be a nested static class of DoublyLinkedList and named Node

But I need to instantiate the node outside of the list. It seems I'm not able to access the Node class if the class is nested.

- Jiangjie

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65874
---

On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote:

(Updated Dec. 23, 2014, 3:07 a.m.)

Review request for kafka.
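The lazy metric computation being discussed (summing per-partition unacked-offset list sizes by mapping over an iterator, so no intermediate collection of sizes is materialized) can be sketched like this, using a plain concurrent map as a stand-in for kafka.utils.Pool:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for the per-partition unacked offset lists
// (keyed by topic-partition in the real code).
val unackedOffsetsMap = TrieMap[String, Vector[Long]]()

// Mapping over the iterator yields another iterator, so sum consumes the
// sizes one at a time instead of building a collection of them first.
def numUnackedOffsets: Int =
  unackedOffsetsMap.iterator.map { case (_, unackedOffsets) => unackedOffsets.size }.sum
```

The `case (_, unackedOffsets)` pattern is the destructuring style Joel suggests in a later message; `._2.size` on the pair works identically.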
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 85
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line85
> NumUnackedOffsets is a weird metric, especially with the presence of NumUnackedMessages. Can you think of a better name? My attempt would be NumPendingSourceOffsets - but I don't like that either.

Jiangjie Qin wrote:
> How about UnackedOffsetListsSize?

Yes, that sounds better.

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 198
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line198
> Can you move this to the if (useNewProducer) block further down (line 230)?

Jiangjie Qin wrote:
> This block is here because the consumer property has to be set before the consumer instantiation, and the offset commit thread can only start after the consumer connector is instantiated. It also seems better for the offset commit thread to start after the producer threads start. That's why we have those spread-out if statements...

I thought this was before the instantiation, but maybe I misread. Either way, I think your most recent patch looks fine in this regard, with the no.data.loss option being inspected when instantiating the consumer and producer separately.

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 295
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line295
> I think the final commitOffsets (on shutdown) should be a reliable commit - i.e., retry up to the configured retries if there are commit errors.

Jiangjie Qin wrote:
> Yes, it will: isAutoCommit is set to false. This commitOffsets is defined in mirror maker and wraps ZookeeperConsumerConnector.commitOffsets, where isAutoCommit == false. Or do you mean something else?

Yes, you are right. I missed that it is a wrapper.

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 449
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line449
> If block on buffer exhaustion is not turned on, do we still want to shut down the mirror maker? I.e., if the user really wants zero data loss they would set that to true, right? If it is set to false and the MM exits, what purpose does it serve?

Jiangjie Qin wrote:
> I kind of think that we should let the user know that there is something wrong in the producer once it occurs. Users who don't care about zero data loss probably still want to at least have a working mirror maker. If we just drop the message and let the producer move on, we can potentially end up with a running mirror maker that only drops messages. In that case, it's probably better to let the mirror maker die to indicate something wrong happened. So I'm thinking exiting on BufferExhaustedException is more from a normal-operations point of view than a zero-data-loss point of view.

Ok - we can leave it as is and revisit if necessary. If the MM exits on buffer exhaustion then the user will know and will bump up memory. On the other hand, some users may be okay with dropping messages at peak traffic. I'm not sure which is better.

On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90
> https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line90
> Add a comment on what this metric is. Actually, do we need this since it will be covered by the producer's dropped metric? As above, this is also a weird mbean to see. Not sure if we can come up with a better name.

Jiangjie Qin wrote:
> I added comments to this metric. I'm kind of reluctant to use the producer's dropped metric. The reason is that the intention of this metric is different from dropped messages, although they have some connection. Also, we could potentially have many producers; it would be better to have a single number instead of having to go through multiple metrics. What do you think?

Ok, sounds good.

- Joel

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65759
---

On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote:

(Updated Dec. 23, 2014, 3:07 a.m.)

Review request for kafka.
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90 https://reviews.apache.org/r/25995/diff/19/?file=799170#file799170line90 Use valuesIterator instead of values - the reason is that values materializes, but the iterator will not; map over the iterator will give another iterator. So I'm pretty sure with that sum is computed without materializing an entire collection of sizes. Jiangjie Qin wrote: There seems no valueIterator for Pool... Maybe def value = unackedOffsetsMap.iterator.map(unackedOffsets = unackedOffsets._2.size).sum ? Ah yes - forgot that this is a pool. Your edit should work, although can you use case inside the map? i.e., `...iterator.map { case(topicPartition, unackedOffsets) = ...` On Dec. 23, 2014, 8:34 a.m., Joel Koshy wrote: core/src/main/scala/kafka/utils/DoublyLinkedList.scala, line 25 https://reviews.apache.org/r/25995/diff/19/?file=799171#file799171line25 This should ideally be nested static class of DoublyLinkedList and named Node Jiangjie Qin wrote: But I needs to instantiate the node outside of the list. I seems not able to access the Node class if the class is nested. Yes you are right - I was in Java world for a minute. You can accomplish something similar by defining an object DoublyLinkedList and defining a Node class inside that, but that is almost equivalent to what you have so ignore my comment here. - Joel --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65874 --- On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 23, 2014, 3:07 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. 
Addressed Guozhang's comments
commit before switch to trunk
commit before rebase
Rebased on trunk, Addressed Guozhang's comments.
Addressed Guozhang's comments on MaxInFlightRequests
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Incorporated Guozhang's comments
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later.
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
added custom config for consumer rebalance listener
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Add configurable consumer rebalance listener
Incorporated Guozhang's comments
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Incorporated Guozhang's comments.
Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not.
Addressed Jun's comments.
Incorporated Jun's comments
Incorporated Jun's comments and rebased on trunk
Rebased on current trunk
Addressed Joel's comments.
Addressed Joel's comments.
Incorporated Joel's comments
Incorporated Joel's comments
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Incorporated Joel's comments
Fix a bug in metric.
Missed some change in the previous patch submission, submit patch again.
change offset commit thread to use scheduler.
Addressed Joel's comments.
Diffs
- core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397
- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c
- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af
- core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa
- core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION
- core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala PRE-CREATION

Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie
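As a side note, the lazy-iterator pattern Joel recommends above can be sketched as follows. This is a minimal, illustrative sketch: a TrieMap stands in for kafka.utils.Pool (which exposes an iterator of key/value pairs but no valuesIterator), and the names are hypothetical, not the actual MirrorMaker code.

```scala
import scala.collection.concurrent.TrieMap

// Stand-in for the Pool keyed by topic-partition, holding per-partition
// lists of unacked offsets.
object UnackedOffsetListSize {
  val unackedOffsetsMap = new TrieMap[String, List[Long]]()

  // Mapping over the iterator yields another iterator, so the sum is
  // computed without materializing an intermediate collection of sizes.
  def value: Int =
    unackedOffsetsMap.iterator.map { case (_, unackedOffsets) => unackedOffsets.size }.sum
}
```

Using `case (_, unackedOffsets)` inside the map, as suggested, avoids the opaque `._2` tuple accessor.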
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 23, 2014, 3:04 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated)
---
Incorporated Joel's comments

Diffs (updated)
- core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397
- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c
- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af
- core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa
- core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION
- core/src/test/scala/unit/kafka/utils/UtilsTest.scala 066553cad290c3d3821537a964c7d713c122d9fc

Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65981 --- Ship it! Minor observation below. Also, mirror maker has become a bloated file. I think we can do some refactoring and split it up - will need a new package for that. I can file a jira. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109286 I think this might be confusing. i.e., if we have an explicit argument for no data loss then it should do everything needed to ensure that. I'll proceed with checking in this as is but we should discuss this further. - Joel Koshy
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 23, 2014, 11:53 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 58 https://reviews.apache.org/r/25995/diff/20/?file=799494#file799494line58 I think this might be confusing. i.e., if we have an explicit argument for no data loss then it should do everything needed to ensure that. I'll proceed with checking in this as is but we should discuss this further. Or can we maybe do this: if the user did not specify acks in their own settings, we use acks=all; otherwise, use whatever the user has provided. In this case, if users want to override acks to achieve performance, they can still do that. What do you think? - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65981 ---
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 23, 2014, 11:53 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 58 https://reviews.apache.org/r/25995/diff/20/?file=799494#file799494line58 I think this might be confusing. i.e., if we have an explicit argument for no data loss then it should do everything needed to ensure that. I'll proceed with checking in this as is but we should discuss this further. Jiangjie Qin wrote: Or can we maybe do this: if the user did not specify acks in their own settings, we use acks=all; otherwise, use whatever the user has provided. In this case, if users want to override acks to achieve performance, they can still do that. What do you think? Thought about it again. You are right: if users do not want to use --no.data.loss, they can specify the settings themselves. I'll submit another patch overriding acks=all if the flag is set. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65981 ---
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 23, 2014, 11:53 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 58 https://reviews.apache.org/r/25995/diff/20/?file=799494#file799494line58 I think this might be confusing. i.e., if we have an explicit argument for no data loss then it should do everything needed to ensure that. I'll proceed with checking in this as is but we should discuss this further. Jiangjie Qin wrote: Or can we maybe do this: if the user did not specify acks in their own settings, we use acks=all; otherwise, use whatever the user has provided. In this case, if users want to override acks to achieve performance, they can still do that. What do you think? Jiangjie Qin wrote: Thought about it again. You are right: if users do not want to use --no.data.loss, they can specify the settings themselves. I'll submit another patch overriding acks=all if the flag is set. ok - Joel --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65981 ---
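The behavior agreed on above (force acks when the flag is set, otherwise honor the user's settings) could look roughly like the sketch below. The property key follows the new producer's config name, but the helper and flag names are illustrative, not the actual MirrorMaker code.

```scala
import java.util.Properties

object NoDataLossConfig {
  // If --no.data.loss is set, override acks to -1 (i.e. all in-sync
  // replicas must ack); otherwise leave whatever the user supplied
  // (or the producer default) untouched.
  def applyNoDataLossFlag(producerProps: Properties, noDataLoss: Boolean): Properties = {
    if (noDataLoss)
      producerProps.setProperty("acks", "-1")
    producerProps
  }
}
```

This keeps the escape hatch Jiangjie mentions: users who care more about throughput simply omit the flag and set acks themselves.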
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 24, 2014, 12:44 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated)
---
set acks=-1 if --no.data.loss is specified.

Diffs (updated)
- core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397
- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c
- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af
- core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa
- core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION
- core/src/test/scala/unit/kafka/utils/UtilsTest.scala 066553cad290c3d3821537a964c7d713c122d9fc

Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65993 --- Ship it! Ship It! - Joel Koshy
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65759 ---

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment109001
If isAutoCommit is true then we will not retry anyway, so I think this condition can be removed altogether. i.e., if we are shutting down, then we should probably allow committing offsets up to retryCount. I don't recollect why this was written this way, but I think retrying up to the configured retry count is reasonable on shutdown. Do you agree?

core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment109002
Since this is a Java compatibility wrapper you need to use java.util.Map and convert that to a Scala map. Alternatively we can remove this altogether since it is only used semi-internally (by the mirror maker). However, I think it would be good to add it here with the fix.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109003
val scheduler = new KafkaScheduler, and later on scheduler.startup (since the scheduler does nothing until it is started up).

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109004
NumUnackedMessages

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109006
NumUnackedOffsets is a weird metric, especially with the presence of NumUnackedMessages. Can you think of a better name? My attempt would be NumPendingSourceOffsets - but I don't like that either.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109005
`unackedOffsetMap.valuesIterator.map(unackedOffsets => unackedOffsets.size).sum`

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109015
Add a comment on what this metric is. Actually, do we need it at all, since it will be covered by the producer's dropped metric? As above, this is also a weird mbean to see. Not sure if we can come up with a better name.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109016
Can you move this to the if (useNewProducer) block further down (line 230)?

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109017
I think the final commitOffsets (on shutdown) should be a reliable commit - i.e., retry up to the configured retries if there are commit errors.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109029
If block-on-buffer-exhaustion is not turned on, do we still want to shut down the mirror maker? i.e., if the user really wants zero data loss they would set that to true, right? If it is set to false and the MM exits, what purpose does it serve?

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109018
Capital C

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109019
Committing offsets: (also, should we just do this trace message in the consumer connector instead? Right now it is not there, but I think it should be.)

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109020
The commit thread has not exited; i.e., this should just be "Shutting down mirror maker due to error when committing offsets." I think OOME is the only exception we should really shut down on, right? i.e., we should probably let everything else go.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109021
Line 522: minor nit - how about naming this just unackedOffset (instead of offsetNode)?

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109023
Updating offset to commit for %s to %d. (Note that [%s] will give you [[topic, partition]].)

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109024
Should probably also catch InterruptedException.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109025
if (customRebalanceListener.isDefined)

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109027
Should (ideally) be volatile since it is reported by the gauge.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment109026
What does "node validation skipped" mean?

- Joel Koshy
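Comment 109017 above (make the shutdown-time commit reliable by retrying up to the configured retry count) could be sketched like this. The helper and the commitOffsets hook are placeholders for the connector's actual API, not the real MirrorMaker code.

```scala
object ReliableFinalCommit {
  // Retry the shutdown-time offset commit up to `retries` additional
  // times; returns true as soon as one attempt succeeds, false if all
  // attempts were exhausted.
  def commitWithRetries(commitOffsets: () => Unit, retries: Int): Boolean = {
    var attempt = 0
    while (attempt <= retries) {
      try {
        commitOffsets()
        return true
      } catch {
        case _: Throwable => attempt += 1 // swallow and retry until exhausted
      }
    }
    false
  }
}
```

On the normal periodic path a failed commit can simply wait for the next tick; it is only the final commit, where there is no next tick, that needs this explicit retry loop.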
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489 Why not use KafkaScheduler for the offset commit task? Jiangjie Qin wrote: Hadn't thought of that before... But it looks like we need to do some more handling when something goes wrong in the offset commit threads, and the KafkaScheduler code seems not to do so. Joel Koshy wrote: So you can make the task itself catch throwables. It would look something like this: scheduler.schedule(mirrorMakerOffsetsCommiter, commitTask, ...) And in commitTask: try { commitOffsets() } catch { case t: Throwable => // handle } That said, I don't think connector.commitOffsets will throw anything - since we catch all throwables there. The only additional detail is that after you shut down the scheduler you will need to call commitOffsets() manually one last time. Jiangjie Qin wrote: I changed the code to use the scheduler, but it seems that the try-catch block only handles the Kafka-based offset commit and does not include ensureOffsetsManagerConnected. Also, theoretically an OOM could be thrown when creating a super big offset map, so I kept the catch block to make the mirror maker exit in that case. Not sure what you mean by "only handles the Kafka based offset commit and it did not include ensureOffsetsManagerConnected" - Joel --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65306 --- On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 19, 2014, 7:41 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments.
Addressed Guozhang's comments
commit before switch to trunk
commit before rebase
Rebased on trunk, Addressed Guozhang's comments.
Addressed Guozhang's comments on MaxInFlightRequests
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Incorporated Guozhang's comments
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Merged KAFKA-345 into this patch.
Incorporated Joel and Jun's comments.
Added consumer rebalance listener to mirror maker, will test it later.
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
added custom config for consumer rebalance listener
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Add configurable consumer rebalance listener
Incorporated Guozhang's comments
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Incorporated Guozhang's comments.
Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not.
Addressed Jun's comments.
Incorporated Jun's comments
Incorporated Jun's comments and rebased on trunk
Rebased on current trunk
Addressed Joel's comments.
Addressed Joel's comments.
Incorporated Joel's comments
Incorporated Joel's comments
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Incorporated Joel's comments
Fix a bug in metric.
Missed some change in the previous patch submission, submit patch again.
change offset commit thread to use scheduler.
Diffs - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
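Joel's suggestion in the exchange above (a scheduled commit task that swallows Throwable so one failed commit does not kill the thread, plus one final manual commit after shutdown) could be sketched roughly as follows. This is an illustrative, self-contained sketch: it uses a plain ScheduledExecutorService and a hypothetical OffsetCommitter in place of KafkaScheduler and connector.commitOffsets(), which are not reproduced here.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for connector.commitOffsets(): counts successful
// commits and can be made to throw once, to exercise the catch-all.
class OffsetCommitter {
  val commits = new AtomicInteger(0)
  @volatile var failNext = false
  def commitOffsets(): Unit = {
    if (failNext) { failNext = false; throw new RuntimeException("simulated commit failure") }
    commits.incrementAndGet()
  }
}

object CommitTaskSketch {
  // Schedule a commit task that catches Throwable, so a single failed
  // commit is logged and skipped while the periodic task keeps running.
  def start(committer: OffsetCommitter, periodMs: Long): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val commitTask = new Runnable {
      def run(): Unit =
        try committer.commitOffsets()
        catch { case t: Throwable => () } // log and move on; the next commit may succeed
    }
    scheduler.scheduleAtFixedRate(commitTask, 0, periodMs, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

After shutting the scheduler down, the caller still needs one last manual commitOffsets() call, exactly as Joel notes, since the periodic task is no longer running at that point.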
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614 https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614 Should this be fatal? i.e., fatal is normally used before exiting (abnormally). WARN would be more suitable. I don't think it makes sense to not advance the offset here, especially if you will still keep sending messages. I think you need to remove it from the unacked offset list. E.g., you may configure your mirror maker producer with only a few retries (in which case you are okay with data loss). In this scenario you should just let the error go and allow the mirror maker to proceed normally. If someone wants zero data loss, the MM should be configured with required acks -1 and infinite retries. Maybe I'm misunderstanding what zero data loss really means - can you clarify? (Especially if someone configures the producer with, say, acks one and limited retries.)

Jiangjie Qin wrote: That makes sense. So I've changed the code to work in the following way:
1. If retries is set to infinite, the producer will keep retrying and the entire pipeline will eventually block. (This is strictly data-loss free.)
2. If retries are not set to infinite, then after the retries are exhausted it will remove the offset from the unacked list and record it as a skippedUnackedMessage, which is an exposed metric.

For (1), will the pipeline be blocked though? i.e., the consumer will continue to send messages to the data channel for all partitions. Also, we will be susceptible to reordering for the partition that is being retried even if maxInFlightRequests is set to one. (check wiki) It would be useful to very clearly outline the guarantees that the mirror maker provides and under what configurations, e.g., zero data loss, whether there will be duplicates, whether reordering is possible. 
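Jiangjie's second case (retries exhausted: drop the offset from the unacked list and count it instead of blocking commits forever) could be sketched as follows. UnackedOffsets, MirrorMakerCallback, and the skippedUnackedMessages counter here are simplified, hypothetical stand-ins, not the patch's actual structures.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Simplified stand-in for the per-partition unacked offset tracking.
class UnackedOffsets {
  private val offsets = mutable.TreeSet[Long]()
  def add(offset: Long): Unit = synchronized { offsets += offset }
  def remove(offset: Long): Unit = synchronized { offsets -= offset }
  // The smallest unacked offset bounds what is safe to commit.
  def smallestUnacked: Option[Long] = synchronized { offsets.headOption }
  def size: Int = synchronized { offsets.size }
}

// Producer-callback-shaped sketch: on final failure (retries exhausted,
// exception non-null) count the skip; in all cases remove the offset so
// the commit path is never blocked by a permanently failed send.
class MirrorMakerCallback(unacked: UnackedOffsets, skippedUnackedMessages: AtomicInteger) {
  def onCompletion(offset: Long, exception: Exception): Unit = {
    if (exception != null) skippedUnackedMessages.incrementAndGet() // data loss accepted by config
    unacked.remove(offset)
  }
}
```

Note the removal happens whether or not the send succeeded, which is exactly the property that keeps the offset commit advancing in the limited-retries configuration.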
Also, it may make sense to add a --lossless flag to the mirror maker that automatically ensures the configs are set correctly:
* acks to -1 (or all)
* retries to infinity
* use new producer
* block on buffer exhaustion
* consumer autocommit off
* anything else that may be needed
This makes it much easier to document and use. What do you think?

On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 668 https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line668 Should we make this a generic DoublyLinkedList data structure in utils or some other suitable place and unit test it as well?

Jiangjie Qin wrote: I'm not sure this is generic enough to put into utils. This raw linked list seems to serve only the purpose of removing/inserting a node in the middle in O(1), which cannot be achieved with java.util.LinkedList. Maybe we can keep it here for now, and if later on there are some other use cases we can refactor the code to create a raw LinkedList in utils and use that one. What do you think?

Sort of agree, but only to some degree. The main issue is that you cannot unit test this - unless we come up with mirror maker unit tests that have 100 percent code coverage of this utility. That's why it's generally a sign that the utility needs to be moved out so that it can be tested thoroughly. http://stackoverflow.com/questions/3353318/how-do-i-test-local-inner-class-methods-in-java

- Joel

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65477 --- On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 19, 2014, 7:41 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. 
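The --lossless preset Joel proposes above could, as a sketch, translate into config overrides like the following. The property keys follow the era's new-producer and old-consumer config names; treat the helper object and the exact key strings as assumptions for illustration, not the patch's implementation.

```scala
import java.util.Properties

// Hypothetical "--lossless" preset: force the producer/consumer settings
// that zero data loss requires, overriding whatever the user supplied.
object LosslessPreset {
  def producerProps(base: Properties = new Properties): Properties = {
    base.setProperty("acks", "-1")                                   // wait for all in-sync replicas
    base.setProperty("retries", Int.MaxValue.toString)               // effectively infinite retries
    base.setProperty("max.in.flight.requests.per.connection", "1")   // limit reordering on retry
    base.setProperty("block.on.buffer.full", "true")                 // never drop on buffer exhaustion
    base
  }
  def consumerProps(base: Properties = new Properties): Properties = {
    base.setProperty("auto.commit.enable", "false")                  // commits driven by mirror maker
    base
  }
}
```

Bundling these behind one flag avoids the documentation problem Joel raises: the guarantee is stated once, and the individual knobs cannot drift out of sync with it.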
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 338 https://reviews.apache.org/r/25995/diff/13/?file=793041#file793041line338 Can we make this a reliable commit - i.e., with retries up to the configured retry count? The policy is to retry on commit errors during rebalance or shutdown; there is no need to retry on commit errors during auto-commits. So for e.g., if a mirror maker rebalances and there is simultaneously offset manager movement, we would need to retry the commit. This is the motivation for the isAutoCommit flag - however, there seems to be a bug right now which maybe you can fix. i.e., if this is not an auto-commit then set retries to the configured retries, else no retries.

Jiangjie Qin wrote: Changed the code based on your suggestion. My original thinking was that in mirror maker one commit failure does not actually matter too much, because the next commit will succeed if the failure is due to offset topic leader migration, etc. But for a more general purpose, it probably should retry if it is not an auto commit.

I was thinking more about the shutdown and rebalance cases. We ideally want the commits to be reliable for those cases.

On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 192 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line192 Why do you need a dummy param?

Jiangjie Qin wrote: Because Utils.createObject needs an args parameter, and if we pass in a null it will give an NPE... I've changed the code in Utils to allow us to pass in a null, which uses the no-arg constructor.

See comment in latest RB.

On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489 Why not use KafkaScheduler for the offset commit task?

Jiangjie Qin wrote: I hadn't thought of that before... but it looks like we need to do some additional handling when something goes wrong in the offset commit threads. The KafkaScheduler code does not seem to do that.

You can make the task itself catch throwables, so it would look something like this: scheduler.schedule(mirrorMakerOffsetsCommiter, commitTask, ...) and in commitTask: try { commitOffsets() } catch { case t: Throwable => /* handle */ }. That said, I don't think connector.commitOffsets will throw anything, since we catch all throwables there. The only additional detail is that after you shut down the scheduler you will need to call commitOffsets() manually one last time.

On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line614 There might be a small memory leak here: if there was an error (in the branch above), it seems no one removes the offset from the list.

Jiangjie Qin wrote: Yes, that is a memory leak, but in this case we should not commit the offset of the message that was not sent successfully either. If any exception occurs then the offsets will not advance anymore. We probably should have an alert on the mirror consumer lags.

Hmm.. not sure if I'm on the same page here. See comments on the new RB.

On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 705 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line705 You need to also set tail.next = null (or None)

Jiangjie Qin wrote: tail.next = null will be handled in the previous if...else...; the old tail.prev.next will become the new tail.next, which will be null.

Makes sense.

On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 666 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line666 Should we expose a size() method - i.e., increment on add and decrement on remove? 
We can aggregate the size of all the offset lists outside and emit a gauge. That will give us some assurance that there are no forgotten offsets. Re: the potential leak mentioned above. In fact, I'm a bit nervous about correctness since this is a custom implementation of a semi-non-trivial data structure. We should probably even assert that it is empty when numMessageUnacked goes to zero as part of the rebalance. Ideally, these custom implementations need a full-fledged unit test.

Jiangjie Qin wrote: That's a good point; we probably need a metric to see if we have some stuck offsets. But those stuck offsets should not be committed anyway. We need to be alerted once that situation happens. Maybe add an assertion on the exception block where stuck
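The structure under discussion - a raw doubly-linked list whose nodes can be removed from the middle in O(1) given a node reference, with a size counter cheap enough to back the gauge Joel asks for - could look roughly like this. It is a sketch of the idea, not the patch's actual code, and it illustrates why a standalone utility is straightforward to unit test.

```scala
// Minimal doubly-linked list with O(1) removal of an arbitrary node,
// the property java.util.LinkedList lacks (its remove(Object) is O(n)).
class RawNode[T](val value: T) {
  var prev: RawNode[T] = null
  var next: RawNode[T] = null
}

class RawLinkedList[T] {
  private var head: RawNode[T] = null
  private var tail: RawNode[T] = null
  private var count = 0

  // Incremented on add, decremented on remove: cheap to expose as a gauge.
  def size: Int = count

  // Append at the tail; return the node so callers can remove it later in O(1).
  def add(value: T): RawNode[T] = {
    val node = new RawNode(value)
    if (tail == null) { head = node; tail = node }
    else { tail.next = node; node.prev = tail; tail = node }
    count += 1
    node
  }

  // Unlink the node, patching head/tail when the node is at either end.
  def remove(node: RawNode[T]): Unit = {
    if (node.prev != null) node.prev.next = node.next else head = node.next
    if (node.next != null) node.next.prev = node.prev else tail = node.prev
    node.prev = null; node.next = null
    count -= 1
  }

  def peek: Option[T] = Option(head).map(_.value)
}
```

Because the whole structure fits in one small class, the "assert it is empty when numMessageUnacked goes to zero" check and a full unit test both become trivial once it lives outside the mirror maker.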
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65477 ---

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment108686
This should be like the previous code - i.e., 1 + ... E.g., if config.offsetsCommitMaxRetries is one, then we can have two attempts. In this version at most one attempt will be made.

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment108687
Actually we should fix this. If this is the final commit during shutdown (and NOT an autocommit), then we need to retry on error up to the remaining retries.

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108691
Can you use a named parameter here? i.e., `valueFactory = Some(...`

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108690
Should this be fatal? i.e., fatal is normally used before exiting (abnormally). WARN would be more suitable. I don't think it makes sense to not advance the offset here, especially if you will still keep sending messages. I think you need to remove it from the unacked offset list. E.g., you may configure your mirror maker producer with only a few retries (in which case you are okay with data loss). In this scenario you should just let the error go and allow the mirror maker to proceed normally. If someone wants zero data loss, the MM should be configured with required acks -1 and infinite retries. Maybe I'm misunderstanding what zero data loss really means - can you clarify? (Especially if someone configures the producer with, say, acks one and limited retries.)

core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108689
Should we make this a generic DoublyLinkedList data structure in utils or some other suitable place and unit test it as well? 
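The "1 + config.offsetsCommitMaxRetries" semantics Joel describes above - a shutdown or rebalance commit gets the initial attempt plus the configured retries, while an auto-commit is attempted only once - can be sketched as follows. commitWithRetries and its commit() callback are hypothetical illustrations, not the connector's real API.

```scala
object CommitRetrySketch {
  // isAutoCommit: periodic auto-commits are attempted once; anything else
  // (shutdown/rebalance) gets 1 + offsetsCommitMaxRetries attempts, so a
  // configured value of one yields two attempts total.
  def commitWithRetries(isAutoCommit: Boolean,
                        offsetsCommitMaxRetries: Int)(commit: () => Boolean): Boolean = {
    val maxAttempts = if (isAutoCommit) 1 else 1 + offsetsCommitMaxRetries
    var succeeded = false
    var attempts = 0
    while (!succeeded && attempts < maxAttempts) {
      attempts += 1
      succeeded = commit() // true on success, in this sketch
    }
    succeeded
  }
}
```

This also captures why skipping retries for auto-commits is safe: a failed periodic commit is simply superseded by the next one, whereas the final commit at shutdown has no successor to fall back on.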
core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108688
Does not seem to be used. (But it probably should be used.)

core/src/main/scala/kafka/utils/Utils.scala
https://reviews.apache.org/r/25995/#comment108692
I don't think this is necessary, right? i.e., args.map won't throw an NPE if you don't provide any additional arguments:

scala> def f(args: Int*) { println(args.size) }
f: (args: Int*)Unit
scala> f(1,2)
2
scala> f()
0

- Joel Koshy

On Dec. 17, 2014, 8:29 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 17, 2014, 8:29 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. 
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not. Addressed Jun's comments. Incorporated Jun's comments Incorporated Jun's comments and rebased on trunk Rebased on current trunk Addressed Joel's comments. Addressed Joel's comments. Diffs - core/src/main/scala/kafka/consumer/ConsumerConnector.scala
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 19, 2014, 2:48 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Incorporated Joel's comments Diffs (updated) - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614 https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614 Should this be fatal? i.e., fatal is normally used before exiting (abnormally). WARN would be more suitable. I don't think it makes sense to not advance the offset here, especially if you will still keep sending messages. I think you need to remove it from the unacked offset list. E.g., you may configure your mirror maker producer with only a few retries (in which case you are okay with data loss). In this scenario you should just let the error go and allow the mirror maker to proceed normally. If someone wants zero data loss, the MM should be configured with required acks -1 and infinite retries. Maybe I'm misunderstanding what zero data loss really means - can you clarify? (Especially if someone configures the producer with, say, acks one and limited retries.)

That makes sense. So I've changed the code to work in the following way:
1. If retries is set to infinite, the producer will keep retrying and the entire pipeline will eventually block. (This is strictly data-loss free.)
2. If retries are not set to infinite, then after the retries are exhausted it will remove the offset from the unacked list and record it as a skippedUnackedMessage, which is an exposed metric.

On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 668 https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line668 Should we make this a generic DoublyLinkedList data structure in utils or some other suitable place and unit test it as well?

I'm not sure this is generic enough to put into utils. This raw linked list seems to serve only the purpose of removing/inserting a node in the middle in O(1), which cannot be achieved with java.util.LinkedList. Maybe we can keep it here for now, and if later on there are some other use cases we can refactor the code to create a raw LinkedList in utils and use that one. What do you think?

On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote: core/src/main/scala/kafka/utils/Utils.scala, line 441 https://reviews.apache.org/r/25995/diff/14/?file=795053#file795053line441 I don't think this is necessary, right? i.e., args.map won't throw an NPE if you don't provide any additional arguments:

scala> def f(args: Int*) { println(args.size) }
f: (args: Int*)Unit
scala> f(1,2)
2
scala> f()
0

Yes, it seems to be working.

- Jiangjie

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65477 --- On Dec. 19, 2014, 2:48 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 19, 2014, 2:48 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. 
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 19, 2014, 6:17 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Fix a bug in metric. Diffs (updated) - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 19, 2014, 6:53 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Missed some change in the previous patch submission, submit patch again. Diffs (updated) - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 19, 2014, 7:41 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- change offset commit thread to use scheduler. Diffs (updated) - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489 Why not use KafkaScheduler for the offset commit task?

Jiangjie Qin wrote: I hadn't thought of that before... but it looks like we need to do some additional handling when something goes wrong in the offset commit threads. The KafkaScheduler code does not seem to do that.

Joel Koshy wrote: You can make the task itself catch throwables, so it would look something like this: scheduler.schedule(mirrorMakerOffsetsCommiter, commitTask, ...) and in commitTask: try { commitOffsets() } catch { case t: Throwable => /* handle */ }. That said, I don't think connector.commitOffsets will throw anything, since we catch all throwables there. The only additional detail is that after you shut down the scheduler you will need to call commitOffsets() manually one last time.

I changed the code to use the scheduler. It seems that the try/catch block only handles the Kafka-based offset commit and does not cover ensureOffsetsManagerConnected. Also, theoretically an OOM could be thrown when creating a very large offset map, so I kept the catch block to make mirror maker exit in that case.

- Jiangjie

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65306 --- On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 19, 2014, 7:41 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. 
Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Addressed Guozhang's comment. numMessageUnacked should be decremented no matter whether the send was successful or not. Addressed Jun's comments. Incorporated Jun's comments Incorporated Jun's comments and rebased on trunk Rebased on current trunk Addressed Joel's comments. Addressed Joel's comments. Incorporated Joel's comments Incorporated Joel's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Joel's comments Fix a bug in metric. Missed some change in the previous patch submission, submit patch again. change offset commit thread to use scheduler. 
Diffs - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
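The scheduler-based commit task discussed above (a periodic task that swallows throwables so one failed commit does not kill the thread, plus one final manual commit after shutdown) can be sketched roughly as follows. This is an illustrative sketch only, using a plain Java `ScheduledExecutorService` as a stand-in for `KafkaScheduler`; `commitOffsets()` here is a stub for `connector.commitOffsets()`, not the actual patch code.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ScheduledCommitSketch {
  private val commits = new AtomicInteger(0)

  // Stand-in for connector.commitOffsets(), which is expected not to throw.
  def commitOffsets(): Unit = commits.incrementAndGet()

  def run(): Int = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val commitTask = new Runnable {
      override def run(): Unit =
        try commitOffsets()
        catch {
          // Catch everything: a failed commit should not kill the commit thread;
          // the next scheduled commit may succeed.
          case t: Throwable => System.err.println("offset commit failed: " + t)
        }
    }
    scheduler.scheduleAtFixedRate(commitTask, 0L, 10L, TimeUnit.MILLISECONDS)
    Thread.sleep(50)
    scheduler.shutdown()
    scheduler.awaitTermination(1L, TimeUnit.SECONDS)
    commitOffsets() // one final manual commit after the scheduler stops
    commits.get()
  }
}
```

The final `commitOffsets()` call after `shutdown()` reflects Joel's note that the last commit must be issued manually once the scheduler is stopped.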
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review65306 --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment108396 Can we make this a reliable commit - i.e., with retries up to the configured retry count? The policy is retry on commit errors during rebalance or shutdown, no need to retry on commit errors during auto-commits. So for e.g., if a mirror maker rebalances and there is simultaneously offset manager movement we would need to retry the commit. This is the motivation for the isAutoCommit flag - however, there seems to be a bug right now which maybe you can fix. i.e., if this is not an auto-commit then set retries to configured retries else no retries. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108400 Let us inline the valueFactory. i.e., `new Pool[TopicAndPartition, OffsetList](valueFactory = Some((k: TopicAndPartition) => new OffsetList))` core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108417 numUnackedMessages core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108421 Can you update the comment? I don't see any rebalance latch core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108410 This is a slightly misleading comment. i.e., we do allow a single message that may exceed this right? It is hard to document succinctly though. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108403 Why do you need a dummy param? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108404 This is a matter of taste, but personally I prefer using Options in these scenarios. i.e., instead of null use None. 
It is less error prone when writing code since you cannot invoke directly without doing a get first, so it forces you to remember to check whether it is defined or not. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108405 We should probably just say "periodically commits consumed offsets" since it could commit to ZooKeeper as well. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108406 We should probably just say "periodically commits consumed offsets" since it could commit to ZooKeeper as well. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108409 You _must_ also override the auto.commit.enable to false in the consumer config. The default is true. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108407 isCleanShutdown is not defined core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108408 should only be.. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108411 private core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108414 Do we need this counter? i.e., there is already a commit meter in the consumer connector. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108412 Why not use KafkaScheduler for the offset commit task? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108413 commitOffsets core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108415 For clarity and consistency (with MMRecord) can we call it sourceTopicPartition; sourceOffset? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108423 sourceTopicPartition... 
core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108416 // synchronize to ensure that addOffset precedes removeOffset core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108419 There might be a small memory leak here: if there was an error though (in the branch above) it seems no one removes the offset from the list. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108420 one message (not 1 message). Rule in writing: 0-9 should be spelled out, >= 10 can be numeric. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment108422 Shouldn't this be further up? Specifically, in the synchronized block after the while loop. Also, this is not named very suitably
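The "synchronize to ensure that addOffset precedes removeOffset" comment above is the crux of the race discussed throughout this thread: once producer.send() is called, the callback can remove an offset at any time, so add and remove must hold the same lock. A minimal illustrative sketch (class and method names are hypothetical, not the patch's actual code):

```scala
import scala.collection.mutable

// Illustrative sketch of an unacked-offset list where add (on send) and
// remove (in the producer callback) synchronize on the same monitor, so a
// callback firing immediately after send() cannot race with the add.
class UnackedOffsetList {
  private val offsets = mutable.TreeSet.empty[Long]

  // Called just before/around producer.send().
  def addOffset(offset: Long): Unit = offsets.synchronized {
    offsets += offset
  }

  // Called from the producer callback; must take the same lock as addOffset.
  def removeOffset(offset: Long): Unit = offsets.synchronized {
    offsets -= offset
  }

  // Smallest unacked offset, i.e. the safe offset to commit up to.
  def smallest: Option[Long] = offsets.synchronized { offsets.headOption }
}
```

Without the lock in `removeOffset`, a callback could run between the producer's `addOffset` and `send`, which is exactly the unsynchronized-removal bug acknowledged earlier in the thread.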
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 17, 2014, 8:29 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not. Addressed Jun's comments. Incorporated Jun's comments Incorporated Jun's comments and rebased on trunk Rebased on current trunk Addressed Joel's comments. Addressed Joel's comments. 
Diffs (updated) - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af core/src/main/scala/kafka/tools/MirrorMaker.scala 77d951d13b8d8ad00af40257fe51623cc2caa61a core/src/main/scala/kafka/utils/Utils.scala 738c1af9ef5de16fdf5130daab69757a14c48b5c Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 338 https://reviews.apache.org/r/25995/diff/13/?file=793041#file793041line338 Can we make this a reliable commit - i.e., with retries up to the configured retry count? The policy is retry on commit errors during rebalance or shutdown, no need to retry on commit errors during auto-commits. So for e.g., if a mirror maker rebalances and there is simultaneously offset manager movement we would need to retry the commit. This is the motivation for the isAutoCommit flag - however, there seems to be a bug right now which maybe you can fix. i.e., if this is not an auto-commit then set retries to configured retries else no retries. Changed the code based on your suggestion. My original thinking was that in mirror maker one commit failure actually does not matter too much, because the next commit will succeed if the failure is due to offset topic leader migration, etc. But for a more general purpose, it probably should retry if it is not an auto commit. On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 124 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line124 This is a slightly misleading comment. i.e., we do allow a single message that may exceed this right? It is hard to document succinctly though. I think the description seems clear enough for mirror maker. We do have comments in ByteBoundedBlockingQueue about the one more message exceeding the capacity. On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 192 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line192 Why do you need a dummy param? Because Utils.createObject needs an args parameter, and if we pass in a null it will give an NPE... I've changed the code in Utils to allow us to pass in a null, which uses the no-arg constructor. On Dec. 
17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 477 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line477 Do we need this counter? i.e., there is already a commit meter in the consumer connector. Right, maybe we don't need it. On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489 Why not use KafkaScheduler for the offset commit task? Hadn't thought of that before... But it looks like we need to do some more handling when something goes wrong in the offset commit threads. The KafkaScheduler code doesn't seem to do so. On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line614 There might be a small memory leak here: if there was an error though (in the branch above) it seems no one removes the offset from the list. Yes, that is a memory leak, but in this case we should not commit the offset of the message that was not sent successfully either. If any exception occurs then the offsets will not advance anymore. We probably should have an alert on the mirror maker consumer lag. On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 658 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line658 Note earlier comment on null vs None As LinkedList is a fairly standard data structure, keeping it using null probably makes the code cleaner. On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 666 https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line666 Should we expose a size() method - i.e., increment on add and decrement on remove. We can aggregate the size of all the offset lists outside and emit a gauge. 
That will give us some assurance that there are no forgotten offsets. Re: the potential leak mentioned above. In fact, I'm a bit nervous about correctness since this is a custom implementation of a semi-non-trivial data structure. We should probably even assert that it is empty when numMessageUnacked goes to zero as part of the rebalance. Ideally, these custom implementations need a full-fledged unit test. That's a good point, we probably need a metric to see if we have some stuck offsets. But those stuck offsets should also not be committed anyways. We need to be alerted on that situation once it happens. Maybe adding an assertion in the exception block where a stuck offset occurs would be better? On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line
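The size() suggestion above (increment on add, decrement on remove, aggregate as a gauge so forgotten offsets become visible) could look roughly like the following. This is a sketch under assumptions: `SizedOffsetList` and its methods are illustrative names, and the metrics gauge that would read `size` is omitted.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Sketch: an offset list that tracks its own size with an atomic counter so
// an external metrics gauge can cheaply sum the sizes of all lists and
// surface leaked ("forgotten") offsets without taking the list locks.
class SizedOffsetList {
  private val offsets = mutable.TreeSet.empty[Long]
  private val listSize = new AtomicInteger(0)

  def add(offset: Long): Unit = offsets.synchronized {
    if (offsets.add(offset)) listSize.incrementAndGet() // count only new entries
  }

  def remove(offset: Long): Unit = offsets.synchronized {
    if (offsets.remove(offset)) listSize.decrementAndGet()
  }

  // Lock-free read, suitable for a gauge callback.
  def size: Int = listSize.get()
}
```

A rebalance could then assert `size == 0` once numMessageUnacked reaches zero, as suggested above.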
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 16, 2014, 4:03 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not. Addressed Jun's comments. 
Incorporated Jun's comments Incorporated Jun's comments and rebased on trunk Rebased on current trunk Diffs (updated) - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af core/src/main/scala/kafka/tools/MirrorMaker.scala 77d951d13b8d8ad00af40257fe51623cc2caa61a Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review64505 --- Haven't looked at your new patch yet. It seems that the following tests start to fail after your commit. They seem to pass if run individually, but always fail if you run all unit tests. Could you take a look and at least understand the cause? kafka.server.ServerShutdownTest testCleanShutdown FAILED java.lang.NullPointerException at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113) at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108) at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest.testCleanShutdown(ServerShutdownTest.scala:101) kafka.server.ServerShutdownTest testConsecutiveShutdown PASSED kafka.server.ServerShutdownTest testCleanShutdownWithDeleteTopicEnabled FAILED java.lang.NullPointerException at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at 
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113) at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108) at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest.testCleanShutdownWithDeleteTopicEnabled(ServerShutdownTest.scala:114) kafka.server.ServerShutdownTest testCleanShutdownAfterFailedStartup FAILED java.lang.NullPointerException at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113) at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108) at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest.testCleanShutdownAfterFailedStartup(ServerShutdownTest.scala:141) - Jun Rao On Dec. 8, 2014, 9:36 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 8, 2014, 9:36 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. 
Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 10, 2014, 5:09 a.m., Jun Rao wrote: Haven't looked at your new patch yet. It seems that the following tests start to fail after your commit. They seem to pass if run individually, but always fail if you run all unit tests. Could you take a look and at least understand the cause? kafka.server.ServerShutdownTest testCleanShutdown FAILED java.lang.NullPointerException at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113) at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108) at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest.testCleanShutdown(ServerShutdownTest.scala:101) kafka.server.ServerShutdownTest testConsecutiveShutdown PASSED kafka.server.ServerShutdownTest testCleanShutdownWithDeleteTopicEnabled FAILED java.lang.NullPointerException at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at 
scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113) at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108) at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest.testCleanShutdownWithDeleteTopicEnabled(ServerShutdownTest.scala:114) kafka.server.ServerShutdownTest testCleanShutdownAfterFailedStartup FAILED java.lang.NullPointerException at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114) at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113) at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108) at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147) at kafka.server.ServerShutdownTest.testCleanShutdownAfterFailedStartup(ServerShutdownTest.scala:141) KAFKA-1815 is opened for the failures. It turns out I forgot to shut down the consumer in the ZookeeperConsumerConnector rebalance listener test, which then caused the server shutdown test failures. Really sorry for this... I could not reproduce the issue on my desktop, probably because it is fast; I actually found that I could reproduce it on my laptop. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review64505 --- On Dec. 8, 2014, 9:36 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 
8, 2014, 9:36 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 8, 2014, 9:36 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not. Addressed Jun's comments. 
Incorporated Jun's comments Incorporated Jun's comments and rebased on trunk Diffs (updated) - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af core/src/main/scala/kafka/tools/MirrorMaker.scala b06ff6000183b257005b5ac3ccc7ba8976f1ab8d Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530 https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529 Does that imply there is always an offset? Is that always true? I don't quite follow how the logic works. Since the offset for each target partition is updated independently, I am not sure why you can rely on checking that those offsets are consecutive. Also, does this logic work when there is a partitioning key? It would be useful to add some comments to describe why a two-level offset map is needed. Jiangjie Qin wrote: The iterator only iterates over the keys that exist in the map, i.e. the offset exists. It does seem confusing without explanation... Will it be clearer if the following comments are added? Following is the offset commit logic: We know that: 1. Messages from the same source partition end up in the same data channel queue in order and will be sent by the same producer. 2. Messages sent by the same producer could go to different target partitions even if the messages are from the same source partition. 3. The order is only guaranteed for messages sent to the same target partition. That means a (SourceTopicPartition, TargetPartition) combination is needed. 4. For each (SourceTopicPartition, TargetPartition), keeping track of a single offset is sufficient, because if an offset is acked, all the offsets smaller than that offset going to the same target partition must have been sent successfully (MaxInFlightRequest=1). That said, if we have multiple producers, after sorting all the last acked offsets of target partitions corresponding to the same source partition, we can commit the offsets from the smallest until the acked offset is no longer consecutive. (But we do need to set send retries to be infinite in producer config, otherwise this won't work. I'll add it to the comments.) 
Based on the above logic, we could use a Map[(SourceTopicPartition, TargetPartition), Offset] to track the offset. But because the offset commit is based on source topic partitions, it is easier to have a Map[SourceTopicPartition, Map[TargetPartition, Offset]] to find the offset to commit. That's why there is a 2-level map. The logic above does not rely on any message key, so it works in general for both keyed/non-keyed messages. The part that's not clear to me is "until the acked offset is no longer consecutive". Suppose there are 2 target partitions. Offset 1,2,4,5 can be routed to partition 1 and offset 3 can be routed to partition 2. When all messages are sent, you will see partition 1 with offset 5 and partition 2 with offset 3. Since offset 3 and 5 are not consecutive, are you just going to commit offset 3? If so, you may be stuck in offset 3 forever if no new messages are sent to partition 2. Also, does this work with a log configured for compaction? Such a log may not have consecutive offsets to begin with. On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 614-618 https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line614 This kind of coordination doesn't quite work. Suppose that we set inRebalance to true and entered the while loop. However, just before we call inRebalance.wait(), the producer could have finished sending all data and called inRebalance.notify(). Then we will be stuck in inRebalance.wait() forever since we missed the notification. One way to do this is to create a lock that protects the read/write of numMessageUnacked. Then we use a condition created from the lock to do the coordination. This way, both the wait/notification and the update/check of numMessageUnacked are protected by the same lock. Jiangjie Qin wrote: Yes, it is clearer to just use a single lock. I'll change the code. But the current code itself seems to work, please correct me if I'm wrong. 
In this case we essentially guarantee that [enable notify, numMessagesUnacked > 0 then wait] is atomic with respect to notify. So either notify is not enabled yet, or it occurs after the rebalance listener starts to wait. So the notify will not be missed. If the rebalance listener grabs the inRebalance lock first and is in the while loop, then it won't release the inRebalance lock until it enters wait. Because producers have to grab the inRebalance lock before calling inRebalance.notify(), the producer will not be able to call inRebalance.notify() until the offset commit thread starts to wait. There are some more complicated sequences when producers call notify multiple times, but as long as 1. [numMessageUnacked > 0 then wait] is atomic with respect to notify, and 2. [numMessageUnacked == 0] happens before notify, the notify should not be missed. Got it. I missed the part that we
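The lock-plus-condition pattern Jun suggests above, where the update/check of numMessageUnacked and the wait/signal all happen under one lock so a notification cannot be missed, can be sketched as follows. The class and method names here are illustrative, not the patch's actual code.

```scala
import java.util.concurrent.locks.ReentrantLock

// Sketch: coordinate the rebalance listener with producer callbacks using a
// single lock and a condition derived from it. Because both the counter
// update and the await/signal hold the same lock, the "missed notify" race
// described above cannot occur.
class UnackedMessageTracker {
  private val lock = new ReentrantLock()
  private val allAcked = lock.newCondition()
  private var numMessageUnacked = 0

  // Called when a message is handed to the producer.
  def incrementUnacked(): Unit = {
    lock.lock()
    try numMessageUnacked += 1
    finally lock.unlock()
  }

  // Called from the producer callback: decrement and signal when drained.
  def decrementUnacked(): Unit = {
    lock.lock()
    try {
      numMessageUnacked -= 1
      if (numMessageUnacked == 0) allAcked.signalAll()
    } finally lock.unlock()
  }

  // Called by the rebalance listener: block until all in-flight messages ack.
  def awaitAllAcked(): Unit = {
    lock.lock()
    try while (numMessageUnacked > 0) allAcked.await()
    finally lock.unlock()
  }
}
```

The `while` loop around `await()` guards against spurious wakeups, and a signal can only be issued while the waiter is either not yet waiting (counter still > 0 is rechecked) or already parked on the condition.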
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530 https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529 Does that imply there is always an offset? Is that always true? I don't quite follow how the logic works. Since the offset for each target partition is updated independently, I am not sure why you can rely on checking that those offsets are consecutive. Also, does this logic work when there is a partitioning key? It would be useful to add some comments to describe why a two-level offset map is needed. Jiangjie Qin wrote: The iterator only iterates over the keys that exist in the map, i.e. the offset exists. It does seem confusing without explanation... Will it be clearer if the following comments are added? Following is the offset commit logic: We know that: 1. Messages from the same source partition end up in the same data channel queue in order and will be sent by the same producer. 2. Messages sent by the same producer could go to different target partitions even if the messages are from the same source partition. 3. The order is only guaranteed for messages sent to the same target partition. That means a (SourceTopicPartition, TargetPartition) combination is needed. 4. For each (SourceTopicPartition, TargetPartition), keeping track of a single offset is sufficient, because if an offset is acked, all the offsets smaller than that offset going to the same target partition must have been sent successfully (MaxInFlightRequest=1). That said, if we have multiple producers, after sorting all the last acked offsets of the target partitions that correspond to the same source partition, we can commit the offsets from the smallest until the acked offset is no longer consecutive. (But we do need to set send retries to be infinite in the producer config, otherwise this won't work. I'll add it to the comments.)
Based on the above logic, we could use a Map<(SourceTopicPartition, TargetPartition), offset> to track the offsets. But because the offset commit is based on source topic partitions, it is easier to have a Map<SourceTopicPartition, Map<TargetPartition, offset>> to find the offset to commit. That's why there is a 2-level map. The logic above does not rely on any message key, so it works in general for both keyed and non-keyed messages. Jun Rao wrote: The part that's not clear to me is "until the acked offset is no longer consecutive". Suppose there are 2 target partitions. Offsets 1, 2, 4, 5 can be routed to partition 1 and offset 3 can be routed to partition 2. When all messages are sent, you will see partition 1 with offset 5 and partition 2 with offset 3. Since offsets 3 and 5 are not consecutive, are you just going to commit offset 3? If so, you may be stuck at offset 3 forever if no new messages are sent to partition 2. Also, does this work with a log configured with compaction? Such a log may not have consecutive offsets to begin with. I see, you are right. In that case it won't work. How about the following algorithm: 1. We keep a Map<SourceTopicPartition, (Set<Offset>, MaxOffsetInSet)>. 2. When a message is sent, the producer puts its offset into the map based on the source topic partition, and updates MaxOffsetInSet if necessary. 3. In the producer callback, remove the offset from the Set<Offset> if the message was sent successfully. 4. When the offset commit thread comes, it gets the smallest offset in the Set<Offset> and commits that one. If the set is empty, it commits MaxOffsetInSet+1. This algorithm should be able to handle compacted partitions as well. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review64152 --- On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 7, 2014, 2:59 a.m.) Review request for kafka.
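The set-based algorithm proposed above can be sketched as follows. This is a hypothetical Java sketch (names such as UnackedOffsetTracker are invented, and the real patch is Scala), using a TreeSet so the smallest-offset lookup in step 4 is cheap; since nothing assumes consecutive offsets, it also handles compacted partitions.

```java
import java.util.TreeSet;

// Hypothetical sketch of the per-source-partition (Set<Offset>, MaxOffsetInSet)
// structure from the thread. One instance per source topic partition.
public class UnackedOffsetTracker {
    private final TreeSet<Long> unacked = new TreeSet<>(); // ordered: min lookup is O(log n)
    private long maxOffsetSeen = -1L;

    // Step 2: called when a message is handed to the producer.
    public synchronized void onSend(long offset) {
        unacked.add(offset);
        maxOffsetSeen = Math.max(maxOffsetSeen, offset);
    }

    // Step 3: called from the producer callback on a successful send.
    public synchronized void onAck(long offset) {
        unacked.remove(offset);
    }

    // Step 4: called by the offset commit thread. Commit the smallest unacked
    // offset; if nothing is outstanding, everything up to maxOffsetSeen was
    // acked, so commit maxOffsetSeen + 1.
    public synchronized long offsetToCommit() {
        return unacked.isEmpty() ? maxOffsetSeen + 1 : unacked.first();
    }
}
```

In the example below the source partition skips offset 4, as a compacted log might, and the commit offset still advances correctly.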
Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530 https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529 Jun Rao wrote: Yes, but getting the smallest offset from a set can be expensive. I was thinking that we can put the offset of all outstanding messages in a linked list in source offset order. Every produced message will be taken out of the linked list. We also maintain a maxOffsetSeen.
The commit offset will be the offset in the first link if the linked list is not empty. Otherwise, it will be maxOffsetSeen. - Jun --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review64152 --- On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 7, 2014, 2:59 a.m.) Review request for kafka.
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530 https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529 Jun Rao wrote: Yes, but getting the smallest offset from a set can be expensive. I was thinking that we can put the offset of all outstanding messages in a linked list in source offset order. Every produced message will be taken out of the linked list. We also maintain a maxOffsetSeen.
The commit offset will be the offset in the first link if the linked list is not empty. Otherwise, it will be maxOffsetSeen. Yes, this solution should have no bottleneck. I'll write a raw linked list. I was thinking that offset commit is infrequent and hopefully we do not have many pending messages for a partition at a given point. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review64152 --- On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 7,
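Jun's linked-list variant could look roughly like the following Java sketch (hypothetical names; the actual patch is Scala). A handle map gives O(1) removal on ack, appends happen in source offset order, and the commit offset is the head of the list; when nothing is outstanding it falls back to maxOffsetSeen + 1, following the "+1" convention of the set-based algorithm earlier in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a raw doubly linked list of unacked offsets, kept in
// source offset order. One instance per source topic partition.
public class UnackedOffsetList {
    private static final class Node {
        final long offset;
        Node prev, next;
        Node(long offset) { this.offset = offset; }
    }

    private Node head, tail;
    private long maxOffsetSeen = -1L;
    private final Map<Long, Node> nodes = new HashMap<>(); // offset -> node handle

    // Producer thread: offsets from one source partition arrive in order.
    public synchronized void append(long offset) {
        Node n = new Node(offset);
        if (tail == null) { head = tail = n; }
        else { tail.next = n; n.prev = tail; tail = n; }
        nodes.put(offset, n);
        maxOffsetSeen = Math.max(maxOffsetSeen, offset);
    }

    // Producer callback on ack: O(1) unlink via the handle map.
    public synchronized void remove(long offset) {
        Node n = nodes.remove(offset);
        if (n == null) return;
        if (n.prev != null) n.prev.next = n.next; else head = n.next;
        if (n.next != null) n.next.prev = n.prev; else tail = n.prev;
    }

    // Offset commit thread: head of the list, or maxOffsetSeen + 1 when empty.
    public synchronized long offsetToCommit() {
        return head != null ? head.offset : maxOffsetSeen + 1;
    }
}
```

Compared with the TreeSet variant, every operation here is O(1) (amortized), at the cost of the extra handle map; as Jiangjie notes, with infrequent commits and few pending messages either choice is likely fine in practice.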
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review64152 --- Thanks for the patch. Some comments below. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment106601 Should this api be in ConsumerConnector? core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment106604 Should this be part of the public api in ConsumerConnector? Also, could we restructure the api a bit? It seems that the public api should just be commitOffsets(offsetMap), since we know this is a manual offset commit. We can let the other api, commitOffsets(isAutoCommit), get the internal offsets and pass them to commitOffsets(offsetMap). core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment106606 If we expect consumerConfig to be a singleton, we should just use options.valueOf(), instead of valuesOf().head. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment106605 This is really weird. We get a customized listener from the consumer config, but expect the listener to implement a special class MirrorMakerConsumerRebalanceListener, instead of the standard ConsumerRebalanceListener. It's probably better to get this from a MirrorMaker input param that defaults to MirrorMakerConsumerRebalanceListener. We can then add a description of which class a customized implementation needs to extend from, if customization is needed. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment106623 Does that imply there is always an offset? Is that always true? I don't quite follow how the logic works. Since the offset for each target partition is updated independently, I am not sure why you can rely on checking that those offsets are consecutive. Also, does this logic work when there is a partitioning key?
It would be useful to add some comments to describe why a two-level offset map is needed. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment106622 It's weird to extend from NewShinyProducer but not use its methods. Perhaps it will be clearer if we just let MirrorMakerNewProducer implement MirrorMakerBaseProducer. Ditto for MirrorMakerOldProducer. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment106621 This kind of coordination doesn't quite work. Suppose that we set inRebalance to true and entered the while loop. However, just before we call inRebalance.wait(), the producer could have finished sending all data and called inRebalance.notify(). Then we will be stuck in inRebalance.wait() forever since we missed the notification. One way to fix that is to create a lock that protects the read/write of numMessageUnacked. Then we use a condition created from the lock to do the coordination. This way, both the wait/notification and the update/check of numMessageUnacked are protected by the same lock. - Jun Rao On Dec. 4, 2014, 7:59 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 4, 2014, 7:59 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka
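Jun's suggestion about restructuring commitOffsets can be illustrated with a small sketch: the manual-commit overload is the single real commit path, and the auto-commit overload merely gathers the internally tracked offsets and delegates to it. Everything here is hypothetical; the class and method shapes are invented for illustration, while the real API lives in ConsumerConnector / ZookeeperConsumerConnector and uses Kafka's own types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the API layering suggested in the review: one real
// commit path (the offset-map overload) that the auto-commit overload reuses.
public class OffsetCommitLayering {
    private final Map<String, Long> internalOffsets = new HashMap<>();
    private Map<String, Long> lastCommitted; // records what was committed, for illustration

    // Internal bookkeeping as messages are consumed (placeholder for the real tracking).
    public void updateInternalOffset(String partition, long offset) {
        internalOffsets.put(partition, offset);
    }

    // Public manual-commit api: the single code path that actually commits.
    public void commitOffsets(Map<String, Long> offsetsToCommit) {
        lastCommitted = new HashMap<>(offsetsToCommit);
        // ... here the real connector would write offsetsToCommit to ZooKeeper ...
    }

    // Auto-commit variant: gather the internal offsets and delegate.
    public void commitOffsets(boolean isAutoCommit) {
        commitOffsets(internalOffsets);
    }

    public Map<String, Long> lastCommitted() { return lastCommitted; }
}
```

The point of the layering is that callers with explicit offsets (like MirrorMaker's commit thread) and the periodic auto-commit share one commit implementation instead of two divergent ones.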
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 7, 2014, 2:59 a.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description (updated) --- Incorporated Guozhang's comments. Addressed Guozhang's comment. numMessageUnacked should be decremented no matter whether the send was successful or not. Addressed Jun's comments.
Diffs (updated) - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote: Thanks for the patch. Some comments below. Thank you very much for the review. On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530 https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529 Does that imply there is always an offset? Is that always true? I don't quite follow how the logic works. Since the offset for each target partition is updated independently, I am not sure why you can rely on checking that those offsets are consecutive. Also, does this logic work when there is a partitioning key? It would be useful to add some comments to describe why a two-level offset map is needed. The iterator only iterates over the keys that exist in the map, i.e. the offset exists. It does seem confusing without explanation... Will it be clearer if the following comments are added? Following is the offset commit logic: We know that: 1. Messages from the same source partition end up in the same data channel queue in order and will be sent by the same producer. 2. Messages sent by the same producer could go to different target partitions even if the messages are from the same source partition. 3. The order is only guaranteed for messages sent to the same target partition. That means a (SourceTopicPartition, TargetPartition) combination is needed. 4. For each (SourceTopicPartition, TargetPartition), keeping track of a single offset is sufficient, because if an offset is acked, all the offsets smaller than that offset going to the same target partition must have been sent successfully (MaxInFlightRequest=1). That said, if we have multiple producers, after sorting all the last acked offsets of the target partitions that correspond to the same source partition, we can commit the offsets from the smallest until the acked offset is no longer consecutive. (But we do need to set send retries to be infinite in the producer config, otherwise this won't work.
I'll add it to the comments.) Based on the above logic, we could use a Map<(SourceTopicPartition, TargetPartition), offset> to track the offsets. But because the offset commit is based on source topic partitions, it is easier to have a Map<SourceTopicPartition, Map<TargetPartition, offset>> to find the offset to commit. That's why there is a 2-level map. The logic above does not rely on any message key, so it works in general for both keyed and non-keyed messages. On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 558-559 https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line558 It's weird to extend from NewShinyProducer but not use its methods. Perhaps it will be clearer if we just let MirrorMakerNewProducer implement MirrorMakerBaseProducer. Ditto for MirrorMakerOldProducer. I was trying to avoid duplicate code, but it seems that does not save much. I'll get rid of the extension. On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 614-618 https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line614 This kind of coordination doesn't quite work. Suppose that we set inRebalance to true and entered the while loop. However, just before we call inRebalance.wait(), the producer could have finished sending all data and called inRebalance.notify(). Then we will be stuck in inRebalance.wait() forever since we missed the notification. One way to fix that is to create a lock that protects the read/write of numMessageUnacked. Then we use a condition created from the lock to do the coordination. This way, both the wait/notification and the update/check of numMessageUnacked are protected by the same lock. Yes, it is clearer to just use a single lock. I'll change the code. But the current code itself seems to work, please correct me if I'm wrong. In this case we essentially guarantee that [enable notify, numMessagesUnacked > 0 then wait] is atomic with respect to notify.
So either notify is not enabled yet or it occurs after the rebalance listener starts to wait. So the notify will not be missed. If the rebalance listener grabs the inRebalance lock first and is in the while loop, then it won't release the inRebalance lock until it enters wait. Because producers have to grab the inRebalance lock before calling inRebalance.notify(), the producer will not be able to call inRebalance.notify() until the offset commit thread starts to wait. There are some more complicated sequences when producers call notify multiple times, but as long as 1. [numMessageUnacked > 0 then wait] is atomic with respect to notify, and 2. [numMessageUnacked = 0] happens before notify, the notify should not be missed. - Jiangjie --- This is an automatically generated e-mail. To reply, visit:
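The single-lock fix Jun describes maps naturally onto java.util.concurrent: a ReentrantLock guards numMessageUnacked, and a Condition created from that lock replaces the bare wait/notify, so the "check count, then wait" step is atomic with respect to the signal and no notification can be missed. This is a hypothetical sketch (names invented, not the patch's code), with the wait in a while loop to tolerate spurious wakeups.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the lock-plus-condition coordination discussed above.
public class UnackedMessageLatch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition allAcked = lock.newCondition();
    private int numMessageUnacked = 0;

    // Producer thread: a message has been handed to the producer.
    public void onSend() {
        lock.lock();
        try { numMessageUnacked++; } finally { lock.unlock(); }
    }

    // Producer callback: decremented whether the send succeeded or failed.
    public void onAck() {
        lock.lock();
        try {
            if (--numMessageUnacked == 0) allAcked.signalAll();
        } finally { lock.unlock(); }
    }

    // Rebalance listener: block until every in-flight message is acked.
    // The check and the await hold the same lock, so a signal cannot slip in
    // between them; the while loop (not if) guards against spurious wakeups.
    public void awaitAllAcked() throws InterruptedException {
        lock.lock();
        try {
            while (numMessageUnacked > 0) allAcked.await();
        } finally { lock.unlock(); }
    }

    public int unacked() {
        lock.lock();
        try { return numMessageUnacked; } finally { lock.unlock(); }
    }
}
```

Because both the counter update and the signal happen under the one lock, the lost-notification interleaving from the review (notify fired just before wait is entered) is impossible by construction.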
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 3, 2014, 11:02 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka
Diffs (updated) - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 4, 2014, 3:02 a.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka
Diffs (updated) - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613 https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613 I think there may be a race condition here; for example, consider this sequence: 1. The data channel only contains one message. 2. The producer takes the message from the channel. 3. dataChannel.clear() is called. 4. numMessageUnacked.get() == 0, offsets committed. 5. producer.send() is called, incrementing numMessageUnacked. 6. Data duplication happens when the rebalance finishes. I think on line 599 we should use while instead of if, but this alone does not fix this. Jiangjie Qin wrote: Yes, I actually have a comment on this race condition on line 581. The reason I'm not handling it here is: 1. The chance of this situation is very slight. 2. A single duplicate message does not really hurt. 3. The fix increases the complexity of the code (looking into the producer thread status) and I'm not sure if it is worth doing. 4. Even if we fix this, from the producer side, duplicates could still happen. Guozhang Wang wrote: Shall we change line 691 to while (numMessageUnacked.get() > 0) at least? Yes, it should be a while loop, forgot to change it... - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review63703 --- On Dec. 4, 2014, 3:02 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 4, 2014, 3:02 a.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613 https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613 I think there may be a race condition here; for example, consider this sequence: 1. The data channel contains only one message. 2. The producer takes the message from the channel. 3. dataChannel.clear() is called. 4. numMessageUnacked.get() == 0, so offsets are committed. 5. producer.send() is called, incrementing numMessageUnacked. 6. Data duplication happens when the rebalance finishes. I think on line 599 we should use while instead of if, but this alone does not fix it. Yes, I actually have a comment on this race condition at line 581. The reasons I'm not handling it here are: 1. The chance of this situation is very slim. 2. A single duplicate message does not really hurt. 3. The fix increases the complexity of the code (looking into the producer thread status) and I'm not sure it is worth doing. 4. Even if we fix this, duplicates could still happen on the producer side. On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 96 https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line96 Could you add a comment here about the in-flight-request config and its effects? The comment was put at the very beginning with a note. Maybe we can put a comment referring to that note. On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 186 https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line186 I am wondering whether there are scenarios where we want to allow a customized rebalance listener? Also, if we decide to make this customizable, we need to make it clear that the customized listener is expected to take the data channel as a constructor argument, since this is not checked at compile time. Yes, we do foresee some use cases for this customized rebalance listener. 
I'll add the following comment: a customized consumer rebalance listener should extend MirrorMakerConsumerRebalanceListener and take the data channel as a constructor argument. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review63703 --- On Nov. 24, 2014, 4:15 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Nov. 24, 2014, 4:15 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. 
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 3e1718bc7ca6c835a59ca7c6879f558ec06866ee core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
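Since the constructor contract is not checked at compile time, a sketch makes the expectation concrete. This is a hypothetical, simplified Java sketch; apart from the class name MirrorMakerConsumerRebalanceListener, which comes from the review, all names are illustrative and not the patch's actual API.

```java
import java.util.concurrent.BlockingQueue;

// Base listener: takes the data channel as a constructor argument and drains
// it before partition ownership is released, so offsets committed after the
// rebalance do not cover messages still sitting in the buffer.
class MirrorMakerConsumerRebalanceListener {
    protected final BlockingQueue<byte[]> dataChannel;

    MirrorMakerConsumerRebalanceListener(BlockingQueue<byte[]> dataChannel) {
        this.dataChannel = dataChannel;
    }

    public void beforeReleasingPartitions() {
        dataChannel.clear();
    }
}

// A customized listener extends the base class and must forward the data
// channel to the superclass constructor.
class AuditingRebalanceListener extends MirrorMakerConsumerRebalanceListener {
    int rebalances = 0; // extra bookkeeping a user might want

    AuditingRebalanceListener(BlockingQueue<byte[]> dataChannel) {
        super(dataChannel);
    }

    @Override
    public void beforeReleasingPartitions() {
        super.beforeReleasingPartitions();
        rebalances++;
    }
}
```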
Re: Review Request 25995: Patch for KAFKA-1650
On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613 https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613 I think there may be a race condition here; for example, consider this sequence: 1. The data channel contains only one message. 2. The producer takes the message from the channel. 3. dataChannel.clear() is called. 4. numMessageUnacked.get() == 0, so offsets are committed. 5. producer.send() is called, incrementing numMessageUnacked. 6. Data duplication happens when the rebalance finishes. I think on line 599 we should use while instead of if, but this alone does not fix it. Jiangjie Qin wrote: Yes, I actually have a comment on this race condition at line 581. The reasons I'm not handling it here are: 1. The chance of this situation is very slim. 2. A single duplicate message does not really hurt. 3. The fix increases the complexity of the code (looking into the producer thread status) and I'm not sure it is worth doing. 4. Even if we fix this, duplicates could still happen on the producer side. Shall we change line 691 to while (numMessageUnacked.get() > 0) at least? - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review63703 --- On Dec. 3, 2014, 11:02 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 3, 2014, 11:02 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. 
Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review63703 --- core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment105980 Could you add a comment here about the in-flight-request config and its effects? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment105982 I am wondering whether there are scenarios where we want to allow a customized rebalance listener? Also, if we decide to make this customizable, we need to make it clear that the customized listener is expected to take the data channel as a constructor argument, since this is not checked at compile time. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment105984 I think there may be a race condition here; for example, consider this sequence: 1. The data channel contains only one message. 2. The producer takes the message from the channel. 3. dataChannel.clear() is called. 4. numMessageUnacked.get() == 0, so offsets are committed. 5. producer.send() is called, incrementing numMessageUnacked. 6. Data duplication happens when the rebalance finishes. I think on line 599 we should use while instead of if, but this alone does not fix it. - Guozhang Wang On Nov. 24, 2014, 4:15 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Nov. 24, 2014, 4:15 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. 
Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 3e1718bc7ca6c835a59ca7c6879f558ec06866ee core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review63878 --- Ship it! Ship It! - Guozhang Wang On Dec. 4, 2014, 3:02 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 4, 2014, 3:02 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Addressed Guozhang's comment. 
Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Dec. 4, 2014, 7:59 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments. Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not. 
Diffs (updated) - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Nov. 24, 2014, 4:15 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala added custom config for consumer rebalance listener Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Add configurable consumer rebalance listener Diffs (updated) - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 3e1718bc7ca6c835a59ca7c6879f558ec06866ee core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Nov. 20, 2014, 8 p.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. Added consumer rebalance listener to mirror maker, will test it later. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Conflicts: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Diffs (updated) - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 3e1718bc7ca6c835a59ca7c6879f558ec06866ee core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review61528 --- core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103712 based on the hash value of the source topic-partitionId string core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103221 Could you make this consistent with the others as offset.commit.interval.ms? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103713 In this case, for MaxInFlightRequests > 1 we are not only at risk of message reordering but also at risk of message loss upon failures, right? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103714 Actually, can we move this comment into the top comment of the MM class as a NOTE under the producer paragraph? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103715 Is it possible that the current chunk has been consumed completely and the fetcher thread has not yet put in a new chunk, and hence hasNext() will return false? In this case, shall we stop the consumer or just let it block? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103716 Since we already have the logIdent as threadName, I think we do not need this.getName here? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103718 With the logIdent it will become: FATAL [mirrormaker-offset-commit-thread] Offset commit thread exits due to ... which is a bit verbose? - Guozhang Wang On Nov. 12, 2014, 5:51 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Nov. 12, 2014, 5:51 p.m.) Review request for kafka. 
Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
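The reordering risk Guozhang raises comes from retries: with more than one in-flight request per connection, a failed batch that is retried can land after a later batch. A hedged sketch of an order-preserving configuration for the new producer, assuming the standard new-producer config keys (the class and method names here are illustrative, not from the patch):

```java
import java.util.Properties;

public class MirrorProducerConfig {
    // Builds new-producer properties that preserve per-partition ordering by
    // allowing only one in-flight request per connection; this trades some
    // throughput for ordering, which is why MirrorMaker documents the choice.
    public static Properties orderPreserving(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("max.in.flight.requests.per.connection", "1");
        p.setProperty("retries", "2147483647"); // retry instead of dropping
        p.setProperty("acks", "all");           // avoid loss on broker failover
        return p;
    }
}
```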
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Nov. 18, 2014, 2:44 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Diffs (updated) - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala f476973eeff653473a60c3ecf36e870e386536bc core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
On Nov. 17, 2014, 9:48 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 338 https://reviews.apache.org/r/25995/diff/4/?file=760097#file760097line338 Is it possible that the current chunk has been consumed completely and the fetcher thread has not yet put in a new chunk, and hence hasNext() will return false? In this case, shall we stop the consumer or just let it block? I don't think it is possible. If the consumer timeout is set to -1, hasNext() seems to return false only when: 1. A shutdown message is received. 2. An incomplete message is received (in which case we should probably exit). Otherwise it will block until the next data chunk is put into the queue. - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review61528 --- On Nov. 18, 2014, 2:44 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Nov. 18, 2014, 2:44 a.m.) Review request for kafka. Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Incorporated Guozhang's comments Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala f476973eeff653473a60c3ecf36e870e386536bc core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
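The blocking behavior described in the reply can be modeled with a tiny sketch. This is a hypothetical Java simplification of the consumer stream iterator, not the actual Kafka code: with consumer.timeout.ms set to -1 the iterator blocks on the fetcher's queue, so hasNext() never returns false merely because the current chunk was consumed; it returns false only on a shutdown marker.

```java
import java.util.concurrent.BlockingQueue;

class BlockingStreamIterator {
    static final String SHUTDOWN = "__shutdown__"; // hypothetical marker
    private final BlockingQueue<String> chunks;
    private String peeked;

    BlockingStreamIterator(BlockingQueue<String> chunks) {
        this.chunks = chunks;
    }

    // Blocks indefinitely (the timeout -1 case) until the fetcher enqueues the
    // next chunk or a shutdown marker; false only for the shutdown marker.
    boolean hasNext() throws InterruptedException {
        if (peeked == null) peeked = chunks.take();
        return !SHUTDOWN.equals(peeked);
    }

    String next() {
        String n = peeked;
        peeked = null;
        return n;
    }
}
```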
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Nov. 12, 2014, 5:51 p.m.) Review request for kafka. Summary (updated) - Patch for KAFKA-1650 Bugs: KAFKA-1650 and KAKFA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 https://issues.apache.org/jira/browse/KAKFA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Diffs (updated) - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review55612 --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment95974 Do we need to add = here? core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/25995/#comment95975 We should keep the changes of KAFKA-1647 in its own RB and not merge them here. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment95978 Could we add some introductory comments here on: 1. The architecture of the MM: producer / consumer thread, data channel per producer thread, offset commit thread, and how different modules interact with each other. 2. Why we need a separate offset commit thread, and how it works. 3. The startup / shutdown process, like which modules to start / shutdown first (this could be moved to the head of the corresponding functions also). core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment95979 Embedded consumer config for consuming from the source cluster. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment95980 Embedded producer config for producing to the target cluster. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment95981 The offset commit thread periodically commits consumed offsets to the source cluster. With the new producer, the offsets are updated upon the returned future metadata of the send() call; with the old producer, the offsets are updated when the consumer's iterator advances. This guarantees no data loss even when the mirror maker is shut down uncleanly with the new producer, whereas with the old producer messages inside the data channel could be lost upon an unclean shutdown. 
core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96019 numMessageCapacity and byteCapacity? numGetters and numPutters (since the producer is the consumer of this buffer and vice versa)? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96021 How about MirrorMaker-DataChannel-queue%d-NumMessages and MirrorMaker-DataChannel-queue%d-Bytes? And variable names channelNumMessageHists and channelByteHists? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96020 Can we define put(record, queueId) and put(record), and have the latter include the logic of determining the queueId and then call the former? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96022 Comment on why we use the hashCode of the source topic / partition here. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96026 Instead of letting the consumer check the global shutdown flag, could we just add a shutdown function which sets its own flag, like the producer thread and the commit thread? Then the shutdown process becomes consumers.shutdown consumers.awaitShutdown producers.shutdown producers.awaitShutdown committer.shutdown committer.awaitShutdown connector.shutdown core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96023 Maybe just // if it exits accidentally, stop the entire mirror maker as we did below? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96024 // if it exits accidentally, stop the entire mirror maker core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96025 // the committed offset will be the first offset of the un-consumed message, hence we need to increment by one. 
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/25995/#comment96027 queueNumItemCapacity and queueByteCapacity? - Guozhang Wang On Oct. 6, 2014, 5:20 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Oct. 6, 2014, 5:20 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Talked with Joel and decided to remove multi connector support as people can always create multiple mirror maker instances if they want to consume from multiple clusters. Diffs -
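The separate offset commit thread the review asks to document (items 1 and 2 above) can be sketched roughly. A hypothetical, simplified Java sketch under the assumptions stated in the comments; class and member names are illustrative, not the patch's, and the real thread would call the connector's commitOffsets instead of counting commits.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OffsetCommitThread extends Thread {
    // Latest acked offset per topic-partition, updated from producer callbacks.
    final Map<String, Long> ackedOffsets = new ConcurrentHashMap<>();
    private final long intervalMs; // plays the role of offset.commit.interval.ms
    private volatile boolean shuttingDown = false;
    volatile int commits = 0;

    public OffsetCommitThread(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public void run() {
        while (!shuttingDown) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                // interrupted by shutdown(): fall through to a commit attempt
            }
            commitOffsets();
        }
    }

    void commitOffsets() {
        // Real code would commit ackedOffsets to the source cluster here, so
        // an unclean shutdown loses at most one interval of commit progress.
        commits++;
    }

    public void shutdown() throws InterruptedException {
        shuttingDown = true;
        interrupt();
        join();
    }
}
```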
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Oct. 6, 2014, 5:17 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Addressed Guozhang's comments Diffs (updated) - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Oct. 6, 2014, 5:20 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description (updated) --- Addressed Guozhang's comments. Talked with Joel and decided to remove multi connector support, as people can always create multiple mirror maker instances if they want to consume from multiple clusters. Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review54620 ---

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment94830 Do we need to do this check every time in the loop?

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94831 No need for an empty line here.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94832 No need for brackets.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94833 No need for brackets.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94835 Maximum bytes that can be buffered in the data channels

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94834 in terms of bytes

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94836 Inconsistent indentation.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94838 Capitalize: Offset commit interval in ms

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94841 Do you need to turn off auto commit on the consumer threads here?

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94840 We can add some more comments here, explaining: 1) why we add the offset commit thread for the new producer but not the old producer; 2) what risks the old producer has (for not having an offset commit thread).

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94842 For clean shutdown, you need to 1) halt consumer threads first, 2) wait for the producer to drain all the messages in the data channel, 3) manually commit offsets on consumer threads, 4) shut down consumer threads. Otherwise we will have data duplicates, as we commit offsets based on the min.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94844 queueId

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94846 How about having a histogram for each queue instead of getting the sum? The update call would be a bit less expensive, and we can monitor whether some queues are empty while others get all the data.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94847 Ditto above.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94849 Add comments explaining why we force an unclean shutdown with System.exit here.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94857 Unfortunately this may not be the case, as we can have multiple connectors which are using different consumer configs with different group ids. We need to either 1) change the config settings to enforce this to be true, or 2) use a separate offset client that remembers which topics belong to which groups.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94858 Capitalize first word

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94859 Capitalize first word

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94863 Add a comment explaining the logic of how this works. Also a few questions: 1) is the map() call synchronized with other threads putting new offsets into the map? 2) after the sorting, the logic may be clearer as:

var commitableOffsetIndex = 0
while (offsets(commitableOffsetIndex) - offsets.head == commitableOffsetIndex)
  commitableOffsetIndex += 1
offsetToCommit = offsets(commitableOffsetIndex) + 1

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94855 The send().get() call is missing.

core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/25995/#comment94853 Apache header missing.

- Guozhang Wang

On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Sept. 24, 2014, 4:26 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description
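The shutdown()/awaitShutdown() pattern discussed in this thread — each thread sets its own flag and signals when it has fully exited, instead of polling one global flag — might be sketched like this. The Worker class and the batches counter are hypothetical stand-ins, not code from the patch:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: a worker that owns its shutdown flag and a latch
// signalling full exit. shutdown() requests the exit; awaitShutdown()
// blocks until the run loop has actually finished, so callers can
// sequence shutdowns deterministically.
class Worker extends Thread {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CountDownLatch done = new CountDownLatch(1);
    private long batches = 0;

    @Override
    public void run() {
        try {
            while (running.get()) {
                batches++;      // stand-in for consuming/producing one batch
            }
        } finally {
            done.countDown();   // signal full exit, even on an exception
        }
    }

    void shutdown() { running.set(false); }

    void awaitShutdown() throws InterruptedException { done.await(); }

    long batches() { return batches; }
}
```

With this in place, the overall shutdown would run consumers.shutdown and consumers.awaitShutdown, then the producers, then the committer, and finally connector.shutdown, so offsets are committed only after the producer has drained.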
Re: Review Request 25995: Patch for KAFKA-1650
On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 299 https://reviews.apache.org/r/25995/diff/1/?file=704523#file704523line299 Do we need to do this check every time in the loop?

Maybe we can move this check out of the while loop, but that would probably introduce more duplicate code. Since the offset commit is not that frequent and the retry is hopefully rare, it should not create much overhead.

On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 168 https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line168 Do you need to turn off auto commit on the consumer threads here?

I thought auto offset commit should be turned off in the consumer config. Is that the case?

On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 207 https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line207 For clean shutdown, you need to 1) halt consumer threads first, 2) wait for the producer to drain all the messages in the data channel, 3) manually commit offsets on consumer threads, 4) shut down consumer threads. Otherwise we will have data duplicates, as we commit offsets based on the min.

Talked to Guozhang; changed the process to be as below: 1. shut down consumer threads. 2. shut down the producer. 3. commit offsets. 4. shut down consumer connectors.

On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 447 https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line447 Add a comment explaining the logic of how this works. Also a few questions: 1) is the map() call synchronized with other threads putting new offsets into the map? 2) after the sorting, the logic may be clearer as:

var commitableOffsetIndex = 0
while (offsets(commitableOffsetIndex) - offsets.head == commitableOffsetIndex)
  commitableOffsetIndex += 1
offsetToCommit = offsets(commitableOffsetIndex) + 1

We are using a concurrent map, which guarantees that a single put/get operation is atomic. It's possible that the offsets we get for different partitions reflect values from different points in time, but that should not matter much because later commits will pick up the updated values. And the offsets we commit when exiting are guaranteed to be taken after the producer is shut down. So the commits during normal running do not need to be 100% accurate.

On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 483 https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line483 The send().get() call is missing.

I put it inside the put().

- Jiangjie

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review54620 --- On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Sept. 24, 2014, 4:26 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description --- mirror maker redesign; adding byte bounded blocking queue. Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
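The sorted-offset pseudocode quoted in this exchange can be reconstructed as a runnable sketch. Two fixes relative to the quoted snippet: the index must be mutable, and at the first gap the committable offset is one past the last offset of the contiguous run, not the offset after the gap. The helper and its name are mine, not from the patch:

```java
import java.util.Arrays;

// Reconstruction of the reviewer's pseudocode: given the outstanding
// offsets, sort them and walk the gap-free run starting at the smallest
// offset; the offset to commit is one past the end of that run (i.e.,
// the first offset that is safe to mark as fully consumed).
class OffsetCommitHelper {
    static long committableOffset(long[] unackedOffsets) {
        long[] offsets = unackedOffsets.clone();
        Arrays.sort(offsets);
        int i = 0;
        // Advance while offsets[0..i+1] form a contiguous run.
        while (i + 1 < offsets.length && offsets[i + 1] - offsets[0] == i + 1) {
            i++;
        }
        return offsets[i] + 1;
    }
}
```

For example, for offsets {5, 6, 7, 10, 11} the contiguous run ends at 7, so the committable offset is 8; committing 12 instead would lose 8 and 9 on a restart, which is why the commit is based on the min-side run.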
Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description --- mirror maker redesign; adding byte bounded blocking queue. Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin