[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173425#comment-14173425 ] Vladimir Tretyakov commented on KAFKA-1481: --- Hi Jun, thanks for the detailed feedback, a few comments from me: 1-2. I did it this way because I didn't want to change the existing code much, and tried to get what we need based on the current method signatures. I had thought about your approach with a Map too, but rejected it because of the larger changes involved. Now I see that I should do it. 3. Removed the 'mBeanName' param; it was legacy code. 4. As I understood from the code, this method must be called before shutdown in the hook, right? We have to remove only the metrics related to a particular clientId and shouldn't touch the others, right? Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.2 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse the individual parts out of the names. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
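A quick sketch of the parsing problem described above, with hypothetical clientId and topic values: when '-' is legal inside both fields, no split can recover the field boundaries, whereas a separator disallowed in the embedded values, such as the proposed pipe, parses unambiguously.
{code}
// clientId = "my-client-id", topic = "my-topic" -- both contain dashes
val name = "my-client-id-my-topic-BytesPerSec"
name.split("-")
// Array(my, client, id, my, topic, BytesPerSec) -- field boundaries are lost
"my-client-id|my-topic|BytesPerSec".split('|')
// Array(my-client-id, my-topic, BytesPerSec) -- unambiguous
{code}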
[jira] [Updated] (KAFKA-1506) Cancel kafka-reassign-partitions Job
[ https://issues.apache.org/jira/browse/KAFKA-1506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1506: - Labels: newbie++ (was: ) Cancel kafka-reassign-partitions Job -- Key: KAFKA-1506 URL: https://issues.apache.org/jira/browse/KAFKA-1506 Project: Kafka Issue Type: New Feature Components: replication, tools Affects Versions: 0.8.1, 0.8.1.1 Reporter: Paul Lung Assignee: Neha Narkhede Labels: newbie++ I started a reassignment, and for some reason it just takes forever. However, it won't let me start another reassignment job while this one is running. So a tool to cancel a reassignment job is needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1506) Cancel kafka-reassign-partitions Job
[ https://issues.apache.org/jira/browse/KAFKA-1506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1506: - Reviewer: Neha Narkhede Assignee: (was: Neha Narkhede) Cancel kafka-reassign-partitions Job -- Key: KAFKA-1506 URL: https://issues.apache.org/jira/browse/KAFKA-1506 Project: Kafka Issue Type: New Feature Components: replication, tools Affects Versions: 0.8.1, 0.8.1.1 Reporter: Paul Lung Labels: newbie++ I started a reassignment, and for some reason it just takes forever. However, it won't let me start another reassignment job while this one is running. So a tool to cancel a reassignment job is needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26811: Patch for KAFKA-1196
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26811/ --- Review request for kafka. Bugs: KAFKA-1196 https://issues.apache.org/jira/browse/KAFKA-1196 Repository: kafka Description --- KAFKA-1196 WIP Ensure FetchResponses don't exceed 2GB limit. Diffs - core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala a5386a03b62956bc440b40783247c8cdf7432315 Diff: https://reviews.apache.org/r/26811/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
Another JIRA that will be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean naming. Looking for people's thoughts on 2 things here - 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later? 2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now so it will allow us to move forward with the beta release quickly. Thanks, Neha On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi, We have accumulated an impressive list of pretty major features in 0.8.2 - Delete topic Automated leader rebalancing Controlled shutdown Offset management Parallel recovery min.isr and clean leader election In the past, what has worked for major feature releases is a beta release prior to a final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta, that I know of are - https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and requires some thinking about the new dependency. Since it is not fully ready and there are things to think about, I suggest we take it out, think it end to end and then include it in 0.8.3.) https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner: Guozhang Wang) https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is waiting on a review by Joe Stein) It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week? Thanks, Neha
[jira] [Commented] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
[ https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173941#comment-14173941 ] Ewen Cheslack-Postava commented on KAFKA-1196: -- Created reviewboard https://reviews.apache.org/r/26811/diff/ against branch origin/trunk java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33 --- Key: KAFKA-1196 URL: https://issues.apache.org/jira/browse/KAFKA-1196 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: running java 1.7, linux and kafka compiled against scala 2.9.2 Reporter: Gerrit Jansen van Vuuren Priority: Blocker Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1196.patch I have 6 topics, each with 8 partitions, spread over 4 kafka servers. The servers are 24 core, 72 GB RAM. While consuming from the topics I get an IllegalArgumentException and all consumption stops; the error keeps on throwing. I've tracked it down to FetchResponse.scala line 33. The error happens when the FetchResponsePartitionData object's readFrom method calls: messageSetBuffer.limit(messageSetSize). I put in some debug code; the messageSetSize is 671758648, while buffer.capacity() gives 155733313, so for some reason the buffer is smaller than the required message size. I don't know the consumer code enough to debug this. It doesn't matter if compression is used or not. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
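For context, java.nio.Buffer.limit throws IllegalArgumentException whenever the new limit exceeds the buffer's capacity, which matches the reported numbers exactly. A minimal reproduction sketch using the sizes from the report:
{code}
import java.nio.ByteBuffer
val buf = ByteBuffer.allocate(155733313) // buffer.capacity() from the report
buf.limit(671758648)                     // messageSetSize; throws IllegalArgumentException
{code}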
[jira] [Updated] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
[ https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1196: - Assignee: Ewen Cheslack-Postava Status: Patch Available (was: Open) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33 --- Key: KAFKA-1196 URL: https://issues.apache.org/jira/browse/KAFKA-1196 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: running java 1.7, linux and kafka compiled against scala 2.9.2 Reporter: Gerrit Jansen van Vuuren Assignee: Ewen Cheslack-Postava Priority: Blocker Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1196.patch I have 6 topics, each with 8 partitions, spread over 4 kafka servers. The servers are 24 core, 72 GB RAM. While consuming from the topics I get an IllegalArgumentException and all consumption stops; the error keeps on throwing. I've tracked it down to FetchResponse.scala line 33. The error happens when the FetchResponsePartitionData object's readFrom method calls: messageSetBuffer.limit(messageSetSize). I put in some debug code; the messageSetSize is 671758648, while buffer.capacity() gives 155733313, so for some reason the buffer is smaller than the required message size. I don't know the consumer code enough to debug this. It doesn't matter if compression is used or not. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
[ https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173943#comment-14173943 ] Ewen Cheslack-Postava commented on KAFKA-1196: -- This is a WIP patch to fix this issue, which previous discussion suggests was due to the FetchResponse exceeding 2GB. My approach to triggering the issue, however, doesn't exhibit exactly the same symptom but does cause an unrecoverable error that terminates the consumer connection. (For reference, it causes the server to fail when FetchResponseSend.writeTo calls expectIncomplete and sendSize is negative due to overflow. This confuses the server since it looks like the message is already done sending, and the server forcibly closes the consumer's connection.) The patch addresses the core issue by ensuring the returned message doesn't exceed 2GB, dropping parts of it in a way that otherwise shouldn't affect the consumer. But there are a lot of points that still need to be addressed:
* I started by building an integration test to trigger the issue, included in PrimitiveApiTest. However, since we necessarily need > 2GB of data to trigger the issue, it's probably too expensive to include this way. Offline discussion suggests a system test would be a better place for it. It's still included here for completeness.
* The implementation filters to a subset of the data in FetchResponse. The main reason for this is that this process needs to know the exact (or at least a conservative estimate of the) size of the serialized data, which only FetchResponse knows. But it's also a bit weird compared to other message classes, which are case classes and don't modify their inputs.
* Algorithm for choosing the subset to return: the initial approach is to remove random elements until we get below the limit. This is simple to understand and avoids starvation of specific TopicAndPartitions. Any concerns with this basic approach?
* I'm pretty sure I've managed to keep the < 2GB case at effectively the same computational cost (computing the serialized size, grouped data, etc. exactly once, as before). However, for the > 2GB case I've only ensured correctness. In particular, the progressive removal and re-evaluation of the serialized size could potentially be very bad for very large data sets (e.g. starting a mirror maker against a large data set with a large number of partitions from scratch).
* Note that the algorithm never deals with the actual message data, only metadata about what messages are available. This is relevant since it's what suggested the approach in the patch could still be performant -- ReplicaManager.readMessageSets processes the entire FetchRequest and filters it down because the metadata involved is relatively small.
* Based on the previous two points, this really needs some more realistic large-scale system tests to make sure this approach is not only correct but provides reasonable performance (or indicates we need to revise the algorithm for selecting a subset of the data).
* Testing isn't really complete -- I triggered the issue with 4 topics * 600 MB/topic, which is > 2GB. Another obvious case to check is when some partitions contain > 2GB on their own.
* I'd like someone to help sanity-check the exact maximum FetchResponse serialized size we limit messages to. It's not Int.MaxValue because the FetchResponseSend class adds 4 + FetchResponse.sizeInBytes for its own size. I'd like a sanity check that the extra 4 bytes is enough -- is there any additional wrapping we might need to account for? Getting a test to hit exactly that narrow range could be tricky.
* The tests include both immediate-response and purgatory paths, but the purgatory version requires a timeout in the test, which could end up being flaky and wasting time; it doesn't look like there's a great way to mock that right now. Maybe this doesn't matter if it moves to a system test?
* One case this doesn't handle yet is when the data reaches 2GB after it's in the purgatory. The result is correct, but the response is not sent as soon as that condition is satisfied. This is because evaluating it exactly would require calling readMessageSets and evaluating the size of the message for every DelayedFetch.isSatisfied call, which sounds like it could end up being pretty expensive. Maybe there's a better way, perhaps an approximate scheme?
* The test requires some extra bytes in the fetchSize for each partition, presumably for overhead in encoding. I haven't tracked down exactly how big that should be, but I'm guessing it could end up affecting the results of more comprehensive tests.
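A minimal sketch of the random-eviction idea described above, assuming a hypothetical map from partition to pending serialized bytes rather than the patch's actual FetchResponse structures. Note that the size estimate is recomputed on every pass, which is exactly the re-evaluation cost flagged above for very large data sets:
{code}
import scala.collection.mutable
import scala.util.Random

def fitUnderLimit[K](data: mutable.Map[K, Array[Byte]], limit: Long): Unit = {
  val rand = new Random()
  def totalSize = data.values.map(_.length.toLong).sum // conservative size estimate
  while (data.nonEmpty && totalSize > limit) {
    // evict a random partition so no specific partition is starved deterministically
    val victim = data.keysIterator.drop(rand.nextInt(data.size)).next()
    data -= victim
  }
}
{code}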
[jira] [Resolved] (KAFKA-1707) ConsumerOffsetChecker shows none partitions assigned
[ https://issues.apache.org/jira/browse/KAFKA-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede resolved KAFKA-1707. -- Resolution: Won't Fix This is a known issue but there are a few things to check. Did you run the VerifyConsumerRebalance? However, I'd suggest directing such questions to the mailing list first. So I'll go ahead and close this ticket. ConsumerOffsetChecker shows none partitions assigned Key: KAFKA-1707 URL: https://issues.apache.org/jira/browse/KAFKA-1707 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 2.40GHz/1.2e+02GB Reporter: Hari Assignee: Neha Narkhede Labels: patch bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker shows some partitions having none consumers after re-balance triggered due to new consumer joined/disconnected to the group. The lag gets piling up till the partitions are assigned to it usually after another re-balance trigger. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1708) Consumers intermittently stop consuming till restart
[ https://issues.apache.org/jira/browse/KAFKA-1708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede resolved KAFKA-1708. -- Resolution: Won't Fix Same here. Please direct this to the mailing list where people can help out. If we agree that there is a problem, then you can file a JIRA. Consumers intermittently stop consuming till restart Key: KAFKA-1708 URL: https://issues.apache.org/jira/browse/KAFKA-1708 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 2.40GHz/1.2e+02GB Reporter: Hari Assignee: Neha Narkhede Labels: patch Using a simple consumer and reading messages using a stream iterator, we noticed that consumption suddenly stops and the lag starts building up until the consumer is restarted. Below is the code snippet:
{code}
final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByName = consumerConnector.createMessageStreams(topicCountMap);
ConsumerIterator<byte[], byte[]> streamIterator = streamsByName.get(topicName).get(IDX_FIRST_ITEM).iterator();
if (streamIterator.hasNext()) {
    final MessageAndMetadata<byte[], byte[]> item = streamIterator.next();
    ...
}
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1710: - Reviewer: Jun Rao Assignee: (was: Jun Rao) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump Hi Kafka Dev Team, When I run the test to send messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the attached screenshots) and thread contention from YourKit profiling. Use Case: 1) Aggregating messages into the same partition for metric counting. 2) Replicating the old producer's behavior of sticking to a partition for 3 minutes. Here is the output:
Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.
pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-55 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1492) Getting error when sending producer request at the broker end with a single broker
[ https://issues.apache.org/jira/browse/KAFKA-1492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173987#comment-14173987 ] Neha Narkhede commented on KAFKA-1492: -- bq. This seems more appropriate for the Kafka user mailing list or Stack Overflow (apache-kafka tag) rather than JIRA. +1 Getting error when sending producer request at the broker end with a single broker -- Key: KAFKA-1492 URL: https://issues.apache.org/jira/browse/KAFKA-1492 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.1.1 Reporter: sriram Assignee: Jun Rao Tried to run a simple example by sending a message to a single broker . Getting error [2014-06-13 08:35:45,402] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) [2014-06-13 08:35:45,440] WARN [KafkaApi-1] Produce request with correlation id 2 from client on partition [samsung,0] failed due to Leader not local for partition [samsung,0] on broker 1 (kafka.server.KafkaApis) [2014-06-13 08:35:45,440] INFO [KafkaApi-1] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [samsung,0]] with Ack=0 (kafka.server.KafkaApis) OS- Windows 7 , JDK 1.7 , Scala 2.10 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26770: Patch for KAFKA-1108
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26770/#review56956 --- core/src/main/scala/kafka/server/KafkaServer.scala https://reviews.apache.org/r/26770/#comment97399 Should this be WARN instead? ERROR wouldn't be ideal since this operation is retried later. Also wondering if this message actually gives much information about the reason for the failure? It might just print out IOException. I think the reason for failure that people might understand is what might cause the IOException. How about improving the error message by saying that the possible cause for this error could be that the leader movement operation on the controller took longer than the configured socket.timeout.ms. This will encourage users to inspect if the socket.timeout.ms needs to be bumped up or inspect why the controller is taking long to move the leaders away from this broker. - Neha Narkhede On Oct. 15, 2014, 6:55 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26770/ --- (Updated Oct. 15, 2014, 6:55 p.m.) Review request for kafka. Bugs: KAFKA-1108 https://issues.apache.org/jira/browse/KAFKA-1108 Repository: kafka Description --- KAFKA-1108 Log IOException messages during controlled shutdown. Diffs - core/src/main/scala/kafka/server/KafkaServer.scala 07c0a078ffa5142441f687da851472da732c3837 Diff: https://reviews.apache.org/r/26770/diff/ Testing --- Thanks, Ewen Cheslack-Postava
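A sketch of the wording being suggested, written as a standalone helper since the surrounding KafkaServer code is not shown here; this is the reviewer's proposal, not the committed change:
{code}
def retryMessage(ioe: java.io.IOException): String =
  ("Error during controlled shutdown, will retry: %s. A possible cause is that " +
   "leader movement on the controller took longer than the configured " +
   "socket.timeout.ms; consider increasing it or investigating why the " +
   "controller is slow to move leaders off this broker.").format(ioe.getMessage)
{code}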
[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1653: - Reviewer: Neha Narkhede Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1653.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26666: Patch for KAFKA-1653
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/#review56958 --- Since you fixed some other tools as well, can we also fix the preferred replica election command where we can de-dup the partitions? core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala https://reviews.apache.org/r/26666/#comment97400 I think it's worth telling the user which partition's replicas contain duplicates (and including all such partitions instead of one) since typically a partition reassignment operation can contain 100s of partitions. - Neha Narkhede On Oct. 13, 2014, 11:57 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/ --- (Updated Oct. 13, 2014, 11:57 p.m.) Review request for kafka. Bugs: KAFKA-1653 https://issues.apache.org/jira/browse/KAFKA-1653 Repository: kafka Description --- KAFKA-1653 Disallow duplicate broker IDs in user input for admin commands. This covers a few cases besides the one identified in the bug. Aside from a major refactoring to use Sets for broker/replica lists, sanitizing user input seems to be the best solution here. I chose to generate errors instead of just using toSet since a duplicate entry may indicate that a different broker id was accidentally omitted. Diffs - core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb Diff: https://reviews.apache.org/r/26666/diff/ Testing --- Thanks, Ewen Cheslack-Postava
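A sketch of the validation being requested, assuming the parsed reassignment is a map from partition to its replica broker-id list (the command's actual parsing is elided). It reports every offending partition and its duplicated broker ids, not just the first:
{code}
def duplicateReplicas(replicas: Seq[Int]): Iterable[Int] =
  replicas.groupBy(identity).collect { case (id, group) if group.size > 1 => id }

val assignment = Map("my-topic,0" -> Seq(1, 2, 2), "my-topic,1" -> Seq(1, 2, 3))
val offenders = assignment.collect {
  case (partition, replicas) if duplicateReplicas(replicas).nonEmpty =>
    partition -> duplicateReplicas(replicas)
}
// offenders: Map(my-topic,0 -> List(2))
{code}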
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174052#comment-14174052 ] Neha Narkhede commented on KAFKA-1476: -- [~balaji.sesha...@dish.com] Thanks for the patch. Few comments- 1. The CONSUMER GROUPS * format is inconsistent with the other tools. For example, kafka-topics --list. Can we please remove it? 2. Currently, your tool only supports the list option. So the topic option is not required. 3. The getConsumerGroups() API is better suited for ZkUtils. Would you mind addressing the other feature requirements as well? Alternately, we can limit this JIRA to list and describe. Get a list of consumer groups - Key: KAFKA-1476 URL: https://issues.apache.org/jira/browse/KAFKA-1476 Project: Kafka Issue Type: Wish Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Williams Assignee: BalajiSeshadri Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476.patch It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But, when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use. Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.3.4#6332)
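For reference, a minimal sketch of what the proposed getConsumerGroups() could look like: with the ZooKeeper-based consumers of this era, group names are the children of the /consumers path. ZkClient construction and error handling are omitted, and placing this in ZkUtils is the suggestion above, not existing API:
{code}
import org.I0Itec.zkclient.ZkClient
import scala.collection.JavaConverters._

def getConsumerGroups(zkClient: ZkClient): Seq[String] =
  zkClient.getChildren("/consumers").asScala.toSeq
{code}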
[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API
[ https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174061#comment-14174061 ] Neha Narkhede commented on KAFKA-328: - Thanks for the patch. Would you mind using our patch review tool going forward? It will make it easier to review. 1. Better to use intercept[IllegalStateException] in the test. 2. We should add all relevant test cases mentioned in the description like repeated shutdown Write unit test for kafka server startup and shutdown API -- Key: KAFKA-328 URL: https://issues.apache.org/jira/browse/KAFKA-328 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: BalajiSeshadri Labels: newbie Attachments: KAFKA-328-FORMATTED.patch, KAFKA-328.patch Background discussion in KAFKA-320 People often try to embed KafkaServer in an application that ends up calling startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this works correctly we have to be very careful about cleaning up resources. This is a good practice for making unit tests reliable anyway. A good first step would be to add some unit tests on startup and shutdown to cover various cases: 1. A Kafka server can startup if it is not already starting up, if it is not currently being shutdown, or if it hasn't been already started 2. A Kafka server can shutdown if it is not already shutting down, if it is not currently starting up, or if it hasn't been already shutdown. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
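A sketch of the test shape being suggested, assuming (per the KAFKA-320 discussion in the description) that repeated shutdown is a safe no-op while restarting an already-shut-down server fails; intercept is ScalaTest's, and server construction is elided:
{code}
// assumes: val server = new KafkaServer(config)
server.startup()
server.shutdown()
server.shutdown() // repeated shutdown should be a safe no-op
intercept[IllegalStateException] {
  server.startup() // restarting a shut-down server is expected to fail
}
{code}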
[jira] [Commented] (KAFKA-1654) Provide a way to override server configuration from command line
[ https://issues.apache.org/jira/browse/KAFKA-1654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174101#comment-14174101 ] Neha Narkhede commented on KAFKA-1654: -- [~jarcec] Thanks for the patch. Overall looks good. A few comments: 1. The usage command says USAGE: java [options] %s [kafka options] server.properties, but in order for the kafka options to take effect you also need to use --set. 2. If you leave --set out, it doesn't error out saying that --set is required, and silently does not override the property value. 3. Can we rename --set to --override? 4. If you specify multiple properties, it is unclear that you need to use --set for each of them. If you don't, it doesn't error out and silently doesn't override them. Provide a way to override server configuration from command line Key: KAFKA-1654 URL: https://issues.apache.org/jira/browse/KAFKA-1654 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1.1 Reporter: Jarek Jarcec Cecho Assignee: Jarek Jarcec Cecho Fix For: 0.8.3 Attachments: KAFKA-1654.patch I've been recently playing with Kafka and I found the current way of server configuration quite inflexible. All the configuration options have to be inside a properties file and there is no way they can be overridden for a single execution. In order to temporarily change one property I had to copy the config file and change the property there. Hence, I'm wondering if people would be open to providing a way to specify and override the configs from the command line when starting Kafka? Something like: {code} ./bin/kafka-server-start.sh -Dmy.cool.property=X kafka.properties {code} or {code} ./bin/kafka-server-start.sh --set my.cool.property=X kafka.properties {code} I'm more than happy to take a stab at it, but I would like to see if there is interest in such a capability? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
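To make comment 4 concrete, a sketch of the intended invocation and a simplified parse, assuming the flag is renamed to --override per comment 3; the real tool would use joptsimple rather than this hand-rolled scan:
{code}
val args = Array("server.properties", "--override", "log.dirs=/tmp/kafka-logs",
                 "--override", "num.io.threads=16")
val overrides = args.sliding(2).collect {
  case Array("--override", kv) if kv.contains("=") =>
    val Array(k, v) = kv.split("=", 2); k -> v
}.toMap
// overrides: Map(log.dirs -> /tmp/kafka-logs, num.io.threads -> 16)
// each property needs its own --override flag, matching comment 4
{code}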
Re: Review Request 26373: Patch for KAFKA-1647
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review56989 --- This is a fairly tricky patch and I'm not 100% sure that we haven't introduced any kind of regression. I'd feel more comfortable accepting this patch, if we repeated the kind of testing that was done to find this bug. core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/26373/#comment97429 if(!partition.makeFollower) core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/26373/#comment97430 Now for partitions that have a leader, we are not adding a follower. - Neha Narkhede On Oct. 13, 2014, 11:38 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 13, 2014, 11:38 p.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description --- Addressed Joel's comments. Diffs - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174178#comment-14174178 ] Ewen Cheslack-Postava commented on KAFKA-1710: -- This looks like a red herring due to the structure of the test. The test code generates 200 threads which share 4 producers, and each thread round-robins through the producers, then sleeps for 10ms. It looks like all that's happening is that the profiling tool sees the same stack trace repeatedly because there's a huge amount of contention for the 4 producers. If you take a look at the stack traces, they're almost all waiting on a lock on a queue that the messages get appended to. The few active threads have those queues locked and are working on compressing data before sending it out. Given the number of threads and the small number of producers, it's not surprising that YourKit sees the same stack traces for a long time -- the threads can be making forward progress, but any time the profiler stops to look at the stack traces, it's very likely that any given thread will be waiting on a lock with the same stack trace. None of the stack traces show any evidence of a real deadlock (i.e. I can't find any set of locks where there could be ordering issues, since almost every thread is just waiting on one lock in one of the producers). If this did hit deadlock, the process should stop entirely because all the worker threads use all 4 producers and the supposedly deadlocked threads are all waiting on locks in the producer. I ran the test to completion multiple times without any issues. Unless this has actually been observed to hit deadlock and stop making progress, I think this should be closed since these messages are really just warnings from YourKit. [~Bmis13] you might try reducing the # of threads and seeing if those charts end up looking better. I bet if you actually showed all the threads instead of just the couple in the screenshot, the areas marked as runnable across all threads would sum to a reasonable total. Also, there are other possible issues with getting good performance from this test code, e.g. the round-robin approach can cause all threads to get blocked on the same producer if the producer gets locked for a relatively long time. This can happen when data is ready to be sent and is getting compressed. Other approaches to distributing work across the producers may provide better throughput. [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump Hi Kafka Dev Team, When I run the test to send messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the attached screenshots) and thread contention from YourKit profiling. Use Case: 1) Aggregating messages into the same partition for metric counting. 2) Replicating the old producer's behavior of sticking to a partition for 3 minutes.
Here is the output:
Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.
pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
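A sketch of the alternative work distribution hinted at in the analysis above: pinning each worker thread to one producer, instead of round-robining across all of them, bounds how many threads can ever contend on a single producer's accumulator queue. The make parameter stands in for real producer construction:
{code}
def pinned[P](numProducers: Int, make: () => P): Int => P = {
  val producers = Vector.fill(numProducers)(make())
  // each worker always uses producers(threadId % numProducers)
  threadId => producers(threadId % numProducers)
}
{code}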
Re: Security JIRAS
Thanks, Jay. I'm new to the project, and I'm wondering how things proceed from here… are folks working on these tasks, or do they get assigned, or…? On 10/7/14, 5:15 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, As promised, I added a tree of JIRAs for the stuff in the security wiki ( https://cwiki.apache.org/confluence/display/KAFKA/Security): https://issues.apache.org/jira/browse/KAFKA-1682 I tried to break it into reasonably standalone pieces. I think many of the tickets could actually be done in parallel. Since there were many people interested in this area this may help parallelize the work a bit. I added some strawman details on implementation to each ticket. We can discuss and refine further on the individual tickets. Please take a look and let me know if this breakdown seems reasonable. Cheers, -Jay
Re: Security JIRAS
Wondering the same here :) I think there are some parallel threads here (SSL is independent of Kerberos, as far as I can see). Kerberos work is blocked on https://issues.apache.org/jira/browse/KAFKA-1683 - Implement a session concept in the socket server. So there's no point in picking up other tasks before this is assigned (and at least designed). I'm looking at Kafka Brokers authentication with ZooKeeper since this looks independent of other tasks. Gwen On Thu, Oct 16, 2014 at 4:23 PM, Michael Herstine mherst...@linkedin.com.invalid wrote: Thanks, Jay. I'm new to the project, and I'm wondering how things proceed from here… are folks working on these tasks, or do they get assigned, or…? On 10/7/14, 5:15 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, As promised, I added a tree of JIRAs for the stuff in the security wiki ( https://cwiki.apache.org/confluence/display/KAFKA/Security): https://issues.apache.org/jira/browse/KAFKA-1682 I tried to break it into reasonably standalone pieces. I think many of the tickets could actually be done in parallel. Since there were many people interested in this area this may help parallelize the work a bit. I added some strawman details on implementation to each ticket. We can discuss and refine further on the individual tickets. Please take a look and let me know if this breakdown seems reasonable. Cheers, -Jay
Re: Review Request 26658: Patch for KAFKA-1493
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26658/ --- (Updated Oct. 16, 2014, 8:50 p.m.) Review request for kafka. Bugs: KAFKA-1493 https://issues.apache.org/jira/browse/KAFKA-1493 Repository: kafka Description --- KAFKA-1493 Implement LZ4 Frame I/O Streams KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java bf4ed66791b9a502aae6cb2ec7681f42732d9a43 clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 5227b2d7ab803389d1794f48c8232350c05b14fd clients/src/main/java/org/apache/kafka/common/record/Compressor.java 0323f5f7032dceb49d820c17a41b78c56591ffc4 clients/src/main/java/org/apache/kafka/common/utils/Utils.java a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22 config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 core/src/main/scala/kafka/message/CompressionCodec.scala de0a0fade5387db63299c6b112b3c9a5e41d82ec core/src/main/scala/kafka/message/CompressionFactory.scala 8420e13d0d8680648df78f22ada4a0d4e3ab8758 core/src/main/scala/kafka/tools/ConsoleProducer.scala b024a693c23cb21f1efe405ed414bf23f3974f31 core/src/main/scala/kafka/tools/PerfConfig.scala c72002976d90416559090a665f6494072a6c2dec core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala c95485170fd8b4f5faad740f049e5d09aca8829d core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 6f0addcea64f1e78a4de50ec8135f4d02cebd305 core/src/test/scala/unit/kafka/message/MessageTest.scala 958c1a60069ad85ae20f5c58e74679cd9fa6f70e Diff: https://reviews.apache.org/r/26658/diff/ Testing (updated) --- ./gradlew test All tests passed Thanks, James Oliver
[jira] [Updated] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Oliver updated KAFKA-1493: Attachment: KAFKA-1493_2014-10-16_13:49:34.patch Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: Ivan Lyutov Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, KAFKA-1493_2014-10-16_13:49:34.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174217#comment-14174217 ] James Oliver commented on KAFKA-1493: - Updated reviewboard https://reviews.apache.org/r/26658/diff/ against branch origin/trunk Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: Ivan Lyutov Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, KAFKA-1493_2014-10-16_13:49:34.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Oliver reassigned KAFKA-1493: --- Assignee: James Oliver (was: Ivan Lyutov) Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: James Oliver Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, KAFKA-1493_2014-10-16_13:49:34.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174222#comment-14174222 ] Ewen Cheslack-Postava commented on KAFKA-1108: -- Updated reviewboard https://reviews.apache.org/r/26770/diff/ against branch origin/trunk when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Assignee: Ewen Cheslack-Postava Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch, KAFKA-1108_2014-10-16_13:53:11.patch In KafkaServer.controlledShutdown(), it initiates a controlled shutdown, and then if there's a failure, it will retry the controlledShutdown. Looking at the code, there are 2 ways a retry could fail, one with an error response from the controller, and this messaging code:
{code}
info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
info("Error code from controller: %d".format(shutdownResponse.errorCode))
{code}
Alternatively, there could be an IOException, with this code executed:
{code}
catch {
  case ioe: java.io.IOException =>
    channel.disconnect()
    channel = null
    // ignore and try again
}
{code}
And then finally, in either case:
{code}
if (!shutdownSuceeded) {
  Thread.sleep(config.controlledShutdownRetryBackoffMs)
  warn("Retrying controlled shutdown after the previous attempt failed...")
}
{code}
It would be nice if the nature of the IOException were logged in either case (I'd be happy with an ioe.getMessage() instead of a full stack trace, as kafka in general tends to be too willing to dump IOException stack traces!). I suspect, in my case, the actual IOException is a socket timeout (as the time between the initial "Starting controlled shutdown" and the first "Retrying..." message is usually about 35 seconds, the socket timeout plus the controlled shutdown retry backoff). So, it would seem that really, the issue in this case is that controlled shutdown is taking too long. It would seem sensible instead to have the controller report back to the server (before the socket timeout) that more time is needed, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1108: - Attachment: KAFKA-1108_2014-10-16_13:53:11.patch when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Assignee: Ewen Cheslack-Postava Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch, KAFKA-1108_2014-10-16_13:53:11.patch In KafkaServer.controlledShutdown(), it initiates a controlled shutdown, and then if there's a failure, it will retry the controlledShutdown. Looking at the code, there are 2 ways a retry could fail, one with an error response from the controller, and this messaging code:
{code}
info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
info("Error code from controller: %d".format(shutdownResponse.errorCode))
{code}
Alternatively, there could be an IOException, with this code executed:
{code}
catch {
  case ioe: java.io.IOException =>
    channel.disconnect()
    channel = null
    // ignore and try again
}
{code}
And then finally, in either case:
{code}
if (!shutdownSuceeded) {
  Thread.sleep(config.controlledShutdownRetryBackoffMs)
  warn("Retrying controlled shutdown after the previous attempt failed...")
}
{code}
It would be nice if the nature of the IOException were logged in either case (I'd be happy with an ioe.getMessage() instead of a full stack trace, as kafka in general tends to be too willing to dump IOException stack traces!). I suspect, in my case, the actual IOException is a socket timeout (as the time between the initial "Starting controlled shutdown" and the first "Retrying..." message is usually about 35 seconds, the socket timeout plus the controlled shutdown retry backoff). So, it would seem that really, the issue in this case is that controlled shutdown is taking too long. It would seem sensible instead to have the controller report back to the server (before the socket timeout) that more time is needed, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26770: Patch for KAFKA-1108
On Oct. 16, 2014, 5:55 p.m., Neha Narkhede wrote: core/src/main/scala/kafka/server/KafkaServer.scala, line 239 https://reviews.apache.org/r/26770/diff/1/?file=722474#file722474line239 Should this be WARN instead? ERROR wouldn't be ideal since this operation is retried later. Also wondering if this message actually gives much information about the reason for the failure? It might just print out IOException. I think the reason for failure that people might understand is what might cause the IOException. How about improving the error message by saying that the possible cause for this error could be that the leader movement operation on the controller took longer than the configured socket.timeout.ms. This will encourage users to inspect if the socket.timeout.ms needs to be bumped up or inspect why the controller is taking long to move the leaders away from this broker. The INFO level just matched similar messages a few lines above, although this is a more significant issue than those. Newest patch updates to WARN. Message is also more detailed, but ideally the IOException message also contains more than just the class name. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26770/#review56956 --- On Oct. 16, 2014, 8:53 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26770/ --- (Updated Oct. 16, 2014, 8:53 p.m.) Review request for kafka. Bugs: KAFKA-1108 https://issues.apache.org/jira/browse/KAFKA-1108 Repository: kafka Description --- More informative message and increase log level to warn. Diffs - core/src/main/scala/kafka/server/KafkaServer.scala 07c0a078ffa5142441f687da851472da732c3837 Diff: https://reviews.apache.org/r/26770/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
+1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later. I agree to the tickets you brought up to have in 0.8.2-beta and also https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Another JIRA that will be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean naming. Looking for people's thoughts on 2 things here - 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later? 2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now so it will allow us to move forward with the beta release quickly. Thanks, Neha On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi, We have accumulated an impressive list of pretty major features in 0.8.2 - Delete topic Automated leader rebalancing Controlled shutdown Offset management Parallel recovery min.isr and clean leader election In the past, what has worked for major feature releases is a beta release prior to a final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta, that I know of are - https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and requires some thinking about the new dependency. Since it is not fully ready and there are things to think about, I suggest we take it out, think it end to end and then include it in 0.8.3.) https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner: Guozhang Wang) https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is waiting on a review by Joe Stein) It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week? Thanks, Neha
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry commented on KAFKA-1710: --- [~ewencp], Thanks for looking into this. If you look at the thread dumps, you will see the blocked threads as well, as this particular code exposes the thread contention in the Kafka producer. We hit this issue in our aggregation use case. It would be great if you could look into an alternative to the synchronization block:
{code}
synchronized (dq) {
  ..
}
{code}
Thanks, Bhavesh [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump Hi Kafka Dev Team, When I run the test to send messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the attached screenshots) and thread contention from YourKit profiling. Use Case: 1) Aggregating messages into the same partition for metric counting. 2) Replicating the old producer's behavior of sticking to a partition for 3 minutes. Here is the output: Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.
pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-55 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:32 PM: - [~ewencp], Thanks for looking into this. If you look at the thread dumps, you will see the blocked threads as well, as this particular code exposes the thread contention in the Kafka producer. We hit this issue when we aggregate events to send to the same partition, regardless of the number of producers. It would be great if you could look into an alternative implementation of the synchronization block; that is the root of the problem.
{code title=RecordAccumulator.java|borderStyle=solid}
synchronized (dq) {
  ..
}
{code}
Do you think it would be better to do it the following way?
{code title=KafkaAsyncProducer.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;

public class KafkaAsyncProducer implements Producer {

    // TODO configure this queue
    private final LinkedBlockingQueue<ProducerRecord> asyncQueue;
    private final KafkaProducer producer;
    private final List<Thread> threadList;
    private final CountDownLatch latch;
    private final AtomicBoolean close = new AtomicBoolean(false);

    public KafkaAsyncProducer(int capacity, int numberOfDrainThreads, Properties configFile) {
        if (configFile == null) {
            throw new NullPointerException("Producer configuration cannot be null");
        }
        // set the capacity for the queue
        asyncQueue = new LinkedBlockingQueue<ProducerRecord>(capacity);
        producer = new KafkaProducer(configFile);
        threadList = new ArrayList<Thread>(numberOfDrainThreads);
        latch = new CountDownLatch(numberOfDrainThreads);
        // start the drain threads...
        for (int i = 0; i < numberOfDrainThreads; i++) {
            Thread th = new Thread(new ConsumerThread(), "Kafka_Drain-" + i);
            th.setDaemon(true);
            threadList.add(th);
            th.start();
        }
    }

    public Future<RecordMetadata> send(ProducerRecord record) {
        try {
            if (record == null) {
                throw new NullPointerException("Null record cannot be sent.");
            }
            if (close.get()) {
                throw new KafkaException("Producer already closed or in process of closing...");
            }
            asyncQueue.put(record);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }

    public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
        throw new UnsupportedOperationException("Send not supported");
    }

    public List<PartitionInfo> partitionsFor(String topic) {
        // TODO Auto-generated method stub
        return null;
    }

    public Map<String, ? extends Metric> metrics() {
        return producer.metrics();
    }

    public void close() {
        close.compareAndSet(false, true);
        // wait for drain threads to finish
        try {
            latch.await();
            // now drain the remaining messages
            while (!asyncQueue.isEmpty()) {
                ProducerRecord record = asyncQueue.poll();
                producer.send(record);
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        producer.close();
    }

    private class ConsumerThread implements Runnable {
        public void run() {
            try {
                while (!close.get()) {
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:33 PM: - [~ewencp], Thanks for looking into this. If you look at the thread dump, you will see the blocked threads as well. As this particular code exposes the Thread contentions in the Kafka Producer. We have this issues when we aggregate event to send to same partition regardless of number of producers. It would be great if you into alternative implementation to synchronization block. That is root of the problem. {code title=RecordAccumulator.java|borderStyle=solid} synchronized (dq) { } {code} Do you think it would be better to do this following way ? {code title=KafkaAsyncProducer.java|borderStyle=solid } import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; public class KafkaAsyncProducer implements Producer { // TODO configure this queue private final LinkedBlockingQueueProducerRecord asyncQueue; private final KafkaProducer producer; private final ListThread threadList; private final CountDownLatch latch; private final AtomicBoolean close = new AtomicBoolean(false); public KafkaAsyncProducer(int capacity, int numberOfDrainTreads, Properties configFile ){ if(configFile == null){ throw new NullPointerException(Producer configuration cannot be null); } // set the capacity for the queue asyncQueue = new LinkedBlockingQueueProducerRecord(capacity); producer = new KafkaProducer(configFile); threadList = new ArrayListThread(numberOfDrainTreads); latch = new CountDownLatch(numberOfDrainTreads); // start the drain threads... for(int i =0 ; i numberOfDrainTreads ; i ++){ Thread th = new Thread(new ConsumerThread(),Kafka_Drain- +i); th.setDaemon(true); threadList.add(th); th.start(); } } public FutureRecordMetadata send(ProducerRecord record) { try { if(record == null){ throw new NullPointerException(Null record cannot be sent.); } if(close.get()){ throw new KafkaException(Producer aready closed or in processec of closing...); } asyncQueue.put(record); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } public FutureRecordMetadata send(ProducerRecord record, Callback callback) { throw new UnsupportedOperationException(Send not supported); } public ListPartitionInfo partitionsFor(String topic) { // TODO Auto-generated method stub return null; } public MapString, ? 
extends Metric metrics() { return producer.metrics(); } public void close() { close.compareAndSet(false, true); // wait for drain threads to finish try { latch.await(); // now drain the remaining messages while(!asyncQueue.isEmpty()){ ProducerRecord record = asyncQueue.poll(); producer.send(record); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.close(); } private class ConsumerThread implements Runnable{ public void run() { try{ while(!close.get()){
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:34 PM: - [~ewencp], Thanks for looking into this. If you look at the thread dump, you will see the blocked threads as well. As this particular code exposes the Thread contentions in the Kafka Producer. We have this issues when we aggregate event to send to same partition regardless of number of producers. It would be great if you into alternative implementation to synchronization block. That is root of the problem. synchronized (dq) { } Do you think it would be better to do this following way ? {code title=KafkaAsyncProducer.java|borderStyle=solid} import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; public class KafkaAsyncProducer implements Producer { // TODO configure this queue private final LinkedBlockingQueueProducerRecord asyncQueue; private final KafkaProducer producer; private final ListThread threadList; private final CountDownLatch latch; private final AtomicBoolean close = new AtomicBoolean(false); public KafkaAsyncProducer(int capacity, int numberOfDrainTreads, Properties configFile ){ if(configFile == null){ throw new NullPointerException(Producer configuration cannot be null); } // set the capacity for the queue asyncQueue = new LinkedBlockingQueueProducerRecord(capacity); producer = new KafkaProducer(configFile); threadList = new ArrayListThread(numberOfDrainTreads); latch = new CountDownLatch(numberOfDrainTreads); // start the drain threads... for(int i =0 ; i numberOfDrainTreads ; i ++){ Thread th = new Thread(new ConsumerThread(),Kafka_Drain- +i); th.setDaemon(true); threadList.add(th); th.start(); } } public FutureRecordMetadata send(ProducerRecord record) { try { if(record == null){ throw new NullPointerException(Null record cannot be sent.); } if(close.get()){ throw new KafkaException(Producer aready closed or in processec of closing...); } asyncQueue.put(record); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } public FutureRecordMetadata send(ProducerRecord record, Callback callback) { throw new UnsupportedOperationException(Send not supported); } public ListPartitionInfo partitionsFor(String topic) { // TODO Auto-generated method stub return null; } public MapString, ? 
extends Metric metrics() { return producer.metrics(); } public void close() { close.compareAndSet(false, true); // wait for drain threads to finish try { latch.await(); // now drain the remaining messages while(!asyncQueue.isEmpty()){ ProducerRecord record = asyncQueue.poll(); producer.send(record); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.close(); } private class ConsumerThread implements Runnable{ public void run() { try{ while(!close.get()){ ProducerRecord record;
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:33 PM: - [~ewencp], Thanks for looking into this. If you look at the thread dump, you will see the blocked threads as well. As this particular code exposes the Thread contentions in the Kafka Producer. We have this issues when we aggregate event to send to same partition regardless of number of producers. It would be great if you into alternative implementation to synchronization block. That is root of the problem. synchronized (dq) { } Do you think it would be better to do this following way ? {code title=KafkaAsyncProducer.java|borderStyle=solid } import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; public class KafkaAsyncProducer implements Producer { // TODO configure this queue private final LinkedBlockingQueueProducerRecord asyncQueue; private final KafkaProducer producer; private final ListThread threadList; private final CountDownLatch latch; private final AtomicBoolean close = new AtomicBoolean(false); public KafkaAsyncProducer(int capacity, int numberOfDrainTreads, Properties configFile ){ if(configFile == null){ throw new NullPointerException(Producer configuration cannot be null); } // set the capacity for the queue asyncQueue = new LinkedBlockingQueueProducerRecord(capacity); producer = new KafkaProducer(configFile); threadList = new ArrayListThread(numberOfDrainTreads); latch = new CountDownLatch(numberOfDrainTreads); // start the drain threads... for(int i =0 ; i numberOfDrainTreads ; i ++){ Thread th = new Thread(new ConsumerThread(),Kafka_Drain- +i); th.setDaemon(true); threadList.add(th); th.start(); } } public FutureRecordMetadata send(ProducerRecord record) { try { if(record == null){ throw new NullPointerException(Null record cannot be sent.); } if(close.get()){ throw new KafkaException(Producer aready closed or in processec of closing...); } asyncQueue.put(record); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } public FutureRecordMetadata send(ProducerRecord record, Callback callback) { throw new UnsupportedOperationException(Send not supported); } public ListPartitionInfo partitionsFor(String topic) { // TODO Auto-generated method stub return null; } public MapString, ? 
extends Metric metrics() { return producer.metrics(); } public void close() { close.compareAndSet(false, true); // wait for drain threads to finish try { latch.await(); // now drain the remaining messages while(!asyncQueue.isEmpty()){ ProducerRecord record = asyncQueue.poll(); producer.send(record); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.close(); } private class ConsumerThread implements Runnable{ public void run() { try{ while(!close.get()){ ProducerRecord record;
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:34 PM: - [~ewencp], Thanks for looking into this. If you look at the thread dump, you will see the blocked threads as well. As this particular code exposes the Thread contentions in the Kafka Producer. We have this issues when we aggregate event to send to same partition regardless of number of producers. It would be great if you into alternative implementation to synchronization block. That is root of the problem. synchronized (dq) { } Do you think it would be better to do this following way ? {code title=KafkaAsyncProducer.java} import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; public class KafkaAsyncProducer implements Producer { // TODO configure this queue private final LinkedBlockingQueueProducerRecord asyncQueue; private final KafkaProducer producer; private final ListThread threadList; private final CountDownLatch latch; private final AtomicBoolean close = new AtomicBoolean(false); public KafkaAsyncProducer(int capacity, int numberOfDrainTreads, Properties configFile ){ if(configFile == null){ throw new NullPointerException(Producer configuration cannot be null); } // set the capacity for the queue asyncQueue = new LinkedBlockingQueueProducerRecord(capacity); producer = new KafkaProducer(configFile); threadList = new ArrayListThread(numberOfDrainTreads); latch = new CountDownLatch(numberOfDrainTreads); // start the drain threads... for(int i =0 ; i numberOfDrainTreads ; i ++){ Thread th = new Thread(new ConsumerThread(),Kafka_Drain- +i); th.setDaemon(true); threadList.add(th); th.start(); } } public FutureRecordMetadata send(ProducerRecord record) { try { if(record == null){ throw new NullPointerException(Null record cannot be sent.); } if(close.get()){ throw new KafkaException(Producer aready closed or in processec of closing...); } asyncQueue.put(record); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } public FutureRecordMetadata send(ProducerRecord record, Callback callback) { throw new UnsupportedOperationException(Send not supported); } public ListPartitionInfo partitionsFor(String topic) { // TODO Auto-generated method stub return null; } public MapString, ? 
extends Metric metrics() { return producer.metrics(); } public void close() { close.compareAndSet(false, true); // wait for drain threads to finish try { latch.await(); // now drain the remaining messages while(!asyncQueue.isEmpty()){ ProducerRecord record = asyncQueue.poll(); producer.send(record); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.close(); } private class ConsumerThread implements Runnable{ public void run() { try{ while(!close.get()){ ProducerRecord record; try {
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:34 PM: - [~ewencp], Thanks for looking into this. If you look at the thread dump, you will see the blocked threads as well. As this particular code exposes the Thread contentions in the Kafka Producer. We have this issues when we aggregate event to send to same partition regardless of number of producers. It would be great if you into alternative implementation to synchronization block. That is root of the problem. synchronized (dq) { } Do you think it would be better to do this following way ? {code} import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; public class KafkaAsyncProducer implements Producer { // TODO configure this queue private final LinkedBlockingQueueProducerRecord asyncQueue; private final KafkaProducer producer; private final ListThread threadList; private final CountDownLatch latch; private final AtomicBoolean close = new AtomicBoolean(false); public KafkaAsyncProducer(int capacity, int numberOfDrainTreads, Properties configFile ){ if(configFile == null){ throw new NullPointerException(Producer configuration cannot be null); } // set the capacity for the queue asyncQueue = new LinkedBlockingQueueProducerRecord(capacity); producer = new KafkaProducer(configFile); threadList = new ArrayListThread(numberOfDrainTreads); latch = new CountDownLatch(numberOfDrainTreads); // start the drain threads... for(int i =0 ; i numberOfDrainTreads ; i ++){ Thread th = new Thread(new ConsumerThread(),Kafka_Drain- +i); th.setDaemon(true); threadList.add(th); th.start(); } } public FutureRecordMetadata send(ProducerRecord record) { try { if(record == null){ throw new NullPointerException(Null record cannot be sent.); } if(close.get()){ throw new KafkaException(Producer aready closed or in processec of closing...); } asyncQueue.put(record); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } public FutureRecordMetadata send(ProducerRecord record, Callback callback) { throw new UnsupportedOperationException(Send not supported); } public ListPartitionInfo partitionsFor(String topic) { // TODO Auto-generated method stub return null; } public MapString, ? extends Metric metrics() { return producer.metrics(); } public void close() { close.compareAndSet(false, true); // wait for drain threads to finish try { latch.await(); // now drain the remaining messages while(!asyncQueue.isEmpty()){ ProducerRecord record = asyncQueue.poll(); producer.send(record); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.close(); } private class ConsumerThread implements Runnable{ public void run() { try{ while(!close.get()){ ProducerRecord record; try {
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:36 PM: - [~ewencp], Thanks for looking into this. If you look at the thread dump, you will see the blocked threads as well. As this particular code exposes the Thread contentions in the Kafka Producer. We have this issues when we aggregate event to send to same partition regardless of number of producers. It would be great if you into alternative implementation to synchronization block.Test code amplifies the root cause. That is root of the problem. synchronized (dq) { } Do you think it would be better to do this following way ? {code} import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; public class KafkaAsyncProducer implements Producer { // TODO configure this queue private final LinkedBlockingQueueProducerRecord asyncQueue; private final KafkaProducer producer; private final ListThread threadList; private final CountDownLatch latch; private final AtomicBoolean close = new AtomicBoolean(false); public KafkaAsyncProducer(int capacity, int numberOfDrainTreads, Properties configFile ){ if(configFile == null){ throw new NullPointerException(Producer configuration cannot be null); } // set the capacity for the queue asyncQueue = new LinkedBlockingQueueProducerRecord(capacity); producer = new KafkaProducer(configFile); threadList = new ArrayListThread(numberOfDrainTreads); latch = new CountDownLatch(numberOfDrainTreads); // start the drain threads... for(int i =0 ; i numberOfDrainTreads ; i ++){ Thread th = new Thread(new ConsumerThread(),Kafka_Drain- +i); th.setDaemon(true); threadList.add(th); th.start(); } } public FutureRecordMetadata send(ProducerRecord record) { try { if(record == null){ throw new NullPointerException(Null record cannot be sent.); } if(close.get()){ throw new KafkaException(Producer aready closed or in processec of closing...); } asyncQueue.put(record); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } public FutureRecordMetadata send(ProducerRecord record, Callback callback) { throw new UnsupportedOperationException(Send not supported); } public ListPartitionInfo partitionsFor(String topic) { // TODO Auto-generated method stub return null; } public MapString, ? 
extends Metric metrics() { return producer.metrics(); } public void close() { close.compareAndSet(false, true); // wait for drain threads to finish try { latch.await(); // now drain the remaining messages while(!asyncQueue.isEmpty()){ ProducerRecord record = asyncQueue.poll(); producer.send(record); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.close(); } private class ConsumerThread implements Runnable{ public void run() { try{ while(!close.get()){ ProducerRecord record;
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:38 PM: - [~ewencp], Thanks for looking into this. If you look at the thread dump, you will see the blocked threads as well. As this particular code exposes the Thread contentions in the Kafka Producer. We have this issues when we aggregate event to send to same partition regardless of number of producers. It would be great if you can look into alternative implementation to synchronization block. Test code amplifies the root cause. That is root of the problem. synchronized (dq) { } Do you think it would be better to do this following way ? {code} import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; public class KafkaAsyncProducer implements Producer { // TODO configure this queue private final LinkedBlockingQueueProducerRecord asyncQueue; private final KafkaProducer producer; private final ListThread threadList; private final CountDownLatch latch; private final AtomicBoolean close = new AtomicBoolean(false); public KafkaAsyncProducer(int capacity, int numberOfDrainTreads, Properties configFile ){ if(configFile == null){ throw new NullPointerException(Producer configuration cannot be null); } // set the capacity for the queue asyncQueue = new LinkedBlockingQueueProducerRecord(capacity); producer = new KafkaProducer(configFile); threadList = new ArrayListThread(numberOfDrainTreads); latch = new CountDownLatch(numberOfDrainTreads); // start the drain threads... for(int i =0 ; i numberOfDrainTreads ; i ++){ Thread th = new Thread(new ConsumerThread(),Kafka_Drain- +i); th.setDaemon(true); threadList.add(th); th.start(); } } public FutureRecordMetadata send(ProducerRecord record) { try { if(record == null){ throw new NullPointerException(Null record cannot be sent.); } if(close.get()){ throw new KafkaException(Producer aready closed or in processec of closing...); } asyncQueue.put(record); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } public FutureRecordMetadata send(ProducerRecord record, Callback callback) { throw new UnsupportedOperationException(Send not supported); } public ListPartitionInfo partitionsFor(String topic) { // TODO Auto-generated method stub return null; } public MapString, ? 
extends Metric metrics() { return producer.metrics(); } public void close() { close.compareAndSet(false, true); // wait for drain threads to finish try { latch.await(); // now drain the remaining messages while(!asyncQueue.isEmpty()){ ProducerRecord record = asyncQueue.poll(); producer.send(record); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.close(); } private class ConsumerThread implements Runnable{ public void run() { try{ while(!close.get()){ ProducerRecord record;
[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174305#comment-14174305 ] Jun Rao commented on KAFKA-1493: James, Thanks for the patch. There are a few things marked as todo in the patch. Are those required? Do you think you have time to finish the patch for 0.8.2? Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: James Oliver Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, KAFKA-1493_2014-10-16_13:49:34.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26666: Patch for KAFKA-1653
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26666/
---

(Updated Oct. 16, 2014, 9:54 p.m.)

Review request for kafka.

Bugs: KAFKA-1653
https://issues.apache.org/jira/browse/KAFKA-1653

Repository: kafka

Description (updated)
---

Generate error for duplicates in PreferredLeaderElectionCommand instead of just swallowing duplicates. Report which entries are duplicated for ReassignPartitionCommand since they may be difficult to find in large reassignments.

Diffs (updated)
-

core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala c7918483c02040a7cc18d6e9edbd20a3025a3a55
core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5
core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa
core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb

Diff: https://reviews.apache.org/r/26666/diff/

Testing
---

Thanks,

Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1653: - Attachment: KAFKA-1653_2014-10-16_14:54:07.patch Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174319#comment-14174319 ] Ewen Cheslack-Postava commented on KAFKA-1653: -- Updated reviewboard https://reviews.apache.org/r/26666/diff/ against branch origin/trunk Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26666: Patch for KAFKA-1653
On Oct. 16, 2014, 6:10 p.m., Neha Narkhede wrote:
Since you fixed some other tools as well, can we also fix the preferred replica election command where we can de-dup the partitions?

This was already removing duplicates; I had it generate an exception instead, since duplicates may indicate a config error. I'm assuming that's what you meant here.

- Ewen

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26666/#review56958
---
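To make the error-instead-of-de-dup behavior concrete, here is a minimal, hypothetical Java sketch of such a validation (the actual commands are implemented in Scala; the class and method names below are illustrative only, not from the patch):

{code}
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReplicaAssignmentCheck {

    // Hypothetical helper: report *which* entries are duplicated and fail,
    // instead of silently dropping the duplicates.
    static void validateNoDuplicates(List<Integer> replicas) {
        Set<Integer> seen = new HashSet<Integer>();
        Set<Integer> dups = new LinkedHashSet<Integer>();
        for (Integer broker : replicas) {
            if (!seen.add(broker)) {
                dups.add(broker);
            }
        }
        if (!dups.isEmpty()) {
            throw new IllegalArgumentException(
                "Duplicate broker ids " + dups + " in replica assignment " + replicas);
        }
    }

    public static void main(String[] args) {
        // The 1,2,2 assignment from the issue description fails fast here.
        validateNoDuplicates(Arrays.asList(1, 2, 2));
    }
}
{code}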
[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174278#comment-14174278 ] Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:56 PM: -

[~ewencp], Thanks for looking into this. If you look at the thread dump, you will see the blocked threads as well; this particular test code exposes the thread contention in the Kafka producer. We hit this issue when we aggregate events to send to the same partition, regardless of the number of producers. It would be great if you could look into an alternative implementation for the synchronization block; the test code only amplifies the root cause. That block is the root of the problem:

{code title=RecordAccumulator.java|borderStyle=solid}
synchronized (dq) {
    ...
}
{code}

Do you think it would be better to do it the following way?

{code title=KafkaAsyncProducer.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;

public class KafkaAsyncProducer implements Producer {

    // TODO make the queue capacity configurable
    private final LinkedBlockingQueue<ProducerRecord> asyncQueue;
    private final KafkaProducer producer;
    private final List<Thread> threadList;
    private final CountDownLatch latch;
    private final AtomicBoolean close = new AtomicBoolean(false);

    public KafkaAsyncProducer(int capacity, int numberOfDrainThreads, Properties configFile) {
        if (configFile == null) {
            throw new NullPointerException("Producer configuration cannot be null");
        }
        // set the capacity for the queue
        asyncQueue = new LinkedBlockingQueue<ProducerRecord>(capacity);
        producer = new KafkaProducer(configFile);
        threadList = new ArrayList<Thread>(numberOfDrainThreads);
        latch = new CountDownLatch(numberOfDrainThreads);
        // start the drain threads...
        for (int i = 0; i < numberOfDrainThreads; i++) {
            Thread th = new Thread(new ConsumerThread(), "Kafka_Drain-" + i);
            th.setDaemon(true);
            threadList.add(th);
            th.start();
        }
    }

    public Future<RecordMetadata> send(ProducerRecord record) {
        try {
            if (record == null) {
                throw new NullPointerException("Null record cannot be sent.");
            }
            if (close.get()) {
                throw new KafkaException("Producer already closed or in process of closing...");
            }
            asyncQueue.put(record);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // the async wrapper does not expose a Future
        return null;
    }

    public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
        throw new UnsupportedOperationException("Send not supported");
    }

    public List<PartitionInfo> partitionsFor(String topic) {
        return null;
    }

    public Map<String, ? extends Metric> metrics() {
        return producer.metrics();
    }

    public void close() {
        close.compareAndSet(false, true);
        // wait for drain threads to finish
        try {
            latch.await();
            // now drain the remaining messages
            while (!asyncQueue.isEmpty()) {
                ProducerRecord record = asyncQueue.poll();
                producer.send(record);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.close();
    }

    private class ConsumerThread implements Runnable {
        public void run() {
            try {
                while (!close.get()) {
                    // (the tail of the original snippet was truncated; this loop is a
                    // minimal reconstruction consistent with the fields above)
                    ProducerRecord record = asyncQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        producer.send(record);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                latch.countDown();
            }
        }
    }
}
{code}
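For illustration, a hypothetical usage sketch of the wrapper proposed above; the topic name, queue capacity, drain-thread count, and configuration values are placeholders rather than part of the proposal:

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaAsyncProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder config

        // 10000-slot queue and 4 drain threads; both values are arbitrary here.
        KafkaAsyncProducer producer = new KafkaAsyncProducer(10000, 4, props);

        for (int i = 0; i < 100; i++) {
            // Application threads only enqueue; the drain threads invoke the
            // delegate KafkaProducer.send(). Uses the 0.8.2-era non-generic
            // ProducerRecord(topic, value) constructor (assumption).
            producer.send(new ProducerRecord("demo-topic", ("msg-" + i).getBytes()));
        }

        // Waits for the drain threads, flushes what is left in the queue,
        // then closes the delegate producer.
        producer.close();
    }
}
{code}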
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
Regarding 1634, I intended to work on that after 1583 since it will change the commit offset request handling logic a lot. If people think 1583 is only a few days away from check-in, we can leave it in 0.8.2-beta; otherwise we can push it to 0.8.3.

Guozhang

On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote:

+1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later. I agree with the tickets you brought up to have in 0.8.2-beta, and also https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.

/*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop /

On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Another JIRA that will be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481, which fixes the mbean naming. Looking for people's thoughts on 2 things here -

1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now, so we can move forward with the beta release quickly.

Thanks,
Neha

On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Hi,

We have accumulated an impressive list of pretty major features in 0.8.2 -

Delete topic
Automated leader rebalancing
Controlled shutdown
Offset management
Parallel recovery
min.isr and clean leader election

In the past, what has worked for major feature releases is a beta release prior to a final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta that I know of are -

https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and requires some thinking about the new dependency. Since it is not fully ready and there are things to think about, I suggest we take it out, think it end to end and then include it in 0.8.3.)
https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner: Guozhang Wang)
https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is waiting on a review by Joe Stein)

It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week?

Thanks,
Neha

--
-- Guozhang
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
+1 on doing an 0.8.2 beta.

Guozhang, kafka-1583 is relatively large. Given that we are getting close to releasing 0.8.2 beta, my feeling is that we probably shouldn't include it in 0.8.2 beta even if we can commit it in a few days.

Thanks,

Jun

On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com wrote:

Regarding 1634, I intended to work on that after 1583 since it will change the commit offset request handling logic a lot. If people think 1583 is only a few days away from check-in, we can leave it in 0.8.2-beta; otherwise we can push it to 0.8.3. ...
[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174356#comment-14174356 ] James Oliver commented on KAFKA-1493: -

Jun, My pleasure. The TODOs are parts of the specification that are unimplemented, but they are not required. I left them in there as hints for if/when the spec is contributed back to lz4-java. The validation routines will disallow the use of any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress and decompress messages using a 64kb/256kb/1mb/4mb blockSize (64kb by default), with optional block checksums (disabled by default).

What the spec cannot do - decompress messages compressed by an implementation supporting some of the missing features. If this were to occur, a RuntimeException with detailed information would be thrown.

Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: James Oliver Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, KAFKA-1493_2014-10-16_13:49:34.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
Agree.

On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote:

+1 on doing an 0.8.2 beta. Guozhang, kafka-1583 is relatively large. Given that we are getting close to releasing 0.8.2 beta, my feeling is that we probably shouldn't include it in 0.8.2 beta even if we can commit it in a few days. ...
[jira] [Comment Edited] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174356#comment-14174356 ] James Oliver edited comment on KAFKA-1493 at 10/16/14 10:30 PM:

Jun, My pleasure. The TODOs are parts of the specification that are unimplemented, but they are not required. I left them in there as hints for if/when the spec is contributed back to lz4-java. The validation routines will disallow the use of any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress and decompress messages using a 64kb/256kb/1mb/4mb blockSize (64kb by default), with optional block checksums (disabled by default).

What the spec cannot do - decompress messages compressed by a more advanced implementation, using one or more of the missing features. If this were to occur, a RuntimeException with detailed information would be thrown.

EDIT: I can take out the TODOs if you think it causes confusion.

Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: James Oliver Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, KAFKA-1493_2014-10-16_13:49:34.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
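For readers unfamiliar with lz4-java, a minimal compress-and-decompress round trip with its block streams is sketched below. This is only illustrative: lz4-java's LZ4BlockOutputStream uses its own stream format rather than the LZ4 Frame specification the patch implements, but it shows the block-size knob discussed above.

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;

public class Lz4RoundTrip {
    public static void main(String[] args) throws Exception {
        byte[] input = "a payload that should survive the round trip".getBytes("UTF-8");

        // Compress with a 64 KB block size, matching the default mentioned above.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        LZ4BlockOutputStream out = new LZ4BlockOutputStream(sink, 64 * 1024);
        out.write(input);
        out.close();

        // Decompress and print the restored payload.
        LZ4BlockInputStream in =
            new LZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray()));
        byte[] restored = new byte[input.length];
        int off = 0;
        int n;
        while (off < restored.length
               && (n = in.read(restored, off, restored.length - off)) > 0) {
            off += n;
        }
        in.close();

        System.out.println(new String(restored, "UTF-8"));
    }
}
{code}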
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174373#comment-14174373 ] Ewen Cheslack-Postava commented on KAFKA-1710: --

[~Bmis13] That approach just pushes the problem into KafkaAsyncProducer's thread that processes messages; there won't be lock contention in KafkaProducer since KafkaAsyncProducer will be the only user of it, but you may not get an improvement in throughput because ultimately you're limited to the time a single thread can get. It may even get *slower* because you'll have more runnable threads at any given time, which means that the KafkaAsyncProducer worker thread will get less CPU time.

Even disregarding that, the LinkedBlockingQueue you used will become your new source of contention (since it must be synchronized internally). If you have a very large capacity, that'll let the threads continue to make progress, and contention will be lower since the time spent adding an item is very small, but it will cost a lot of memory since you're just adding a layer of buffering. That might be useful if you have bursty traffic (the buffer allows you to temporarily buffer more data while the KafkaProducer works on getting it sent), but if you have sustained traffic you'll just have constantly growing memory usage. If the capacity is small, then the threads producing messages will eventually end up getting blocked waiting for there to be space in the queue.

Probably the biggest issue here is that this test only writes to a single partition in a single topic. You could improve performance by using more partitions in that topic. You're already writing to all producers from all threads, so you must not need the ordering guarantees of a single partition. If you still want a single partition, you can improve performance by using more producers, which will spread the contention across more queues. Since you already have 4 that you're running round-robin on, I'd guess adding more shouldn't be a problem. In any case, this use case seems a bit odd. Are you really going to have 200 threads generating messages *as fast as they can* with only 4 producers?

As far as this issue is concerned, the original report said the problem was deadlock, but that doesn't seem to be the case. If you're just worried about performance, it probably makes more sense to move the discussion over to the mailing list. It'll probably be seen by more people, and there will probably be multiple suggestions for improvements to your approach before we have to make changes to the Kafka code.

[New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump

Hi Kafka Dev Team, When I run the test sending messages to a single partition for 3 minutes or so, I encounter deadlock (please see the attached screenshots) and thread contention in YourKit profiling.

Use Case: 1) Aggregating messages into the same partition for metric counting.
2) Replicating the old producer's behavior of sticking to a partition for 3 minutes.

Here is the output:

Frozen threads found (potential deadlock)

It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.

pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744

pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType,
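To make the "more producers" suggestion concrete, below is a minimal sketch of spreading sends across several producer instances round-robin, so application threads contend on several accumulator locks instead of one. The class is hypothetical and not part of the attached test code.

{code}
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ShardedProducer {
    private final KafkaProducer[] producers;
    private final AtomicLong counter = new AtomicLong();

    public ShardedProducer(int n, Properties config) {
        producers = new KafkaProducer[n];
        for (int i = 0; i < n; i++) {
            producers[i] = new KafkaProducer(config);
        }
    }

    // Round-robin over the producers; each send contends only on the
    // chosen producer's internal per-partition deque lock.
    public void send(ProducerRecord record) {
        int idx = (int) (counter.getAndIncrement() % producers.length);
        producers[idx].send(record);
    }

    public void close() {
        for (KafkaProducer p : producers) {
            p.close();
        }
    }
}
{code}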
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
+1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2, so that it will only go out with 0.8.3.

Joel

On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote:

Agree. ...
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174403#comment-14174403 ] Bhavesh Mistry commented on KAFKA-1710: --- [~ewencp], Thanks for looking into this issue. We consume as fast as we can and re-publish the messages to another aggregated topic based on some keys in the message. We see thread contention in the profiling tool, and I separated out the code to amplify the problem. We run with about 75 threads. [~ewencp], can you please discuss this issue with the Kafka community as well? The deadlock will occur sometimes, depending on thread scheduling and how long the threads are blocked. All I am asking is whether there is a better way to enqueue incoming messages. I just proposed the simple solution above, which does not impact application threads; only the drain threads will be blocked, and with a buffer, as you mentioned, we might get better throughput (of course at the expense of buffered memory (an unbounded concurrent queue) and thread context switching). If you feel this is a known performance issue when sending to a single partition, then please close this, and you may start a discussion in the Kafka community about it. Thanks for your help and suggestions! According to the thread dumps, blocking is happening in a synchronized block.
{code}
pool-1-thread-200 prio=5 tid=0x7f92451c2000 nid=0x20103 waiting for monitor entry [0x00012d228000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:139)
	- waiting to lock 0x000703ce39f0 (a java.util.ArrayDeque)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:238)
	at org.kafka.test.TestNetworkDownProducer$MyProducer.run(TestNetworkDownProducer.java:85)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)

pool-1-thread-199 prio=5 tid=0x7f92451c1800 nid=0x1ff03 waiting for monitor entry [0x00012d0e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:139)
	- waiting to lock 0x000703ce39f0 (a java.util.ArrayDeque)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:238)
	at org.kafka.test.TestNetworkDownProducer$MyProducer.run(TestNetworkDownProducer.java:85)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
{code}
[New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump Hi Kafka Dev Team, When I run the test to send messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the attached screenshot) and thread contention in YourKit profiling.
Use Case:
1) Aggregating messages into the same partition for metric counting.
2) Replicate Old Producer behavior for sticking to a partition for 3 minutes.

Here is the output: Frozen threads found (potential deadlock). It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.

{code}
pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run()
{code}
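To make the buffering proposal above concrete, here is a minimal, hypothetical sketch (the class and all names are illustrative, not part of Kafka, and written against the new Java producer API): application threads append to an unbounded concurrent queue and never block on the producer, while a small pool of drain threads dequeues and calls send(), so only the drain threads contend on the RecordAccumulator's per-partition lock.

{code}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Hypothetical sketch of the queue-and-drain idea from the comment above.
 * Application threads call enqueue() and never block on the producer;
 * only the drain threads block inside RecordAccumulator.append().
 */
public class BufferedProducer {
    private final ConcurrentLinkedQueue<ProducerRecord<byte[], byte[]>> queue =
            new ConcurrentLinkedQueue<>(); // unbounded, at the expense of memory
    private final KafkaProducer<byte[], byte[]> producer;
    private final ExecutorService drainers;
    private volatile boolean running = true;

    public BufferedProducer(KafkaProducer<byte[], byte[]> producer, int drainThreads) {
        this.producer = producer;
        this.drainers = Executors.newFixedThreadPool(drainThreads);
        for (int i = 0; i < drainThreads; i++) {
            drainers.submit(this::drainLoop);
        }
    }

    /** Called by the (possibly 75+) application threads; never blocks on the producer. */
    public void enqueue(ProducerRecord<byte[], byte[]> record) {
        queue.offer(record);
    }

    private void drainLoop() {
        while (running || !queue.isEmpty()) {
            ProducerRecord<byte[], byte[]> record = queue.poll();
            if (record == null) {
                Thread.yield();        // nothing buffered; back off briefly
            } else {
                producer.send(record); // only drain threads block here
            }
        }
    }

    public void shutdown() {
        running = false;
        drainers.shutdown();
    }
}
{code}

The trade-off is exactly the one noted in the comment: the queue decouples application threads from the accumulator lock, but throughput to a single partition is still serialized at the drain threads, and the unbounded queue can grow without limit if the broker is slow or unreachable.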
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174462#comment-14174462 ] Bhavesh Mistry commented on KAFKA-1642: --- [~jkreps], Did you get a chance to reproduce the problem? Has anyone else reported this or a similar issue? Thanks, Bhavesh [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Jun Rao I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior?
{code}
2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for node -2
	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
	at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
	at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
	at java.lang.Thread.run(Thread.java:744)
{code}
Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
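For anyone trying to reproduce this, a minimal setup along these lines exercises the reconnect path (broker address and topic name are placeholders; this is a sketch against the new producer API, not code from the ticket). Raising reconnect.backoff.ms is a plausible mitigation for the busy reconnect loop, though it does not address the IllegalStateException itself.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NetworkDownRepro {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Placeholder broker; point it at a real broker, then cut the network link.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Possible mitigation: back off longer between connection attempts so the
        // I/O thread does not spin (the default backoff in this era of the client is small).
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        while (true) {
            // Keep the I/O thread busy while the link is down and watch CPU usage.
            producer.send(new ProducerRecord<>("test-topic", "payload".getBytes()));
            Thread.sleep(100);
        }
    }
}
{code}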
Re: Review Request 26658: Patch for KAFKA-1493
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26658/#review57005 ---

Thanks for the patch. Looks good overall. Some minor comments below.

clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
https://reviews.apache.org/r/26658/#comment97485
Should this be private?

clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
https://reviews.apache.org/r/26658/#comment97505
Should we flush after writing the end mark?

clients/src/main/java/org/apache/kafka/common/record/Compressor.java
https://reviews.apache.org/r/26658/#comment97444
Unused import.

core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
https://reviews.apache.org/r/26658/#comment97526
How about we make this 2000 so that we can test compression on more than 64KB, which is the default block size of lz4?

core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
https://reviews.apache.org/r/26658/#comment97527
How about also adding the following properties to test out compressing more than 64KB?
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 66000)
props.put(ProducerConfig.LINGER_MS_CONFIG, 200)

- Jun Rao

On Oct. 16, 2014, 8:50 p.m., James Oliver wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26658/ ---
(Updated Oct. 16, 2014, 8:50 p.m.)

Review request for kafka.

Bugs: KAFKA-1493
https://issues.apache.org/jira/browse/KAFKA-1493

Repository: kafka

Description
---
KAFKA-1493 Implement LZ4 Frame I/O Streams
KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction

Diffs
---
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java bf4ed66791b9a502aae6cb2ec7681f42732d9a43
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 5227b2d7ab803389d1794f48c8232350c05b14fd
clients/src/main/java/org/apache/kafka/common/record/Compressor.java 0323f5f7032dceb49d820c17a41b78c56591ffc4
clients/src/main/java/org/apache/kafka/common/utils/Utils.java a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22
config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98
core/src/main/scala/kafka/message/CompressionCodec.scala de0a0fade5387db63299c6b112b3c9a5e41d82ec
core/src/main/scala/kafka/message/CompressionFactory.scala 8420e13d0d8680648df78f22ada4a0d4e3ab8758
core/src/main/scala/kafka/tools/ConsoleProducer.scala b024a693c23cb21f1efe405ed414bf23f3974f31
core/src/main/scala/kafka/tools/PerfConfig.scala c72002976d90416559090a665f6494072a6c2dec
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala c95485170fd8b4f5faad740f049e5d09aca8829d
core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 6f0addcea64f1e78a4de50ec8135f4d02cebd305
core/src/test/scala/unit/kafka/message/MessageTest.scala 958c1a60069ad85ae20f5c58e74679cd9fa6f70e

Diff: https://reviews.apache.org/r/26658/diff/

Testing
---
./gradlew test
All tests passed

Thanks,
James Oliver
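To see why Jun's suggested settings exercise multi-block framing: LZ4's default block size is 64KB, so a producer batch larger than that forces the frame writer to emit more than one block per compressed batch. A hedged sketch of producer settings along those lines (batch size and linger values are the ones from the review; the helper class and broker parameter are illustrative):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class Lz4MultiBlockProps {
    // A batch just over 64KB plus a small linger, so a single compressed
    // batch spans more than one LZ4 block.
    public static Properties producerProps(String brokers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 66000); // > 64KB default block size
        props.put(ProducerConfig.LINGER_MS_CONFIG, 200);    // let the batch fill up
        return props;
    }
}
{code}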
[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174473#comment-14174473 ] Jun Rao commented on KAFKA-1493: James, Thanks for the answer. We can leave the TODOs there. The patch looks good to me. Could you look at the comments in the RB? Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: James Oliver Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, KAFKA-1493_2014-10-16_13:49:34.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 24676: Rebase KAFKA-1583
On Oct. 16, 2014, 1:29 a.m., Jun Rao wrote:
core/src/main/scala/kafka/server/KafkaApis.scala, line 167
https://reviews.apache.org/r/24676/diff/9/?file=720184#file720184line167
Should replica manager be offset manager?

This is actually the replica manager: it is invoked when it tries to write the offset commit message to the local log. I have changed the comment a bit to make it clearer.

- Guozhang

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review56843 ---

On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ ---
(Updated Oct. 14, 2014, 2:42 a.m.)

Review request for kafka.

Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description
---
Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes

Diffs
---
core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73
core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266
core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---
Unit tests

Thanks,
Guozhang Wang
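For context on the min.isr piece of this patch: the idea is that a produce request asking for acknowledgment from all replicas can be failed fast when the partition's in-sync replica set has shrunk below a configured minimum. A hedged sketch of the configuration pair involved (values are illustrative examples, not recommendations):

{code}
import java.util.Properties;

public class MinIsrExample {
    public static void main(String[] args) {
        // Topic-level (or broker default) setting: with min.insync.replicas=2,
        // an acks=-1 produce is rejected once the ISR shrinks below 2 replicas.
        Properties topicConfig = new Properties();
        topicConfig.put("min.insync.replicas", "2");

        // Producer side: acks=-1 (all) asks the full ISR to acknowledge,
        // which is what makes the min.isr check meaningful.
        Properties producerConfig = new Properties();
        producerConfig.put("acks", "-1");

        System.out.println(topicConfig + " / " + producerConfig);
    }
}
{code}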
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ ---
(Updated Oct. 17, 2014, 4:15 a.m.)

Review request for kafka.

Summary (updated)
---
Fix KAFKA-1583

Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description (updated)
---
Incorporate Jun's comments after rebase

Diffs (updated)
---
core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17
core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90
core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---
Unit tests

Thanks,
Guozhang Wang
[jira] [Commented] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174709#comment-14174709 ] Guozhang Wang commented on KAFKA-1583: -- Updated reviewboard https://reviews.apache.org/r/24676/diff/ against branch origin/trunk Kafka API Refactoring - Key: KAFKA-1583 URL: https://issues.apache.org/jira/browse/KAFKA-1583 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch This is the next step of KAFKA-1430. Details can be found at this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26658: Patch for KAFKA-1493
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26658/ ---
(Updated Oct. 17, 2014, 4:25 a.m.)

Review request for kafka.

Bugs: KAFKA-1493
https://issues.apache.org/jira/browse/KAFKA-1493

Repository: kafka

Description (updated)
---
KAFKA-1493 Implement LZ4 Frame I/O Streams
KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction
KAFKA-1493 Flush stream after writing frame end mark
KAFKA-1493 Remove unused import
KAFKA-1493 Move finish() logic into close()
KAFKA-1493 Modify test cases to compress a 64kb message to test multi-block lz4 frame compression/decompression

Diffs (updated)
---
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java bf4ed66791b9a502aae6cb2ec7681f42732d9a43
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 5227b2d7ab803389d1794f48c8232350c05b14fd
clients/src/main/java/org/apache/kafka/common/record/Compressor.java 0323f5f7032dceb49d820c17a41b78c56591ffc4
clients/src/main/java/org/apache/kafka/common/utils/Utils.java a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22
config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98
core/src/main/scala/kafka/message/CompressionCodec.scala de0a0fade5387db63299c6b112b3c9a5e41d82ec
core/src/main/scala/kafka/message/CompressionFactory.scala 8420e13d0d8680648df78f22ada4a0d4e3ab8758
core/src/main/scala/kafka/tools/ConsoleProducer.scala b024a693c23cb21f1efe405ed414bf23f3974f31
core/src/main/scala/kafka/tools/PerfConfig.scala c72002976d90416559090a665f6494072a6c2dec
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala c95485170fd8b4f5faad740f049e5d09aca8829d
core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 6f0addcea64f1e78a4de50ec8135f4d02cebd305
core/src/test/scala/unit/kafka/message/MessageTest.scala 958c1a60069ad85ae20f5c58e74679cd9fa6f70e

Diff: https://reviews.apache.org/r/26658/diff/

Testing
---
./gradlew test
All tests passed

Thanks,
James Oliver
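The "flush stream after writing frame end mark" and "move finish() logic into close()" changes in the updated description follow a standard pattern for framed compression streams. This is a generic, hypothetical sketch of that close-ordering pattern, not the actual KafkaLZ4BlockOutputStream code:

{code}
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Generic sketch: drain any buffered block, write the frame end mark, flush,
// and only then close the underlying stream. All names are illustrative.
abstract class FramedCompressionOutputStream extends FilterOutputStream {
    private boolean finished = false;

    FramedCompressionOutputStream(OutputStream out) {
        super(out);
    }

    /** Compress and write whatever is buffered as one frame block. */
    protected abstract void writeBlock() throws IOException;

    /** Write the frame's end mark (for LZ4, a zero block size). */
    protected abstract void writeEndMark() throws IOException;

    @Override
    public void close() throws IOException {
        if (!finished) {
            writeBlock();   // drain the last partial block
            writeEndMark(); // terminate the frame
            flush();        // push the end mark through to the wrapped stream
            finished = true;
        }
        out.close();
    }
}
{code}

Doing the finish work inside close() means a caller that simply closes the stream still gets a well-formed frame, and flushing after the end mark ensures the terminator actually reaches the underlying buffer rather than being lost.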
[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174716#comment-14174716 ] James Oliver commented on KAFKA-1493: - Updated reviewboard https://reviews.apache.org/r/26658/diff/ against branch origin/trunk Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: James Oliver Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, KAFKA-1493_2014-10-16_13:49:34.patch, KAFKA-1493_2014-10-16_21:25:23.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Oliver updated KAFKA-1493: Attachment: KAFKA-1493_2014-10-16_21:25:23.patch Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: James Oliver Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch, KAFKA-1493.patch, KAFKA-1493_2014-10-16_13:49:34.patch, KAFKA-1493_2014-10-16_21:25:23.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)