Re: Review Request 31850: Patch for KAFKA-1660
On April 20, 2015, 5:30 p.m., Jay Kreps wrote:
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 526
https://reviews.apache.org/r/31850/diff/9/?file=931818#file931818line526

I cleaned up this javadoc a little bit to try to simplify things. With docs it is always hard to get the level of focus right, so that they have the essential information but don't overload the user and obscure the primary thing.
1. Changed messages to records (that is the terminology in the new clients).
2. I significantly shortened the section on calling from within a callback. I think only 0.1% of people would ever consider this.
3. I cannot think of a reason why the user would care whether the I/O thread is shut down synchronously or not, and we don't make any promises one way or the other in the main close method, so let's just leave that bit out.

What do you think of this:

/**
 * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
 * <p>
 * If the producer is unable to complete all requests before the timeout expires, this method will fail
 * any unsent and unacknowledged records immediately.
 * <p>
 * If invoked from within a {@link Callback} this method will not block and will be equivalent to
 * <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while
 * blocking the I/O thread of the producer.
 *
 * @param timeout The maximum time to wait for the producer to complete any pending requests. The value should be
 *                non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
 * @param timeUnit The time unit for the <code>timeout</code>
 * @throws InterruptException If the thread is interrupted while blocked
 * @throws IllegalArgumentException If the <code>timeout</code> is negative.
 */

Looks good, and it is easier to understand from the user's point of view. Thanks for cleaning this up.
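The sketch below illustrates the contract described in the javadoc above. It is minimal and hypothetical, and it assumes a toy producer whose I/O thread is a single-thread ExecutorService and whose "records" are plain Runnables. BoundedCloseSketch, send() and the nested InterruptException are illustrative stand-ins. They are not the KafkaProducer implementation.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the close(timeout, timeUnit) contract in the javadoc above.
// NOT KafkaProducer: the "I/O thread" is a single-thread executor and each
// "record" is a Runnable that simulates sending it.
public class BoundedCloseSketch {

    // Stand-in for Kafka's unchecked InterruptException.
    public static class InterruptException extends RuntimeException {
        public InterruptException(InterruptedException cause) { super(cause); }
    }

    private volatile Thread ioThread;
    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-io-thread");
        ioThread = t; // remember which thread plays the producer's I/O thread
        return t;
    });

    // Queue a simulated send on the I/O thread.
    public Future<?> send(Runnable record) {
        return io.submit(record);
    }

    // Waits up to the timeout for queued sends to finish, then gives up on the rest.
    // Returns the number of queued records that were dropped without being sent.
    public int close(long timeout, TimeUnit timeUnit) {
        if (timeout < 0)
            throw new IllegalArgumentException("The timeout cannot be negative.");
        // Invoked from a callback on the I/O thread: blocking here would only stall
        // the sends being waited for, so behave like close(0, TimeUnit.MILLISECONDS).
        if (Thread.currentThread() == ioThread)
            timeout = 0;
        io.shutdown(); // accept no new sends
        try {
            if (io.awaitTermination(timeout, timeUnit))
                return 0; // everything completed before the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InterruptException(e);
        }
        // Timeout expired: interrupt the in-flight send and drop everything still queued.
        List<Runnable> unsent = io.shutdownNow();
        return unsent.size();
    }
}
```

The I/O-thread check is what makes the callback case special. A callback runs on that thread, so blocking there would stall the very sends that close() is waiting for.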
On April 20, 2015, 5:30 p.m., Jay Kreps wrote:
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 157
https://reviews.apache.org/r/31850/diff/9/?file=931821#file931821line157

Read locks are very expensive. I am pretty worried about this. If we want to do this we need to do a pretty detailed examination of the perf impact.

Hi Jay,

I looked into the ReentrantReadWriteLock implementation, and under the hood it seems to use compareAndSet, which should provide performance similar to an atomic integer. But I agree this largely depends on the implementation. I modified o.a.k.clients.tools.ProducerPerformance a little bit to make it multithreaded. Throughput in the following test settings was very similar: all ~1M messages/second with a target of 10M messages/second.
1. 10 threads with the latest trunk
2. 10 threads using an AtomicInteger
3. 10 threads using a ReadWriteLock
When I increased the thread count to 50, it dropped to about 0.82M messages/second in all cases. It seems the read lock did not introduce a performance issue.

- Jiangjie

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review80753
---

On April 16, 2015, 6:35 p.m., Jiangjie Qin wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
---

(Updated April 16, 2015, 6:35 p.m.)

Review request for kafka.

Bugs: KAFKA-1660
https://issues.apache.org/jira/browse/KAFKA-1660

Repository: kafka

Description
---
A minor fix.
Incorporated Guozhang's comments.
Modify according to the latest conclusion.
Patch for the finally passed KIP-15git status
Addressed Joel and Guozhang's comments.
rebased on trunk
Rebase on trunk
Addressed Joel's comments.
Addressed Joel's comments
Addressed Jay's comments

Diffs
-
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854
clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
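The hypothetical sketch below makes the comparison in the review concrete. It assumes both guards exist to keep an abort from racing with appends that are still in progress. AppendGuards and its methods are illustrative names, not the code under review.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical sketch of the two guards compared above for keeping an abort/close
// from racing with in-progress appends. Names are illustrative, not the patch.
public class AppendGuards {

    // Option 1: count in-progress appends; the aborting thread re-checks until the count is zero.
    public static class CounterGuard {
        private final AtomicInteger appendsInProgress = new AtomicInteger();

        public <T> T append(Supplier<T> body) {
            appendsInProgress.incrementAndGet();
            try {
                return body.get();
            } finally {
                appendsInProgress.decrementAndGet();
            }
        }

        public boolean appendsInProgress() {
            return appendsInProgress.get() > 0;
        }
    }

    // Option 2: appends share the read lock; the abort takes the write lock, which
    // waits until every in-progress append has released its read lock.
    public static class ReadWriteLockGuard {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        public <T> T append(Supplier<T> body) {
            lock.readLock().lock();
            try {
                return body.get();
            } finally {
                lock.readLock().unlock();
            }
        }

        public void abortExclusively(Runnable abort) {
            lock.writeLock().lock();
            try {
                abort.run();
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}
```

Both variants update one shared atomic word per append, which would fit the similar throughput reported above. ReentrantReadWriteLock also keeps per-thread read-hold counts, so its cost under heavy contention may still vary with the JVM and core count.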
Re: [DISCUSS] KIP-11- Authorization design for kafka security
I admit that I'm also not sure what we gain by having deny rules. A user is either in the allow list (one way or another) or will be denied by default. I think this is something we can skip for now and add later if needed?

On Mon, Apr 20, 2015 at 5:24 PM, Jun Rao j...@confluent.io wrote:
According to the pseudo code, if you have a rule deny user1, then it essentially denies all users?
Thanks, Jun

On Mon, Apr 20, 2015 at 5:16 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
Here is pseudo code that explains my current approach:

acls = authorizer.getAcl(resource)
if (acls == null || acls.isEmpty) {
    allow all requests for backward compatibility. (Any topics that were created prior to security support will not have acls.) This is debatable; generally we should block everyone, which is what I would prefer, but that means anyone moving to the authorizer must go to all of his existing topics and add an acl to allow all. If we are fine with imposing this requirement I can start returning deny when no acls are found.
} else {
    // So the user has set some acls explicitly; this means they have knowingly enabled the authorizer. Let's first check if they have set an acl to deny this user/host/operation combination.
    if some acl denies this request for this principal/host/operation combination, return deny
    // This principal/host/operation does not have any explicit deny acl; check if there is some explicit acl that allows the operation.
    if at least one acl allows this request for this principal/host/operation combination, return allow
    // No acl was found for this principal/host/operation combination to allow this operation, so we will deny the request.
    return deny
}
Thanks, Parth

On 4/20/15, 2:21 PM, Jun Rao j...@confluent.io wrote:
Hmm, I thought the semantics is that if you only have rule deny user2, it means that everyone except user2 has access?
Thanks, Jun

On Mon, Apr 20, 2015 at 3:25 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
user3 does not have access and removing the deny rule does not grant him or user2 access. user2 even without the deny rule will not have access.
Thanks, Parth

On 4/20/15, 12:03 PM, Jun Rao j...@confluent.io wrote:
Just a followup question. Suppose there are two rules. Rule1 allows user1 and rule2 denies user2. Does user3 have access? If not, does removing rule1 enable user3 access?
Thanks, Jun

On Mon, Apr 20, 2015 at 1:34 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
Hi Joel,
Thanks for the review; I plan to update the KIP today with all the updated info. My comments inline below.
Thanks, Parth

On 4/20/15, 10:07 AM, Joel Koshy jjkosh...@gmail.com wrote:
Hi Parth,
Nice work on this KIP. I did another read through and had a few more comments (with edits after I went through the thread). Many of these comments were brought up by others as well, so it appears that the KIP would benefit from an update at this point to incorporate comments from the thread and last hangout.
- The operation enum is mostly self-explanatory, but it would help (for the sake of clarity and completeness if nothing else) to document exactly what each of the enums are. E.g., I think this came up in our hangout - SEND_CONTROL_MESSAGE is unclear and I don't remember what was said about it. Edit: After going through the thread it seems the conclusion was to categorize operations. E.g., WRITE could apply to multiple requests. Again, this is unclear, so it would be great if you could update the KIP to clarify what you intend.
Will add to document. SEND_CONTROL_MESSAGE: probably a very bad name, but these are intra-broker API calls like the controller notifying other brokers to update metadata, or heartbeats. Any better naming suggestions?
- When you update the KIP to categorize the requests it would also help to have a column for what the resource is for each.
Will add to the KIP.
- FWIW I prefer a 1-1 mapping between requests and operations. I think categorizing requests into these can be confusing because:
  - The resource being protected for different requests will be different. We are mostly thinking about topics (read/write) but there are requests for which topic is not the right resource. E.g., for topic creation, the resource as you suggested would be something global/common such as “cluster”. For OffsetCommit/FetchRequest, the resource may be the consumer group, or maybe a (consumer group, topic) tuple. So this can be confusing - i.e., different resources and request types in the same category. It may be simpler and clearer to just have a 1-1 mapping between the operation enum and requests.
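The sketch below follows the decision order in Parth's pseudocode. It assumes an ACL matches on principal, host and operation, with "*" as a wildcard; the wildcard is an illustrative assumption. AclDecisionSketch and its types are hypothetical, not the KIP-11 interfaces.

```java
import java.util.List;

// Hypothetical sketch of the decision order in the pseudocode above:
// no ACLs -> allow (backward compatibility); any matching deny -> deny;
// any matching allow -> allow; otherwise deny. Types are illustrative, not KIP-11's API.
public class AclDecisionSketch {

    public enum PermissionType { ALLOW, DENY }

    public static final class Acl {
        final String principal, host, operation; // "*" matches anything (assumption)
        final PermissionType permission;

        public Acl(String principal, String host, String operation, PermissionType permission) {
            this.principal = principal;
            this.host = host;
            this.operation = operation;
            this.permission = permission;
        }

        boolean matches(String p, String h, String op) {
            return matchesField(principal, p) && matchesField(host, h) && matchesField(operation, op);
        }

        private static boolean matchesField(String pattern, String value) {
            return pattern.equals("*") || pattern.equals(value);
        }
    }

    public static boolean authorize(List<Acl> acls, String principal, String host, String operation) {
        if (acls == null || acls.isEmpty())
            return true; // topics created before security support have no ACLs
        for (Acl acl : acls)
            if (acl.permission == PermissionType.DENY && acl.matches(principal, host, operation))
                return false; // an explicit deny always wins
        for (Acl acl : acls)
            if (acl.permission == PermissionType.ALLOW && acl.matches(principal, host, operation))
                return true;
        return false; // nothing allowed this principal/host/operation: deny by default
    }
}
```

Under this order, a deny-only ACL list denies everyone, which is the behaviour Jun points out. In the two-rule example, user3 is denied.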
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Jun,

I thought a little bit differently about this. Intuitively, I am thinking that if a partition is offline, the metadata for that partition should be considered not ready, because we don’t know which broker we should send the message to. So those sends need to be blocked on the metadata timeout. Another thing I’m wondering is in which scenario an offline partition will become online again in a short period of time, and how likely that is. My understanding is that the batch timeout for batches sitting in the accumulator should be larger than linger.ms but should not be too long (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer with batches that are going to be aborted.

That said, I do agree it is reasonable to buffer the message for some time so messages to other partitions can still get sent. But adding another expiration in addition to linger.ms - which is essentially a timeout - sounds a little bit confusing. Maybe we can do this: let the batch sit in the accumulator up to linger.ms, then fail it if necessary.

What do you think?

Thanks,
Jiangjie (Becket) Qin

On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:
Jiangjie,
Allowing messages to be accumulated in an offline partition could be useful since the partition may become available before the request timeout or linger time is reached. Now that we are planning to add a new timeout, it would be useful to think through whether/how that applies to messages in the accumulator too.
Thanks, Jun

On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
Hi Harsha,
Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sits in the accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply a timeout at the batch level to merge those two cases, as Ewen suggested. But I’m not sure if it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place.
Jiangjie (Becket) Qin

On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote:
Guozhang and Jiangjie,
Isn’t this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788? Can you please review the patch there.
Thanks, Harsha

On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote:
Thanks for the update Jiangjie,
I think it is actually NOT expected that a hardware disconnection will be detected by the selector; rather, it will only be revealed upon TCP timeout, which could be hours.
A couple of comments on the wiki:
1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout." I am not very clear on what this means.
2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names but will reduce confusion moving forward.
Guozhang

On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we depend on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However, the Selector.disconnected set is only updated when:
1. A write/read/connect to the channel failed.
2. A key is canceled.
However, when a broker is down before it sends back the response, the client seems unable to detect this failure. I did the simple test below:
1. Run a selector on one machine and an echo server on another machine. Connect the selector to the echo server.
2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds.
3. After the server received the message, unplug the cable on the echo server.
4. After waiting for 45 min, the selector still did not detect the network failure. lsof on the selector machine shows that the TCP connection is still considered ESTABLISHED.
I’m not sure what we should expect from java.nio.channels.Selector in this case. According to the documentation, the selector does not verify the status of the associated channel. In my test case it looks even worse: the OS did not consider the socket disconnected.
Anyway, it seems adding the client-side request timeout is necessary. I’ve updated the KIP page to clarify the problem we want to solve, according to Ewen’s comments.
Thanks.
Jiangjie (Becket) Qin

On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
Hi Ewen, thanks for the comments. Very good points! Please see replies inline.
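The hypothetical sketch below shows the client-side request timeout motivated above. It assumes that each node's responses arrive in send order, so only the oldest in-flight request per node needs checking, and that expiry is checked on every poll(). RequestTimeoutSketch and its methods are illustrative. They are not NetworkClient's API and do not fix any config name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a client-side request timeout: a dead peer may never show up
// as a disconnect, so the client expires in-flight requests itself.
public class RequestTimeoutSketch {
    private final long requestTimeoutMs;
    // Per-node send timestamps of in-flight requests, oldest first
    // (responses on one connection arrive in send order).
    private final Map<String, Deque<Long>> inFlight = new HashMap<>();

    public RequestTimeoutSketch(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    public void send(String node, long nowMs) {
        inFlight.computeIfAbsent(node, n -> new ArrayDeque<>()).addLast(nowMs);
    }

    // A response arrived from this node, completing its oldest in-flight request.
    public void completeNext(String node) {
        Deque<Long> q = inFlight.get(node);
        if (q != null) q.pollFirst();
    }

    // Called from poll(): returns nodes whose oldest request exceeded the timeout. A real
    // client would close those connections and fail their requests; here they are forgotten.
    public List<String> expire(long nowMs) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Deque<Long>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Deque<Long>> e = it.next();
            Long oldest = e.getValue().peekFirst();
            if (oldest != null && nowMs - oldest > requestTimeoutMs) {
                timedOut.add(e.getKey());
                it.remove();
            }
        }
        return timedOut;
    }

    public int inFlightCount(String node) {
        Deque<Long> q = inFlight.get(node);
        return q == null ? 0 : q.size();
    }
}
```

In the unplugged-cable test, the connection stays ESTABLISHED, so a check like expire() is the only thing that would surface the failure.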
RE: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
It works, many thanks.
Thanks, Honghai

From: Joe Stein [mailto:joe.st...@stealth.ly]
Sent: Tuesday, April 21, 2015 9:24 AM
To: Honghai Chen
Cc: dev@kafka.apache.org
Subject: Re: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

give it a try now
~ Joe Stein
http://www.stealth.ly

On Mon, Apr 20, 2015 at 9:22 PM, Honghai Chen honghai.c...@microsoft.com wrote:
Username: waldenchen
Email: waldenc...@163.com
Thanks, Honghai

-----Original Message-----
From: Joe Stein [mailto:joe.st...@stealth.ly]
Sent: Tuesday, April 21, 2015 9:19 AM
To: dev@kafka.apache.org
Subject: Re: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

What is your confluence username?
~ Joe Stein
http://www.stealth.ly

On Mon, Apr 20, 2015 at 9:15 PM, Honghai Chen honghai.c...@microsoft.com wrote:
Hi dear dev,
I need to create a KIP with the title “Add one configuration log.preallocate” for https://issues.apache.org/jira/browse/KAFKA-1646, but I can't see the KIP Template after clicking Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. The picture below is what I see. Can you see it? Is there anywhere I can get the permission or setting?
Thanks, Honghai
[jira] [Updated] (KAFKA-1910) Refactor KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-1910:
--
Resolution: Fixed
Status: Resolved (was: Patch Available)

The follow-up has been committed.

Refactor KafkaConsumer
--
Key: KAFKA-1910
URL: https://issues.apache.org/jira/browse/KAFKA-1910
Project: Kafka
Issue Type: Sub-task
Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Fix For: 0.8.3
Attachments: KAFKA-1910.patch, KAFKA-1910.patch, KAFKA-1910.patch, KAFKA-1910_2015-03-05_14:55:33.patch

KafkaConsumer now contains all the logic on the consumer side, which makes it a very large class file; it would be better to refactor it into multiple layers on top of KafkaClient.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Assigned] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma reassigned KAFKA-1595:
--
Assignee: Ismael Juma

Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
-
Key: KAFKA-1595
URL: https://issues.apache.org/jira/browse/KAFKA-1595
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Jagbir
Assignee: Ismael Juma
Labels: newbie
Fix For: 0.8.3

The following issue was created as a follow-up suggested by Jun Rao in a Kafka news group message with the subject "Blocking Recursive parsing from kafka.consumer.TopicCount$.constructTopicCount".

SUMMARY:
An issue was detected in a typical cluster of 3 Kafka instances backed by 3 ZooKeeper instances (Kafka version 0.8.1.1, Scala version 2.10.3, Java version 1.7.0_65). On the consumer end, when consumers get recycled, there is a troubling JSON parsing recursion which takes a busy lock and blocks the consumers' thread pool. In the 0.8.1.1 Scala client library, ZookeeperConsumerConnector.scala:355 takes a global lock (0xd3a7e1d0) during the rebalance and fires an expensive JSON parse, while keeping the other consumers from shutting down; see, e.g., kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161).
The deep recursive JSON parsing should be deprecated in favor of a better JSON parser; see, e.g., http://engineering.ooyala.com/blog/comparing-scala-json-libraries?

DETAILS:
The first dump is for a recursive blocking thread holding the lock for 0xd3a7e1d0, and the subsequent dump is for a waiting thread. (Please grep for 0xd3a7e1d0 to see the locked object.)
-8<-
Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
   java.lang.Thread.State: RUNNABLE
	at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
	at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
	at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
	at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at
[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504095#comment-14504095 ]

Ismael Juma commented on KAFKA-1595:

Since there were no objections in the mailing list thread about removing 2.9.x support for the next release, I submitted to reviewboard a possible implementation if support for 2.9.x is dropped. I would be interested in feedback. If we want to keep support for 2.9.x, we would have to use older versions of the libraries, which is more work, but it could be done.
`testAll` succeeded eventually (it seems like some tests that rely on timings can sometimes fail).
It is a bit sad that reviewboard loses track of the individual commits in the branch, making review harder. I also created a PR in GitHub that doesn't have that problem (I will close it once it's no longer relevant): https://github.com/apache/kafka/pull/55

Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
-
Key: KAFKA-1595
URL: https://issues.apache.org/jira/browse/KAFKA-1595
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Jagbir
Assignee: Ismael Juma
Labels: newbie
Fix For: 0.8.3
Attachments: KAFKA-1595.patch
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Hi Jun,
I am using the underlying protocol GSS-API that SASL also uses. I can add details about LDAP/AD. For AD, this is in general the integration of AD with Kerberos, i.e. Kerberos can talk to AD to get the kinit login credentials (more of a setup detail between Kerberos and AD). For LDAP, GSS-API allows you to do DIGEST auth as well. I’ll add the details regarding both of these. For SSL support I’ll add the details to the same KIP, as they both extend the same Channel and share some of the implementation.
Thanks,
Harsha

On April 20, 2015 at 12:31:12 PM, Jun Rao (j...@confluent.io) wrote:
Hi, Harsha,
For SASL, a common use case is the integration with LDAP/AD. For completeness, could you describe (or provide a link to) how such integration can be done? Also, what about the SSL support: do you plan to describe it in the same KIP or a separate one?
Thanks,
Jun

On Mon, Apr 20, 2015 at 12:42 PM, Sriharsha Chintalapani ka...@harsha.io wrote:
Hi,
I updated KIP-12 with more details. Please take a look: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888
Thanks,
Harsha

On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:
Thanks Joe. It will be part of KafkaServer and will run on its own thread. Since each Kafka server will run with a keytab, we should make sure they are all getting renewed.

On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:
Thanks Harsha, looks good so far. How were you thinking of running the KerberosTicketManager: as a standalone process, like the controller, or as a layer of code that does the plumbing pieces everywhere?
~ Joestein

On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:
Hi,
Here is the initial proposal for the SASL/Kerberos implementation for Kafka: https://cwiki.apache.org/confluence/x/YI4WAw and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on a prototype, which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with an initial prototype patch.
Thanks,
Harsha
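To illustrate what a background KerberosTicketManager thread might compute, here is a hypothetical sketch that chooses the next ticket refresh time. The 80% renew window, the 5% jitter and the 60-second floor are assumptions based on a common heuristic for keytab logins. They are not values from the KIP, and the actual re-login is left out.

```java
// Hypothetical sketch of the refresh-time calculation a ticket-renewal thread could use.
// RENEW_WINDOW_FACTOR, RENEW_JITTER and MIN_REFRESH_INTERVAL_MS are assumptions,
// not values specified by KIP-12.
public class TicketRefreshSketch {
    public static final double RENEW_WINDOW_FACTOR = 0.80;
    public static final double RENEW_JITTER = 0.05;
    public static final long MIN_REFRESH_INTERVAL_MS = 60_000;

    // Next refresh: a jittered fraction of the way through the ticket's lifetime, but
    // never sooner than MIN_REFRESH_INTERVAL_MS from now. jitterSample is expected in
    // [0, 1), e.g. from Random.nextDouble(), so brokers do not all re-login at once.
    public static long nextRefreshMs(long nowMs, long ticketStartMs, long ticketEndMs, double jitterSample) {
        long lifetimeMs = ticketEndMs - ticketStartMs;
        double fraction = RENEW_WINDOW_FACTOR + RENEW_JITTER * jitterSample;
        long proposedMs = ticketStartMs + (long) (lifetimeMs * fraction);
        return Math.max(proposedMs, nowMs + MIN_REFRESH_INTERVAL_MS);
    }
}
```

A renewal thread would sleep until nextRefreshMs(...), re-login from the keytab, read the new ticket's start and end times, and repeat.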
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Here is pseudo code that explains my current approach:

acls = authorizer.getAcl(resource)
if (acls == null || acls.isEmpty) {
    allow all requests for backward compatibility. (Any topics that were created prior to security support will not have acls.) This is debatable; generally we should block everyone, which is what I would prefer, but that means anyone moving to the authorizer must go to all of his existing topics and add an acl to allow all. If we are fine with imposing this requirement I can start returning deny when no acls are found.
} else {
    // So the user has set some acls explicitly; this means they have knowingly enabled the authorizer. Let's first check if they have set an acl to deny this user/host/operation combination.
    if some acl denies this request for this principal/host/operation combination, return deny
    // This principal/host/operation does not have any explicit deny acl; check if there is some explicit acl that allows the operation.
    if at least one acl allows this request for this principal/host/operation combination, return allow
    // No acl was found for this principal/host/operation combination to allow this operation, so we will deny the request.
    return deny
}
Thanks, Parth

On 4/20/15, 2:21 PM, Jun Rao j...@confluent.io wrote:
Hmm, I thought the semantics is that if you only have rule deny user2, it means that everyone except user2 has access?
Thanks, Jun

On Mon, Apr 20, 2015 at 3:25 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
user3 does not have access and removing the deny rule does not grant him or user2 access. user2 even without the deny rule will not have access.
Thanks, Parth

On 4/20/15, 12:03 PM, Jun Rao j...@confluent.io wrote:
Just a followup question. Suppose there are two rules. Rule1 allows user1 and rule2 denies user2. Does user3 have access? If not, does removing rule1 enable user3 access?
Thanks, Jun

On Mon, Apr 20, 2015 at 1:34 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
Hi Joel,
Thanks for the review; I plan to update the KIP today with all the updated info. My comments inline below.
Thanks, Parth

On 4/20/15, 10:07 AM, Joel Koshy jjkosh...@gmail.com wrote:
Hi Parth,
Nice work on this KIP. I did another read through and had a few more comments (with edits after I went through the thread). Many of these comments were brought up by others as well, so it appears that the KIP would benefit from an update at this point to incorporate comments from the thread and last hangout.
- The operation enum is mostly self-explanatory, but it would help (for the sake of clarity and completeness if nothing else) to document exactly what each of the enums are. E.g., I think this came up in our hangout - SEND_CONTROL_MESSAGE is unclear and I don't remember what was said about it. Edit: After going through the thread it seems the conclusion was to categorize operations. E.g., WRITE could apply to multiple requests. Again, this is unclear, so it would be great if you could update the KIP to clarify what you intend.
Will add to document. SEND_CONTROL_MESSAGE: probably a very bad name, but these are intra-broker API calls like the controller notifying other brokers to update metadata, or heartbeats. Any better naming suggestions?
- When you update the KIP to categorize the requests it would also help to have a column for what the resource is for each.
Will add to the KIP.
- FWIW I prefer a 1-1 mapping between requests and operations. I think categorizing requests into these can be confusing because:
  - The resource being protected for different requests will be different. We are mostly thinking about topics (read/write) but there are requests for which topic is not the right resource. E.g., for topic creation, the resource as you suggested would be something global/common such as “cluster”. For OffsetCommit/FetchRequest, the resource may be the consumer group, or maybe a (consumer group, topic) tuple. So this can be confusing - i.e., different resources and request types in the same category. It may be simpler and clearer to just have a 1-1 mapping between the operation enum and requests.
I only see 2 resource categories right now: cluster and topic. I don’t really care one way or another, so we can probably make a quick decision in tomorrow’s meeting to either do a 1-1 mapping or have categorization?
  - Some requests that are intuitively READ have WRITE side-effects. E.g., (currently) TopicMetadataRequest with auto-create, although that will eventually go away. ConsumerMetadataRequest still auto-creates the offsets topic. Likewise, ADMIN-type requests may be interpreted as having side-effects (depending on who you ask).
Yes and what I am doing right now
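Here is a hypothetical sketch of the table Joel asks for. Each row maps one request type to the operation it needs and the resource that operation is checked against. The entries are illustrative readings of this thread. In particular, the consumer-group resource for OffsetCommit is Joel's suggestion, not a decision. The type names are not KIP-11's.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Hypothetical sketch: one row per request type with the operation it needs and the
// resource type that operation is checked against. Entries are illustrative only.
public class RequestAuthMappingSketch {
    public enum Operation { READ, WRITE, CREATE, SEND_CONTROL_MESSAGE }
    public enum ResourceType { CLUSTER, TOPIC, CONSUMER_GROUP }

    public enum RequestType {
        PRODUCE(Operation.WRITE, ResourceType.TOPIC),
        FETCH(Operation.READ, ResourceType.TOPIC),
        CREATE_TOPIC(Operation.CREATE, ResourceType.CLUSTER),
        OFFSET_COMMIT(Operation.WRITE, ResourceType.CONSUMER_GROUP),
        UPDATE_METADATA(Operation.SEND_CONTROL_MESSAGE, ResourceType.CLUSTER);

        public final Operation operation;
        public final ResourceType resource;

        RequestType(Operation operation, ResourceType resource) {
            this.operation = operation;
            this.resource = resource;
        }
    }

    // Groups the rows by operation, i.e. the "categorized" view of the same table.
    public static Map<Operation, EnumSet<ResourceType>> resourcesPerOperation() {
        Map<Operation, EnumSet<ResourceType>> m = new EnumMap<>(Operation.class);
        for (RequestType r : RequestType.values())
            m.computeIfAbsent(r.operation, op -> EnumSet.noneOf(ResourceType.class)).add(r.resource);
        return m;
    }
}
```

Grouping the rows by operation shows the concern raised above: WRITE ends up covering both topic and consumer-group resources.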
[jira] [Commented] (KAFKA-2035) Add a topic config cache.
[ https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503888#comment-14503888 ] Parth Brahmbhatt commented on KAFKA-2035: - Makes sense. I will close this JIRA and add 2 JIRAs: one to add owners and one for a top-level Topic instance. Add a topic config cache. - Key: KAFKA-2035 URL: https://issues.apache.org/jira/browse/KAFKA-2035 Project: Kafka Issue Type: Task Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2035_2015-03-31_10:52:12.patch Currently the topic config is all about Log configuration so we have a TopicConfigManager which takes in a Log instance and keeps updating that instance's config instance as and when the topic config is updated. The topic config update notifications are sent using zk watchers by Controller. I propose to introduce a TopicConfigCache which will be updated by TopicConfigManager on any config changes. The log instance and any other component (like the authorizer mentioned in KAFKA-1688) will have a reference to TopicConfigCache using which they will access the topic configs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
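The proposed TopicConfigCache can be pictured as a thread-safe map from topic name to its current config, written by the TopicConfigManager when a ZooKeeper change notification arrives and read by the log and the authorizer. A minimal sketch, assuming the class and method names below (they are hypothetical, not taken from the attached patch):

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a TopicConfigCache; not Kafka's actual implementation.
final class TopicConfigCache {
    private final Map<String, Properties> configs = new ConcurrentHashMap<>();

    // Called by the config manager when a topic config change notification arrives.
    // The whole snapshot is replaced, so readers never observe a half-applied update.
    void update(String topic, Properties newConfig) {
        Properties snapshot = new Properties();
        snapshot.putAll(newConfig);
        configs.put(topic, snapshot);
    }

    void remove(String topic) {
        configs.remove(topic);
    }

    // Readers (log, authorizer) get a copy, so they cannot mutate the cached snapshot.
    // An unknown topic yields an empty Properties object.
    Properties get(String topic) {
        Properties copy = new Properties();
        Properties current = configs.get(topic);
        if (current != null) {
            copy.putAll(current);
        }
        return copy;
    }
}
```

Copying on both write and read trades a little allocation for isolation; a real implementation might hand out immutable config objects instead.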
[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya Auradkar updated KAFKA-2136: --- Reviewer: Joel Koshy Client side protocol changes to return quota delays --- Key: KAFKA-2136 URL: https://issues.apache.org/jira/browse/KAFKA-2136 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar As described in KIP-13, evolve the protocol to return a throttle_time_ms in the Fetch and the ProduceResponse objects. Add client side metrics on the new producer and consumer to expose the delay time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
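The client-side metrics described here expose the average and maximum throttle delay reported in throttle_time_ms. A minimal sketch of the bookkeeping behind such a pair of metrics, assuming a simple cumulative average; the class is hypothetical, and the real client metrics library computes windowed statistics that this sketch does not reproduce:

```java
// Hypothetical avg/max tracker for throttle_time_ms values seen in responses.
final class ThrottleTimeStats {
    private long count;
    private long totalMs;
    private long maxMs;

    synchronized void record(int throttleTimeMs) {
        if (throttleTimeMs < 0) {
            throw new IllegalArgumentException("throttle_time_ms must be non-negative");
        }
        count++;
        totalMs += throttleTimeMs;
        maxMs = Math.max(maxMs, throttleTimeMs);
    }

    // Average over everything recorded so far; 0 before the first response.
    synchronized double avgMs() {
        return count == 0 ? 0.0 : (double) totalMs / count;
    }

    synchronized long maxMs() {
        return maxMs;
    }
}
```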
[jira] [Created] (KAFKA-2136) Client side protocol changes to return quota delays
Aditya Auradkar created KAFKA-2136: -- Summary: Client side protocol changes to return quota delays Key: KAFKA-2136 URL: https://issues.apache.org/jira/browse/KAFKA-2136 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar As described in KIP-13, evolve the protocol to return a throttle_time_ms in the Fetch and the ProduceResponse objects. Add client side metrics on the new producer and consumer to expose the delay time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33378: Patch for KAFKA-2136
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ --- (Updated April 21, 2015, 12:02 a.m.) Review request for kafka and Joel Koshy. Bugs: KAFKA-2136 https://issues.apache.org/jira/browse/KAFKA-2136 Repository: kafka Description --- Changes are: - protocol changes to the fetch request and response to return the throttle_time_ms to clients - New producer/consumer metrics to expose the avg and max delay time for a client - Test cases For now the patch will publish a zero delay and return a response Added more tests Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 9c4518e840904c371a5816bfc52be1933cba0b96 clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4 clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8 core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717 core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c core/src/main/scala/kafka/api/ProducerRequest.scala 
570b2da1d865086f9830aa919a49063abbbe574d core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46 core/src/main/scala/kafka/server/OffsetManager.scala 420e2c3535e722c503f13d093849469983f6f08d core/src/main/scala/kafka/server/ReplicaManager.scala 8ddd325015de4245fd2cf500d8b0e8c1fd2bc7e8 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 566b5381665bb027a06e17c4fc27414943f85220 core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 9186c90de5a983a73b042fcb42987bfabae14fcf core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79 Diff: https://reviews.apache.org/r/33378/diff/ Testing (updated) --- New tests added Thanks, Aditya Auradkar
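A common way to add a field such as throttle_time_ms is to bump the response version and read the field only for the new version, treating older versions as "not throttled" (which also matches the note that the patch initially publishes a zero delay). The sketch below uses a made-up two-field layout purely for illustration; it is not the real ProduceResponse or FetchResponse schema, whose field order is defined in Protocol.java.

```java
import java.nio.ByteBuffer;

// Toy response, NOT a real Kafka schema: v0 = error_code (int16); v1 appends throttle_time_ms (int32).
final class ToyResponse {
    final short errorCode;
    final int throttleTimeMs;

    ToyResponse(short errorCode, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.throttleTimeMs = throttleTimeMs;
    }

    ByteBuffer encode(short version) {
        ByteBuffer buf = ByteBuffer.allocate(version >= 1 ? 6 : 2);
        buf.putShort(errorCode);
        if (version >= 1) {
            buf.putInt(throttleTimeMs); // only newer clients understand this field
        }
        buf.flip();
        return buf;
    }

    static ToyResponse decode(ByteBuffer buf, short version) {
        short error = buf.getShort();
        // Older responses carry no throttle field; report zero delay for them.
        int throttle = version >= 1 ? buf.getInt() : 0;
        return new ToyResponse(error, throttle);
    }
}
```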
Re: Review Request 33378: Patch for KAFKA-2136
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ --- (Updated April 21, 2015, 12:02 a.m.) Review request for kafka and Joel Koshy. Bugs: KAFKA-2136 https://issues.apache.org/jira/browse/KAFKA-2136 Repository: kafka Description (updated) --- Changes are: - protocol changes to the fetch request and response to return the throttle_time_ms to clients - New producer/consumer metrics to expose the avg and max delay time for a client - Test cases For now the patch will publish a zero delay and return a response Added more tests Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 9c4518e840904c371a5816bfc52be1933cba0b96 clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4 clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8 core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717 core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c core/src/main/scala/kafka/api/ProducerRequest.scala 
570b2da1d865086f9830aa919a49063abbbe574d core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46 core/src/main/scala/kafka/server/OffsetManager.scala 420e2c3535e722c503f13d093849469983f6f08d core/src/main/scala/kafka/server/ReplicaManager.scala 8ddd325015de4245fd2cf500d8b0e8c1fd2bc7e8 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 566b5381665bb027a06e17c4fc27414943f85220 core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 9186c90de5a983a73b042fcb42987bfabae14fcf core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79 Diff: https://reviews.apache.org/r/33378/diff/ Testing --- New tests added Thanks, Aditya Auradkar
Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
Hi dear dev, I need to create a KIP titled “Add one configuration log.preallocate” for https://issues.apache.org/jira/browse/KAFKA-1646, but I can't see the KIP Template after clicking Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. The picture below is what I see. Can you see it? Is there anywhere I can get the permission or setting? Thanks, Honghai
[jira] [Updated] (KAFKA-2035) Add a topic config cache.
[ https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-2035: - Status: In Progress (was: Patch Available) [~parth.brahmbhatt] Unless you object, I'd prefer to either figure out the desired end state with respect to topics and build towards that, or else leave it as is and just add this to the log config. Either way we can file that Topic refactoring JIRA, even if it is just a placeholder for thoughts on the subject. Add a topic config cache. - Key: KAFKA-2035 URL: https://issues.apache.org/jira/browse/KAFKA-2035 Project: Kafka Issue Type: Task Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2035_2015-03-31_10:52:12.patch Currently the topic config is all about Log configuration so we have a TopicConfigManager which takes in a Log instance and keeps updating that instance's config instance as and when the topic config is updated. The topic config update notifications are sent using zk watchers by Controller. I propose to introduce a TopicConfigCache which will be updated by TopicConfigManager on any config changes. The log instance and any other component (like the authorizer mentioned in KAFKA-1688) will have a reference to TopicConfigCache using which they will access the topic configs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-11- Authorization design for kafka security
According to the pseudo code, if you have a rule deny user1, then it essentially denies all users? Thanks, Jun On Mon, Apr 20, 2015 at 5:16 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Here is pseudo code that explains my current approach: acls = authorizer.getAcl(resource) if(acls == null || acls.isEmpty) { allow all requests for backward compatibility. (any topics that were created prior to security support will not have acls) This is debatable; generally we should block everyone, which is what I would prefer, but that means anyone moving to the authorizer must go through all of their existing topics and add an acl to allow all. If we are fine with imposing this requirement I can start returning deny when no acls are found. } else { //So the user has set some acls explicitly, this means they have knowingly enabled the authorizer. Let’s first check if they have set an Acl to deny this user/host/operation combination. if some acl denies this request for this principal/host/operation combination, return deny //this principal/host/operation does not have any explicit deny acl, check if there is some explicit acl that allows the operation if at least one acl allows this request for this principal/host/operation combination, return allow // no acl was found for this principal/host/operation combination to allow this operation, so we will deny the request return deny } Thanks Parth On 4/20/15, 2:21 PM, Jun Rao j...@confluent.io wrote: Hmm, I thought the semantics is that if you only have rule deny user2, it means that everyone except user2 has access? Thanks, Jun On Mon, Apr 20, 2015 at 3:25 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: user3 does not have access and removing the deny rule does not grant him or user2 access. user2 even without the deny rule will not have access. Thanks Parth On 4/20/15, 12:03 PM, Jun Rao j...@confluent.io wrote: Just a followup question. Suppose there are two rules. Rule1 allows user1 and rule2 denies user2.
Does user3 have access? If not, does removing rule1 enable user3 access? Thanks, Jun
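Parth's pseudo code can be written as a small Java sketch to check the scenarios Jun raises. The Acl and Permission types below are hypothetical, and treating "*" as a wildcard for principal and host is an assumption of this sketch, not something the pseudo code specifies.

```java
import java.util.List;

enum Permission { ALLOW, DENY }

// Hypothetical ACL entry; "*" is assumed to match any principal or host.
final class Acl {
    final String principal;
    final String host;
    final String operation;
    final Permission permission;

    Acl(String principal, String host, String operation, Permission permission) {
        this.principal = principal;
        this.host = host;
        this.operation = operation;
        this.permission = permission;
    }

    boolean matches(String requestPrincipal, String requestHost, String requestOperation) {
        return ("*".equals(principal) || principal.equals(requestPrincipal))
                && ("*".equals(host) || host.equals(requestHost))
                && operation.equals(requestOperation);
    }
}

final class SimpleAclEvaluator {
    static boolean authorize(List<Acl> acls, String principal, String host, String operation) {
        // No ACLs at all: allow, for backward compatibility with topics created before security support.
        if (acls == null || acls.isEmpty()) {
            return true;
        }
        // An explicit deny for this principal/host/operation always wins.
        boolean denied = acls.stream()
                .anyMatch(a -> a.permission == Permission.DENY && a.matches(principal, host, operation));
        if (denied) {
            return false;
        }
        // Otherwise at least one explicit allow is required; with none, deny by default.
        return acls.stream()
                .anyMatch(a -> a.permission == Permission.ALLOW && a.matches(principal, host, operation));
    }
}
```

With the rules allow user1 and deny user2, user3 is denied because nothing allows it. An ACL list containing only deny entries therefore denies everyone, which is the consequence Jun points out; under this sketch, "everyone except user1" needs a deny for user1 plus an allow for "*".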
Re: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
Give it a try now. ~ Joe Stein http://www.stealth.ly On Mon, Apr 20, 2015 at 9:22 PM, Honghai Chen honghai.c...@microsoft.com wrote: Username: waldenchen Email: waldenc...@163.com Thanks, Honghai -----Original Message----- From: Joe Stein [mailto:joe.st...@stealth.ly] Sent: Tuesday, April 21, 2015 9:19 AM To: dev@kafka.apache.org Subject: Re: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals What is your confluence username? ~ Joe Stein http://www.stealth.ly On Mon, Apr 20, 2015 at 9:15 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi dear dev, I need to create a KIP titled “Add one configuration log.preallocate” for https://issues.apache.org/jira/browse/KAFKA-1646, but I can't see the KIP Template after clicking Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. The picture below is what I see. Can you see it? Is there anywhere I can get the permission or setting? Thanks, Honghai
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hi Parth, OK, I understand your approach much better now. I don’t really have a strong opinion; I guess I just shared Gwen’s concern (in the hangout) that, with this approach, if a site implements their own authorizer using their own ACL implementation, it will work, but they’ll have this vestigial infrastructure for creating and maintaining ACLs that could cause confusion. One final thought: whatever abstraction you’re planning to use to represent an ACL in the TopicConfig, could you make that pluggable? On 4/15/15, 5:15 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Kafka currently stores logConfig overrides specified during topic creation in ZooKeeper; it's just an instance of java.util.Properties converted to JSON. I am proposing that, in addition to that, we store ACLs and owner as part of the same Properties map. There is some infrastructure around reading this config, converting it back to a Properties map and, most importantly, propagating any changes efficiently, which we will be able to leverage. As this infrastructure is common to the cluster, the reading (not interpreting) of config happens outside of any authorization code. If the TopicConfigCache just kept the JSON representation and left it to the authorizer to parse it, the authorizer would have to either parse the JSON for each request (not acceptable) or keep one more layer of parsed ACL instance cache. Assuming the authorizer keeps an additional caching layer, we would then have to implement some way to invalidate that cache, which means the TopicConfigCache would have to be an observable that the Authorizer observes, invalidating its cache entries when the TopicConfigCache gets updated. That seemed like unnecessary complexity with not much to gain, so I went with the TopicConfigCache interpreting the JSON and caching a higher-level modeled object. In summary, the interpretation is done for both optimization and simplicity.
If you think it is important to allow custom ACL format support, we can add one more pluggable config (acl.parser) and interface (AclParser), or it could just be another method in Authorizer. One thing to note: the current ACL JSON is versioned, so it is easy to make changes to it; however, it won’t be possible to support custom ACL formats with the current design. Thanks Parth On 4/15/15, 4:29 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, I’m a little confused: why would Kafka need to interpret the JSON? IIRC KIP-11 even says that the TopicConfigData will just store the JSON. I’m not really making a design recommendation here, just trying to understand what you’re proposing. On 4/15/15, 11:20 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi Michael, There is code in the Kafka codebase that reads and interprets the topic config JSON, which has ACLs, owner and logconfig properties. There are 3 use cases that we are supporting with the current proposal: * You use the out-of-the-box simpleAcl authorizer, which is tied to the ACLs stored in topic config, and the format is locked down. * You have a custom authorizer and a custom ACL store. Ranger/Argus falls under this, as they have their own ACL store and UI that users use to configure ACLs on the cluster and cluster resources like topics. It is up to the custom authorizer to leverage the Kafka ACL configs or completely ignore them, as they have set a user expectation that only ACLs configured via their UI/system will be effective. * You have a custom authorizer but no custom ACL store. You are completely tied to the ACL structure that we have provided in the out-of-the-box implementation. Thanks Parth On 4/15/15, 10:31 AM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, One question that occurred to me at the end of today’s hangout: how tied are we to a particular ACL representation under your proposal?
I know that TopicConfigCache will just contain JSON; if a particular site decides they want to represent their ACLs differently, and swap out the authorizer implementation, will that work? I guess what I’m asking is whether there’s any code in the Kafka codebase that will interpret that JSON, or does that logic live exclusively in the authorizer? On 4/14/15, 10:56 PM, Don Bosco Durai bo...@apache.org wrote: I also feel that having just the IP would be more appropriate. Host lookup will unnecessarily slow things down and would be insecure, as you pointed out. With IP, it will also be possible to set up policies (in the future, if needed) with ranges or netmasks, and it would be more scalable. Bosco On 4/14/15, 1:40 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, Sorry to chime in so late, but I’ve got a minor question on the KIP. Several methods take a parameter named “host” of type String. Is that intended to be a hostname, or an IP address? If the former, I’m curious as to how that’s found (in my experience, when accepting an incoming
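Parth's argument is that the ACL representation should be parsed once, when the topic config changes, rather than on every request, and he offers an acl.parser config with an AclParser interface as the route to custom formats. A minimal sketch of that shape, assuming hypothetical names throughout; a comma-separated toy format stands in for the versioned JSON, since the point is only where parsing happens:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical plug-in point corresponding to the "acl.parser" idea; not an actual Kafka interface.
interface AclParser {
    List<String> parse(String raw);
}

// Toy parser: comma-separated entries such as "allow:user1, deny:user2".
final class CommaSeparatedAclParser implements AclParser {
    @Override
    public List<String> parse(String raw) {
        List<String> entries = new ArrayList<>();
        for (String part : raw.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                entries.add(trimmed);
            }
        }
        return Collections.unmodifiableList(entries);
    }
}

// Parses once per config change, so the per-request path is only a map lookup.
final class ParsedAclCache {
    private final AclParser parser;
    private final Map<String, List<String>> aclsByTopic = new ConcurrentHashMap<>();

    ParsedAclCache(AclParser parser) {
        this.parser = parser;
    }

    void onConfigChange(String topic, String rawAcls) {
        aclsByTopic.put(topic, parser.parse(rawAcls));
    }

    List<String> aclsFor(String topic) {
        return aclsByTopic.getOrDefault(topic, Collections.emptyList());
    }
}
```

Swapping the parser changes the accepted format without touching the cache, which is roughly the pluggability Michael asks about.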
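Bosco notes that matching on IP addresses rather than host names avoids DNS lookups and leaves room for range or netmask policies. A minimal sketch of CIDR matching in Java, assuming callers pass IP literals (for a literal, InetAddress.getByName only checks the address format and does not query DNS); the class name is hypothetical:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: does an IP literal fall inside a CIDR range such as "10.1.0.0/16"?
final class CidrMatcher {
    private final byte[] network;
    private final int prefixLength;

    CidrMatcher(String cidr) {
        String[] parts = cidr.split("/");
        this.network = toBytes(parts[0]);
        this.prefixLength = parts.length > 1 ? Integer.parseInt(parts[1]) : network.length * 8;
        if (prefixLength < 0 || prefixLength > network.length * 8) {
            throw new IllegalArgumentException("bad prefix length in " + cidr);
        }
    }

    boolean matches(String ipLiteral) {
        byte[] addr = toBytes(ipLiteral);
        if (addr.length != network.length) {
            return false; // IPv4 address vs IPv6 range, or the reverse
        }
        int fullBytes = prefixLength / 8;
        for (int i = 0; i < fullBytes; i++) {
            if (addr[i] != network[i]) {
                return false;
            }
        }
        int remainingBits = prefixLength % 8;
        if (remainingBits == 0) {
            return true;
        }
        // Compare only the leading remainingBits of the next byte.
        int mask = (0xFF << (8 - remainingBits)) & 0xFF;
        return (addr[fullBytes] & mask) == (network[fullBytes] & mask);
    }

    // A hostname here would trigger the DNS lookup the thread wants to avoid; pass literals only.
    private static byte[] toBytes(String literal) {
        try {
            return InetAddress.getByName(literal).getAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not a valid address: " + literal, e);
        }
    }
}
```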
Review Request 33383: Patch for KAFKA-1595
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33383/ --- Review request for kafka. Bugs: KAFKA-1595 https://issues.apache.org/jira/browse/KAFKA-1595 Repository: kafka Description --- Use the same `scalatest` version for all Scala versions and remove unused code. Introduce `testJsonParse` Simple test that shows existing behaviour. KAFKA-1595; Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount A combination of spray-json's AST combined with jawn's parser are used as the replacement. Note that both libraries have no dependencies and are relatively simple. We use `jawn` for its performance, but it could be dropped by changing one line in `Json.parseFull`. An attempt has been made to maintain the existing behaviour regarding when exceptions are thrown. There are a number of cases where `DeserializationException` will be thrown instead of `ClassCastException`, however. It is expected that users would not try to catch `ClassCastException`. 
Minor clean-ups in `Json.encode` Diffs - README.md 946ec62cc71df93c905c5f35caf5cdb9c78e5c10 build.gradle 4775ee46c480eab7b8250e61ba1705d00f72a6aa core/src/main/scala/kafka/admin/AdminUtils.scala eee80f9c2c12da8e4879e96785f3b75a8ff7d1cd core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b3802ac221d570e7610458e50518b4499e7ed core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 3b3cd67d890e05c00d2a36a577f940347a0d387a core/src/main/scala/kafka/cluster/Broker.scala 79e16c167f67cfdef8a90212bc1c7607f989d102 core/src/main/scala/kafka/consumer/TopicCount.scala 6994c8e89055b0bb300da6346c058c8fbbea2c29 core/src/main/scala/kafka/controller/KafkaController.scala 3a09377611b48198c4c3cd1a118fc12eda0543d4 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d2bac85e16a247b1326f63619711fb0bbbd2e82a core/src/main/scala/kafka/utils/Json.scala d1102844748f2e88f79932281fe95583a57d2d16 core/src/main/scala/kafka/utils/ReplicationUtils.scala 60687332b4c9bee4d4c0851314cfb4b02d5d3489 core/src/main/scala/kafka/utils/ZkUtils.scala 5685a1eddb218baee617161f269cd1aee67bab9f core/src/test/scala/unit/kafka/utils/JsonTest.scala 93550e8f24071f88eb1ea5b41373efee27e4b8b7 Diff: https://reviews.apache.org/r/33383/diff/ Testing --- Thanks, Ismael Juma
Re: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
What is your confluence username? ~ Joe Stein http://www.stealth.ly On Mon, Apr 20, 2015 at 9:15 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi dear dev, I need to create a KIP titled “Add one configuration log.preallocate” for https://issues.apache.org/jira/browse/KAFKA-1646, but I can't see the KIP Template after clicking Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. The picture below is what I see. Can you see it? Is there anywhere I can get the permission or setting? Thanks, Honghai
[jira] [Updated] (KAFKA-2091) Expose a Partitioner interface in the new producer
[ https://issues.apache.org/jira/browse/KAFKA-2091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-2091: - Status: In Progress (was: Patch Available) Expose a Partitioner interface in the new producer -- Key: KAFKA-2091 URL: https://issues.apache.org/jira/browse/KAFKA-2091 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Attachments: KAFKA-2091.patch In the new producer you can pass in a key or hard code the partition as part of ProducerRecord. Internally we are using a class {code} class Partitioner { public int partition(String topic, byte[] key, Integer partition, Cluster cluster) {...} } {code} This class uses the specified partition if there is one; uses a hash of the key if there isn't a partition but there is a key; and simply chooses a partition round robin if there is neither a partition nor a key. However there are several partitioning strategies that could be useful that we don't support out of the box. An example would be having each producer periodically choose a random partition. This tends to be the most efficient since all data goes to one server and uses the fewest TCP connections, however it only produces good load balancing if there are many producers. Of course a user can do this now by just setting the partition manually, but that is a bit inconvenient if you need to do that across a bunch of apps since each will need to remember to set the partition every time. The idea would be to expose a configuration to set the partitioner implementation like {code} partitioner.class=org.apache.kafka.producer.DefaultPartitioner {code} This would default to the existing partitioner implementation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
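The partitioning rules described above can be sketched without Kafka's Cluster class by passing the partition count directly. This is a hypothetical standalone version: Arrays.hashCode stands in for whatever key hash the producer actually uses, and PeriodicRandomPartitioner illustrates the "each producer periodically chooses a random partition" idea rather than any existing Kafka class.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Explicit partition, else hash of key, else round robin.
final class DefaultStylePartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    int partition(byte[] key, Integer partition, int numPartitions) {
        if (partition != null) {
            return partition; // caller chose the partition (range validation omitted)
        }
        if (key != null) {
            return toPositive(Arrays.hashCode(key)) % numPartitions; // stand-in hash
        }
        return toPositive(counter.getAndIncrement()) % numPartitions; // round robin
    }

    // Clears the sign bit so the modulo result is never negative.
    private static int toPositive(int n) {
        return n & 0x7fffffff;
    }
}

// Sticks to one random partition and re-picks after periodMs, so each producer talks to one broker at a time.
final class PeriodicRandomPartitioner {
    private final long periodMs;
    private int current = -1;
    private long chosenAtMs;

    PeriodicRandomPartitioner(long periodMs) {
        this.periodMs = periodMs;
    }

    synchronized int partition(int numPartitions, long nowMs) {
        boolean expired = nowMs - chosenAtMs >= periodMs;
        if (current < 0 || current >= numPartitions || expired) {
            current = ThreadLocalRandom.current().nextInt(numPartitions);
            chosenAtMs = nowMs;
        }
        return current;
    }
}
```

Making either class the default would then be a matter of the partitioner.class setting proposed in the issue.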
[jira] [Updated] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.
[ https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1660: - Status: In Progress (was: Patch Available) Ability to call close() with a timeout on the Java Kafka Producer. --- Key: KAFKA-1660 URL: https://issues.apache.org/jira/browse/KAFKA-1660 Project: Kafka Issue Type: Improvement Components: clients, producer Affects Versions: 0.8.2.0 Reporter: Andrew Stein Assignee: Jiangjie Qin Fix For: 0.8.3 Attachments: KAFKA-1660.patch, KAFKA-1660.patch, KAFKA-1660_2015-02-17_16:41:19.patch, KAFKA-1660_2015-03-02_10:41:49.patch, KAFKA-1660_2015-03-08_21:14:50.patch, KAFKA-1660_2015-03-09_12:56:39.patch, KAFKA-1660_2015-03-25_10:55:42.patch, KAFKA-1660_2015-03-27_16:35:42.patch, KAFKA-1660_2015-04-07_18:18:40.patch, KAFKA-1660_2015-04-08_14:01:12.patch, KAFKA-1660_2015-04-10_15:08:54.patch, KAFKA-1660_2015-04-16_11:35:37.patch I would like the ability to call {{close}} with a timeout on the Java Client's KafkaProducer. h6. Workaround Currently, it is possible to ensure that {{close}} will return quickly by first doing a {{future.get(timeout)}} on the last future produced on each partition, but this means that the user has to define the partitions up front at the time of {{send}} and track the returned {{future}}s. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
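The workaround in the description amounts to waiting on the last future of each partition under one shared deadline before calling close. A minimal sketch using only java.util.concurrent, assuming the caller has already collected those futures (for example the Future<RecordMetadata> values returned by send); the helper name is hypothetical:

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedFlush {
    /** Returns true if every future completed (successfully or not) before the shared deadline. */
    static boolean awaitAll(List<? extends Future<?>> futures, long timeout, TimeUnit unit) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must be non-negative");
        }
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (Future<?> future : futures) {
            long remaining = Math.max(0L, deadline - System.nanoTime());
            try {
                future.get(remaining, TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                return false; // deadline passed with this send still outstanding
            } catch (ExecutionException e) {
                // A failed send is still complete; its error surfaces through the future or callback.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

Sharing one deadline keeps the total wait bounded by the timeout, instead of allowing the full timeout per partition.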
Re: Review Request 32650: Patch for KAFKA-2000
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32650/#review80857 --- core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/32650/#comment130981 I think there is an issue in relying on the metadata cache mainly due to start-up. E.g., when we start up the broker (and offset manager) the metadata cache will actually be empty so this would delete _all_ the offsets. Unfortunately even after start-up there is no _guarantee_ that you have the most current information in the cache (say, if the controller failed to send an UpdateMetadataRequest to the broker by the time the compactor task runs) - Joel Koshy On March 30, 2015, 9:47 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32650/ --- (Updated March 30, 2015, 9:47 p.m.) Review request for kafka. Bugs: KAFKA-2000 https://issues.apache.org/jira/browse/KAFKA-2000 Repository: kafka Description --- KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted. Diffs - core/src/main/scala/kafka/server/OffsetManager.scala 395b1dbe43a5db47151e72a1b588d72f03cef963 Diff: https://reviews.apache.org/r/32650/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Updated] (KAFKA-2000) Delete consumer offsets from kafka once the topic is deleted
[ https://issues.apache.org/jira/browse/KAFKA-2000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-2000: -- Status: In Progress (was: Patch Available) Delete consumer offsets from kafka once the topic is deleted Key: KAFKA-2000 URL: https://issues.apache.org/jira/browse/KAFKA-2000 Project: Kafka Issue Type: Bug Reporter: Sriharsha Chintalapani Assignee: Sriharsha Chintalapani Labels: newbie++ Attachments: KAFKA-2000.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.
[ https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1660: Status: Patch Available (was: In Progress) Ability to call close() with a timeout on the Java Kafka Producer. --- Key: KAFKA-1660 URL: https://issues.apache.org/jira/browse/KAFKA-1660 Project: Kafka Issue Type: Improvement Components: clients, producer Affects Versions: 0.8.2.0 Reporter: Andrew Stein Assignee: Jiangjie Qin Fix For: 0.8.3 Attachments: KAFKA-1660.patch, KAFKA-1660.patch, KAFKA-1660_2015-02-17_16:41:19.patch, KAFKA-1660_2015-03-02_10:41:49.patch, KAFKA-1660_2015-03-08_21:14:50.patch, KAFKA-1660_2015-03-09_12:56:39.patch, KAFKA-1660_2015-03-25_10:55:42.patch, KAFKA-1660_2015-03-27_16:35:42.patch, KAFKA-1660_2015-04-07_18:18:40.patch, KAFKA-1660_2015-04-08_14:01:12.patch, KAFKA-1660_2015-04-10_15:08:54.patch, KAFKA-1660_2015-04-16_11:35:37.patch, KAFKA-1660_2015-04-20_17:38:22.patch I would like the ability to call {{close}} with a timeout on the Java Client's KafkaProducer. h6. Workaround Currently, it is possible to ensure that {{close}} will return quickly by first doing a {{future.get(timeout)}} on the last future produced on each partition, but this means that the user has to define the partitions up front at the time of {{send}} and track the returned {{future}}s. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.
[ https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504039#comment-14504039 ] Jiangjie Qin commented on KAFKA-1660: - Updated reviewboard https://reviews.apache.org/r/31850/diff/ against branch origin/trunk Ability to call close() with a timeout on the Java Kafka Producer. --- Key: KAFKA-1660 URL: https://issues.apache.org/jira/browse/KAFKA-1660 Project: Kafka Issue Type: Improvement Components: clients, producer Affects Versions: 0.8.2.0 Reporter: Andrew Stein Assignee: Jiangjie Qin Fix For: 0.8.3 Attachments: KAFKA-1660.patch, KAFKA-1660.patch, KAFKA-1660_2015-02-17_16:41:19.patch, KAFKA-1660_2015-03-02_10:41:49.patch, KAFKA-1660_2015-03-08_21:14:50.patch, KAFKA-1660_2015-03-09_12:56:39.patch, KAFKA-1660_2015-03-25_10:55:42.patch, KAFKA-1660_2015-03-27_16:35:42.patch, KAFKA-1660_2015-04-07_18:18:40.patch, KAFKA-1660_2015-04-08_14:01:12.patch, KAFKA-1660_2015-04-10_15:08:54.patch, KAFKA-1660_2015-04-16_11:35:37.patch, KAFKA-1660_2015-04-20_17:38:22.patch I would like the ability to call {{close}} with a timeout on the Java Client's KafkaProducer. h6. Workaround Currently, it is possible to ensure that {{close}} will return quickly by first doing a {{future.get(timeout)}} on the last future produced on each partition, but this means that the user has to define the partitions up front at the time of {{send}} and track the returned {{future}}s. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-11- Authorization design for kafka security
According to the pseudo code, if you have a rule deny user1, then it essentially denies all users? Gwen, I see “deny” being useful, i.e., when you want to allow everyone except a few users. This is probably a common case; without deny you would have to add a long list of users to the allow list. Jun, as per the pseudo code, it acts per user for the operation they are requesting. If user1 connects to Kafka and sends a write request to “test-topic”, it will be granted if the ACLs for “test-topic” have “allow user1”, and the request will be denied if the ACLs for “test-topic” have “deny user1”. Let's say the topic ACLs contain “deny user1” and “allow *”: user2, user3 (all the other users except user1) can write to the topic, and only user1 will be denied. Suppose there are two rules. Rule1 allows user1 and rule2 denies user2. Does user3 have access? If not, does removing rule1 enable user3 access? Rule1: User1, allow. Rule2: User2, deny. In this case user3 won’t have access: there is no allow rule for that user, so user3 will be denied. Thanks, Harsha On April 20, 2015 at 5:34:43 PM, Gwen Shapira (gshap...@cloudera.com) wrote: I admit that I'm also not sure what we gain by having deny rules. A user is either in the allow list (one way or another) or it will be denied by default. I think this is something we can skip for now and add later if needed? On Mon, Apr 20, 2015 at 5:24 PM, Jun Rao j...@confluent.io wrote: According to the pseudo code, if you have a rule deny user1, then it essentially denies all users? Thanks, Jun On Mon, Apr 20, 2015 at 5:16 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Here is a pseudo code that explains my current approach: acls = authorizer.getAcl(resource) if(acls == null || acls.isEmpty) { allow all requests for backward compatibility.
(any topics that were created prior to security support will not have acls) This is debatable; generally we should block everyone, which is what I would prefer, but that means anyone moving to the authorizer must go through all of their existing topics and add an ACL to allow all. If we are fine with imposing this requirement, I can start returning deny when no acls are found. } else { //So the user has set some acls explicitly, this means they have knowingly enabled authorizer. Let’s first check if they have set an Acl to deny this user/host/operation combination. if some acl denies this request for this principal/host/operation combination, return deny //this principal/host/operation does not have any explicit deny acl, check if there is some explicit acl that allows the operation if at least one acl allows this request for this principal/host/operation combination, return allow // no acl was found for this principal/host/operation combination to allow this operation, so we will deny the request return deny } Thanks Parth On 4/20/15, 2:21 PM, Jun Rao j...@confluent.io wrote: Hmm, I thought the semantics is that if you only have rule deny user2, it means that everyone except user2 has access? Thanks, Jun On Mon, Apr 20, 2015 at 3:25 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: user3 does not have access, and removing the deny rule does not grant him or user2 access. Even without the deny rule, user2 will not have access. Thanks Parth On 4/20/15, 12:03 PM, Jun Rao j...@confluent.io wrote: Just a followup question. Suppose there are two rules. Rule1 allows user1 and rule2 denies user2. Does user3 have access? If not, does removing rule1 enable user3 access? Thanks, Jun On Mon, Apr 20, 2015 at 1:34 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi Joel, Thanks for the review and I plan to update the KIP today with all the updated info. My comments in line below.
Thanks Parth On 4/20/15, 10:07 AM, Joel Koshy jjkosh...@gmail.com wrote: Hi Parth, Nice work on this KIP. I did another read through and had a few more comments (with edits after I went through the thread). Many of these comments were brought up by others as well, so it appears that the KIP would benefit from an update at this point to incorporate comments from the thread and last hangout. - The operation enum is mostly self-explanatory, but it would help (for the sake of clarity and completeness if nothing else) to document exactly what each of the enums are. E.g., I think this came up in our hangout - SEND_CONTROL_MESSAGE is unclear and I don't remember what was said about it. Edit: After going through the thread it seems the conclusion was to categorize
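The authorization pseudocode quoted in this thread can be rendered as a small Java sketch. Everything here is hypothetical and only for illustration: the Acl class, the Permission enum, the string operations, the "*" wildcard (taken from Harsha's “allow *” example) and the allowIfNoAcls flag are not the KIP-11 interfaces. The flag models the open question of whether a resource with no ACLs should fail open (backward compatible) or fail closed. Evaluation follows Parth's order: an explicit deny wins, otherwise any matching allow grants access, otherwise the request is denied.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class AclAuthorizerSketch {
    enum Permission { ALLOW, DENY }

    // Hypothetical ACL entry; "*" matches any principal or any host.
    static final class Acl {
        final String principal;
        final String host;
        final String operation;
        final Permission permission;

        Acl(String principal, String host, String operation, Permission permission) {
            this.principal = principal;
            this.host = host;
            this.operation = operation;
            this.permission = permission;
        }

        boolean matches(String p, String h, String op) {
            return ("*".equals(principal) || principal.equals(p))
                && ("*".equals(host) || host.equals(h))
                && operation.equals(op);
        }
    }

    static boolean authorize(List<Acl> acls, String principal, String host,
                             String operation, boolean allowIfNoAcls) {
        // No ACLs at all: fail open (pre-security topics) or fail closed.
        if (acls == null || acls.isEmpty()) {
            return allowIfNoAcls;
        }
        // 1. An explicit deny for this principal/host/operation wins.
        for (Acl acl : acls) {
            if (acl.permission == Permission.DENY && acl.matches(principal, host, operation)) {
                return false;
            }
        }
        // 2. Otherwise, at least one matching allow grants access.
        for (Acl acl : acls) {
            if (acl.permission == Permission.ALLOW && acl.matches(principal, host, operation)) {
                return true;
            }
        }
        // 3. ACLs exist but none allows this combination: deny.
        return false;
    }

    public static void main(String[] args) {
        // Jun's example: Rule1 allows user1, Rule2 denies user2.
        List<Acl> rules = Arrays.asList(
            new Acl("user1", "*", "WRITE", Permission.ALLOW),
            new Acl("user2", "*", "WRITE", Permission.DENY));
        System.out.println(authorize(rules, "user1", "10.0.0.1", "WRITE", true)); // true
        System.out.println(authorize(rules, "user3", "10.0.0.1", "WRITE", true)); // false

        // Harsha's example: "deny user1" plus "allow *".
        List<Acl> denyOne = Arrays.asList(
            new Acl("user1", "*", "WRITE", Permission.DENY),
            new Acl("*", "*", "WRITE", Permission.ALLOW));
        System.out.println(authorize(denyOne, "user2", "10.0.0.1", "WRITE", true)); // true
        System.out.println(authorize(denyOne, "user1", "10.0.0.1", "WRITE", true)); // false

        // A topic with no ACLs: the answer depends only on the fail-open flag.
        System.out.println(authorize(Collections.<Acl>emptyList(), "user3", "10.0.0.1", "WRITE", false)); // false
    }
}
```

Under these rules a lone “deny user2” ACL denies everyone, because nothing allows them; “everyone except user1” needs “deny user1” together with “allow *”.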
[GitHub] kafka pull request: Kafka 1595 remove deprecated json parser
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/55

Kafka 1595 remove deprecated json parser

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka kafka-1595-remove-deprecated-json-parser

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/55.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #55

commit 31ac16dbd9d657fbb9a8a63153361ec543aabb7b
Author: Ismael Juma ism...@juma.me.uk
Date: 2015-04-17T23:26:28Z

Drop support for Scala 2.9.1 and 2.9.2

Use the same `scalatest` version for all Scala versions and remove unused code.

commit ab71458018dc2c5b05a981da1a73aea08a0163cd
Author: Ismael Juma ism...@juma.me.uk
Date: 2015-04-20T20:53:54Z

Introduce `testJsonParse`

Simple test that shows existing behaviour.

commit bda759a01cf244d0786830a7e9540b2e84a620c2
Author: Ismael Juma ism...@juma.me.uk
Date: 2015-04-21T00:15:02Z

KAFKA-1595; Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount

spray-json's AST combined with jawn's parser is used as the replacement. Note that both libraries have no dependencies and are relatively simple. We use `jawn` for its performance, but it could be dropped by changing one line in `Json.parseFull`.

An attempt has been made to maintain the existing behaviour regarding when exceptions are thrown. There are a number of cases where `DeserializationException` will be thrown instead of `ClassCastException`, however. It is expected that users would not try to catch `ClassCastException`.

commit d1adee404e6d9a7f7b07da4d7ab2ed8e2087ccdf
Author: Ismael Juma ism...@juma.me.uk
Date: 2015-04-21T00:23:39Z

Minor clean-ups in `Json.encode`
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hmm, I thought the semantics is that if you only have rule deny user2, it means that everyone except user2 has access? Thanks, Jun On Mon, Apr 20, 2015 at 3:25 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: user3 does not have access and removing the deny rule does not grant him or user2 access. user2 even without the deny rule will not have access. Thanks Parth On 4/20/15, 12:03 PM, Jun Rao j...@confluent.io wrote: Just a followup question. Suppose there are two rules. Rule1 allows user1 and rule2 denies user2. Does user3 have access? If not, does removing rule1 enable user3 access? Thanks, Jun On Mon, Apr 20, 2015 at 1:34 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi Joel, Thanks for the review and I plan to update the KIP today with all the updated info. My comments in line below. Thanks Parth On 4/20/15, 10:07 AM, Joel Koshy jjkosh...@gmail.com wrote: Hi Parth, Nice work on this KIP. I did another read through and had a few more comments (with edits after I went through the thread). Many of these comments were brought up by others as well, so it appears that the KIP would benefit from an update at this point to incorporate comments from the thread and last hangout. - The operation enum is mostly self-explanatory, but it would help (for the sake of clarity and completeness if nothing else) to document exactly what each of the enums are. E.g., I think this came up in our hangout - SEND_CONTROL_MESSAGE is unclear and I don't remember what was said about it. Edit: After going through the thread it seems the conclusion was to categorize operations. E.g., WRITE could apply to multiple requests. Again, this is unclear, so it would be great if you could update the KIP to clarify what you intend. Will add to document. SEND_CONTROL_MESSAGE: probably a very bad name, but these are intra-broker API calls, like the controller notifying other brokers to update metadata, or heartbeats. Any better naming suggestions?
- When you update the KIP to categorize the requests, it would also help to have a column for what the resource is for each. Will add to the KIP. - FWIW I prefer a 1-1 mapping between requests and operations. I think categorizing requests into these can be confusing because: - The resource being protected for different requests will be different. We are mostly thinking about topics (read/write), but there are requests for which topic is not the right resource. E.g., for topic creation, the resource as you suggested would be something global/common such as “cluster”. For OffsetCommit/FetchRequest, the resource may be the consumer group, or maybe a (consumer group, topic) tuple. So this can be confusing - i.e., different resources and request types in the same category. It may be simpler and clearer to just have a 1-1 mapping between the operation enum and requests. I only see 2 resource categories right now: cluster and topic. I don’t really care one way or another, so we can probably make a quick decision in tomorrow’s meeting to either go with a 1-1 mapping or have categorization? - Some requests that are intuitively READ have WRITE side-effects. E.g., (currently) TopicMetadataRequest with auto-create, although that will eventually go away. ConsumerMetadataRequest still auto-creates the offsets topic. Likewise, ADMIN-type requests may be interpreted as having side-effects (depending on who you ask). Yes, and what I am doing right now is checking authorization for all possible actions, i.e., for auto-create it checks whether the config has it enabled and, if so, checks for read + create authorization. It's not very meaningful right now as there is no CREATE authorization, but I think this is an implementation detail; we need to ensure we call authorize with all possible operations from KafkaAPI. - “When an ACL is missing - fail open”. What does missing mean? i.e., no explicit ACL for a principal? I'm confused by this, especially in relation to the precedence of DENY over ALLOW.
So per the description: - If no ACLs exist for topic A, then ALLOW all operations on it by anyone. - If I now add an ACL for a certain principal P to ALLOW (say) WRITE to the topic, then either: - This has the effect of DENYing WRITE to all other principals - Or, this ACL serves no purpose - If the effect is to DENY WRITE to all other principals, what about READ? Do all principals (including P) have READ permissions to topic A? - In other words, it seems that for a specific ACL to be meaningful, fail-close is necessary for an absent ACL. - Edit: After going through the thread, it appears that the DENY override only applies to the given principal, i.e., in the above case it appears that
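Parth's approach of authorizing every operation a request may trigger (for example READ plus CREATE for a metadata request when auto-create is enabled), with cluster and topic as the only resource types, could look roughly like the sketch below. The request names, the Operation and ResourceType enums and requiredChecks are all hypothetical; CREATE is included even though, as Parth notes, there is no CREATE authorization yet, and placing it on the cluster follows Joel's “cluster” suggestion for topic creation. It illustrates the categorized mapping rather than a strict 1-1 mapping.

```java
import java.util.ArrayList;
import java.util.List;

class RequestChecksSketch {
    enum Operation { READ, WRITE, CREATE, SEND_CONTROL_MESSAGE }
    enum ResourceType { CLUSTER, TOPIC }

    // One authorization check: an operation on a resource type.
    static final class Check {
        final Operation operation;
        final ResourceType resource;

        Check(Operation operation, ResourceType resource) {
            this.operation = operation;
            this.resource = resource;
        }

        @Override
        public String toString() {
            return operation + "@" + resource;
        }
    }

    // Hypothetical request names; the mapping follows the examples in the thread.
    static List<Check> requiredChecks(String request, boolean autoCreateEnabled) {
        List<Check> checks = new ArrayList<>();
        switch (request) {
            case "Produce":
                checks.add(new Check(Operation.WRITE, ResourceType.TOPIC));
                break;
            case "Fetch":
                checks.add(new Check(Operation.READ, ResourceType.TOPIC));
                break;
            case "TopicMetadata":
                checks.add(new Check(Operation.READ, ResourceType.TOPIC));
                // A READ with a WRITE-like side effect: auto-creation also needs CREATE.
                if (autoCreateEnabled) {
                    checks.add(new Check(Operation.CREATE, ResourceType.CLUSTER));
                }
                break;
            case "CreateTopic":
                checks.add(new Check(Operation.CREATE, ResourceType.CLUSTER));
                break;
            case "UpdateMetadata":
                // Intra-broker call, e.g. the controller notifying other brokers.
                checks.add(new Check(Operation.SEND_CONTROL_MESSAGE, ResourceType.CLUSTER));
                break;
            default:
                throw new IllegalArgumentException("Unknown request: " + request);
        }
        return checks;
    }

    public static void main(String[] args) {
        System.out.println(requiredChecks("TopicMetadata", true));  // [READ@TOPIC, CREATE@CLUSTER]
        System.out.println(requiredChecks("TopicMetadata", false)); // [READ@TOPIC]
    }
}
```

With a strict 1-1 mapping, each request type would instead get its own operation and the switch would reduce to a lookup table.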
[jira] [Updated] (KAFKA-1933) Fine-grained locking in log append
[ https://issues.apache.org/jira/browse/KAFKA-1933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1933: - Status: In Progress (was: Patch Available) These results are very good. I'm scared of the code, though: the change is quite small, but scary. You mentioned there is another version of the patch coming which is simpler? We made some improvements to compression performance recently which should help this problem even without this patch; is it still needed? Fine-grained locking in log append -- Key: KAFKA-1933 URL: https://issues.apache.org/jira/browse/KAFKA-1933 Project: Kafka Issue Type: Improvement Components: log Reporter: Maxim Ivanov Assignee: Maxim Ivanov Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-1933.patch, KAFKA-1933_2015-02-09_12:27:06.patch This patch adds finer locking when appending to the log. It breaks the global append lock into 2 sequential phases and 1 parallel phase. The basic idea is to allow every thread to reserve offsets in non-overlapping ranges, then do compression in parallel, and then commit writes to the log in the same order the offsets were reserved. Results on a server with 16 CPU cores available: gzip: 564.0 sec -> 45.2 sec (12.4x speedup); LZ4: 56.7 sec -> 9.9 sec (5.7x speedup). Kafka was configured to run 16 IO threads; data was pushed using 32 netcat instances pushing, in parallel, batches of 200 msg 6.2 kb each (3264 MB in total).
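The three phases described in the patch summary (reserve non-overlapping offset ranges in order, compress in parallel, commit in reservation order) can be sketched with JDK concurrency primitives alone. This is a hypothetical illustration of the idea, not the code in KAFKA-1933.patch: OrderedAppender, reserve, commit and runDemo are invented names, the “log” is an in-memory list, and compression is only marked by a comment. It also assumes every reservation is eventually committed; a batch that fails between reserve and commit would stall all later commits, which a real implementation has to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderedAppender {
    // A reservation: the batch's place in commit order and its first offset.
    static final class Reservation {
        final long ticket;
        final long baseOffset;

        Reservation(long ticket, long baseOffset) {
            this.ticket = ticket;
            this.baseOffset = baseOffset;
        }
    }

    private final Object reserveLock = new Object();
    private long nextOffset = 0; // guarded by reserveLock
    private long nextTicket = 0; // guarded by reserveLock

    private long nextToCommit = 0;                       // guarded by this
    private final List<String> log = new ArrayList<>(); // guarded by this

    // Phase 1 (sequential, cheap): hand out a non-overlapping offset range.
    Reservation reserve(int messageCount) {
        synchronized (reserveLock) {
            Reservation r = new Reservation(nextTicket++, nextOffset);
            nextOffset += messageCount;
            return r;
        }
    }

    // Phase 3 (sequential): append batches in the order their offsets were reserved.
    synchronized void commit(Reservation r, String compressedBatch) throws InterruptedException {
        while (r.ticket != nextToCommit) {
            wait(); // an earlier reservation has not been committed yet
        }
        log.add(r.baseOffset + ":" + compressedBatch);
        nextToCommit++;
        notifyAll();
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(log);
    }

    // Runs `batches` appends of 10 messages each on `threads` worker threads.
    static List<String> runDemo(int batches, int threads) {
        OrderedAppender appender = new OrderedAppender();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < batches; i++) {
            final int batch = i;
            pool.submit(() -> {
                Reservation r = appender.reserve(10);
                // Phase 2 (parallel, expensive): compression would run here with no lock held.
                String compressed = "batch" + batch;
                appender.commit(r, compressed);
                return null;
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return appender.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(8, 4)); // base offsets 0, 10, ..., 70 in ascending order
    }
}
```

Only the short reservation and the final append are serialized, so the expensive middle phase can use all cores, which is where the reported gzip and LZ4 speedups would come from.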
RE: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
Username: waldenchen Email: waldenc...@163.com Thanks, Honghai -Original Message- From: Joe Stein [mailto:joe.st...@stealth.ly] Sent: Tuesday, April 21, 2015 9:19 AM To: dev@kafka.apache.org Subject: Re: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals What is your confluence username? ~ Joe Stein http://www.stealth.ly On Mon, Apr 20, 2015 at 9:15 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi dear dev, I need to create a KIP titled “Add one configuration log.preallocate” for https://issues.apache.org/jira/browse/KAFKA-1646, but I can't see the KIP Template after clicking Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. The below picture is what I see. Can you see it? Is there anywhere I can get the permission, or a setting to change? Thanks, Honghai
[jira] [Updated] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1646: - Status: In Progress (was: Patch Available) Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be contiguous on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows.
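A minimal sketch of the preallocation idea, assuming a segment is extended to a fixed size when it is rolled and trimmed back to its logical size when it is closed (the attachment name suggests the patch also truncates trailing zeros on broker restart). PreallocatedSegment, demo and the sizes are hypothetical. RandomAccessFile.setLength only sets the file length; whether the filesystem actually reserves contiguous blocks, rather than creating a sparse file, is platform-dependent, which fits the benefit being reported for NTFS specifically.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class PreallocatedSegment implements AutoCloseable {
    private final RandomAccessFile file;
    private long position = 0; // logical end of the data written so far

    // Called when a new segment is rolled: extend the file to its full size up front.
    PreallocatedSegment(File path, long preallocateBytes) throws IOException {
        file = new RandomAccessFile(path, "rw");
        file.setLength(preallocateBytes);
    }

    void append(byte[] data) throws IOException {
        file.seek(position);
        file.write(data);
        position += data.length;
    }

    // Trim the unused, zero-filled tail so no trailing zeros are left behind.
    @Override
    public void close() throws IOException {
        file.setLength(position);
        file.close();
    }

    // Returns {length while open, length after close} for a temporary segment.
    static long[] demo(long preallocateBytes, String payload) {
        try {
            File f = File.createTempFile("segment", ".log");
            f.deleteOnExit();
            long whileOpen = -1;
            try (PreallocatedSegment seg = new PreallocatedSegment(f, preallocateBytes)) {
                seg.append(payload.getBytes(StandardCharsets.UTF_8));
                whileOpen = f.length();
            }
            return new long[] { whileOpen, f.length() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] lengths = demo(1024 * 1024, "hello");
        System.out.println(lengths[0] + " -> " + lengths[1]); // 1048576 -> 5
    }
}
```

Trimming on close keeps readers from treating the zero-filled tail as data; a broker that crashes before close would need the restart-time truncation mentioned in the attachment name.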
Review Request 33378: Patch for KAFKA-2136
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/
---

Review request for kafka.

Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136

Repository: kafka

Description
---
Changes are:
- protocol changes to the fetch request and response to return the throttle_time_ms to clients
- new producer/consumer metrics to expose the avg and max delay time for a client
- test cases

For now the patch will publish a zero delay and return a response.

Added more tests

Diffs
-
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 9c4518e840904c371a5816bfc52be1933cba0b96
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8
core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717
core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c
core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da1d865086f9830aa919a49063abbbe574d
core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330
core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db
core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda
core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46
core/src/main/scala/kafka/server/OffsetManager.scala 420e2c3535e722c503f13d093849469983f6f08d
core/src/main/scala/kafka/server/ReplicaManager.scala 8ddd325015de4245fd2cf500d8b0e8c1fd2bc7e8
core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 566b5381665bb027a06e17c4fc27414943f85220
core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 9186c90de5a983a73b042fcb42987bfabae14fcf
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79

Diff: https://reviews.apache.org/r/33378/diff/

Testing
---

Thanks,
Aditya Auradkar
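The new client metrics expose the average and maximum throttle delay reported in responses. A minimal, dependency-free sketch of that aggregation follows; ThrottleTimeStats is a hypothetical stand-in rather than the client's own metrics library, and unlike a real metric it keeps cumulative totals with no time window.

```java
class ThrottleTimeStats {
    private long count = 0;
    private long totalMs = 0;
    private long maxMs = 0;

    // Record the throttle_time_ms value carried by one Fetch or Produce response.
    synchronized void record(int throttleTimeMs) {
        if (throttleTimeMs < 0) {
            throw new IllegalArgumentException("negative throttle time: " + throttleTimeMs);
        }
        count++;
        totalMs += throttleTimeMs;
        maxMs = Math.max(maxMs, throttleTimeMs);
    }

    synchronized double avgMs() {
        return count == 0 ? 0.0 : (double) totalMs / count;
    }

    synchronized long maxMs() {
        return maxMs;
    }

    public static void main(String[] args) {
        ThrottleTimeStats stats = new ThrottleTimeStats();
        // The initial patch always reports a zero delay.
        stats.record(0);
        stats.record(0);
        System.out.println(stats.avgMs() + " " + stats.maxMs()); // 0.0 0
        stats.record(30);
        System.out.println(stats.avgMs() + " " + stats.maxMs()); // 10.0 30
    }
}
```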
[jira] [Commented] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14503976#comment-14503976 ] Aditya A Auradkar commented on KAFKA-2136: -- Created reviewboard https://reviews.apache.org/r/33378/diff/ against branch trunk Client side protocol changes to return quota delays --- Key: KAFKA-2136 URL: https://issues.apache.org/jira/browse/KAFKA-2136 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2136.patch As described in KIP-13, evolve the protocol to return a throttle_time_ms in the FetchResponse and ProduceResponse objects. Add client-side metrics on the new producer and consumer to expose the delay time.
[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya A Auradkar updated KAFKA-2136: - Attachment: KAFKA-2136.patch
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Wouldn't the “everyone except a few users” case be easier to manage through groups/roles? I think that's how it's done in pretty much any system I can think of. Do you know any system with a deny option? I don't feel strongly about this (i.e., it's an extra feature; people can use it or not, and maybe it will be useful for IP blacklists). I'm just thinking that if it causes confusion, perhaps this is something we can leave out of scope for now and add later. On Mon, Apr 20, 2015 at 5:57 PM, Sriharsha Chintalapani ka...@harsha.io wrote: Gwen, I see “deny” being useful, i.e., when you want to allow everyone except a few users. This is probably a common case; without deny you would have to add a long list of users to the allow list.
[jira] [Commented] (KAFKA-1054) Eliminate Compilation Warnings for 0.8 Final Release
[ https://issues.apache.org/jira/browse/KAFKA-1054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504099#comment-14504099 ] Ismael Juma commented on KAFKA-1054: Note that there are also some deprecation warnings when Kafka is compiled with Scala 2.11 (mostly usage of `Pair`). [~blakesmith], are you interested in updating your patch to include these as well? If not, I am happy to do it. Eliminate Compilation Warnings for 0.8 Final Release Key: KAFKA-1054 URL: https://issues.apache.org/jira/browse/KAFKA-1054 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1054.patch, KAFKA-1054_Mar_10_2015.patch Currently there are 38 warnings in total when compiling the 0.8 source code: 1) 3 from unchecked type patterns 2) 6 from unchecked conversions 3) 29 from deprecated Hadoop API functions. It would be better to fix these before the final release of 0.8.
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Ignore my last; I was catching up on this list and hadn’t seen Gwen’s subsequent proposal. On 4/15/15, 5:15 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Kafka currently stores logConfig overrides specified during topic creation in ZooKeeper; it's just an instance of java.util.Properties converted to JSON. I am proposing that, in addition to that, we store ACLs and owner as part of the same Properties map. There is some infrastructure around reading this config, converting it back to a Properties map and, most importantly, propagating any changes efficiently, which we will be able to leverage. As this infrastructure is common to the cluster, the reading (not interpreting) of config happens outside of any authorization code. If the TopicConfigCache just kept the JSON representation and left it to the authorizer to parse it, the authorizer would have to either parse the JSON for each request (not acceptable) or keep one more layer of parsed ACL instance cache. Assuming the authorizer keeps an additional caching layer, we would then have to implement some way to invalidate that cache, which means the TopicConfigCache would have to be an observable that the Authorizer observes, invalidating its cache entries when TopicConfigCache gets updated. That seemed like unnecessary complexity without a lot to gain, so I went with TopicConfigCache interpreting the JSON and caching a higher-level modeled object. In summary, the interpretation is done for both optimization and simplicity. If you think it is important to allow custom ACL format support, we can add one more pluggable config (acl.parser) and interface (AclParser), or it could just be another method in Authorizer. One thing to note: the current ACL JSON is versioned, so it is easy to make changes to it; however, it won’t be possible to support custom ACL formats with the current design.
Thanks Parth On 4/15/15, 4:29 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, I’m a little confused: why would Kafka need to interpret the JSON? IIRC KIP-11 even says that the TopicConfigData will just store the JSON. I’m not really making a design recommendation here, just trying to understand what you’re proposing. On 4/15/15, 11:20 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi Michael, There is code in the Kafka codebase that reads and interprets the topic config JSON, which has acls, owner and logconfig properties. There are 3 use cases that we are supporting with the current proposal: * You use the out-of-the-box simpleAcl authorizer, which is tied to the ACLs stored in topic config, and the format is locked down. * You have a custom authorizer and a custom ACL store. Ranger/Argus falls under this, as they have their own ACL store and UI that users use to configure ACLs on the cluster and cluster resources like topics. It is up to the custom authorizer to leverage the Kafka ACL configs or completely ignore them, as they have set a user expectation that only ACLs configured via their UI/system will be effective. * You have a custom authorizer but no custom ACL store. You are then completely tied to the ACL structure that we have provided in the out-of-the-box implementation. Thanks Parth On 4/15/15, 10:31 AM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, One question that occurred to me at the end of today’s hangout: how tied are we to a particular ACL representation under your proposal? I know that TopicConfigCache will just contain JSON; if a particular site decides they want to represent their ACLs differently, and swap out the authorizer implementation, will that work? I guess what I’m asking is whether there’s any code in the Kafka codebase that will interpret that JSON, or does that logic live exclusively in the authorizer?
On 4/14/15, 10:56 PM, Don Bosco Durai bo...@apache.org wrote: I also feel having just the IP would be more appropriate. A host lookup will unnecessarily slow things down and would be insecure, as you pointed out. With IPs, we will also be able to set up policies (in the future, if needed) with ranges or netmasks, and it would be more scalable. Bosco On 4/14/15, 1:40 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, Sorry to chime in so late, but I’ve got a minor question on the KIP. Several methods take a parameter named “host” of type String. Is that intended to be a hostname, or an IP address? If the former, I’m curious as to how that’s found (in my experience, when accepting an incoming socket connection, you only know the IP address, and there isn’t a way to map that to a hostname without a round trip to a DNS server, which is insecure anyway). On 3/25/15, 1:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi all, I have modified the KIP to reflect the recent change request from the reviewers. I have been working on the code and I have the server side code
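Bosco's point that IP addresses, unlike hostnames, allow policies over ranges or netmasks can be illustrated with a small CIDR matcher. CidrMatcher is a hypothetical helper, not part of the KIP-11 Authorizer interface. It relies on the documented behaviour of InetAddress.getByName: given an IP literal it only validates the format and makes no DNS round trip (given a hostname it would resolve it, so callers should pass the peer's literal address).

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

class CidrMatcher {
    private final byte[] network;
    private final int prefixLength;

    // Parses "a.b.c.d/n" (or an IPv6 literal with a prefix). A bare address means an exact match.
    CidrMatcher(String cidr) {
        String[] parts = cidr.split("/", 2);
        network = toBytes(parts[0]);
        prefixLength = parts.length == 2 ? Integer.parseInt(parts[1]) : network.length * 8;
        if (prefixLength < 0 || prefixLength > network.length * 8) {
            throw new IllegalArgumentException("Bad prefix length in " + cidr);
        }
    }

    // For an IP literal, getByName only checks the format; no DNS lookup is made.
    private static byte[] toBytes(String ipLiteral) {
        try {
            return InetAddress.getByName(ipLiteral).getAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Not a valid IP literal: " + ipLiteral, e);
        }
    }

    boolean matches(String ip) {
        byte[] addr = toBytes(ip);
        if (addr.length != network.length) {
            return false; // IPv4 address vs IPv6 rule, or the reverse
        }
        int fullBytes = prefixLength / 8;
        for (int i = 0; i < fullBytes; i++) {
            if (addr[i] != network[i]) {
                return false;
            }
        }
        int remainingBits = prefixLength % 8;
        if (remainingBits == 0) {
            return true;
        }
        // Compare only the leading `remainingBits` bits of the next byte.
        int mask = (0xFF << (8 - remainingBits)) & 0xFF;
        return (addr[fullBytes] & mask) == (network[fullBytes] & mask);
    }

    public static void main(String[] args) {
        CidrMatcher rule = new CidrMatcher("10.1.0.0/16");
        System.out.println(rule.matches("10.1.42.7")); // true
        System.out.println(rule.matches("10.2.0.1"));  // false
    }
}
```

Because matching works on raw address bytes, a single rule can cover a whole subnet, which is the scalability benefit mentioned above.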
Re: Review Request 31850: Patch for KAFKA-1660
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/
---
(Updated April 21, 2015, 12:38 a.m.)

Review request for kafka.

Bugs: KAFKA-1660
https://issues.apache.org/jira/browse/KAFKA-1660

Repository: kafka

Description (updated)
---
A minor fix.
Incorporated Guozhang's comments.
Modify according to the latest conclusion.
Patch for the finally passed KIP-15git status
Addressed Joel and Guozhang's comments.
rebased on trunk
Rebase on trunk
Addressed Joel's comments.
Addressed Joel's comments
Addressed Jay's comments
Changed javadoc per Jay's suggestion
Change java doc as Jay suggested.

Diffs (updated)
-
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b70e1a3d406338d4b9ddd6188d2820e87545a9b6
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854
clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2
clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4
clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c
core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db

Diff: https://reviews.apache.org/r/31850/diff/

Testing
---
Unit tests passed.

Thanks,
Jiangjie Qin
[jira] [Updated] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-1595:
---
Status: Patch Available (was: Open)

Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
-
Key: KAFKA-1595
URL: https://issues.apache.org/jira/browse/KAFKA-1595
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Jagbir
Assignee: Ismael Juma
Labels: newbie
Fix For: 0.8.3
Attachments: KAFKA-1595.patch

The following issue is created as a follow-up suggested by Jun Rao in a Kafka newsgroup message with the subject "Blocking Recursive parsing from kafka.consumer.TopicCount$.constructTopicCount".

SUMMARY: An issue was detected in a typical cluster of 3 Kafka instances backed by 3 ZooKeeper instances (Kafka version 0.8.1.1, Scala version 2.10.3, Java version 1.7.0_65). On the consumer end, when consumers get recycled, a troubling JSON parsing recursion takes a busy lock and blocks the consumers' thread pool. In the 0.8.1.1 Scala client library, ZookeeperConsumerConnector.scala:355 takes a global lock (0xd3a7e1d0) during the rebalance and triggers expensive JSON parsing, which keeps the other consumers from shutting down; see, e.g., kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161). The deep recursive JSON parsing should be deprecated in favor of a better JSON parser; see, e.g., http://engineering.ooyala.com/blog/comparing-scala-json-libraries?

DETAILS: The first dump is for a recursive blocking thread holding the lock for 0xd3a7e1d0, and the subsequent dump is for a waiting thread. (Please grep for 0xd3a7e1d0 to see the locked object.)
-8- Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
   java.lang.Thread.State: RUNNABLE
	at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
	at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
	at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
	at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	...
[jira] [Updated] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-1595:
---
Attachment: KAFKA-1595.patch
[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504055#comment-14504055 ]

Ismael Juma commented on KAFKA-1595:
Created reviewboard https://reviews.apache.org/r/33383/diff/ against branch upstream/trunk
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504384#comment-14504384 ]

Steven Zhen Wu commented on KAFKA-2121:
---
Updated reviewboard https://reviews.apache.org/r/33242/diff/ against branch apache/trunk

prevent potential resource leak in KafkaProducer and KafkaConsumer
--
Key: KAFKA-2121
URL: https://issues.apache.org/jira/browse/KAFKA-2121
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch

On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible. I'm with Ewen regarding the solution.

On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change).
Additionally, the code creating the producer would have to be more complicated, since it would have to deal with the cleanup carefully, whereas previously it just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock, which lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out whether all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called.) Second, add the try/catch as you suggested, but just use close(). -Ewen

On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem we encountered when the 0.8.2 Java KafkaProducer failed in its constructor. Here is the code snippet of KafkaProducer that illustrates the problem.
---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // create metrics reporters via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
---
Let's say MyMetricsReporter creates a thread in its constructor. If hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we have a thread leak. This gets worse when we try to auto-recover (i.e. keep creating KafkaProducer again, failing again, and leaking more threads).
There are multiple options for fixing this.
1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem. What if some other line throws an exception?
2) Use try-catch. In the catch section, try to call the close methods of any non-null objects constructed so far.
3) Explicitly declare the dependency in the constructor, e.g. KafkaProducer(..., List<MetricsReporter> reporters). This way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. We don't need a dependency injection framework for this.
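Ewen's variant of option 2 might look roughly like the Java sketch below: close() is made null-safe and idempotent, and the constructor calls it from a catch block before rethrowing. LeakSafeClient is a hypothetical stand-in, not the actual KafkaProducer code. The reporter list plays the role of the metrics reporters created via reflection. The ioThread field stands in for the producer's I/O thread. The validateConfig parameter is a placeholder for checks such as ClientUtils.parseAndValidateAddresses.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified client; not the real KafkaProducer.
final class LeakSafeClient implements Closeable {
    private final List<Closeable> reporters = new ArrayList<>(); // like metrics reporters
    private Closeable ioThread;   // stays null if construction fails before it is created
    private boolean closed = false;

    LeakSafeClient(Supplier<List<Closeable>> reporterFactory,
                   Supplier<Closeable> ioThreadFactory,
                   Runnable validateConfig) {
        try {
            reporters.addAll(reporterFactory.get()); // may start threads
            validateConfig.run();                    // may throw, e.g. on bad bootstrap servers
            ioThread = ioThreadFactory.get();
        } catch (RuntimeException e) {
            close();  // release whatever was allocated before the failure
            throw e;
        }
    }

    // Null-safe and idempotent, so it also works on a partially built instance.
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (ioThread != null) {
            closeQuietly(ioThread);
        }
        for (Closeable reporter : reporters) {
            closeQuietly(reporter);
        }
    }

    private static void closeQuietly(Closeable c) {
        try {
            c.close();
        } catch (IOException | RuntimeException e) {
            // A real client would log this; swallowing it preserves the original exception.
        }
    }
}
```

Unlike option 3, this approach keeps the public constructor signature unchanged. Because close() tolerates fields that were never assigned, the same method handles both normal shutdown and constructor failure. The sketch uses a closed flag instead of clearing fields to null; either approach gives the idempotency Ewen mentions.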
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ --- (Updated April 21, 2015, 5:48 a.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description (updated) --- move MockMetricsReporter into clients/src/test/java/org/apache/kafka/test Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java PRE-CREATION Diff: https://reviews.apache.org/r/33242/diff/ Testing --- Thanks, Steven Wu
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steven Zhen Wu updated KAFKA-2121:
--
Attachment: KAFKA-2121_2015-04-20_22:48:31.patch
Re: Review Request 33242: Patch for KAFKA-2121
On April 21, 2015, 3:08 a.m., Guozhang Wang wrote: LGTM, besides one minor suggestion: could you move MockMetricsReporter to clients/src/test/java/org/apache/kafka/test? done. moved MockMetricsReporter to clients/src/test/java/org/apache/kafka/test - Steven --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80892 ---
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504171#comment-14504171 ]

Honghai Chen commented on KAFKA-1646:
-
I added the KIP here and sent out the email. I will wait 2 days to see if there are any issues.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
http://mail-archives.apache.org/mod_mbox/kafka-dev/201504.mbox/%3Cb1f1db2cb51645f6b459c252abae6641%40HKNPR30MB018.064d.mgd.msft.net%3E

Improve consumer read performance for Windows
-
Key: KAFKA-1646
URL: https://issues.apache.org/jira/browse/KAFKA-1646
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.1.1
Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
Labels: newbie, patch
Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch

This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files are not laid out contiguously on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other file systems, because it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
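The preallocation approach discussed in KAFKA-1646 and KIP-20 can be illustrated with plain java.io APIs in three steps:

1. Extend the file to its full segment size when it is created, so the file system can reserve the space up front.
2. Track the logical end of the data separately.
3. Truncate the unused tail when the segment is closed.

The PreallocatedSegment class below is a hypothetical, simplified sketch rather than the code in the attached patches. Note that on many Linux file systems RandomAccessFile.setLength only produces a sparse file, so the on-disk effect there may differ from NTFS.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical, simplified log segment that preallocates its maximum size.
final class PreallocatedSegment implements AutoCloseable {
    private final RandomAccessFile file;
    private final FileChannel channel;
    private long logicalSize = 0; // bytes of real data written so far

    PreallocatedSegment(File path, long maxSegmentBytes) {
        try {
            this.file = new RandomAccessFile(path, "rw");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            // Reserve the whole segment in one call instead of growing it per append.
            file.setLength(maxSegmentBytes);
        } catch (IOException e) {
            try { file.close(); } catch (IOException ignored) { }
            throw new UncheckedIOException(e);
        }
        this.channel = file.getChannel();
    }

    /** Writes at the logical end of the data, not at the preallocated end of the file. */
    void append(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        try {
            while (buffer.hasRemaining()) {
                logicalSize += channel.write(buffer, logicalSize);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    long sizeInBytes() {
        return logicalSize;
    }

    /** Trims the unused preallocated tail so readers never see trailing zeros. */
    @Override
    public void close() {
        try {
            channel.truncate(logicalSize);
            file.close(); // also closes the channel
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A broker that crashes before close() would leave zero padding at the end of the segment. That is presumably why one of the attachments deals with truncating trailing zeros on broker restart. This sketch does not handle that recovery path.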
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80892 --- LGTM, besides one minor suggestion: could you move MockMetricsReporter to clients/src/test/java/org/apache/kafka/test? - Guozhang Wang On April 20, 2015, 4:57 p.m., Steven Wu wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ --- (Updated April 20, 2015, 4:57 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- fix potential resource leak when KafkaProducer constructor failed in the middle Diffs - clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION Diff: https://reviews.apache.org/r/33242/diff/ Testing --- Thanks, Steven Wu
Re: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
Joe, Is there a way to make the template accessible to everyone? I think more and more folks will be asking for it. Guozhang

On Mon, Apr 20, 2015 at 6:25 PM, Honghai Chen honghai.c...@microsoft.com wrote: It works, many thanks. Thanks, Honghai

From: Joe Stein [mailto:joe.st...@stealth.ly] Sent: Tuesday, April 21, 2015 9:24 AM To: Honghai Chen Cc: dev@kafka.apache.org Subject: Re: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
give it a try now ~ Joe Stein http://www.stealth.ly

On Mon, Apr 20, 2015 at 9:22 PM, Honghai Chen honghai.c...@microsoft.com wrote: Username: waldenchen Email: waldenc...@163.com Thanks, Honghai

-Original Message- From: Joe Stein [mailto:joe.st...@stealth.ly] Sent: Tuesday, April 21, 2015 9:19 AM To: dev@kafka.apache.org Subject: Re: Can't see KIP Template after click Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
What is your Confluence username? ~ Joe Stein http://www.stealth.ly

On Mon, Apr 20, 2015 at 9:15 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi dear dev, I need to create a KIP titled “Add one configuration log.preallocate” for https://issues.apache.org/jira/browse/KAFKA-1646 but I can't see the KIP Template after clicking Create on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
The picture below is what I see. Can you see it? Is there anywhere I can get the permission or setting? Thanks, Honghai

-- Guozhang
[DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
I wrote a KIP for this after some discussion on KAFKA-1646. https://issues.apache.org/jira/browse/KAFKA-1646 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system The RB is here: https://reviews.apache.org/r/33204/diff/ Thanks, Honghai
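To make "preallocate" concrete at the file level, here is a minimal sketch of the general idea, not the code in the review board patch: a segment file is extended to its full target size with `RandomAccessFile.setLength` before any data is written, appended to with positional writes, and trimmed back to the bytes actually written on close. The class `PreallocatedSegment` and its methods are hypothetical; whether the reserved space really ends up contiguous on disk depends on the OS and file system.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Hypothetical sketch: a log segment file preallocated up front and trimmed on close. */
public class PreallocatedSegment implements AutoCloseable {
    private final RandomAccessFile raf;
    private final FileChannel channel;
    private long position = 0; // bytes actually written so far

    public PreallocatedSegment(File file, long preallocateBytes) throws IOException {
        this.raf = new RandomAccessFile(file, "rw");
        // Reserve the full segment size in one call instead of growing the file append by append.
        raf.setLength(preallocateBytes);
        this.channel = raf.getChannel();
    }

    public void append(byte[] data) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(data);
        while (buf.hasRemaining()) {
            // Positional write into the reserved region (the file only grows if data exceeds it).
            int n = channel.write(buf, position);
            position += n;
        }
    }

    public long writtenBytes() {
        return position;
    }

    @Override
    public void close() throws IOException {
        // Drop the unused preallocated tail so the file length matches the data written.
        channel.truncate(position);
        raf.close(); // closing the file also closes its channel
    }
}
```

In the proposal this behavior is toggled by the `log.preallocate` configuration named earlier in the thread; the trim on close matters because otherwise every closed segment would keep its full preallocated length.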
Re: [DISCUSS] KIP-11- Authorization design for kafka security
I tend to agree with Parth's point here. Most ACL systems I run into have deny and allow. In general, you have a default policy of allow, then you follow your rules, stopping at the first line that matches. If you would like a default deny policy, you have a bunch of allow rules and your last rule is deny all. This says everyone listed is allowed. Everyone else is denied. If you instead want a default allow, you have a list of deny rules and the last rule is allow all. This says everyone listed is denied. Everyone else is allowed. I think leaving out a full rule set would be a mistake, as it makes the assumption that you know what all the use cases are. I think all it will really mean is that we will see a KIP before long to fix it. -Todd On Apr 20, 2015, at 7:13 PM, Gwen Shapira gshap...@cloudera.com wrote: Thanks for clarifying the logic. I'm +0 on the deny thing. IMO, it's not really needed, but if you think it's important, I don't object to having it in. Gwen On Mon, Apr 20, 2015 at 7:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: iptables on Unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again, we could add a host group semantic and the extra complexity around that, but I'm not sure it's worth it. In addition, with the DENY operator you are no longer forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail close and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e.
"allow READ access to topic1 to user1 from all hosts", only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acl attached - users are allowed to perform an operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2", only user1 has access, but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny, we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
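Todd's model, where rules are evaluated in order and the first matching rule decides, with a default policy for users no rule matches, can be sketched as follows. This is a hypothetical illustration: the `Rule` class and `Effect` enum are not part of any KIP-11 API, and rules here match on principal only, with `"*"` as an assumed wildcard.

```java
import java.util.List;

/** Hypothetical sketch of ordered, first-match ACL evaluation (iptables style). */
public class FirstMatchAcl {
    enum Effect { ALLOW, DENY }

    /** A rule for one principal, or for every principal when principal is "*". */
    static final class Rule {
        final String principal;
        final Effect effect;

        Rule(String principal, Effect effect) {
            this.principal = principal;
            this.effect = effect;
        }

        boolean matches(String user) {
            return principal.equals("*") || principal.equals(user);
        }
    }

    /** Walk the rules top to bottom; the first rule that matches decides. */
    static boolean isAllowed(List<Rule> rules, String user, Effect defaultPolicy) {
        for (Rule rule : rules) {
            if (rule.matches(user)) {
                return rule.effect == Effect.ALLOW;
            }
        }
        return defaultPolicy == Effect.ALLOW; // nothing matched
    }

    public static void main(String[] args) {
        // Default deny: allow the listed users, then "deny all".
        List<Rule> allowList = List.of(new Rule("user1", Effect.ALLOW), new Rule("*", Effect.DENY));
        // Default allow: deny the listed users, then "allow all".
        List<Rule> denyList = List.of(new Rule("user2", Effect.DENY), new Rule("*", Effect.ALLOW));
        System.out.println(isAllowed(allowList, "user1", Effect.ALLOW)); // true
        System.out.println(isAllowed(allowList, "user3", Effect.ALLOW)); // false
        System.out.println(isAllowed(denyList, "user2", Effect.ALLOW));  // false
        System.out.println(isAllowed(denyList, "user3", Effect.ALLOW));  // true
    }
}
```

In this model rule order is significant; in Parth's per-resource allow/deny description any matching deny wins regardless of where it appears.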
Why can't see KIP Template after click Create?
Hello everyone, I want to create a KIP for https://issues.apache.org/jira/browse/KAFKA-1646, but I can't see the KIP template after clicking Create on the page https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. What should I do? Thanks.
Re: [DISCUSS] KIP-11- Authorization design for kafka security
iptables on Unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again, we could add a host group semantic and the extra complexity around that, but I'm not sure it's worth it. In addition, with the DENY operator you are no longer forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail close and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e. "allow READ access to topic1 to user1 from all hosts", only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acl attached - users are allowed to perform an operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2", only user1 has access, but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny, we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
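Parth's three cases reduce to one rule: a resource with no ACLs is open to everyone; otherwise a request needs a matching ALLOW and no matching DENY. A minimal sketch under those rules follows. The `Acl` class, the `"*"` host wildcard, and the method names are hypothetical stand-ins, not the `Acl` class proposed in KIP-11.

```java
import java.util.List;

/** Hypothetical sketch of the allow/deny semantics described in the thread (deny overrides allow). */
public class DenyOverridesAcl {
    enum PermissionType { ALLOW, DENY }

    /** One ACL entry attached to a resource; host "*" matches any host. */
    static final class Acl {
        final String principal, host, operation;
        final PermissionType type;

        Acl(String principal, String host, String operation, PermissionType type) {
            this.principal = principal;
            this.host = host;
            this.operation = operation;
            this.type = type;
        }

        boolean matches(String user, String fromHost, String op) {
            return principal.equals(user)
                    && (host.equals("*") || host.equals(fromHost))
                    && operation.equals(op);
        }
    }

    static boolean authorize(List<Acl> aclsOnResource, String user, String host, String op) {
        // Case 1: no ACLs at all -> allow everyone (backward compatibility).
        if (aclsOnResource.isEmpty()) return true;
        boolean allowed = false;
        for (Acl acl : aclsOnResource) {
            if (!acl.matches(user, host, op)) continue;
            // Case 3: a matching DENY wins over any matching ALLOW.
            if (acl.type == PermissionType.DENY) return false;
            allowed = true;
        }
        // Case 2: otherwise only users with a matching ALLOW get access.
        return allowed;
    }
}
```

With an ALLOW for user1 from `*` plus DENY entries for user1 on host1 and host2, user1 can READ from host3 but not from host1, and user2 cannot READ at all. The same rules reproduce Parth's reply to Jun further down the thread: with an ALLOW for user1 and a DENY for user2, user3 is refused, and dropping the DENY still leaves user2 without access.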
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
+1 (non-binding) Sure, makes sense :) Just make sure the doc for the config includes something like "If you are using Kafka on Windows, you probably want to set it to true", so people will know how to use it without looking for the JIRA. On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen honghai.c...@microsoft.com wrote: I wrote a KIP for this after some discussion on KAFKA-1646. https://issues.apache.org/jira/browse/KAFKA-1646 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system The RB is here: https://reviews.apache.org/r/33204/diff/ Thanks, Honghai
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Thanks for clarifying the logic. I'm +0 on the deny thing. IMO, it's not really needed, but if you think it's important, I don't object to having it in. Gwen On Mon, Apr 20, 2015 at 7:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: iptables on Unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1,host2". Again, we could add a host group semantic and the extra complexity around that, but I'm not sure it's worth it. In addition, with the DENY operator you are no longer forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail close and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e. "allow READ access to topic1 to user1 from all hosts", only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acl attached - users are allowed to perform an operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2", only user1 has access, but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny, we can assume all acls to be of allow type and remove the permissionType field completely.
Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Jiangjie, Allowing messages to be accumulated in an offline partition could be useful since the partition may become available before the request timeout or linger time is reached. Now that we are planning to add a new timeout, it would be useful to think through whether/how that applies to messages in the accumulator too. Thanks, Jun On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Harsha, Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sits in the accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply the timeout at the batch level to merge those two cases, as Ewen suggested. But I’m not sure if it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place. Jiangjie (Becket) Qin On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Guozhang and Jiangjie, Isn’t this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788? Can you please review the patch there. Thanks, Harsha On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote: Thanks for the update Jiangjie, I think it is actually NOT expected that hardware disconnection will be detected by the selector, but rather will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki: 1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout." I am not very clear what this means. 2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names but will reduce confusion moving forward.
Guozhang On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we depend on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However, the Selector.disconnected set is only updated when: 1. A write/read/connect to the channel failed. 2. A key is canceled. However, when a broker goes down before it sends back the response, the client does not seem to be able to detect this failure. I did a simple test: 1. Run a selector on one machine and an echo server on another machine. Connect the selector to the echo server. 2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds. 3. After the server received the message, unplug the cable on the echo server. 4. After waiting for 45 min, the selector still had not detected the network failure. Lsof on the selector machine shows that the TCP connection is still considered ESTABLISHED. I’m not sure what we should expect from java.nio.channels.Selector in this case. According to the documentation, the selector does not verify the status of the associated channel. In my test case it looks even worse: the OS did not consider the socket disconnected. Anyway, it seems adding the client-side request timeout is necessary. I’ve updated the KIP page to clarify the problem we want to solve according to Ewen’s comments. Thanks. Jiangjie (Becket) Qin On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote: On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Ewen, thanks for the comments. Very good points! Please see replies inline. On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Jiangjie, Great start. I have a couple of comments. Under the motivation section, is it really true that the request will never be completed?
Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context. Yes, when the broker is completely down the request should be cleared as you said. The case we encountered looks like the broker was just not responding, but the TCP connection was still alive. Ok, that makes sense. My second question is about whether this is the right level to tackle the issue/what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix
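The client-side request timeout discussed in this thread can be sketched as bookkeeping over in-flight requests: record when each request was sent and, on every poll, expire the ones that have waited longer than the timeout, even if the OS still reports the connection as ESTABLISHED (as in Jiangjie's unplugged-cable test). This is a hypothetical sketch: `InFlightTracker`, `requestTimeoutMs`, and the method names are illustrative, not the `NetworkClient` API, and the current time is passed in so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: expire in-flight requests that have waited longer than a request timeout. */
public class InFlightTracker {
    private final long requestTimeoutMs;
    // correlation id -> send time; insertion order is send order
    private final Map<Integer, Long> sendTimes = new LinkedHashMap<>();

    public InFlightTracker(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /** Assumes callers pass non-decreasing timestamps and unique correlation ids. */
    public void onSend(int correlationId, long nowMs) {
        sendTimes.put(correlationId, nowMs);
    }

    public void onResponse(int correlationId) {
        sendTimes.remove(correlationId);
    }

    /** Removes and returns the ids of requests that have waited longer than the timeout. */
    public List<Integer> expire(long nowMs) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = sendTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> entry = it.next();
            if (nowMs - entry.getValue() > requestTimeoutMs) {
                expired.add(entry.getKey());
                it.remove();
            } else {
                break; // later entries were sent later, so none of them can have expired yet
            }
        }
        return expired;
    }
}
```

What the client does with an expired request (for example failing it or closing the connection) is a separate design decision and is not shown here.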
Re: [DISCUSS] KIP-11- Authorization design for kafka security
user3 does not have access, and removing the deny rule does not grant him or user2 access. user2 will not have access even without the deny rule. Thanks Parth On 4/20/15, 12:03 PM, Jun Rao j...@confluent.io wrote: Just a followup question. Suppose there are two rules. Rule1 allows user1 and rule2 denies user2. Does user3 have access? If not, does removing rule1 enable user3 access? Thanks, Jun On Mon, Apr 20, 2015 at 1:34 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Hi Joel, Thanks for the review; I plan to update the KIP today with all the updated info. My comments are inline below. Thanks Parth On 4/20/15, 10:07 AM, Joel Koshy jjkosh...@gmail.com wrote: Hi Parth, Nice work on this KIP. I did another read through and had a few more comments (with edits after I went through the thread). Many of these comments were brought up by others as well, so it appears that the KIP would benefit from an update at this point to incorporate comments from the thread and last hangout. - The operation enum is mostly self-explanatory, but it would help (for the sake of clarity and completeness if nothing else) to document exactly what each of the enums are. E.g., I think this came up in our hangout - SEND_CONTROL_MESSAGE is unclear and I don't remember what was said about it. Edit: After going through the thread it seems the conclusion was to categorize operations. E.g., WRITE could apply to multiple requests. Again, this is unclear, so it would be great if you could update the KIP to clarify what you intend. Will add to document. SEND_CONTROL_MESSAGE: probably a very bad name, but these are intra-broker API calls like the controller notifying other brokers to update metadata, or heartbeats. Any better naming suggestions? - When you update the KIP to categorize the requests it would also help to have a column for what the resource is for each. Will add to the KIP. - FWIW I prefer a 1-1 mapping between requests and operations.
I think categorizing requests into these can be confusing because: - The resource being protected for different requests will be different. We are mostly thinking about topics (read/write) but there are requests for which topic is not the right resource. E.g., for topic creation, the resource as you suggested would be something global/common such as “cluster”. For OffsetCommit/FetchRequest, the resource may be the consumer group, or maybe a tuple of consumer group, topic. So this can be confusing - i.e., different resources and request types in the same category. It may be simpler and clearer to just have a 1-1 mapping between the operation enum and requests. I only see 2 resource categories right now: cluster and topic. I don’t really care one way or another, so we can probably make a quick decision in tomorrow’s meeting to either do a 1-1 mapping or have categorization? - Some requests that are intuitively READ have WRITE side-effects. E.g., (currently) TopicMetadataRequest with auto-create, although that will eventually go away. ConsumerMetadataRequest still auto-creates the offsets topic. Likewise, ADMIN-type requests may be interpreted as having side-effects (depending on who you ask). Yes, and what I am doing right now is checking authorization for all possible actions, i.e. for auto-create it checks if the config has it enabled and, if yes, checks for read + create authorization. It's not very meaningful right now as there is no CREATE authorization, but I think this is an implementation detail; we need to ensure we call authorize with all possible operations from KafkaAPI. - “When an ACL is missing - fail open”. What does missing mean? i.e., no explicit ACL for a principal? I'm confused by this especially in relation to the precedence of DENY over ALLOW. So per the description: - If no ACLs exist for topic A then ALLOW all operations on it by anyone.
- If I now add an ACL for a certain principal P to ALLOW (say) WRITE to the topic then either: - This has the effect of DENYing WRITE to all other principals - Or, this ACL serves no purpose - If the effect is to DENY WRITE to all other principals, what about READ? Do all principals (including P) have READ permissions to topic A? - In other words, it seems that for a specific ACL to be meaningful, fail close is necessary for an absent ACL. - Edit: After going through the thread, it appears that the DENY override only applies to the given principal, i.e., in the above case it appears that the other principals will in fact be granted access. Then this makes the ACL that was added pointless, right? The rule I was going with is - If there is no ACL (i.e. this might be a topic that was created in non-secure mode or was created before we supported ACLs), we assume you do not want authorization and let all
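Parth describes authorizing every operation a request might trigger, for example READ plus CREATE for a metadata request when auto-create is enabled. A hypothetical sketch of that rule follows. The `Operation` enum, the `Authorizer` interface, and the `impliedOperations` helper are illustrative only (the thread notes there is no CREATE authorization yet); this is not KafkaApis code.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch: a request is allowed only if every operation it may perform is allowed. */
public class ImpliedOperations {
    enum Operation { READ, WRITE, CREATE, DELETE }

    /** Minimal authorizer callback: may `principal` perform `op` on `topic`? */
    interface Authorizer {
        boolean authorize(String principal, Operation op, String topic);
    }

    /** Operations a topic metadata request may perform, depending on broker config. */
    static Set<Operation> impliedOperations(boolean autoCreateEnabled) {
        Set<Operation> ops = EnumSet.of(Operation.READ);
        if (autoCreateEnabled) {
            ops.add(Operation.CREATE); // the request may create the topic as a side effect
        }
        return ops;
    }

    /** Check each implied operation; any single refusal rejects the whole request. */
    static boolean authorizeAll(Authorizer authorizer, String principal, String topic, Set<Operation> ops) {
        for (Operation op : ops) {
            if (!authorizer.authorize(principal, op, topic)) {
                return false;
            }
        }
        return true;
    }
}
```

A principal that may only READ passes when auto-create is off but is refused when auto-create is on, which is exactly the READ-with-WRITE-side-effect case Joel raises.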
Re: [jira] [Commented] (KAFKA-2122) Remove controller.message.queue.size Config
unsubscribe On Apr 20, 2015, at 8:09 AM, Sriharsha Chintalapani (JIRA) j...@apache.org wrote: [ https://issues.apache.org/jira/browse/KAFKA-2122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14502761#comment-14502761 ] Sriharsha Chintalapani commented on KAFKA-2122: --- [~wangbo23] This is for trunk. This patch just removes the config, so if you want similar behavior on 0.8.2.1, just remove the config from server.properties. Remove controller.message.queue.size Config --- Key: KAFKA-2122 URL: https://issues.apache.org/jira/browse/KAFKA-2122 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Sriharsha Chintalapani Attachments: KAFKA-2122.patch, KAFKA-2122_2015-04-19_12:44:41.patch A deadlock can happen during a delete topic if controller.message.queue.size is overridden to a custom value. Details are here: https://issues.apache.org/jira/browse/KAFKA-2046?focusedCommentId=14380776&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14380776 Given that KAFKA-1993 is enabling delete topic by default, it would be unsafe to simultaneously allow a configurable controller.message.queue.size -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2122) Remove controller.message.queue.size Config
[ https://issues.apache.org/jira/browse/KAFKA-2122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14502761#comment-14502761 ] Sriharsha Chintalapani commented on KAFKA-2122: --- [~wangbo23] This is for trunk. This patch just removes the config, so if you want similar behavior on 0.8.2.1, just remove the config from server.properties. Remove controller.message.queue.size Config --- Key: KAFKA-2122 URL: https://issues.apache.org/jira/browse/KAFKA-2122 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Sriharsha Chintalapani Attachments: KAFKA-2122.patch, KAFKA-2122_2015-04-19_12:44:41.patch A deadlock can happen during a delete topic if controller.message.queue.size is overridden to a custom value. Details are here: https://issues.apache.org/jira/browse/KAFKA-2046?focusedCommentId=14380776&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14380776 Given that KAFKA-1993 is enabling delete topic by default, it would be unsafe to simultaneously allow a configurable controller.message.queue.size
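The issue text only says that a deadlock can happen when `controller.message.queue.size` is overridden; the actual chain of events is in the linked KAFKA-2046 comment and is not reproduced here. As a generic illustration of why bounding such a queue is risky, assuming the common pattern where a thread that holds a lock blocks on a full queue whose only consumer needs that same lock, the sketch below (all names made up, not Kafka code) shows the two threads ending up waiting on each other. It uses `offer` with a timeout instead of a blocking `put` so the stall can be observed rather than hanging forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Generic illustration (not Kafka code): a held lock plus a bounded queue stalling each other. */
public class BoundedQueueStall {

    /**
     * The calling thread holds `lock` while enqueueing `requests` items into a queue of size `capacity`.
     * The only consumer must take the same lock to process each item.
     * Returns true if an enqueue timed out, i.e. the two threads were waiting on each other.
     */
    public static boolean stalls(int capacity, int requests) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                    lock.lockInterruptibly(); // e.g. a response callback that needs the shared lock
                    try {
                        // process the item
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                // shutting down
            }
        });
        consumer.setDaemon(true);

        lock.lock();
        try {
            consumer.start();
            for (int i = 0; i < requests; i++) {
                // A blocking put() would hang forever here; offer() with a timeout makes the stall visible.
                if (!queue.offer(i, 200, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
            consumer.interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stalls(2, 5));   // true: at most capacity + 1 items can ever be accepted
        System.out.println(stalls(100, 5)); // false: every item fits without the consumer's help
    }
}
```

While the producer holds the lock, the consumer can remove at most one item, so any burst larger than the queue capacity plus one wedges both threads; an effectively unbounded queue avoids that particular trap.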
[jira] [Updated] (KAFKA-2134) Producer blocked on metric publish
[ https://issues.apache.org/jira/browse/KAFKA-2134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vamsi Subhash Achanta updated KAFKA-2134: - Description: Hi, We have a REST api to publish to a topic. Yesterday, we started noticing that the producer is not able to produce messages at a good rate and the CLOSE_WAITs of our producer REST app are very high. All the producer REST requests are hence timing out. When we took the thread dump and analysed it, we noticed that the threads are getting blocked on JmxReporter metricChange. Here is the attached stack trace:
dw-70 - POST /queues/queue_1/messages #70 prio=5 os_prio=0 tid=0x7f043c8bd000 nid=0x54cf waiting for monitor entry [0x7f04363c7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76)
    - waiting to lock 0x0005c1823860 (a java.lang.Object)
    at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:182)
    - locked 0x0007a5e526c8 (a org.apache.kafka.common.metrics.Metrics)
    at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:165)
    - locked 0x0007a5e526e8 (a org.apache.kafka.common.metrics.Sensor)
When I looked at the code of the metricChange method, I saw that it uses a synchronized block on an object resource, and it seems that the lock is held by another thread. was: Hi, We have a REST api to publish to a topic. Yesterday, we started noticing that the producer is not able to produce messages at a good rate and the CLOSE_WAITs of our producer REST app are very high. All the producer REST requests are hence timing out. When we took the thread dump and analysed it, we noticed that the threads are getting blocked on JmxReporter metricChange. Here is the attached stack trace.
dw-70 - POST /queues/ekl_bigfoot_marvin_production_1/messages #70 prio=5 os_prio=0 tid=0x7f043c8bd000 nid=0x54cf waiting for monitor entry [0x7f04363c7000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76) - waiting to lock 0x0005c1823860 (a java.lang.Object) at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:182) - locked 0x0007a5e526c8 (a org.apache.kafka.common.metrics.Metrics) at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:165) - locked 0x0007a5e526e8 (a org.apache.kafka.common.metrics.Sensor) When I looked at the code of metricChange method, it uses a synchronised block on an object resource and it seems that it is held by another. Producer blocked on metric publish -- Key: KAFKA-2134 URL: https://issues.apache.org/jira/browse/KAFKA-2134 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.1 Environment: debian7, java8 Reporter: Vamsi Subhash Achanta Assignee: Jun Rao
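To find out from inside the JVM which thread owns a contended monitor like the one in the dump above, the standard `java.lang.management` API reports, for each BLOCKED thread, the lock it is waiting for and the name of the lock's owner. A small diagnostic sketch follows; the class and method names are made up for this example, and it does not touch Kafka's metrics code. `demo()` recreates the same shape as the report: one thread holds a monitor while another blocks on it.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Diagnostic sketch: list threads BLOCKED on a monitor, with the lock and its current owner. */
public class BlockedThreadReport {

    /** One line per blocked thread: who waits, on which monitor, held by whom. */
    public static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        // false, false: we only need the lock each thread waits for, not every lock it holds.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                report.add(info.getThreadName() + " waiting for " + info.getLockName()
                        + " held by " + info.getLockOwnerName());
            }
        }
        return report;
    }

    /** Makes one thread hold a monitor while another blocks on it, then reports. */
    public static List<String> demo() throws InterruptedException {
        Object lock = new Object();
        CountDownLatch held = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            synchronized (lock) {
                held.countDown();
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException ignored) {
                    // released early by demo()
                }
            }
        }, "holder");
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                // just acquire and release
            }
        }, "waiter");
        holder.start();
        held.await(); // holder now owns the monitor
        waiter.start();
        while (waiter.getState() != Thread.State.BLOCKED) {
            Thread.sleep(10);
        }
        List<String> report = blockedThreads();
        holder.interrupt(); // release the monitor so both threads can finish
        holder.join();
        waiter.join();
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        demo().forEach(System.out::println);
    }
}
```

Calling `blockedThreads()` periodically in the affected REST app would name the thread that holds `JmxReporter`'s lock, which the single frame in the report does not show.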
[jira] [Commented] (KAFKA-2122) Remove controller.message.queue.size Config
[ https://issues.apache.org/jira/browse/KAFKA-2122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14502583#comment-14502583 ] 王博 commented on KAFKA-2122: --- Which version is this patch based on? I found that KAFKA-2122_2015-04-19_12:44:41.patch is not based on 0.8.2.1. Remove controller.message.queue.size Config --- Key: KAFKA-2122 URL: https://issues.apache.org/jira/browse/KAFKA-2122 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Sriharsha Chintalapani Attachments: KAFKA-2122.patch, KAFKA-2122_2015-04-19_12:44:41.patch A deadlock can happen during a delete topic if controller.message.queue.size is overridden to a custom value. Details are here: https://issues.apache.org/jira/browse/KAFKA-2046?focusedCommentId=14380776&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14380776 Given that KAFKA-1993 is enabling delete topic by default, it would be unsafe to simultaneously allow a configurable controller.message.queue.size
[jira] [Comment Edited] (KAFKA-2122) Remove controller.message.queue.size Config
[ https://issues.apache.org/jira/browse/KAFKA-2122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14502583#comment-14502583 ] Bo Wang edited comment on KAFKA-2122 at 4/20/15 10:27 AM: -- Which version is this patch based on? was (Author: wangbo23): Which version is this patch based on? I found that KAFKA-2122_2015-04-19_12:44:41.patch is not based on 0.8.2.1. Remove controller.message.queue.size Config --- Key: KAFKA-2122 URL: https://issues.apache.org/jira/browse/KAFKA-2122 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Sriharsha Chintalapani Attachments: KAFKA-2122.patch, KAFKA-2122_2015-04-19_12:44:41.patch A deadlock can happen during a delete topic if controller.message.queue.size is overridden to a custom value. Details are here: https://issues.apache.org/jira/browse/KAFKA-2046?focusedCommentId=14380776&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14380776 Given that KAFKA-1993 is enabling delete topic by default, it would be unsafe to simultaneously allow a configurable controller.message.queue.size
Jenkins build is back to normal : Kafka-trunk #470
See https://builds.apache.org/job/Kafka-trunk/470/changes
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Hi, I updated KIP-12 with more details. Please take a look: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888 Thanks, Harsha On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote: Thanks Joe. It will be part of KafkaServer and will run on its own thread. Since each Kafka server will run with a keytab, we should make sure they are all getting renewed. On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote: Thanks Harsha, looks good so far. How were you thinking of running the KerberosTicketManager: as a standalone process, like the controller, or as a layer of code that does the plumbing pieces everywhere? ~ Joestein On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote: Hi, Here is the initial proposal for the SASL/Kerberos implementation for Kafka https://cwiki.apache.org/confluence/x/YI4WAw and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on a prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with an initial prototype patch. Thanks, Harsha
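Harsha's plan is for the ticket manager to run on its own thread inside KafkaServer and keep each broker's keytab login renewed. A hypothetical sketch of such a thread follows. The class name, the `Relogin` hook, the choice to refresh after 80% of the ticket lifetime, and the one-minute retry after a failed re-login are all assumptions for illustration, not details from KIP-12.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a broker-side thread that keeps a keytab login renewed. */
public class TicketRenewer implements AutoCloseable {

    /** Hypothetical hook: re-login from the keytab and return the new ticket's expiry (epoch ms). */
    public interface Relogin {
        long reloginAndGetExpiryMs() throws Exception;
    }

    /** Refresh once 80% of the ticket lifetime has passed (an assumed policy). */
    static long refreshTimeMs(long startMs, long endMs) {
        return startMs + (endMs - startMs) * 4 / 5;
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "kerberos-ticket-renewer");
        t.setDaemon(true); // never keep the JVM alive just for renewals
        return t;
    });

    /** Start renewing a ticket that is valid from startMs to endMs. */
    public void start(Relogin relogin, long startMs, long endMs) {
        long delay = Math.max(0, refreshTimeMs(startMs, endMs) - System.currentTimeMillis());
        scheduler.schedule(() -> renew(relogin), delay, TimeUnit.MILLISECONDS);
    }

    private void renew(Relogin relogin) {
        long now = System.currentTimeMillis();
        long newEnd;
        try {
            newEnd = relogin.reloginAndGetExpiryMs();
        } catch (Exception e) {
            // Assumed retry policy: treat the ticket as valid for one more minute so we retry before then.
            newEnd = now + 60_000;
        }
        if (!scheduler.isShutdown()) {
            start(relogin, now, newEnd); // schedule the next refresh from the new lifetime
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Running renewals on a single dedicated scheduler thread keeps them off the request-handling path, which matches the "own thread" idea in the thread.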
[jira] [Commented] (KAFKA-2035) Add a topic config cache.
[ https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14503297#comment-14503297 ] Parth Brahmbhatt commented on KAFKA-2035: - [~jkreps] yes, that is the difference I was referring to. Anyway, I think we don't need to discuss that any more, as ACLs are now being pulled out of the topic config (see the KIP-11 discussion thread). I think the topic owner is useful even outside of the security context. I think it deserves its own config value, and I can either add it as a log config (so no change is required; anyone who needs access to the owner will either need a reference to the log instance to get the benefit of caching, or will have to read from ZooKeeper) or we can continue on the path of adding a TopicConfigCache and make owner a top-level entity of that class. Let me know what your preference is. I did not see a JIRA to add owner to topics, and I can create one if you think we should discuss that separately. Add a topic config cache. - Key: KAFKA-2035 URL: https://issues.apache.org/jira/browse/KAFKA-2035 Project: Kafka Issue Type: Task Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2035_2015-03-31_10:52:12.patch Currently the topic config is all about Log configuration, so we have a TopicConfigManager which takes in a Log instance and keeps updating that instance's config instance as and when the topic config is updated. The topic config update notifications are sent using zk watchers by the Controller. I propose to introduce a TopicConfigCache which will be updated by TopicConfigManager on any config changes. The log instance and any other component (like the authorizer mentioned in KAFKA-1688) will have a reference to TopicConfigCache, through which they will access the topic configs.
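The proposed `TopicConfigCache` is written by `TopicConfigManager` on each ZooKeeper change notification and read by the log and the authorizer. A minimal sketch, assuming each topic's config is a plain `Properties` object and that the notification handler passes the complete new config; the method names and the `owner` key in the usage check are hypothetical.

```java
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a per-topic config cache shared by the log and the authorizer. */
public class TopicConfigCache {
    private final ConcurrentHashMap<String, Properties> configs = new ConcurrentHashMap<>();

    /** Called by the config manager when a change notification arrives for `topic`. */
    public void update(String topic, Properties newConfig) {
        Properties copy = new Properties();
        copy.putAll(newConfig); // defensive copy: later edits by the caller are not visible to readers
        configs.put(topic, copy); // readers see either the old or the new config, never a mix
    }

    public void remove(String topic) {
        configs.remove(topic);
    }

    /** Returns a copy of the topic's config, or null if nothing is cached for it. */
    public Properties get(String topic) {
        Properties cached = configs.get(topic);
        if (cached == null) {
            return null;
        }
        Properties copy = new Properties();
        copy.putAll(cached);
        return copy;
    }

    /** Convenience lookup of a single key, e.g. a topic owner entry. */
    public String get(String topic, String key) {
        Properties cached = configs.get(topic);
        return cached == null ? null : cached.getProperty(key);
    }
}
```

Replacing the whole `Properties` object on each update, rather than mutating it in place, means a reader never observes a half-applied change.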
Build failed in Jenkins: KafkaPreCommit #82
See https://builds.apache.org/job/KafkaPreCommit/82/changes
Changes:
[neha.narkhede] DelayedOperationTest.testRequestExpiry transient failure; reviewed by Neha Narkhede
--
[...truncated 659 lines...]
kafka.api.ProducerFailureHandlingTest testInvalidPartition PASSED
kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckZero PASSED
kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne PASSED
kafka.api.ProducerFailureHandlingTest testNonExistentTopic PASSED
kafka.api.ProducerFailureHandlingTest testWrongBrokerList PASSED
kafka.api.ProducerFailureHandlingTest testNoResponse PASSED
kafka.api.ProducerFailureHandlingTest testSendAfterClosed PASSED
kafka.api.ProducerFailureHandlingTest testCannotSendToInternalTopic PASSED
kafka.api.ProducerFailureHandlingTest testNotEnoughReplicasAfterBrokerShutdown PASSED
kafka.api.ConsumerTest testSimpleConsumption PASSED
kafka.api.ConsumerTest testAutoOffsetReset PASSED
kafka.api.ConsumerTest testSeek PASSED
kafka.api.ConsumerTest testGroupConsumption PASSED
kafka.api.ConsumerTest testPositionAndCommit PASSED
kafka.api.ConsumerTest testPartitionsFor PASSED
kafka.api.ConsumerTest testPartitionReassignmentCallback PASSED
kafka.api.ConsumerBounceTest testConsumptionWithBrokerFailures PASSED
kafka.api.ConsumerBounceTest testSeekAndCommitWithBrokerFailures PASSED
kafka.api.RequestResponseSerializationTest testSerializationAndDeserialization PASSED
kafka.api.ProducerBounceTest testBrokerFailure PASSED
kafka.api.ApiUtilsTest testShortStringNonASCII PASSED
kafka.api.ApiUtilsTest testShortStringASCII PASSED
kafka.api.test.ProducerCompressionTest testCompression[0] PASSED
kafka.api.test.ProducerCompressionTest testCompression[1] PASSED
kafka.api.test.ProducerCompressionTest testCompression[2] PASSED
kafka.api.test.ProducerCompressionTest testCompression[3] PASSED
kafka.cluster.BrokerEndPointTest testSerDe PASSED
kafka.cluster.BrokerEndPointTest testHashAndEquals PASSED
kafka.cluster.BrokerEndPointTest testFromJSON PASSED
kafka.cluster.BrokerEndPointTest testFromOldJSON PASSED kafka.cluster.BrokerEndPointTest testBrokerEndpointFromURI PASSED kafka.cluster.BrokerEndPointTest testEndpointFromURI PASSED kafka.javaapi.consumer.ZookeeperConsumerConnectorTest testBasic PASSED kafka.javaapi.message.ByteBufferMessageSetTest testWrittenEqualsRead PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistent PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytes PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistentWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytesWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEquals PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEqualsWithCompression PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabled PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionDisabled PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabledByTopicOverride PASSED kafka.integration.UncleanLeaderElectionTest testCleanLeaderElectionDisabledByTopicOverride PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionInvalidTopicOverride PASSED kafka.integration.FetcherTest testFetcher PASSED kafka.integration.RollingBounceTest testRollingBounce PASSED kafka.integration.PrimitiveApiTest testFetchRequestCanProperlySerialize PASSED kafka.integration.PrimitiveApiTest testEmptyFetchRequest PASSED kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetch PASSED kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetchWithCompression PASSED kafka.integration.PrimitiveApiTest testProduceAndMultiFetch PASSED kafka.integration.PrimitiveApiTest testMultiProduce PASSED kafka.integration.PrimitiveApiTest testConsumerEmptyTopic PASSED kafka.integration.PrimitiveApiTest testPipelinedProduceRequests PASSED kafka.integration.AutoOffsetResetTest 
testResetToEarliestWhenOffsetTooHigh PASSED kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooLow PASSED kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooHigh PASSED kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooLow PASSED kafka.integration.TopicMetadataTest testAutoCreateTopic PASSED kafka.integration.TopicMetadataTest testTopicMetadataRequest PASSED kafka.integration.TopicMetadataTest testBasicTopicMetadata PASSED kafka.integration.TopicMetadataTest testGetAllTopicMetadata PASSED kafka.metrics.KafkaTimerTest testKafkaTimer PASSED kafka.utils.UtilsTest testSwallow PASSED kafka.utils.UtilsTest testCircularIterator PASSED kafka.utils.UtilsTest testReadBytes PASSED kafka.utils.UtilsTest testAbs PASSED kafka.utils.UtilsTest testReplaceSuffix PASSED
Re: Review Request 31850: Patch for KAFKA-1660
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review80753 --- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment130866 I cleaned up this javadoc a little bit to try to simplify things. With docs it is always hard to get the level of focus right so that it has the essential information but doesn't overload the user and obscure the primary thing.
1. Changed messages to records (that is the terminology in the new clients).
2. I significantly shortened the section on calling from within a callback. I think only 0.1% of people would ever consider this.
3. I cannot think of a reason why the user would care if the I/O thread is synchronously shut down or not, and we don't make any promises one way or the other in the main close method, so let's just leave that bit out.
What do you think of this:
/**
 * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
 * <p>
 * If the producer is unable to complete all requests before the timeout expires, this method will fail
 * any unsent and unacknowledged records immediately.
 * <p>
 * If invoked from within a {@link Callback} this method will not block and will be equivalent to
 * <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while
 * blocking the I/O thread of the producer.
 *
 * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
 *                non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
 * @param timeUnit The time unit for the <code>timeout</code>
 * @throws InterruptException If the thread is interrupted while blocked
 * @throws IllegalArgumentException If the <code>timeout</code> is negative.
 */
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java https://reviews.apache.org/r/31850/#comment130868 Read locks are very expensive. I am pretty worried about this. If we want to do this we need to do a pretty detailed examination of the perf impact. - Jay Kreps On April 16, 2015, 6:35 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/ --- (Updated April 16, 2015, 6:35 p.m.) Review request for kafka. Bugs: KAFKA-1660 https://issues.apache.org/jira/browse/KAFKA-1660 Repository: kafka Description --- A minor fix. Incorporated Guozhang's comments. Modify according to the latest conclusion. Patch for the finally passed KIP-15git status Addressed Joel and Guozhang's comments. rebased on trunk Rebase on trunk Addressed Joel's comments. Addressed Joel's comments Addressed Jay's comments Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db Diff:
https://reviews.apache.org/r/31850/diff/ Testing --- Unit tests passed. Thanks, Jiangjie Qin
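To make the proposed close(timeout, timeUnit) semantics concrete, here is a minimal sketch. It is a hypothetical toy model (ToyProducer, its constructor argument and its send method are invented for illustration), not KafkaProducer: it waits up to the timeout for pending records, then fails whatever is still unacknowledged, rejects a negative timeout, and degrades to a zero timeout when called on the thread that runs callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy model of the close(timeout, timeUnit) contract; NOT the real KafkaProducer.
// There is no network here: a record only completes when someone completes its future.
class ToyProducer {
    private final List<CompletableFuture<String>> pending = new ArrayList<>();
    // Assumption: the single thread on which callbacks run (the "I/O thread"); may be null.
    private final Thread ioThread;

    ToyProducer(Thread ioThread) {
        this.ioThread = ioThread;
    }

    synchronized CompletableFuture<String> send(String value) {
        CompletableFuture<String> result = new CompletableFuture<>();
        pending.add(result);
        return result;
    }

    // The javadoc declares Kafka's unchecked InterruptException; this sketch simply propagates InterruptedException.
    void close(long timeout, TimeUnit timeUnit) throws InterruptedException {
        if (timeout < 0)
            throw new IllegalArgumentException("The timeout cannot be negative.");
        // Inside a callback, blocking would stall the only thread that can finish the sends,
        // so behave like close(0, TimeUnit.MILLISECONDS).
        if (Thread.currentThread() == ioThread)
            timeout = 0;
        List<CompletableFuture<String>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(pending);
        }
        CompletableFuture<Void> all = CompletableFuture.allOf(snapshot.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeout, timeUnit);
        } catch (TimeoutException | ExecutionException e) {
            // Timed out, or some record already failed: incomplete records are failed below.
        }
        // Fail every record that is still unacknowledged; already-completed futures are unaffected.
        for (CompletableFuture<String> f : snapshot)
            f.completeExceptionally(new IllegalStateException("Producer closed before the record was acknowledged"));
    }
}
```

Passing the callback thread in explicitly is only a simplification for the sketch; a real producer would know its own I/O thread.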
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14503279#comment-14503279 ] Steven Zhen Wu commented on KAFKA-2121: --- [~ewencp] I made two more changes in the latest commit: 1) moved the close of NetworkClient back into Sender for the reason you outlined above; 2) closed deserializers in KafkaConsumer, similar to what KafkaProducer does with serializers. prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch

On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible. I'm with Ewen regarding the solution.

On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards-incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully, whereas previously it just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock, which lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called.) Second, add the try/catch as you suggested, but just use close(). -Ewen

On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we encountered when the 0.8.2 Java KafkaProducer failed in its constructor. Here is the code snippet of KafkaProducer that illustrates the problem. ---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // create metrics reporter via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
--- Let's say MyMetricsReporter creates a thread in its constructor. If hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we have a thread leak. This becomes worse when we try to auto-recover (i.e. keep creating KafkaProducer again -> failing again -> more thread leaks). There are multiple options for fixing this. 1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem. What if some other lines throw an exception? 2) Use try-catch. In the catch section, try to call the close methods of any non-null objects constructed so far. 3) Explicitly declare the dependency in the constructor. This way, when KafkaProducer throws an exception, I
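A minimal sketch of option 2 with Ewen's two refinements: a close() that tolerates uninitialized fields and is safe to call twice, and a constructor try/catch that just calls close(). TrackedResource and LeakSafeClient are hypothetical stand-ins for a metrics reporter and the producer, and catching RuntimeException (rather than every Throwable) is a simplification.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical resource standing in for, e.g., a MetricsReporter that starts a thread when constructed.
class TrackedResource implements Closeable {
    static final List<TrackedResource> OPEN = new ArrayList<>();

    TrackedResource() {
        OPEN.add(this);
    }

    @Override
    public void close() {
        OPEN.remove(this);
    }
}

// Hypothetical client: null-safe, idempotent close() plus a constructor try/catch that reuses it.
class LeakSafeClient implements Closeable {
    private TrackedResource reporter;   // non-final so close() can clear it
    private TrackedResource ioThread;

    LeakSafeClient(String bootstrapServers) {
        try {
            reporter = new TrackedResource();                // allocated early
            if (bootstrapServers == null || bootstrapServers.isEmpty())
                throw new IllegalArgumentException("No bootstrap servers given"); // validation fails midway
            ioThread = new TrackedResource();                // allocated late
        } catch (RuntimeException e) {
            close();   // release whatever was allocated before the failure
            throw e;
        }
    }

    @Override
    public void close() {
        // Fields may still be null if the constructor failed; clearing them makes close() idempotent.
        if (ioThread != null) {
            ioThread.close();
            ioThread = null;
        }
        if (reporter != null) {
            reporter.close();
            reporter = null;
        }
    }
}
```

The caller-facing behavior is unchanged: a failed constructor still just throws, but nothing allocated before the failure stays open.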
Re: Review Request 33065: Patch for KAFKA-1928
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review80761 --- clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java https://reviews.apache.org/r/33065/#comment130886 So I think you are saying this will move to a String, right? I think that makes sense... This looks great to me (yay deleting code!). One potential gotcha when converting the socket server on the server to use Selector is the differing assumptions about the read-interest and write-interest state transitions. The clients, being clients, are interested in writing when they have outstanding stuff to send, and are always interested in reading. However, the server only allows a single request per connection to be processed at any given time, and this is particularly important to guarantee ordering since there are multiple processing threads. I think this can be implemented in the socket server by just calling mute() after a completed request is read, but I just wanted to point out this difference as it is a little subtle. - Jay Kreps On April 10, 2015, 4:58 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated April 10, 2015, 4:58 a.m.) Review request for kafka.
Bugs: KAFKA-1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Diffs - clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3 clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7 clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5 clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a09e470166a1c33639cf0cab26a3bce98ec clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java PRE-CREATION core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b3802ac221d570e7610458e50518b4499e7ed core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78adb760eaeb029466b54f335a29caf3b0f core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 5be393ab8272a49437b5057ed098ccdc42f352e5 core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 2fad585f126699ba8d26c901a41bcf6b8198bf62 core/src/main/scala/kafka/api/OffsetCommitRequest.scala cf8e6acc426aef6eb19d862bf6a108a5fc37907a core/src/main/scala/kafka/api/OffsetFetchRequest.scala 67811a752a470bf9bdbc8c5419e8d6e20a006169 core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc7518ad76f9548772522751afb4d046b78 core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da1d865086f9830aa919a49063abbbe574d core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987c990fe561c01dac2909f5ed21a506e038 core/src/main/scala/kafka/api/TopicMetadataRequest.scala 7dca09ce637a40e125de05703dc42e8b611971ac core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 69f0397b187a737b4ddf50e390d3c2f418ce6b5d 
core/src/main/scala/kafka/client/ClientUtils.scala b66424b230463df6641a848b99bb73312ea66e33 core/src/main/scala/kafka/consumer/SimpleConsumer.scala cbef84ac76e62768981f74e71d451f2bda995275 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e250b94626c62b3b7f33ee4180ca3ab69a8821d6 core/src/main/scala/kafka/controller/ControllerChannelManager.scala 97acdb23f6e95554c3e0357aa112eddfc875efbc core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b0b7be14d494ae8c87f4443b52db69d273c20316 core/src/main/scala/kafka/network/BlockingChannel.scala 6e2a38eee8e568f9032f95c75fa5899e9715b433 core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala c0d77261353478232ab85591c182be57845b3f13 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b95b73b71252932867c60192b3d5b91efe99e122 core/src/main/scala/kafka/network/ByteBufferSend.scala af30042a4c713418ecd83b6c6c17dfcbdc101c62 core/src/main/scala/kafka/network/Handler.scala a0300336b8cb5a2d5be68b7b48bdbe045bf99324 core/src/main/scala/kafka/network/RequestChannel.scala 1d9c57b0b5a0ad31e4f3d7562f0266af83cc9024 core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION core/src/main/scala/kafka/network/SocketServer.scala
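The mute-after-read idea from the review above can be sketched as bookkeeping on a connection's interest set. Only the java.nio SelectionKey.OP_READ and OP_WRITE constants are real; ServerConnectionInterest and its methods are hypothetical, and a real server would apply these ops to the channel's SelectionKey (for example through a mute() call like the one Jay mentions) rather than keep them in a plain field.

```java
import java.nio.channels.SelectionKey;

// Hypothetical per-connection interest-set bookkeeping for a server built on a Selector.
class ServerConnectionInterest {
    // Like a client, a fresh server connection starts out interested only in reading.
    private int interestOps = SelectionKey.OP_READ;

    // A complete request was read: "mute" the connection so no further request is read
    // until this one has been answered, which preserves per-connection ordering.
    void onRequestRead() {
        interestOps &= ~SelectionKey.OP_READ;
    }

    // A processing thread produced the response: start watching for writability.
    void onResponseQueued() {
        interestOps |= SelectionKey.OP_WRITE;
    }

    // Response fully written: stop writing and "unmute" to accept the next request.
    void onResponseSent() {
        interestOps &= ~SelectionKey.OP_WRITE;
        interestOps |= SelectionKey.OP_READ;
    }

    boolean readInterested() {
        return (interestOps & SelectionKey.OP_READ) != 0;
    }

    boolean writeInterested() {
        return (interestOps & SelectionKey.OP_WRITE) != 0;
    }
}
```

The contrast with the clients is that a client never clears OP_READ, whereas here reading is switched off for the whole time a request is being processed.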
Re: Review Request 32781: Patch for KAFKA-2087
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32781/#review80733 --- Ship it! Ship It! - Neha Narkhede On April 2, 2015, 5:29 p.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32781/ --- (Updated April 2, 2015, 5:29 p.m.) Review request for kafka. Bugs: KAFKA-2087 https://issues.apache.org/jira/browse/KAFKA-2087 Repository: kafka Description --- Fixing KAFKA-2087 Diffs - core/src/main/scala/kafka/server/TopicConfigManager.scala 47295d4 Diff: https://reviews.apache.org/r/32781/diff/ Testing --- Thanks, Aditya Auradkar
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14503261#comment-14503261 ] Jay Kreps commented on KAFKA-1646: -- Cool, this looks good to me. This introduces a new config so we should do a quick KIP discussion on the mailing list. I think this can be pretty minimal since it really is a fairly small change in the end. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out contiguously on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
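A minimal sketch of the preallocate-on-roll approach the issue describes, using only java.io.RandomAccessFile. SegmentPreallocation and its methods are hypothetical (this is not Kafka's log code), and whether setLength actually reserves contiguous blocks on NTFS or any other filesystem is platform-dependent. The trim-on-close step is suggested by the attached "truncate-off-trailing-zeros" patch name, but its details here are assumptions.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical sketch of segment preallocation; not Kafka's actual log or segment classes.
class SegmentPreallocation {
    // Analogous to the patch's 'if(Os.iswindow)' guard, using the standard os.name property.
    static boolean isWindows() {
        return System.getProperty("os.name", "").toLowerCase().startsWith("windows");
    }

    // Open a fresh segment file, extending it to initBytes up front when preallocation is on.
    static RandomAccessFile openSegment(File file, boolean preallocate, long initBytes) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        if (preallocate)
            raf.setLength(initBytes); // whether this reserves contiguous blocks is filesystem-dependent
        return raf;
    }

    // On close, cut off the unused preallocated tail so the file length again
    // matches the bytes actually written (otherwise readers would see trailing zeros).
    static void closeSegment(RandomAccessFile raf, long validBytes) throws IOException {
        raf.setLength(validBytes);
        raf.close();
    }
}
```

A caller would typically pass isWindows() as the preallocate flag, which keeps other platforms on their existing allocation path.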
[jira] [Created] (KAFKA-2135) New Kafka Producer Client: Send requests wait indefinitely if no broker is available.
David Hay created KAFKA-2135: Summary: New Kafka Producer Client: Send requests wait indefinitely if no broker is available. Key: KAFKA-2135 URL: https://issues.apache.org/jira/browse/KAFKA-2135 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: David Hay Assignee: Jun Rao Priority: Critical
I'm seeing issues when sending a message with the new producer client API. The future returned from Producer.send() will block indefinitely if the cluster is unreachable for some reason. Here are the steps: # Start up a single-node Kafka cluster locally. # Start up the application and create a KafkaProducer with the following config:
{noformat}
KafkaProducerWrapper values:
compression.type = snappy
metric.reporters = []
metadata.max.age.ms = 30
metadata.fetch.timeout.ms = 6
acks = all
batch.size = 16384
reconnect.backoff.ms = 10
bootstrap.servers = [localhost:9092]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 3
key.serializer = class com.mycompany.kafka.serializer.ToStringEncoder
retries = 3
max.request.size = 1048576
block.on.buffer.full = true
value.serializer = class com.mycompany.kafka.serializer.JsonEncoder
metrics.sample.window.ms = 3
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 0
client.id = site-json
{noformat} # Send some messages; they are sent successfully. # Shut down the Kafka broker. # Send another message. At this point, calling {{get()}} on the returned Future will block indefinitely until the broker is restarted.
It appears that there is some logic in {{org.apache.kafka.clients.producer.internals.Sender}} that is supposed to mark the Future as done in response to a disconnect event (towards the end of the run(long) method). However, the while loop earlier in this method seems to remove the broker from consideration entirely, so the final loop over ClientResponse objects is never executed.
It seems like the timeout.ms configuration should be honored in this case, or perhaps another timeout should be introduced that indicates when we should give up waiting for the cluster to return.
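One way to honor a timeout as the reporter suggests is to record when each send was enqueued and periodically fail sends that have waited too long, even if their broker has dropped out of consideration. The sketch below is hypothetical (PendingSends, its timeoutMs parameter, and the Long result standing in for RecordMetadata are all assumptions) and is not the Sender's actual logic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: expire sends that have had no response within timeoutMs.
class PendingSends {
    private static final class Pending {
        final long createdMs;
        final CompletableFuture<Long> future = new CompletableFuture<>(); // Long stands in for RecordMetadata

        Pending(long createdMs) {
            this.createdMs = createdMs;
        }
    }

    private final List<Pending> pending = new ArrayList<>();
    private final long timeoutMs;

    PendingSends(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Register a send made at nowMs and hand back the future the caller will wait on.
    CompletableFuture<Long> add(long nowMs) {
        Pending p = new Pending(nowMs);
        pending.add(p);
        return p.future;
    }

    // Called on every loop iteration regardless of broker state; returns how many sends were expired.
    int expire(long nowMs) {
        int expired = 0;
        for (Iterator<Pending> it = pending.iterator(); it.hasNext(); ) {
            Pending p = it.next();
            if (p.future.isDone()) {          // already answered by a broker
                it.remove();
            } else if (nowMs - p.createdMs >= timeoutMs) {
                p.future.completeExceptionally(
                        new TimeoutException("No broker response within " + timeoutMs + " ms"));
                it.remove();
                expired++;
            }
        }
        return expired;
    }
}
```

On the caller side, the standard Future.get(long, TimeUnit) already bounds how long a single get() blocks, but only a producer-side expiry like this one frees the pending send itself.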
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80770 --- Ship it! New version with KafkaConsumer changes looks good, passes all tests for me. Great work! - Ewen Cheslack-Postava On April 20, 2015, 4:57 p.m., Steven Wu wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ --- (Updated April 20, 2015, 4:57 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- fix potential resource leak when KafkaProducer constructor failed in the middle Diffs - clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION Diff: https://reviews.apache.org/r/33242/diff/ Testing --- Thanks, Steven Wu
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated KAFKA-2121: -- Attachment: KAFKA-2121_2015-04-20_09:51:51.patch prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch

On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible. I'm with Ewen regarding the solution.

On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards-incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully, whereas previously it just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock, which lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called.) Second, add the try/catch as you suggested, but just use close(). -Ewen

On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we encountered when the 0.8.2 Java KafkaProducer failed in its constructor. Here is the code snippet of KafkaProducer that illustrates the problem. ---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // create metrics reporter via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
--- Let's say MyMetricsReporter creates a thread in its constructor. If hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we have a thread leak. This becomes worse when we try to auto-recover (i.e. keep creating KafkaProducer again -> failing again -> more thread leaks). There are multiple options for fixing this. 1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem. What if some other lines throw an exception? 2) Use try-catch. In the catch section, try to call the close methods of any non-null objects constructed so far. 3) Explicitly declare the dependency in the constructor. This way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter> reporters) We don't have to use a dependency injection framework, but generally hiding dependencies is a bad coding practice. It is also hard to plug in mocks for dependencies. This is
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ --- (Updated April 20, 2015, 4:51 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- fix potential resource leak when KafkaProducer constructor failed in the middle Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION Diff: https://reviews.apache.org/r/33242/diff/ Testing --- Thanks, Steven Wu
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503172#comment-14503172 ] Steven Zhen Wu commented on KAFKA-2121: --- Updated reviewboard https://reviews.apache.org/r/33242/diff/ against branch apache/trunk prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). 
Additionally, the code creating the producer would have to be more complicated, since it would have to deal with the cleanup carefully, whereas previously it just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock, which lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called.) Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when the 0.8.2 Java KafkaProducer failed in its constructor. here is the code snippet of KafkaProducer to illustrate the problem.
---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // create metrics reporters via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
---
let's say MyMetricsReporter creates a thread in its constructor. if hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we have a thread leak. this becomes worse when we try to auto-recover (i.e. keep creating KafkaProducer again -> failing again -> more thread leaks).
there are multiple options for fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom. it doesn't fix the fundamental problem: what if some other line throws an exception? 2) use try-catch. in the catch section, try to call the close methods of any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when the KafkaProducer constructor throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter> reporters) we don't have to use a dependency injection framework. but generally hiding
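For the KAFKA-2121 thread, here is a minimal sketch of option 2 as Ewen refines it: a close() that is null-safe and idempotent, plus a try/catch in the constructor that calls close() before rethrowing. BackgroundReporter (a stand-in for a MetricsReporter that starts a thread) and LeakSafeClient (a stand-in for the producer) are hypothetical names; this is not the actual KafkaProducer code or the attached patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a MetricsReporter that starts a thread in its constructor.
class BackgroundReporter implements AutoCloseable {
    // Number of reporters whose threads have not been shut down yet.
    static final AtomicInteger OPEN = new AtomicInteger();

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Thread worker;

    BackgroundReporter() {
        worker = new Thread(() -> {
            while (!closed.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return; // interrupted by close()
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
        OPEN.incrementAndGet();
    }

    // Idempotent: only the first call stops the thread and updates the counter.
    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            worker.interrupt();
            OPEN.decrementAndGet();
        }
    }
}

// Hypothetical client whose constructor can fail after allocating resources.
final class LeakSafeClient implements AutoCloseable {
    // Non-final so close() can run against a partially initialized instance.
    private List<BackgroundReporter> reporters;
    private List<String> addresses;

    LeakSafeClient(int reporterCount, String bootstrapServers) {
        try {
            reporters = new ArrayList<>();
            for (int i = 0; i < reporterCount; i++) {
                reporters.add(new BackgroundReporter());
            }
            // Validation that can throw after the reporters already exist.
            addresses = parseAndValidate(bootstrapServers);
        } catch (RuntimeException e) {
            close(); // release whatever was created so far
            throw e; // then surface the original failure
        }
    }

    private static List<String> parseAndValidate(String servers) {
        List<String> result = new ArrayList<>();
        for (String s : servers.split(",")) {
            if (!s.trim().isEmpty()) {
                result.add(s.trim());
            }
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("no valid bootstrap servers: '" + servers + "'");
        }
        return result;
    }

    List<String> addresses() {
        return addresses;
    }

    // Null-safe and idempotent, so it works no matter how far construction got.
    @Override
    public void close() {
        if (reporters != null) {
            for (BackgroundReporter r : reporters) {
                r.close();
            }
        }
    }
}
```

Because close() tolerates missing fields, the catch block does not need to track how far construction got; the cost, as noted in the thread, is that those fields can no longer be final.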
[jira] [Commented] (KAFKA-873) Consider replacing zkclient with curator (with zkclient-bridge)
[ https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503179#comment-14503179 ] Jordan Zimmerman commented on KAFKA-873: [~nehanarkhede] - the major upside with Curator (note: I'm the main author) is in the Framework, not necessarily the recipes. Writing correct ZooKeeper applications is notoriously difficult. Curator manages the internal ZooKeeper connection for you, which is not trivial. Curator also incorporates workarounds for many well-known edge cases. Regardless of whether you use Curator or not, writing your own wrapper over the ZooKeeper Java client would be a major mistake. Consider replacing zkclient with curator (with zkclient-bridge) --- Key: KAFKA-873 URL: https://issues.apache.org/jira/browse/KAFKA-873 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.0 Reporter: Scott Clasen Assignee: Grant Henke If zkclient was replaced with curator and curator-x-zkclient-bridge, it would initially be a drop-in replacement: https://github.com/Netflix/curator/wiki/ZKClient-Bridge With the addition of a few more props to ZkConfig and a bit of code, this would open up the possibility of using ACLs in ZooKeeper (which aren't supported directly by zkclient), as well as integrating with Netflix Exhibitor for those of us using that. Looks like KafkaZookeeperClient needs some love anyhow... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33027: Fix timing issue in DelayedOperationTest
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33027/#review80742 --- Ship it! Ship It! - Neha Narkhede On April 9, 2015, 6:04 p.m., Rajini Sivaram wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33027/ --- (Updated April 9, 2015, 6:04 p.m.) Review request for kafka. Bugs: KAFKA-2057 https://issues.apache.org/jira/browse/KAFKA-2057 Repository: kafka Description --- Patch for KAFKA-2057: Fix timing issue in DelayedOperationTest Diffs - core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 9186c90de5a983a73b042fcb42987bfabae14fcf Diff: https://reviews.apache.org/r/33027/diff/ Testing --- Thanks, Rajini Sivaram
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ --- (Updated April 20, 2015, 4:57 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- fix potential resource leak when KafkaProducer constructor failed in the middle Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION Diff: https://reviews.apache.org/r/33242/diff/ Testing --- Thanks, Steven Wu
Build failed in Jenkins: KafkaPreCommit #81
See https://builds.apache.org/job/KafkaPreCommit/81/changes
Changes:
[neha.narkhede] TopicConfigManager javadoc references incorrect paths; reviewed by Neha Narkhede
--
[...truncated 1557 lines...]
kafka.api.ProducerFailureHandlingTest testInvalidPartition PASSED
kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckZero PASSED
kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne PASSED
kafka.api.ProducerFailureHandlingTest testNonExistentTopic PASSED
kafka.api.ProducerFailureHandlingTest testWrongBrokerList PASSED
kafka.api.ProducerFailureHandlingTest testNoResponse PASSED
kafka.api.ProducerFailureHandlingTest testSendAfterClosed PASSED
kafka.api.ProducerFailureHandlingTest testCannotSendToInternalTopic PASSED
kafka.api.ProducerFailureHandlingTest testNotEnoughReplicasAfterBrokerShutdown PASSED
kafka.api.ConsumerTest testSimpleConsumption PASSED
kafka.api.ConsumerTest testAutoOffsetReset PASSED
kafka.api.ConsumerTest testSeek PASSED
kafka.api.ConsumerTest testGroupConsumption PASSED
kafka.api.ConsumerTest testPositionAndCommit PASSED
kafka.api.ConsumerTest testPartitionsFor PASSED
kafka.api.ConsumerTest testPartitionReassignmentCallback PASSED
kafka.api.ConsumerBounceTest testConsumptionWithBrokerFailures PASSED
kafka.api.ConsumerBounceTest testSeekAndCommitWithBrokerFailures PASSED
kafka.api.RequestResponseSerializationTest testSerializationAndDeserialization PASSED
kafka.api.ProducerBounceTest testBrokerFailure PASSED
kafka.api.ApiUtilsTest testShortStringNonASCII PASSED
kafka.api.ApiUtilsTest testShortStringASCII PASSED
kafka.api.test.ProducerCompressionTest testCompression[0] PASSED
kafka.api.test.ProducerCompressionTest testCompression[1] PASSED
kafka.api.test.ProducerCompressionTest testCompression[2] PASSED
kafka.api.test.ProducerCompressionTest testCompression[3] PASSED
kafka.cluster.BrokerEndPointTest testSerDe PASSED
kafka.cluster.BrokerEndPointTest testHashAndEquals PASSED
kafka.cluster.BrokerEndPointTest testFromJSON PASSED
kafka.cluster.BrokerEndPointTest testFromOldJSON PASSED
kafka.cluster.BrokerEndPointTest testBrokerEndpointFromURI PASSED
kafka.cluster.BrokerEndPointTest testEndpointFromURI PASSED
kafka.javaapi.consumer.ZookeeperConsumerConnectorTest testBasic PASSED
kafka.javaapi.message.ByteBufferMessageSetTest testWrittenEqualsRead PASSED
kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistent PASSED
kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytes PASSED
kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistentWithCompression PASSED
kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytesWithCompression PASSED
kafka.javaapi.message.ByteBufferMessageSetTest testEquals PASSED
kafka.javaapi.message.ByteBufferMessageSetTest testEqualsWithCompression PASSED
kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabled PASSED
kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionDisabled PASSED
kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabledByTopicOverride PASSED
kafka.integration.UncleanLeaderElectionTest testCleanLeaderElectionDisabledByTopicOverride PASSED
kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionInvalidTopicOverride PASSED
kafka.integration.FetcherTest testFetcher PASSED
kafka.integration.RollingBounceTest testRollingBounce PASSED
kafka.integration.PrimitiveApiTest testFetchRequestCanProperlySerialize PASSED
kafka.integration.PrimitiveApiTest testEmptyFetchRequest PASSED
kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetch PASSED
kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetchWithCompression PASSED
kafka.integration.PrimitiveApiTest testProduceAndMultiFetch PASSED
kafka.integration.PrimitiveApiTest testMultiProduce PASSED
kafka.integration.PrimitiveApiTest testConsumerEmptyTopic PASSED
kafka.integration.PrimitiveApiTest testPipelinedProduceRequests PASSED
kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooHigh PASSED
kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooLow PASSED
kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooHigh PASSED
kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooLow PASSED
kafka.integration.TopicMetadataTest testAutoCreateTopic PASSED
kafka.integration.TopicMetadataTest testTopicMetadataRequest PASSED
kafka.integration.TopicMetadataTest testBasicTopicMetadata PASSED
kafka.integration.TopicMetadataTest testGetAllTopicMetadata PASSED
kafka.metrics.KafkaTimerTest testKafkaTimer PASSED
kafka.utils.UtilsTest testSwallow PASSED
kafka.utils.UtilsTest testCircularIterator PASSED
kafka.utils.UtilsTest testReadBytes PASSED
kafka.utils.UtilsTest testAbs PASSED
kafka.utils.UtilsTest testReplaceSuffix PASSED
[jira] [Commented] (KAFKA-2035) Add a topic config cache.
[ https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503222#comment-14503222 ] Jay Kreps commented on KAFKA-2035: -- Hey [~parth.brahmbhatt] I'm not picky about the sequencing as long as it gets done and we don't get left in a half-way state. If you're on-board to take it all the way I think that could be great. But from that point of view, is there a reason we need to set the ACLs at the topic level rather than just having them at the log level? Topic configs are all set at the topic level but stored in memory at the log level today, so I'm not sure if I get why ACLs are different...? An alternative would be to leave it as is and then refactor when we have a plan for how we want it to end up. I think the difference you are pointing out is that logically ACLs don't belong mixed in with the Log data model, right? Add a topic config cache. - Key: KAFKA-2035 URL: https://issues.apache.org/jira/browse/KAFKA-2035 Project: Kafka Issue Type: Task Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2035_2015-03-31_10:52:12.patch Currently the topic config is all about Log configuration so we have a TopicConfigManager which takes in a Log instance and keeps updating that instance's config instance as and when the topic config is updated. The topic config update notifications are sent using zk watchers by Controller. I propose to introduce a TopicConfigCache which will be updated by TopicConfigManager on any config changes. The log instance and any other component (like the authorizer mentioned in KAFKA-1688) will have a reference to TopicConfigCache using which they will access the topic configs.
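The TopicConfigCache proposed in the KAFKA-2035 description could look roughly like this minimal sketch. It assumes configs are held as java.util.Properties keyed by topic name and that the config manager calls update() when a ZooKeeper change notification arrives; the method names update/get/remove are hypothetical and this is not the attached patch.

```java
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical shared cache: written by the config manager, read by logs and the authorizer.
final class TopicConfigCache {
    private final ConcurrentHashMap<String, Properties> configs = new ConcurrentHashMap<>();

    // Called on a config change notification; replaces the topic's config wholesale.
    void update(String topic, Properties newConfig) {
        Properties copy = new Properties();
        copy.putAll(newConfig);
        configs.put(topic, copy);
    }

    // Readers get a defensive copy so they cannot mutate the cached entry.
    Properties get(String topic) {
        Properties copy = new Properties();
        Properties cached = configs.get(topic);
        if (cached != null) {
            copy.putAll(cached);
        }
        return copy;
    }

    // Called when a topic is deleted.
    void remove(String topic) {
        configs.remove(topic);
    }
}
```

Keeping the cache independent of Log is what would let a component such as an authorizer read topic-level settings without touching the Log data model, which is the separation discussed in the comment.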
Re: Review Request 32869: Patch for KAFKA-2091
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32869/#review80738 --- clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java https://reviews.apache.org/r/32869/#comment130858 It is reasonable to need to partition on the key prior to serialization. I wonder if people would also want access to the value? This is a bit odd, but could potentially be useful... What about making this public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valBytes, Integer partition, Cluster cluster); Should DefaultPartitioner move into the public package since now users can directly configure it? Also if this is a public interface it would be good to add a producer test that configures a custom partitioner and checks that it takes effect so that this keeps working. - Jay Kreps On April 6, 2015, 12:13 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32869/ --- (Updated April 6, 2015, 12:13 a.m.) Review request for kafka. Bugs: KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091 Repository: kafka Description --- KAFKA-2091. Expose a Partitioner interface in the new producer. 
Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ab263423ff1d33170effb71acdef3fc501fa072a clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java fa9daaef66ff7961e1c46cd0cd8fed18a53bccd8 clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java 93e799105fb6cc5c49a129c0db099a3a973b2ab3 clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java 5dadd0e3554577ad6be28a18ff5ab08f8b31050f Diff: https://reviews.apache.org/r/32869/diff/ Testing --- Thanks, Sriharsha Chintalapani
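To make the signature suggested in Jay's review of Review Request 32869 concrete, here is a sketch of a partitioner that receives both the unserialized key and its bytes. The interface name ExtendedPartitioner, the OrderKey type and the routing rule are hypothetical, and an int numPartitions stands in for the Cluster argument so the example compiles on its own.

```java
import java.util.Arrays;

// Hypothetical interface mirroring the proposed parameter list; numPartitions replaces Cluster.
interface ExtendedPartitioner {
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes,
                  Integer partition, int numPartitions);
}

// Hypothetical key type whose customerId field drives routing.
final class OrderKey {
    final String customerId;
    final long orderId;

    OrderKey(String customerId, long orderId) {
        this.customerId = customerId;
        this.orderId = orderId;
    }
}

// Sends all orders of one customer to the same partition, ignoring orderId.
final class CustomerPartitioner implements ExtendedPartitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes,
                         Integer partition, int numPartitions) {
        if (partition != null) {
            return partition; // an explicit partition on the record wins
        }
        if (key instanceof OrderKey) {
            String customer = ((OrderKey) key).customerId;
            return (customer.hashCode() & 0x7fffffff) % numPartitions;
        }
        // Other key types fall back to hashing the serialized bytes (null hashes to 0).
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Routing on one field of the object key is awkward with bytes alone, since the partitioner would have to re-parse the serialized form; that is the case the extra parameters would cover.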
[jira] [Commented] (KAFKA-873) Consider replacing zkclient with curator (with zkclient-bridge)
[ https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503169#comment-14503169 ] Neha Narkhede commented on KAFKA-873: - I'm not so sure yet that moving to Curator is a good idea, at least not until we do a full analysis of the current zkclient problems and how Curator fixes those. Agreed that zkclient is not very well supported, but anytime we have found a serious bug, they have accepted the patch and released it. But my understanding is that the upside of Curator is that it includes a set of recipes for common operations that people use ZooKeeper for. Let me elaborate on what I think is the problem with zkclient. It wraps the zookeeper client APIs with the purpose of making it easy to perform common ZooKeeper operations. However, this limits the user to the behavior dictated by the wrapper, irrespective of how the underlying zookeeper library behaves. An example of this is the indefinite retries during a ZooKeeper disconnect. You may not want to retry indefinitely and might want to quit the operation after a timeout. Then there are various bugs introduced due to the zkclient wrapper design. For example, we have seen weird bugs due to the fact that zkclient saves the list of triggered watches in an internal queue and invokes the configured user callback in a background thread. The problems we've seen with zkclient will not be fixed with another wrapper (Curator). It looks like it will be better for us to just write a simple zookeeper client utility inside Kafka itself. If you look at zkclient, it is a pretty simple wrapper over the zookeeper client APIs. 
So this may not be a huge undertaking and will be a better long-term solution.
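Neha's point about not wanting to retry indefinitely during a ZooKeeper disconnect can be illustrated with a generic deadline-bounded retry helper. This is a hypothetical sketch (BoundedRetry and callWithDeadline are made-up names) that treats every exception as retriable; it is not zkclient, Curator, or Kafka code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: retries until the operation succeeds or a deadline passes,
// instead of retrying forever.
final class BoundedRetry {
    private BoundedRetry() {}

    static <T> T callWithDeadline(Callable<T> op, long timeoutMs, long backoffMs) throws Exception {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        Exception last = null;
        while (true) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e; // this sketch assumes every failure is retriable
            }
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                TimeoutException te = new TimeoutException("gave up after " + timeoutMs + " ms");
                te.initCause(last); // keep the last underlying failure for diagnosis
                throw te;
            }
            Thread.sleep(Math.min(backoffMs, remainingMs));
        }
    }
}
```

A real client would also distinguish retriable errors (such as connection loss) from permanent ones instead of retrying everything.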
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ --- (Updated April 20, 2015, 4:52 p.m.) Review request for kafka. Bugs: KAFKA-2121 https://issues.apache.org/jira/browse/KAFKA-2121 Repository: kafka Description --- fix potential resource leak when KafkaProducer constructor failed in the middle Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION Diff: https://reviews.apache.org/r/33242/diff/ Testing --- Thanks, Steven Wu
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated KAFKA-2121: -- Attachment: KAFKA-2121_2015-04-20_09:52:46.patch
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503176#comment-14503176 ] Steven Zhen Wu commented on KAFKA-2121: --- Updated reviewboard https://reviews.apache.org/r/33242/diff/ against branch apache/trunk
[jira] [Commented] (KAFKA-2091) Expose a Partitioner interface in the new producer
[ https://issues.apache.org/jira/browse/KAFKA-2091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503174#comment-14503174 ] Jay Kreps commented on KAFKA-2091: -- This is waiting on the KIP, which would be the best place to discuss whether we pass in the partition id and unserialized values. Expose a Partitioner interface in the new producer -- Key: KAFKA-2091 URL: https://issues.apache.org/jira/browse/KAFKA-2091 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Attachments: KAFKA-2091.patch In the new producer you can pass in a key or hard-code the partition as part of ProducerRecord. Internally we are using a class {code} class Partitioner { public int partition(String topic, byte[] key, Integer partition, Cluster cluster) {...} } {code} This class uses the specified partition if there is one; uses a hash of the key if there isn't a partition but there is a key; and simply chooses a partition round robin if there is neither a partition nor a key. However, there are several partitioning strategies that could be useful that we don't support out of the box. An example would be having each producer periodically choose a random partition. This tends to be the most efficient since all data goes to one server and uses the fewest TCP connections; however, it only produces good load balancing if there are many producers. Of course a user can do this now by just setting the partition manually, but that is a bit inconvenient if you need to do that across a bunch of apps, since each will need to remember to set the partition every time. The idea would be to expose a configuration to set the partitioner implementation like {code} partitioner.class=org.apache.kafka.producer.DefaultPartitioner {code} This would default to the existing partitioner implementation.
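The default behavior in the KAFKA-2091 description (explicit partition, else a hash of the key, else round robin) and the "periodically choose a random partition" strategy can be sketched as below. SimplePartitioner, the numPartitions parameter (standing in for Cluster) and the switch interval are assumptions; Arrays.hashCode stands in for whatever hash the producer actually uses, and the random variant ignores keys, so it only suits keyless data.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical interface; numPartitions stands in for the Cluster argument.
interface SimplePartitioner {
    int partition(String topic, byte[] key, Integer partition, int numPartitions);
}

// Explicit partition, else hash of the key, else round robin.
final class DefaultLikePartitioner implements SimplePartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, byte[] key, Integer partition, int numPartitions) {
        if (partition != null) {
            return partition;
        }
        if (key != null) {
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}

// Sticks to one random partition and re-picks it every switchEveryMs milliseconds.
final class PeriodicRandomPartitioner implements SimplePartitioner {
    private final Random random;
    private final long switchEveryMs;
    private int current = -1;
    private long chosenAtMs;

    PeriodicRandomPartitioner(long switchEveryMs, long seed) {
        this.switchEveryMs = switchEveryMs;
        this.random = new Random(seed);
    }

    @Override
    public synchronized int partition(String topic, byte[] key, Integer partition, int numPartitions) {
        if (partition != null) {
            return partition;
        }
        long now = System.currentTimeMillis();
        // Re-pick when nothing is chosen yet, the partition count shrank, or the interval elapsed.
        if (current < 0 || current >= numPartitions || now - chosenAtMs >= switchEveryMs) {
            current = random.nextInt(numPartitions);
            chosenAtMs = now;
        }
        return current;
    }
}
```

The sticky variant sends each producer's traffic to one partition at a time, which reflects the trade-off in the description: fewer connections per producer, with good balance only when there are many producers.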
[jira] [Updated] (KAFKA-2057) DelayedOperationTest.testRequestExpiry transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-2057: - Resolution: Fixed Reviewer: Neha Narkhede Status: Resolved (was: Patch Available) Thanks for the patch. Pushed to trunk.

DelayedOperationTest.testRequestExpiry transient failure Key: KAFKA-2057 URL: https://issues.apache.org/jira/browse/KAFKA-2057 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Rajini Sivaram Labels: newbie Attachments: KAFKA-2057.patch
{code}
kafka.server.DelayedOperationTest > testRequestExpiry FAILED
    junit.framework.AssertionFailedError: Time for expiration 19 should at least 20
        at junit.framework.Assert.fail(Assert.java:47)
        at junit.framework.Assert.assertTrue(Assert.java:20)
        at kafka.server.DelayedOperationTest.testRequestExpiry(DelayedOperationTest.scala:68)
{code}
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated KAFKA-2121: -- Attachment: KAFKA-2121_2015-04-20_09:57:49.patch

prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch

On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
It is a valid problem and we should correct it as soon as possible; I'm with Ewen regarding the solution.

On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
Steven,
Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change).
Additionally, the code creating the producer would have to be more complicated, since it would have to deal with the cleanup carefully, whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock, which lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called.) Second, add the try/catch as you suggested, but just use close().
-Ewen

On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
Here is the resource leak problem that we encountered when the 0.8.2 Java KafkaProducer failed in its constructor. Here is the code snippet of KafkaProducer to illustrate the problem.
---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // create metrics reporters via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
---
Let's say MyMetricsReporter creates a thread in its constructor. If hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we have a thread leak. This becomes worse when we try to auto-recover (i.e. keep creating KafkaProducer again -> failing again -> more thread leaks).
There are multiple options for fixing this.
1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem. What if some other line throws an exception?
2) Use try-catch. In the catch section, try to call the close methods of any non-null objects constructed so far.
3) Explicitly declare the dependency in the constructor. This way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources.
KafkaProducer(..., List<MetricsReporter> reporters)
We don't have to use a dependency injection framework, but generally hiding dependencies is a bad coding practice. It is also hard to plug in mocks for dependencies. This is probably the most intrusive change.
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503190#comment-14503190 ] Steven Zhen Wu commented on KAFKA-2121: --- Updated reviewboard https://reviews.apache.org/r/33242/diff/ against branch apache/trunk
[jira] [Commented] (KAFKA-2091) Expose a Partitioner interface in the new producer
[ https://issues.apache.org/jira/browse/KAFKA-2091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503197#comment-14503197 ] Sriharsha Chintalapani commented on KAFKA-2091: --- [~jkreps] Thanks for the review. I'll write up a KIP and start a discussion thread.
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hi Parth,

Nice work on this KIP. I did another read through and had a few more comments (with edits after I went through the thread). Many of these comments were brought up by others as well, so it appears that the KIP would benefit from an update at this point to incorporate comments from the thread and the last hangout.

- The operation enum is mostly self-explanatory, but it would help (for the sake of clarity and completeness if nothing else) to document exactly what each of the enums is. E.g., I think this came up in our hangout: SEND_CONTROL_MESSAGE is unclear and I don't remember what was said about it. Edit: After going through the thread it seems the conclusion was to categorize operations. E.g., WRITE could apply to multiple requests. Again, this is unclear, so it would be great if you could update the KIP to clarify what you intend.
- When you update the KIP to categorize the requests it would also help to have a column for what the resource is for each.
- FWIW I prefer a 1-1 mapping between requests and operations. I think categorizing requests into these can be confusing because:
  - The resource being protected for different requests will be different. We are mostly thinking about topics (read/write) but there are requests for which topic is not the right resource. E.g., for topic creation, the resource as you suggested would be something global/common such as “cluster”. For OffsetCommit/FetchRequest, the resource may be the consumer group, or maybe a tuple of consumer group, topic. So this can be confusing, i.e., different resources and request types in the same category. It may be simpler and clearer to just have a 1-1 mapping between the operation enum and requests.
  - Some requests that are intuitively READ have WRITE side-effects. E.g., (currently) TopicMetadataRequest with auto-create, although that will eventually go away. ConsumerMetadataRequest still auto-creates the offsets topic.
    Likewise, ADMIN-type requests may be interpreted as having side-effects (depending on who you ask).
- "When an ACL is missing - fail open." What does missing mean? I.e., no explicit ACL for a principal? I'm confused by this, especially in relation to the precedence of DENY over ALLOW. So per the description:
  - If no ACLs exist for topic A then ALLOW all operations on it by anyone.
  - If I now add an ACL for a certain principal P to ALLOW (say) WRITE to the topic then either:
    - this has the effect of DENYing WRITE to all other principals, or
    - this ACL serves no purpose.
  - If the effect is to DENY WRITE to all other principals, what about READ? Do all principals (including P) have READ permissions to topic A?
  - In other words, it seems that for a specific ACL to be meaningful, fail close is necessary for an absent ACL.
  - Edit: After going through the thread, it appears that the DENY override only applies to the given principal, i.e., in the above case the other principals will in fact be granted access. Then this makes the ACL that was added pointless, right?
- On ZK ACLs: I think ZK will be closed to everyone except Kafka brokers. This is a dependency on KIP-4 though, i.e., eventually all clients should talk to brokers only via RPC.
- Topic owner: list vs single entry. Both have issues off the bat (although a list is more intuitive, at least to me), but perhaps you could write up some example workflows to clarify the current proposal. I was thinking that anyone in the owner list should be considered a super-user of the topic and can grant/revoke permissions. They should also be allowed to add other principals as owners. Even with this it is unclear who should be allowed to remove owners.
- What is the effect of deleting a topic? Should all associated ACLs be deleted as well?
- TopicConfigCache to store topic-ACLs. As mentioned above, not all requests will be tied to topics. We may want to have an entirely separate ZK directory for ACLs.
  We have a similar issue with quotas. This ties in with dynamic config management. We can certainly leverage the dynamic config management part of topic configs, but I think we need to have a story for non-topic resources.

Thanks,
Joel

On Thu, Apr 16, 2015 at 12:15:37AM +, Parth Brahmbhatt wrote:
Kafka currently stores logConfig overrides specified during topic creation in zookeeper; it's just an instance of java.util.Properties converted to json. I am proposing that, in addition to that, we store acls and owner as part of the same Properties map. There is some infrastructure around reading this config, converting it back to a Properties map and, most importantly, propagating any changes efficiently, which we will be able to leverage. As this infrastructure is common to the cluster, the reading (not interpreting) of config happens outside of any authorization code. If the TopicConfigCache just kept
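To make the "fail open" question concrete, here is a minimal sketch of one possible reading of the proposal: fail open only when a resource has no ACLs at all, let DENY override ALLOW, and otherwise deny anything not explicitly allowed. The enum and class names are hypothetical, and the KIP had not settled these semantics at this point in the thread. Under this reading, adding an ALLOW WRITE for P also removes READ for everyone, including P, which is exactly the ambiguity Joel raises.

```java
import java.util.List;

// Hypothetical names; real KIP-11 ACLs also carry hosts and a resource type.
enum AclOperation { READ, WRITE }

enum AclPermission { ALLOW, DENY }

final class AclEntry {
    final String principal;
    final AclPermission permission;
    final AclOperation operation;

    AclEntry(String principal, AclPermission permission, AclOperation operation) {
        this.principal = principal;
        this.permission = permission;
        this.operation = operation;
    }
}

final class SketchAuthorizer {
    // One possible reading of the proposal, not the KIP's final semantics.
    static boolean authorize(List<AclEntry> aclsForResource, String principal, AclOperation op) {
        if (aclsForResource.isEmpty()) {
            return true; // fail open: the resource has no ACLs at all
        }
        boolean allowed = false;
        for (AclEntry acl : aclsForResource) {
            if (!acl.principal.equals(principal) || acl.operation != op) {
                continue; // entry does not apply to this request
            }
            if (acl.permission == AclPermission.DENY) {
                return false; // DENY takes precedence over any ALLOW
            }
            allowed = true;
        }
        return allowed; // ACLs exist but none allows this principal/operation: fail closed
    }
}
```

The alternative reading from the thread (DENY only affects the named principal, everyone else still allowed) would replace the final `return allowed;` with a fall-back to `true`, which is what makes a lone ALLOW entry pointless.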
[jira] [Updated] (KAFKA-2087) TopicConfigManager javadoc references incorrect paths
[ https://issues.apache.org/jira/browse/KAFKA-2087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-2087: - Reviewer: Neha Narkhede TopicConfigManager javadoc references incorrect paths - Key: KAFKA-2087 URL: https://issues.apache.org/jira/browse/KAFKA-2087 Project: Kafka Issue Type: Bug Reporter: Aditya Auradkar Assignee: Aditya Auradkar Priority: Trivial Attachments: KAFKA-2087.patch The TopicConfigManager docs refer to znodes in /brokers/topics/topic_name/config which is incorrect. Fix javadoc
[jira] [Updated] (KAFKA-2087) TopicConfigManager javadoc references incorrect paths
[ https://issues.apache.org/jira/browse/KAFKA-2087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-2087: - Resolution: Fixed Status: Resolved (was: Patch Available) Pushed to trunk. Thanks!
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated KAFKA-2121: -- Attachment: KAFKA-2121_2015-04-20_09:06:09.patch
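Option 2 from the KAFKA-2121 thread, as refined by Ewen (a try/catch in the constructor that simply calls a close() which tolerates partially initialized fields and is idempotent), can be sketched as follows. Everything here is hypothetical: `TrackedReporter` stands in for a MetricsReporter that allocates a resource, `SketchProducer` is not KafkaProducer, and the bootstrap-server check is a toy placeholder for ClientUtils.parseAndValidateAddresses.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a MetricsReporter that allocates a resource (e.g. a thread) on construction.
class TrackedReporter implements Closeable {
    static final List<TrackedReporter> OPEN = new ArrayList<>();

    TrackedReporter() {
        OPEN.add(this);
    }

    @Override
    public void close() {
        OPEN.remove(this);
    }
}

// Hypothetical, heavily simplified producer illustrating option 2.
final class SketchProducer implements Closeable {
    private List<TrackedReporter> reporters; // non-final so close() can clear it
    private boolean closed = false;

    SketchProducer(int numReporters, String bootstrapServers) {
        try {
            reporters = new ArrayList<>();
            for (int i = 0; i < numReporters; i++) {
                reporters.add(new TrackedReporter());
            }
            // Validation runs after resources exist, which is what used to leak them.
            validateBootstrapServers(bootstrapServers);
        } catch (RuntimeException e) {
            close(); // release whatever was created so far
            throw new IllegalStateException("Failed to construct producer", e);
        }
    }

    // Toy check standing in for real address parsing and validation.
    private static void validateBootstrapServers(String servers) {
        if (servers == null || !servers.contains(":")) {
            throw new IllegalArgumentException("Invalid bootstrap servers: " + servers);
        }
    }

    @Override
    public synchronized void close() {
        if (closed) {
            return; // idempotent: a second close() is a no-op
        }
        closed = true;
        if (reporters != null) { // null-safe: construction may have failed before this was set
            for (TrackedReporter reporter : reporters) {
                reporter.close();
            }
            reporters = null;
        }
    }
}
```

Compared with option 3, callers keep the same constructor and still only have to handle an exception; compared with option 1, the cleanup also covers failures in any later construction step, such as creating serializers or the I/O thread.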
[jira] [Commented] (KAFKA-2055) ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503051#comment-14503051 ] Fangmin Lv commented on KAFKA-2055: --- There is a case where the leader shuts down just before propagating the latest high watermark to the followers. After that, one of the followers is elected as the new leader, and it returns the previous high watermark when the consumer calls seekToEnd(). This is why ConsumerBounceTest.testSeekAndCommitWithBrokerFailures fails occasionally. For example, assume broker B0 is the leader of topic partition (topic, 0), and brokers B1 and B2 are followers.
1. Just before B0 shuts down, the (LEO, HW) pairs on B0, B1, B2 are (1000, 1000), (1000, 984), (1000, 984).
2. B0 shuts down.
3. B1 and B2 issue fetch requests with fetch offset 1000.
4. B1 and B2 receive errors for their fetch requests.
5. The consumer sends a seekToEnd request to the leader, but it fails because the new leader is not ready yet.
6. One of B1 or B2 is elected as the new leader.
7. When the client retries fetching the last offset, the new leader returns its HW, 984.
8. The assertion fails, as the client expects to receive 1000 instead of 984.
Is this the expected behaviour of Kafka? Any suggestions for fixing this other than adding a sleep before starting BounceBrokerSchedule in the test case?
ConsumerBounceTest.testSeekAndCommitWithBrokerFailures transient failure Key: KAFKA-2055 URL: https://issues.apache.org/jira/browse/KAFKA-2055 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Fangmin Lv Labels: newbie
{code}
kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures FAILED
    java.lang.AssertionError: expected:<1000> but was:<976>
        at org.junit.Assert.fail(Assert.java:92)
        at org.junit.Assert.failNotEquals(Assert.java:689)
        at org.junit.Assert.assertEquals(Assert.java:127)
        at org.junit.Assert.assertEquals(Assert.java:514)
        at org.junit.Assert.assertEquals(Assert.java:498)
        at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117)
        at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98)

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures FAILED
    java.lang.AssertionError: expected:<1000> but was:<913>
        at org.junit.Assert.fail(Assert.java:92)
        at org.junit.Assert.failNotEquals(Assert.java:689)
        at org.junit.Assert.assertEquals(Assert.java:127)
        at org.junit.Assert.assertEquals(Assert.java:514)
        at org.junit.Assert.assertEquals(Assert.java:498)
        at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117)
        at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98)
{code}
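The failover sequence in the comment can be reproduced with a deliberately simplified model of high-watermark propagation. It assumes two rules: the leader's HW is the minimum LEO across itself and its in-sync followers (a follower's LEO being inferred from its latest fetch offset), and a follower only learns the leader's HW from a successful fetch response, capped at its own LEO. The class and method names are hypothetical and this is not broker code.

```java
import java.util.List;

// Simplified replica state: log end offset (LEO) and locally known high watermark (HW).
final class ReplicaState {
    final String broker;
    final long leo;
    final long hw;

    ReplicaState(String broker, long leo, long hw) {
        this.broker = broker;
        this.leo = leo;
        this.hw = hw;
    }
}

final class HighWatermarkModel {
    // Leader side (assumed rule): HW = min LEO across the leader and its in-sync followers,
    // where each follower's LEO is inferred from the offset of its latest fetch request.
    static long leaderHw(long leaderLeo, List<Long> followerFetchOffsets) {
        long hw = leaderLeo;
        for (long offset : followerFetchOffsets) {
            hw = Math.min(hw, offset);
        }
        return hw;
    }

    // Follower side (assumed rule): adopt the HW carried in a successful fetch response,
    // but never beyond the follower's own LEO.
    static long followerHwAfterResponse(long followerLeo, long hwInResponse) {
        return Math.min(followerLeo, hwInResponse);
    }

    // If the response carrying the new HW never arrives (the leader is gone),
    // the follower keeps its old HW and serves it as the end offset once elected leader.
    static long endOffsetServedByNewLeader(ReplicaState newLeader) {
        return newLeader.hw;
    }
}
```

Plugging in the numbers from the comment: B0 computes HW 1000 from fetch offsets of 1000, but the last response B1 actually received carried 984, so after failover B1 serves 984, matching step 7.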
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503086#comment-14503086 ] Steven Zhen Wu commented on KAFKA-2121: --- Updated reviewboard https://reviews.apache.org/r/33242/diff/ against branch apache/trunk
[jira] [Commented] (KAFKA-2118) Cleaner cannot clean after shutdown during replaceSegments
[ https://issues.apache.org/jira/browse/KAFKA-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503128#comment-14503128 ] Jay Kreps commented on KAFKA-2118: -- I don't think I'll get to it; go ahead without me. Just be careful: that code is a bit tricky.

Cleaner cannot clean after shutdown during replaceSegments -- Key: KAFKA-2118 URL: https://issues.apache.org/jira/browse/KAFKA-2118 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Gian Merlino Assignee: Rajini Sivaram Attachments: KAFKA-2118.patch, KAFKA-2118_2015-04-15_09:43:51.patch, KAFKA-2118_2015-04-19_19:02:38.patch

If a broker shuts down after the cleaner calls replaceSegments with more than one segment, the partition can be left in an uncleanable state. We saw this on a few brokers after doing a rolling update. The sequence of things we saw is:
1) Cleaner cleaned segments with base offsets 0, 1094621529, and 1094831997 into a new segment 0.
2) Cleaner logged "Swapping in cleaned segment 0 for segment(s) 0,1094621529,1094831997 in log xxx-15." and called replaceSegments.
3) 0.cleaned was renamed to 0.swap.
4) Broker shut down before deleting segments 1094621529 and 1094831997.
5) Broker started up and logged "Found log file /mnt/persistent/kafka-logs/xxx-15/.log.swap from interrupted swap operation, repairing."
6) Cleaner thread died with the exception "kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) to position 1003 no larger than the last offset appended (1095045873) to /mnt/persistent/kafka-logs/xxx-15/.index.cleaned."
I think what's happening in #6 is that when the broker started back up and repaired the log, segment 0 ended up with a bunch of messages that were also in segments 1094621529 and 1094831997 (because the new segment 0 was created from cleaning all 3). But segments 1094621529 and 1094831997 were still on disk, so offsets on disk were no longer monotonically increasing, violating the assumption of OffsetIndex.
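The invariant that broke in step 6 can be sketched as follows. `SketchOffsetIndex` is a hypothetical, heavily simplified stand-in for OffsetIndex that models only the "offsets must strictly increase" check (its message text is modelled on the log line), and `staleSegments` illustrates how the leftover segments could be spotted, under the inference that the repaired segment 0 already held offsets up to 1095045873.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an offset index: entries must have strictly increasing offsets.
final class SketchOffsetIndex {
    private long lastOffset = -1L;
    private int entries = 0;

    void append(long offset) {
        if (entries > 0 && offset <= lastOffset) {
            throw new IllegalStateException(String.format(
                "Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d)",
                offset, entries, lastOffset));
        }
        lastOffset = offset;
        entries++;
    }

    int size() {
        return entries;
    }
}

final class LeftoverSegments {
    // Segments whose base offset is not beyond the swapped-in segment's last offset
    // duplicate data that segment already contains, so they break offset monotonicity.
    static List<Long> staleSegments(long swappedLastOffset, long[] remainingBaseOffsets) {
        List<Long> stale = new ArrayList<>();
        for (long base : remainingBaseOffsets) {
            if (base <= swappedLastOffset) {
                stale.add(base);
            }
        }
        return stale;
    }
}
```

Under that inference, both 1094621529 and 1094831997 are flagged, which lines up with the segments that were deleted by hand in the manual fix.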
We ended up fixing this by deleting segments 1094621529 and 1094831997 manually, and then removing the line for this partition from the cleaner-offset-checkpoint file (otherwise it would reference the non-existent segment 1094621529).

This can happen even on a clean shutdown (the async deletes in replaceSegments might not happen).

Cleaner logs post-startup:

2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Beginning cleaning of log xxx-15.
2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Building offset map for xxx-15...
2015-04-12 15:07:56,595 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Building offset map for log xxx-15 for 6 segments in offset range [1094621529, 1095924157).
2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Offset map for log xxx-15 complete.
2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning log xxx-15 (discarding tombstones prior to Sun Apr 12 14:05:37 UTC 2015)...
2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 0 in log xxx-15 (last modified Sun Apr 12 14:05:38 UTC 2015) into 0, retaining deletes.
2015-04-12 15:08:04,283 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094621529 in log xxx-15 (last modified Sun Apr 12 13:49:27 UTC 2015) into 0, discarding deletes.
2015-04-12 15:08:05,079 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094831997 in log xxx-15 (last modified Sun Apr 12 14:04:28 UTC 2015) into 0, discarding deletes.
2015-04-12 15:08:05,157 ERROR [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) to position 1003 no larger than the last offset appended (1095045873) to /mnt/persistent/kafka-logs/xxx-15/.index.cleaned.
    at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
    at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
    at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
    at kafka.log.LogSegment.append(LogSegment.scala:81)
    at kafka.log.Cleaner.cleanInto(LogCleaner.scala:427)
    at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:358)
    at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:354)
    at
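The manual fix amounts to finding the segments whose base offsets fall inside the offset range of the swapped-in segment and deleting them. Below is a minimal Java sketch of that check. It assumes segments are keyed by base offset and that the swapped-in segment covers [baseOffset, nextOffset). `leftoverSegments` is a hypothetical helper, not part of Kafka's log recovery code. 1095100000 is a made-up base offset for the segment that followed 1094831997, because the real value is not in the report. The sketch does not touch the cleaner-offset-checkpoint entry, which still has to be handled separately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public class LeftoverSegmentFinder {

    // Hypothetical helper: returns the base offsets of segments, other than the
    // swapped-in one, whose base offset lies inside [swapBaseOffset, swapNextOffset).
    // Under the stated assumption, such segments only hold records that were
    // already copied into the swapped-in segment.
    static List<Long> leftoverSegments(NavigableSet<Long> segmentBaseOffsets,
                                       long swapBaseOffset, long swapNextOffset) {
        List<Long> leftovers = new ArrayList<>();
        // fromInclusive = false skips the swapped-in segment itself;
        // toInclusive = false skips a segment starting exactly at the exclusive end offset.
        for (long base : segmentBaseOffsets.subSet(swapBaseOffset, false, swapNextOffset, false)) {
            leftovers.add(base);
        }
        return leftovers;
    }

    public static void main(String[] args) {
        // Base offsets from the report, plus the made-up 1095100000 for the next segment.
        NavigableSet<Long> onDisk = new TreeSet<>(
            Arrays.asList(0L, 1094621529L, 1094831997L, 1095100000L));
        // Assumption: the recovered segment 0 covers [0, 1095100000).
        List<Long> toDelete = leftoverSegments(onDisk, 0L, 1095100000L);
        System.out.println("Leftover segments: " + toDelete);
    }
}
```

With these inputs, the check selects exactly the two segments that were deleted by hand. A segment that starts at or beyond the swapped segment's end offset is left alone.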