Re: Review Request 26755: Patch for KAFKA-1706

2014-10-27 Thread Jiangjie Qin


 On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
 

Joel, thanks a lot for the review! Some comments on your comments.


 On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 18
  https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line18
 
  I'm wondering if this is specific and nuanced enough to make it 
  entirely private to MirrorMaker.scala
  
  OR
  
  if you think it is useful as a generic utility consider putting in 
  org.apache.kafka.clients.common.utils

My initial thought is that this could help provide better control over memory 
management in broader cases beyond mirror maker, such as the consumer-side 
data chunk queue, and maybe also the controller message queue.


 On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 36
  https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line36
 
  This can be paraphrased to be simpler:
  
  An element can be enqueued provided the current size (in number of 
  elements) is within the configured capacity and the current size in bytes 
  of the queue is within the configured byte capacity. i.e., the element may 
  be enqueued even if adding it causes the queue's size in bytes to exceed 
  the byte capacity.
  
  Ok, so while I was thinking through the above: is there any benefit to 
  having a count-based capacity when you have a byte-based capacity? i.e., 
  why not have byte-capacity only?

I think there might be three cases where bounding the queue by the number of 
messages could help:
1. Because the size function is actually provided by the user, if the messages 
have small payloads but large overhead elsewhere, the byte-based size might 
not work well.
2. The count limit can be used to control the number of messages buffered in 
the middle, i.e. the failure boundary. For instance, if one of the mirror 
maker instances bounces and we have buffered too many messages in mirror 
maker, we need to re-consume all of them.
3. There may be cases where the byte limit is only meant to protect against 
running out of memory, but users don't expect the queue to consume that much 
memory all the time. (I'm not sure if that is a valid use case though...)
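To make the discussion concrete, here is a minimal, hypothetical sketch (not the actual KAFKA-1706 patch; the names ByteBoundedQueue and sizeFunction are illustrative) of a queue bounded by both element count and bytes, following the semantics Joel paraphrased above: an offer is admitted while the current byte total is below the byte capacity, so the byte limit can be exceeded by at most one element.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToLongFunction;

// Hypothetical sketch: bounded by element count (via LinkedBlockingQueue)
// and, softly, by total bytes. An offer succeeds while the current byte
// total is below byteCapacity, so the limit can be overshot by one element.
public class ByteBoundedQueue<E> {
    private final LinkedBlockingQueue<E> queue;
    private final AtomicLong currentBytes = new AtomicLong(0);
    private final long byteCapacity;
    private final ToLongFunction<E> sizeFunction;

    public ByteBoundedQueue(int capacity, long byteCapacity,
                            ToLongFunction<E> sizeFunction) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.byteCapacity = byteCapacity;
        this.sizeFunction = sizeFunction;
    }

    public synchronized boolean offer(E e) {
        // Reject only if the byte budget is already used up; the count
        // bound is enforced by the underlying queue.
        if (currentBytes.get() >= byteCapacity) return false;
        if (!queue.offer(e)) return false;
        currentBytes.addAndGet(sizeFunction.applyAsLong(e));
        return true;
    }

    public E poll() {
        E e = queue.poll();
        if (e != null) currentBytes.addAndGet(-sizeFunction.applyAsLong(e));
        return e;
    }

    public long byteSize() { return currentBytes.get(); }
    public int size() { return queue.size(); }
}
```

With a byte capacity of 5 and a size function of String::length, offering "abc" and then "abcd" both succeed (the total reaches 7), and only then do further offers fail until elements are polled.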


 On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 82
  https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line82
 
  One significant caveat to this approach (and in the timed variant 
  above) is that if a single large element needs to be enqueued it could 
  potentially block a number of smaller elements from being enqueued. This 
  may be okay in the case of mirror maker, but it would make it less 
  useful as a generic utility.

I'm not sure why a big put would block small ones... It is possible that a 
very large item is put into the queue and pushes it well past the byte limit. 
In that case, all puts will block until a bunch of small messages are taken 
out of the queue, but that seems to be exactly the purpose of having a byte 
limit on the queue.


 On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 100
  https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line100
 
  Can you clarify what this means?

I was trying to say that the poll method does not contend for a lock with 
offer. I saw a similar description in some queue's javadoc; it is probably 
better to remove it...


 On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 109
  https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line109
 
  getAndDecrement(sizeFunction.get(e))

It seems getAndDecrement() does not take an argument and always decrements by 
1.
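For reference, the java.util.concurrent.atomic counters only step by one via getAndDecrement(); subtracting an arbitrary element size is done by passing a negative delta to getAndAdd or addAndGet. A small sketch (the counter name is illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

public class DecrementBySize {
    public static void main(String[] args) {
        AtomicLong currentByteSize = new AtomicLong(100);
        // getAndDecrement() always subtracts exactly 1 and takes no argument;
        // to subtract an element's size, pass a negative delta to getAndAdd.
        long before = currentByteSize.getAndAdd(-42); // returns the old value
        System.out.println(before + " -> " + currentByteSize.get()); // 100 -> 58
    }
}
```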


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/#review58497
---


On Oct. 15, 2014, 4:28 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26755/
 ---
 
 (Updated Oct. 15, 2014, 4:28 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1706
 https://issues.apache.org/jira/browse/KAFKA-1706
 
 
 Repository: kafka
 
 
 Description
 ---
 
 changed arguments name
 
 
 correct typo.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/26755/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.

2014-10-27 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1706:

Attachment: KAFKA-1706_2014-10-26_23:47:31.patch

 Adding a byte bounded blocking queue to util.
 -

 Key: KAFKA-1706
 URL: https://issues.apache.org/jira/browse/KAFKA-1706
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, 
 KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch


 We saw many out of memory issues in Mirror Maker. To enhance memory 
 management we want to introduce a ByteBoundedBlockingQueue that has limits on 
 both the number of messages and the number of bytes in it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26755: Patch for KAFKA-1706

2014-10-27 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/
---

(Updated Oct. 27, 2014, 6:47 a.m.)


Review request for kafka.


Bugs: KAFKA-1706
https://issues.apache.org/jira/browse/KAFKA-1706


Repository: kafka


Description
---

changed arguments name


correct typo.


Diffs (updated)
-

  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/26755/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-1706) Adding a byte bounded blocking queue to util.

2014-10-27 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14184891#comment-14184891
 ] 

Jiangjie Qin commented on KAFKA-1706:
-

Updated reviewboard https://reviews.apache.org/r/26755/diff/
 against branch origin/trunk

 Adding a byte bounded blocking queue to util.
 -

 Key: KAFKA-1706
 URL: https://issues.apache.org/jira/browse/KAFKA-1706
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, 
 KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch


 We saw many out of memory issues in Mirror Maker. To enhance memory 
 management we want to introduce a ByteBoundedBlockingQueue that has limits on 
 both the number of messages and the number of bytes in it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1706) Adding a byte bounded blocking queue to util.

2014-10-27 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14184893#comment-14184893
 ] 

Jiangjie Qin commented on KAFKA-1706:
-

Updated reviewboard https://reviews.apache.org/r/26755/diff/
 against branch origin/trunk

 Adding a byte bounded blocking queue to util.
 -

 Key: KAFKA-1706
 URL: https://issues.apache.org/jira/browse/KAFKA-1706
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, 
 KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch, 
 KAFKA-1706_2014-10-26_23:50:07.patch


 We saw many out of memory issues in Mirror Maker. To enhance memory 
 management we want to introduce a ByteBoundedBlockingQueue that has limits on 
 both the number of messages and the number of bytes in it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.

2014-10-27 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1706:

Attachment: KAFKA-1706_2014-10-26_23:50:07.patch

 Adding a byte bounded blocking queue to util.
 -

 Key: KAFKA-1706
 URL: https://issues.apache.org/jira/browse/KAFKA-1706
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, 
 KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch, 
 KAFKA-1706_2014-10-26_23:50:07.patch


 We saw many out of memory issues in Mirror Maker. To enhance memory 
 management we want to introduce a ByteBoundedBlockingQueue that has limits on 
 both the number of messages and the number of bytes in it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-260) Add audit trail to kafka

2014-10-27 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185220#comment-14185220
 ] 

Stas Levin commented on KAFKA-260:
--

Hi guys,

We've adopted the data model above in Aletheia 
(https://github.com/outbrain/Aletheia), an open source data delivery framework 
we've been working on here at Outbrain. 
In Aletheia we call these audit trails Breadcrumbs, and have them generated 
by the producer and consumer sides. We're working towards integrating the above 
mentioned patch in order to provide a client side dashboard.

Aletheia is by no means meant to replace Kafka; rather, it is an abstraction 
layer on top of Kafka and other messaging systems, as we point out in the wiki.
Having audit capabilities built into Kafka would be really great; in the 
meantime, you're most welcome to check out Aletheia. Perhaps you'll find it 
useful, as it provides Breadcrumb generation out of the box.

-Stas

 Add audit trail to kafka
 

 Key: KAFKA-260
 URL: https://issues.apache.org/jira/browse/KAFKA-260
 Project: Kafka
  Issue Type: New Feature
Affects Versions: 0.8.0
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: Picture 18.png, kafka-audit-trail-draft.patch


 LinkedIn has a system that does monitoring on top of our data flow to ensure 
 all data is delivered to all consumers of data. This works by having each 
 logical tier through which data passes produce messages to a central 
 audit-trail topic; these messages give a time period and the number of 
 messages that passed through that tier in that time period. Example of tiers 
 for data might be producer, broker, hadoop-etl, etc. This makes it 
 possible to compare the total events for a given time period to ensure that 
 all events that are produced are consumed by all consumers.
 This turns out to be extremely useful. We also have an application that 
 balances the books and checks that all data is consumed in a timely 
 fashion. This gives graphs for each topic and shows any data loss and the lag 
 at which the data is consumed (if any).
 This would be an optional feature that would allow you to do this kind of 
 reconciliation automatically for all the topics kafka hosts against all the 
 tiers of applications that interact with the data.
 Some details, the proposed format of the data is JSON using the following 
 format for messages:
 {
   "time": 1301727060032,  // the timestamp at which this audit message is sent
   "topic": "my_topic_name", // the topic this audit data is for
   "tier": "producer", // a user-defined tier name
   "bucket_start": 130172640, // the beginning of the time bucket this data applies to
   "bucket_end": 130172700, // the end of the time bucket this data applies to
   "host": "my_host_name.datacenter.linkedin.com", // the server that this was sent from
   "datacenter": "hlx32", // the datacenter this occurred in
   "application": "newsfeed_service", // a user-defined application name
   "guid": "51656274-a86a-4dff-b824-8e8e20a6348f", // a unique identifier for this message
   "count": 43634
 }
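As a hedged illustration of the bucket fields above (not code from the proposal), flooring a message timestamp to a bucket boundary could look like this, assuming the 10-minute bucket size mentioned in the discussion:

```java
public class AuditBucket {
    static final long BUCKET_MS = 10 * 60 * 1000L; // assumed 10-minute buckets

    // Floor the event timestamp (extracted from the message, per the
    // discussion) to the start of its time bucket.
    static long bucketStart(long timestampMs) {
        return (timestampMs / BUCKET_MS) * BUCKET_MS;
    }

    static long bucketEnd(long timestampMs) {
        return bucketStart(timestampMs) + BUCKET_MS;
    }

    public static void main(String[] args) {
        long t = 1301727060032L; // the "time" value from the example message
        System.out.println(bucketStart(t) + " .. " + bucketEnd(t));
    }
}
```

All tiers counting events into buckets computed this way (with a shared bucket size) is what makes the per-bucket counts directly comparable.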
 DISCUSSION
 Time is complex:
 1. The audit data must be based on a timestamp in the events not the time on 
 machine processing the event. Using this timestamp means that all downstream 
 consumers will report audit data on the right time bucket. This means that 
 there must be a timestamp in the event, which we don't currently require. 
 Arguably we should just add a timestamp to the events, but I think it is 
 sufficient for now just to allow the user to provide a function to extract 
 the time from their events.
 2. For counts to reconcile exactly we can only do analysis at a granularity 
 based on the least common multiple of the bucket size used by all tiers. The 
 simplest is just to configure them all to use the same bucket size. We 
 currently use a bucket size of 10 mins, but anything from 1-60 mins is 
 probably reasonable.
 For analysis purposes one tier is designated as the source tier and we do 
 reconciliation against this count (e.g. if another tier has less, that is 
 treated as lost, if another tier has more that is duplication).
 Note that this system makes false positives possible since you can lose an 
 audit message. It also makes false negatives possible since if you lose both 
 normal messages and the associated audit messages it will appear that 
 everything adds up. The latter problem is astronomically unlikely to happen 
 exactly, though.
 This would integrate into the client (producer and consumer both) in the 
 following way:
 1. The user provides a way to get timestamps from messages (required)
 2. The user configures the tier name, host name, datacenter name, and 
 application name as part of the consumer and producer config. We can provide 
 reasonable defaults if not supplied (e.g. if it is a Producer then 

Re: Review Request 26885: Patch for KAFKA-1642

2014-10-27 Thread Ewen Cheslack-Postava


 On Oct. 27, 2014, 12:13 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 122
  https://reviews.apache.org/r/26885/diff/3/?file=731636#file731636line122
 
  The comments "When connecting or connected, this handles slow/stalled 
  connections" here are a bit misleading: after checking the code I realize 
  connectionDelay is only triggered to determine the delay in millis after 
  which we can re-check connectivity for a node that is not connected, and 
  hence if the node is connected again while we are determining its delay, 
  we just set it to MAX.
  
  Instead of making it general to the KafkaClient interface, shall we 
  just add this to the code block of line 155?

It gets triggered any time NetworkClient.ready returns false for a node. The 
obvious case is that it will return not ready when disconnected, but it also 
does so when connecting, or when connected but inFlightRequests.canSendMore() 
returns false (thus the mention of slow/stalled connections). The important 
thing is that the value returned *is* MAX_VALUE in those latter cases, because 
neither one will be resolved by polling -- they both require an external event 
(connection established/failed or an outstanding request receiving a response) 
which should wake up the event loop when there's something to do. That keeps us 
from polling unnecessarily. Previously there were conditions in which 
connections in these states could trigger busy waiting in the poll loop.

I don't think we can get the same effect just inlining the code because it uses 
state that's only available through ClusterConnectionStates, which is private 
to NetworkClient. The KafkaClient only exposes the higher level concept of 
ready.
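A hedged sketch of the semantics described above (illustrative only; this is not the actual NetworkClient code, and the type and field names are made up):

```java
// Illustrative states: a node is either disconnected, mid-handshake, or
// connected (possibly blocked on in-flight requests).
enum ConnState { DISCONNECTED, CONNECTING, CONNECTED }

public class ConnectionDelaySketch {
    static final long RECONNECT_BACKOFF_MS = 100; // assumed backoff

    // Delay until it is worth re-checking this node in the poll loop.
    static long connectionDelay(ConnState state, long lastAttemptMs, long nowMs) {
        if (state == ConnState.DISCONNECTED) {
            // Retry once the reconnect backoff has elapsed.
            return Math.max(RECONNECT_BACKOFF_MS - (nowMs - lastAttemptMs), 0);
        }
        // CONNECTING, or CONNECTED but unable to send more requests: only an
        // external event (connect result, response) changes the state, so
        // polling sooner would just busy-wait.
        return Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(connectionDelay(ConnState.DISCONNECTED, 0, 60));
        System.out.println(connectionDelay(ConnState.CONNECTING, 0, 60));
    }
}
```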


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review58575
---


On Oct. 23, 2014, 11:19 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26885/
 ---
 
 (Updated Oct. 23, 2014, 11:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1642
 https://issues.apache.org/jira/browse/KAFKA-1642
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Fixes two issues with the computation of ready nodes and poll timeouts in
 Sender/RecordAccumulator:
 
 1. The timeout was computed incorrectly because it took into account all 
 nodes,
 even if they had data to send such that their timeout would be 0. However, 
 nodes
 were then filtered based on whether it was possible to send (i.e. their
 connection was still good) which could result in nothing to send and a 0
 timeout, resulting in busy looping. Instead, the timeout needs to be computed
 only using data that cannot be immediately sent, i.e. where the timeout will 
 be
 greater than 0. This timeout is only used if, after filtering by whether
 connections are ready for sending, there is no data to be sent. Other events 
 can
 wake the thread up earlier, e.g. a client reconnects and becomes ready again.
 
 2. One of the conditions indicating whether data is sendable is whether a
 timeout has expired -- either the linger time or the retry backoff. This
 condition wasn't accounting for both cases properly, always using the linger
 time. This means the retry backoff was probably not being respected.
 
 KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
 none can send data because they are in a connection backoff period.
 
 
 Addressing Jun's comments.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 d304660f29246e9600efe3ddb28cfcc2b074bed3 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 29658d4a15f112dc0af5ce517eaab93e6f00134b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 eea270abb16f40c9f3b47c4ea96be412fb4fdc8b 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  c5d470011d334318d5ee801021aadd0c000974a6 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 aae8d4a1e98279470587d397cc779a9baf6fee6c 
   
 clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
  0762b35abba0551f23047348c5893bb8c9acff14 
 
 Diff: https://reviews.apache.org/r/26885/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




Review Request 27232: Patch for KAFKA-559

2014-10-27 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27232/
---

Review request for kafka.


Bugs: KAFKA-559
https://issues.apache.org/jira/browse/KAFKA-559


Repository: kafka


Description
---

Addressing Joel's comments.


Fix naming: entires -> entries.


Only remove partitions from a group if all partitions were last modified before 
the threshold date.


Diffs
-

  core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/27232/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-559) Garbage collect old consumer metadata entries

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-559:

Attachment: KAFKA-559.patch

 Garbage collect old consumer metadata entries
 -

 Key: KAFKA-559
 URL: https://issues.apache.org/jira/browse/KAFKA-559
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Tejas Patil
  Labels: newbie, project
 Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch


 Many use cases involve transient consumers. These consumers create entries 
 under their consumer group in zk and maintain offsets there as well. There is 
 currently no way to delete these entries. It would be good to have a tool 
 that did something like
   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
 --zookeeper [zk_connect]
 This would scan through consumer group entries and delete any that had no 
 offset update since the given date.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-559) Garbage collect old consumer metadata entries

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-559:

Assignee: Ewen Cheslack-Postava  (was: Tejas Patil)
  Status: Patch Available  (was: Open)

 Garbage collect old consumer metadata entries
 -

 Key: KAFKA-559
 URL: https://issues.apache.org/jira/browse/KAFKA-559
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Ewen Cheslack-Postava
  Labels: newbie, project
 Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch


 Many use cases involve transient consumers. These consumers create entries 
 under their consumer group in zk and maintain offsets there as well. There is 
 currently no way to delete these entries. It would be good to have a tool 
 that did something like
   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
 --zookeeper [zk_connect]
 This would scan through consumer group entries and delete any that had no 
 offset update since the given date.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185377#comment-14185377
 ] 

Ewen Cheslack-Postava commented on KAFKA-559:
-

Created reviewboard https://reviews.apache.org/r/27232/diff/
 against branch origin/trunk

 Garbage collect old consumer metadata entries
 -

 Key: KAFKA-559
 URL: https://issues.apache.org/jira/browse/KAFKA-559
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Tejas Patil
  Labels: newbie, project
 Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch


 Many use cases involve transient consumers. These consumers create entries 
 under their consumer group in zk and maintain offsets there as well. There is 
 currently no way to delete these entries. It would be good to have a tool 
 that did something like
   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
 --zookeeper [zk_connect]
 This would scan through consumer group entries and delete any that had no 
 offset update since the given date.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185380#comment-14185380
 ] 

Ewen Cheslack-Postava commented on KAFKA-559:
-

This is an updated version of the patch by [~tejas.patil]. I'm pretty sure I've 
addressed all the issues [~jjkoshy] brought up.

 Garbage collect old consumer metadata entries
 -

 Key: KAFKA-559
 URL: https://issues.apache.org/jira/browse/KAFKA-559
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Ewen Cheslack-Postava
  Labels: newbie, project
 Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch


 Many use cases involve transient consumers. These consumers create entries 
 under their consumer group in zk and maintain offsets there as well. There is 
 currently no way to delete these entries. It would be good to have a tool 
 that did something like
   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
 --zookeeper [zk_connect]
 This would scan through consumer group entries and delete any that had no 
 offset update since the given date.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-1732:


 Summary: DumpLogSegments tool fails when path has a '.'
 Key: KAFKA-1732
 URL: https://issues.apache.org/jira/browse/KAFKA-1732
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Priority: Minor


Using DumpLogSegments in a directory that has a '.' that isn't part of the file 
extension causes an exception:

{code}
16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments  --file 
/Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
 --verify-index-only
Dumping 
/Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
Exception in thread "main" java.io.FileNotFoundException: 
/Users/ewencp/kafka.log (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.init(FileInputStream.java:146)
at kafka.utils.Utils$.openChannel(Utils.scala:162)
at kafka.log.FileMessageSet.init(FileMessageSet.scala:74)
at 
kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
at 
kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
at 
kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
{code}
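The trace suggests the companion .log path is derived by cutting the full path at its first '.', which breaks when a directory name such as kafka.git contains one. A hedged sketch of the difference (illustrative, not the committed fix): strip the extension from the file name only.

```java
import java.io.File;

public class LogPathFix {
    // Buggy style: splitting the whole path on '.' truncates at the first
    // dot anywhere in the path (e.g. inside "kafka.git").
    static String buggy(String indexPath) {
        return indexPath.split("\\.")[0] + ".log";
    }

    // Safer: replace the extension on the file name only, leaving the
    // directory components untouched.
    static String fixed(String indexPath) {
        File f = new File(indexPath);
        String name = f.getName();
        String base = name.substring(0, name.lastIndexOf('.'));
        return new File(f.getParent(), base + ".log").getPath();
    }

    public static void main(String[] args) {
        String idx = "/tmp/kafka.git/test_1-1/00016895.index";
        System.out.println(buggy(idx)); // truncated at /tmp/kafka
        System.out.println(fixed(idx)); // keeps the full directory path
    }
}
```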



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1733) Producer.send will block indeterminately when broker is unavailable.

2014-10-27 Thread Marc Chung (JIRA)
Marc Chung created KAFKA-1733:
-

 Summary: Producer.send will block indeterminately when broker is 
unavailable.
 Key: KAFKA-1733
 URL: https://issues.apache.org/jira/browse/KAFKA-1733
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Reporter: Marc Chung
Assignee: Jun Rao


This is a follow up to the conversation here:

https://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3ccaog_4qymoejhkbo0n31+a-ujx0z5unsisd5wbrmn-xtx7gi...@mail.gmail.com%3E

During ClientUtils.fetchTopicMetadata, if the broker is unavailable, 
socket.connect will block indeterminately. Any retry policy 
(message.send.max.retries) further increases the time spent waiting for the 
socket to connect.

The root fix is to add a connection timeout value to the BlockingChannel's 
socket configuration, like so:

{noformat}
-channel.socket.connect(new InetSocketAddress(host, port))
+channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
{noformat}

The simplest thing to do here would be to have a constant, default value that 
would be applied to every socket configuration. 

Is that acceptable? 
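As a hedged illustration of the proposal (the timeout value and address are made up), java.net.Socket.connect accepts an optional timeout in milliseconds and fails fast instead of blocking for the OS default:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ConnectTimeoutDemo {
    public static void main(String[] args) throws IOException {
        Socket socket = new Socket();
        int connectTimeoutMs = 2000; // illustrative default value
        try {
            // A non-routable address: connect(addr) with no timeout could
            // block for minutes; the timeout overload fails fast instead.
            socket.connect(new InetSocketAddress("10.255.255.1", 9092),
                           connectTimeoutMs);
        } catch (IOException e) {
            // SocketTimeoutException once the timeout elapses, or an
            // immediate ConnectException if the network rejects it outright.
            System.out.println("connect failed: " + e.getClass().getSimpleName());
        } finally {
            socket.close();
        }
    }
}
```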



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1733) Producer.send will block indeterminately when broker is unavailable.

2014-10-27 Thread Marc Chung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185539#comment-14185539
 ] 

Marc Chung commented on KAFKA-1733:
---

I have a patch (work in progress) here: 
https://github.com/mchung/kafka/commit/87b8ddbfe23dc887f56fa6f9ea3669733933c49b

 Producer.send will block indeterminately when broker is unavailable.
 

 Key: KAFKA-1733
 URL: https://issues.apache.org/jira/browse/KAFKA-1733
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Reporter: Marc Chung
Assignee: Jun Rao

 This is a follow up to the conversation here:
 https://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3ccaog_4qymoejhkbo0n31+a-ujx0z5unsisd5wbrmn-xtx7gi...@mail.gmail.com%3E
 During ClientUtils.fetchTopicMetadata, if the broker is unavailable, 
 socket.connect will block indeterminately. Any retry policy 
 (message.send.max.retries) further increases the time spent waiting for the 
 socket to connect.
 The root fix is to add a connection timeout value to the BlockingChannel's 
 socket configuration, like so:
 {noformat}
 -channel.socket.connect(new InetSocketAddress(host, port))
 +channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
 {noformat}
 The simplest thing to do here would be to have a constant, default value that 
 would be applied to every socket configuration. 
 Is that acceptable? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 27238: Patch for KAFKA-1732

2014-10-27 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27238/
---

Review request for kafka.


Bugs: KAFKA-1732
https://issues.apache.org/jira/browse/KAFKA-1732


Repository: kafka


Description
---

KAFKA-1732 Handle paths with '.' properly in DumpLogSegments.


Diffs
-

  core/src/main/scala/kafka/tools/DumpLogSegments.scala 
8e9d47b8d4adc5754ed8861aa04ddd3c6b629e3d 

Diff: https://reviews.apache.org/r/27238/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Commented] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14185586#comment-14185586
 ] 

Ewen Cheslack-Postava commented on KAFKA-1732:
--

Created reviewboard https://reviews.apache.org/r/27238/diff/
 against branch origin/trunk

 DumpLogSegments tool fails when path has a '.'
 --

 Key: KAFKA-1732
 URL: https://issues.apache.org/jira/browse/KAFKA-1732
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Priority: Minor
 Attachments: KAFKA-1732.patch


 Using DumpLogSegments in a directory that has a '.' that isn't part of the 
 file extension causes an exception:
 {code}
 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
 kafka.tools.DumpLogSegments  --file 
 /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
  --verify-index-only
 Dumping 
 /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
 Exception in thread "main" java.io.FileNotFoundException: 
 /Users/ewencp/kafka.log (No such file or directory)
   at java.io.FileInputStream.open(Native Method)
   at java.io.FileInputStream.init(FileInputStream.java:146)
   at kafka.utils.Utils$.openChannel(Utils.scala:162)
   at kafka.log.FileMessageSet.init(FileMessageSet.scala:74)
   at 
 kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
   at 
 kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
   at 
 kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
   at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
   at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1732:
-
Assignee: Ewen Cheslack-Postava
  Status: Patch Available  (was: Open)

 DumpLogSegments tool fails when path has a '.'
 --

 Key: KAFKA-1732
 URL: https://issues.apache.org/jira/browse/KAFKA-1732
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: KAFKA-1732.patch




[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1732:
-
Attachment: KAFKA-1732.patch

 DumpLogSegments tool fails when path has a '.'
 --

 Key: KAFKA-1732
 URL: https://issues.apache.org/jira/browse/KAFKA-1732
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Priority: Minor
 Attachments: KAFKA-1732.patch




Kafka Security?

2014-10-27 Thread Stephenson, John L
List Users,
Does anyone know when/if Kafka security features are being planned?   I 
haven't seen much on the net outside of the following proposal:  
https://cwiki.apache.org/confluence/display/KAFKA/Security.

Thanks!
john


[jira] [Created] (KAFKA-1734) System test metric plotting nonexistent file warnings

2014-10-27 Thread Andrew Olson (JIRA)
Andrew Olson created KAFKA-1734:
---

 Summary: System test metric plotting nonexistent file warnings
 Key: KAFKA-1734
 URL: https://issues.apache.org/jira/browse/KAFKA-1734
 Project: Kafka
  Issue Type: Bug
Reporter: Andrew Olson
Priority: Minor


Running the system tests (trunk code), there are many "The file ... does not 
exist for plotting (metrics)" warning messages, for example,

{noformat}
2014-10-27 14:47:58,478 - WARNING - The file 
/opt/kafka/system_test/replication_testsuite/testcase_0007/logs/broker-3/metrics/kafka.network.RequestMetrics.Produce-RemoteTimeMs.csv
 does not exist for plotting (metrics)
{noformat}

Looks like the generated metric file names only include the last part of the 
metric, e.g. "Produce-RemoteTimeMs.csv" rather than 
"kafka.network.RequestMetrics.Produce-RemoteTimeMs.csv".

{noformat}
$ ls 
/opt/kafka/system_test/replication_testsuite/testcase_0007/logs/broker-3/metrics/*Produce*
/opt/kafka/system_test/replication_testsuite/testcase_0007/logs/broker-3/metrics/Produce-RemoteTimeMs.csv
{noformat}
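The mismatch can be sketched as follows (the helper names are hypothetical; the real names come from the CSV metrics reporter on the broker and the system-test plotting script):

```java
// KAFKA-1734 sketch: the plotting code looks for the fully qualified metric
// name as a CSV file, while the reporter appears to write only the last
// name component.
public class MetricFileNameDemo {
    // File name the plotting script expects to find.
    static String expectedCsv(String fullMetricName) {
        return fullMetricName + ".csv";
    }

    // File name that is actually written: only the last '.'-separated part.
    static String actualCsv(String fullMetricName) {
        return fullMetricName.substring(fullMetricName.lastIndexOf('.') + 1) + ".csv";
    }
}
```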





Re: Kafka Security?

2014-10-27 Thread Gwen Shapira
This is very much work in progress.
You can follow the Jira here to see how it goes:
https://issues.apache.org/jira/browse/KAFKA-1682

On Mon, Oct 27, 2014 at 11:49 AM, Stephenson, John L
john.l.stephen...@lmco.com wrote:
 List Users,
 Does anyone know when/if Kafka security features are being planned?   I 
 haven't seen much on the net outside of the following proposal:  
 https://cwiki.apache.org/confluence/display/KAFKA/Security.

 Thanks!
 john


[jira] [Updated] (KAFKA-1731) add config/jmx changes in 0.8.2 doc

2014-10-27 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1731:
---
Fix Version/s: 0.8.2
 Assignee: Jun Rao

I made a pass on the site doc to add the new broker-side configs (offset 
management related configs will be added in KAFKA-1729) and the important JMX 
metrics. This is already committed to svn. I will leave this ticket open for a 
few more days for comments.

 add config/jmx changes in 0.8.2 doc
 ---

 Key: KAFKA-1731
 URL: https://issues.apache.org/jira/browse/KAFKA-1731
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8.2








[jira] [Commented] (KAFKA-1731) add config/jmx changes in 0.8.2 doc

2014-10-27 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185818#comment-14185818
 ] 

Gwen Shapira commented on KAFKA-1731:
-

Any chance you can upload a patch so we can see what changed?

 add config/jmx changes in 0.8.2 doc
 ---

 Key: KAFKA-1731
 URL: https://issues.apache.org/jira/browse/KAFKA-1731
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8.2








[jira] [Updated] (KAFKA-1731) add config/jmx changes in 0.8.2 doc

2014-10-27 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1731:
---
Attachment: config-jmx_082.patch

Attached please find the patch.

 add config/jmx changes in 0.8.2 doc
 ---

 Key: KAFKA-1731
 URL: https://issues.apache.org/jira/browse/KAFKA-1731
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8.2

 Attachments: config-jmx_082.patch








[jira] [Updated] (KAFKA-1733) Producer.send will block indeterminately when broker is unavailable.

2014-10-27 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1733:
-
Reviewer: Jun Rao
Assignee: (was: Jun Rao)

 Producer.send will block indeterminately when broker is unavailable.
 

 Key: KAFKA-1733
 URL: https://issues.apache.org/jira/browse/KAFKA-1733
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Reporter: Marc Chung

 This is a follow up to the conversation here:
 https://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3ccaog_4qymoejhkbo0n31+a-ujx0z5unsisd5wbrmn-xtx7gi...@mail.gmail.com%3E
 During ClientUtils.fetchTopicMetadata, if the broker is unavailable, 
 socket.connect will block indeterminately. Any retry policy 
 (message.send.max.retries) further increases the time spent waiting for the 
 socket to connect.
 The root fix is to add a connection timeout value to the BlockingChannel's 
 socket configuration, like so:
 {noformat}
 -channel.socket.connect(new InetSocketAddress(host, port))
 +channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
 {noformat}
 The simplest thing to do here would be to have a constant, default value that 
 would be applied to every socket configuration. 
 Is that acceptable? 
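For reference, the two-argument connect overload proposed in the diff above is standard java.net API: it fails fast with a SocketTimeoutException instead of blocking until the OS-level TCP timeout. A minimal sketch (class and method names here are hypothetical, not Kafka code):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ConnectTimeoutDemo {
    // connect(SocketAddress) can block for minutes on an unreachable host;
    // connect(SocketAddress, timeout) throws SocketTimeoutException after
    // connectTimeoutMs instead, bounding the time spent per retry.
    static boolean tryConnect(String host, int port, int connectTimeoutMs) {
        try (Socket channel = new Socket()) {
            channel.connect(new InetSocketAddress(host, port), connectTimeoutMs);
            return true;
        } catch (IOException e) { // includes SocketTimeoutException
            return false;
        }
    }
}
```

With message.send.max.retries, the total blocking time is then bounded by roughly retries x connectTimeoutMs rather than retries x the platform TCP timeout.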





Re: Review Request 27238: Patch for KAFKA-1732

2014-10-27 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27238/#review58723
---

Ship it!


Ship It!

- Neha Narkhede


On Oct. 27, 2014, 6:41 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27238/
 ---
 
 (Updated Oct. 27, 2014, 6:41 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1732
 https://issues.apache.org/jira/browse/KAFKA-1732
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1732 Handle paths with '.' properly in DumpLogSegments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/DumpLogSegments.scala 
 8e9d47b8d4adc5754ed8861aa04ddd3c6b629e3d 
 
 Diff: https://reviews.apache.org/r/27238/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1732:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the patch. Pushed to trunk and 0.8.2

 DumpLogSegments tool fails when path has a '.'
 --

 Key: KAFKA-1732
 URL: https://issues.apache.org/jira/browse/KAFKA-1732
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: KAFKA-1732.patch




Re: Review Request 26755: Patch for KAFKA-1706

2014-10-27 Thread Joel Koshy


 On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 109
  https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line109
 
  getAndDecrement(sizeFunction.get(e))
 
 Jiangjie Qin wrote:
 It seems getAndDecrement() does not take an argument and always 
 decrements by 1.

ah yes you are right


 On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 82
  https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line82
 
  One significant caveat to this approach (and in the timed variant 
  above) is that if a single large element needs to be enqueued it could 
  potentially block a number of smaller elements from being enqueued. This 
  may be okay in the case of mirror maker though but would make it less 
  useful as a generic utility.
 
 Jiangjie Qin wrote:
 I'm not sure why a big put could block small ones... It is possible 
 that a very large item is put into the queue and pushes it well past the 
 byte limit. In that case, all puts are blocked until a bunch of small 
 messages are taken out of the queue, but that seems to be exactly the 
 purpose of having a byte limit on the queue.

I looked again. Yes you are right. It should not block smaller puts. Now I'm 
going to ask the question from the other side of the table: since you are just 
notifying waiting threads, it is possible for a large put to get starved if 
there are a lot of smaller puts that get notified earlier. To the best of my 
knowledge the JVM does not guarantee fairness in unblocking multiple contending 
threads. Ideally there should be some notion of maximum wait before a put 
attempt takes priority over others. i.e., these are nuances that may be a 
compelling reason to make it a specialized utility within MirrorMaker itself 
since it is not general enough (yet).
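For context, the enqueue semantics discussed in this thread — a count bound plus a soft byte bound that a single element may overshoot, with no fairness guarantee among blocked producers — can be sketched as below. This is a simplified illustration, not Kafka's actual ByteBoundedBlockingQueue:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.ToIntFunction;

// Sketch: an element is admitted while size < capacity AND currentBytes <
// byteCapacity, so the byte bound can be exceeded by at most one element.
public class ByteBoundedQueue<E> {
    private final Deque<E> queue = new ArrayDeque<>();
    private final int capacity;          // bound on number of elements
    private final long byteCapacity;     // soft bound on total bytes
    private final ToIntFunction<E> sizeFunction;
    private long currentBytes = 0;

    public ByteBoundedQueue(int capacity, long byteCapacity, ToIntFunction<E> sizeFunction) {
        this.capacity = capacity;
        this.byteCapacity = byteCapacity;
        this.sizeFunction = sizeFunction;
    }

    public synchronized void put(E e) throws InterruptedException {
        // NOTE: wait/notifyAll gives no fairness; which blocked producer wins
        // after a take() is up to the JVM, hence the starvation concern above.
        while (queue.size() >= capacity || currentBytes >= byteCapacity)
            wait();
        queue.addLast(e);
        currentBytes += sizeFunction.applyAsInt(e);
        notifyAll();
    }

    public synchronized E take() throws InterruptedException {
        while (queue.isEmpty())
            wait();
        E e = queue.removeFirst();
        currentBytes -= sizeFunction.applyAsInt(e);
        notifyAll();
        return e;
    }

    public synchronized long byteSize() { return currentBytes; }
    public synchronized int size() { return queue.size(); }
}
```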


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/#review58497
---


On Oct. 27, 2014, 6:50 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26755/
 ---
 
 (Updated Oct. 27, 2014, 6:50 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1706
 https://issues.apache.org/jira/browse/KAFKA-1706
 
 
 Repository: kafka
 
 
 Description
 ---
 
 changed arguments name
 
 
 correct typo.
 
 
 Incorporated Joel's comments. Also fixed negative queue size problem.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/26755/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[DISCUSSION] Nested compression in Kafka?

2014-10-27 Thread Guozhang Wang
Hello folks,

I came across testComplexCompressDecompress in
kafka.message.MessageCompressionTest while working on some consumer
decompression optimization. This test checks whether nested compression is
supported.

I remember vaguely that some time ago we decided not to support nested
compression in Kafka, and in the new producer's MemoryRecords I also make
this assumption in the iterator implementation. Is that still the case? If
so, shall we remove this test case?

-- Guozhang


[jira] [Created] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream

2014-10-27 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-1735:


 Summary: MemoryRecords.Iterator needs to handle partial reads from 
compressed stream
 Key: KAFKA-1735
 URL: https://issues.apache.org/jira/browse/KAFKA-1735
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0


Found a bug in the MemoryRecords.Iterator implementation, where 

{code}
stream.read(recordBuffer, 0, size)
{code}

can read fewer than size bytes, and the rest of the recordBuffer would be set 
to \0.





Review Request 27256: Fix KAFKA-1735

2014-10-27 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27256/
---

Review request for kafka.


Bugs: KAFKA-1735
https://issues.apache.org/jira/browse/KAFKA-1735


Repository: kafka


Description
---

Handle partial reads from compressed stream


Diffs
-

  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 

Diff: https://reviews.apache.org/r/27256/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Updated] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream

2014-10-27 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1735:
-
Status: Patch Available  (was: Open)

 MemoryRecords.Iterator needs to handle partial reads from compressed stream
 ---

 Key: KAFKA-1735
 URL: https://issues.apache.org/jira/browse/KAFKA-1735
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1735.patch




[jira] [Updated] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream

2014-10-27 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1735:
-
Attachment: KAFKA-1735.patch

 MemoryRecords.Iterator needs to handle partial reads from compressed stream
 ---

 Key: KAFKA-1735
 URL: https://issues.apache.org/jira/browse/KAFKA-1735
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1735.patch




[jira] [Commented] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream

2014-10-27 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186072#comment-14186072
 ] 

Guozhang Wang commented on KAFKA-1735:
--

Created reviewboard https://reviews.apache.org/r/27256/diff/
 against branch origin/trunk

 MemoryRecords.Iterator needs to handle partial reads from compressed stream
 ---

 Key: KAFKA-1735
 URL: https://issues.apache.org/jira/browse/KAFKA-1735
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1735.patch




[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-10-27 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186104#comment-14186104
 ] 

Jiangjie Qin commented on KAFKA-1647:
-

Updated reviewboard https://reviews.apache.org/r/26373/diff/
 against branch origin/trunk

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Joel Koshy
Assignee: Jiangjie Qin
Priority: Critical
  Labels: newbie++
 Fix For: 0.8.2

 Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, 
 KAFKA-1647_2014-10-18_00:26:51.patch, KAFKA-1647_2014-10-21_23:08:43.patch, 
 KAFKA-1647_2014-10-27_17:19:07.patch


 We ran into this scenario recently in a production environment. This can 
 happen when enough brokers in a cluster are taken down. i.e., a rolling 
 bounce done properly should not cause this issue. It can occur if all 
 replicas for any partition are taken down.
 Here is a sample scenario:
 * Cluster of three brokers: b0, b1, b2
 * Two partitions (of some topic) with replication factor two: p0, p1
 * Initial state:
 p0: leader = b0, ISR = {b0, b1}
 p1: leader = b1, ISR = {b0, b1}
 * Do a parallel hard-kill of all brokers
 * Bring up b2, so it is the new controller
 * b2 initializes its controller context and populates its leader/ISR cache 
 (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
 known leaders are b0 (for p0) and b1 (for p1)
 * Bring up b1
 * The controller's onBrokerStartup procedure initiates a replica state change 
 for all replicas on b1 to become online. As part of this replica state change 
 it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
 (for p0 and p1). This LeaderAndIsr request contains: {p0: leader=b0; p1: 
 leader=b1; leaders={b1}}. b0 is indicated as the leader of p0 but it is not 
 included in the leaders field because b0 is down.
 * On receiving the LeaderAndIsrRequest, b1's replica manager will 
 successfully make itself (b1) the leader for p1 (and create the local replica 
 object corresponding to p1). It will however abort the become follower 
 transition for p0 because the designated leader b0 is offline. So it will not 
 create the local replica object for p0.
 * It will then start the high water mark checkpoint thread. Since only p1 has 
 a local replica object, only p1's high water mark will be checkpointed to 
 disk. p0's previously written checkpoint, if any, will be lost.
 So in summary it seems we should always create the local replica object even 
 if the online transition does not happen.
 Possible symptoms of the above bug could be one or more of the following (we 
 saw 2 and 3):
 # Data loss; yes on a hard-kill data loss is expected, but this can actually 
 cause loss of nearly all data if the broker becomes follower, truncates, and 
 soon after happens to become leader.
 # High IO on brokers that lose their high water mark then subsequently (on a 
 successful become follower transition) truncate their log to zero and start 
 catching up from the beginning.
 # If the offsets topic is affected, then offsets can get reset. This is 
 because during an offset load we don't read past the high water mark. So if a 
 water mark is missing then we don't load anything (even if the offsets are 
 there in the log).





[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-10-27 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1647:

Attachment: KAFKA-1647_2014-10-27_17:19:07.patch

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Joel Koshy
Assignee: Jiangjie Qin
Priority: Critical
  Labels: newbie++
 Fix For: 0.8.2

 Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, 
 KAFKA-1647_2014-10-18_00:26:51.patch, KAFKA-1647_2014-10-21_23:08:43.patch, 
 KAFKA-1647_2014-10-27_17:19:07.patch




Re: Review Request 26373: Patch for KAFKA-1647

2014-10-27 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/
---

(Updated Oct. 28, 2014, 12:20 a.m.)


Review request for kafka.


Bugs: KAFKA-1647
https://issues.apache.org/jira/browse/KAFKA-1647


Repository: kafka


Description
---

Addressed Joel's comments.


the version 2 code seems to be submitted by mistake... This should be the code 
for review that addressed Joel's comments.


Addressed Jun's comments. Will do tests to verify if it works.


Addressed Joel's comments; we do not need to check whether the leader exists 
when adding the fetcher.


Diffs
-

  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 

Diff: https://reviews.apache.org/r/26373/diff/


Testing (updated)
---

Followed Joel's testing steps. I was able to reproduce the problem without the 
patch, and the WARN message goes away after applying the patch.


Thanks,

Jiangjie Qin



Jenkins build is back to normal : Kafka-trunk #319

2014-10-27 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/319/changes



[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-27 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186124#comment-14186124
 ] 

Jun Rao commented on KAFKA-1481:


Vladimir,

Thanks for the patch. Really appreciate your help. I realized that this is one 
of the biggest pieces of technical debt that we have accumulated over time, so 
it may take some time to sort this out. Bear with me. Some more comments.

30. About Taggable, I still have mixed feelings. I can see why you created it. 
However, my reasoning is that for a lot of the case classes (ClientIdTopic, 
ClientIdAndBroker) that we create, it's weird that some of them are taggable 
and some of them are not, depending on whether they are used for tagging metric 
names or not. Those classes have no direct relationship with the metrics. 
Similarly, we only need to be aware of tags when creating metrics. Also, 
because of this, we changed the constructor of SimpleConsumer. Since this is an 
API change, we should really try to avoid it. 

My feeling is that it's probably simpler if we just create regular case classes 
as before and generate metric tags explicitly when we create the metric. For 
example, in AbstractFetcherThread, we can do

class FetcherStats(clientIdAndBroker: ClientIdAndBroker) extends 
KafkaMetricsGroup {
  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS,
    Map("clientId" -> clientIdAndBroker.clientId,
        "brokerHost" -> clientIdAndBroker.host,
        "brokerPort" -> clientIdAndBroker.port.toString))
}

and just have ClientIdAndBroker be the following case class.

case class ClientIdAndBroker(clientId: String, host: String, port: Int)

This way, the code is a bit cleaner since all the metric tag related stuff are 
isolated to those places when the metrics are created. So, I'd suggest that we 
remove Taggable.

31. AbstractFetcherThread:
31.1 You changed the meaning of clientId. clientId is used in the fetch request 
and we want to leave it as just the clientId string. Since the clientId should 
be uniquely representing a particular consumer client, we just need to include 
the clientId in the metric name. We don't need to include the consumer id in 
either the fetch request or the metric name since it's too long and has 
redundant info. 
31.2 FetcherLagStats: This is an existing problem. FetcherLagMetrics shouldn't 
be keyed off ClientIdBrokerTopicPartition. It should be keyed off 
ClientIdTopicPartition. This way, the metric name remains the same independent 
of the current leader of those partitions.

32. ZookeeperConsumerConnector:
32.1 FetchQueueSize: I agree that the metric name just needs to be tagged with 
clientId, topic and threadId. We don't need to include the consumerId since 
it's too long (note that topicThread._2 includes both the consumerId and the 
threadId).

33. KafkaMetricsGroup: Duplicate entries.
// kafka.consumer.ConsumerTopicStats -- kafka.consumer.{ConsumerIterator, 
PartitionTopicInfo}
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", 
"MessagesPerSec"),
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", 
"MessagesPerSec"),

// kafka.consumer.ConsumerTopicStats
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),

// kafka.consumer.FetchRequestAndResponseStats -- 
kafka.consumer.SimpleConsumer
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", 
"FetchResponseSize"),
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", 
"FetchRequestRateAndTimeMs"),
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", 
"FetchResponseSize"),
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", 
"FetchRequestRateAndTimeMs"),

/**
 * ProducerRequestStats -- SyncProducer
 * metric for SyncProducer in fetchTopicMetaData() needs to be removed when 
consumer is closed.
 */
explicitMetricName("kafka.producer", "ProducerRequestMetrics", 
"ProducerRequestRateAndTimeMs"),
explicitMetricName("kafka.producer", "ProducerRequestMetrics", 
"ProducerRequestSize"),
explicitMetricName("kafka.producer", "ProducerRequestMetrics", 
"ProducerRequestRateAndTimeMs"),
explicitMetricName("kafka.producer", "ProducerRequestMetrics", 
"ProducerRequestSize")

34. AbstractFetcherManager: Could you put the following on 2 separate lines? 
Similar things happen in a few other files. Perhaps you need to change the 
formatting in your IDE?

   }, metricPrefix.toTags

  private def getFetcherId(topic: String, partitionId: Int) : Int = {
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
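For illustration, the fetcher-id computation above can be sketched in Java. Note that Kafka's Utils.abs masks the sign bit rather than calling Math.abs, so even a hash code of Integer.MIN_VALUE yields a non-negative id; this sketch assumes that behavior:

```java
public class FetcherIdExample {
    // Map a topic-partition deterministically onto one of numFetchers
    // fetcher threads. The & 0x7fffffff mirrors kafka.utils.Utils.abs.
    static int getFetcherId(String topic, int partitionId, int numFetchers) {
        return ((31 * topic.hashCode() + partitionId) & 0x7fffffff) % numFetchers;
    }

    public static void main(String[] args) {
        for (int p = 0; p < 4; p++)
            System.out.println("test-topic/" + p + " -> fetcher "
                + getFetcherId("test-topic", p, 3));
    }
}
```

The same topic-partition always maps to the same fetcher, which is what lets the manager route partitions to a fixed pool of fetcher threads.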



 Stop using dashes AND underscores as separators in MBean names
 --

 Key: 

[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-27 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186130#comment-14186130
 ] 

Jay Kreps commented on KAFKA-1501:
--

Nice, so statistically it is 93% likely to be fixed, then!

So since this changes the socket server default, is this the right thing to do? 
Could this have any negative side effects in production? I actually don't 
really understand the effect of this option or why the lack of it was causing 
the failure. Could you explain?

 transient unit tests failures due to port already in use
 

 Key: KAFKA-1501
 URL: https://issues.apache.org/jira/browse/KAFKA-1501
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jun Rao
Assignee: Guozhang Wang
  Labels: newbie
 Attachments: KAFKA-1501.patch, KAFKA-1501.patch


 Saw the following transient failures.
 kafka.api.ProducerFailureHandlingTest > testTooLargeRecordWithAckOne FAILED
 kafka.common.KafkaException: Socket server failed to bind to 
 localhost:59909: Address already in use.
 at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
 at kafka.network.Acceptor.<init>(SocketServer.scala:141)
 at kafka.network.SocketServer.startup(SocketServer.scala:68)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
 at 
 kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Joe Stein
The Apache Kafka community is pleased to announce the beta release for Apache 
Kafka 0.8.2.

The 0.8.2-beta release introduces many new features, improvements and fixes 
including:
 - A new Java producer for ease of implementation and enhanced performance.
 - Delete topic support.
 - Per topic configuration of preference for consistency over availability.
 - Scala 2.11 support and dropping support for Scala 2.8.
 - LZ4 Compression.

All of the changes in this release can be found: 
https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html

Apache Kafka is a high-throughput, publish-subscribe messaging system rethought 
as a distributed commit log.

** Fast => A single Kafka broker can handle hundreds of megabytes of reads and 
writes per second from thousands of clients.

** Scalable => Kafka is designed to allow a single cluster to serve as the 
central data backbone for a large organization. It can be elastically and 
transparently expanded without downtime. Data streams are partitioned and 
spread over a cluster of machines to allow data streams larger than the 
capability of any single machine and to allow clusters of co-ordinated 
consumers.

** Durable => Messages are persisted on disk and replicated within the cluster 
to prevent data loss. Each broker can handle terabytes of messages without 
performance impact.

** Distributed by Design => Kafka has a modern cluster-centric design that 
offers strong durability and fault-tolerance guarantees.

You can download the release from: http://kafka.apache.org/downloads.html

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at 
http://kafka.apache.org/



Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Jay Kreps
I actually don't see the beta release on that download page:
http://kafka.apache.org/downloads.html

-Jay

On Mon, Oct 27, 2014 at 5:50 PM, Joe Stein joest...@apache.org wrote:




Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Gwen Shapira
Strange. I'm seeing it.

Browser cache?

On Mon, Oct 27, 2014 at 5:59 PM, Jay Kreps jay.kr...@gmail.com wrote:
 I actually don't see the beta release on that download page:
 http://kafka.apache.org/downloads.html

 -Jay

 On Mon, Oct 27, 2014 at 5:50 PM, Joe Stein joest...@apache.org wrote:




Re: Review Request 26755: Patch for KAFKA-1706

2014-10-27 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/#review58725
---


Another thing I forgot to mention in the earlier review: we definitely should 
have a unit test for this. You will need to allow passing in the Time interface 
and use MockTime in the test.
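For reference, the byte-bounded behavior under review can be sketched with a minimal single-lock Java queue. The names and exact semantics here are illustrative rather than the patch itself, and the Time/MockTime hook requested above is omitted:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.ToLongFunction;

// An element may be enqueued whenever the queue is currently under both the
// element-count and byte capacities, so the byte capacity is a soft bound
// that can be exceeded by at most one element.
public class ByteBoundedQueueSketch<E> {
    private final Queue<E> queue = new ArrayDeque<>();
    private final int capacity;        // hard bound on number of elements
    private final long byteCapacity;   // soft bound on total bytes
    private final ToLongFunction<E> sizeOf;
    private long currentBytes = 0;

    public ByteBoundedQueueSketch(int capacity, long byteCapacity,
                                  ToLongFunction<E> sizeOf) {
        this.capacity = capacity;
        this.byteCapacity = byteCapacity;
        this.sizeOf = sizeOf;
    }

    // Blocks until the queue is under both capacities, then enqueues.
    public synchronized void put(E e) throws InterruptedException {
        while (queue.size() >= capacity || currentBytes >= byteCapacity)
            wait();
        queue.add(e);
        currentBytes += sizeOf.applyAsLong(e);
        notifyAll(); // wake consumers waiting in take()
    }

    // Blocks until an element is available, then dequeues it.
    public synchronized E take() throws InterruptedException {
        while (queue.isEmpty())
            wait();
        E e = queue.remove();
        currentBytes -= sizeOf.applyAsLong(e);
        notifyAll(); // wake producers waiting in put()
        return e;
    }

    public synchronized long byteSize() { return currentBytes; }

    public static void main(String[] args) throws InterruptedException {
        ByteBoundedQueueSketch<String> q =
            new ByteBoundedQueueSketch<>(10, 5, s -> s.length());
        q.put("abcd");                    // 4 bytes; queue was empty
        q.put("xy");                      // allowed: 4 < 5, even though 4 + 2 > 5
        System.out.println(q.byteSize()); // prints 6: soft bound exceeded by one element
        System.out.println(q.take() + " " + q.take());
    }
}
```

A unit test along the lines Joel suggests would exercise the blocking paths with a mockable clock, which this sketch leaves out for brevity.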


core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/26755/#comment99850

Unused



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/26755/#comment99860

if



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/26755/#comment99864

no need for the return; you can add on line 63:
else {
  false
}

(and remove the false at the very end)

Equivalent, but a little cleaner to look at



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/26755/#comment99865

Again, this is obviously stylistic, but in small methods like this there is 
little need to return from the middle.

Can you restructure it to something like:

if (...)
  false
else {
  ...
  success
}
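The restructuring suggested above can be illustrated in Java (the offer/capacity names are hypothetical, not the patch code):

```java
public class SingleExitExample {
    static final int CAPACITY = 100;

    // Before: early return from the middle of the method.
    static boolean offerEarlyReturn(int used, int size) {
        if (used + size > CAPACITY)
            return false;
        // ... enqueue work would happen here ...
        return true;
    }

    // After: one conditional, one exit point. Equivalent behavior, but each
    // branch's result is visible at a glance.
    static boolean offerSingleExit(int used, int size) {
        boolean success;
        if (used + size > CAPACITY) {
            success = false;
        } else {
            // ... enqueue work would happen here ...
            success = true;
        }
        return success;
    }

    public static void main(String[] args) {
        System.out.println(offerEarlyReturn(90, 20) + " " + offerSingleExit(90, 20));
        System.out.println(offerEarlyReturn(90, 5) + " " + offerSingleExit(90, 5));
    }
}
```

In Scala the "after" form is even more natural, since an if/else is itself an expression and the method body can end with it directly.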



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/26755/#comment99866

Same here


- Joel Koshy


On Oct. 27, 2014, 6:50 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26755/
 ---
 
 (Updated Oct. 27, 2014, 6:50 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1706
 https://issues.apache.org/jira/browse/KAFKA-1706
 
 
 Repository: kafka
 
 
 Description
 ---
 
 changed arguments name
 
 
 correct typo.
 
 
 Incorporated Joel's comments. Also fixed negative queue size problem.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/26755/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Jay Kreps
Yeah it must be a caching thing because others in the same office do see it
(but not all). And ctrl-shift-r doesn't seem to help. Nevermind :-)

-Jay

On Mon, Oct 27, 2014 at 6:00 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Strange. I'm seeing it.

 Browser cache?

 On Mon, Oct 27, 2014 at 5:59 PM, Jay Kreps jay.kr...@gmail.com wrote:
  I actually don't see the beta release on that download page:
  http://kafka.apache.org/downloads.html
 
  -Jay
 



Re: Review Request 27256: Fix KAFKA-1735

2014-10-27 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27256/#review58744
---



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
https://reviews.apache.org/r/27256/#comment99882

Would it be possible to add a unit test for this?


- Neha Narkhede


On Oct. 27, 2014, 11:59 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27256/
 ---
 
 (Updated Oct. 27, 2014, 11:59 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1735
 https://issues.apache.org/jira/browse/KAFKA-1735
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Handle partial reads from compressed stream
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 
 
 Diff: https://reviews.apache.org/r/27256/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




[jira] [Commented] (KAFKA-1731) add config/jmx changes in 0.8.2 doc

2014-10-27 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186179#comment-14186179
 ] 

Gwen Shapira commented on KAFKA-1731:
-

Thanks :)

No comments, it looks good. 

 add config/jmx changes in 0.8.2 doc
 ---

 Key: KAFKA-1731
 URL: https://issues.apache.org/jira/browse/KAFKA-1731
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8.2

 Attachments: config-jmx_082.patch








Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Jun Rao
Joe,

Thanks for driving the release.

Jun

On Mon, Oct 27, 2014 at 5:50 PM, Joe Stein joest...@apache.org wrote:





Re: Review Request 26373: Patch for KAFKA-1647

2014-10-27 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review58748
---

Ship it!


Looks good to me. Can you make these final edits and upload another RB?


core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/26373/#comment99891

The .format needs to be on this line. Can you fix it and upload a new patch?



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/26373/#comment99892

Small edits:

// Create the local replica even if the leader is unavailable. This is 
required to ensure that we include the partition's high watermark in the 
checkpoint file (see KAFKA-1647)

Also, I'm not sure if we need to explicitly reference the jira in comments 
since people can just git annotate.


- Joel Koshy


On Oct. 28, 2014, 12:20 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 28, 2014, 12:20 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Joel's comments.
 
 
 the version 2 code seems to be submitted by mistake... This should be the 
 code for review that addressed Joel's comments.
 
 
 Addressed Jun's comments. Will do tests to verify if it works.
 
 
 Addressed Joel's comments; we do not need to check whether the leader exists 
 when adding the fetcher.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
 Followed Joel's testing steps. I was able to reproduce the problem without the 
 patch, and the WARN message goes away after applying the patch.
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 26755: Patch for KAFKA-1706

2014-10-27 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/
---

(Updated Oct. 28, 2014, 1:34 a.m.)


Review request for kafka.


Bugs: KAFKA-1706
https://issues.apache.org/jira/browse/KAFKA-1706


Repository: kafka


Description (updated)
---

changed arguments name


correct typo.


Incorporated Joel's comments. Also fixed negative queue size problem.


Incorporated Joel's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/26755/diff/


Testing
---


Thanks,

Jiangjie Qin



RE: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Libo Yu
Congrats! When do you think the final 0.8.2 will be released?

 To: annou...@apache.org; us...@kafka.apache.org; dev@kafka.apache.org
 Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
 Date: Tue, 28 Oct 2014 00:50:35 +
 From: joest...@apache.org
 

[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1732:
-
Fix Version/s: 0.8.2

 DumpLogSegments tool fails when path has a '.'
 --

 Key: KAFKA-1732
 URL: https://issues.apache.org/jira/browse/KAFKA-1732
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Fix For: 0.8.2

 Attachments: KAFKA-1732.patch


 Using DumpLogSegments in a directory that has a '.' that isn't part of the 
 file extension causes an exception:
 {code}
 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
 kafka.tools.DumpLogSegments  --file 
 /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
  --verify-index-only
 Dumping 
 /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
 Exception in thread "main" java.io.FileNotFoundException: 
 /Users/ewencp/kafka.log (No such file or directory)
   at java.io.FileInputStream.open(Native Method)
   at java.io.FileInputStream.<init>(FileInputStream.java:146)
   at kafka.utils.Utils$.openChannel(Utils.scala:162)
   at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
   at 
 kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
   at 
 kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
   at 
 kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
   at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
   at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
 {code}
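The underlying bug is that the tool truncates the full path at its first '.' rather than replacing the extension of the file name, so a directory such as kafka.git that contains a dot breaks it. A hedged sketch of the fix (hypothetical class and method names):

```java
import java.io.File;

public class LogFileFromIndex {
    // Derive the sibling .log file from an .index path by replacing the
    // suffix of the file *name* via lastIndexOf, leaving directory names
    // (which may themselves contain dots) untouched.
    static String logFileFor(String indexPath) {
        File f = new File(indexPath);
        String name = f.getName();
        String base = name.substring(0, name.lastIndexOf('.'));
        return new File(f.getParent(), base + ".log").getPath();
    }

    public static void main(String[] args) {
        System.out.println(
            logFileFor("/Users/ewencp/kafka.git/test_1-1/00016895.index"));
    }
}
```

With this approach the dot in kafka.git is ignored, and only the trailing .index suffix of the file name is rewritten.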





[jira] [Commented] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186334#comment-14186334
 ] 

Neha Narkhede commented on KAFKA-1732:
--

Thanks [~charmalloc]. Missed updating the version myself.

 DumpLogSegments tool fails when path has a '.'
 --

 Key: KAFKA-1732
 URL: https://issues.apache.org/jira/browse/KAFKA-1732
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Fix For: 0.8.2

 Attachments: KAFKA-1732.patch







[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-27 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186350#comment-14186350
 ] 

Bhavesh Mistry commented on KAFKA-1710:
---

[~jkreps],

I understand that the current code base adds bytes to shared memory and does 
compression on the application thread, while the older producer seems to do all 
of this in a background thread. So what changed to move this work into the 
foreground? Also, if you had to re-engineer this code, how would you remove the 
synchronization and move everything to the background, so that the application 
thread gets more runnable time and the cost of enqueue becomes very low?

I am really interested in solving this problem for my application, so I just 
wanted to know your suggestions and ideas: how would you solve this?

Thanks for all your help so far !!  

Thanks,

Bhavesh 

 [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
 being sent to single partition
 

 Key: KAFKA-1710
 URL: https://issues.apache.org/jira/browse/KAFKA-1710
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Critical
  Labels: performance
 Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
 TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
 th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
 th6.dump, th7.dump, th8.dump, th9.dump


 Hi Kafka Dev Team,
 When I run the test to send message to single partition for 3 minutes or so 
 on, I have encounter deadlock (please see the screen attached) and thread 
 contention from YourKit profiling.  
 Use Case:
 1)  Aggregating messages into same partition for metric counting. 
 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
 Here is output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-55 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 Thanks,
 Bhavesh 





[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-27 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186350#comment-14186350
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/28/14 4:40 AM:
-

[~jkreps],

I understand that the current code base adds bytes to shared memory and does 
compression on the application thread, while the older producer seems to do all 
of this in a background thread. So what changed to move this work into the 
foreground? Also, if you had to re-engineer this code, how would you remove the 
synchronization and move everything to the background, so that the application 
thread gets more runnable time and the cost of enqueue becomes very low? (Of 
course, at the cost of memory.)

I am really interested in solving this problem for my application, so I just 
wanted to know your suggestions and ideas: how would you solve this?

Thanks for all your help so far !!  

Thanks,

Bhavesh 



 [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
 being sent to single partition
 

 Key: KAFKA-1710
 URL: https://issues.apache.org/jira/browse/KAFKA-1710
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Critical
  Labels: performance
 Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
 TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
 th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
 th6.dump, th7.dump, th8.dump, th9.dump


 Hi Kafka Dev Team,
 When I run the test sending messages to a single partition for 3 minutes or 
 so, I encounter a deadlock (please see the attached screenshot) and thread 
 contention in YourKit profiling.
 Use Case:
 1)  Aggregating messages into same partition for metric counting. 
 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
 Here is output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-55 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 

[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-27 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186350#comment-14186350
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/28/14 4:58 AM:
-

[~jkreps],

I understand that the current code base adds bytes to shared memory and does 
the compression on the application thread. The older producer seems to do all 
of this in a background thread. So what changed to move this into the 
foreground? Also, if you had to re-engineer this code, how would you remove 
the synchronization and move everything into the background, so that the 
application thread spends more of its time runnable and the cost of enqueue is 
very low (of course, at the cost of memory)?

I am really interested in solving this problem for my application, so I just 
wanted to know your suggestions/ideas: how would you solve this?

Thanks for all your help so far !!  The only thing I can think of is an 
*AsynKafkaProducer*, as mentioned in previous comments, where [~ewencp] pointed 
out that the problem then becomes the threads enqueuing messages, at the cost 
of memory, thread context switching, etc...

Thanks,

Bhavesh 
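Read in context, the *AsynKafkaProducer* idea above amounts to a bounded hand-off queue in front of the producer: caller threads only enqueue, and a single background thread performs the contended send. Below is a minimal, self-contained sketch of that shape; `AsyncProducerSketch`, `Sender`, and `Record` are hypothetical stand-ins for illustration, not Kafka API types.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch only: caller threads pay just the cost of an enqueue; the lock
// contention of the real send is confined to one background drainer thread,
// at the cost of extra memory and a thread hand-off.
class AsyncProducerSketch {
    interface Sender { void send(String key, String value); }

    static final class Record {
        final String key;
        final String value;
        Record(String key, String value) { this.key = key; this.value = value; }
    }

    private final BlockingQueue<Record> queue;
    private final Thread drainer;
    private volatile boolean running = true;

    AsyncProducerSketch(Sender sender, int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
        drainer = new Thread(() -> {
            try {
                // Drain until closed, then flush whatever is left in the queue.
                while (running || !queue.isEmpty()) {
                    Record r = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (r != null) sender.send(r.key, r.value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "async-producer-drainer");
        drainer.start();
    }

    // Returns quickly; blocks only when the bounded queue is full (back-pressure).
    void send(String key, String value) throws InterruptedException {
        queue.put(new Record(key, value));
    }

    void close() throws InterruptedException {
        running = false;
        drainer.join();
    }
}
```

The bounded queue gives back-pressure instead of unbounded memory growth; whether the extra context switch is cheaper than contending inside the accumulator is exactly the trade-off [~ewencp] flags.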


