Re: Review Request 26373: Patch for KAFKA-1647

2014-10-21 Thread Jiangjie Qin


 On Oct. 21, 2014, 12:18 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/ReplicaManager.scala, lines 481-506
  https://reviews.apache.org/r/26373/diff/3/?file=725140#file725140line481
 
  This doesn't quite fix the original problem though. The original 
  problem is that if the leader is not alive, we won't call 
  partition.makeFollower(), in which the local replica is created. If a local 
  replica is not created, the partition will be ignored when checkpointing the HW 
  and we lose the last checkpointed HW.
  
  So, we have to call partition.makeFollower() for every follower, 
  whether its leader is live or not. After this, we can proceed with the rest 
  of the steps for only those partitions with a live leader. We can log a 
  warning for those partitions w/o a live leader.

Hi Jun, thanks for the review. 
I think the high watermark is actually read in partition.getOrCreateReplica(). 
As you said, that means the local replica must be created in order to preserve 
the high watermark. The original code did not create the local replica when the 
remote leader was not up, so it lost the high watermark.
partition.getOrCreateReplica() is called in the following 2 places:
1. partition.makeFollower()
2. ReplicaManager line 515, where we truncate the log for the partitions in 
partitionsToMakeFollower.
Either place could preserve the high watermark. The difference is that in (1) 
all the replicas for the partition, including the not-yet-existing remote 
replica, are created, whereas in (2) only the local replica is created. 
Since I'm not 100% sure whether creating a remote replica before it is actually 
up could cause some other issue, I chose to preserve the high watermark in (2) 
instead of (1), which is also a smaller change compared with the original code.
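
To make the point concrete, here is a minimal, self-contained Scala sketch (toy types, not the actual ReplicaManager/Partition code) of why the local replica has to be created for the checkpointed high watermark to survive:

{code}
// Toy model of the behavior discussed above (illustrative only, not Kafka code):
// the checkpointed HW is read exactly when the local replica is created, so
// skipping creation for partitions whose leader is down loses the value at the
// next checkpoint.
object HwPreservationSketch extends App {
  val checkpointedHw = Map("topicA-0" -> 42L)              // last HW checkpoint on disk

  final case class Replica(highWatermark: Long)

  final class Partition(name: String) {
    var localReplica: Option[Replica] = None
    def getOrCreateReplica(): Replica = {
      val r = localReplica.getOrElse(Replica(checkpointedHw.getOrElse(name, 0L)))
      localReplica = Some(r)
      r
    }
  }

  val partition = new Partition("topicA-0")
  // Original code skipped creation when the remote leader was not alive, so the
  // next HW checkpoint wrote a default value. Creating the local replica
  // unconditionally (whether in makeFollower() or before truncating) keeps 42.
  partition.getOrCreateReplica()
  val hwToCheckpoint = partition.localReplica.map(_.highWatermark).getOrElse(0L)
  println(s"HW that will be checkpointed: $hwToCheckpoint")  // 42, not 0
}
{code}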


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review57490
---


On Oct. 18, 2014, 7:26 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 18, 2014, 7:26 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Joel's comments.
 
 
The version 2 code seems to have been submitted by mistake... This should be the 
code for review that addresses Joel's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-21 Thread Vladimir Tretyakov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Tretyakov updated KAFKA-1481:
--
Attachment: KAFKA-1481_2014-10-21_09-14-35.patch

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.2

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_2014-10-21_09-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW
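
A small illustration of the parsing problem (the names below are made up):

{code}
// Once the client id and topic may themselves contain '-', splitting the MBean
// name on dashes cannot recover the original parts; a separator that is not
// legal inside the embedded ids (e.g. '|') keeps them recoverable.
object MBeanNameSketch extends App {
  val clientId = "my-client"                       // contains a dash
  val topic    = "my-topic"                        // contains a dash

  val dashName = s"$clientId-$topic-BytesPerSec"
  println(dashName.split("-").mkString(" , "))     // 5 tokens: boundaries are lost

  val pipeName = s"$clientId|$topic|BytesPerSec"
  println(pipeName.split('|').mkString(" , "))     // 3 tokens, as intended
}
{code}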



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-21 Thread Vladimir Tretyakov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178039#comment-14178039
 ] 

Vladimir Tretyakov commented on KAFKA-1481:
---

Hi, I've added the latest patch (KAFKA-1481_2014-10-21_09-14-35.patch). It worked for me 
locally (the previous one worked too :( ).
Here are the commands I've used:
{code}
PATCH CREATION
wawanawna@wawanawna:/home/storage/sematext/src/kfk2/kafka$ git status
On branch 0.8.2
Your branch is up-to-date with 'origin/0.8.2'.

Changes to be committed:
  (use "git reset HEAD <file>..." to unstage)

new file:   core/src/main/scala/kafka/common/BrokerInfo.scala
new file:   core/src/main/scala/kafka/common/ClientIdTopic.scala
new file:   core/src/main/scala/kafka/common/Taggable.scala
renamed:    core/src/main/scala/kafka/common/ClientIdAndTopic.scala -> 
core/src/main/scala/kafka/common/TopicInfo.scala
new file:   
core/src/main/scala/kafka/consumer/ConsumerFetcherThreadId.scala
new file:   core/src/main/scala/kafka/consumer/ConsumerId.scala

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git checkout -- <file>..." to discard changes in working directory)

modified:   core/src/main/scala/kafka/admin/AdminUtils.scala
modified:   
core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
modified:   
core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
modified:   
core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
modified:   
core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
modified:   core/src/main/scala/kafka/api/RequestKeys.scala
modified:   core/src/main/scala/kafka/cluster/Partition.scala
...
...
...
wawanawna@wawanawna:/home/storage/sematext/src/kfk2/kafka$ git commit -a -m 
'kafka-1481; JMX renaming'
[0.8.2 a232af6] kafka-1481; JMX renaming
 73 files changed, 666 insertions(+), 435 deletions(-)
 create mode 100644 core/src/main/scala/kafka/common/BrokerInfo.scala
 create mode 100644 core/src/main/scala/kafka/common/ClientIdTopic.scala
 create mode 100644 core/src/main/scala/kafka/common/Taggable.scala
 rename core/src/main/scala/kafka/common/{ClientIdAndTopic.scala => 
TopicInfo.scala} (68%)
 create mode 100644 
core/src/main/scala/kafka/consumer/ConsumerFetcherThreadId.scala
 create mode 100644 core/src/main/scala/kafka/consumer/ConsumerId.scala
wawanawna@wawanawna:/home/storage/sematext/src/kfk2/kafka$ git log | head -n 20
commit a232af63b8a1f43054994a74520b31ef7b9b347c
Author: wawanawna <sema...@mail.com>
Date:   Tue Oct 21 09:29:31 2014 +0300

kafka-1481; JMX renaming

commit eb7ac9eb7eebc4e0655b65e07cae594e61a6c05e
Author: Jun Rao <jun...@gmail.com>
Date:   Mon Oct 20 11:09:31 2014 -0700

kafka-1717; remove netty dependency through ZK 3.4.x; patched by Jun Rao; 
reviewed by Sriharsha Chintalapani and Neha Narkhede


wawanawna@wawanawna:/home/storage/sematext/src/kfk2/kafka$ git diff 
eb7ac9eb7eebc4e0655b65e07cae594e61a6c05e 
a232af63b8a1f43054994a74520b31ef7b9b347c > 
/tmp/KAFKA-1481_2014-10-21_09-14-35.patch

PATCH APPLYING

wawanawna@wawanawna$ git clone https://github.com/apache/kafka.git


wawanawna@wawanawna:/home/storage/sematext/src/kfk3/kafka$ git checkout -b 
0.8.2 origin/0.8.2
Branch 0.8.2 set up to track remote branch 0.8.2 from origin.
Switched to a new branch '0.8.2'


wawanawna@wawanawna:/home/storage/sematext/src/kfk3/kafka$ git apply 
/tmp/KAFKA-1481_2014-10-21_09-14-35.patch
/tmp/KAFKA-1481_2014-10-21_09-14-35.patch:1410: trailing whitespace.
   })  
warning: 1 line adds whitespace errors.
{code}

There is a warning, but everything works (compilation and tests pass without errors).

PS: please tell me what I am doing wrong.
wawanawna@wawanawna:/home/storage/sematext/src/kfk3/kafka$ git --version
git version 1.9.1


 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.2

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_2014-10-21_09-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 

[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-21 Thread Wojciech Kuranowski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178396#comment-14178396
 ] 

Wojciech Kuranowski commented on KAFKA-1718:


I have compiled Kafka 0.8.2 with different error messages for validation and 
revalidation, and it seems that this issue is triggered during revalidation after 
recompression. In my case:
kafka.common.MessageSizeTooLargeException: revalidate - Message size is 
3382345 bytes which exceeds the maximum configured message size of 100.

It's strange that the message after recompression is 3 times bigger than the limit. 
Is the broker miscalculating something?
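
If the broker re-compresses the entire decompressed batch into a single wrapper message before revalidating (my assumption, not verified against the code), the checked size would be the whole batch rather than any individual producer message, which matches the numbers above. A rough Scala sketch of the arithmetic:

{code}
// Back-of-the-envelope arithmetic (assumed cause, using the sizes reported in
// KAFKA-1718): many small compressed messages decompress into ~21 MB of data;
// if the broker re-compresses all of it into ONE wrapper message, even a ~6x
// snappy ratio leaves a single message of ~3.4 MB, well over a ~1 MB limit.
object RecompressionSizeSketch extends App {
  val compressedClientMessage = 46899L       // bytes, from the original report
  val messagesPerSet          = 21L          // ~977205 / 46899
  val uncompressedPerMessage  = 999600L      // bytes after decompression

  val totalUncompressed    = messagesPerSet * uncompressedPerMessage   // ~21 MB
  val observedRecompressed = 3382345L        // single recompressed message, from the log
  val assumedLimit         = 1000000L        // order of magnitude of message.max.bytes (assumption)

  println(s"broker re-compresses ~$totalUncompressed bytes into one message")
  println(s"recompressed size $observedRecompressed exceeds ~$assumedLimit: " +
    (observedRecompressed > assumedLimit))
}
{code}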


 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 112.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that the broker is 
 miscalculating something somewhere and throwing the exception improperly.
 ---
 This issue can be reliably reproduced using an out-of-the-box binary download 
 of 0.8.1.1 and the following gist: 
 https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
 the `producer-ng` branch of the Sarama library).
 ---
 I am happy to provide any more information you might need, or to do relevant 
 experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-21 Thread Joel Koshy
KAFKA-1634 is orthogonal to 1583 right? (BTW sorry about the delay in
reviewing 1583 - I'm on it.)

KAFKA-1634 shouldn't be too difficult to address so I'm hoping we can
get that in 0.8.2 if not the beta release.

Joel

On Mon, Oct 20, 2014 at 09:14:17PM -0700, Jun Rao wrote:
 I move KAFKA-1634 to 0.8.3 since it's easier to fix after KAFKA-1583 is
 committed. With this, we have resolved all known blocker issues for 0.8.2.
 Thanks everyone.
 
 Joe,
 
 Do you want to help start rolling an 0.8.2 beta release? We can decide if
 we need to include any other fixes (e.g., KAFKA-1481) in the 0.8.2 final
 release.
 
 Thanks,
 
 Jun
 
 On Thu, Oct 16, 2014 at 3:53 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
  +1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2
  so that will only go out with 0.8.3.
 
  Joel
 
  On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote:
   Agree.
   On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote:
  
+1 on doing an 0.8.2 beta.
   
Guozhang,
   
kafka-1583 is relatively large. Given that we are getting close to
releasing 0.8.2 beta, my feeling is that we probably shouldn't include
  it
in 0.8.2 beta even if we can commit it in a few days.
   
Thanks,
   
Jun
   
On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com
  wrote:
   
  Regarding 1634, I intended to work on that after 1583 since it will 
  change the commit offset request handling logic a lot. If people think 
  1583 is only a few days away from check-in, we can leave it in 
  0.8.2-beta; otherwise we can push it to 0.8.3.

 Guozhang

 On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly
  wrote:

  +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
 
  I agree to the tickets you brought up to have in 0.8.2-beta and
  also
  https://issues.apache.org/jira/browse/KAFKA-1493 for lz4
  compression.
 
  /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop
  /
  On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com
  
 wrote:
 
   Another JIRA that will be nice to include as part of 0.8.2-beta
  is
   https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the
mbean
   naming. Looking for people's thoughts on 2 things here -
  
   1. How do folks feel about doing a 0.8.2-beta release right now
  and
 0.8.2
   final 4-5 weeks later?
   2. Do people want to include any JIRAs (other than the ones
  mentioned
   above) in 0.8.2-beta? If so, it will be great to know now so it
  will
  allow
   us to move forward with the beta release quickly.
  
   Thanks,
   Neha
  
   On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
 neha.narkh...@gmail.com
   wrote:
  
Hi,
   
We have accumulated an impressive list of pretty major
  features in
  0.8.2
   -
Delete topic
Automated leader rebalancing
Controlled shutdown
Offset management
Parallel recovery
min.isr and
clean leader election
   
In the past, what has worked for major feature releases is a
  beta
  release
prior to a final release. I'm proposing we do the same for
  0.8.2.
The
   only
blockers for 0.8.2-beta, that I know of are -
   
https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major
change
  and
requires some thinking about the new dependency. Since it is
  not
 fully
ready and there are things to think about, I suggest we take it
out,
   think
it end to end and then include it in 0.8.3.)
https://issues.apache.org/jira/browse/KAFKA-1634 (This has an
owner:
Guozhang Wang)
https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch
  and
is
waiting on a review by Joe Stein)
   
It seems that 1634 and 1671 can get wrapped up in a week. Do
  people
  think
we should cut 0.8.2-beta by next week?
   
Thanks,
Neha
   
  
 



 --
 -- Guozhang

   
 
 



[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-21 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178519#comment-14178519
 ] 

Sriharsha Chintalapani commented on KAFKA-1718:
---

I ran sizeInBytes for "test".getBytes:
- "test".getBytes size: 4
- "test" message (Message.scala) size: 18
- "test" ByteBufferMessageSet size: 30

For each message, additional data is added:
 1. 4 byte CRC32 of the message
 2. 1 byte "magic" identifier to allow format changes, value is 2 currently
 3. 1 byte "attributes" identifier to allow annotations on the message 
independent of the version (e.g. compression enabled, type of codec used)
 4. 4 byte key length, containing length K
 5. K byte key
 6. 4 byte payload length, containing length V
 7. V byte payload

For the message "test" with a null key, the size comes to 18 bytes; adding this 
message to a ByteBufferMessageSet brings it to 30 bytes (12 being the 
LogOverhead). See the sketch below.
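
The same arithmetic as a plain calculation (not Kafka code):

{code}
// Worked example of the overhead list above for the payload "test" with a null key.
object MessageOverheadSketch extends App {
  val payload       = "test".getBytes("UTF-8")   // 4 bytes
  val crc           = 4
  val magic         = 1
  val attributes    = 1
  val keyLength     = 4                          // length field is always written
  val keyBytes      = 0                          // null key -> no key bytes
  val payloadLength = 4

  val messageSize = crc + magic + attributes + keyLength + keyBytes +
    payloadLength + payload.length               // 4+1+1+4+0+4+4 = 18
  val logOverhead = 8 + 4                        // 8-byte offset + 4-byte size per entry
  println(s"message size: $messageSize, in a ByteBufferMessageSet: ${messageSize + logOverhead}") // 18, 30
}
{code}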



 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 112.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that the broker is 
 miscalculating something somewhere and throwing the exception improperly.
 ---
 This issue can be reliably reproduced using an out-of-the-box binary download 
 of 0.8.1.1 and the following gist: 
 https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
 the `producer-ng` branch of the Sarama library).
 ---
 I am happy to provide any more information you might need, or to do relevant 
 experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-21 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1721:
-
Component/s: compression

 Snappy compressor is not thread safe
 

 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava

 From the mailing list, it can generate this exception:
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 This appears to be an issue with the snappy-java library using ThreadLocal 
 for an internal buffer recycling object which results in that object being 
 shared unsafely across threads if one thread sends to multiple producers:
 {quote}
 I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 {quote}
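 A minimal sketch of the synchronization idea from the quote (class and method names are illustrative, not an actual Compressor change; assumes snappy-java on the classpath):
{code}
// Remember a lock tied to the thread that created the Snappy stream and funnel
// close() through it, so two sender threads never touch the same thread-local
// BufferRecycler concurrently. Illustrative only.
import java.io.{ByteArrayOutputStream, OutputStream}
import java.util.concurrent.ConcurrentHashMap
import org.xerial.snappy.SnappyOutputStream

object SnappyCloseGuard {
  private val locks = new ConcurrentHashMap[Long, Object]()   // one lock per creating thread

  def newStream(out: OutputStream): (SnappyOutputStream, Object) = {
    val tid = Thread.currentThread().getId
    locks.putIfAbsent(tid, new Object)
    (new SnappyOutputStream(out), locks.get(tid))
  }

  def closeSynchronized(stream: SnappyOutputStream, lock: Object): Unit =
    lock.synchronized { stream.close() }   // serializes access to the shared BufferRecycler
}

object SnappyCloseGuardDemo extends App {
  val (stream, lock) = SnappyCloseGuard.newStream(new ByteArrayOutputStream())
  stream.write("hello".getBytes("UTF-8"))
  SnappyCloseGuard.closeSynchronized(stream, lock)
}
{code}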



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1678) add new options for reassign partition to better manager dead brokers

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1678:
-

Hey [~gwenshap], I have an engineer freed up this week and next to work on this 
ticket. Is it ok if I assign it over to him?

 add new options for reassign partition to better manager dead brokers
 -

 Key: KAFKA-1678
 URL: https://issues.apache.org/jira/browse/KAFKA-1678
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
Assignee: Gwen Shapira
  Labels: operations
 Fix For: 0.8.3


 this is in two forms
 --replace-replica 
 which is from broker.id to broker.id
 and 
 --remove-replica
 which is just a single broker.id



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-21 Thread Joe Stein
Can KAFKA-1583 go into 0.8.3 on trunk once it is ready? It is currently marked for
0.9.0.

I am about to move the open and patch-available JIRAs from 0.8.2 to 0.8.3.

On Tue, Oct 21, 2014 at 11:00 AM, Joel Koshy jjkosh...@gmail.com wrote:

 KAFKA-1634 is orthogonal to 1583 right? (BTW sorry about the delay in
 reviewing 1583 - I'm on it.)

 KAFKA-1634 shouldn't be too difficult to address so I'm hoping we can
 get that in 0.8.2 if not the beta release.

 Joel

 On Mon, Oct 20, 2014 at 09:14:17PM -0700, Jun Rao wrote:
  I move KAFKA-1634 to 0.8.3 since it's easier to fix after KAFKA-1583 is
  committed. With this, we have resolved all known blocker issues for
 0.8.2.
  Thanks everyone.
 
  Joe,
 
  Do you want to help start rolling an 0.8.2 beta release? We can decide if
  we need to include any other fixes (e.g., KAFKA-1481) in the 0.8.2 final
  release.
 
  Thanks,
 
  Jun
 
  On Thu, Oct 16, 2014 at 3:53 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   +1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2
   so that will only go out with 0.8.3.
  
   Joel
  
   On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote:
Agree.
On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote:
   
 +1 on doing an 0.8.2 beta.

 Guozhang,

 kafka-1583 is relatively large. Given that we are getting close to
 releasing 0.8.2 beta, my feeling is that we probably shouldn't
 include
   it
 in 0.8.2 beta even if we can commit it in a few days.

 Thanks,

 Jun

 On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com
 
   wrote:

   Regarding 1634, I intended to work on that after 1583 since it will 
   change the commit offset request handling logic a lot. If people think 
   1583 is only a few days away from check-in, we can leave it in 
   0.8.2-beta; otherwise we can push it to 0.8.3.
 
  Guozhang
 
  On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly
 
   wrote:
 
   +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks
 later.
  
   I agree to the tickets you brought up to have in 0.8.2-beta and
   also
   https://issues.apache.org/jira/browse/KAFKA-1493 for lz4
   compression.
  
   /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop
   /
   On Oct 16, 2014 12:55 PM, Neha Narkhede 
 neha.narkh...@gmail.com
   
  wrote:
  
Another JIRA that will be nice to include as part of
 0.8.2-beta
   is
https://issues.apache.org/jira/browse/KAFKA-1481 that fixes
 the
 mbean
naming. Looking for people's thoughts on 2 things here -
   
1. How do folks feel about doing a 0.8.2-beta release right
 now
   and
  0.8.2
final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones
   mentioned
above) in 0.8.2-beta? If so, it will be great to know now so
 it
   will
   allow
us to move forward with the beta release quickly.
   
Thanks,
Neha
   
On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
  neha.narkh...@gmail.com
wrote:
   
 Hi,

 We have accumulated an impressive list of pretty major
   features in
   0.8.2
-
 Delete topic
 Automated leader rebalancing
 Controlled shutdown
 Offset management
 Parallel recovery
 min.isr and
 clean leader election

 In the past, what has worked for major feature releases is
 a
   beta
   release
 prior to a final release. I'm proposing we do the same for
   0.8.2.
 The
only
 blockers for 0.8.2-beta, that I know of are -

 https://issues.apache.org/jira/browse/KAFKA-1493 (Is a
 major
 change
   and
 requires some thinking about the new dependency. Since it
 is
   not
  fully
 ready and there are things to think about, I suggest we
 take it
 out,
think
 it end to end and then include it in 0.8.3.)
 https://issues.apache.org/jira/browse/KAFKA-1634 (This
 has an
 owner:
 Guozhang Wang)
 https://issues.apache.org/jira/browse/KAFKA-1671 (Has a
 patch
   and
 is
 waiting on a review by Joe Stein)

 It seems that 1634 and 1671 can get wrapped up in a week.
 Do
   people
   think
 we should cut 0.8.2-beta by next week?

 Thanks,
 Neha

   
  
 
 
 
  --
  -- Guozhang
 

  
  




[jira] [Updated] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1566:
-
Fix Version/s: (was: 0.8.2)
   (was: 0.9.0)
   0.8.3

 Kafka environment configuration (kafka-env.sh)
 --

 Key: KAFKA-1566
 URL: https://issues.apache.org/jira/browse/KAFKA-1566
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Cosmin Lehene
Assignee: Cosmin Lehene
  Labels: newbie
 Fix For: 0.8.3


 It would be useful (especially for automated deployments) to have an 
 environment configuration file that could be sourced from the launcher files 
 (e.g. kafka-run-server.sh). 
 This is how this could look like kafka-env.sh 
 {code}
 export KAFKA_JVM_PERFORMANCE_OPTS='-XX:+UseCompressedOops 
 -XX:+DisableExplicitGC -Djava.awt.headless=true -XX:+UseG1GC 
 -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 
 -XX:InitiatingHeapOccupancyPercent=35' 
 export KAFKA_HEAP_OPTS='-Xmx1G -Xms1G' 
 export KAFKA_LOG4J_OPTS=-Dkafka.logs.dir=/var/log/kafka 
 {code} 
 kafka-server-start.sh 
 {code} 
 ... 
 source $base_dir/config/kafka-env.sh 
 ... 
 {code} 
 This approach is consistent with Hadoop and HBase. However the idea here is 
 to be able to set these values in a single place without having to edit 
 startup scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1700) examples directory - README and shell scripts are out of date

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1700:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 examples directory - README and shell scripts are out of date
 -

 Key: KAFKA-1700
 URL: https://issues.apache.org/jira/browse/KAFKA-1700
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Geoffrey Anderson
Priority: Minor
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-1700.patch


 sbt build files were removed during resolution of KAFKA-1254, so the README 
 under the examples directory should no longer make reference to sbt.
 Also, the paths added to CLASSPATH variable in the example shell script are 
 no longer correct.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-686) 0.8 Kafka broker should give a better error message when running against 0.7 zookeeper

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-686:

Fix Version/s: (was: 0.8.2)
   0.8.3

 0.8 Kafka broker should give a better error message when running against 0.7 
 zookeeper
 --

 Key: KAFKA-686
 URL: https://issues.apache.org/jira/browse/KAFKA-686
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: Jay Kreps
  Labels: newbie, patch
 Fix For: 0.8.3

 Attachments: KAFAK-686-null-pointer-fix.patch, 
 KAFKA-686-null-pointer-fix-2.patch


 People will not know that the ZooKeeper paths are not compatible. When you 
 try to start an 0.8 broker pointed at a 0.7 ZooKeeper you get a 
 NullPointerException. We should detect this and give a saner error (see the 
 sketch after the stack trace below).
 Error:
 kafka.common.KafkaException: Can't parse json string: null
 at kafka.utils.Json$.liftedTree1$1(Json.scala:20)
 at kafka.utils.Json$.parseFull(Json.scala:16)
 at 
 kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$2.apply(ZkUtils.scala:498)
 at 
 kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$2.apply(ZkUtils.scala:494)
 at 
 scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
 at scala.collection.immutable.List.foreach(List.scala:45)
 at 
 kafka.utils.ZkUtils$.getReplicaAssignmentForTopics(ZkUtils.scala:494)
 at 
 kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:446)
 at 
 kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:220)
 at 
 kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:85)
 at 
 kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
 at 
 kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:43)
 at kafka.controller.KafkaController.startup(KafkaController.scala:381)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:90)
 at 
 kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
 at kafka.Kafka$.main(Kafka.scala:46)
 at kafka.Kafka.main(Kafka.scala)
 Caused by: java.lang.NullPointerException
 at 
 scala.util.parsing.combinator.lexical.Scanners$Scanner.init(Scanners.scala:52)
 at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:71)
 at scala.util.parsing.json.JSON$.parseFull(JSON.scala:85)
 at kafka.utils.Json$.liftedTree1$1(Json.scala:17)
 ... 16 more
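 A sketch of the kind of guard this asks for (illustrative only, not a committed fix; the real check would live near kafka.utils.ZkUtils):
{code}
// Fail with an explicit message when a topic znode carries no JSON payload,
// which is what an 0.7-era ZooKeeper layout looks like to an 0.8 broker.
object ZkLayoutGuard {
  def requireJsonTopicData(topic: String, jsonData: String): String = {
    if (jsonData == null || jsonData.trim.isEmpty)
      throw new IllegalStateException(
        s"No JSON data found in ZooKeeper for topic '$topic'. " +
          "The ZooKeeper ensemble probably contains a Kafka 0.7 layout, " +
          "which 0.8 brokers cannot read. Use a separate ZooKeeper path or migrate the data.")
    jsonData
  }
}

object ZkLayoutGuardDemo extends App {
  println(ZkLayoutGuard.requireJsonTopicData("my-topic", """{"version":1,"partitions":{"0":[1,2]}}"""))
  // ZkLayoutGuard.requireJsonTopicData("old-topic", null)   // would throw the descriptive error
}
{code}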



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-438) Code cleanup in MessageTest

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-438:

Fix Version/s: (was: 0.8.2)
   0.8.3

 Code cleanup in MessageTest
 ---

 Key: KAFKA-438
 URL: https://issues.apache.org/jira/browse/KAFKA-438
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7.1
Reporter: Jim Plush
Priority: Trivial
 Fix For: 0.8.3

 Attachments: KAFKA-438


 While exploring the Unit Tests this class had an unused import statement, 
 some ambiguity on which HashMap implementation was being used and assignments 
 of function returns when not required. 
 Trivial stuff



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1351) String.format is very expensive in Scala

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1351:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 String.format is very expensive in Scala
 

 Key: KAFKA-1351
 URL: https://issues.apache.org/jira/browse/KAFKA-1351
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7.2, 0.8.0, 0.8.1
Reporter: Neha Narkhede
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-1351.patch, KAFKA-1351_2014-04-07_18:02:18.patch, 
 KAFKA-1351_2014-04-09_15:40:11.patch


 As found in KAFKA-1350, logging is causing significant overhead in the 
 performance of a Kafka server. There are several info statements that use 
 String.format which is particularly expensive. We should investigate adding 
 our own version of String.format that merely uses string concatenation under 
 the covers.
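 For illustration, a sketch of the kind of change implied (an assumption about the shape, not the actual patch):
{code}
// String.format parses the format string on every call; plain concatenation or
// string interpolation just builds the string.
object CheapFormat {
  def slow(topic: String, partition: Int, offset: Long): String =
    "Fetched partition [%s,%d] up to offset %d".format(topic, partition, offset)

  def fast(topic: String, partition: Int, offset: Long): String =
    s"Fetched partition [$topic,$partition] up to offset $offset"
}

object CheapFormatDemo extends App {
  println(CheapFormat.slow("my-topic", 3, 42L))
  println(CheapFormat.fast("my-topic", 3, 42L))
}
{code}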



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1420:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with 
 TestUtils.createTopic in unit tests
 --

 Key: KAFKA-1420
 URL: https://issues.apache.org/jira/browse/KAFKA-1420
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Jonathan Natkins
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-1420.patch, KAFKA-1420_2014-07-30_11:18:26.patch, 
 KAFKA-1420_2014-07-30_11:24:55.patch, KAFKA-1420_2014-08-02_11:04:15.patch, 
 KAFKA-1420_2014-08-10_14:12:05.patch, KAFKA-1420_2014-08-10_23:03:46.patch


 This is a follow-up JIRA from KAFKA-1389.
 There are a bunch of places in the unit tests where we misuse 
 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK to create topics, 
 where TestUtils.createTopic needs to be used instead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1561) Data Loss for Incremented Replica Factor and Leader Election

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1561:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 Data Loss for Incremented Replica Factor and Leader Election
 

 Key: KAFKA-1561
 URL: https://issues.apache.org/jira/browse/KAFKA-1561
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.8.3

 Attachments: broker0.log, broker2.log, consumer.log, producer.log


 This is reported on the mailing list (thanks to Jad).
 {quote}
 Hi,
 I have a test that continuously sends messages to one broker, brings up
 another broker, and adds it as a replica for all partitions, with it being
 the preferred replica for some. I have auto.leader.rebalance.enable=true,
 so replica election gets triggered. Data is being pumped to the old broker
 all the while. It seems that some data gets lost while switching over to
 the new leader. Is this a bug, or do I have something misconfigured? I also
 have request.required.acks=-1 on the producer.
 Here's what I think is happening:
 1. Producer writes message to broker 0, [EventServiceUpsertTopic,13], w/
 broker 0 currently leader, with ISR=(0), so write returns successfully,
 even when acks = -1. Correlation id 35836
 Producer log:
 [2014-07-24 14:44:26,991]  [DEBUG]  [dw-97 - PATCH
 /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField_mergeFields=field1]
 [kafka.producer.BrokerPartitionInfo]  Partition
 [EventServiceUpsertTopic,13] has leader 0
 [2014-07-24 14:44:26,993]  [DEBUG]  [dw-97 - PATCH
 /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField_mergeFields=field1]
 [k.producer.async.DefaultEventHandler]  Producer sent messages with
 correlation id 35836 for topics [EventServiceUpsertTopic,13] to broker 0 on
 localhost:56821
 2. Broker 1 is still catching up
 Broker 0 Log:
 [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
 [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on broker
 0: Old hw for partition [EventServiceUpsertTopic,13] is 971. New hw is 971.
 All leo's are 975,971
 [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
 [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in 0 ms
 [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-processor-56821-0]
 [kafka.request.logger]  Completed request:Name: ProducerRequest; Version:
 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 1
 ms from client /127.0.0.1:57086
 ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
 3. Leader election is triggered by the scheduler:
 Broker 0 Log:
 [2014-07-24 14:44:26,991]  [INFO ]  [kafka-scheduler-0]
 [k.c.PreferredReplicaPartitionLeaderSelector]
 [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for partition [
 EventServiceUpsertTopic,13] is not the preferred replica. Trigerring
 preferred replica leader election
 [2014-07-24 14:44:26,993]  [DEBUG]  [kafka-scheduler-0]
 [kafka.utils.ZkUtils$]  Conditional update of path
 /brokers/topics/EventServiceUpsertTopic/partitions/13/state with value
  {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
 and expected version 3 succeeded, returning the new version: 4
 [2014-07-24 14:44:26,994]  [DEBUG]  [kafka-scheduler-0]
 [k.controller.PartitionStateMachine]  [Partition state machine on
 Controller 0]: After leader election, leader cache is updated to
 Map(Snipped(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),EndSnip)
 [2014-07-24 14:44:26,994]  [INFO ]  [kafka-scheduler-0]
 [kafka.controller.KafkaController]  [Controller 0]: Partition [
 EventServiceUpsertTopic,13] completed preferred replica leader election.
 New leader is 1
 4. Broker 1 is still behind, but it sets the high water mark to 971!!!
 Broker 1 Log:
 [2014-07-24 14:44:26,999]  [INFO ]  [kafka-request-handler-6]
 [kafka.server.ReplicaFetcherManager]  [ReplicaFetcherManager on broker 1]
 Removed fetcher for partitions [EventServiceUpsertTopic,13]
 [2014-07-24 14:44:27,000]  [DEBUG]  [kafka-request-handler-6]
 [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on broker
 1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New hw is -1.
 All leo's are -1,971
 [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]
 [kafka.server.KafkaApis]  [KafkaApi-1] Maybe update partition HW due to
 fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId:
 ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes;
 RequestInfo: [EventServiceUpsertTopic,13] -
 PartitionFetchInfo(971,1048576), Snipped
 [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]
 

[jira] [Updated] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1374:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 LogCleaner (compaction) does not support compressed topics
 --

 Key: KAFKA-1374
 URL: https://issues.apache.org/jira/browse/KAFKA-1374
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, 
 KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, 
 KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch


 This is a known issue, but opening a ticket to track.
 If you try to compact a topic that has compressed messages you will run into
 various exceptions - typically because during iteration we advance the
 position based on the decompressed size of the message. I have a bunch of
 stack traces, but it should be straightforward to reproduce.
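 A toy illustration of the iteration problem (not the LogCleaner code):
{code}
// If the cleaner advances its position by the decompressed size of each
// compressed wrapper message instead of its size in the log, it walks past the
// real end of the segment and ends up reading garbage (hence the exceptions).
object CompactionPositionSketch extends App {
  final case class WrapperMessage(sizeInLog: Int, decompressedSize: Int)
  val segment = List(WrapperMessage(500, 4096), WrapperMessage(480, 4096), WrapperMessage(510, 4096))

  val realBytesOnDisk = segment.map(_.sizeInLog).sum        // 1490 bytes actually on disk
  val buggyPosition   = segment.map(_.decompressedSize).sum // 12288 "bytes" the iterator thinks it read
  println(s"bytes actually in the segment: $realBytesOnDisk")
  println(s"position after advancing by decompressed sizes: $buggyPosition")
}
{code}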



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-795) Improvements to PreferredReplicaLeaderElection tool

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-795:

Fix Version/s: (was: 0.8.2)
   0.8.3

 Improvements to PreferredReplicaLeaderElection tool
 ---

 Key: KAFKA-795
 URL: https://issues.apache.org/jira/browse/KAFKA-795
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: Swapnil Ghike
Assignee: Swapnil Ghike
 Fix For: 0.8.3


 We can make some improvements to the PreferredReplicaLeaderElection tool:
 1. Terminate the tool if a controller is not up and running. Currently we can 
 run the tool without having any broker running, which is kind of confusing. 
 2. Should we delete /admin zookeeper path in PreferredReplicaLeaderElection 
 (and ReassignPartition) tool at the end? Otherwise the next run of the tool 
 complains that a replica election is already in progress. 
 3. If there is an error, we can see it in cotroller.log. Should the tool also 
 throw an error?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1660:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 Ability to call close() with a timeout on the Java Kafka Producer. 
 ---

 Key: KAFKA-1660
 URL: https://issues.apache.org/jira/browse/KAFKA-1660
 Project: Kafka
  Issue Type: Improvement
  Components: clients, producer 
Affects Versions: 0.8.2
Reporter: Andrew Stein
Assignee: Jun Rao
 Fix For: 0.8.3


 I would like the ability to call {{close}} with a timeout on the Java 
 Client's KafkaProducer.
 h6. Workaround
 Currently, it is possible to ensure that {{close}} will return quickly by 
 first doing a {{future.get(timeout)}} on the last future produced on each 
 partition, but this means that the user has to define the partitions up front 
 at the time of {{send}} and track the returned {{future}}'s
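 A sketch of that workaround against the Java client (illustrative; topic, partition count, broker address and serializer settings below are placeholders):
{code}
// Track the last Future returned by send() for each partition, wait on those
// with a timeout, then close(); close() should then return quickly because the
// in-flight batches are already acknowledged or given up on.
import java.util.Properties
import java.util.concurrent.{Future, TimeUnit}
import scala.collection.mutable
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

object BoundedCloseSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

  // last in-flight future per partition, updated on every send
  val lastFuturePerPartition = mutable.Map[Int, Future[RecordMetadata]]()
  for (partition <- 0 until 4) {
    val record = new ProducerRecord[Array[Byte], Array[Byte]]("my-topic", partition, null, "payload".getBytes("UTF-8"))
    lastFuturePerPartition(partition) = producer.send(record)
  }

  // bounded wait: anything not acknowledged within the timeout is abandoned
  lastFuturePerPartition.values.foreach { f =>
    try f.get(5, TimeUnit.SECONDS)
    catch { case _: Exception => () } // timed out or failed; close anyway
  }
  producer.close()
}
{code}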



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1659) Ability to cleanly abort the KafkaProducer

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1659:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 Ability to cleanly abort the KafkaProducer
 --

 Key: KAFKA-1659
 URL: https://issues.apache.org/jira/browse/KAFKA-1659
 Project: Kafka
  Issue Type: Improvement
  Components: clients, producer 
Affects Versions: 0.8.2
Reporter: Andrew Stein
Assignee: Jun Rao
 Fix For: 0.8.3


 I would like the ability to abort the Java Client's KafkaProducer. This 
 includes the stopping the writing of buffered records.
 The motivation for this is described 
 [here|http://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3CCAOk4UxB7BJm6HSgLXrR01sksB2dOC3zdt0NHaKHz1EALR6%3DCTQ%40mail.gmail.com%3E].
 A sketch of this method is:
 {code}
 public void abort() {
 try {
 ioThread.interrupt();
 ioThread.stop(new ThreadDeath());
 } catch (IllegalAccessException e) {
 }
 }
 {code}
 but of course it is preferable to stop the {{ioThread}} by cooperation, 
 rather than use the deprecated {{Thread.stop(new ThreadDeath())}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1499) Broker-side compression configuration

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1499:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 Broker-side compression configuration
 -

 Key: KAFKA-1499
 URL: https://issues.apache.org/jira/browse/KAFKA-1499
 Project: Kafka
  Issue Type: New Feature
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1499.patch, KAFKA-1499.patch, 
 KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, 
 KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, 
 KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, 
 KAFKA-1499_2014-09-25_11:05:57.patch

   Original Estimate: 72h
  Remaining Estimate: 72h

 A given topic can have messages in mixed compression codecs. i.e., it can
 also have a mix of uncompressed/compressed messages.
 It will be useful to support a broker-side configuration to recompress
 messages to a specific compression codec. i.e., all messages (for all
 topics) on the broker will be compressed to this codec. We could have
 per-topic overrides as well.
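 A sketch of the configuration lookup such a feature implies (names and shape are assumptions, not a committed design):
{code}
// A broker-wide target codec with optional per-topic overrides decides what
// messages get recompressed to on the broker. Illustrative only.
object BrokerCompressionSketch {
  sealed trait Codec
  case object NoCompression extends Codec
  case object Gzip extends Codec
  case object Snappy extends Codec
  case object ProducerCodec extends Codec   // keep whatever codec the producer used

  def targetCodec(topic: String,
                  brokerDefault: Codec,
                  perTopicOverride: Map[String, Codec]): Codec =
    perTopicOverride.getOrElse(topic, brokerDefault)
}

object BrokerCompressionDemo extends App {
  import BrokerCompressionSketch._
  val overrides = Map("logs" -> Gzip)
  println(targetCodec("logs", Snappy, overrides))     // Gzip (per-topic override)
  println(targetCodec("metrics", Snappy, overrides))  // Snappy (broker default)
}
{code}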



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-934) kafka hadoop consumer and producer use older 0.19.2 hadoop api's

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-934:

Fix Version/s: (was: 0.8.2)
   0.8.3

 kafka hadoop consumer and producer use older 0.19.2 hadoop api's
 

 Key: KAFKA-934
 URL: https://issues.apache.org/jira/browse/KAFKA-934
 Project: Kafka
  Issue Type: Bug
  Components: contrib
Affects Versions: 0.8.0
 Environment: [amilkowski@localhost impl]$ uname -a
 Linux localhost.localdomain 3.9.4-200.fc18.x86_64 #1 SMP Fri May 24 20:10:49 
 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
Reporter: Andrew Milkowski
Assignee: Sriharsha Chintalapani
  Labels: hadoop, hadoop-2.0, newbie
 Fix For: 0.8.3


 New hadoop api present in 0.20.1 especially package  
 org.apache.hadoop.mapredude.lib is not used 
 code affected is both consumer and producer in kafka in the contrib package
 [amilkowski@localhost contrib]$ pwd
 /opt/local/git/kafka/contrib
 [amilkowski@localhost contrib]$ ls -lt
 total 12
 drwxrwxr-x 8 amilkowski amilkowski 4096 May 30 11:14 hadoop-consumer
 drwxrwxr-x 6 amilkowski amilkowski 4096 May 29 19:31 hadoop-producer
 drwxrwxr-x 6 amilkowski amilkowski 4096 May 29 16:43 target
 [amilkowski@localhost contrib]$ 
 in example
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TextOutputFormat;
 use 0.19.2 hadoop api format, this prevents merging of hadoop feature into 
 more modern hadoop implementation
 instead of drawing from 0.20.1 api set in import org.apache.hadoop.mapreduce



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-313) Add JSON output and looping options to ConsumerOffsetChecker

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-313:

Fix Version/s: (was: 0.8.2)
   0.8.3

 Add JSON output and looping options to ConsumerOffsetChecker
 

 Key: KAFKA-313
 URL: https://issues.apache.org/jira/browse/KAFKA-313
 Project: Kafka
  Issue Type: Improvement
Reporter: Dave DeMaagd
Priority: Minor
  Labels: newbie, patch
 Fix For: 0.8.3

 Attachments: KAFKA-313-2012032200.diff


 Adds:
 * '--loop N' - causes the program to loop forever, sleeping for up to N 
 seconds between loops (loop time minus collection time, unless that's less 
 than 0, at which point it will just run again immediately; see the sketch below)
 * '--asjson' - display a JSON string instead of the more human-readable 
 output format.
 Neither of the above depends on the other (you can loop with the human-readable 
 output, or do a single-shot execution with JSON output). Existing 
 behavior/output is maintained if neither option is used. Diff attached.
 Impacted files:
 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
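 The loop-timing rule above, as a small sketch (illustrative only):
{code}
// Sleep for the requested interval minus the time the collection itself took,
// and don't sleep at all if collection ran longer than the interval.
object LoopSleepSketch {
  def sleepMillis(loopIntervalMs: Long, collectionTookMs: Long): Long =
    math.max(0L, loopIntervalMs - collectionTookMs)
}

object LoopSleepDemo extends App {
  println(LoopSleepSketch.sleepMillis(10000, 2500))   // 7500 ms
  println(LoopSleepSketch.sleepMillis(10000, 12000))  // 0 ms -> run again immediately
}
{code}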



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1230) shell script files under bin don't work with cygwin (bash on windows)

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1230:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 shell script files under bin don't work with cygwin (bash on windows)
 -

 Key: KAFKA-1230
 URL: https://issues.apache.org/jira/browse/KAFKA-1230
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.0
 Environment: The change have been tested under GNU bash, version 
 4.1.11(2)-release (x86_64-unknown-cygwin) running on Windows 7 Enterprise.
Reporter: Alok Lal
 Fix For: 0.8.3

 Attachments: 
 0001-Added-changes-so-that-bin-.sh-files-can-work-with-CY.patch

   Original Estimate: 24h
  Remaining Estimate: 24h

 h3. Introduction
 This bug is being created for a pull request that I had submitted earlier for 
 these.  Per Jun this is so changes confirm to Apache license.
 h3. Background
 The script files to run Kafka under Windows don't work as is. One needs to 
 hand tweak them since their location is not bin but bin/windows. Further, the 
 script files under bin/windows are not a complete replica of those under bin. 
 To be sure, this isn't a complaint. To the contrary most projects now-a-days 
 don't bother to support running on Windows or do so very late. Just that 
 because of these limitation it might be more prudent to make the script files 
 under bin itself run under windows rather than trying to make the files under 
 bin/windows work or to make them complete.
 h3. Change Summary
 Most common unix-like shell on windows is the bash shell which is a part of 
 the cygwin project. Out of the box the scripts don't work mostly due to 
 peculiarities of the directory paths and class path separators. This change 
 set makes a focused change to a single file under bin so that all of the 
 script files under bin would work as is on windows platform when using bash 
 shell of Cygwin distribution.
 h3. Motivation
 Acceptance of this change would enable a vast body of developers that use (or 
 have to use) Windows as their development/testing/production platform to use 
 Kafka's with ease. More importantly by making the running of examples 
 smoothly on Windoes+Cygwin-bash it would make the process of evaluation of 
 Kafka simpler and smoother and potentially make for a favorable evaluation. 
 For, it would show commitment of the Kafka team to espouse deployments on 
 Windows (albeit only under cygwin). Further, as the number of people whom use 
 Kafka on Windows increases, one would attract people who can eventually fix 
 the script files under bin/Windows itself so that need to run under Cygwin 
 would also go away, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1675) bootstrapping tidy-up

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1675:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 bootstrapping tidy-up
 -

 Key: KAFKA-1675
 URL: https://issues.apache.org/jira/browse/KAFKA-1675
 Project: Kafka
  Issue Type: Bug
Reporter: Szczepan Faber
Assignee: Ivan Lyutov
 Fix For: 0.8.3

 Attachments: KAFKA-1675.patch


 I'd like to suggest following changes:
 1. remove the 'gradlew' and 'gradlew.bat' scripts from the source tree. Those 
 scripts don't work, e.g. they fail with exception when invoked. I just got a 
 user report where those scripts were invoked by the user and it led to an 
 exception that was not easy to grasp. Bootstrapping step will generate those 
 files anyway.
 2. move the 'gradleVersion' extra property from the 'build.gradle' into 
 'gradle.properties'. Otherwise it is hard to automate the bootstrapping 
 process - in order to find out the gradle version, I need to evaluate the 
 build script, and for that I need gradle with correct version (kind of a 
 vicious circle). Project properties declared in the gradle.properties file 
 can be accessed exactly the same as the 'ext' properties, for example: 
 'project.gradleVersion'.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1651) Removed some extra whitespace in KafkaServer.scala

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1651:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 Removed some extra whitespace in KafkaServer.scala
 --

 Key: KAFKA-1651
 URL: https://issues.apache.org/jira/browse/KAFKA-1651
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jonathan Creasy
Priority: Trivial
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-1651.patch, KAFKA-1651_2014-09-25_00:49:36.patch, 
 KAFKA-1651_2014-09-25_00:50:11.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1481:
-
Fix Version/s: (was: 0.8.2)
   0.8.3

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.3

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_2014-10-21_09-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW
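
 As a toy illustration of the ambiguity (the metric and attribute names below 
 are made up for the example, not the actual Kafka MBeans):
 {code}
 // Both a client id and a topic may contain dashes, so a dash-separated MBean
 // name cannot be split back into its parts unambiguously.
 object MBeanNameAmbiguity {
   def main(args: Array[String]): Unit = {
     val clientId = "my-app"
     val topic = "user-events"
     val dashed = s"$clientId-$topic-BytesPerSec"   // "my-app-user-events-BytesPerSec"
     // Was the topic "user-events" for client "my-app", or "events" for "my-app-user"?
     println(dashed)
     val piped = s"$clientId|$topic|BytesPerSec"    // a disallowed separator such as '|'
     println(piped.split('|').mkString(" / "))      // the parts are recoverable again
   }
 }
 {code}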



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [Java New Producer] Snappy NPE Issue

2014-10-21 Thread Bhavesh Mistry
Hi Ewen,

It seems Leo has fixed the snappy lib for this issue.  Here are changes:
https://github.com/xerial/snappy-java/commit/7b86642f75c280debf3c1983053ea7f8635b48a5


Here is Jar with fix:
https://oss.sonatype.org/content/repositories/snapshots/org/xerial/snappy/snappy-java/1.1.1.4-SNAPSHOT/


I will try this this afternoon.  If it works, would you be able to upgrade
Kafka trunk to this version?

Thanks,

Bhavesh

On Mon, Oct 20, 2014 at 9:53 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Hi Ewen,

 Thanks for doing the deep analysis on this issue.  I have filed this issue
 with the Snappy project and linked the Kafka issue.  Here are the details
 about the GitHub issue:  https://github.com/xerial/snappy-java/issues/88

 I will follow up with the Snappy folks to figure out how to solve this
 problem.  For us, this is a typical use case of running a J2EE web app
 container with a thread pool and recycled threads.

 Thanks,

 Bhavesh

 On Mon, Oct 20, 2014 at 6:56 PM, Ewen Cheslack-Postava m...@ewencp.org
 wrote:

 Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this
 since it either requires an updated version of the upstream library, a
 workaround by us, or at a bare minimum clear documentation of the issue.

 On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote:
  I took a quick look at this since I noticed the same issue when testing
  your code for the issues you filed. I think the issue is that you're
  using all your producers across a thread pool and the snappy library
  uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
  they may be allocated from the same thread (e.g. one of your MyProducer
  classes calls Producer.send() on multiple producers from the same
  thread) and therefore use the same BufferRecycler. Eventually you hit
  the code in the stacktrace, and if two producer send threads hit it
  concurrently they improperly share the unsynchronized BufferRecycler.
 
  This seems like a pain to fix -- it's really a deficiency of the snappy
  library and as far as I can see there's no external control over
  BufferRecycler in their API. One possibility is to record the thread ID
  when we generate a new stream in Compressor and use that to synchronize
  access to ensure no concurrent BufferRecycler access. That could be made
  specific to snappy so it wouldn't impact other codecs. Not exactly
  ideal, but it would work. Unfortunately I can't think of any way for you
  to protect against this in your own code since the problem arises in the
  producer send thread, which your code should never know about.
 
  Another option would be to setup your producers differently to avoid the
  possibility of unsynchronized access from multiple threads (i.e. don't
  use the same thread pool approach), but whether you can do that will
  depend on your use case.
 
  -Ewen
 
  On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote:
   Hi Kafka Dev,
  
   I am getting following issue with Snappy Library.  I checked code for
   Snappy lib it seems to be fine.  Have you guys seen this issue ?
  
   2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
   org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
   kafka producer I/O thread:
   *java.lang.NullPointerException*
   at
  
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
   at
  
 org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
   at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
   at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
   at
  
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
   at
  
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
   at
  
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
   at
  
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
   at java.lang.Thread.run(Thread.java:744)
  
  
    Here is the code for Snappy (BufferRecycler.java, line 153):
    http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153

    153    if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
  
  
   Thanks,
  
   Bhavesh





Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-21 Thread Jun Rao
KAFKA-1583 will only be in trunk. It would be nice if we fix KAFKA-1634 in
0.8.2 too (may need a separate patch). If we can get a patch ready, we can
include it in 0.8.2 final.

Thanks,

Jun

On Tue, Oct 21, 2014 at 8:00 AM, Joel Koshy jjkosh...@gmail.com wrote:

 KAFKA-1634 is orthogonal to 1583 right? (BTW sorry about the delay in
 reviewing 1583 - I'm on it.)

 KAFKA-1634 shouldn't be too difficult to address so I'm hoping we can
 get that in 0.8.2 if not the beta release.

 Joel

 On Mon, Oct 20, 2014 at 09:14:17PM -0700, Jun Rao wrote:
  I move KAFKA-1634 to 0.8.3 since it's easier to fix after KAFKA-1583 is
  committed. With this, we have resolved all known blocker issues for
 0.8.2.
  Thanks everyone.
 
  Joe,
 
  Do you want to help start rolling an 0.8.2 beta release? We can decide if
  we need to include any other fixes (e.g., KAFKA-1481) in the 0.8.2 final
  release.
 
  Thanks,
 
  Jun
 
  On Thu, Oct 16, 2014 at 3:53 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   +1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2
   so that will only go out with 0.8.3.
  
   Joel
  
   On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote:
Agree.
On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote:
   
 +1 on doing an 0.8.2 beta.

 Guozhang,

 kafka-1583 is relatively large. Given that we are getting close to
 releasing 0.8.2 beta, my feeling is that we probably shouldn't
 include
   it
 in 0.8.2 beta even if we can commit it in a few days.

 Thanks,

 Jun

 On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com
 
   wrote:

  Regarding 1634, I was intended to work on that after 1583 since
 it
   will
  changes the commit offset request handling logic a lot. If people
   think
  1583 is only a few days away before check-in, we can leave in in
  0.8.2-beta; otherwise we can push to 0.8.3.
 
  Guozhang
 
  On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly
 
   wrote:
 
   +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks
 later.
  
   I agree to the tickets you brought up to have in 0.8.2-beta and
   also
   https://issues.apache.org/jira/browse/KAFKA-1493 for lz4
   compression.
  
   /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop
   /
   On Oct 16, 2014 12:55 PM, Neha Narkhede 
 neha.narkh...@gmail.com
   
  wrote:
  
Another JIRA that will be nice to include as part of
 0.8.2-beta
   is
https://issues.apache.org/jira/browse/KAFKA-1481 that fixes
 the
 mbean
naming. Looking for people's thoughts on 2 things here -
   
1. How do folks feel about doing a 0.8.2-beta release right
 now
   and
  0.8.2
final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones
   mentioned
above) in 0.8.2-beta? If so, it will be great to know now so
 it
   will
   allow
us to move forward with the beta release quickly.
   
Thanks,
Neha
   
On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
  neha.narkh...@gmail.com
wrote:
   
 Hi,

 We have accumulated an impressive list of pretty major
   features in
   0.8.2
-
 Delete topic
 Automated leader rebalancing
 Controlled shutdown
 Offset management
 Parallel recovery
 min.isr and
 clean leader election

 In the past, what has worked for major feature releases is
 a
   beta
   release
 prior to a final release. I'm proposing we do the same for
   0.8.2.
 The
only
 blockers for 0.8.2-beta, that I know of are -

 https://issues.apache.org/jira/browse/KAFKA-1493 (Is a
 major
 change
   and
 requires some thinking about the new dependency. Since it
 is
   not
  fully
 ready and there are things to think about, I suggest we
 take it
 out,
think
 it end to end and then include it in 0.8.3.)
 https://issues.apache.org/jira/browse/KAFKA-1634 (This
 has an
 owner:
 Guozhang Wang)
 https://issues.apache.org/jira/browse/KAFKA-1671 (Has a
 patch
   and
 is
 waiting on a review by Joe Stein)

 It seems that 1634 and 1671 can get wrapped up in a week.
 Do
   people
   think
 we should cut 0.8.2-beta by next week?

 Thanks,
 Neha

   
  
 
 
 
  --
  -- Guozhang
 

  
  




Re: Review Request 26373: Patch for KAFKA-1647

2014-10-21 Thread Jun Rao


 On Oct. 21, 2014, 12:18 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/ReplicaManager.scala, lines 481-506
  https://reviews.apache.org/r/26373/diff/3/?file=725140#file725140line481
 
  This doesn't quite fix the original problem though. The original 
  problem is that if the leader is not alive, we won't call 
  partition.makeFollower(), in which the local replica is created. If a local 
  replica is not created, the partition will be ignored when checkpointing HW 
  and we lose the last checkpointed HW.
  
  So, we have to call partition.makeFollower() for every follower, 
  whether its leader is live or not. After this, we can proceed with the rest 
  of the steps for only those partitions with a live leader. We can log a 
  warning for those partitions w/o a live leader.
 
 Jiangjie Qin wrote:
 Hi Jun, thanks for the review. 
 I think the high watermark is actually read in 
 partition.getOrCreateReplica(). As you said, that means in order to preserve 
 the high watermark, the local replica is supposed to be created. The original 
 code did not create local replica when remote leader is not up so it lost the 
 high watermark.
 partition.getOrCreateReplica() is actually called in the following 2 
 places:
 1. partition.makeFollower()
 2. ReplicaManager line 515, when we truncate the log for the partitions 
 in partitionsToMakeFollower.
 Both of them could preserve high watermark. The difference between them 
 is that in (1) all the replica for the partition, including the 
 not-existing-yet remote replica, will be created. However in (2) only local 
 replica will be created. Because I'm not 100% sure if it could cause some 
 other issue if we create a remote replica when it is actually not up yet, so 
 I chose to preserve the high watermark in (2) instead of in (1), which has 
 less change compared with original code.
 
 Neha Narkhede wrote:
 Did you get a chance to repeat the kind of testing that was done to find 
 this bug? I'd be more comfortable accepting this patch if we did that sort of 
 testing since this change is tricky.

Thanks for the explanation. Got it. On the other hand, if the leader is not 
ready, perhaps it's better not to truncate the data in the follower since this 
could cause some unnecessary data loss if the follower is later selected as the 
new leader.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review57490
---


On Oct. 18, 2014, 7:26 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 18, 2014, 7:26 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Joel's comments.
 
 
 the version 2 code seems to be submitted by mistake... This should be the 
 code for review that addressed Joel's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 26666: Patch for KAFKA-1653

2014-10-21 Thread Neha Narkhede


 On Oct. 21, 2014, 4:44 p.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala, line 111
  https://reviews.apache.org/r/26666/diff/2/?file=723448#file723448line111
 
  I ran a quick test on the following reassignment file and it didn't 
  warn me about the duplicates in 1) the replica list and 2) the partition 
  itself
  
  While running this test, I think it may also be worth de-duping the 
  topic list.

Actually, ignore the first part of the comment above. My Kafka jar was stale. It 
did warn me about the duplicate replica list, but not about the partition. It 
will be helpful to get an error if there are duplicate partitions as well as 
duplicate topics in the reassignment file. I realized this only while running 
this test example.


- Neha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26666/#review57602
---


On Oct. 16, 2014, 9:54 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26666/
 ---
 
 (Updated Oct. 16, 2014, 9:54 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1653
 https://issues.apache.org/jira/browse/KAFKA-1653
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Generate error for duplicates in PreferredLeaderElectionCommand instead of 
 just swallowing duplicates.
 
 
 Report which entries are duplicated for ReassignPartitionCommand since they 
 may be difficult to find in large reassignments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 
 c7918483c02040a7cc18d6e9edbd20a3025a3a55 
   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
 691d69a49a240f38883d2025afaec26fd61281b5 
   core/src/main/scala/kafka/admin/TopicCommand.scala 
 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa 
   core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
 d298e7e81acc7427c6cf4796b445966267ca54eb 
 
 Diff: https://reviews.apache.org/r/26666/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-21 Thread Wojciech Kuranowski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178396#comment-14178396
 ] 

Wojciech Kuranowski edited comment on KAFKA-1718 at 10/21/14 4:47 PM:
--

I have compiled Kafka 0.8.2 with different error messages for validation and 
revalidation and it seems that this issue is triggered in revalidation after 
recompression. In my case:
kafka.common.MessageSizeTooLargeException: revalidate - Message size is 
3382345 bytes which exceeds the maximum configured message size of 100.

It's strange that the message after recompression is 3 times bigger than the 
limit. Is the broker miscalculating something?



was (Author: noxis):
I have compiled kafka 0.8.2 with different messages for validation and 
revalidation, and it seems that this issue is triggered in revalidation after 
recompression. In my case:
kafka.common.MessageSizeTooLargeException: revalidate - Message size is 
3382345 bytes which exceeds the maximum configured message size of 100.

It's strange that message after recompression is 3 times bigger than the limit. 
Is broker miscalculating something?


 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 112.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that the broker is 
 miscalculating something somewhere and throwing the exception improperly.
 ---
 This issue can be reliably reproduced using an out-of-the-box binary download 
 of 0.8.1.1 and the following gist: 
 https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
 the `producer-ng` branch of the Sarama library).
 ---
 I am happy to provide any more information you might need, or to do relevant 
 experiments etc.



--
This message was sent by 

[jira] [Commented] (KAFKA-1678) add new options for reassign partition to better manager dead brokers

2014-10-21 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178620#comment-14178620
 ] 

Neha Narkhede commented on KAFKA-1678:
--

There are a few more usability issues with the reassignment tool. For example, 
it allows the user to start reassignment for a topic that doesn't exist and 
also reassign to brokers that don't exist. I think it is important to fix some 
usability issues with this tool before adding more options to it. 

 add new options for reassign partition to better manager dead brokers
 -

 Key: KAFKA-1678
 URL: https://issues.apache.org/jira/browse/KAFKA-1678
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
Assignee: Gwen Shapira
  Labels: operations
 Fix For: 0.8.3


 this is in two forms
 --replace-replica 
 which is from broker.id to broker.id
 and 
 --remove-replica
 which is just a single broker.id



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1678) add new options for reassign partition to better manager dead brokers

2014-10-21 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178630#comment-14178630
 ] 

Joe Stein commented on KAFKA-1678:
--

Agreed, can this ticket be an update for those two items also? I figure the 
work can all be done in one patch?

 add new options for reassign partition to better manager dead brokers
 -

 Key: KAFKA-1678
 URL: https://issues.apache.org/jira/browse/KAFKA-1678
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
Assignee: Gwen Shapira
  Labels: operations
 Fix For: 0.8.3


 this is in two forms
 --replace-replica 
 which is from broker.id to broker.id
 and 
 --remove-replica
 which is just a single broker.id



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1678) add new options for reassign partition to better manager dead brokers

2014-10-21 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178637#comment-14178637
 ] 

Neha Narkhede commented on KAFKA-1678:
--

Yes, I don't mind if it is part of the same patch. Just wanted to point out 
that we need to simplify this tool :)

 add new options for reassign partition to better manager dead brokers
 -

 Key: KAFKA-1678
 URL: https://issues.apache.org/jira/browse/KAFKA-1678
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
Assignee: Gwen Shapira
  Labels: operations
 Fix For: 0.8.3


 this is in two forms
 --replace-replica 
 which is from broker.id to broker.id
 and 
 --remove-replica
 which is just a single broker.id



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1678) add new options for reassign partition to better manager dead brokers

2014-10-21 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178646#comment-14178646
 ] 

Gwen Shapira commented on KAFKA-1678:
-

All yours, [~joestein].

 add new options for reassign partition to better manager dead brokers
 -

 Key: KAFKA-1678
 URL: https://issues.apache.org/jira/browse/KAFKA-1678
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
Assignee: Gwen Shapira
  Labels: operations
 Fix For: 0.8.3


 this is in two forms
 --replace-replica 
 which is from broker.id to broker.id
 and 
 --remove-replica
 which is just a single broker.id



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-21 Thread Gwen Shapira
Note that I added a patch with 0.8.2 documentation to KAFKA-1555 (in
order to document the new min.isr behavior).

We need to get this committed in SVN before the release, and probably
document a few more things (the new producer, for example).

Gwen

On Tue, Oct 21, 2014 at 12:07 PM, Jun Rao jun...@gmail.com wrote:
 KAFKA-1583 will only be in trunk. It would be nice if we fix KAFKA-1634 in
 0.8.2 too (may need a separate patch). If we can get a patch ready, we can
 include it in 0.8.2 final.

 Thanks,

 Jun

 On Tue, Oct 21, 2014 at 8:00 AM, Joel Koshy jjkosh...@gmail.com wrote:

 KAFKA-1634 is orthogonal to 1583 right? (BTW sorry about the delay in
 reviewing 1583 - I'm on it.)

 KAFKA-1634 shouldn't be too difficult to address so I'm hoping we can
 get that in 0.8.2 if not the beta release.

 Joel

 On Mon, Oct 20, 2014 at 09:14:17PM -0700, Jun Rao wrote:
  I move KAFKA-1634 to 0.8.3 since it's easier to fix after KAFKA-1583 is
  committed. With this, we have resolved all known blocker issues for
 0.8.2.
  Thanks everyone.
 
  Joe,
 
  Do you want to help start rolling an 0.8.2 beta release? We can decide if
  we need to include any other fixes (e.g., KAFKA-1481) in the 0.8.2 final
  release.
 
  Thanks,
 
  Jun
 
  On Thu, Oct 16, 2014 at 3:53 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   +1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2
   so that will only go out with 0.8.3.
  
   Joel
  
   On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote:
Agree.
On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote:
   
 +1 on doing an 0.8.2 beta.

 Guozhang,

 kafka-1583 is relatively large. Given that we are getting close to
 releasing 0.8.2 beta, my feeling is that we probably shouldn't
 include
   it
 in 0.8.2 beta even if we can commit it in a few days.

 Thanks,

 Jun

 On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com
 
   wrote:

  Regarding 1634, I was intended to work on that after 1583 since
 it
   will
  changes the commit offset request handling logic a lot. If people
   think
  1583 is only a few days away before check-in, we can leave in in
  0.8.2-beta; otherwise we can push to 0.8.3.
 
  Guozhang
 
  On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly
 
   wrote:
 
   +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks
 later.
  
   I agree to the tickets you brought up to have in 0.8.2-beta and
   also
   https://issues.apache.org/jira/browse/KAFKA-1493 for lz4
   compression.
  
   /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop
   /
   On Oct 16, 2014 12:55 PM, Neha Narkhede 
 neha.narkh...@gmail.com
   
  wrote:
  
Another JIRA that will be nice to include as part of
 0.8.2-beta
   is
https://issues.apache.org/jira/browse/KAFKA-1481 that fixes
 the
 mbean
naming. Looking for people's thoughts on 2 things here -
   
1. How do folks feel about doing a 0.8.2-beta release right
 now
   and
  0.8.2
final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones
   mentioned
above) in 0.8.2-beta? If so, it will be great to know now so
 it
   will
   allow
us to move forward with the beta release quickly.
   
Thanks,
Neha
   
On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
  neha.narkh...@gmail.com
wrote:
   
 Hi,

 We have accumulated an impressive list of pretty major
   features in
   0.8.2
-
 Delete topic
 Automated leader rebalancing
 Controlled shutdown
 Offset management
 Parallel recovery
 min.isr and
 clean leader election

 In the past, what has worked for major feature releases is
 a
   beta
   release
 prior to a final release. I'm proposing we do the same for
   0.8.2.
 The
only
 blockers for 0.8.2-beta, that I know of are -

 https://issues.apache.org/jira/browse/KAFKA-1493 (Is a
 major
 change
   and
 requires some thinking about the new dependency. Since it
 is
   not
  fully
 ready and there are things to think about, I suggest we
 take it
 out,
think
 it end to end and then include it in 0.8.3.)
 https://issues.apache.org/jira/browse/KAFKA-1634 (This
 has an
 owner:
 Guozhang Wang)
 https://issues.apache.org/jira/browse/KAFKA-1671 (Has a
 patch
   and
 is
 waiting on a review by Joe Stein)

 It seems that 1634 and 1671 can get wrapped up in a week.
 Do
   people
   think
 we 

[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178739#comment-14178739
 ] 

Jun Rao commented on KAFKA-1481:


Otis,

Yes, we can include this in the 0.8.2 final release. Does that sound good? 
Thanks,

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.3

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_2014-10-21_09-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1683) Implement a session concept in the socket server

2014-10-21 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira reassigned KAFKA-1683:
---

Assignee: Gwen Shapira

 Implement a session concept in the socket server
 --

 Key: KAFKA-1683
 URL: https://issues.apache.org/jira/browse/KAFKA-1683
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Gwen Shapira

 To implement authentication we need a way to keep track of some things 
 between requests. The initial use for this would be remembering the 
  authenticated user/principal info, but likely more uses would come up (for 
 example we will also need to remember whether and which encryption or 
 integrity measures are in place on the socket so we can wrap and unwrap 
 writes and reads).
 I was thinking we could just add a Session object that might have a user 
 field. The session object would need to get added to RequestChannel.Request 
 so it is passed down to the API layer with each request.
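
 For illustration, a minimal sketch of what that could look like (the field 
 names here are assumptions, not a committed design):
 {code}
 // Per-connection state carried from the socket server down to the API layer.
 case class Session(user: String, host: String)

 // RequestChannel.Request would gain a session field next to its existing fields
 // (the existing fields are elided here).
 case class Request(session: Session, requestId: Short, buffer: java.nio.ByteBuffer)
 {code}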



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1687) SASL tests

2014-10-21 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178791#comment-14178791
 ] 

Gwen Shapira commented on KAFKA-1687:
-

MiniKDC can be used to test with Kerberos
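
For reference, spinning up an embedded KDC in a test might look roughly like the
sketch below. This assumes the org.apache.hadoop:hadoop-minikdc test artifact and
its MiniKdc API; it is not existing Kafka test code.
{code}
import java.io.File
import java.nio.file.Files
import org.apache.hadoop.minikdc.MiniKdc

object MiniKdcSketch {
  def main(args: Array[String]): Unit = {
    val workDir = Files.createTempDirectory("minikdc").toFile
    val kdc = new MiniKdc(MiniKdc.createConf(), workDir)   // embedded KDC, no external dependency
    kdc.start()
    try {
      val keytab = new File(workDir, "kafka.keytab")
      // A broker-style principal plus a client principal for the test to log in with.
      kdc.createPrincipal(keytab, "kafka/localhost", "client")
      println(s"KDC realm: ${kdc.getRealm}, keytab: ${keytab.getAbsolutePath}")
    } finally {
      kdc.stop()
    }
  }
}
{code}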

 SASL tests
 --

 Key: KAFKA-1687
 URL: https://issues.apache.org/jira/browse/KAFKA-1687
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps

 We need tests for our SASL/Kerberos setup. This is not that easy to do with 
 Kerberos because of the dependency on the KDC. However possibly we can test 
 with another SASL mechanism that doesn't have that dependency?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1722) static analysis code coverage for pci audit needs

2014-10-21 Thread Joe Stein (JIRA)
Joe Stein created KAFKA-1722:


 Summary: static analysis code coverage for pci audit needs
 Key: KAFKA-1722
 URL: https://issues.apache.org/jira/browse/KAFKA-1722
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
 Fix For: 0.9.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1722) static analysis code coverage for pci audit needs

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1722:
-
Component/s: security

 static analysis code coverage for pci audit needs
 -

 Key: KAFKA-1722
 URL: https://issues.apache.org/jira/browse/KAFKA-1722
 Project: Kafka
  Issue Type: Bug
  Components: security
Reporter: Joe Stein
 Fix For: 0.9.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1688) Add authorization interface and naive implementation

2014-10-21 Thread Don Bosco Durai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178799#comment-14178799
 ] 

Don Bosco Durai edited comment on KAFKA-1688 at 10/21/14 6:33 PM:
--

Joe/Jay, I can look into this. Can you assign this to me? Thanks


was (Author: bosco):
Joe/Jay, I can look into this. Thanks

 Add authorization interface and naive implementation
 

 Key: KAFKA-1688
 URL: https://issues.apache.org/jira/browse/KAFKA-1688
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps

 Add a PermissionManager interface as described here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security
 (possibly there is a better name?)
 Implement calls to the PermissionsManager in KafkaApis for the main requests 
 (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
 exception to the protocol to indicate permission denied.
 Add a server configuration to give the class you want to instantiate that 
 implements that interface. That class can define its own configuration 
 properties from the main config file.
 Provide a simple implementation of this interface which just takes a user and 
 ip whitelist and permits those in either of the whitelists to do anything, 
 and denies all others.
 Rather than writing an integration test for this class we can probably just 
 use this class for the TLS and SASL authentication testing.
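
 For illustration, a naive user/IP whitelist along those lines could look 
 roughly like this (the interface name comes from the description above; the 
 method names and config keys are made up):
 {code}
 trait PermissionManager {
   def configure(props: java.util.Properties): Unit
   def isPermitted(user: String, clientIp: String, operation: String, resource: String): Boolean
 }

 // Permit if either the user or the client IP is whitelisted, deny everything else.
 class WhitelistPermissionManager extends PermissionManager {
   private var users: Set[String] = Set.empty
   private var ips: Set[String] = Set.empty

   override def configure(props: java.util.Properties): Unit = {
     def parse(key: String) =
       Option(props.getProperty(key)).map(_.split(",").map(_.trim).toSet).getOrElse(Set.empty[String])
     users = parse("permission.whitelist.users")
     ips = parse("permission.whitelist.ips")
   }

   override def isPermitted(user: String, clientIp: String, operation: String, resource: String): Boolean =
     users.contains(user) || ips.contains(clientIp)
 }
 {code}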



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-21 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1555:
--
Reviewer: Joel Koshy  (was: Jun Rao)

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555-DOCS.0.patch, KAFKA-1555.0.patch, 
 KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, 
 KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, 
 KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that current kafka versoin (0.8.1.1) cannot achieve the requirements 
 due to its three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with less 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has less messages than the leader.
 3. ISR can contains only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.
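
 For reference, the combination this work points to (min.isr together with 
 acks=-1 and clean leader election) can be sketched as follows; the config names 
 are the stock ones, but the values and their placement are illustrative only:
 {code}
 import java.util.Properties

 object MinIsrSettingsSketch {
   def main(args: Array[String]): Unit = {
     val brokerOrTopicConfig = new Properties()
     brokerOrTopicConfig.put("min.insync.replicas", "2")                // need 2 in-sync replicas to accept a write
     brokerOrTopicConfig.put("unclean.leader.election.enable", "false") // never elect an out-of-sync leader

     val producerConfig = new Properties()
     producerConfig.put("request.required.acks", "-1")                  // wait for the full ISR to acknowledge

     println(s"broker/topic: $brokerOrTopicConfig")
     println(s"producer: $producerConfig")
   }
 }
 {code}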



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-21 Thread Vladimir Tretyakov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178815#comment-14178815
 ] 

Vladimir Tretyakov commented on KAFKA-1481:
---

Hi Jun, that would be perfect!

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.3

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_2014-10-21_09-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1684) Implement TLS/SSL authentication

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein reassigned KAFKA-1684:


Assignee: Ivan Lyutov

 Implement TLS/SSL authentication
 

 Key: KAFKA-1684
 URL: https://issues.apache.org/jira/browse/KAFKA-1684
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Ivan Lyutov

 Add an SSL port to the configuration and advertise this as part of the 
 metadata request.
 If the SSL port is configured the socket server will need to add a second 
 Acceptor thread to listen on it. Connections accepted on this port will need 
 to go through the SSL handshake prior to being registered with a Processor 
 for request processing.
 SSL requests and responses may need to be wrapped or unwrapped using the 
 SSLEngine that was initialized by the acceptor. This wrapping and unwrapping 
 is very similar to what will need to be done for SASL-based authentication 
 schemes. We should have a uniform interface that covers both of these and we 
 will need to store the instance in the session with the request. The socket 
 server will have to use this object when reading and writing requests. We 
 will need to take care with the FetchRequests as the current 
 FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we 
 can only use this optimization for unencrypted sockets that don't require 
 userspace translation (wrapping).
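
 A self-contained sketch of just the wrap/unwrap API shape (the real acceptor 
 would build the SSLContext from configured key/trust stores and drive the full 
 handshake state machine):
 {code}
 import java.nio.ByteBuffer
 import javax.net.ssl.{SSLContext, SSLEngine}

 object SslWrapSketch {
   def main(args: Array[String]): Unit = {
     val engine: SSLEngine = SSLContext.getDefault.createSSLEngine()
     engine.setUseClientMode(false) // the broker side acts as the TLS server

     val appData = ByteBuffer.wrap("response bytes".getBytes("UTF-8"))
     val netData = ByteBuffer.allocate(engine.getSession.getPacketBufferSize)

     // wrap() turns outgoing application bytes into network bytes before the socket
     // write; unwrap() is the mirror image on the read path. Before the handshake
     // completes, wrap() mostly reports what the handshake needs next.
     val result = engine.wrap(appData, netData)
     println(s"status=${result.getStatus} handshake=${result.getHandshakeStatus}")
   }
 }
 {code}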



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26666: Patch for KAFKA-1653

2014-10-21 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26666/
---

(Updated Oct. 21, 2014, 6:58 p.m.)


Review request for kafka.


Bugs: KAFKA-1653
https://issues.apache.org/jira/browse/KAFKA-1653


Repository: kafka


Description (updated)
---

Generate error for duplicates in PreferredLeaderElectionCommand instead of just 
swallowing duplicates.


Report which entries are duplicated for ReassignPartitionCommand since they may 
be difficult to find in large reassignments.


Report duplicate topics and duplicate topic partitions in 
ReassignPartitionsCommand. Make all duplication error messages include details 
about what item was duplicated.
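
For illustration, the style of duplicate check involved looks roughly like this 
(a sketch only, not the patch itself):

    object DuplicateCheck {
      // Return every entry that occurs more than once, so the offending items are
      // easy to spot even in a very large reassignment.
      def duplicates[T](items: Seq[T]): Iterable[T] =
        items.groupBy(identity).collect { case (item, occurrences) if occurrences.size > 1 => item }

      def main(args: Array[String]): Unit = {
        val replicas = Seq(1, 2, 2)   // a replica list with a duplicate broker id
        val dups = duplicates(replicas)
        if (dups.nonEmpty)
          throw new IllegalArgumentException("duplicate entries found: " + dups.mkString(","))
      }
    }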


Diffs (updated)
-

  core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 
c7918483c02040a7cc18d6e9edbd20a3025a3a55 
  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
691d69a49a240f38883d2025afaec26fd61281b5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 
7672c5aab4fba8c23b1bb5cd4785c332d300a3fa 
  core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
d298e7e81acc7427c6cf4796b445966267ca54eb 
  core/src/main/scala/kafka/utils/Utils.scala 
29d5a17d4a03cfd3f3cdd2994cbd783a4be2732e 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
a7b1fdcb50d5cf930352d37e39cb4fc9a080cb12 

Diff: https://reviews.apache.org/r/26666/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-10-21 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178845#comment-14178845
 ] 

Ewen Cheslack-Postava commented on KAFKA-1653:
--

Updated reviewboard https://reviews.apache.org/r/26666/diff/
 against branch origin/trunk

 Duplicate broker ids allowed in replica assignment
 --

 Key: KAFKA-1653
 URL: https://issues.apache.org/jira/browse/KAFKA-1653
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Ewen Cheslack-Postava
  Labels: newbie
 Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch, 
 KAFKA-1653_2014-10-21_11:57:50.patch


 The reassign partitions command and the controller do not ensure that all 
 replicas for a partition are on different brokers. For example, you could set 
 1,2,2 as the list of brokers for the replicas.
 kafka-topics.sh --describe --under-replicated will list these partitions as 
 under-replicated, but I can't see a reason why the controller should allow 
 this state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-10-21 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1653:
-
Attachment: KAFKA-1653_2014-10-21_11:57:50.patch

 Duplicate broker ids allowed in replica assignment
 --

 Key: KAFKA-1653
 URL: https://issues.apache.org/jira/browse/KAFKA-1653
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Ewen Cheslack-Postava
  Labels: newbie
 Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch, 
 KAFKA-1653_2014-10-21_11:57:50.patch


 The reassign partitions command and the controller do not ensure that all 
 replicas for a partition are on different brokers. For example, you could set 
 1,2,2 as the list of brokers for the replicas.
 kafka-topics.sh --describe --under-replicated will list these partitions as 
 under-replicated, but I can't see a reason why the controller should allow 
 this state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-21 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178856#comment-14178856
 ] 

Bhavesh Mistry commented on KAFKA-1721:
---

I have filed https://github.com/xerial/snappy-java/issues/88 to track this with 
Snappy.

There is a patch provided, and thanks to [~ewencp] for testing it.  Please see 
the above link for more details.


Thanks,

Bhavesh 

 Snappy compressor is not thread safe
 

 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava

 From the mailing list, it can generate this exception:
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 This appears to be an issue with the snappy-java library using ThreadLocal 
 for an internal buffer recycling object which results in that object being 
 shared unsafely across threads if one thread sends to multiple producers:
 {quote}
 I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 {quote}
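
 A sketch of the synchronization idea mentioned above (the names are illustrative 
 only; this is not the actual Compressor code):
 {code}
 import java.io.OutputStream
 import java.util.concurrent.ConcurrentHashMap

 // Remember a lock per creating thread; any call that can touch that thread's
 // ThreadLocal BufferRecycler (e.g. closing the snappy stream) goes through it.
 object RecyclerLocks {
   private val locks = new ConcurrentHashMap[java.lang.Long, Object]()

   def forCurrentThread(): Object = {
     val id = java.lang.Long.valueOf(Thread.currentThread().getId)
     val existing = locks.get(id)
     if (existing != null) existing
     else {
       val fresh = new Object
       val prev = locks.putIfAbsent(id, fresh)
       if (prev != null) prev else fresh
     }
   }
 }

 // The lock is captured when the compressed stream is created and reused for
 // every later write/close on that stream.
 class GuardedStream(out: OutputStream, lock: Object = RecyclerLocks.forCurrentThread()) {
   def write(bytes: Array[Byte]): Unit = lock.synchronized(out.write(bytes))
   def close(): Unit = lock.synchronized(out.close())
 }
 {code}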



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka Security

2014-10-21 Thread Joe Stein
Hi, quick circle back around so folks are aware.

The security tickets are tracked here
https://issues.apache.org/jira/browse/KAFKA/component/12324383/?selectedTab=com.atlassian.jira.jira-projects-plugin:component-issues-panel

If you want to jump in on a ticket please do!  If no one is assigned then
assign it to yourself, ask questions, start coding and give it a go! If
you can't assign it to yourself, send an email on the list with the ticket
number and your user name.  Someone can then assign it to you and also add
perms in JIRA so that moving forward you can do that too. If someone is
already assigned they may be working on it already.  If it has been 3 weeks
(let's say) with no patches/comments, etc., then a ping in any case is, I
think, a good idea regardless.

A patch should be going up tomorrow for
https://issues.apache.org/jira/browse/KAFKA-1684 that includes the
MetaDataResponse changes required too for channel (plaintext and ssl) port
advertisement.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Mon, Sep 22, 2014 at 8:38 PM, Joe Stein joe.st...@stealth.ly wrote:

 At the request of the folks that were at the first meeting and can't
 attend tomorrow I am moving tomorrow's meeting to next Tuesday (same bat
 time, same bat channel).

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Tue, Sep 16, 2014 at 4:59 PM, Joe Stein joe.st...@stealth.ly wrote:

 yup, yup, yup | done, done, done

 On Tue, Sep 16, 2014 at 1:54 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Joe,

 Can you add me, Jun, and Neha too.

 -Jay

 On Tue, Sep 16, 2014 at 10:37 AM, Joe Stein joe.st...@stealth.ly
 wrote:
  Hi Andrew, yes the meeting took place and we plan to-do it every two
 weeks
  (same bat time, same bat channel) moving forward.
 
  In attendance was Michael Herstine (LinkedIn), Arvind Mani (LinkedIn),
 Gwen
  Shapira (Cloudera) and myself.
 
   Gwen updated the wiki after our discussion.  Basically we are thinking
   of using 3 ports: one for plain text (as it is now), one for SASL
   (implementing username/password and Kerberos at least), and one for SSL,
   all configurable on/off.  Some investigation is going on now to see how
   we can do this without making any wire protocol changes (ideal), or with
   minimal changes at least.
 
   Let me know if you would like to contribute and I can add you to the
   invite; the more help and input the better, for sure.
 
   Also, in regards to KAFKA-1477, I just asked Ivan to update the patch to
   the latest trunk, and we could (if there is demand) make a patch that
   works with 0.8.1.X too for folks to use... This doesn't work yet with the
   new producer (TBD) or any other clients, so be aware it is not yet baked
   in, and from a release perspective I don't know what in that patch will
   survive (hopefully all of it).
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
  On Tue, Sep 16, 2014 at 10:17 AM, Andrew Psaltis 
 psaltis.and...@gmail.com
  wrote:
 
  Hi,
  I was just reading the recent changes to:
  https://cwiki.apache.org/confluence/display/KAFKA/Security after
 getting
  off a call about Kafka security and how we are jumping through hoops
 --
  like having PGP keys on the consumers and producers to get around the
 lack
  of SSL support. Did the meeting that Joe proposed for Sept 9th happen? If
  not, is there a plan to have it? I was also looking at:
  https://issues.apache.org/jira/browse/KAFKA-1477 and it seems like
 there
  have been no comments since 11/08/2014. I would be interested in
 helping
  with the TLS/SSL support as we have a need for it now.
 
  Thanks,
  Andrew
 






[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-21 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179014#comment-14179014
 ] 

Bhavesh Mistry commented on KAFKA-1710:
---

[~jkreps],

I am sorry I did not get back to you sooner.  The cost of enqueuing a message 
into a single partition only is ~54%. 

Here is test I have done:

To *single* partition:
Throughput per Thread=2666.5  byte(s)/microsecond
All done...!

To *all* partition:
Throughput per Thread=5818.181818181818  byte(s)/microsecond
All done...!


The cost of the sync block is roughly as shown by the following test:

{code}
package org.kafka.test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

static int numberTh = 75;
static CountDownLatch latch = new CountDownLatch(numberTh);
public static void main(String[] args) throws IOException, 
InterruptedException {

//Thread.sleep(6);

Properties prop = new Properties();
InputStream propFile = 
Thread.currentThread().getContextClassLoader()
.getResourceAsStream("kafkaproducer.properties");

String topic = "logmon.test";
prop.load(propFile);
System.out.println("Property: " + prop.toString());
StringBuilder builder = new StringBuilder(1024);
int msgLenth = 256;
int numberOfLoop = 5000;
for (int i = 0; i < msgLenth; i++)
builder.append("a");

int numberOfProducer = 1;
Producer[] producer = new Producer[numberOfProducer];

for (int i = 0; i < producer.length; i++) {
producer[i] = new KafkaProducer(prop);
}
ExecutorService service =   new ThreadPoolExecutor(numberTh, 
numberTh,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(numberTh * 2));
MyProducer [] producerThResult = new MyProducer [numberTh];
for (int i = 0; i < numberTh; i++) {
producerThResult[i] = new 
MyProducer(producer,numberOfLoop,builder.toString(), topic);
service.execute(producerThResult[i]);
}   
latch.await();
for (int i = 0; i < producer.length; i++) {
producer[i].close();
}   
service.shutdownNow();
System.out.println("All Producers done...!");
// now interpret the result... of this...
long lowestTime = 0 ;
for (int i = 0; i < producerThResult.length; i++) {
if(i == 1){
lowestTime = 
producerThResult[i].totalTimeinNano;
} else if (producerThResult[i].totalTimeinNano < 
lowestTime){
lowestTime = 
producerThResult[i].totalTimeinNano;
}
}
long bytesSend = msgLenth * numberOfLoop;
long durationInMs = TimeUnit.MILLISECONDS.convert(lowestTime, 
TimeUnit.NANOSECONDS);

double throughput = (bytesSend * 1.0) / (durationInMs);
System.out.println(Throughput per Thread= + throughput +   
byte(s)/microsecond);

System.out.println(All done...!);

}



static class MyProducer implements CallableLong , Runnable {

Producer[] producer;
long maxloops;
String msg ;
String topic;
long totalTimeinNano = 0;

MyProducer(Producer[] list, long maxloops,String msg,String 
topic){
this.producer = list;
this.maxloops = maxloops;
this.msg = msg;
this.topic = topic;
}
public void run() {
// ALWAYS SEND DATA TO PARTITION 1 only...  
//ProducerRecord record = new ProducerRecord(topic, 
1,null,msg.toString().getBytes());
ProducerRecord record = new 

[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-21 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179014#comment-14179014
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/21/14 8:26 PM:
-

[~jkreps],

I am sorry I did not get back to you sooner. The cost of enqueuing a message into 
a single partition is ~54% as compared to round-robin (tested with 32 partitions 
on a single topic and a 3-broker cluster). The throughput measures the cost of 
putting data into the buffer.

Here is test I have done:

To *single* partition:
Throughput per Thread=2666.5  byte(s)/microsecond
All done...!

To *all* partition:
Throughput per Thread=5818.181818181818  byte(s)/microsecond
All done...!

(Same TestNetworkDownProducer test program as in the original comment above.)

[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-21 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179014#comment-14179014
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/21/14 8:28 PM:
-

[~jkreps],

I am sorry I did not get back to you sooner. The cost of enqueuing a message into 
a single partition is ~54% as compared to round-robin (tested with 32 partitions 
on a single topic and a 3-broker cluster). The throughput measures only the cost 
of putting data into the buffer, not the cost of sending data to the brokers.

Here is test I have done:

To *single* partition:
Throughput per Thread=2666.5  byte(s)/microsecond
All done...!

To *all* partition:
Throughput per Thread=5818.181818181818  byte(s)/microsecond
All done...!

(Same TestNetworkDownProducer test program as in the original comment above.)

[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-21 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179014#comment-14179014
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/21/14 8:29 PM:
-

[~jkreps],

I am sorry I did not get back to you sooner. The cost of enqueuing a message into 
a single partition is ~54% as compared to round-robin (tested with 32 partitions 
on a single topic and a 3-broker cluster). The throughput measures only the cost 
of putting data into the buffer, not the cost of sending data to the brokers.

Here is test I have done:

To *single* partition:
Throughput per Thread=2666.5  byte(s)/microsecond
All done...!

To *all* partition:
Throughput per Thread=5818.181818181818  byte(s)/microsecond
All done...!

The test program is the same TestNetworkDownProducer program shown in the original comment above.

Review Request 26994: Patch for KAFKA-1719

2014-10-21 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/
---

Review request for kafka.


Bugs: KAFKA-1719
https://issues.apache.org/jira/browse/KAFKA-1719


Repository: kafka


Description
---

make mirror maker exit when one consumer/producer thread exits.


Diffs
-

  core/src/main/scala/kafka/tools/MirrorMaker.scala 
b8698ee1469c8fbc92ccc176d916eb3e28b87867 

Diff: https://reviews.apache.org/r/26994/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.

2014-10-21 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1719:

Attachment: KAFKA-1719.patch

 Make mirror maker exit when one consumer/producer thread exits.
 ---

 Key: KAFKA-1719
 URL: https://issues.apache.org/jira/browse/KAFKA-1719
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1719.patch


 When one of the consumer/producer thread exits, the entire mirror maker will 
 be blocked. In this case, it is better to make it exit. It seems a single 
 ZookeeperConsumerConnector is sufficient for mirror maker, probably we don't 
 need a list for the connectors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.

2014-10-21 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179047#comment-14179047
 ] 

Jiangjie Qin commented on KAFKA-1719:
-

Created reviewboard https://reviews.apache.org/r/26994/diff/
 against branch origin/trunk

 Make mirror maker exit when one consumer/producer thread exits.
 ---

 Key: KAFKA-1719
 URL: https://issues.apache.org/jira/browse/KAFKA-1719
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1719.patch


 When one of the consumer/producer thread exits, the entire mirror maker will 
 be blocked. In this case, it is better to make it exit. It seems a single 
 ZookeeperConsumerConnector is sufficient for mirror maker, probably we don't 
 need a list for the connectors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-10-21 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179077#comment-14179077
 ] 

Andrew Olson commented on KAFKA-1647:
-

[~nehanarkhede] Will this correction be included in 0.8.2-beta? It sounds like 
a blocker for the 0.8.2 release, in any case.

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Joel Koshy
Assignee: Jiangjie Qin
Priority: Critical
  Labels: newbie++
 Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, 
 KAFKA-1647_2014-10-18_00:26:51.patch


 We ran into this scenario recently in a production environment. This can 
 happen when enough brokers in a cluster are taken down. i.e., a rolling 
 bounce done properly should not cause this issue. It can occur if all 
 replicas for any partition are taken down.
 Here is a sample scenario:
 * Cluster of three brokers: b0, b1, b2
 * Two partitions (of some topic) with replication factor two: p0, p1
 * Initial state:
 p0: leader = b0, ISR = {b0, b1}
 p1: leader = b1, ISR = {b0, b1}
 * Do a parallel hard-kill of all brokers
 * Bring up b2, so it is the new controller
 * b2 initializes its controller context and populates its leader/ISR cache 
 (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
 known leaders are b0 (for p0) and b1 (for p1)
 * Bring up b1
 * The controller's onBrokerStartup procedure initiates a replica state change 
 for all replicas on b1 to become online. As part of this replica state change 
 it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
 (for p0 and p1). This LeaderAndIsr request contains: {{p0: leader=b0; p1: 
 leader=b1;} leaders=b1}. b0 is indicated as the leader of p0 but it is not 
 included in the leaders field because b0 is down.
 * On receiving the LeaderAndIsrRequest, b1's replica manager will 
 successfully make itself (b1) the leader for p1 (and create the local replica 
 object corresponding to p1). It will however abort the become follower 
 transition for p0 because the designated leader b0 is offline. So it will not 
 create the local replica object for p0.
 * It will then start the high water mark checkpoint thread. Since only p1 has 
 a local replica object, only p1's high water mark will be checkpointed to 
 disk. p0's previously written checkpoint  if any will be lost.
 So in summary it seems we should always create the local replica object even 
 if the online transition does not happen.
 Possible symptoms of the above bug could be one or more of the following (we 
 saw 2 and 3):
 # Data loss; yes on a hard-kill data loss is expected, but this can actually 
 cause loss of nearly all data if the broker becomes follower, truncates, and 
 soon after happens to become leader.
 # High IO on brokers that lose their high water mark then subsequently (on a 
 successful become follower transition) truncate their log to zero and start 
 catching up from the beginning.
 # If the offsets topic is affected, then offsets can get reset. This is 
 because during an offset load we don't read past the high water mark. So if a 
 water mark is missing then we don't load anything (even if the offsets are 
 there in the log).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1477) add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication

2014-10-21 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179081#comment-14179081
 ] 

Jay Kreps commented on KAFKA-1477:
--

I took a look at this patch in a little more detail; I think there is likely a 
fair bit of work to do before we can check this in.

For example, some things that concern me: the SSLSocketChannel class extends 
SocketChannel. We seem to be simulating blocking on a non-blocking socket using 
sleep calls in a loop. Then there are lots of minor things, like channelFor doing 
handshaking, and some odd unfinished-looking code.

I suspect some of this may be done this way to minimize impact to existing code 
since it was being maintained as a patch, but that won't make sense once it is 
committed.

What about this as a path forward? Let's take this patch and extract just the 
server-side SSL support in SocketServer and try to get that into shape to be 
something we can commit. I think we can do this without simultaneously doing 
the clients. I think if we try to do this all at once we aren't going to get 
there. We can test this by adding to SocketServerTest and just using a blocking 
SSL connection. Here is what I think we need to do:
1. Do we need SSLSocketChannel? I think as long as the acceptor completes the 
handshake from then on all that is needed is to wrap/unwrap bytes, right?
2. Modify the acceptor in SocketServer to do non-blocking handling of the SSL 
handshake. By the time the socket is accepted and handed over to the processor 
the ssl handshake should be complete.
3. Create some kind of generic interface for wrap/upwrap (SecurityCodec?) as we 
will need to implement this for both ssl and for kerberos. This interface will 
wrap the SSLEngine (or SASL engine) associated with a given connection.
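
For illustration only, here is a minimal sketch of what the generic wrap/unwrap interface mentioned in point 3 could look like. The name SecurityCodec is taken from the comment above; the method names and the use of ByteBuffer are assumptions, not the actual proposal:

{code}
import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * Hypothetical per-connection codec, sketching point 3 above.
 * An SSL implementation would delegate to SSLEngine.wrap()/unwrap();
 * a Kerberos/SASL implementation would delegate to SaslServer.wrap()/unwrap().
 */
public interface SecurityCodec {

    /** Encode application bytes into network bytes (e.g. via SSLEngine.wrap). */
    ByteBuffer wrap(ByteBuffer plaintext) throws IOException;

    /** Decode network bytes back into application bytes (e.g. via SSLEngine.unwrap). */
    ByteBuffer unwrap(ByteBuffer ciphertext) throws IOException;

    /** True once the handshake (done non-blockingly in the acceptor, per point 2) has completed. */
    boolean handshakeComplete();
}
{code}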








 add authentication layer and initial JKS x509 implementation for brokers, 
 producers and consumer for network communication
 --

 Key: KAFKA-1477
 URL: https://issues.apache.org/jira/browse/KAFKA-1477
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Joe Stein
Assignee: Ivan Lyutov
 Fix For: 0.8.3

 Attachments: KAFKA-1477-binary.patch, KAFKA-1477.patch, 
 KAFKA-1477_2014-06-02_16:59:40.patch, KAFKA-1477_2014-06-02_17:24:26.patch, 
 KAFKA-1477_2014-06-03_13:46:17.patch, KAFKA-1477_trunk.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-21 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179092#comment-14179092
 ] 

Bhavesh Mistry commented on KAFKA-1481:
---

Hi [~junrao],

Can you please let me know if this will also address the [New Java Producer] 
metrics() method when the client.id or topic has special chars? That way we would 
have consistent naming across all JMX bean names and metrics() methods.

Here is background on this:

{code}
Bhavesh,

Yes, allowing dot in clientId and topic makes it a bit harder to define the
JMX bean names. I see a couple of solutions here.

1. Disable dot in clientId and topic names. The issue is that dot may
already be used in existing deployment.

2. We can represent the JMX bean name differently in the new producer.
Instead of
  kafka.producer.myclientid:type=mytopic
we could change it to
  kafka.producer:clientId=myclientid,topic=mytopic

I felt that option 2 is probably better since it doesn't affect existing
users.

Otis,

We probably can also use option 2 to address KAFKA-1481. For topic/clientid
specific metrics, we could explicitly specify the metric name so that it
contains topic=mytopic,clientid=myclientid. That seems to be a much
cleaner way than having all parts included in a single string separated by
'|'.

Thanks,

Jun
{code}

Thanks,

Bhavesh 

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.3

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_2014-10-21_09-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[VOTE] 0.8.2-beta Release Candidate 1

2014-10-21 Thread Joe Stein
This is the first candidate for release of Apache Kafka 0.8.2-beta

Release Notes for the 0.8.2-beta release
https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html

*** Please download, test and vote by Friday, October 24th, 2pm PT

Kafka's KEYS file containing PGP keys we use to sign the release:
https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1
and sha2 (SHA256) checksum.

* Release artifacts to be voted upon (source and binary):
https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/

* Maven artifacts to be voted upon prior to release:
https://repository.apache.org/content/groups/staging/

* scala-doc
https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/

* java-doc
https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/

* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/


[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179102#comment-14179102
 ] 

Jun Rao commented on KAFKA-1481:


Thanks for the patch. Some comments below.

20. KafkaMetricsGroup:
20.1 In the following, instead of doing map(kv => ), could we do map { 
case (key, value) => }?
.filter(_._2 != "").map(kv => "%s=%s".format(kv._1, kv._2))
20.2 In the following, shouldn't the pattern be (".*" + "clientId=" + clientId 
+ ".*").r?
  val pattern = (".*" + clientId + ".*").r

21. TaggableInfo: tags should be an immutable hashmap, right?
case class TaggableInfo(tags: mutable.LinkedHashMap[String, String]) extends 
Taggable {

22. New files:
22.1 The license header should be at the very beginning, before the package and 
import.
22.2 I am not sure if we really need to have the Taggable trait. For example, 
in ConsumerTopicMetrics, we can just take the original clientIdAndTopic and 
explicitly create the tag for clientId and topic when creating those metrics. 
The tags are really specific to the metrics. So, we probably don't need to 
expose them in places other than where the metrics are created.

23. ReplicaFetcherManager: The first statement in shutdown() is on the same 
line as the method. A similar issue happens in a few other classes like 
RequestChannel and ConsumerFetcherManager. Perhaps you can follow the patch 
creation process in 
https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review#Patchsubmissionandreview-Simplecontributorworkflow

24. 
FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats():
 Should we use "clientId=" + clientId in the pattern?

25. SimpleConsumer: Ideally, we shouldn't change the constructor since this 
will be an api change.

26. Could you add a unit test to make sure that after a producer/consumer is 
closed, all metrics specific to the producer/consumer are removed?

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.3

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_2014-10-21_09-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-10-21 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1647:
-
Fix Version/s: 0.8.2

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Joel Koshy
Assignee: Jiangjie Qin
Priority: Critical
  Labels: newbie++
 Fix For: 0.8.2

 Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, 
 KAFKA-1647_2014-10-18_00:26:51.patch


 We ran into this scenario recently in a production environment. This can 
 happen when enough brokers in a cluster are taken down. i.e., a rolling 
 bounce done properly should not cause this issue. It can occur if all 
 replicas for any partition are taken down.
 Here is a sample scenario:
 * Cluster of three brokers: b0, b1, b2
 * Two partitions (of some topic) with replication factor two: p0, p1
 * Initial state:
 p0: leader = b0, ISR = {b0, b1}
 p1: leader = b1, ISR = {b0, b1}
 * Do a parallel hard-kill of all brokers
 * Bring up b2, so it is the new controller
 * b2 initializes its controller context and populates its leader/ISR cache 
 (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
 known leaders are b0 (for p0) and b1 (for p1)
 * Bring up b1
 * The controller's onBrokerStartup procedure initiates a replica state change 
 for all replicas on b1 to become online. As part of this replica state change 
 it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
 (for p0 and p1). This LeaderAndIsr request contains: {{p0: leader=b0; p1: 
 leader=b1;} leaders=b1}. b0 is indicated as the leader of p0 but it is not 
 included in the leaders field because b0 is down.
 * On receiving the LeaderAndIsrRequest, b1's replica manager will 
 successfully make itself (b1) the leader for p1 (and create the local replica 
 object corresponding to p1). It will however abort the become follower 
 transition for p0 because the designated leader b0 is offline. So it will not 
 create the local replica object for p0.
 * It will then start the high water mark checkpoint thread. Since only p1 has 
 a local replica object, only p1's high water mark will be checkpointed to 
 disk. p0's previously written checkpoint  if any will be lost.
 So in summary it seems we should always create the local replica object even 
 if the online transition does not happen.
 Possible symptoms of the above bug could be one or more of the following (we 
 saw 2 and 3):
 # Data loss; yes on a hard-kill data loss is expected, but this can actually 
 cause loss of nearly all data if the broker becomes follower, truncates, and 
 soon after happens to become leader.
 # High IO on brokers that lose their high water mark then subsequently (on a 
 successful become follower transition) truncate their log to zero and start 
 catching up from the beginning.
 # If the offsets topic is affected, then offsets can get reset. This is 
 because during an offset load we don't read past the high water mark. So if a 
 water mark is missing then we don't load anything (even if the offsets are 
 there in the log).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1723) make the metrics name in new producer more standard

2014-10-21 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-1723:
--

 Summary: make the metrics name in new producer more standard
 Key: KAFKA-1723
 URL: https://issues.apache.org/jira/browse/KAFKA-1723
 Project: Kafka
  Issue Type: Sub-task
  Components: clients
Affects Versions: 0.8.2
Reporter: Jun Rao


The jmx name in the new producer looks like the following:
kafka.producer.myclientid:type=mytopic

However, this can be ambiguous since we allow "." in the client id and topic.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1723) make the metrics name in new producer more standard

2014-10-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179128#comment-14179128
 ] 

Jun Rao commented on KAFKA-1723:



One way to address this issue is to change the jmx name to the following.
  kafka.producer:clientId=myclientid,topic=mytopic

This will make sure the jmx name is consistent with the fix in KAFKA-1481.
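
As a rough, illustrative example (the ids and topic below are made up) of why the key=value form stays parseable even when the client id and topic contain dots:

{code}
import javax.management.ObjectName;

public class JmxNameExample {
    public static void main(String[] args) throws Exception {
        // Old style, kafka.producer.my.client.id:type=my.topic, makes it unclear
        // where the client id ends and the topic begins once dots are allowed.

        // Proposed style: explicit key=value properties keep the parts separate.
        ObjectName name = new ObjectName("kafka.producer:clientId=my.client.id,topic=my.topic");
        System.out.println(name.getDomain());                 // kafka.producer
        System.out.println(name.getKeyProperty("clientId"));  // my.client.id
        System.out.println(name.getKeyProperty("topic"));     // my.topic
    }
}
{code}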

 make the metrics name in new producer more standard
 ---

 Key: KAFKA-1723
 URL: https://issues.apache.org/jira/browse/KAFKA-1723
 Project: Kafka
  Issue Type: Sub-task
  Components: clients
Affects Versions: 0.8.2
Reporter: Jun Rao
 Fix For: 0.8.3


 The jmx name in the new producer looks like the following:
 kafka.producer.myclientid:type=mytopic
 However, this can be ambiguous since we allow "." in the client id and topic.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179132#comment-14179132
 ] 

Jun Rao commented on KAFKA-1481:


Bhavesh,

Created a subtask to track the metric name in the new producer. Thanks,

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.3

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_2014-10-21_09-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26994: Patch for KAFKA-1719

2014-10-21 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review57680
---



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment98497

Just a stylish comment: could you group java imports with scala / other lib 
imports, and leave kafka imports at the top?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment98503

Can we add a FATAL log entry here: "Consumer thread exited abnormally, 
stopping the whole mirror maker"?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment98501

Ditto above.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment98499

Is this change intended?


- Guozhang Wang


On Oct. 21, 2014, 8:37 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 21, 2014, 8:37 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 make mirror maker exit when one consumer/producer thread exits.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: [DISCUSSION] Message Metadata

2014-10-21 Thread Guozhang Wang
Hi Jun,

Regarding 4) in your comment, after thinking about it for a while I cannot come
up with a way to do it along with log compaction without adding new fields to the
current message set format. Do you have a better way that does not require
protocol changes?

Guozhang

On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang wangg...@gmail.com wrote:

 I have updated the wiki page incorporating received comments. We can
 discuss some more details on:

 1. How we want to do audit? Whether we want to have in-built auditing on
 brokers or even MMs or use  an audit consumer to fetch all messages from
 just brokers.

 2. How we can avoid de-/re-compression on brokers and MMs with log
 compaction turned on.

 3. How we can resolve unclean leader election resulted data inconsistency
 with control messages.

 Guozhang

 On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com
 wrote:

 Thanks for the detailed comments Jun! Some replies inlined.

 On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote:

 Hi, Guozhang,

 Thanks for the writeup.

 A few high level comments.

 1. Associating (versioned) schemas to a topic can be a good thing
 overall.
 Yes, this could add a bit more management overhead in Kafka. However, it
 makes sure that the data format contract between a producer and a
 consumer
 is kept and managed in a central place, instead of in the application.
 The
 latter is probably easier to start with, but is likely to be brittle in
 the
 long run.


 I am actually not proposing to not support associated versioned schemas
 for topics, but to not let some core Kafka functionalities like auditing
 depend on schemas. I think this alone can separate the schema
 management from Kafka piping management (i.e. making sure every single
 message is delivered, and within some latency, etc). Adding additional
 auditing info into an existing schema will force Kafka to be aware of the
 schema systems (Avro, JSON, etc).



 2. Auditing can be a general feature that's useful for many applications.
 Such a feature can be implemented by extending the low level message
 format
 with a header. However, it can also be added as part of the schema
 management. For example, you can imagine a type of audited schema that
 adds
 additional auditing info to an existing schema automatically. Performance
 wise, it probably doesn't make a big difference whether the auditing info
 is added in the message header or the schema header.


 See replies above.


 3. We talked about avoiding the overhead of decompressing in both the
 broker and the mirror maker. We probably need to think through how this
 works with auditing. In the more general case where you want to audit
 every
 message, one has to do the decompression to get the individual message,
 independent of how the auditing info is stored. This means that if we
 want
 to audit the broker directly or the consumer in mirror maker, we have to
 pay the decompression cost anyway. Similarly, if we want to extend mirror
 maker to support some customized filtering/transformation logic, we also
 have to pay the decompression cost.


 I see your point. For that I would prefer to have a MM implementation
 that is able to do de-compress / re-compress ONLY if required, for example
 by auditing, etc. I agree that we have not thought through whether we
 should enable auditing on MM, and if yes how to do that, and we could
 discuss about that in a different thread. Overall, this proposal is not
 just for tackling de-compression on MM but about the feasibility of
 extending Kafka message header for system properties / app properties.


 Some low level comments.

 4. Broker offset reassignment (kafka-527):  This probably can be done
 with
 just a format change on the compressed message set.

 That is true. As I mentioned in the wiki each of the problems may be
 resolvable separately but I am thinking about a general way to get all of
 them.


 5. MirrorMaker refactoring: We probably can think through how general we
 want mirror maker to be. If we want to it to be more general, we likely
 need to decompress every message just like in a normal consumer. There
 will
 definitely be overhead. However, as long as mirror maker is made
 scalable,
 we can overcome the overhead by just running more instances on more
 hardware resources. As for the proposed message format change, we
 probably
 need to think through it a bit more. The honor-ship flag seems a bit
 hacky
 to me.


 Replied as part of 3). Sure we can discuss more about that, will update
 the wiki for collected comments.


 6. Adding a timestamp in each message can be a useful thing. It (1)
 allows
 log segments to be rolled more accurately; (2) allows finding an offset
 for
 a particular timestamp more accurately. I am thinking that the timestamp
 in
 the message should probably be the time when the leader receives the
 message. Followers preserve the timestamp set by leader. To avoid time
 going back during leader change, the leader can 

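As a rough illustration only of the leader-assigned timestamp idea in point 6 above (the class and method names are assumptions, not part of the proposal):

{code}
/**
 * Sketch: the leader stamps each message with its own clock but never lets the
 * assigned timestamp go backwards (e.g. across a leader change); followers simply
 * preserve whatever the leader assigned.
 */
public class LeaderTimestampSketch {
    private long lastAssignedMs = 0L;

    /** Called on the leader when appending a message. */
    public synchronized long assignTimestamp() {
        // Clamp so time never appears to move backwards in the log.
        lastAssignedMs = Math.max(lastAssignedMs, System.currentTimeMillis());
        return lastAssignedMs;
    }

    /** Called on a follower: it keeps the timestamp set by the leader. */
    public long preserveTimestamp(long leaderAssignedMs) {
        return leaderAssignedMs;
    }
}
{code}
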
[jira] [Reopened] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-21 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy reopened KAFKA-1555:
---

I'm reopening this so we won't forget about the documentation patch.

If you prefer, we can open a separate jira for that and close this.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555-DOCS.0.patch, KAFKA-1555.0.patch, 
 KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, 
 KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, 
 KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to its three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with less 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has less messages than the leader.
 3. ISR can contains only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.
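 For reference only, a minimal sketch of how the setting analyzed above is supplied to the 0.8 producer; the property keys are the standard 0.8 producer config names, while the broker list and surrounding class are made up:
{code}
import java.util.Properties;

public class AcksConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        // request.required.acks is the setting analyzed above; as argued in the
        // description, none of its possible values satisfies both requirements
        // on its own.
        props.put("request.required.acks", "-1");
        // props would then be passed to kafka.producer.ProducerConfig / Producer.
        System.out.println(props);
    }
}
{code}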



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 24676: Rebase KAFKA-1583

2014-10-21 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57235
---


Very nicely done.

These are all minor comments - all but one concerning emptying the producer 
request that should be easily fixable if it is an issue. (It is the top comment)


core/src/main/scala/kafka/api/ProducerRequest.scala
https://reviews.apache.org/r/24676/#comment98557

I have a concern that this may actually be still needed. See comment under 
handleProducerRequest.sendResponseCallback



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/24676/#comment97784

Maybe use this:
Recorded replica %d log end offset (LEO)...

Also, instead of an explicit [%s,%d] format specifier I think we should 
start doing the following:

"%s".format(TopicAndPartition(topic, partition))

That way we ensure a canonical toString for topic/partition pairs and can 
change it in one place in the future.

There are some places where we don't log with this agreed-upon format and 
it is a bit annoying, so going forward I think we should use the above. Can we 
add it to the logging improvements wiki?



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/24676/#comment97788

Since we still may update the HW shall we rename this to 
maybeUpdateHWAndExpandIsr



core/src/main/scala/kafka/log/Log.scala
https://reviews.apache.org/r/24676/#comment97797

Since this contains hw (which is a replication detail) should it really be 
in the replica manager?



core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97809

How about just calling this responseCallback? It is slightly confusing to 
see references to callbackOnComplete and onComplete in the same class.



core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97801

The earlier comment was useful. i.e., (in which case we return whatever 
data is available for the partitions that are currently led by this broker)



core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97805

I'm a bit confused by case C. It can also happen if the delayed fetch 
happens to straddle a segment roll event; the comment seems a bit 
misleading/incomplete without that.

In fact, if it is lagging shouldn't it have been satisfied immediately 
without having to create a DelayedFetch in the first place?



core/src/main/scala/kafka/server/DelayedProduce.scala
https://reviews.apache.org/r/24676/#comment98139

Similar comment as in DelayedFetch on naming this.



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98165

Why is this additional logging necessary?

KafkaApis currently has catch-all for unhandled exceptions.

Error codes can be inspected via public access logs if required right?



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98166

Same here.



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98558

I'm not sure how scala treats this under the hood, but it _has_ to hold a 
reference to request until the callback is executed. i.e., we probably still 
want to empty the request data.



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98180

to fetch messages



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98182

Are these changes intentional?



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/24676/#comment98184

commitStatusView

Also, can we just compute the final status once at the end as opposed to 
preparing an initial response status and modifying later?



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/24676/#comment98194

Do you think it would be clearer to name this onAppend or something similar?



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/24676/#comment97799

Should we rename ReplicaManager to ReplicatedLogManager?



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/24676/#comment98372

(for regular consumer fetch)



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/24676/#comment98380

This is old code and we don't need to address it in this patch, but I was 
wondering if it makes sense to respond sooner if there is at least one error in 
the local append. What do you think? i.e., I don't remember a good reason for 
holding on to the request if there are i < numPartitions errors in the local append.



core/src/main/scala/kafka/utils/DelayedItem.scala

[jira] [Commented] (KAFKA-1583) Kafka API Refactoring

2014-10-21 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179460#comment-14179460
 ] 

Joel Koshy commented on KAFKA-1583:
---

I posted review comments, mostly minor.

Did you get a chance to run the system tests with this patch?

 Kafka API Refactoring
 -

 Key: KAFKA-1583
 URL: https://issues.apache.org/jira/browse/KAFKA-1583
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
 KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
 KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
 KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
 KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, 
 KAFKA-1583_2014-10-17_09:56:33.patch


 This is the next step of KAFKA-1430. Details can be found at this page:
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179472#comment-14179472
 ] 

Jun Rao commented on KAFKA-1718:


Yes, the broker does recompress all messages in the messageSet passed to 
Log.append together into a single compressed message. In the java/scala 
producer, it's always the case that a messageSet for a partition in a produce 
request always contains a single compressed message. I guess your go producer 
can send multiple compressed messages for a single partition. Is there any 
benefit in doing that?

 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 112.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that the broker is 
 miscalculating something somewhere and throwing the exception improperly.
 ---
 This issue can be reliably reproduced using an out-of-the-box binary download 
 of 0.8.1.1 and the following gist: 
 https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
 the `producer-ng` branch of the Sarama library).
 ---
 I am happy to provide any more information you might need, or to do relevant 
 experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-21 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179485#comment-14179485
 ] 

Evan Huus commented on KAFKA-1718:
--

??I guess your go producer can send multiple compressed messages for a single 
partition??

Yes, that's exactly what it's doing. If it collects enough messages for a 
partition that they would exceed {{message.max.bytes}} when compressed 
together, it batches them and sends each batch as a compressed message in the 
same messageSet.

??Is there any benefit in doing that???

More-or-less to get around the limit on message sizes, which I guess doesn't 
work so well :)

A few points on this then:
* Currently (with default broker settings) you can produce just under 100MiB 
(socket.request.max.bytes) of messages to the broker uncompressed in a single 
request, but you can't produce that same batch of messages in compressed form 
since the resulting compressed message would almost certainly be larger than 
1MB (message.max.bytes). This discrepancy seems odd to me.
* I understand the desire to limit real message sizes to prevent misbehaving 
producers from causing problems. However, I don't think the limit is 
particularly useful when applied to the compressed meta-messages; why 
shouldn't they be arbitrarily large, within the limits of 
{{socket.request.max.bytes}}?
* I don't think the broker should assume there's only one compressed message 
per message-set; if a message-set contains multiple compressed messages, it 
should process them one-at-a-time and store each individually, rather than 
trying to do them all at once (a rough sketch of this per-message handling 
follows this list).
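
A minimal Scala sketch of that per-message handling: the {{WireMessage}} type, 
the method name and the hard-coded defaults (1000012 bytes for 
message.max.bytes, ~100MiB for socket.request.max.bytes) are assumptions for 
illustration, not the broker's actual code path.

{code}
// Stand-in for a shallow (possibly compressed) message as received on the wire.
case class WireMessage(payload: Array[Byte])

val maxMessageBytes = 1000012            // assumed default message.max.bytes
val maxRequestBytes = 100 * 1024 * 1024  // assumed default socket.request.max.bytes

// Check the request size once, then check each shallow message individually
// instead of re-grouping them and checking a combined size.
def validateMessageSet(messages: Seq[WireMessage]): Unit = {
  val totalBytes = messages.map(_.payload.length.toLong).sum
  require(totalBytes <= maxRequestBytes,
    s"request of $totalBytes bytes exceeds socket.request.max.bytes")
  for (m <- messages)
    require(m.payload.length <= maxMessageBytes,
      s"message of ${m.payload.length} bytes exceeds message.max.bytes")
}
{code}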

Thanks for all your help!

 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 1000012.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that 

[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2014-10-21 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179485#comment-14179485
 ] 

Evan Huus edited comment on KAFKA-1718 at 10/22/14 2:26 AM:


??I guess your go producer can send multiple compressed messages for a single 
partition??

Yes, that's exactly what it's doing. If it collects enough messages for a 
partition that they would exceed {{message.max.bytes}} when compressed 
together, it batches them and sends each batch as a compressed message in the 
same messageSet.

??Is there any benefit in doing that???

More-or-less to get around the limit on message sizes, which I guess doesn't 
work so well :)

A few points on this then:
* Currently (with default broker settings) you can produce just under 100MiB 
(socket.request.max.bytes) of messages to the broker uncompressed in a single 
request, but you can't produce that same batch of messages in compressed form 
since the resulting compressed message would almost certainly be larger than 
1MB (message.max.bytes). This discrepancy seems odd to me.
* I understand the desire to limit real message sizes to prevent misbehaving 
producers from causing problems. However, I don't think the limit is 
particularly useful when applied to the compressed meta-messages; why 
shouldn't they be arbitrarily large, within the limits of 
{{socket.request.max.bytes}}?
* I don't think the broker should assume there's only one compressed message 
per message-set; if a message-set contains multiple compressed messages, it 
should process them one-at-a-time and store each individually, rather than 
trying to do them all at once.

Thanks for all your help!

Edit: If for some reason you decide to keep the current behaviour as-is, please 
document this in the protocol spec on the wiki, since as far as I can tell the spec 
gives no reason to believe that multiple compressed messages will be combined, 
and that the _combined_ length will be relevant.
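
As a back-of-the-envelope illustration (assuming, as the combined-length 
observation above suggests, that the broker re-groups a partition's compressed 
messages into a single compressed message before applying the size check; the 
numbers are the ones from the original report):

{code}
// Sizes taken from the report above; suitable for pasting into a Scala REPL.
val perWrapperCompressed   = 46899                          // each compressed message as produced
val wrappersInSet          = 977205 / perWrapperCompressed  // ~20 wrappers in the first partition's set
val perWrapperUncompressed = 999600                         // payload inside each wrapper once decompressed

// Each wrapper is far below the ~1MB message.max.bytes on its own, but the
// re-grouped data is roughly 20 MB uncompressed; re-compressed as a single
// message it can easily exceed the limit, which would explain the
// 1070127-byte figure in the broker's exception.
val totalUncompressed = wrappersInSet * perWrapperUncompressed  // = 19992000 bytes
{code}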


was (Author: eapache):
??I guess your go producer can send multiple compressed messages for a single 
partition??

Yes, that's exactly what it's doing. If it collects enough messages for a 
partition that they would exceed {{message.max.bytes}} when compressed 
together, it batches them and sends each batch as a compressed message in the 
same messageSet.

??Is there any benefit in doing that???

More-or-less to get around the limit on message sizes, which I guess doesn't 
work so well :)

A few points on this then:
* Currently (with default broker settings) you can produce just under 100MiB 
(socket.request.max.bytes) of messages to the broker uncompressed in a single 
request, but you can't produce that same batch of messages in compressed form 
since the resulting compressed message would almost certainly be larger than 
1MB (message.max.bytes). This discrepancy seems odd to me.
* I understand the desire to limit real message sizes to prevent misbehaving 
producers from causing problems. However, I don't think the limit is 
particularly useful when applied to the compressed meta-messages; why 
shouldn't they be arbitrarily large, within the limits of 
{{socket.request.max.bytes}}?
* I don't think the broker should assume there's only one compressed message 
per message-set; if a message-set contains multiple compressed messages, it 
should process them one-at-a-time and store each individually, rather than 
trying to do them all at once.

Thanks for all your help!

 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which