Re: Review Request 27391: Fix KAFKA-1634

2014-12-18 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review65462
---



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment108626

can you add a comment: // only v0, v1 of offsetcommitrequest



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment108637

Not introduced by your patch, but it is odd that these are named 
topicResponseObj and partitionResponse below - probably an artifact of 
copy/paste. Can you do a rename here before checking in?



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment108638

I think we discussed before that timestamp does not need to be a var. We 
can use the case class copy method to make a copy + edit.
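For example, a minimal sketch of the copy-based approach (the class and field
names here are assumed, not the actual definitions):

case class OffsetMetadataAndError(offset: Long, metadata: String = "", timestamp: Long = -1L)

val committed = OffsetMetadataAndError(offset = 42L)
// stamp a copy instead of mutating a var:
val stamped = committed.copy(timestamp = System.currentTimeMillis())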



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/27391/#comment108642

Thanks for fixing this



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108643

I think it would be better to move this to just before the call to
offsetCommitValue in the loop at line 228. This method should only be
responsible for taking the offsetAndMetadata and converting it into the
on-disk bytes, and should not concern itself with setting a critical field like
the expiration timestamp. I was actually looking for where this happens (i.e.,
setting the expiration time) and it took me a while to realize it was hidden
in here.
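A self-contained sketch of that separation (all names here are hypothetical):

// responsible only for converting the offset-and-metadata into on-disk bytes
def offsetCommitValue(offset: Long, expireTimestamp: Long): Array[Byte] =
  s"$offset,$expireTimestamp".getBytes("UTF-8")

// the caller (the loop near line 228) sets the expiration explicitly:
val retentionMs = 24L * 60 * 60 * 1000
val expireTimestamp = System.currentTimeMillis() + retentionMs
val valueBytes = offsetCommitValue(42L, expireTimestamp)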



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108644

I think we can make this and some other methods here private.



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108645

private



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108646

Also, let us use a case class instead of a tuple
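For example (illustrative only; the real field names may differ):

case class GroupTopicPartition(group: String, topic: String, partition: Int)

// a named case class documents itself where a tuple's ._1/._2/._3 does not:
val key = GroupTopicPartition("group-a", "topic-b", 0)
val partition = key.partition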



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment108655

Rather than sleep, we should improve OffsetManager to take in a 
MockScheduler instance - we can pass through the time instance from KafkaServer 
to offsetManager as we do for LogManager and replicaManager. That way we can 
advance time with MockTime. This test will need to change from OffsetCommitTest 
to OffsetManagerTest and we will just test the OffsetManager. Can you file a 
jira for that? Although that would make sense only after you check this in.
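The idea, as a self-contained sketch (this is not Kafka's actual MockTime API,
just the shape of it):

trait Time { def milliseconds: Long }

class MockTime(var nowMs: Long = 0L) extends Time {
  def milliseconds: Long = nowMs
  def sleep(ms: Long): Unit = nowMs += ms  // advances the clock instantly, no real sleep
}

val time = new MockTime()
val commitTime = time.milliseconds
time.sleep(24L * 60 * 60 * 1000)  // "wait" a day for the offset to expire
assert(time.milliseconds - commitTime >= 24L * 60 * 60 * 1000)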



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment108647

If the offset did in fact expire, the assertion itself won't fail - i.e.,
you will get a NoSuchElementException instead.

Same comments apply to the checks below.
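A sketch of the safer pattern (the shapes here are hypothetical): assert on the
Option rather than calling .head/.get on a possibly-empty result:

val fetchedOffsets = Map("t0-0" -> 42L)  // stand-in for the fetched offsets
val fetched = fetchedOffsets.get("t0-0")
assert(fetched.isDefined, "offset for t0-0 has expired or is missing")
assert(fetched.get == 42L)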


- Joel Koshy


On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Dec. 2, 2014, 2:03 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Add another API in the offset manager to return the struct; the cache layer
 will only read its expiration timestamp, while the offset formatter will read
 the struct as a whole.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
 121e880a941fcd3e6392859edba11a94236494cc 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 da29a8cb461099eb675161db2f11a9937424a5c6 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 3c79428962604800983415f6f705e04f52acb8fb 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Joel Koshy


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
  338
  https://reviews.apache.org/r/25995/diff/13/?file=793041#file793041line338
 
  Can we make this a reliable commit - i.e., with retries up to the 
  configured retry count? The policy is retry on commit errors during 
  rebalance or shutdown, no need to retry on commit errors during 
  auto-commits.
  
  So for e.g., if a mirror maker rebalances and there is simultaneously 
  offset manager movement we would need to retry the commit.
  
  This is the motivation for the isAutoCommit flag - however, there seems 
  to be a bug right now which maybe you can fix. i.e., if this is not an 
  auto-commit then set retries to configured retries else no retries.
 
 Jiangjie Qin wrote:
 Changed the code based on your suggestion. My original thinking was that
 in mirror maker one commit failure actually does not matter too much, because
 the next commit will succeed if the failure is due to offset topic leader
 migration, etc. But for a more general purpose, it probably should retry if
 it is not an auto commit.

I was thinking more about the shutdown and rebalance cases. We ideally want the 
commits to be reliable for those cases.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 192
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line192
 
  Why do you need a dummy param?
 
 Jiangjie Qin wrote:
 Because Utils.createObject needs an args parameter, and if we pass in a
 null it will throw an NPE...
 I've changed the code in Utils to allow us to pass in a null, which uses
 the no-arg constructor.

See comment in latest RB.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489
 
  Why not use KafkaScheduler for the offset commit task?
 
 Jiangjie Qin wrote:
 Hadn't thought of that before... But it looks like we need to do some more
 handling when something goes wrong in the offset commit threads. The
 KafkaScheduler code does not seem to do so.

You can make the task itself catch throwables, so it would look something
like this:

scheduler.schedule(mirrorMakerOffsetsCommiter, commitTask, ...)

And in commitTask:

try {
  commitOffsets()
}
catch {
  case t: Throwable =>
    // handle
}

That said, I don't think connector.commitOffsets will throw anything - since we 
catch all throwables there.

The only additional detail is that after you shutdown the scheduler you will 
need to call commitOffsets() manually one last time.
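A self-contained sketch of that ordering, with java.util.concurrent standing
in for KafkaScheduler and a stand-in commitOffsets():

import java.util.concurrent.{Executors, TimeUnit}

def commitOffsets(): Unit = println("committing offsets")

val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = commitOffsets() },
  0, 60, TimeUnit.SECONDS)

// on shutdown: stop the periodic task first, then commit once more manually
scheduler.shutdown()
scheduler.awaitTermination(30, TimeUnit.SECONDS)
commitOffsets()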


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line614
 
  There might be a small memory leak here: if there was an error though 
  (in the branch above) it seems no one removes the offset from the list.
 
 Jiangjie Qin wrote:
 Yes, that is a memory leak, but in this case we should not commit the 
 offset of the message that was not sent successfully either. If any exception 
 occurs then the offsets will not advance anymore. We probably should have an 
 alert on the mirror consumer lags.

Hmm.. not sure if I'm on the same page here. See comments on new RB.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 705
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line705
 
  You need to also set tail.next = null (or None)
 
 Jiangjie Qin wrote:
 tail.next = null is handled in the previous if...else: the old
 tail.prev.next becomes the new tail.next, which will be null.

Makes sense.


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 666
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line666
 
  Should we expose a size() method - i.e., increment on add and decrement 
  on remove. We can aggregate the size of all the offset lists outside and 
  emit a gauge. That will give us some assurance that there are no 
  forgotten offsets. Re: the potential leak mentioned above.
  
  In fact, I'm a bit nervous about correctness since this is a custom 
  implementation of a semi-non-trivial data structure. We should probably 
  even assert that it is empty when numMessageUnacked goes to zero as part of 
  the rebalance.
  
  Ideally, these custom implementations need a full-fledged unit test.
 
 Jiangjie Qin wrote:
 That's a good point; we probably need a metric to see if we have some
 stuck offsets. But those stuck offsets should also not be committed anyway.
 We need to be alerted on that situation once it happens. Maybe add an
 assertion on the exception block where stuck
Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment108686

This should be like the previous code - i.e., 1 + ...

E.g., if config.offsetsCommitMaxRetries is one, then we can have two 
attempts. In this version at most one attempt will be made.
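As a sketch (names assumed from the config):

// offsetsCommitMaxRetries = 1 should allow two attempts: the original try plus one retry
def commit(offsetsCommitMaxRetries: Int)(attempt: () => Boolean): Boolean = {
  val maxAttempts = 1 + offsetsCommitMaxRetries
  (1 to maxAttempts).exists(_ => attempt())  // stops at the first success
}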



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment108687

Actually we should fix this. If this is the final commit during shutdown
(and NOT an autocommit) then we need to retry on error, up to the retries
remaining.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108691

Can you use a named parameter here? i.e., `valueFactory = Some(...`
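Illustrative only (the actual Pool signature may differ) - the named argument
makes the Option-typed factory readable at the call site:

class Pool[K, V](valueFactory: Option[K => V] = None)

val pool = new Pool[String, Int](valueFactory = Some(_ => 0))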



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108690

Should this be fatal? i.e., fatal is normally used before exiting 
(abnormally). WARN would be more suitable.

I don't think it makes sense to not advance the offset here, especially if
you will still keep sending messages. I think you need to remove it from the
unacked offset list. E.g., you may configure your mirror maker producer with
only a few retries (in which case you are okay with data loss). In this
scenario you should just let the error go and allow the mirror maker to
proceed normally.

If someone wants zero data loss the MM should be configured with required 
acks -1 and infinite retries.

Maybe I'm misunderstanding what zero data loss really means - can you 
clarify? (Especially if someone configures the producer with acks (say) one and 
limited retries)
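For reference, a hypothetical old-producer configuration for the no-data-loss
stance above (the property names are the 0.8 producer's; the values are
illustrative):

import java.util.Properties

val props = new Properties()
props.put("request.required.acks", "-1")  // wait for all in-sync replicas
props.put("message.send.max.retries", Int.MaxValue.toString)  // effectively infinite retries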



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108689

Should we make this a generic DoublyLinkedList data structure in utils or 
some other suitable place and unit test it as well?
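A minimal sketch of such a structure (not MirrorMaker's actual class),
including the size counter suggested earlier:

class DoublyLinkedList[T] {
  class Node(val item: T) { var prev: Node = null; var next: Node = null }

  private var head: Node = null
  private var tail: Node = null
  private var count = 0

  def size: Int = count

  def add(item: T): Node = {
    val n = new Node(item)
    if (tail == null) head = n else { tail.next = n; n.prev = tail }
    tail = n
    count += 1
    n
  }

  def remove(n: Node): Unit = {
    if (n.prev == null) head = n.next else n.prev.next = n.next
    if (n.next == null) tail = n.prev else n.next.prev = n.prev
    n.prev = null
    n.next = null  // fully unlink so a removed node cannot leak its neighbors
    count -= 1
  }
}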



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment108688

Does not seem to be used. (but it probably should be used)



core/src/main/scala/kafka/utils/Utils.scala
https://reviews.apache.org/r/25995/#comment108692

I don't think this is necessary, right? i.e., args.map won't throw an NPE if
you don't provide any additional arguments:

scala> def f(args: Int*) { println(args.size) }
f: (args: Int*)Unit

scala> f(1, 2)
2

scala> f()
0


- Joel Koshy


On Dec. 17, 2014, 8:29 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 17, 2014, 8:29 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 numMessageUnacked should be decremented no matter the send was successful or 
 not.
 
 
 Addressed Jun's comments.
 
 
 Incorporated Jun's comments
 
 
 Incorporated Jun's comments and rebased on trunk
 
 
 Rebased on current trunk
 
 
 Addressed Joel's comments.
 
 
 Addressed Joel's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
 

[jira] [Commented] (KAFKA-1823) transient unit test failure in PartitionAssignorTest

2014-12-18 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14251549#comment-14251549
 ] 

Joel Koshy commented on KAFKA-1823:
---

This was caused by KAFKA-1684 - it can be reproduced by setting MaxTopicCount 
to 0. Will upload a patch in a minute.
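The failure mode and the guard, sketched (the shapes are assumed from the
stack trace below):

val partitionCounts: Seq[Int] = Seq.empty  // zero topics, as with MaxTopicCount = 0
// partitionCounts.max alone throws UnsupportedOperationException: empty.max
val maxOwned = if (partitionCounts.isEmpty) 0 else partitionCounts.max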

 transient unit test failure in PartitionAssignorTest
 

 Key: KAFKA-1823
 URL: https://issues.apache.org/jira/browse/KAFKA-1823
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao

 Saw the following transient unit test failure.
 unit.kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor FAILED
 java.lang.UnsupportedOperationException: empty.max
 at 
 scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
 at scala.collection.AbstractIterator.max(Iterator.scala:1157)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 29203: Patch for KAFKA-1823

2014-12-18 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29203/
---

Review request for kafka.


Bugs: KAFKA-1823
https://issues.apache.org/jira/browse/KAFKA-1823


Repository: kafka


Description
---

KAFKA-1823; Fix transient PartitionAssignorTest failure (triggered when there 
are no topics)


Diffs
-

  core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala 
24954de66ccc5158696166b7e2aabad0f1b1f287 

Diff: https://reviews.apache.org/r/29203/diff/


Testing
---


Thanks,

Joel Koshy



[jira] [Updated] (KAFKA-1823) transient unit test failure in PartitionAssignorTest

2014-12-18 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1823:
--
Attachment: KAFKA-1823.patch

 transient unit test failure in PartitionAssignorTest
 

 Key: KAFKA-1823
 URL: https://issues.apache.org/jira/browse/KAFKA-1823
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
 Attachments: KAFKA-1823.patch


 Saw the following transient unit test failure.
 unit.kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor FAILED
 java.lang.UnsupportedOperationException: empty.max
 at 
 scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
 at scala.collection.AbstractIterator.max(Iterator.scala:1157)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1823) transient unit test failure in PartitionAssignorTest

2014-12-18 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1823:
--
Assignee: Joel Koshy
  Status: Patch Available  (was: Open)

 transient unit test failure in PartitionAssignorTest
 

 Key: KAFKA-1823
 URL: https://issues.apache.org/jira/browse/KAFKA-1823
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
Assignee: Joel Koshy
 Attachments: KAFKA-1823.patch


 Saw the following transient unit test failure.
 unit.kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor FAILED
 java.lang.UnsupportedOperationException: empty.max
 at 
 scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
 at scala.collection.AbstractIterator.max(Iterator.scala:1157)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1823) transient unit test failure in PartitionAssignorTest

2014-12-18 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14251551#comment-14251551
 ] 

Joel Koshy commented on KAFKA-1823:
---

Created reviewboard https://reviews.apache.org/r/29203/
 against branch origin/trunk

 transient unit test failure in PartitionAssignorTest
 

 Key: KAFKA-1823
 URL: https://issues.apache.org/jira/browse/KAFKA-1823
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
 Attachments: KAFKA-1823.patch


 Saw the following transient unit test failure.
 unit.kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor FAILED
 java.lang.UnsupportedOperationException: empty.max
 at 
 scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
 at scala.collection.AbstractIterator.max(Iterator.scala:1157)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-1824:
---

 Summary: in ConsoleProducer - properties key.separator and 
parse.key no longer work
 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira


Looks like the change in kafka-1711 breaks them accidentally.

reader.init is called with readerProps which is initialized with commandline 
properties as defaults.

the problem is that reader.init checks:
if(props.containsKey("parse.key"))
and defaults don't return true in this case.
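A self-contained demonstration of the pitfall:

val defaults = new java.util.Properties()
defaults.setProperty("parse.key", "true")

val props = new java.util.Properties(defaults)  // commandline properties as defaults
println(props.containsKey("parse.key"))  // false - Hashtable.containsKey ignores defaults
println(props.getProperty("parse.key"))  // "true" - getProperty does consult defaults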



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer

2014-12-18 Thread lokesh Birla (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14251940#comment-14251940
 ] 

lokesh Birla commented on KAFKA-1806:
-

Hi Neha,

What is the status of fixing this issue? It happens on every run. I have
seen that if I use num.replica.fetchers=1, the issue sometimes goes away;
however, I then see another problem of frequent leadership changes even when
all brokers are running.

If I set num.replica.fetchers=4, I can reproduce the issue on every run.

Please let me or Evan (from sarama) know if you need any help fixing this.

 broker can still expose uncommitted data to a consumer
 --

 Key: KAFKA-1806
 URL: https://issues.apache.org/jira/browse/KAFKA-1806
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: lokesh Birla
Assignee: Neha Narkhede

 Although the following issue: https://issues.apache.org/jira/browse/KAFKA-727
 is marked fixed, I still see it in 0.8.1.1. I am able to reproduce the
 issue consistently.
 [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch 
 request for partition [mmetopic4,2] offset 1940029 from consumer with 
 correlation id 21 (kafka.server.Kaf
 kaApis)
 java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
 (1818353) less than the start offset (1940029).
 at kafka.log.LogSegment.read(LogSegment.scala:136)
 at kafka.log.Log.read(Log.scala:386)
 at 
 kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
 at 
 kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
 at 
 kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
 at scala.collection.immutable.Map$Map1.map(Map.scala:107)
 at 
 kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
 at 
 kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
 at 
 kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
 at 
 kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
 at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 29210: Patch for KAFKA-1819

2014-12-18 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29210/
---

Review request for kafka.


Bugs: KAFKA-1819
https://issues.apache.org/jira/browse/KAFKA-1819


Repository: kafka


Description
---

added locking


Diffs
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
f8fcb843c80eec3cf3c931df6bb472c019305253 
  core/src/main/scala/kafka/log/LogCleanerManager.scala 
bcfef77ed53f94017c06a884e4db14531774a0a2 
  core/src/main/scala/kafka/log/LogManager.scala 
4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
29cc01bcef9cacd8dec1f5d662644fc6fe4994bc 

Diff: https://reviews.apache.org/r/29210/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1819:

Attachment: KAFKA-1819.patch

 Cleaner gets confused about deleted and re-created topics
 -

 Key: KAFKA-1819
 URL: https://issues.apache.org/jira/browse/KAFKA-1819
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1819.patch


 I get an error like this after deleting a compacted topic and re-creating it. 
 I think it's because the brokers don't remove cleaning checkpoints from the 
 cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 587607 but segment base offset is 0 for log foo-6.
 at scala.Predef$.require(Predef.scala:233)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
 at kafka.log.Cleaner.clean(LogCleaner.scala:300)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252039#comment-14252039
 ] 

Gwen Shapira commented on KAFKA-1819:
-

Created reviewboard https://reviews.apache.org/r/29210/diff/
 against branch trunk

 Cleaner gets confused about deleted and re-created topics
 -

 Key: KAFKA-1819
 URL: https://issues.apache.org/jira/browse/KAFKA-1819
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1819.patch


 I get an error like this after deleting a compacted topic and re-creating it. 
 I think it's because the brokers don't remove cleaning checkpoints from the 
 cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 587607 but segment base offset is 0 for log foo-6.
 at scala.Predef$.require(Predef.scala:233)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
 at kafka.log.Cleaner.clean(LogCleaner.scala:300)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1819:

Assignee: Gwen Shapira
  Status: Patch Available  (was: Open)

 Cleaner gets confused about deleted and re-created topics
 -

 Key: KAFKA-1819
 URL: https://issues.apache.org/jira/browse/KAFKA-1819
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Assignee: Gwen Shapira
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1819.patch


 I get an error like this after deleting a compacted topic and re-creating it. 
 I think it's because the brokers don't remove cleaning checkpoints from the 
 cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 587607 but segment base offset is 0 for log foo-6.
 at scala.Predef$.require(Predef.scala:233)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
 at kafka.log.Cleaner.clean(LogCleaner.scala:300)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252061#comment-14252061
 ] 

Gwen Shapira commented on KAFKA-1824:
-

Created reviewboard https://reviews.apache.org/r/29211/diff/
 against branch trunk

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
 Attachments: KAFKA-1824.patch


 Looks like the change in kafka-1711 breaks them accidentally.
 reader.init is called with readerProps which is initialized with commandline 
 properties as defaults.
 the problem is that reader.init checks:
 if(props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1824:

Attachment: KAFKA-1824.patch

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
 Attachments: KAFKA-1824.patch


 Looks like the change in kafka-1711 breaks them accidentally.
 reader.init is called with readerProps which is initialized with commandline 
 properties as defaults.
 the problem is that reader.init checks:
 if(props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1824:

Assignee: Gwen Shapira
  Status: Patch Available  (was: Open)

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-1824.patch


 Looks like the change in kafka-1711 breaks them accidentally.
 reader.init is called with readerProps which is initialized with commandline 
 properties as defaults.
 the problem is that reader.init checks:
 if(props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: API Annotations

2014-12-18 Thread Gwen Shapira
Thanks for the comments, Joe and Jay.

Doing the public / private designation at package level is definitely
easier than going method-by-method :)

If I get your comments right, the idea is to publish java docs for the
public apis on our website (or archive.apache.org) and not publish
java docs for the private apis.

The pro of that approach is that it doesn't require going through
every single public method in our code-base and marking it private or
public. I'm definitely not looking forward to that patch.
The con is that open source developers don't necessarily assume that
lack of java docs implies that a method is not public... they usually
assume that the project was sloppy about its docs.

I think the pros win here :)

Regarding the location of the javadocs - the website, via SVN, is
traditional, and probably more google-able. Updating the SVN shouldn't
be too painful to do as part of the release process? Right now
googling for kafka javadoc does not lead to the latest 0.8.2 docs.
I'm getting github, stackoverflow and Neha's private apache page.

Then there's the other attribute - which API is stable.
Currently Kafka pretty much assumes an API is stable from the moment
it's committed to trunk, I believe. It may make the development
process slightly easier if we can mark new APIs as evolving until we
are certain we are happy with them. It will allow us to iterate faster
and let users try out newer APIs.

Do you think adding this will be helpful?



On Tue, Dec 16, 2014 at 11:24 AM, Jay Kreps jay.kr...@gmail.com wrote:
 Hey Gwen,

 We discussed this a bit when starting on the new clients.

 We were super sloppy about this in initial Kafka development--single jar,
 no real differentiation between public and private apis.

 The plan was something like the following:
 1. Start to consider this with the new clients.
 2. Do the public/private designation at the package level. The public
 packages are o.a.k.common, o.a.k.errors, o.a.k.producer, o.a.k.consumer,
 o.a.k.tools. This makes javadoc and things like that easier, and it makes
 it easy to see at a glance all the public classes. It would be even better
 to enforce this in the build if that is possible (i.e. no class from a
 non-public package is leaked) but we haven't done this. This approach
 obviously wasn't possible in Hadoop since they started without a clear
 delineation as we did in the original scala code.

 Thoughts?

 -Jay


 On Tue, Dec 16, 2014 at 10:04 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

 Hi,

 Kafka has public APIs in Java and Scala, intended for use by external
 developers.
 In addition, Kafka also exposes many public methods that are intended
 to use within Kafka but are not intended to be called by external
 developers.
 Also, some of the external APIs are less stable than others (the new
 producer for example).

 In Hadoop we have a similar situation, and to avoid misunderstandings
 or miscommunications on which APIs are external and which are stable,
 we use annotations to communicate this information.
 We find it very useful in preventing our customers from accidentally
 getting into trouble by using internal methods or unstable APIs.

 Here are the annotations Hadoop uses:

 https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html

 https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html

 I'm wondering what others think about using something similar in Kafka.

 Gwen



Re: Review Request 29211: Patch for KAFKA-1824

2014-12-18 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29211/#review65574
---

Ship it!


Ship It!

- Neha Narkhede


On Dec. 18, 2014, 7:08 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29211/
 ---
 
 (Updated Dec. 18, 2014, 7:08 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1824
 https://issues.apache.org/jira/browse/KAFKA-1824
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1824 - fix ConsoleProducer so parse.key and key.separator will work 
 again
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
 1061cc74fac69693836f1e75add06b09d459a764 
 
 Diff: https://reviews.apache.org/r/29211/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1824:
-
Fix Version/s: 0.8.3

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1824.patch


 Looks like the change in kafka-1711 breaks them accidentally.
 reader.init is called with readerProps which is initialized with commandline 
 properties as defaults.
 the problem is that reader.init checks:
 if(props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1824:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the patch. Pushed to trunk.

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1824.patch


 Looks like the change in kafka-1711 breaks them accidentally.
 reader.init is called with readerProps which is initialized with commandline 
 properties as defaults.
 the problem is that reader.init checks:
 if(props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252706#comment-14252706
 ] 

Gwen Shapira commented on KAFKA-1824:
-

Thanks for the quick review [~nehanarkhede]!

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1824.patch


 Looks like the change in kafka-1711 breaks them accidentally.
 reader.init is called with readerProps which is initialized with commandline 
 properties as defaults.
 the problem is that reader.init checks:
 if(props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29210: Patch for KAFKA-1819

2014-12-18 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29210/#review65578
---


Overall, looks good. Have one suggestion below.


core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
https://reviews.apache.org/r/29210/#comment108824

Since the bug is about entries related to deleted topics, it will be good 
to add that verification step to all tests in DeleteTopicTest.


- Neha Narkhede


On Dec. 18, 2014, 6:59 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29210/
 ---
 
 (Updated Dec. 18, 2014, 6:59 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1819
 https://issues.apache.org/jira/browse/KAFKA-1819
 
 
 Repository: kafka
 
 
 Description
 ---
 
 added locking
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/LogCleaner.scala 
 f8fcb843c80eec3cf3c931df6bb472c019305253 
   core/src/main/scala/kafka/log/LogCleanerManager.scala 
 bcfef77ed53f94017c06a884e4db14531774a0a2 
   core/src/main/scala/kafka/log/LogManager.scala 
 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc 
 
 Diff: https://reviews.apache.org/r/29210/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




Build failed in Jenkins: Kafka-trunk #357

2014-12-18 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/357/changes

Changes:

[neha.narkhede] KAFKA-1824 - fix ConsoleProducer so parse.key and key.separator 
will work again; reviewed by Neha Narkhede

--
[...truncated 1707 lines...]
kafka.admin.AdminTest > testShutdownBroker PASSED

kafka.admin.AdminTest > testTopicConfigChange PASSED

kafka.admin.AddPartitionsTest > testTopicDoesNotExist PASSED

kafka.admin.AddPartitionsTest > testWrongReplicaCount PASSED

kafka.admin.AddPartitionsTest > testIncrementPartitions PASSED

kafka.admin.AddPartitionsTest > testManualAssignmentOfReplicas PASSED

kafka.admin.AddPartitionsTest > testReplicaPlacement PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicWithAllAliveReplicas PASSED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicWithRecoveredFollower PASSED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicOnControllerFailover PASSED

kafka.admin.DeleteTopicTest > testPartitionReassignmentDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicDuringAddPartition PASSED

kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testRecreateTopicAfterDeletion PASSED

kafka.admin.DeleteTopicTest > testDeleteNonExistingTopic PASSED

kafka.api.RequestResponseSerializationTest > testSerializationAndDeserialization PASSED

kafka.api.ApiUtilsTest > testShortStringNonASCII PASSED

kafka.api.ApiUtilsTest > testShortStringASCII PASSED

kafka.api.test.ProducerSendTest > testSendOffset PASSED

kafka.api.test.ProducerSendTest > testClose PASSED

kafka.api.test.ProducerSendTest > testSendToPartition PASSED

kafka.api.test.ProducerSendTest > testAutoCreateTopic PASSED

kafka.api.test.ProducerFailureHandlingTest > testNotEnoughReplicas PASSED

kafka.api.test.ProducerFailureHandlingTest > testInvalidPartition PASSED

kafka.api.test.ProducerFailureHandlingTest > testTooLargeRecordWithAckZero PASSED

kafka.api.test.ProducerFailureHandlingTest > testTooLargeRecordWithAckOne PASSED

kafka.api.test.ProducerFailureHandlingTest > testNonExistentTopic PASSED

kafka.api.test.ProducerFailureHandlingTest > testWrongBrokerList PASSED

kafka.api.test.ProducerFailureHandlingTest > testNoResponse PASSED

kafka.api.test.ProducerFailureHandlingTest > testSendAfterClosed PASSED

kafka.api.test.ProducerFailureHandlingTest > testBrokerFailure PASSED

kafka.api.test.ProducerFailureHandlingTest > testCannotSendToInternalTopic PASSED

kafka.api.test.ProducerFailureHandlingTest > testNotEnoughReplicasAfterBrokerShutdown PASSED

kafka.api.test.ProducerCompressionTest > testCompression[0] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[1] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[2] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[3] PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled FAILED
kafka.common.KafkaException: Socket server failed to bind to 
localhost:53018: Address already in use.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:260)
at kafka.network.Acceptor.<init>(SocketServer.scala:205)
at kafka.network.SocketServer.startup(SocketServer.scala:86)
at kafka.server.KafkaServer.startup(KafkaServer.scala:98)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:134)
at 
kafka.integration.UncleanLeaderElectionTest$$anonfun$startBrokers$1.apply(UncleanLeaderElectionTest.scala:95)
at 
kafka.integration.UncleanLeaderElectionTest$$anonfun$startBrokers$1.apply(UncleanLeaderElectionTest.scala:93)
at scala.collection.immutable.List.foreach(List.scala:383)
at 
kafka.integration.UncleanLeaderElectionTest.startBrokers(UncleanLeaderElectionTest.scala:93)
at 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabled(UncleanLeaderElectionTest.scala:115)

Caused by:
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)

Re: API Annotations

2014-12-18 Thread Jay Kreps
Yes, exactly--the goal is to just publish the javadoc on the public classes
and all methods on a public class are considered public. Right now we
really haven't done proper docs for the new producer--no javadoc and no
examples either. This needs to get done before 0.8.2.

The idea of an @Experimental annotation sounds like a good way to introduce
features we aren't sure about. I wouldn't want us to abuse it, since in
practice I suspect people expect binary compatibility and breaks are often
transitive (i.e. some common library in your stack uses an experimental
feature, then you can't upgrade Kafka, and the person impacted won't know
that the api had an annotation).
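For concreteness, a minimal sketch of such a marker (hypothetical, not an
actual Kafka annotation):

import scala.annotation.StaticAnnotation

/** Marks an API that may change incompatibly in a future release. */
class Evolving(since: String = "") extends StaticAnnotation

@Evolving(since = "0.8.2")
class SomeNewClientApi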

-Jay

On Thu, Dec 18, 2014 at 4:43 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Thanks for the comments, Joe and Jay.

 Doing the public / private designation at package level is definitely
 easier than going method-by-method :)

 If I get your comments right, the idea is to publish java docs for the
 public apis on our website (or archive.apache.org) and not publish
 java docs for the private apis.

 The pro of that approach is that it doesn't require going through
 every single public method in our code-base and marking it private or
 public. I'm definitely not looking forward to that patch.
 The con is that open source developers don't necessarily assume that
 lack of java docs implies that a method is not public... they usually
 assume that the project was sloppy about its docs.

 I think the pros win here :)

 Regarding the location of the javadocs - the website, via SVN, is
 traditional, and probably more google-able. Updating the SVN shouldn't
 be too painful to do as part of the release process?  Right now
 googling for kafka javadoc does not lead to the latest 0.8.2 docs.
 I'm getting github, stackoverflow and Neha's private apache page.

 Then there's the other attribute - which API is stable.
 Currently Kafka pretty much assumes an API is stable from the moment
 its committed to the trunk, I believe. It may make the development
 process slightly easier if we can mark new APIs as evolving until we
 are certain we are happy with them. It will allow us to iterate faster
 and let users try out newer APIs.

 Do you think adding this will be helpful?



 On Tue, Dec 16, 2014 at 11:24 AM, Jay Kreps jay.kr...@gmail.com wrote:
  Hey Gwen,
 
  We discussed this a bit when starting on the new clients.
 
  We were super sloppy about this in initial Kafka development--single jar,
  no real differentiation between public and private apis.
 
  The plan was something like the following:
  1. Start to consider this with the new clients.
  2. Do the public/private designation at the package level. The public
  packages are o.a.k.common, o.a.k.errors, o.a.k.producer, o.a.k.consumer,
  o.a.k.tools. This makes javadoc and things like that easier, and it makes
  it easy to see at a glance all the public classes. It would be even
 better
  to enforce this in the build if that is possible (i.e. no class from a
  non-public package is leaked) but we haven't done this. This approach
  obviously wasn't possible in Hadoop since they started without a clear
  delineation as we did in the original scala code.
 
  Thoughts?
 
  -Jay
 
 
  On Tue, Dec 16, 2014 at 10:04 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
  Hi,
 
  Kafka has public APIs in Java and Scala, intended for use by external
  developers.
  In addition, Kafka also exposes many public methods that are intended
  to use within Kafka but are not intended to be called by external
  developers.
  Also, some of the external APIs are less stable than others (the new
  producer for example).
 
  In Hadoop we have a similar situation, and to avoid misunderstandings
  or miscommunications on which APIs are external and which are stable,
  we use annotations to communicate this information.
  We find it very useful in preventing our customers from accidentally
  getting into trouble by using internal methods or unstable APIs.
 
  Here are the annotations Hadoop uses:
 
 
 https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html
 
 
 https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html
 
  I'm wondering what others think about using something similar in Kafka.
 
  Gwen
 



Review Request 29231: Patch for KAFKA-1824

2014-12-18 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29231/
---

Review request for kafka.


Bugs: KAFKA-1824
https://issues.apache.org/jira/browse/KAFKA-1824


Repository: kafka


Description
---

fixing accidental return of WARN "Property topic is not valid"


Diffs
-

  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
1061cc74fac69693836f1e75add06b09d459a764 

Diff: https://reviews.apache.org/r/29231/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1824:

Attachment: KAFKA-1824.patch

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1824.patch, KAFKA-1824.patch


 Looks like the change in kafka-1711 breaks them accidentally.
 reader.init is called with readerProps which is initialized with commandline 
 properties as defaults.
 the problem is that reader.init checks:
 if(props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-766) Isr shrink/expand check is fragile

2014-12-18 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-766:

Fix Version/s: 0.8.3

 Isr shrink/expand check is fragile
 --

 Key: KAFKA-766
 URL: https://issues.apache.org/jira/browse/KAFKA-766
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: Sriram Subramanian
Assignee: Neha Narkhede
 Fix For: 0.8.3


 Currently the isr check is coupled tightly with the producer batch size. For
 example, if the producer batch size is 10000 messages and the isr check is
 4000 messages, we continuously oscillate between shrinking isr and expanding
 isr every second. This is because a single produce request throws the replica
 out of the isr. This results in hundreds of calls to ZK (we still don't have
 multi-write). This can be alleviated by making the producer batch size
 smaller than the isr check size.
 Going forward, we should try not to have this coupling. It is worth
 investigating if we can make the check more robust under such scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252774#comment-14252774
 ] 

Gwen Shapira commented on KAFKA-1824:
-

Created reviewboard https://reviews.apache.org/r/29231/diff/
 against branch trunk

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1824.patch, KAFKA-1824.patch


 Looks like the change in kafka-1711 breaks them accidentally.
 reader.init is called with readerProps which is initialized with commandline 
 properties as defaults.
 the problem is that reader.init checks:
 if(props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252777#comment-14252777
 ] 

Gwen Shapira commented on KAFKA-1824:
-

My apologies!

An additional round of tests revealed that my first patch accidentally breaks
what was fixed in KAFKA-1711 - i.e., the WARN "Property topic is not valid"
message returned. Those VerifiableProperties are tricky!

I added a new patch, on top of the one already committed, that removes the
extra properties before creating the producer and eliminates the WARN
messages.
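A sketch of that cleanup (the reader-only keys are the ones named in this
thread; the broker list is illustrative):

val producerProps = new java.util.Properties()
producerProps.setProperty("parse.key", "true")
producerProps.setProperty("key.separator", ":")
producerProps.setProperty("metadata.broker.list", "localhost:9092")

// strip reader-only settings so the producer config no longer warns about them
Seq("parse.key", "key.separator").foreach(k => producerProps.remove(k))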

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1824.patch, KAFKA-1824.patch


 Looks like the change in kafka-1711 breaks them accidentally.
 reader.init is called with readerProps which is initialized with commandline 
 properties as defaults.
 the problem is that reader.init checks:
 if(props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252805#comment-14252805
 ] 

Neha Narkhede commented on KAFKA-1824:
--

[~gwenshap] Good catch. I wonder if we should just add tests given the tricky 
logic involved.

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1824.patch, KAFKA-1824.patch


 Looks like the change in KAFKA-1711 breaks them accidentally.
 reader.init is called with readerProps, which is initialized with command-line 
 properties as defaults.
 The problem is that reader.init checks:
 if (props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer

2014-12-18 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252807#comment-14252807
 ] 

Neha Narkhede commented on KAFKA-1806:
--

[~lokeshbirla] I was looking for steps to reproduce this. So if I download 
0.8.2-beta and go through your steps, I should be able to see the same error 
you see.

 broker can still expose uncommitted data to a consumer
 --

 Key: KAFKA-1806
 URL: https://issues.apache.org/jira/browse/KAFKA-1806
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: lokesh Birla
Assignee: Neha Narkhede

 Although following issue: https://issues.apache.org/jira/browse/KAFKA-727
 is marked as fixed, but I still see this issue in 0.8.1.1. I am able to 
 reproduce the issue consistently. 
 [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch 
 request for partition [mmetopic4,2] offset 1940029 from consumer with 
 correlation id 21 (kafka.server.KafkaApis)
 java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
 (1818353) less than the start offset (1940029).
 at kafka.log.LogSegment.read(LogSegment.scala:136)
 at kafka.log.Log.read(Log.scala:386)
 at 
 kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
 at 
 kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
 at 
 kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
 at scala.collection.immutable.Map$Map1.map(Map.scala:107)
 at 
 kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
 at 
 kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
 at 
 kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
 at 
 kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
 at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29210: Patch for KAFKA-1819

2014-12-18 Thread Gwen Shapira


 On Dec. 19, 2014, 1:23 a.m., Neha Narkhede wrote:
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, line 231
  https://reviews.apache.org/r/29210/diff/1/?file=796108#file796108line231
 
  Since the bug is about entries related to deleted topics, it will be 
  good to add that verification step to all tests in DeleteTopicTest.

I added the delete.topic.enable config to createTestTopicAndCluster(), which is used 
by all tests in DeleteTopicTest. This exercises the part of the code path where 
we abort and checkpoint the cleaner.

However, this does not provide any verification that the cleaner checkpoint 
file was correctly updated. I wanted to add that, but it looks like getting the 
content of the checkpoint file from the information available to the delete 
topic tests would require quite intrusive modifications to unrelated parts of 
the code (the cleaner being private to the log, for example). So I left that 
part out and did some manual testing instead.
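
Roughly, the first change amounts to something like the following (a hedged
sketch with a hypothetical helper; delete.topic.enable is the standard broker
config key, but this is not the test's actual code):

    import java.util.Properties

    object DeleteTopicTestProps {
      def withTopicDeletion(base: Properties): Properties = {
        val props = new Properties()
        props.putAll(base)
        props.setProperty("delete.topic.enable", "true")  // allow topic deletion on the broker
        props
      }
    }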


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29210/#review65578
---


On Dec. 18, 2014, 6:59 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29210/
 ---
 
 (Updated Dec. 18, 2014, 6:59 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1819
 https://issues.apache.org/jira/browse/KAFKA-1819
 
 
 Repository: kafka
 
 
 Description
 ---
 
 added locking
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/LogCleaner.scala 
 f8fcb843c80eec3cf3c931df6bb472c019305253 
   core/src/main/scala/kafka/log/LogCleanerManager.scala 
 bcfef77ed53f94017c06a884e4db14531774a0a2 
   core/src/main/scala/kafka/log/LogManager.scala 
 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc 
 
 Diff: https://reviews.apache.org/r/29210/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




[jira] [Commented] (KAFKA-1824) in ConsoleProducer - properties key.separator and parse.key no longer work

2014-12-18 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252809#comment-14252809
 ] 

Gwen Shapira commented on KAFKA-1824:
-

Yes, let's do that. It will help us avoid another round of the break-and-fix cycle.

Hold off on this patch and I'll provide tests in a day or two.

 in ConsoleProducer - properties key.separator and parse.key no longer work
 --

 Key: KAFKA-1824
 URL: https://issues.apache.org/jira/browse/KAFKA-1824
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1824.patch, KAFKA-1824.patch


 Looks like the change in KAFKA-1711 breaks them accidentally.
 reader.init is called with readerProps, which is initialized with command-line 
 properties as defaults.
 The problem is that reader.init checks:
 if (props.containsKey("parse.key"))
 and defaults don't return true in this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29231: Patch for KAFKA-1824

2014-12-18 Thread Eric Olander

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29231/#review65582
---



core/src/main/scala/kafka/tools/ConsoleProducer.scala
https://reviews.apache.org/r/29231/#comment108830

remove() returns the value assigned to the key being removed, so you could 
simply do:

topic = props.remove("topic")

instead of the getProperty() and remove()
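
A self-contained version of the suggestion (illustrative setup; the cast is
needed on the Scala side because java.util.Properties.remove returns AnyRef):

    import java.util.Properties

    object RemoveReturnsValue extends App {
      val props = new Properties()
      props.setProperty("topic", "t1")

      // the lookup and the removal collapse into one call
      val topic = props.remove("topic").asInstanceOf[String]
      println(topic)  // t1
    }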


- Eric Olander


On Dec. 19, 2014, 1:56 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29231/
 ---
 
 (Updated Dec. 19, 2014, 1:56 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1824
 https://issues.apache.org/jira/browse/KAFKA-1824
 
 
 Repository: kafka
 
 
 Description
 ---
 
 fixing accidental return of "WARN Property topic is not valid"
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
 1061cc74fac69693836f1e75add06b09d459a764 
 
 Diff: https://reviews.apache.org/r/29231/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 2:48 a.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented whether or not the send was successful.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-12-18_18:48:18.patch

 Mirror Maker could lose data on unclean shutdown.
 -

 Key: KAFKA-1650
 URL: https://issues.apache.org/jira/browse/KAFKA-1650
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
 KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
 KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
 KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
 KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
 KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
 KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch


 Currently, if the mirror maker is shut down uncleanly, the data in the data 
 channel and buffer could potentially be lost. With the new producer's 
 callback, this issue can be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252817#comment-14252817
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

 Mirror Maker could lose data on unclean shutdown.
 -

 Key: KAFKA-1650
 URL: https://issues.apache.org/jira/browse/KAFKA-1650
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
 KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
 KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
 KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
 KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
 KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
 KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch


 Currently, if the mirror maker is shut down uncleanly, the data in the data 
 channel and buffer could potentially be lost. With the new producer's 
 callback, this issue can be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin


 On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
  https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line614
 
  Should this be fatal? i.e., fatal is normally used before exiting 
  (abnormally). WARN would be more suitable.
  
  I don't think it makes sense to not advance the offset here, 
  especially if you will still keep sending messages. I think you need to 
  remove it from the unacked offset list. E.g., you may configure your mirror 
  maker producer with only a few retries (in which case you are okay with data 
  loss). In this scenario you should just let the error go and allow the 
  mirror maker to proceed normally.
  
  If someone wants zero data loss, the MM should be configured with 
  required acks -1 and infinite retries.
  
  Maybe I'm misunderstanding what zero data loss really means - can you 
  clarify? (Especially if someone configures the producer with acks (say) one 
  and limited retries)

That makes sense. So I've changed the code to work in the following way (a 
sketch follows the list):

1. If retries is set to infinite, the producer will keep retrying and the 
entire pipeline will eventually be blocked. (This is strictly data-loss free.)
2. If retries is not set to infinite, then after the retries are exhausted it 
will remove the offset from the unacked list and record it as a 
skippedUnackedMessage, which is an exposed metric.
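
A hedged sketch of case 2 (every name below is a stand-in, not the patch's
actual field): the producer callback removes the offset from the unacked list
either way, and bumps a counter when the send ultimately failed.

    import java.util.concurrent.ConcurrentSkipListSet
    import java.util.concurrent.atomic.AtomicLong
    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

    object UnackedTracking {
      // stand-ins for the mirror maker's unacked offset list and metric
      val unackedOffsets = new ConcurrentSkipListSet[java.lang.Long]()
      val skippedUnackedMessages = new AtomicLong(0)

      def callbackFor(offset: Long): Callback = new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception) {
          unackedOffsets.remove(offset)  // advance the committable offset either way
          if (exception != null)         // retries exhausted: count the skipped message
            skippedUnackedMessages.incrementAndGet()
        }
      }
    }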


 On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 668
  https://reviews.apache.org/r/25995/diff/14/?file=795052#file795052line668
 
  Should we make this a generic DoublyLinkedList data structure in utils 
  or some other suitable place and unit test it as well?

I'm not sure if this is generic enough to put into utils. This raw linked 
list only serves the purpose of removing/inserting a node in the middle in 
O(1), which cannot be achieved with Java's LinkedList. Maybe we can keep it 
here for now, and if later on there are some other use cases, we can refactor 
the code to create a raw LinkedList in utils and use that one. What do you think?
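
For reference, a minimal sketch of such a raw list (not the patch's class, just
an illustration of the O(1)-removal property: the caller keeps the node handle
returned by add and passes it back to remove):

    class Node[T](val value: T) {
      var prev: Node[T] = null
      var next: Node[T] = null
    }

    class RawLinkedList[T] {
      private var head: Node[T] = null
      private var tail: Node[T] = null

      def add(value: T): Node[T] = {
        val node = new Node(value)
        if (tail == null) head = node
        else { tail.next = node; node.prev = tail }
        tail = node
        node  // keep this handle for O(1) removal later
      }

      def remove(node: Node[T]) {
        if (node.prev == null) head = node.next else node.prev.next = node.next
        if (node.next == null) tail = node.prev else node.next.prev = node.prev
        node.prev = null
        node.next = null
      }
    }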


 On Dec. 18, 2014, 10:42 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/Utils.scala, line 441
  https://reviews.apache.org/r/25995/diff/14/?file=795053#file795053line441
 
  I don't think this is necessary, right? i.e., args.map won't throw an 
  NPE if you don't provide any additional arguments.
  
  scala> def f(args: Int*) { println(args.size) }
  f: (args: Int*)Unit
  
  scala> f(1, 2)
  2
  
  scala> f()
  0

Yes, it seems to be working.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65477
---


On Dec. 19, 2014, 2:48 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 19, 2014, 2:48 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 numMessageUnacked should be decremented whether or not the send was successful.
 
 
 Addressed Jun's comments.
 
 
 Incorporated Jun's comments
 
 
 Incorporated Jun's comments and rebased on trunk
 
 
 

Re: Review Request 29231: Patch for KAFKA-1824

2014-12-18 Thread Gwen Shapira


 On Dec. 19, 2014, 2:36 a.m., Eric Olander wrote:
  core/src/main/scala/kafka/tools/ConsoleProducer.scala, line 269
  https://reviews.apache.org/r/29231/diff/1/?file=797000#file797000line269
 
  remove() returns the value assigned to the key being removed, so you 
  could simply do:
  
  topic = props.remove("topic")
  
  instead of the getProperty() and remove()

Will do. Thanks for the tip, Eric :)


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29231/#review65582
---


On Dec. 19, 2014, 1:56 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29231/
 ---
 
 (Updated Dec. 19, 2014, 1:56 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1824
 https://issues.apache.org/jira/browse/KAFKA-1824
 
 
 Repository: kafka
 
 
 Description
 ---
 
 fixing accidental return of "WARN Property topic is not valid"
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
 1061cc74fac69693836f1e75add06b09d459a764 
 
 Diff: https://reviews.apache.org/r/29231/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 6:17 a.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented whether or not the send was successful.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-12-18_22:17:08.patch

 Mirror Maker could lose data on unclean shutdown.
 -

 Key: KAFKA-1650
 URL: https://issues.apache.org/jira/browse/KAFKA-1650
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
 KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
 KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
 KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
 KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
 KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
 KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
 KAFKA-1650_2014-12-18_22:17:08.patch


 Currently, if the mirror maker is shut down uncleanly, the data in the data 
 channel and buffer could potentially be lost. With the new producer's 
 callback, this issue can be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14253019#comment-14253019
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

 Mirror Maker could lose data on unclean shutdown.
 -

 Key: KAFKA-1650
 URL: https://issues.apache.org/jira/browse/KAFKA-1650
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
 KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
 KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
 KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
 KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
 KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
 KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
 KAFKA-1650_2014-12-18_22:17:08.patch


 Currently, if the mirror maker is shut down uncleanly, the data in the data 
 channel and buffer could potentially be lost. With the new producer's 
 callback, this issue can be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 6:53 a.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented whether or not the send was successful.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Missed some changes in the previous patch submission; submitting the patch again.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14253071#comment-14253071
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

 Mirror Maker could lose data on unclean shutdown.
 -

 Key: KAFKA-1650
 URL: https://issues.apache.org/jira/browse/KAFKA-1650
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
 KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
 KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
 KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
 KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
 KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
 KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
 KAFKA-1650_2014-12-18_22:17:08.patch, KAFKA-1650_2014-12-18_22:53:26.patch


 Currently, if the mirror maker is shut down uncleanly, the data in the data 
 channel and buffer could potentially be lost. With the new producer's 
 callback, this issue can be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-12-18_22:53:26.patch

 Mirror Maker could lose data on unclean shutdown.
 -

 Key: KAFKA-1650
 URL: https://issues.apache.org/jira/browse/KAFKA-1650
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
 KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
 KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
 KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
 KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
 KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
 KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
 KAFKA-1650_2014-12-18_22:17:08.patch, KAFKA-1650_2014-12-18_22:53:26.patch


 Currently, if the mirror maker is shut down uncleanly, the data in the data 
 channel and buffer could potentially be lost. With the new producer's 
 callback, this issue can be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 19, 2014, 7:41 a.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented whether or not the send was successful.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Addressed Joel's comments.


Addressed Joel's comments.


Incorporated Joel's comments


Incorporated Joel's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Joel's comments


Fix a bug in metric.


Missed some changes in the previous patch submission; submitting the patch again.


change offset commit thread to use scheduler.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-18 Thread Jiangjie Qin


 On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
  https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489
 
  Why not use KafkaScheduler for the offset commit task?
 
 Jiangjie Qin wrote:
 Hadn't thought of that before... But it looks like we need to do some more 
 handling when something goes wrong in the offset commit thread, and the 
 KafkaScheduler code doesn't seem to do so.
 
 Joel Koshy wrote:
 So you can make the task itself catch throwables. So it would look 
 something like this:
 
 scheduler.schedule(mirrorMakerOffsetsCommiter, commitTask, ...)
 
 And in commitTask:
 try {
   commitOffsets()
 }
 catch {
 case t: Throwable =>
   // handle
 }
 
 That said, I don't think connector.commitOffsets will throw anything - 
 since we catch all throwables there.
 
 The only additional detail is that after you shutdown the scheduler you 
 will need to call commitOffsets() manually one last time.

I changed the code to use the scheduler. It seems the try/catch block only 
handles the Kafka-based offset commit and does not cover ensuring the offsets 
manager is connected. Also, an OOM could theoretically be thrown when creating 
a very big offset map, so I kept the catch block to make the mirror maker 
exit in that case.
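
For readers following the thread, a hedged sketch of the resulting pattern (the
schedule signature shown matches kafka.utils.KafkaScheduler of this era; the
connector, interval, and error handling are assumptions, not the patch itself):

    import kafka.consumer.ConsumerConnector
    import kafka.utils.KafkaScheduler

    object OffsetCommitScheduling {
      def start(connector: ConsumerConnector, commitIntervalMs: Long): KafkaScheduler = {
        val scheduler = new KafkaScheduler(threads = 1)
        scheduler.startup()
        scheduler.schedule("mirrormaker-offset-commit", () => {
          try {
            connector.commitOffsets  // already swallows most throwables internally
          } catch {
            case t: Throwable =>     // defensive: e.g., OOM building a huge offset map
              System.err.println("offset commit failed, exiting: " + t)
          }
        }, 0, commitIntervalMs)
        scheduler
      }

      def stop(scheduler: KafkaScheduler, connector: ConsumerConnector) {
        scheduler.shutdown()       // stop the periodic task first
        connector.commitOffsets    // then commit one last time, as suggested above
      }
    }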


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65306
---


On Dec. 19, 2014, 7:41 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Dec. 19, 2014, 7:41 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
 
 
 Added consumer rebalance listener to mirror maker, will test it later.
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 Conflicts:
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   
 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
 
 added custom config for consumer rebalance listener
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Add configurable consumer rebalance listener
 
 
 Incorporated Guozhang's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Guozhang's comments.
 
 
 Addressed Guozhang's comment.
 
 
 numMessageUnacked should be decremented whether or not the send was successful.
 
 
 Addressed Jun's comments.
 
 
 Incorporated Jun's comments
 
 
 Incorporated Jun's comments and rebased on trunk
 
 
 Rebased on current trunk
 
 
 Addressed Joel's comments.
 
 
 Addressed Joel's comments.
 
 
 Incorporated Joel's comments
 
 
 Incorporated Joel's comments
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Incorporated Joel's comments
 
 
 Fix a bug in metric.
 
 
 Missed some changes in the previous patch submission; submitting the patch again.
 
 
 change offset commit thread to use scheduler.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
 62c0686e816d2888772d5a911becf625eedee397 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 e991d2187d03241f639eeaf6769fb59c8c99664c 
   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
 9baad34a9793e5067d11289ece2154ba87b388af 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-12-18_23:41:16.patch

 Mirror Maker could lose data on unclean shutdown.
 -

 Key: KAFKA-1650
 URL: https://issues.apache.org/jira/browse/KAFKA-1650
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
 KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
 KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
 KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
 KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
 KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
 KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
 KAFKA-1650_2014-12-18_22:17:08.patch, KAFKA-1650_2014-12-18_22:53:26.patch, 
 KAFKA-1650_2014-12-18_23:41:16.patch


 Currently, if the mirror maker is shut down uncleanly, the data in the data 
 channel and buffer could potentially be lost. With the new producer's 
 callback, this issue can be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14253103#comment-14253103
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

 Mirror Maker could lose data on unclean shutdown.
 -

 Key: KAFKA-1650
 URL: https://issues.apache.org/jira/browse/KAFKA-1650
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
 KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
 KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
 KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
 KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
 KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch, 
 KAFKA-1650_2014-12-17_12:29:23.patch, KAFKA-1650_2014-12-18_18:48:18.patch, 
 KAFKA-1650_2014-12-18_22:17:08.patch, KAFKA-1650_2014-12-18_22:53:26.patch, 
 KAFKA-1650_2014-12-18_23:41:16.patch


 Currently, if the mirror maker is shut down uncleanly, the data in the data 
 channel and buffer could potentially be lost. With the new producer's 
 callback, this issue can be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)