[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-16 Thread Vladimir Tretyakov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173425#comment-14173425
 ] 

Vladimir Tretyakov commented on KAFKA-1481:
---

Hi Jun, thanks for the juicy feedback, a few comments from me:

1-2. I did it this way because I didn't want to change the existing code a lot, and 
tried to get what we need based on the current method signatures. I thought about 
your approach with a Map too, but rejected it because of the larger changes. Now I 
see that I will do it.

3. Removed the 'mBeanName' param; it was legacy code.
4. As I understood from the code, this method must be called in the shutdown 
hook before shutdown, right? We have to remove only the metrics related to a 
particular clientId and we shouldn't touch others, right?

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.2

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
 KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW
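 For illustration, a minimal sketch (in Scala, with made-up clientId/host values) of 
 the ambiguity: once fields that may themselves contain dashes are joined with dashes, 
 no split can recover the original fields, whereas a separator that is not legal 
 inside the fields parses cleanly.
 {code}
 object MBeanNameAmbiguity {
   def main(args: Array[String]): Unit = {
     val clientId = "my-app"   // hypothetical clientId containing a dash
     val host     = "host-1"   // hypothetical hostname containing a dash

     // Dash-separated MBean name: the field boundaries are lost.
     val dashed = clientId + "-" + host + "-BytesPerSec"
     println(dashed.split("-").mkString(" | "))   // my | app | host | 1 | BytesPerSec

     // A separator not allowed in the embedded fields (e.g. '|') parses unambiguously.
     val piped = clientId + "|" + host + "|BytesPerSec"
     println(piped.split('|').mkString(" , "))    // my-app , host-1 , BytesPerSec
   }
 }
 {code}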



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1506) Cancel kafka-reassign-partitions Job

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1506:
-
Labels: newbie++  (was: )

 Cancel kafka-reassign-partitions Job
 --

 Key: KAFKA-1506
 URL: https://issues.apache.org/jira/browse/KAFKA-1506
 Project: Kafka
  Issue Type: New Feature
  Components: replication, tools
Affects Versions: 0.8.1, 0.8.1.1
Reporter: Paul Lung
Assignee: Neha Narkhede
  Labels: newbie++

 I started a reassignment, and for some reason it just takes forever. However, 
 it won't let me start another reassignment job while this one is running. So 
 a tool to cancel a reassignment job is needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1506) Cancel kafka-reassign-partitions Job

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1506:
-
Reviewer: Neha Narkhede
Assignee: (was: Neha Narkhede)

 Cancel kafka-reassign-partitions Job
 --

 Key: KAFKA-1506
 URL: https://issues.apache.org/jira/browse/KAFKA-1506
 Project: Kafka
  Issue Type: New Feature
  Components: replication, tools
Affects Versions: 0.8.1, 0.8.1.1
Reporter: Paul Lung
  Labels: newbie++

 I started a reassignment, and for some reason it just takes forever. However, 
 it won't let me start another reassignment job while this one is running. So 
 a tool to cancel a reassignment job is needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 26811: Patch for KAFKA-1196

2014-10-16 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26811/
---

Review request for kafka.


Bugs: KAFKA-1196
https://issues.apache.org/jira/browse/KAFKA-1196


Repository: kafka


Description
---

KAFKA-1196 WIP Ensure FetchResponses don't exceed 2GB limit.


Diffs
-

  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/server/KafkaApis.scala 
85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
a5386a03b62956bc440b40783247c8cdf7432315 

Diff: https://reviews.apache.org/r/26811/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Neha Narkhede
Another JIRA that will be nice to include as part of 0.8.2-beta is
https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
naming. Looking for people's thoughts on 2 things here -

1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned
above) in 0.8.2-beta? If so, it will be great to know now so it will allow
us to move forward with the beta release quickly.

Thanks,
Neha

On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 Hi,

 We have accumulated an impressive list of pretty major features in 0.8.2 -
 Delete topic
 Automated leader rebalancing
 Controlled shutdown
 Offset management
 Parallel recovery
 min.isr and
 clean leader election

 In the past, what has worked for major feature releases is a beta release
 prior to a final release. I'm proposing we do the same for 0.8.2. The only
 blockers for 0.8.2-beta, that I know of are -

 https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and
 requires some thinking about the new dependency. Since it is not fully
 ready and there are things to think about, I suggest we take it out, think
 it end to end and then include it in 0.8.3.)
 https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
 Guozhang Wang)
 https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
 waiting on a review by Joe Stein)

 It seems that 1634 and 1671 can get wrapped up in a week. Do people think
 we should cut 0.8.2-beta by next week?

 Thanks,
 Neha



[jira] [Commented] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173941#comment-14173941
 ] 

Ewen Cheslack-Postava commented on KAFKA-1196:
--

Created reviewboard https://reviews.apache.org/r/26811/diff/
 against branch origin/trunk

 java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
 ---

 Key: KAFKA-1196
 URL: https://issues.apache.org/jira/browse/KAFKA-1196
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.0
 Environment: running java 1.7, linux and kafka compiled against scala 
 2.9.2
Reporter: Gerrit Jansen van Vuuren
Priority: Blocker
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1196.patch


 I have 6 topics each with 8 partitions spread over 4 kafka servers.
 the servers are 24 core 72 gig ram.
 While consuming from the topics I get an IllegalArgumentException and all 
 consumption stops; the error keeps on being thrown.
 I've tracked it down to FetchResponse.scala line 33.
 The error happens when the FetchResponsePartitionData object's readFrom 
 method calls:
 messageSetBuffer.limit(messageSetSize)
 I put in some debug code: the messageSetSize is 671758648, while 
 buffer.capacity() gives 155733313; for some reason the buffer is smaller than 
 the required message size.
 I don't know the consumer code enough to debug this. It doesn't matter if 
 compression is used or not.
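 For reference, a minimal sketch (with small made-up sizes; the reported values were a 
 messageSetSize of 671758648 against a capacity of 155733313) of why that line throws: 
 ByteBuffer.limit rejects any value larger than the buffer's capacity with an 
 IllegalArgumentException.
 {code}
 import java.nio.ByteBuffer

 object BufferLimitSketch {
   def main(args: Array[String]): Unit = {
     val buffer = ByteBuffer.allocate(16)   // stands in for messageSetBuffer's capacity
     try {
       buffer.limit(1024)                   // stands in for a messageSetSize > capacity
     } catch {
       case e: IllegalArgumentException =>
         println("limit > capacity: " + e)  // the exception reported at FetchResponse.scala line 33
     }
   }
 }
 {code}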



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1196:
-
Assignee: Ewen Cheslack-Postava
  Status: Patch Available  (was: Open)

 java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
 ---

 Key: KAFKA-1196
 URL: https://issues.apache.org/jira/browse/KAFKA-1196
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.0
 Environment: running java 1.7, linux and kafka compiled against scala 
 2.9.2
Reporter: Gerrit Jansen van Vuuren
Assignee: Ewen Cheslack-Postava
Priority: Blocker
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1196.patch


 I have 6 topics each with 8 partitions spread over 4 kafka servers.
 the servers are 24 core 72 gig ram.
 While consuming from the topics I get an IllegalArgumentException and all 
 consumption stops; the error keeps on being thrown.
 I've tracked it down to FetchResponse.scala line 33.
 The error happens when the FetchResponsePartitionData object's readFrom 
 method calls:
 messageSetBuffer.limit(messageSetSize)
 I put in some debug code: the messageSetSize is 671758648, while 
 buffer.capacity() gives 155733313; for some reason the buffer is smaller than 
 the required message size.
 I don't know the consumer code enough to debug this. It doesn't matter if 
 compression is used or not.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173943#comment-14173943
 ] 

Ewen Cheslack-Postava commented on KAFKA-1196:
--

This is a wip patch to fix this issue, which previous discussion suggests was 
due to the FetchResponse exceeding 2GB. My approach to triggering the issue, 
however, doesn't exhibit exactly the same issue but does cause an unrecoverable 
error that causes the consumer connection to terminate. (For reference, it 
causes the server to fail when FetchResponseSend.writeTo calls expectIncomplete 
and sendSize is negative due to overflow. This confuses the server since it 
looks like the message is already done sending and the server forcibly closes 
the consumer's connection.)

The patch addresses the core issue by ensuring the returned message doesn't 
exceed 2GB by dropping parts of it in a way that otherwise shouldn't affect the 
consumer. But there are a lot of points that still need to be addressed:

* I started by building an integration test to trigger the issue, included in 
PrimitiveApiTest. However, since we necessarily need to have > 2GB of data to 
trigger the issue, it's probably too expensive to include in this way. Offline 
discussion suggests maybe a system test would be a better place to include 
this. It's still included here for completeness.
* The implementation filters to a subset of the data in FetchResponse. The main 
reason for this is that this process needs to know the exact (or at least 
conservative estimate) size of serialized data, which only FetchResponse knows. 
But it's also a bit weird compared to other message classes, which are case 
classes and don't modify those inputs.
* Algorithm for choosing the subset to return: the initial approach is to remove random 
elements until we get below the limit (see the rough sketch after this list). This is 
simple to understand and avoids starvation of specific TopicAndPartitions. Any concerns 
with this basic approach?
* I'm pretty sure I've managed to keep the < 2GB case at effectively the same 
computational cost (computing the serialized size, grouped data, etc. exactly 
once as before). However, for the > 2GB case I've only ensured correctness. In 
particular, the progressive removal and reevaluation of serialized size could 
potentially be very bad for very large data sets (e.g. starting a mirror maker 
against a large data set with a large # of partitions from scratch).
* Note that the algorithm never deals with the actual message data, only 
metadata about what messages are available. This is relevant since this is what 
suggested the approach in the patch could still be performant -- 
ReplicaManager.readMessageSets processes the entire FetchRequest and filters it 
down because the metadata involved is relatively small.
* Based on the previous two points, this really needs some more realistic large 
scale system tests to make sure this approach is not only correct, but provides 
reasonable performance (or indicates we need to revise the algorithm for 
selecting a subset of the data).
* Testing isn't really complete -- I triggered the issue with 4 topics * 600 
MB/topic, which is > 2GB. Another obvious case to check is when some partitions 
contain > 2GB on their own.
* I'd like someone to help sanity check the exact maximum FetchResponse 
serialized size we limit messages to. It's not Int.MaxValue because the 
FetchResponseSend class adds 4 + FetchResponse.sizeInBytes for its own size. 
I'd like a sanity check that the extra 4 bytes is enough -- is there any 
additional wrapping we might need to account for? Getting a test to hit exactly 
that narrow range could be tricky.
* The tests include both immediate-response and purgatory paths, but the 
purgatory version requires a timeout in the test, which could end up being 
flaky + wasting time, but it doesn't look like there's a great way to mock that 
right now. Maybe this doesn't matter if it moves to a system test?
* One case this doesn't handle yet is when the data reaches > 2GB after it's in 
the purgatory. The result is correct, but the response is not sent as soon as 
that condition is satisfied. This is because it looks like evaluating this 
exactly would require calling readMessageSets and evaluating the size of the 
message for every DelayedFetch.isSatisfied call. This sounds like it could end 
up being pretty expensive. Maybe there's a better way, perhaps an approximate 
scheme?
* The test requires some extra bytes in the fetchSize for each partition, 
presumably for overhead in encoding. I haven't tracked down exactly how big 
that should be, but I'm guessing it could end up affecting the results of more 
comprehensive tests.
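For illustration only, a rough sketch (hypothetical helper, not the attached patch) of 
the random-removal idea mentioned in the list above: drop randomly chosen partitions 
from the response metadata until the estimated serialized size fits under the limit.

{code}
import scala.util.Random

object FetchResponseCapSketch {
  // sizes: estimated serialized bytes per topic-partition; maxBytes: response size limit.
  def capResponseSize[K](sizes: Map[K, Int], maxBytes: Long): Map[K, Int] = {
    val rand = new Random()
    var remaining = sizes
    var total = sizes.values.map(_.toLong).sum
    while (total > maxBytes && remaining.nonEmpty) {
      // Drop a randomly chosen partition so no specific partition is starved systematically.
      val victim = remaining.keys.toSeq(rand.nextInt(remaining.size))
      total -= remaining(victim)
      remaining -= victim
    }
    remaining
  }

  def main(args: Array[String]): Unit = {
    val sizes = Map("t0-0" -> 900, "t0-1" -> 800, "t1-0" -> 700)
    println(capResponseSize(sizes, maxBytes = 1500))   // a subset whose sizes sum to <= 1500
  }
}
{code}

As noted above, recomputing the total after each removal is the part that could get 
expensive for very large fetches.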

 java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
 ---

 Key: KAFKA-1196
 URL: 

[jira] [Resolved] (KAFKA-1707) ConsumerOffsetChecker shows none partitions assigned

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-1707.
--
Resolution: Won't Fix

This is a known issue but there are a few things to check. Did you run the 
VerifyConsumerRebalance? However, I'd suggest directing such questions to the 
mailing list first. So I'll go ahead and close this ticket.

 ConsumerOffsetChecker shows none partitions assigned
 

 Key: KAFKA-1707
 URL: https://issues.apache.org/jira/browse/KAFKA-1707
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.0
 Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 
 2.40GHz/1.2e+02GB
Reporter: Hari
Assignee: Neha Narkhede
  Labels: patch

 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker shows some 
 partitions having no consumers after a rebalance is triggered due to a new 
 consumer joining/disconnecting from the group. The lag keeps piling up until the 
 partitions are assigned again, usually after another rebalance is triggered.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1708) Consumers intermittently stop consuming till restart

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-1708.
--
Resolution: Won't Fix

Same here. Please direct this to the mailing list where people can help out. If 
we agree that there is a problem, then you can file a JIRA. 

 Consumers intermittently stop consuming till restart
 

 Key: KAFKA-1708
 URL: https://issues.apache.org/jira/browse/KAFKA-1708
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.0
 Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 
 2.40GHz/1.2e+02GB
Reporter: Hari
Assignee: Neha Narkhede
  Labels: patch

 Using a simple consumer and reading messages using a stream iterator, I noticed 
 that consumption suddenly stops and the lag starts building up until the 
 consumer is restarted. Below is the code snippet:
 final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByName = 
 consumerConnector.createMessageStreams(topicCountMap);
 ConsumerIterator<byte[], byte[]> streamIterator = 
 streamsByName.get(topicName).get(IDX_FIRST_ITEM).iterator();
 if (streamIterator.hasNext()) {
 final MessageAndMetadata<byte[], byte[]> item =   
 streamIterator.next();
 ...
 }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1710:
-
Reviewer: Jun Rao
Assignee: (was: Jun Rao)

 [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
 being sent to single partition
 

 Key: KAFKA-1710
 URL: https://issues.apache.org/jira/browse/KAFKA-1710
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Priority: Critical
  Labels: performance
 Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
 TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
 th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
 th6.dump, th7.dump, th8.dump, th9.dump


 Hi Kafka Dev Team,
 When I run the test to send messages to a single partition for 3 minutes or 
 so, I encounter a deadlock (please see the attached screenshot) and thread 
 contention from YourKit profiling.  
 Use Case:
 1)  Aggregating messages into same partition for metric counting. 
 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
 Here is output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-55 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 Thanks,
 Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1492) Getting error when sending producer request at the broker end with a single broker

2014-10-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173987#comment-14173987
 ] 

Neha Narkhede commented on KAFKA-1492:
--

bq. This seems more appropriate for the Kafka user mailing list or Stack 
Overflow (apache-kafka tag) rather than JIRA.

+1

 Getting error when sending producer request at the broker end with a single 
 broker
 --

 Key: KAFKA-1492
 URL: https://issues.apache.org/jira/browse/KAFKA-1492
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.1.1
Reporter: sriram
Assignee: Jun Rao

 Tried to run a simple example by sending a message to a single broker. 
 Getting this error: 
 [2014-06-13 08:35:45,402] INFO Closing socket connection to /127.0.0.1. 
 (kafka.network.Processor)
 [2014-06-13 08:35:45,440] WARN [KafkaApi-1] Produce request with correlation 
 id 2 from client  on partition [samsung,0] failed due to Leader not local for 
 partition [samsung,0] on broker 1 (kafka.server.KafkaApis)
 [2014-06-13 08:35:45,440] INFO [KafkaApi-1] Send the close connection 
 response due to error handling produce request [clientId = , correlationId = 
 2, topicAndPartition = [samsung,0]] with Ack=0 (kafka.server.KafkaApis)
 OS- Windows 7 , JDK 1.7 , Scala 2.10



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26770: Patch for KAFKA-1108

2014-10-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26770/#review56956
---



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/26770/#comment97399

Should this be WARN instead? ERROR wouldn't be ideal since this operation 
is retried later. Also wondering if this message actually gives much 
information about the reason of the failure? It might just print out 
IOException. I think the reason for failure that people might understand is 
what might cause the IOException. How about improving the error message by 
saying that the possible cause for this error could be that the leader movement 
operation on the controller took longer than the configured 
socket.timeout.ms. 

This will encourage users to inspect whether socket.timeout.ms needs to be 
bumped up, or to investigate why the controller is taking so long to move the leaders 
away from this broker.
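For example, a hedged sketch (wording and helper name are illustrative, not the 
committed change) of a log line along those lines, built from the IOException's message:

{code}
import java.io.IOException

object ControlledShutdownWarningSketch {
  def warningFor(ioe: IOException): String =
    ("Controlled shutdown attempt failed: %s. A possible cause is that the leader " +
     "movement on the controller took longer than the configured socket.timeout.ms; " +
     "consider increasing it or investigating why the controller is slow to move " +
     "leaders away from this broker.").format(ioe.getMessage)

  def main(args: Array[String]): Unit =
    println(warningFor(new IOException("Connection timed out")))
}
{code}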


- Neha Narkhede


On Oct. 15, 2014, 6:55 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26770/
 ---
 
 (Updated Oct. 15, 2014, 6:55 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1108
 https://issues.apache.org/jira/browse/KAFKA-1108
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1108 Log IOException messages during controlled shutdown.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 07c0a078ffa5142441f687da851472da732c3837 
 
 Diff: https://reviews.apache.org/r/26770/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1653:
-
Reviewer: Neha Narkhede

 Duplicate broker ids allowed in replica assignment
 --

 Key: KAFKA-1653
 URL: https://issues.apache.org/jira/browse/KAFKA-1653
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Ewen Cheslack-Postava
  Labels: newbie
 Attachments: KAFKA-1653.patch


 The reassign partitions command and the controller do not ensure that all 
 replicas for a partition are on different brokers. For example, you could set 
 1,2,2 as the list of brokers for the replicas.
 kafka-topics.sh --describe --under-replicated will list these partitions as 
 under-replicated, but I can't see a reason why the controller should allow 
 this state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26666: Patch for KAFKA-1653

2014-10-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26666/#review56958
---


Since you fixed some other tools as well, can we also fix the preferred replica 
election command where we can de-dup the partitions?


core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
https://reviews.apache.org/r/26666/#comment97400

I think it's worth telling the user which partitions' replicas contain 
duplicates (and include all such partitions instead of just one), since typically 
a partition reassignment operation can contain 100s of partitions.
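For illustration, a rough sketch (hypothetical helper, not the patch under review) of 
reporting every partition whose replica list contains duplicate broker ids:

{code}
object DuplicateReplicaCheckSketch {
  // Returns only the partitions whose replica lists contain duplicate broker ids.
  def duplicateReplicaAssignments(assignment: Map[String, Seq[Int]]): Map[String, Seq[Int]] =
    assignment.filter { case (_, replicas) => replicas.distinct.size != replicas.size }

  def main(args: Array[String]): Unit = {
    val proposed = Map("my-topic-0" -> Seq(1, 2, 2), "my-topic-1" -> Seq(1, 2, 3))
    val bad = duplicateReplicaAssignments(proposed)
    if (bad.nonEmpty)
      println("Partitions with duplicate replicas: " + bad.mkString(", "))
  }
}
{code}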


- Neha Narkhede


On Oct. 13, 2014, 11:57 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
  https://reviews.apache.org/r/26666/
 ---
 
 (Updated Oct. 13, 2014, 11:57 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1653
 https://issues.apache.org/jira/browse/KAFKA-1653
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1653 Disallow duplicate broker IDs in user input for admin commands. 
 This covers a few cases besides the one identified in the bug. Aside from a 
 major refactoring to use Sets for broker/replica lists, sanitizing user input 
 seems to be the best solution here. I chose to generate errors instead of 
 just using toSet since a duplicate entry may indicate that a different broker 
 id was accidentally omitted.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
 691d69a49a240f38883d2025afaec26fd61281b5 
   core/src/main/scala/kafka/admin/TopicCommand.scala 
 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa 
   core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
 d298e7e81acc7427c6cf4796b445966267ca54eb 
 
  Diff: https://reviews.apache.org/r/26666/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Commented] (KAFKA-1476) Get a list of consumer groups

2014-10-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174052#comment-14174052
 ] 

Neha Narkhede commented on KAFKA-1476:
--

[~balaji.sesha...@dish.com] Thanks for the patch. A few comments:
1. The CONSUMER GROUPS header format is inconsistent with the other tools, for 
example kafka-topics --list. Can we please remove it?
2. Currently, your tool only supports the list option. So the topic option is 
not required. 
3. The getConsumerGroups() API is better suited for ZkUtils.

Would you mind addressing the other feature requirements as well? Alternately, 
we can limit this JIRA to list and describe.
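For point 3, a minimal sketch (hypothetical helper, assuming the zkclient library Kafka 
already depends on) of what a getConsumerGroups() in ZkUtils could look like; consumer 
groups are registered as children of the /consumers path in ZooKeeper:

{code}
import org.I0Itec.zkclient.ZkClient
import scala.collection.JavaConverters._

object ConsumerGroupsSketch {
  // List consumer groups by reading the children of /consumers.
  def getConsumerGroups(zkClient: ZkClient): Seq[String] =
    zkClient.getChildren("/consumers").asScala.toSeq

  def main(args: Array[String]): Unit = {
    val zkClient = new ZkClient("localhost:2181", 30000, 30000)   // connection string is an example
    try getConsumerGroups(zkClient).foreach(println)
    finally zkClient.close()
  }
}
{code}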

 Get a list of consumer groups
 -

 Key: KAFKA-1476
 URL: https://issues.apache.org/jira/browse/KAFKA-1476
 Project: Kafka
  Issue Type: Wish
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Williams
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, 
 KAFKA-1476.patch


 It would be useful to have a way to get a list of consumer groups currently 
 active via some tool/script that ships with kafka. This would be helpful so 
 that the system tools can be explored more easily.
 For example, when running the ConsumerOffsetChecker, it requires a group 
 option
 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
 ?
 But, when just getting started with kafka, using the console producer and 
 consumer, it is not clear what value to use for the group option.  If a list 
 of consumer groups could be listed, then it would be clear what value to use.
 Background:
 http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API

2014-10-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174061#comment-14174061
 ] 

Neha Narkhede commented on KAFKA-328:
-

Thanks for the patch. Would you mind using our patch review tool going forward? 
It will make it easier to review.
1. Better to use intercept[IllegalStateException] in the test (a sketch follows below). 
2. We should add all relevant test cases mentioned in the description, like 
repeated shutdown.
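A minimal sketch of point 1 (assuming ScalaTest's intercept, which Kafka's unit tests 
have available; `startup` stands in for calling KafkaServer.startup on a server that is 
already running):

{code}
import org.scalatest.Assertions.intercept

object StartupTwiceSketch {
  // Asserts that the second startup throws and returns the exception for further checks.
  def assertSecondStartupFails(startup: () => Unit): IllegalStateException =
    intercept[IllegalStateException] {
      startup()
    }
}
{code}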

 Write unit test for kafka server startup and shutdown API 
 --

 Key: KAFKA-328
 URL: https://issues.apache.org/jira/browse/KAFKA-328
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: BalajiSeshadri
  Labels: newbie
 Attachments: KAFKA-328-FORMATTED.patch, KAFKA-328.patch


 Background discussion in KAFKA-320
 People often try to embed KafkaServer in an application that ends up calling 
 startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this 
 works correctly we have to be very careful about cleaning up resources. This 
 is a good practice for making unit tests reliable anyway.
 A good first step would be to add some unit tests on startup and shutdown to 
 cover various cases:
 1. A Kafka server can startup if it is not already starting up, if it is not 
 currently being shutdown, or if it hasn't been already started
 2. A Kafka server can shutdown if it is not already shutting down, if it is 
 not currently starting up, or if it hasn't been already shutdown. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1654) Provide a way to override server configuration from command line

2014-10-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174101#comment-14174101
 ] 

Neha Narkhede commented on KAFKA-1654:
--

[~jarcec] Thanks for the patch. Overall looks good. A few comments:
1. The usage command says "USAGE: java [options] %s [kafka options] 
server.properties", but in order for the kafka options to take effect you also 
need to use --set. 
2. If you leave --set out, it doesn't error out saying that --set is required, and 
it silently does not end up overriding the property value.
3. Can we rename set to override?
4. If you specify multiple properties, it is unclear that you need to use --set 
for each of those (see the sketch after this list). If you don't, it doesn't error 
out and silently doesn't override it.
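For points 3 and 4, a hedged sketch (hypothetical option name, using the joptsimple 
parser Kafka's CLIs already use) of accepting the flag once per property and failing 
loudly on malformed input instead of silently ignoring it:

{code}
import java.util.Properties
import joptsimple.OptionParser
import scala.collection.JavaConverters._

object OverrideOptionSketch {
  def main(args: Array[String]): Unit = {
    val parser = new OptionParser
    // Repeat --override once per property, e.g. --override log.dirs=/tmp/kafka --override num.io.threads=4
    val overrideOpt = parser.accepts("override", "Override a server property (key=value)")
      .withRequiredArg()
      .ofType(classOf[String])
    val options = parser.parse(args: _*)

    val props = new Properties()
    for (kv <- options.valuesOf(overrideOpt).asScala) {
      val idx = kv.indexOf('=')
      require(idx > 0, "Expected key=value but got: " + kv)   // fail loudly, don't silently skip
      props.setProperty(kv.substring(0, idx), kv.substring(idx + 1))
    }
    println(props)
  }
}
{code}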


 Provide a way to override server configuration from command line
 

 Key: KAFKA-1654
 URL: https://issues.apache.org/jira/browse/KAFKA-1654
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.1.1
Reporter: Jarek Jarcec Cecho
Assignee: Jarek Jarcec Cecho
 Fix For: 0.8.3

 Attachments: KAFKA-1654.patch


 I've recently been playing with Kafka and I found the current way of server 
 configuration quite inflexible. All the configuration options have to be 
 inside a properties file and there is no way they can be overridden for a 
 single execution.  In order to temporarily change one property I had to copy the 
 config file and change the property there. Hence, I'm wondering if people 
 would be open to providing a way to specify and override the configs from 
 the command line when starting Kafka?
 Something like:
 {code}
 ./bin/kafka-server-start.sh -Dmy.cool.property=X kafka.properties
 {code}
 or 
 {code}
 ./bin/kafka-server-start.sh --set my.cool.property=X kafka.properties
 {code}
 I'm more than happy to take a stab at it, but I would like to see if there is 
 interest in such a capability.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26373: Patch for KAFKA-1647

2014-10-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review56989
---


This is a fairly tricky patch and I'm not 100% sure that we haven't introduced 
any kind of regression. I'd feel more comfortable accepting this patch if we 
repeated the kind of testing that was done to find this bug.


core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/26373/#comment97429

if(!partition.makeFollower)



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/26373/#comment97430

Now for partitions that have a leader, we are not adding a follower.


- Neha Narkhede


On Oct. 13, 2014, 11:38 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 13, 2014, 11:38 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Joel's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174178#comment-14174178
 ] 

Ewen Cheslack-Postava commented on KAFKA-1710:
--

This looks like a red herring due to the structure of the test. The test code 
generates 200 threads which share 4 producers, and each thread round-robins 
through the producers, then sleeps for 10ms.

It looks like all that's happening is that the profiling tool sees the same 
stack trace repeatedly because there's a huge amount of contention for the 4 
producers. If you take a look at the stack traces, they're almost all waiting 
on a lock on a queue that the messages get appended to. The few active threads 
have those queues locked and are working on compressing data before sending it 
out. Given the number of threads and the small number of producers, it's not 
surprising that YourKit sees the same stack traces for a long time -- the 
threads can be making forward progress, but any time the profiler stops to look 
at the stack traces, it's very likely that any given thread will be waiting on 
a lock with the same stack trace. None of the stack traces show any evidence of 
a real deadlock (i.e. I can't find any set of locks where there could be 
ordering issues since almost every thread is just waiting on a single lock in one 
of the producers).

If this did hit deadlock, the process should stop entirely because all the 
worker threads use all 4 producers and the supposedly deadlocked threads are 
all waiting on locks in the producer. I ran the test to completion multiple 
times without any issues. Unless this has actually been observed to hit 
deadlock and stop making progress, I think this should be closed since these 
messages are really just warnings from YourKit.

[~Bmis13] you might try reducing the # of threads and seeing if those charts 
end up looking better. I bet if you actually showed all the threads instead of 
just the couple in the screenshot, the areas marked as runnable across all 
threads would sum to a reasonable total. Also, there are other possible issues 
with getting good performance from this test code, e.g. the round robin 
approach can cause all threads to get blocked on the same producer if the 
producer gets locked for a relatively long time. This can happen when data is 
ready to be sent and is getting compressed. Other approaches to distributing 
work across the producers may provide better throughput.
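To make the distinction concrete, a small sketch (hypothetical code, not the attached 
TestNetworkDownProducer) of the pattern described above: many threads synchronizing on 
a handful of shared deques cannot deadlock, because there is no lock-ordering cycle, 
but a sampling profiler will mostly observe them blocked on the same monitors.

{code}
import java.util.ArrayDeque
import java.util.concurrent.{Executors, TimeUnit}

object ContentionSketch {
  def main(args: Array[String]): Unit = {
    // 4 shared queues stand in for the per-partition deques guarded by synchronized(dq).
    val queues = Array.fill(4)(new ArrayDeque[Array[Byte]]())
    val pool = Executors.newFixedThreadPool(200)
    for (_ <- 0 until 200) {
      pool.submit(new Runnable {
        def run(): Unit = {
          var i = 0
          while (i < 100000) {
            val dq = queues(i % queues.length)   // round-robin over the shared queues
            dq.synchronized {                    // heavily contended, but only one lock held at a time
              dq.add(new Array[Byte](64))
              if (dq.size > 1000) dq.clear()
            }
            i += 1
          }
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.MINUTES)   // always completes: threads are contended, not deadlocked
  }
}
{code}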


 [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
 being sent to single partition
 

 Key: KAFKA-1710
 URL: https://issues.apache.org/jira/browse/KAFKA-1710
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Priority: Critical
  Labels: performance
 Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
 TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
 th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
 th6.dump, th7.dump, th8.dump, th9.dump


 Hi Kafka Dev Team,
 When I run the test to send messages to a single partition for 3 minutes or 
 so, I encounter a deadlock (please see the attached screenshot) and thread 
 contention from YourKit profiling.  
 Use Case:
 1)  Aggregating messages into same partition for metric counting. 
 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
 Here is output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 

Re: Security JIRAS

2014-10-16 Thread Michael Herstine
Thanks, Jay.

I'm new to the project, and I'm wondering how things proceed from here...
are folks working on these tasks, or do they get assigned, or...?

On 10/7/14, 5:15 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey guys,

As promised, I added a tree of JIRAs for the stuff in the security wiki (
https://cwiki.apache.org/confluence/display/KAFKA/Security):

https://issues.apache.org/jira/browse/KAFKA-1682

I tried to break it into reasonably standalone pieces. I think many of the
tickets could actually be done in parallel. Since there were many people
interested in this area this may help parallelize the work a bit.

I added some strawman details on implementation to each ticket. We can
discuss and refine further on the individual tickets.

Please take a look and let me know if this breakdown seems reasonable.

Cheers,

-Jay



Re: Security JIRAS

2014-10-16 Thread Gwen Shapira
Wondering the same here :)

I think there are some parallel threads here (SSL is independent of
Kerberos, as far as I can see).

Kerberos work is blocked on
https://issues.apache.org/jira/browse/KAFKA-1683 - Implement a
session concept in the socket server. So there's no point in
picking up other tasks before this is assigned (and at least
designed).

I'm looking at Kafka Brokers authentication with ZooKeeper since this
looks independent of other tasks.

Gwen



On Thu, Oct 16, 2014 at 4:23 PM, Michael Herstine
mherst...@linkedin.com.invalid wrote:
 Thanks, Jay.

 I'm new to the project, and I'm wondering how things proceed from here...
 are folks working on these tasks, or do they get assigned, or...?

 On 10/7/14, 5:15 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey guys,

As promised, I added a tree of JIRAs for the stuff in the security wiki (
https://cwiki.apache.org/confluence/display/KAFKA/Security):

https://issues.apache.org/jira/browse/KAFKA-1682

I tried to break it into reasonably standalone pieces. I think many of the
tickets could actually be done in parallel. Since there were many people
interested in this area this may help parallelize the work a bit.

I added some strawman details on implementation to each ticket. We can
discuss and refine further on the individual tickets.

Please take a look and let me know if this breakdown seems reasonable.

Cheers,

-Jay



Re: Review Request 26658: Patch for KAFKA-1493

2014-10-16 Thread James Oliver

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26658/
---

(Updated Oct. 16, 2014, 8:50 p.m.)


Review request for kafka.


Bugs: KAFKA-1493
https://issues.apache.org/jira/browse/KAFKA-1493


Repository: kafka


Description
---

KAFKA-1493 Implement LZ4 Frame I/O Streams


KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
bf4ed66791b9a502aae6cb2ec7681f42732d9a43 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
5227b2d7ab803389d1794f48c8232350c05b14fd 
  clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
0323f5f7032dceb49d820c17a41b78c56591ffc4 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22 
  config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 
  core/src/main/scala/kafka/message/CompressionCodec.scala 
de0a0fade5387db63299c6b112b3c9a5e41d82ec 
  core/src/main/scala/kafka/message/CompressionFactory.scala 
8420e13d0d8680648df78f22ada4a0d4e3ab8758 
  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b024a693c23cb21f1efe405ed414bf23f3974f31 
  core/src/main/scala/kafka/tools/PerfConfig.scala 
c72002976d90416559090a665f6494072a6c2dec 
  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
c95485170fd8b4f5faad740f049e5d09aca8829d 
  core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
6f0addcea64f1e78a4de50ec8135f4d02cebd305 
  core/src/test/scala/unit/kafka/message/MessageTest.scala 
958c1a60069ad85ae20f5c58e74679cd9fa6f70e 

Diff: https://reviews.apache.org/r/26658/diff/


Testing (updated)
---

./gradlew test
All tests passed


Thanks,

James Oliver



[jira] [Updated] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Oliver updated KAFKA-1493:

Attachment: KAFKA-1493_2014-10-16_13:49:34.patch

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: Ivan Lyutov
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174217#comment-14174217
 ] 

James Oliver commented on KAFKA-1493:
-

Updated reviewboard https://reviews.apache.org/r/26658/diff/
 against branch origin/trunk

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: Ivan Lyutov
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Oliver reassigned KAFKA-1493:
---

Assignee: James Oliver  (was: Ivan Lyutov)

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174222#comment-14174222
 ] 

Ewen Cheslack-Postava commented on KAFKA-1108:
--

Updated reviewboard https://reviews.apache.org/r/26770/diff/
 against branch origin/trunk

 when controlled shutdown attempt fails, the reason is not always logged
 ---

 Key: KAFKA-1108
 URL: https://issues.apache.org/jira/browse/KAFKA-1108
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg
Assignee: Ewen Cheslack-Postava
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1108.patch, KAFKA-1108_2014-10-16_13:53:11.patch


 In KafkaServer.controlledShutdown(), it initiates a controlled shutdown, and 
 then if there's a failure, it will retry the controlledShutdown.
 Looking at the code, there are 2 ways a retry could fail, one with an error 
 response from the controller, and this messaging code:
 {code}
 info("Remaining partitions to move: 
 %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
 info("Error code from controller: %d".format(shutdownResponse.errorCode))
 {code}
 Alternatively, there could be an IOException, with this code executed:
 {code}
 catch {
   case ioe: java.io.IOException =>
 channel.disconnect()
 channel = null
 // ignore and try again
 }
 {code}
 And then finally, in either case:
 {code}
   if (!shutdownSuceeded) {
 Thread.sleep(config.controlledShutdownRetryBackoffMs)
 warn("Retrying controlled shutdown after the previous attempt 
 failed...")
   }
 {code}
 It would be nice if the nature of the IOException were logged in either case 
 (I'd be happy with an ioe.getMessage() instead of a full stack trace, as 
 kafka in general tends to be too willing to dump IOException stack traces!).
 I suspect, in my case, the actual IOException is a socket timeout (as the 
 time between the initial "Starting controlled shutdown" and the first 
 "Retrying..." message is usually about 35 seconds (the socket timeout + the 
 controlled shutdown retry backoff).  So, it would seem that really, the issue 
 in this case is that controlled shutdown is taking too long.  It would seem 
 sensible instead to have the controller report back to the server (before the 
 socket timeout) that more time is needed, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1108:
-
Attachment: KAFKA-1108_2014-10-16_13:53:11.patch

 when controlled shutdown attempt fails, the reason is not always logged
 ---

 Key: KAFKA-1108
 URL: https://issues.apache.org/jira/browse/KAFKA-1108
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg
Assignee: Ewen Cheslack-Postava
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1108.patch, KAFKA-1108_2014-10-16_13:53:11.patch


 In KafkaServer.controlledShutdown(), it initiates a controlled shutdown, and 
 then if there's a failure, it will retry the controlledShutdown.
 Looking at the code, there are 2 ways a retry could fail, one with an error 
 response from the controller, and this messaging code:
 {code}
 info("Remaining partitions to move: 
 %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
 info("Error code from controller: %d".format(shutdownResponse.errorCode))
 {code}
 Alternatively, there could be an IOException, with this code executed:
 {code}
 catch {
   case ioe: java.io.IOException =>
 channel.disconnect()
 channel = null
 // ignore and try again
 }
 {code}
 And then finally, in either case:
 {code}
   if (!shutdownSuceeded) {
 Thread.sleep(config.controlledShutdownRetryBackoffMs)
 warn("Retrying controlled shutdown after the previous attempt 
 failed...")
   }
 {code}
 It would be nice if the nature of the IOException were logged in either case 
 (I'd be happy with an ioe.getMessage() instead of a full stack trace, as 
 kafka in general tends to be too willing to dump IOException stack traces!).
 I suspect, in my case, the actual IOException is a socket timeout (as the 
 time between the initial "Starting controlled shutdown" and the first 
 "Retrying..." message is usually about 35 seconds (the socket timeout + the 
 controlled shutdown retry backoff).  So, it would seem that really, the issue 
 in this case is that controlled shutdown is taking too long.  It would seem 
 sensible instead to have the controller report back to the server (before the 
 socket timeout) that more time is needed, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26770: Patch for KAFKA-1108

2014-10-16 Thread Ewen Cheslack-Postava


 On Oct. 16, 2014, 5:55 p.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/server/KafkaServer.scala, line 239
  https://reviews.apache.org/r/26770/diff/1/?file=722474#file722474line239
 
  Should this be WARN instead? ERROR wouldn't be ideal since this 
  operation is retried later. Also wondering if this message actually gives 
  much information about the reason of the failure? It might just print out 
  IOException. I think the reason for failure that people might understand is 
  what might cause the IOException. How about improving the error message by 
  saying that the possible cause for this error could be that the leader 
  movement operation on the controller took longer than the configured 
  socket.timeout.ms. 
  
  This will encourage users to inspect if the socket.timeout.ms needs to 
  be bumped up or inspect why the controller is taking long for moving the 
  leaders away from this broker.

The INFO level just matched similar messages a few lines above, although this 
is a more significant issue than those. Newest patch updates to WARN. Message 
is also more detailed, but ideally the IOException message also contains more 
than just the class name.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26770/#review56956
---


On Oct. 16, 2014, 8:53 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26770/
 ---
 
 (Updated Oct. 16, 2014, 8:53 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1108
 https://issues.apache.org/jira/browse/KAFKA-1108
 
 
 Repository: kafka
 
 
 Description
 ---
 
 More informative message and increase log level to warn.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 07c0a078ffa5142441f687da851472da732c3837 
 
 Diff: https://reviews.apache.org/r/26770/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Joe Stein
+1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.

I agree with the tickets you brought up to have in 0.8.2-beta, and also
https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
/
On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Another JIRA that will be nice to include as part of 0.8.2-beta is
 https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
 naming. Looking for people's thoughts on 2 things here -

 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
 final 4-5 weeks later?
 2. Do people want to include any JIRAs (other than the ones mentioned
 above) in 0.8.2-beta? If so, it will be great to know now so it will allow
 us to move forward with the beta release quickly.

 Thanks,
 Neha

 On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Hi,
 
  We have accumulated an impressive list of pretty major features in 0.8.2
 -
  Delete topic
  Automated leader rebalancing
  Controlled shutdown
  Offset management
  Parallel recovery
  min.isr and
  clean leader election
 
  In the past, what has worked for major feature releases is a beta release
  prior to a final release. I'm proposing we do the same for 0.8.2. The
 only
  blockers for 0.8.2-beta, that I know of are -
 
  https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and
  requires some thinking about the new dependency. Since it is not fully
  ready and there are things to think about, I suggest we take it out,
 think
  it end to end and then include it in 0.8.3.)
  https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
  Guozhang Wang)
  https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
  waiting on a review by Joe Stein)
 
  It seems that 1634 and 1671 can get wrapped up in a week. Do people think
  we should cut 0.8.2-beta by next week?
 
  Thanks,
  Neha
 



[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278
 ] 

Bhavesh Mistry commented on KAFKA-1710:
---

[~ewencp],

Thanks for looking into this.  If you look at the thread dump, you will see the 
blocked threads as well.  This particular code exposes the thread 
contention in the Kafka producer.  We have this issue in our aggregation use 
case.  It would be great if you looked into an alternative to the synchronization block.

{code}
 synchronized (dq) {
..
}
{code}

Thanks,

Bhavesh 

 [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
 being sent to single partition
 

 Key: KAFKA-1710
 URL: https://issues.apache.org/jira/browse/KAFKA-1710
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Priority: Critical
  Labels: performance
 Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
 TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
 th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
 th6.dump, th7.dump, th8.dump, th9.dump


 Hi Kafka Dev Team,
 When I run the test to send messages to a single partition for 3 minutes or 
 so, I encounter a deadlock (please see the attached screenshot) and thread 
 contention from YourKit profiling.  
 Use Case:
 1)  Aggregating messages into same partition for metric counting. 
 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
 Here is output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-55 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 Thanks,
 Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174305#comment-14174305
 ] 

Jun Rao commented on KAFKA-1493:


James,

Thanks for the patch. There are a few things marked as todo in the patch. Are 
those required? Do you think you have time to finish the patch for 0.8.2?

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26666: Patch for KAFKA-1653

2014-10-16 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/2/
---

(Updated Oct. 16, 2014, 9:54 p.m.)


Review request for kafka.


Bugs: KAFKA-1653
https://issues.apache.org/jira/browse/KAFKA-1653


Repository: kafka


Description (updated)
---

Generate error for duplicates in PreferredLeaderElectionCommand instead of just 
swallowing duplicates.


Report which entries are duplicated for ReassignPartitionCommand since they may 
be difficult to find in large reassignments.
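
For illustration, a hypothetical helper (DuplicateReport is not part of the actual 
patch) showing the kind of duplicate detection and reporting described above:
{code}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical example: collect entries that occur more than once so the
// command can name them in its error message instead of silently de-duping.
public class DuplicateReport {
    public static <T> List<T> duplicates(List<T> entries) {
        Set<T> seen = new HashSet<T>();
        List<T> dups = new ArrayList<T>();
        for (T entry : entries) {
            if (!seen.add(entry)) {
                dups.add(entry);
            }
        }
        return dups;
    }
}
{code}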


Diffs (updated)
-

  core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 
c7918483c02040a7cc18d6e9edbd20a3025a3a55 
  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
691d69a49a240f38883d2025afaec26fd61281b5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 
7672c5aab4fba8c23b1bb5cd4785c332d300a3fa 
  core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
d298e7e81acc7427c6cf4796b445966267ca54eb 

Diff: https://reviews.apache.org/r/2/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1653:
-
Attachment: KAFKA-1653_2014-10-16_14:54:07.patch

 Duplicate broker ids allowed in replica assignment
 --

 Key: KAFKA-1653
 URL: https://issues.apache.org/jira/browse/KAFKA-1653
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Ewen Cheslack-Postava
  Labels: newbie
 Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch


 The reassign partitions command and the controller do not ensure that all 
 replicas for a partition are on different brokers. For example, you could set 
 1,2,2 as the list of brokers for the replicas.
 kafka-topics.sh --describe --under-replicated will list these partitions as 
 under-replicated, but I can't see a reason why the controller should allow 
 this state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174319#comment-14174319
 ] 

Ewen Cheslack-Postava commented on KAFKA-1653:
--

Updated reviewboard https://reviews.apache.org/r/2/diff/
 against branch origin/trunk

 Duplicate broker ids allowed in replica assignment
 --

 Key: KAFKA-1653
 URL: https://issues.apache.org/jira/browse/KAFKA-1653
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Ewen Cheslack-Postava
  Labels: newbie
 Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch


 The reassign partitions command and the controller do not ensure that all 
 replicas for a partition are on different brokers. For example, you could set 
 1,2,2 as the list of brokers for the replicas.
 kafka-topics.sh --describe --under-replicated will list these partitions as 
 under-replicated, but I can't see a reason why the controller should allow 
 this state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26666: Patch for KAFKA-1653

2014-10-16 Thread Ewen Cheslack-Postava


 On Oct. 16, 2014, 6:10 p.m., Neha Narkhede wrote:
  Since you fixed some other tools as well, can we also fix the preferred 
  replica election command where we can de-dup the partitions?

This was already removing duplicates, I had it generate an exception instead 
since duplicates may indicate a config error. I'm assuming that's what you 
meant here.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/2/#review56958
---


On Oct. 16, 2014, 9:54 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/2/
 ---
 
 (Updated Oct. 16, 2014, 9:54 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1653
 https://issues.apache.org/jira/browse/KAFKA-1653
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Generate error for duplicates in PreferredLeaderElectionCommand instead of 
 just swallowing duplicates.
 
 
 Report which entries are duplicated for ReassignPartitionCommand since they 
 may be difficult to find in large reassignments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 
 c7918483c02040a7cc18d6e9edbd20a3025a3a55 
   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
 691d69a49a240f38883d2025afaec26fd61281b5 
   core/src/main/scala/kafka/admin/TopicCommand.scala 
 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa 
   core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
 d298e7e81acc7427c6cf4796b445966267ca54eb 
 
 Diff: https://reviews.apache.org/r/2/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:56 PM:
-

[~ewencp],

Thanks for looking into this.  If you look at the thread dumps, you will see the 
blocked threads as well.  This particular code exposes the thread contention in 
the Kafka producer.  We hit this issue when we aggregate events to send to the 
same partition, regardless of the number of producers.  It would be great if you 
can look into an alternative implementation to the synchronization block.  The 
test code amplifies the root cause.

The root of the problem is this block:
{code}
synchronized (dq) {
    ...
}
{code}

Do you think it would be better to do it the following way?
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;

public class KafkaAsyncProducer implements Producer {

    // TODO configure this queue
    private final LinkedBlockingQueue<ProducerRecord> asyncQueue;
    private final KafkaProducer producer;
    private final List<Thread> threadList;
    private final CountDownLatch latch;

    private final AtomicBoolean close = new AtomicBoolean(false);

    public KafkaAsyncProducer(int capacity, int numberOfDrainThreads,
                              Properties configFile) {
        if (configFile == null) {
            throw new NullPointerException("Producer configuration cannot be null");
        }
        // set the capacity for the queue
        asyncQueue = new LinkedBlockingQueue<ProducerRecord>(capacity);
        producer = new KafkaProducer(configFile);
        threadList = new ArrayList<Thread>(numberOfDrainThreads);
        latch = new CountDownLatch(numberOfDrainThreads);
        // start the drain threads...
        for (int i = 0; i < numberOfDrainThreads; i++) {
            Thread th = new Thread(new ConsumerThread(), "Kafka_Drain-" + i);
            th.setDaemon(true);
            threadList.add(th);
            th.start();
        }
    }

    public Future<RecordMetadata> send(ProducerRecord record) {
        try {
            if (record == null) {
                throw new NullPointerException("Null record cannot be sent.");
            }
            if (close.get()) {
                throw new KafkaException("Producer already closed or in process of closing...");
            }
            asyncQueue.put(record);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }

    public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
        throw new UnsupportedOperationException("Send not supported");
    }

    public List<PartitionInfo> partitionsFor(String topic) {
        // TODO Auto-generated method stub
        return null;
    }

    public Map<String, ? extends Metric> metrics() {
        return producer.metrics();
    }

    public void close() {
        close.compareAndSet(false, true);
        // wait for drain threads to finish
        try {
            latch.await();
            // now drain the remaining messages
            while (!asyncQueue.isEmpty()) {
                ProducerRecord record = asyncQueue.poll();
                producer.send(record);
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        producer.close();
    }

    private class ConsumerThread implements Runnable {
        public void run() {
            try {
                while (!close.get()) {
                    ProducerRecord record;

Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Guozhang Wang
Regarding 1634, I intended to work on that after 1583 since it will
change the commit offset request handling logic a lot. If people think
1583 is only a few days away from check-in, we can leave it in
0.8.2-beta; otherwise we can push it to 0.8.3.

Guozhang

On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote:

 +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.

 I agree to the tickets you brought up to have in 0.8.2-beta and also
 https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.

 /***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
 /
 On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

  Another JIRA that will be nice to include as part of 0.8.2-beta is
  https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
  naming. Looking for people's thoughts on 2 things here -
 
  1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
  final 4-5 weeks later?
  2. Do people want to include any JIRAs (other than the ones mentioned
  above) in 0.8.2-beta? If so, it will be great to know now so it will
 allow
  us to move forward with the beta release quickly.
 
  Thanks,
  Neha
 
  On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Hi,
  
   We have accumulated an impressive list of pretty major features in
 0.8.2
  -
   Delete topic
   Automated leader rebalancing
   Controlled shutdown
   Offset management
   Parallel recovery
   min.isr and
   clean leader election
  
   In the past, what has worked for major feature releases is a beta
 release
   prior to a final release. I'm proposing we do the same for 0.8.2. The
  only
   blockers for 0.8.2-beta, that I know of are -
  
   https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change
 and
   requires some thinking about the new dependency. Since it is not fully
   ready and there are things to think about, I suggest we take it out,
  think
   it end to end and then include it in 0.8.3.)
   https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
   Guozhang Wang)
   https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
   waiting on a review by Joe Stein)
  
   It seems that 1634 and 1671 can get wrapped up in a week. Do people
 think
   we should cut 0.8.2-beta by next week?
  
   Thanks,
   Neha
  
 




-- 
-- Guozhang


Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Jun Rao
+1 on doing an 0.8.2 beta.

Guozhang,

kafka-1583 is relatively large. Given that we are getting close to
releasing 0.8.2 beta, my feeling is that we probably shouldn't include it
in 0.8.2 beta even if we can commit it in a few days.

Thanks,

Jun

On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com wrote:

 Regarding 1634, I was intended to work on that after 1583 since it will
 changes the commit offset request handling logic a lot. If people think
 1583 is only a few days away before check-in, we can leave in in
 0.8.2-beta; otherwise we can push to 0.8.3.

 Guozhang

 On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote:

  +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
 
  I agree to the tickets you brought up to have in 0.8.2-beta and also
  https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.
 
  /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop
  /
  On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:
 
   Another JIRA that will be nice to include as part of 0.8.2-beta is
   https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
   naming. Looking for people's thoughts on 2 things here -
  
   1. How do folks feel about doing a 0.8.2-beta release right now and
 0.8.2
   final 4-5 weeks later?
   2. Do people want to include any JIRAs (other than the ones mentioned
   above) in 0.8.2-beta? If so, it will be great to know now so it will
  allow
   us to move forward with the beta release quickly.
  
   Thanks,
   Neha
  
   On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
 neha.narkh...@gmail.com
   wrote:
  
Hi,
   
We have accumulated an impressive list of pretty major features in
  0.8.2
   -
Delete topic
Automated leader rebalancing
Controlled shutdown
Offset management
Parallel recovery
min.isr and
clean leader election
   
In the past, what has worked for major feature releases is a beta
  release
prior to a final release. I'm proposing we do the same for 0.8.2. The
   only
blockers for 0.8.2-beta, that I know of are -
   
https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change
  and
requires some thinking about the new dependency. Since it is not
 fully
ready and there are things to think about, I suggest we take it out,
   think
it end to end and then include it in 0.8.3.)
https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
Guozhang Wang)
https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
waiting on a review by Joe Stein)
   
It seems that 1634 and 1671 can get wrapped up in a week. Do people
  think
we should cut 0.8.2-beta by next week?
   
Thanks,
Neha
   
  
 



 --
 -- Guozhang



[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174356#comment-14174356
 ] 

James Oliver commented on KAFKA-1493:
-

Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress and decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by an implementation 
supporting some of the missing features. If this were to occur, a 
RuntimeException with detailed information will be thrown.
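
For reference, a minimal lz4-java round trip using the raw block compressor 
(illustration only; the frame-format features discussed above, such as block sizes 
and optional block checksums, are handled by the patch's own frame reader/writer 
and are not shown here):
{code}
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

public class Lz4RoundTrip {
    public static void main(String[] args) {
        byte[] data = "some payload to compress".getBytes();

        LZ4Factory factory = LZ4Factory.fastestInstance();
        LZ4Compressor compressor = factory.fastCompressor();

        // compress into a worst-case sized buffer
        int maxLen = compressor.maxCompressedLength(data.length);
        byte[] compressed = new byte[maxLen];
        int compressedLen = compressor.compress(data, 0, data.length, compressed, 0, maxLen);

        // decompress, knowing the original length
        LZ4FastDecompressor decompressor = factory.fastDecompressor();
        byte[] restored = new byte[data.length];
        decompressor.decompress(compressed, 0, restored, 0, data.length);

        System.out.println("compressed " + data.length + " bytes to " + compressedLen);
    }
}
{code}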

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174356#comment-14174356
 ] 

James Oliver edited comment on KAFKA-1493 at 10/16/14 10:23 PM:


Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress and decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by a more advanced 
implementation, using one or more of the missing features. If this were to 
occur, a RuntimeException with detailed information will be thrown.


was (Author: joliver):
Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress and decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by an implementation 
supporting some of the missing features. If this were to occur, a 
RuntimeException with detailed information will be thrown.

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Guozhang Wang
Agree.
On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote:

 +1 on doing an 0.8.2 beta.

 Guozhang,

 kafka-1583 is relatively large. Given that we are getting close to
 releasing 0.8.2 beta, my feeling is that we probably shouldn't include it
 in 0.8.2 beta even if we can commit it in a few days.

 Thanks,

 Jun

 On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com wrote:

  Regarding 1634, I was intended to work on that after 1583 since it will
  changes the commit offset request handling logic a lot. If people think
  1583 is only a few days away before check-in, we can leave in in
  0.8.2-beta; otherwise we can push to 0.8.3.
 
  Guozhang
 
  On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
  
   I agree to the tickets you brought up to have in 0.8.2-beta and also
   https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.
  
   /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop
   /
   On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
  
Another JIRA that will be nice to include as part of 0.8.2-beta is
https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the
 mbean
naming. Looking for people's thoughts on 2 things here -
   
1. How do folks feel about doing a 0.8.2-beta release right now and
  0.8.2
final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned
above) in 0.8.2-beta? If so, it will be great to know now so it will
   allow
us to move forward with the beta release quickly.
   
Thanks,
Neha
   
On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
  neha.narkh...@gmail.com
wrote:
   
 Hi,

 We have accumulated an impressive list of pretty major features in
   0.8.2
-
 Delete topic
 Automated leader rebalancing
 Controlled shutdown
 Offset management
 Parallel recovery
 min.isr and
 clean leader election

 In the past, what has worked for major feature releases is a beta
   release
 prior to a final release. I'm proposing we do the same for 0.8.2.
 The
only
 blockers for 0.8.2-beta, that I know of are -

 https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major
 change
   and
 requires some thinking about the new dependency. Since it is not
  fully
 ready and there are things to think about, I suggest we take it
 out,
think
 it end to end and then include it in 0.8.3.)
 https://issues.apache.org/jira/browse/KAFKA-1634 (This has an
 owner:
 Guozhang Wang)
 https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and
 is
 waiting on a review by Joe Stein)

 It seems that 1634 and 1671 can get wrapped up in a week. Do people
   think
 we should cut 0.8.2-beta by next week?

 Thanks,
 Neha

   
  
 
 
 
  --
  -- Guozhang
 



[jira] [Comment Edited] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174356#comment-14174356
 ] 

James Oliver edited comment on KAFKA-1493 at 10/16/14 10:30 PM:


Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress and decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by a more advanced 
implementation, using one or more of the missing features. If this were to 
occur, a RuntimeException with detailed information will be thrown.

EDIT: I can take out the TODOs if you think they cause confusion


was (Author: joliver):
Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress and decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by a more advanced 
implementation, using one or more of the missing features. If this were to 
occur, a RuntimeException with detailed information will be thrown.

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174373#comment-14174373
 ] 

Ewen Cheslack-Postava commented on KAFKA-1710:
--

[~Bmis13] That approach just pushes the problem into KafkaAsyncProducer's 
thread that processes messages -- there won't be lock contention in 
KafkaProducer since KafkaAsyncProducer will be the only user of it, but you may 
not get an improvement in throughput because ultimately you're limited to the 
time a single thread can get. It may even get *slower* because you'll have more 
runnable threads at any given time, which means that the KafkaAsyncProducer 
worker thread will get less CPU time. Even disregarding that, the 
LinkedBlockingQueue you used will become your new source of contention (since it 
must be synchronized internally). If you have a very large capacity, that'll 
let the threads continue to make progress and contention will be lower since 
the time spent adding an item is very small, but it will cost a lot of memory 
since you're just adding a layer of buffering. That might be useful if you have 
bursty traffic (the buffer allows you to temporarily buffer more data while the 
KafkaProducer works on getting it sent), but if you have sustained traffic 
you'll just have constantly growing memory usage. If the capacity is small, 
then the threads producing messages will eventually end up getting blocked 
waiting for there to be space in the queue.

Probably the biggest issue here is that this test only writes to a single 
partition in a single topic. You could improve performance by using more 
partitions in that topic. You're already writing to all producers from all 
threads, so you must not need the ordering guarantees of a single partition. If 
you still want a single partition, you can improve performance by using more 
Producers, which will spread the contention across more queues. Since you 
already have 4 that you're running round-robin on, I'd guess adding more 
shouldn't be a problem.

In any case, this use case seems a bit odd. Are you really going to have 200 
threads generating messages *as fast as they can* with only 4 producers?

As far as this issue is concerned, the original report said the problem was 
deadlock but that doesn't seem to be the case. If you're just worried about 
performance, it probably makes more sense to move the discussion over to the 
mailing list. It'll probably be seen by more people and there will probably be 
multiple suggestions for improvements to your approach before we have to make 
changes to the Kafka code.
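
A minimal sketch of the queue-backed wrapper under discussion, for illustration only (not Kafka code; raw producer types are used because the new-producer API was still evolving at the time, and the class name is hypothetical). It shows where the contention moves: producing threads block on the LinkedBlockingQueue while a single drain thread owns the KafkaProducer.

{code}
// Hypothetical sketch of a queue-backed async wrapper (not from Kafka).
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class QueueBackedProducer {
    private final BlockingQueue<ProducerRecord> queue;
    private final KafkaProducer producer;
    private final Thread drainThread;

    public QueueBackedProducer(KafkaProducer producer, int capacity) {
        this.producer = producer;
        // Bounded: producing threads block when the buffer fills (back-pressure).
        // Unbounded: no blocking, but memory grows without limit under sustained load.
        this.queue = new LinkedBlockingQueue<ProducerRecord>(capacity);
        this.drainThread = new Thread(new Runnable() {
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // The drain thread is the only caller of producer.send(), so there is
                        // no lock contention inside the producer -- but it is also the ceiling
                        // on end-to-end throughput.
                        QueueBackedProducer.this.producer.send(queue.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "queue-backed-producer-drain");
        this.drainThread.start();
    }

    // Called by the application threads; blocks only when the queue is full.
    public void send(ProducerRecord record) throws InterruptedException {
        queue.put(record);
    }
}
{code}

Either way the single drain thread's send rate is the ceiling, which is why more partitions or more producer instances attack the contention more directly than extra buffering.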

 [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
 being sent to single partition
 

 Key: KAFKA-1710
 URL: https://issues.apache.org/jira/browse/KAFKA-1710
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Priority: Critical
  Labels: performance
 Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
 TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
 th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
 th6.dump, th7.dump, th8.dump, th9.dump


 Hi Kafka Dev Team,
 When I run the test that sends messages to a single partition for 3 minutes or 
 so, I encounter a deadlock (please see the attached screenshot) and thread 
 contention in YourKit profiling.  
 Use Case:
 1)  Aggregating messages into the same partition for metric counting. 
 2)  Replicating the old producer's behavior of sticking to a partition for 3 minutes.
 Here is the output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, 

Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Joel Koshy
+1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2,
so it will only go out with 0.8.3.

Joel

On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote:
 Agree.
 On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote:
 
  +1 on doing an 0.8.2 beta.
 
  Guozhang,
 
  kafka-1583 is relatively large. Given that we are getting close to
  releasing 0.8.2 beta, my feeling is that we probably shouldn't include it
  in 0.8.2 beta even if we can commit it in a few days.
 
  Thanks,
 
  Jun
 
  On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com wrote:
 
    Regarding 1634, I intended to work on that after 1583 since it will
    change the commit offset request handling logic a lot. If people think
    1583 is only a few days away from check-in, we can leave it in
    0.8.2-beta; otherwise we can push it to 0.8.3.
  
   Guozhang
  
   On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote:
  
+1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
   
I agree with the tickets you brought up for 0.8.2-beta, and also
https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.
   
/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
/
On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com
   wrote:
   
 Another JIRA that will be nice to include as part of 0.8.2-beta is
 https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
 naming. Looking for people's thoughts on 2 things here -

 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
 final 4-5 weeks later?
 2. Do people want to include any JIRAs (other than the ones mentioned
 above) in 0.8.2-beta? If so, it will be great to know now so it will allow
 us to move forward with the beta release quickly.

 Thanks,
 Neha

 On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

  Hi,
 
  We have accumulated an impressive list of pretty major features in 0.8.2 -
  Delete topic
  Automated leader rebalancing
  Controlled shutdown
  Offset management
  Parallel recovery
  min.isr and
  clean leader election
 
  In the past, what has worked for major feature releases is a beta release
  prior to a final release. I'm proposing we do the same for 0.8.2. The only
  blockers for 0.8.2-beta, that I know of are -
 
  https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and
  requires some thinking about the new dependency. Since it is not fully
  ready and there are things to think about, I suggest we take it out, think
  it end to end and then include it in 0.8.3.)
  https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
  Guozhang Wang)
  https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
  waiting on a review by Joe Stein)
 
  It seems that 1634 and 1671 can get wrapped up in a week. Do people think
  we should cut 0.8.2-beta by next week?
 
  Thanks,
  Neha
 

   
  
  
  
   --
   -- Guozhang
  
 



[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174403#comment-14174403
 ] 

Bhavesh Mistry commented on KAFKA-1710:
---

[~ewencp],

Thanks for looking into this issue.  We consume as fast as we can and 
re-publish the messages to another aggregated topic based on some keys in the 
message. We see thread contention in the profiling tool, and I separated out the 
code to amplify the problem.  We run with about 75 threads.  [~ewencp], can 
you please discuss this issue with the Kafka community as well?  The deadlock 
will occur sometimes, depending on thread scheduling and how long the threads are 
blocked.  All I am asking is whether there is a better way to enqueue incoming messages.  
I just proposed the simple solution above, which does not impact application threads: 
only the drain threads will be blocked, and with a buffer, as you mentioned, we 
might get better throughput (of course at the expense of buffered memory 
(an unbounded concurrent queue) and thread context switching). If you feel 
this is a known performance issue of sending to a single partition then please close 
this, and you may start a discussion on the Kafka community list for it.  Thanks 
for your help and suggestions!

According to the thread dumps, the blocking is happening in a synchronized block.
{code}
pool-1-thread-200 prio=5 tid=0x7f92451c2000 nid=0x20103 waiting for 
monitor entry [0x00012d228000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:139)
- waiting to lock 0x000703ce39f0 (a java.util.ArrayDeque)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:238)
at 
org.kafka.test.TestNetworkDownProducer$MyProducer.run(TestNetworkDownProducer.java:85)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

pool-1-thread-199 prio=5 tid=0x7f92451c1800 nid=0x1ff03 waiting for 
monitor entry [0x00012d0e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:139)
- waiting to lock 0x000703ce39f0 (a java.util.ArrayDeque)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:238)
at 
org.kafka.test.TestNetworkDownProducer$MyProducer.run(TestNetworkDownProducer.java:85)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
{code}

 [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
 being sent to single partition
 

 Key: KAFKA-1710
 URL: https://issues.apache.org/jira/browse/KAFKA-1710
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Priority: Critical
  Labels: performance
 Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
 TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
 th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
 th6.dump, th7.dump, th8.dump, th9.dump


 Hi Kafka Dev Team,
 When I run the test that sends messages to a single partition for 3 minutes or 
 so, I encounter a deadlock (please see the attached screenshot) and thread 
 contention in YourKit profiling.  
 Use Case:
 1)  Aggregating messages into the same partition for metric counting. 
 2)  Replicating the old producer's behavior of sticking to a partition for 3 minutes.
 Here is the output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 

[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174462#comment-14174462
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

[~jkreps],

Did you get a chance to reproduce the problem?  Has anyone else reported this 
issue or a similar one?

Thanks,

Bhavesh 

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Jun Rao

 I see my CPU spike to 100% when the network connection is lost for a while.  It 
 seems the network IO threads are very busy logging the following error message.  Is 
 this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26658: Patch for KAFKA-1493

2014-10-16 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26658/#review57005
---


Thanks for the patch. Looks good overall. Some minor comments below.


clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
https://reviews.apache.org/r/26658/#comment97485

Should this be private?



clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
https://reviews.apache.org/r/26658/#comment97505

Should we flush after writing the end mark?
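
A hypothetical sketch (not the class under review, whose final API may differ): the usual pattern is for close() to write the frame end mark and then flush the underlying stream before closing it, so the mark is guaranteed to reach the sink.

{code}
// Hypothetical sketch only; KafkaLZ4BlockOutputStream's actual close()/finish() may differ.
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class FramedOutputStream extends FilterOutputStream {

    private boolean finished = false;

    public FramedOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void close() throws IOException {
        if (!finished) {
            out.write(new byte[] {0, 0, 0, 0}); // frame end mark (a zero-length block)
            out.flush();                        // make sure the end mark reaches the sink
            finished = true;
        }
        out.close();
    }
}
{code}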



clients/src/main/java/org/apache/kafka/common/record/Compressor.java
https://reviews.apache.org/r/26658/#comment97444

Unused import.



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
https://reviews.apache.org/r/26658/#comment97526

How about we make this 2000 so that we can test compression on more than 
64KB, which is the default block size of lz4?



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
https://reviews.apache.org/r/26658/#comment97527

How about also adding the following properties to test out compressing more 
than 64KB?
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 66000)
props.put(ProducerConfig.LINGER_MS_CONFIG, 200)
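
A sketch of those settings in context (assumptions: the standard new-producer config constants and a placeholder broker address):

{code}
// Sketch of a producer configuration that batches more than one 64 KB LZ4 block
// per request, so the multi-block compression path gets exercised.
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class CompressionTestConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000"); // > 64 KB => more than one LZ4 block
        props.put(ProducerConfig.LINGER_MS_CONFIG, "200");    // give the batch time to fill
        return props;
    }
}
{code}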


- Jun Rao


On Oct. 16, 2014, 8:50 p.m., James Oliver wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26658/
 ---
 
 (Updated Oct. 16, 2014, 8:50 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1493
 https://issues.apache.org/jira/browse/KAFKA-1493
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1493 Implement LZ4 Frame I/O Streams
 
 
 KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 bf4ed66791b9a502aae6cb2ec7681f42732d9a43 
   
 clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
 5227b2d7ab803389d1794f48c8232350c05b14fd 
   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
 0323f5f7032dceb49d820c17a41b78c56591ffc4 
   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
 a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22 
   config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 
   core/src/main/scala/kafka/message/CompressionCodec.scala 
 de0a0fade5387db63299c6b112b3c9a5e41d82ec 
   core/src/main/scala/kafka/message/CompressionFactory.scala 
 8420e13d0d8680648df78f22ada4a0d4e3ab8758 
   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
 b024a693c23cb21f1efe405ed414bf23f3974f31 
   core/src/main/scala/kafka/tools/PerfConfig.scala 
 c72002976d90416559090a665f6494072a6c2dec 
   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
 c95485170fd8b4f5faad740f049e5d09aca8829d 
   core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
 6f0addcea64f1e78a4de50ec8135f4d02cebd305 
   core/src/test/scala/unit/kafka/message/MessageTest.scala 
 958c1a60069ad85ae20f5c58e74679cd9fa6f70e 
 
 Diff: https://reviews.apache.org/r/26658/diff/
 
 
 Testing
 ---
 
 ./gradlew test
 All tests passed
 
 
 Thanks,
 
 James Oliver
 




[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174473#comment-14174473
 ] 

Jun Rao commented on KAFKA-1493:


James,

Thanks for the answer. We can leave the TODOs there. The patch looks good to 
me. Could you look at the comments in the RB?

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 24676: Rebase KAFKA-1583

2014-10-16 Thread Guozhang Wang


 On Oct. 16, 2014, 1:29 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, line 167
  https://reviews.apache.org/r/24676/diff/9/?file=720184#file720184line167
 
  Should replica manager be offset manager?

This is the replica manager actually, when it tries to write the commit message 
to the local log. I have changed the comment a bit to make it clearer.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review56843
---


On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24676/
 ---
 
 (Updated Oct. 14, 2014, 2:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1583
 https://issues.apache.org/jira/browse/KAFKA-1583
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + 
 minor changes
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 59c09155dd25fad7bed07d3d00039e3dc66db95c 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 a286272c834b6f40164999ff8b7f8998875f2cfe 
   core/src/main/scala/kafka/cluster/Partition.scala 
 e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
   core/src/main/scala/kafka/log/Log.scala 
 a123cdc52f341a802b3e4bfeb29a6154332e5f73 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 a624359fb2059340bb8dc1619c5b5f226e26eb9b 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
 ed1318891253556cdf4d908033b704495acd5724 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 67f2833804cb15976680e42b9dc49e275c89d266 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
 d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
   core/src/main/scala/kafka/server/RequestPurgatory.scala 
 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
   core/src/main/scala/kafka/utils/DelayedItem.scala 
 d7276494072f14f1cdf7d23f755ac32678c5675c 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
 fb61d552f2320fedec547400fbbe402a0b2f5d87 
   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
 03a424d45215e1e7780567d9559dae4d0ae6fc29 
   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
 cd302aa51eb8377d88b752d48274e403926439f2 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 a9c4ddc78df0b3695a77a12cf8cf25521a203122 
   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
 a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 3804a114e97c849cae48308997037786614173fc 
   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
 
 Diff: https://reviews.apache.org/r/24676/diff/
 
 
 Testing
 ---
 
 Unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 24676: Fix KAFKA-1583

2014-10-16 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 17, 2014, 4:15 a.m.)


Review request for kafka.


Summary (updated)
-

Fix KAFKA-1583


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Incorporate Jun's comments after rebase


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 
157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



[jira] [Commented] (KAFKA-1583) Kafka API Refactoring

2014-10-16 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174709#comment-14174709
 ] 

Guozhang Wang commented on KAFKA-1583:
--

Updated reviewboard https://reviews.apache.org/r/24676/diff/
 against branch origin/trunk

 Kafka API Refactoring
 -

 Key: KAFKA-1583
 URL: https://issues.apache.org/jira/browse/KAFKA-1583
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
 KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
 KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
 KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
 KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch


 This is the next step of KAFKA-1430. Details can be found at this page:
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26658: Patch for KAFKA-1493

2014-10-16 Thread James Oliver

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26658/
---

(Updated Oct. 17, 2014, 4:25 a.m.)


Review request for kafka.


Bugs: KAFKA-1493
https://issues.apache.org/jira/browse/KAFKA-1493


Repository: kafka


Description (updated)
---

KAFKA-1493 Implement LZ4 Frame I/O Streams


KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction


KAFKA-1493 Flush stream after writing frame end mark


KAFKA-1493 Remove unused import


KAFKA-1493 Move finish() logic into close()


KAFKA-1493 Modify test cases to compress a 64kb message to test multi-block 
lz4 frame compression/decompression


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
bf4ed66791b9a502aae6cb2ec7681f42732d9a43 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
5227b2d7ab803389d1794f48c8232350c05b14fd 
  clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
0323f5f7032dceb49d820c17a41b78c56591ffc4 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22 
  config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 
  core/src/main/scala/kafka/message/CompressionCodec.scala 
de0a0fade5387db63299c6b112b3c9a5e41d82ec 
  core/src/main/scala/kafka/message/CompressionFactory.scala 
8420e13d0d8680648df78f22ada4a0d4e3ab8758 
  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b024a693c23cb21f1efe405ed414bf23f3974f31 
  core/src/main/scala/kafka/tools/PerfConfig.scala 
c72002976d90416559090a665f6494072a6c2dec 
  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
c95485170fd8b4f5faad740f049e5d09aca8829d 
  core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
6f0addcea64f1e78a4de50ec8135f4d02cebd305 
  core/src/test/scala/unit/kafka/message/MessageTest.scala 
958c1a60069ad85ae20f5c58e74679cd9fa6f70e 

Diff: https://reviews.apache.org/r/26658/diff/


Testing
---

./gradlew test
All tests passed


Thanks,

James Oliver



[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174716#comment-14174716
 ] 

James Oliver commented on KAFKA-1493:
-

Updated reviewboard https://reviews.apache.org/r/26658/diff/
 against branch origin/trunk

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch, KAFKA-1493_2014-10-16_21:25:23.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Oliver updated KAFKA-1493:

Attachment: KAFKA-1493_2014-10-16_21:25:23.patch

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
 KAFKA-1493_2014-10-16_13:49:34.patch, KAFKA-1493_2014-10-16_21:25:23.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)