[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14499093#comment-14499093 ]

Ewen Cheslack-Postava commented on KAFKA-2120:
----------------------------------------------

[~sureshms] They're related, but not the same thing. This JIRA is about the network requests made by the NetworkClient class, and is really about an internal implementation detail - we want to make sure the request times out so the client can make progress, even if it's never going to get a response. KAFKA-1788 is about timeouts for producer.send() calls which can never even make a network request because they can't make a connection to the broker. It's a user-facing issue. See the mailing list discussion thread linked in the KIP for more details.

Add a request timeout to NetworkClient
--------------------------------------

Key: KAFKA-2120
URL: https://issues.apache.org/jira/browse/KAFKA-2120
Project: Kafka
Issue Type: New Feature
Reporter: Jiangjie Qin
Assignee: Mayuresh Gharat

Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request, due to reasons such as the broker being down, the request will never be completed. The request timeout will also be used as an implicit timeout for some methods such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 was created for this public interface change: https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
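For discussion's sake, the expiry mechanism being described can be sketched as tracking the send time of each in-flight request and timing out based on the oldest one. Everything below (the class name, methods, and the 30-second figure) is a hypothetical illustration, not the actual NetworkClient patch:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of per-request timeout bookkeeping. Requests on a
// single connection complete in send order, so the oldest in-flight
// request is always the first one to expire.
class InFlightRequestTimeouts {
    private final long requestTimeoutMs;
    private final Deque<Long> sendTimesMs = new ArrayDeque<>();

    InFlightRequestTimeouts(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    void onSend(long nowMs) {
        sendTimesMs.addLast(nowMs);
    }

    void onResponse() {
        sendTimesMs.pollFirst();
    }

    // True once the oldest outstanding request has waited longer than the
    // timeout; the client could then disconnect and fail the request so
    // callers like KafkaProducer.flush() can make progress.
    boolean hasExpiredRequest(long nowMs) {
        Long oldest = sendTimesMs.peekFirst();
        return oldest != null && nowMs - oldest > requestTimeoutMs;
    }

    public static void main(String[] args) {
        InFlightRequestTimeouts t = new InFlightRequestTimeouts(30_000);
        t.onSend(0);
        System.out.println(t.hasExpiredRequest(10_000)); // false: within 30s
        System.out.println(t.hasExpiredRequest(31_000)); // true: no response in time
    }
}
```

The point of the sketch is only that a bounded wait exists even when no response will ever arrive, which is what distinguishes this JIRA from KAFKA-1788.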
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14499210#comment-14499210 ]

Suresh Srinivas commented on KAFKA-2120:
----------------------------------------

[~ewencp], thanks for the explanation.
Re: [DISCUSSION] KIP-11: ACL Management
On Thu, Apr 16, 2015 at 6:13 PM, Jun Rao j...@confluent.io wrote:

Hi, Gwen,

What you suggested seems reasonable. I guess we will need the Principal, Privilege pair and the Resource in grant() and revoke()?

I thought that Privilege is a Resource + Action, which is why grant and revoke can take a list of principals and a list of privileges. But as I said, anything that creates an ACL will work there.

Is the Hive authorization api the following? It's weird that it takes user in checkPermissions(), but not in authorize().
http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.html

I was thinking about this portion of the API:
https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveAuthorizer.java

I was imagining that a default implementation could be similar to how we store offsets in Kafka. Basically, store all acls in a special topic with compact retention. Then, every broker will build an in-memory cache off that topic.

The default authorizer can definitely do it. My suggestion is that this is something the authorizer should worry about, not core Kafka and not the Kafka tools. TopicAdmin and TopicMetadata APIs should just invoke Authorizer.grant(), Authorizer.revoke(), Authorizer.show(), etc.

Another thing that we haven't discussed so far is how to manage ACLs across different mirrored Kafka clusters. Let's say you use mirror maker to mirror all topics from cluster A to cluster B. You probably want to have exactly the same ACL on both A and B. It would be good if the ACL can be set up just once. If we use the above default implementation, since the ACL topic is mirrored too, the ACL will be propagated automatically.

This is a great suggestion. Currently it is a huge pain point in Hadoop. Obviously, this will only work if we know that all ACLs are in a Kafka topic, which may not be true for Sentry / Argus.
I think it's OK for MirrorMaker to support this only for DefaultAuthorizer and force Sentry/Argus to figure out their own replication. But it would also be awesome if we can come up with a way to replicate privileges that relies only on generic APIs and authorizer implementations of them.

Gwen

Thanks,
Jun

On Thu, Apr 16, 2015 at 9:44 AM, Gwen Shapira gshap...@cloudera.com wrote:

Hi Kafka Authorization Fans,

I'm starting a new thread on a specific sub-topic of KIP-11, since this is a bit long :)

Currently KIP-11, as I understand it, proposes:
* Authorizers are pluggable, with Kafka providing DefaultAuthorizer.
* Kafka tools allow adding / managing ACLs.
* Those ACLs are stored in ZK and cached in a new TopicCache.
* Authorizers can either use the ACLs defined and stored in Kafka, or define and use their own.

I am concerned about two possible issues with this design:
1. Separation of concerns - only authorizers should worry about ACLs, and therefore the less ACL code that exists in Kafka core, the better.
2. User confusion - it sounded like we can define ACLs in Kafka itself, but authorizers can also define their own, so kafka-topics --describe may show an ACL different from the one in use. This can be super confusing for admins.

My alternative suggestion:
* The Authorizer API will include: grantPrivilege(List<Principal>, List<Privilege>), revokePrivilege(List<Principal>, List<Privilege>), getPrivilegesByPrincipal(Principal, Resource) (the exact API can be discussed in detail, but you get the idea).
* Kafka tools will simply invoke these APIs when topics are added / modified / described.
* Each authorizer (including the default one) will be responsible for storing, caching and using those ACLs.

This way, we keep almost all ACL code with the Authorizer, where it belongs, and users get a nice unified interface that reflects what is actually getting used in the system. This is pretty much how Sqoop and Hive implement their authorization APIs.

What do you think?

Gwen
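The proposed API can be sketched as a Java interface plus a toy in-memory implementation. All names here (Privilege as a Resource + Action pair, principals as plain strings, the method signatures) are assumptions for illustration only; the thread has not settled on a final interface, and this is not what Kafka ships:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Privilege is assumed to be a Resource + Action pair, per the thread.
class Privilege {
    final String resource; // e.g. a topic name
    final String action;   // e.g. "READ", "WRITE"
    Privilege(String resource, String action) {
        this.resource = resource;
        this.action = action;
    }
}

// Sketch of the proposed pluggable Authorizer API: Kafka tools call these
// methods, and each authorizer owns the storage and caching of its ACLs.
interface Authorizer {
    void grantPrivilege(List<String> principals, List<Privilege> privileges);
    void revokePrivilege(List<String> principals, List<Privilege> privileges);
    List<Privilege> getPrivilegesByPrincipal(String principal, String resource);
}

// Toy default implementation; a real DefaultAuthorizer would persist ACLs
// (e.g. in ZK or a compacted topic) rather than in a HashMap.
class InMemoryAuthorizer implements Authorizer {
    private final Map<String, List<Privilege>> acls = new HashMap<>();

    public void grantPrivilege(List<String> principals, List<Privilege> privileges) {
        for (String p : principals)
            acls.computeIfAbsent(p, k -> new ArrayList<>()).addAll(privileges);
    }

    public void revokePrivilege(List<String> principals, List<Privilege> privileges) {
        for (String p : principals)
            if (acls.containsKey(p))
                acls.get(p).removeAll(privileges);
    }

    public List<Privilege> getPrivilegesByPrincipal(String principal, String resource) {
        List<Privilege> result = new ArrayList<>();
        for (Privilege priv : acls.getOrDefault(principal, new ArrayList<>()))
            if (priv.resource.equals(resource))
                result.add(priv);
        return result;
    }
}
```

With a shape like this, kafka-topics --describe would just call getPrivilegesByPrincipal and show whatever the active authorizer actually uses, which avoids the confusion described above.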
Re: Review Request 32937: Patch for KAFKA-2102
On April 16, 2015, 5:34 a.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java, line 25
https://reviews.apache.org/r/32937/diff/3/?file=931249#file931249line25
This patch will definitely need a comment somewhere explaining the locking strategy and the reasoning behind it. It won't be obvious even to someone familiar with other client code why this all works the way it does.

I can add some documentation soon.

On April 16, 2015, 5:34 a.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java, line 27
https://reviews.apache.org/r/32937/diff/3/?file=931249#file931249line27
Is there any benefit to using a Lock + Condition instead of the monitor for the object? Seems like it would make the code a bit simpler - you'd get rid of all the try/finally blocks.

It does not seem like it does currently. I tend towards locks because I like the explicitness, the accessibility of the source, the privateness, and the distinction between monitor and condition. But if synchronized is preferred by the project, I can switch to that. The only piece of functionality the lock would provide would be a timeout on the acquire in awaitUpdate.

On April 16, 2015, 5:34 a.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java, line 69
https://reviews.apache.org/r/32937/diff/3/?file=931249#file931249line69
Are the lazySets accomplishing anything performance-wise? I doubt most people even know the method exists, let alone the implications. Some seem like they're probably ok, e.g. none of the code that uses the version number relies on seeing the updated value immediately. But others I'm less sure about (and the lack of documentation on lazySet makes it hard to be certain). For example, using updateRequested.lazySet() might work, but could give incorrect results if the old value is returned to a different thread. I think this only affects the network thread in the producer, but the consumer can potentially be called from different threads. Are we sure the lazySet works as expected in that case?

Reasonable question on the performance. I'll take a look, and if there is not much of a difference, move them all to set(). As to correctness, my understanding is that since the writes happen in the lock, releasing the lock ensures visibility.

- Tim

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32937/#review80298
---

On April 16, 2015, 12:56 a.m., Tim Brooks wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32937/
---

(Updated April 16, 2015, 12:56 a.m.)

Review request for kafka.

Bugs: KAFKA-2102
https://issues.apache.org/jira/browse/KAFKA-2102

Repository: kafka

Description
---
Method does not need to be synchronized
Do not synchronize contains topic method
Continue removing the need to synchronize the metadata object
Store both last refresh and need to refresh in same variable
Fix synchronize issue
Version needs to be volatile
Rework how signaling happens
Remove unnecessary creation of new set
Initialize 0 at the field level
Fix the build
Start moving synchronization of metadata to different class
Start moving synchronization work to new class
Remove unused code
Functionality works. Not threadsafe
Move version into metadata synchronizer
Make version volatile
Rename classes
Move to finer-grained locking
Use locks in bookkeeper
Only use atomic variables
Use successful metadata in metrics
Change these things back to trunk

Diffs
---
clients/src/main/java/org/apache/kafka/clients/Metadata.java 07f1cdb1fe920b0c7a5f2d101ddc40c689e1b247
clients/src/main/java/org/apache/kafka/clients/MetadataBookkeeper.java PRE-CREATION
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a

Diff: https://reviews.apache.org/r/32937/diff/

Testing
---

Thanks,
Tim Brooks
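The one capability mentioned above that synchronized cannot easily provide - a timed wait on the acquire in awaitUpdate - looks roughly like this with an explicit Lock and Condition. This is an illustrative sketch, not the actual MetadataBookkeeper patch; the class and field names are made up. Note it also demonstrates Tim's visibility argument: because every write to version happens inside the lock, unlocking publishes it to any thread that later takes the lock.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative version counter guarded by an explicit lock, with a timed
// awaitUpdate - the pattern under discussion in this review.
class VersionedMetadata {
    private final Lock lock = new ReentrantLock();
    private final Condition updated = lock.newCondition();
    private long version = 0;

    long version() {
        lock.lock();
        try {
            return version;
        } finally {
            lock.unlock();
        }
    }

    void update() {
        lock.lock();
        try {
            version++;
            updated.signalAll(); // wake any thread blocked in awaitUpdate
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the version advanced past lastVersion before the
    // timeout expired; a plain synchronized wait() cannot express this
    // as directly.
    boolean awaitUpdate(long lastVersion, long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (version <= lastVersion && remainingNs > 0)
                remainingNs = updated.awaitNanos(remainingNs);
            return version > lastVersion;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        VersionedMetadata m = new VersionedMetadata();
        long v = m.version();
        m.update();
        System.out.println(m.awaitUpdate(v, 100)); // true: version already advanced
    }
}
```

Whether this flexibility outweighs the extra try/finally boilerplate is exactly the trade-off the review is weighing.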
Review request for KAFKA-2088
KAFKA-2088 is ready for review - can anyone help me review it? Thanks.

JIRA: https://issues.apache.org/jira/browse/KAFKA-2088
Review request: https://reviews.apache.org/r/33132/diff/

ransom
Re: [DISCUSSION] KIP-11: ACL Management
I've probably been a DBA for too long, but I imagined something like:

kafka-topic --topic t1 --grant user --action action
kafka-topic --topic t1 --revoke user --action action

(i.e. the command-line equivalent of "grant select on table1 to gwenshap" and "revoke select on table2 from gwenshap")

When you need a gazillion of them, you generate a script with a gazillion of those and execute. Maybe it just looks reasonable to me because I'm used to it, though :) Or maybe including the JSON parsing code in TopicCommand is not so bad?

On Fri, Apr 17, 2015 at 10:30 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:

* Yes, Acl pretty much captures everything. Originally I had resource as part of Acls, we can go back to that.
* The describe can call getAcl and I plan to do so. addAcl is tricky because the user will have to specify the acls through command lines, which will probably be a location to some file. Basically the CLI won't know how to parse user input and convert it to a principal/acl that the plugin understands. We could add an API in authorizer that can take a file as input if we want --acl as an option during create.
* Yes, also getAcls(Principal principal).

Thanks
Parth

On 4/17/15, 10:05 AM, Gwen Shapira gshap...@cloudera.com wrote:

On Fri, Apr 17, 2015 at 9:31 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:

I was following the storm model but I think this is a reasonable change. I recommend changing the API names to addAcls, removeAcls and getAcls.

And they probably just need to get List<Acl> instead of everything I specified? Looks like Acl encapsulates everything we need.

Couple of points to ensure we are on the same page:
* With this approach the kafka command line will not provide a way to add/edit acls during topic creation, nor will it provide a way to modify the acls. It will be up to the authorizer to either define a command line utility or to allow other means to add acls (CLI/UI/REST). For the default implementation we can provide a CLI.
You looked into this deeper than I did - is there a reason TopicCommand can't invoke addACL and getACL?

* We probably want to add List<Acl> getAcls(Resource resource) so users can list all acls on a topic.

Also getAcls(Principal princ)?

I haven't looked at how consumer offsets are currently stored so I will have to take a look, but I think that is an implementation detail.

Gwen, Jun and other interested parties, do you have time to jump on a quick hangout so we can go over some of the lower level details?

Thanks
Parth

From: Tong Li liton...@us.ibm.com
Reply-To: dev@kafka.apache.org
Date: Friday, April 17, 2015 at 7:34 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSSION] KIP-11: ACL Management

Gwen,

There is one product called ElasticSearch which has been quite successful. They recently added security, and what they actually did is quite nice. They really separated authentication and authorization, which many people get confused about and often mix up. I looked through what they did and was quite impressed; I think there are many things we can borrow from it. Here is a link: http://www.elastic.co/guide/en/shield/current/architecture.html. The product is called Shield and is implemented as an ElasticSearch plugin. The promise is that you can have a running ElasticSearch, then you install this plugin, configure it, and your ElasticSearch service is secured. The goal should be the same for Kafka: you have a Kafka service running, you install a new plugin (in this case a security plugin), configure it, and your Kafka service is secured.
I think the key here is that we should introduce a true pluggable framework in Kafka which allows security, quota, encryption, compression, and serialization/deserialization to all be developed as plugins that can be easily added and configured onto a running Kafka service, at which point the functions/features provided by the plugins start working. Once we have this framework in, how a security plugin works internally becomes the concern of that plugin - for example, how a new user gets registered, or how permissions are granted and revoked, will all be the concern of that plugin; the rest of the Kafka components should not be concerned with them. This way we are really following the design principle of separation of concerns.

With all that, what I am proposing is the introduction of a true pluggable framework into Kafka, which I have also talked about in a previous email. For security we can implement a simple file-based security plugin; other plugins such as LDAP and AD for authentication can come later, and plugins for authorization such as RBAC can also come later if people care about using them.

Thanks.

Tong Li
OpenStack Kafka Community Development
Building 501/B205
[jira] [Commented] (KAFKA-2088) kafka-console-consumer.sh should not create zookeeper path when no brokers found and chroot was set in zookeeper.connect
[ https://issues.apache.org/jira/browse/KAFKA-2088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14500286#comment-14500286 ]

Gwen Shapira commented on KAFKA-2088:
-------------------------------------

Awesome, thanks for checking :)

kafka-console-consumer.sh should not create zookeeper path when no brokers found and chroot was set in zookeeper.connect
------------------------------------------------------------------------------------------------------------------------

Key: KAFKA-2088
URL: https://issues.apache.org/jira/browse/KAFKA-2088
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 0.8.2.1
Reporter: Zhiqiang He
Assignee: Zhiqiang He
Priority: Minor
Attachments: kafka-2088.1.patch

1. Set server.properties:
zookeeper.connect = 192.168.0.10:2181,192.168.0.10:2181,192.168.0.10:2181/kafka

2. Default zookeeper paths:
[zk: 192.168.0.10:2181(CONNECTED) 10] ls /
[zookeeper, kafka, storm]

3. Start a console consumer using a nonexistent topic and a zookeeper address without the chroot:
[root@stream client_0402]# kafka-console-consumer.sh --zookeeper 192.168.0.10:2181 --topic test --from-beginning
[2015-04-03 18:15:28,599] WARN [console-consumer-63060_stream-1428056127990-d35ca648], no brokers found when trying to rebalance. (kafka.consumer.ZookeeperConsumerConnector)

4. The /consumers and /brokers paths were then created in zookeeper:
[zk: 192.168.0.10:2181(CONNECTED) 2] ls /
[zookeeper, consumers, kafka, storm, brokers]

So it is a bug: the consumer should not create the /consumers and /brokers paths.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] KIP-11: ACL Management
We could do this, but I think it's too simplistic, plus now we are adding authorization-related options to the CLI, which I thought everyone wants to avoid. When I say it's too simplistic I mean there are missing options like --hosts, and what happens when we start supporting groups - we will probably end up adding --grant --groups. I think we will just end up polluting the kafka create CLI with all the different acl options, or we will have 2 CLIs: one for the basic stuff, and for anything advanced you will have to use a different tool. It might be better to just have a single separate ACL management CLI.

Thanks
Parth
Re: [DISCUSSION] KIP-11: ACL Management
* Yes, Acl pretty much captures everything. Originally I had resource as part of Acls, we can go back to that.
* The describe can call getAcl and I plan to do so. addAcl is tricky because the user will have to specify the acls through command lines, which will probably be a location to some file. Basically the CLI won't know how to parse user input and convert it to a principal/acl that the plugin understands. We could add an API in authorizer that can take a file as input if we want --acl as an option during create.
* Yes, also getAcls(Principal principal).

On 4/17/15, 10:05 AM, Gwen Shapira gshap...@cloudera.com wrote:

On Fri, Apr 17, 2015 at 9:31 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:

I was following the storm model but I think this is a reasonable change. I recommend changing the API names to addAcls, removeAcls and getAcls.

And they probably just need to get List<Acl> instead of everything I specified? Looks like Acl encapsulates everything we need.

Couple of points to ensure we are on the same page:
* With this approach the kafka command line will not provide a way to add/edit acls during topic creation, nor will it provide a way to modify the acls. It will be up to the authorizer to either define a command line utility or to allow other means to add acls (CLI/UI/REST). For the default implementation we can provide a CLI.

You looked into this deeper than I did - is there a reason TopicCommand can't invoke addACL and getACL?

* We probably want to add List<Acl> getAcls(Resource resource) so users can list all acls on a topic.

Also getAcls(Principal princ)?

I haven't looked at how consumer offsets are currently stored so I will have to take a look, but I think that is an implementation detail.

Gwen, Jun and other interested parties, do you have time to jump on a quick hangout so we can go over some of the lower level details?
Thanks
Parth
[jira] [Commented] (KAFKA-2088) kafka-console-consumer.sh should not create zookeeper path when no brokers found and chroot was set in zookeeper.connect
[ https://issues.apache.org/jira/browse/KAFKA-2088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14500330#comment-14500330 ]

Jun Rao commented on KAFKA-2088:
--------------------------------

[~ransom], thanks for the patch. It may be reasonable to start a console consumer before the data is produced or even before the Kafka cluster is started. It just won't get any data until the cluster is up and the data is produced. It's reasonable for the console consumer to create the /consumers path. However, it probably shouldn't create /brokers - do you know what triggered this inside the console consumer?
[jira] [Commented] (KAFKA-2088) kafka-console-consumer.sh should not create zookeeper path when no brokers found and chroot was set in zookeeper.connect
[ https://issues.apache.org/jira/browse/KAFKA-2088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14500289#comment-14500289 ]

Gwen Shapira commented on KAFKA-2088:
-------------------------------------

Pinging [~junrao] [~jjkoshy] [~nehanarkhede] [~guozhang] for taking a look and perhaps committing :)
Re: [DISCUSSION] KIP-11: ACL Management
On Fri, Apr 17, 2015 at 9:31 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: I was following the storm model but I think this is a reasonable change. I recommend changing the API names to addAcls, removeAcls and getAcls. And they probably just need to get ListAcl instead of everything I specified? Looks like Acl encapsulates everything we need. Couple of points to ensure we are on same page: * With this approach the kafka command line will not provide a way to add/edit acls during topic creation, neither it will provide a way to modify the acls. It will be up to the authorizer to either define a command line utility or to allow other means to add acls(CLI/UI/REST). For the default implementation we can provide CLI. You looked into this deeper than I did - is there a reason TopicCommand can't invoke addACL and getACL? * We probably want to add ListAcl getAcls(Resource resource) so users can list all acls on a topic. Also getAcls(Principal princ)? I haven’t looked at how consumer offsets are currently stored so I will have to take a look but I think that is implementation detail. Gwen,Jun and other interested parties, do you have time to jump on a quick hangout so we can go over some of the lower level details? Thanks Parth From: Tong Li liton...@us.ibm.commailto:liton...@us.ibm.com Reply-To: dev@kafka.apache.orgmailto:dev@kafka.apache.org dev@kafka.apache.orgmailto:dev@kafka.apache.org Date: Friday, April 17, 2015 at 7:34 AM To: dev@kafka.apache.orgmailto:dev@kafka.apache.org dev@kafka.apache.orgmailto:dev@kafka.apache.org Subject: Re: [DISCUSSION] KIP-11: ACL Management Gwen, There is one product called ElasticSearch which has been quite successful. They recently added security, what they actually did is quite nice. They really separated Authentication and Authorization which many people get really confused about and often mix them up. I looked through what they did and quite impressed by it, I think there are many things we can borrow from. 
Here is a link to it: http://www.elastic.co/guide/en/shield/current/architecture.html. The product is called Shield, which is implemented as an ElasticSearch plugin. The promise here is that you can have a running ElasticSearch, then you install this plugin, configure it, and then your ElasticSearch service is secured. The goal should be really the same for Kafka: you have a Kafka service running, you install a new plugin (in this case a security plugin), configure it, and then your Kafka service is secured. I think the key here is that we should introduce a true pluggable framework in Kafka which allows security, quota, encryption, compression, and serialization/deserialization all to be developed as plugins that can be easily added and configured onto a running Kafka service; then the functions/features provided by the plugins will start working. Once we have this framework in, how a security plugin works internally really becomes the concern of that plugin: for example, how a new user gets registered, or a permission granted or revoked, will all be the concern of that plugin; the rest of the Kafka components should not really be concerned about them. This way we are really following the design principle of separation of concerns. With all that, what I am proposing is introducing a true pluggable framework into Kafka, which I have also talked about in a previous email. For security we can implement a simple file-based security plugin; other plugins such as LDAP or AD for authentication can come later, and a plugin for authorization such as RBAC can also come later if people care about using them. Thanks. 
Tong Li OpenStack Kafka Community Development Building 501/B205 liton...@us.ibm.com From: Gwen Shapira gshap...@cloudera.com To: dev@kafka.apache.org Date: 04/16/2015 12:44 PM Subject: [DISCUSSION] KIP-11: ACL Management Hi Kafka Authorization Fans, I'm starting a new thread on a specific sub-topic of KIP-11, since this is a bit long :) Currently KIP-11, as I understand it, proposes: * Authorizers are pluggable, with Kafka providing DefaultAuthorizer. * Kafka tools allow adding / managing ACLs. * Those ACLs are stored in ZK and cached in a new TopicCache * Authorizers can either use the ACLs defined and stored in Kafka, or define and use their own. I am concerned about two possible issues with this design: 1. Separation of concerns - only authorizers should worry about ACLs, and therefore the
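The API shape converged on in this thread (addAcls/removeAcls/getAcls, with queries by resource or by principal) could be sketched roughly as below. This is an illustrative Java sketch only: the names Resource, Acl, and Authorizer follow the discussion, the field choices (principal/host/operation) are assumptions, and the real KIP-11 interface differs in detail.

```java
import java.util.*;

// Hypothetical shapes inferred from the thread; not the actual KIP-11 API.
record Resource(String type, String name) {}
record Acl(String principal, String host, String operation) {}

interface Authorizer {
    void addAcls(Resource resource, Set<Acl> acls);
    void removeAcls(Resource resource, Set<Acl> acls);
    Set<Acl> getAcls(Resource resource);  // list all acls on a topic
    Set<Acl> getAcls(String principal);   // list all acls for a principal (String stands in for Principal)
}

// A trivial in-memory implementation, standing in for a pluggable default.
class InMemoryAuthorizer implements Authorizer {
    private final Map<Resource, Set<Acl>> store = new HashMap<>();

    public void addAcls(Resource resource, Set<Acl> acls) {
        store.computeIfAbsent(resource, r -> new HashSet<>()).addAll(acls);
    }

    public void removeAcls(Resource resource, Set<Acl> acls) {
        store.getOrDefault(resource, Collections.emptySet()).removeAll(acls);
    }

    public Set<Acl> getAcls(Resource resource) {
        return store.getOrDefault(resource, Collections.emptySet());
    }

    public Set<Acl> getAcls(String principal) {
        Set<Acl> result = new HashSet<>();
        for (Set<Acl> acls : store.values())
            for (Acl acl : acls)
                if (acl.principal().equals(principal)) result.add(acl);
        return result;
    }
}
```

With an interface like this, TopicCommand could in principle call addAcls/getAcls directly, which is the question Gwen raises above.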
[jira] [Comment Edited] (KAFKA-1994) Evaluate performance effect of chroot check on Topic creation
[ https://issues.apache.org/jira/browse/KAFKA-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14500437#comment-14500437 ] Ashish K Singh edited comment on KAFKA-1994 at 4/17/15 6:56 PM: [~junrao], [~gwenshap], does this look OK now? was (Author: singhashish): [~junrao], [~gwenshap], does this look on now? Evaluate performance effect of chroot check on Topic creation - Key: KAFKA-1994 URL: https://issues.apache.org/jira/browse/KAFKA-1994 Project: Kafka Issue Type: Improvement Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-1994.patch, KAFKA-1994_2015-03-03_18:19:45.patch KAFKA-1664 adds check for chroot while creating a node in ZK. ZkPath checks if namespace exists before trying to create a path in ZK. This raises a concern that checking namespace for each path creation might be unnecessary and can potentially make creations expensive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1994) Evaluate performance effect of chroot check on Topic creation
[ https://issues.apache.org/jira/browse/KAFKA-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14500437#comment-14500437 ] Ashish K Singh commented on KAFKA-1994: --- [~junrao], [~gwenshap], does this look OK now? Evaluate performance effect of chroot check on Topic creation - Key: KAFKA-1994 URL: https://issues.apache.org/jira/browse/KAFKA-1994 Project: Kafka Issue Type: Improvement Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-1994.patch, KAFKA-1994_2015-03-03_18:19:45.patch KAFKA-1664 adds check for chroot while creating a node in ZK. ZkPath checks if namespace exists before trying to create a path in ZK. This raises a concern that checking namespace for each path creation might be unnecessary and can potentially make creations expensive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2088) kafka-console-consumer.sh should not create zookeeper path when no brokers found and chroot was set in zookeeper.connect
[ https://issues.apache.org/jira/browse/KAFKA-2088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14500473#comment-14500473 ] Gwen Shapira commented on KAFKA-2088: - I think the main problem here is that someone mistyping the zookeeper chroot path (or pointing a consumer at the wrong ZK) looks indistinguishable from someone starting a consumer before the cluster exists. The first is far more common though (at least in my experience), so it would be nice to warn the user that things are not as expected. kafka-console-consumer.sh should not create zookeeper path when no brokers found and chroot was set in zookeeper.connect Key: KAFKA-2088 URL: https://issues.apache.org/jira/browse/KAFKA-2088 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2.1 Reporter: Zhiqiang He Assignee: Zhiqiang He Priority: Minor Attachments: kafka-2088.1.patch 1. Set server.properties: zookeeper.connect = 192.168.0.10:2181,192.168.0.10:2181,192.168.0.10:2181/kafka 2. Default zookeeper paths: [zk: 192.168.0.10:2181(CONNECTED) 10] ls / [zookeeper, kafka, storm] 3. Start the console consumer with a topic that does not exist and a zookeeper address without the chroot. [root@stream client_0402]# kafka-console-consumer.sh --zookeeper 192.168.0.10:2181 --topic test --from-beginning [2015-04-03 18:15:28,599] WARN [console-consumer-63060_stream-1428056127990-d35ca648], no brokers found when trying to rebalance. (kafka.consumer.ZookeeperConsumerConnector) 4. Then the /consumers and /brokers paths were created in zookeeper. [zk: 192.168.0.10:2181(CONNECTED) 2] ls / [zookeeper, consumers, kafka, storm, brokers] So this is a bug: the consumer should not create the /consumers and /brokers paths. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
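The guard Gwen's comment implies could look like the following sketch: before a consumer auto-creates its paths, check that the configured chroot already exists and warn if it does not. This is illustrative only; the class, method names, and the in-memory stand-in for the ZooKeeper view are assumptions, not the attached patch.

```java
import java.util.Set;

// Sketch of the check discussed in KAFKA-2088: a missing chroot more likely
// means a mistyped path (or the wrong ZooKeeper ensemble) than a cluster
// that simply has not started yet, so warn instead of silently creating paths.
class ChrootGuard {
    private final Set<String> existingPaths; // stand-in for what ZooKeeper reports

    ChrootGuard(Set<String> existingPaths) {
        this.existingPaths = existingPaths;
    }

    /** Returns true if it is safe to create /consumers and /brokers under this chroot. */
    boolean validateChroot(String chroot) {
        if (chroot.isEmpty() || chroot.equals("/")) return true; // no chroot configured
        if (!existingPaths.contains(chroot)) {
            System.err.println("WARN: chroot " + chroot + " does not exist in ZooKeeper; "
                + "check zookeeper.connect for a mistyped path before any paths are created");
            return false;
        }
        return true;
    }
}
```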
Re: [DISCUSSION] KIP-11: ACL Management
Hi, Parth, How does this work in Hive? I thought authorization in Hive always goes through its SQL CLI for any authorization plugin. When integrating with Ranger (Argus), does Hive do authorization through a separate CLI? Thanks, Jun On Fri, Apr 17, 2015 at 11:01 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: We could do this but I think it's too simplistic, plus now we are adding authorization-related options to the CLI, which I thought everyone wants to avoid. When I say it's too simplistic I mean there are missing options like --hosts; what happens when we start supporting groups? We will probably end up adding --grant --groups. I think we will just end up polluting the kafka create CLI with all the different acl options, or we will have 2 CLIs: one for the basic stuff, and for anything advanced you will have to use a different tool. It might be better to just have a single separate ACL management CLI. Thanks Parth On 4/17/15, 10:42 AM, Gwen Shapira gshap...@cloudera.com wrote: I've probably been a DBA for too long, but I imagined something like: kafka-topic --topic t1 --grant user --action action kafka-topic --topic t1 --revoke user --action action (i.e. the commandline equivalent of grant select on table1 to gwenshap and revoke select on table2 from gwenshap) When you need a gazillion of them, you generate a script with a gazillion of those and execute. Maybe it just looks reasonable to me because I'm used to it, though :) Or maybe including the json parsing code in TopicCommand is not so bad? On Fri, Apr 17, 2015 at 10:30 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: * Yes, Acl pretty much captures everything. Originally I had resource as part of Acls, we can go back to that. * The describe can call getAcl and I plan to do so. addAcl is tricky because the user will have to specify the acls through command lines, which will probably be a location to some file. 
Basically the CLI won't know how to parse user input and convert it to a principal/acl that the plugin understands. We could add an API in the authorizer that can take a file as input if we want --acl as an option during create. * Yes, also getAcls(Principal principal). Thanks Parth On 4/17/15, 10:05 AM, Gwen Shapira gshap...@cloudera.com wrote: On Fri, Apr 17, 2015 at 9:31 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: I was following the storm model but I think this is a reasonable change. I recommend changing the API names to addAcls, removeAcls and getAcls. And they probably just need to get ListAcl instead of everything I specified? Looks like Acl encapsulates everything we need. A couple of points to ensure we are on the same page: * With this approach the kafka command line will not provide a way to add/edit acls during topic creation, nor will it provide a way to modify the acls. It will be up to the authorizer to either define a command line utility or to allow other means to add acls (CLI/UI/REST). For the default implementation we can provide a CLI. You looked into this deeper than I did - is there a reason TopicCommand can't invoke addACL and getACL? * We probably want to add ListAcl getAcls(Resource resource) so users can list all acls on a topic. Also getAcls(Principal princ)? I haven't looked at how consumer offsets are currently stored so I will have to take a look, but I think that is an implementation detail. Gwen, Jun and other interested parties, do you have time to jump on a quick hangout so we can go over some of the lower level details? 
Thanks Parth 
Re: [DISCUSSION] KIP-11: ACL Management
I have copied Thejas from the Hive team on the cc list. Here is what I learnt from him: * Hive calls the authorizer plugin if you execute "grant/revoke Operation to User on Table". They use this as Hive provides the SQL layer, and SQL has standards for grant/revoke which they follow. * If the plugin provides more entities than what can be expressed by the above statement (like unix/ldap groups or host level control) you have to go to the plugin's CLI/UI to create this acl. So as mentioned below you will have 2 tools: one for the very basic grant/revoke access, and for anything complex you have a secondary interface provided by the Authorizer plugin. Thanks Parth On 4/17/15, 12:01 PM, Jun Rao j...@confluent.io wrote: Hi, Parth, How does this work in Hive? I thought authorization in Hive always goes through its SQL CLI for any authorization plugin. When integrating with Ranger (Argus), does Hive do authorization through a separate CLI? Thanks, Jun On Fri, Apr 17, 2015 at 11:01 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: We could do this but I think it's too simplistic, plus now we are adding authorization-related options to the CLI, which I thought everyone wants to avoid. When I say it's too simplistic I mean there are missing options like --hosts; what happens when we start supporting groups? We will probably end up adding --grant --groups. I think we will just end up polluting the kafka create CLI with all the different acl options, or we will have 2 CLIs: one for the basic stuff, and for anything advanced you will have to use a different tool. It might be better to just have a single separate ACL management CLI. Thanks Parth On 4/17/15, 10:42 AM, Gwen Shapira gshap...@cloudera.com wrote: I've probably been a DBA for too long, but I imagined something like: kafka-topic --topic t1 --grant user --action action kafka-topic --topic t1 --revoke user --action action (i.e. 
the commandline equivalent of grant select on table1 to gwenshap and revoke select on table2 from gwenshap) When you need a gazillion of them, you generate a script with a gazillion of those and execute. Maybe it just looks reasonable to me because I'm used to it, though :) Or maybe including the json parsing code in TopicCommand is not so bad? On Fri, Apr 17, 2015 at 10:30 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: * Yes, Acl pretty much captures everything. Originally I had resource as part of Acls, we can go back to that. * The describe can call getAcl and I plan to do so. addAcl is tricky because the user will have to specify the acls through command lines, which will probably be a location to some file. Basically the CLI won't know how to parse user input and convert it to a principal/acl that the plugin understands. We could add an API in the authorizer that can take a file as input if we want --acl as an option during create. * Yes, also getAcls(Principal principal). Thanks Parth On 4/17/15, 10:05 AM, Gwen Shapira gshap...@cloudera.com wrote: On Fri, Apr 17, 2015 at 9:31 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: I was following the storm model but I think this is a reasonable change. I recommend changing the API names to addAcls, removeAcls and getAcls. And they probably just need to get ListAcl instead of everything I specified? Looks like Acl encapsulates everything we need. A couple of points to ensure we are on the same page: * With this approach the kafka command line will not provide a way to add/edit acls during topic creation, nor will it provide a way to modify the acls. It will be up to the authorizer to either define a command line utility or to allow other means to add acls (CLI/UI/REST). For the default implementation we can provide a CLI. You looked into this deeper than I did - is there a reason TopicCommand can't invoke addACL and getACL? 
* We probably want to add ListAcl getAcls(Resource resource) so users can list all acls on a topic. Also getAcls(Principal princ)? I haven't looked at how consumer offsets are currently stored so I will have to take a look, but I think that is an implementation detail. Gwen, Jun and other interested parties, do you have time to jump on a quick hangout so we can go over some of the lower level details? Thanks Parth 
Re: Review Request 31590: Patch for KAFKA-1990
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31590/#review80526 --- Thanks for the patch. A few comments below. Also, yes, LogConfigTest should be in unit/kafka/log instead of kafka/log. Could you fix it in this patch too? core/src/main/scala/kafka/log/LogManager.scala https://reviews.apache.org/r/31590/#comment130516 See the coding convention comment below. core/src/main/scala/kafka/log/LogManager.scala https://reviews.apache.org/r/31590/#comment130504 Our current coding convention is not to wrap single line statement with {}. core/src/main/scala/kafka/server/KafkaConfig.scala https://reviews.apache.org/r/31590/#comment130525 The original code is more idiomatic in scala. low. - Jun Rao On March 10, 2015, 4:58 a.m., Jeff Holoman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31590/ --- (Updated March 10, 2015, 4:58 a.m.) Review request for kafka and Gwen Shapira. Bugs: KAFKA-1990 https://issues.apache.org/jira/browse/KAFKA-1990 Repository: kafka Description --- KAFKA-1990 Diffs - core/src/main/scala/kafka/log/LogConfig.scala 8b67aee3a37765178b30d79e9e7bb882bdc89c29 core/src/main/scala/kafka/log/LogManager.scala 47d250af62c1aa53d11204a332d0684fb4217c8d core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 core/src/test/scala/kafka/log/LogConfigTest.scala 9690f141be75202973085025444b52208ebd5ab2 core/src/test/scala/unit/kafka/admin/AdminTest.scala ee0b21e6a94ad79c11dd08f6e5adf98c333e2ec9 core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 7f47e6f9a74314ed9e9f19d0c97931f3f2e49259 Diff: https://reviews.apache.org/r/31590/diff/ Testing --- Updated for ConfigDef. Open question. Is there any reason that LogConfigTest is located in kafka/log rather than unit/kafka/log ? Thanks, Jeff Holoman
[jira] [Commented] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14500750#comment-14500750 ] Ashish K Singh commented on KAFKA-1982: --- Updated reviewboard https://reviews.apache.org/r/31369/ against branch trunk change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch, KAFKA-1982_2015-04-17_14:49:34.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish K Singh updated KAFKA-1982: -- Status: Patch Available (was: In Progress) change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch, KAFKA-1982_2015-04-17_14:49:34.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish K Singh updated KAFKA-1982: -- Attachment: KAFKA-1982_2015-04-17_14:49:34.patch change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch, KAFKA-1982_2015-04-17_14:49:34.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31369: Patch for KAFKA-1982
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31369/ --- (Updated April 17, 2015, 9:49 p.m.) Review request for kafka. Bugs: KAFKA-1982 https://issues.apache.org/jira/browse/KAFKA-1982 Repository: kafka Description --- KAFKA-1982: change kafka.examples.Producer to use the new java producer Diffs (updated) - clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java f5cd61c1aa9433524da0b83826a766389de68a0b examples/README 53db6969b2e2d49e23ab13283b9146848e37434e examples/src/main/java/kafka/examples/Consumer.java 13135b954f3078eeb7394822b0db25470b746f03 examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java 1239394190fe557e025fbd8f3803334402b0aeea examples/src/main/java/kafka/examples/Producer.java 96e98933148d07564c1b30ba8e805e2433c2adc8 examples/src/main/java/kafka/examples/SimpleConsumerDemo.java 0d66fe5f8819194c8624aed4a21105733c20cc8e Diff: https://reviews.apache.org/r/31369/diff/ Testing --- Thanks, Ashish Singh
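The diff above pre-creates IntegerSerializer and IntegerDeserializer classes for the new java producer. The core of such a pair is plausibly a 4-byte big-endian encoding of the int; this self-contained sketch shows that idea only. The class name IntegerCodec is made up here, and the real Kafka classes additionally implement the Serializer/Deserializer interfaces and configuration hooks.

```java
import java.nio.ByteBuffer;

// Sketch of an Integer <-> bytes codec: 4-byte big-endian, with null pass-through,
// the kind of logic an IntegerSerializer/IntegerDeserializer pair would carry.
class IntegerCodec {
    static byte[] serialize(Integer value) {
        if (value == null) return null;
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    static Integer deserialize(byte[] data) {
        if (data == null) return null;
        if (data.length != 4)
            throw new IllegalArgumentException("expected 4 bytes, got " + data.length);
        return ByteBuffer.wrap(data).getInt();
    }
}
```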
Re: Review Request 31369: Patch for KAFKA-1982
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31369/ --- (Updated April 17, 2015, 10:09 p.m.) Review request for kafka. Bugs: KAFKA-1982 https://issues.apache.org/jira/browse/KAFKA-1982 Repository: kafka Description --- KAFKA-1982: change kafka.examples.Producer to use the new java producer Diffs (updated) - clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java f5cd61c1aa9433524da0b83826a766389de68a0b examples/README 53db6969b2e2d49e23ab13283b9146848e37434e examples/src/main/java/kafka/examples/Consumer.java 13135b954f3078eeb7394822b0db25470b746f03 examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java 1239394190fe557e025fbd8f3803334402b0aeea examples/src/main/java/kafka/examples/Producer.java 96e98933148d07564c1b30ba8e805e2433c2adc8 examples/src/main/java/kafka/examples/SimpleConsumerDemo.java 0d66fe5f8819194c8624aed4a21105733c20cc8e Diff: https://reviews.apache.org/r/31369/diff/ Testing --- Thanks, Ashish Singh
[jira] [Updated] (KAFKA-2130) Resource leakage in AppInfo.scala during initialization
[ https://issues.apache.org/jira/browse/KAFKA-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sebastien Zimmer updated KAFKA-2130: Status: Patch Available (was: Open) diff --git a/core/src/main/scala/kafka/common/AppInfo.scala b/core/src/main/scala/kafka/common/AppInfo.scala index d642ca5..ecc1dc5 100644 --- a/core/src/main/scala/kafka/common/AppInfo.scala +++ b/core/src/main/scala/kafka/common/AppInfo.scala @@ -45,8 +45,10 @@ object AppInfo extends KafkaMetricsGroup { val manifestPath = classPath.substring(0, classPath.lastIndexOf("!") + 1) + "/META-INF/MANIFEST.MF" val mf = new Manifest - mf.read(new URL(manifestPath).openStream()) + val stream = new URL(manifestPath).openStream() + mf.read(stream) val version = mf.getMainAttributes.get(new Attributes.Name("Version")).toString + stream.close() newGauge("Version", new Gauge[String] { Resource leakage in AppInfo.scala during initialization --- Key: KAFKA-2130 URL: https://issues.apache.org/jira/browse/KAFKA-2130 Project: Kafka Issue Type: Bug Reporter: Sebastien Zimmer Priority: Trivial Labels: patch Minor InputStream leakage during the server initialization in AppInfo.scala. Patch attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
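The patch above closes the stream after reading, but if read() throws, the stream still leaks. A try-with-resources (or finally) block closes it on every exit path. This is a self-contained Java sketch of the same manifest-reading idea using the stdlib Manifest class; the helper name readVersion is made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

// Leak-proof variant of the AppInfo fix: try-with-resources guarantees the
// stream is closed even when Manifest.read() or attribute lookup fails.
class ManifestVersion {
    static String readVersion(InputStream in) throws IOException {
        try (InputStream stream = in) { // closed on every exit path
            Manifest mf = new Manifest();
            mf.read(stream);
            return mf.getMainAttributes().getValue(new Attributes.Name("Version"));
        }
    }
}
```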
[jira] [Updated] (KAFKA-2130) Resource leakage in AppInfo.scala during initialization
[ https://issues.apache.org/jira/browse/KAFKA-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sebastien Zimmer updated KAFKA-2130: Attachment: patch.diff Resource leakage in AppInfo.scala during initialization --- Key: KAFKA-2130 URL: https://issues.apache.org/jira/browse/KAFKA-2130 Project: Kafka Issue Type: Bug Reporter: Sebastien Zimmer Priority: Trivial Labels: patch Attachments: patch.diff Minor InputStream leakage during the server initialization in AppInfo.scala. Patch attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Issue Comment Deleted] (KAFKA-2130) Resource leakage in AppInfo.scala during initialization
[ https://issues.apache.org/jira/browse/KAFKA-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sebastien Zimmer updated KAFKA-2130: Comment: was deleted (was: diff --git a/core/src/main/scala/kafka/common/AppInfo.scala b/core/src/main/scala/kafka/common/AppInfo.scala index d642ca5..ecc1dc5 100644 --- a/core/src/main/scala/kafka/common/AppInfo.scala +++ b/core/src/main/scala/kafka/common/AppInfo.scala @@ -45,8 +45,10 @@ object AppInfo extends KafkaMetricsGroup { val manifestPath = classPath.substring(0, classPath.lastIndexOf("!") + 1) + "/META-INF/MANIFEST.MF" val mf = new Manifest - mf.read(new URL(manifestPath).openStream()) + val stream = new URL(manifestPath).openStream() + mf.read(stream) val version = mf.getMainAttributes.get(new Attributes.Name("Version")).toString + stream.close() newGauge("Version", new Gauge[String] { ) Resource leakage in AppInfo.scala during initialization --- Key: KAFKA-2130 URL: https://issues.apache.org/jira/browse/KAFKA-2130 Project: Kafka Issue Type: Bug Reporter: Sebastien Zimmer Priority: Trivial Labels: patch Attachments: patch.diff Minor InputStream leakage during the server initialization in AppInfo.scala. Patch attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1990) Add unlimited time-based log retention
[ https://issues.apache.org/jira/browse/KAFKA-1990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1990: --- Status: In Progress (was: Patch Available) Add unlimited time-based log retention -- Key: KAFKA-1990 URL: https://issues.apache.org/jira/browse/KAFKA-1990 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Assignee: Jeff Holoman Labels: newbie Attachments: KAFKA-1990_2015-03-10_00:55:11.patch Currently you can set log.retention.bytes = -1 to disable size based retention (in fact that is the default). However, there is no equivalent for time based retention. You can set time based retention to something really big like 2147483647 hours, which in practical terms is forever, but it is kind of silly to require this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
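The asymmetry KAFKA-1990 describes, and the semantics its patch aims for, can be shown as a broker config fragment. The `log.retention.ms=-1` line is the proposed behavior, not something available before this patch lands.

```properties
# Disabling size-based retention already works:
log.retention.bytes=-1

# Current workaround for "unlimited" time-based retention: a huge value
# (2147483647 hours is forever in practical terms, but silly to require).
log.retention.hours=2147483647

# What KAFKA-1990 proposes: -1 disables time-based retention too.
log.retention.ms=-1
```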
[jira] [Updated] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish K Singh updated KAFKA-1982: -- Attachment: KAFKA-1982_2015-04-17_15:08:54.patch change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch, KAFKA-1982_2015-04-17_14:49:34.patch, KAFKA-1982_2015-04-17_15:08:54.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31369: Patch for KAFKA-1982
On April 7, 2015, 10:41 p.m., Jun Rao wrote: Sorry for the late review. A few more comments below. Done! - Ashish --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31369/#review77523 --- On April 17, 2015, 10:09 p.m., Ashish Singh wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31369/ --- (Updated April 17, 2015, 10:09 p.m.) Review request for kafka. Bugs: KAFKA-1982 https://issues.apache.org/jira/browse/KAFKA-1982 Repository: kafka Description --- KAFKA-1982: change kafka.examples.Producer to use the new java producer Diffs - clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java f5cd61c1aa9433524da0b83826a766389de68a0b examples/README 53db6969b2e2d49e23ab13283b9146848e37434e examples/src/main/java/kafka/examples/Consumer.java 13135b954f3078eeb7394822b0db25470b746f03 examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java 1239394190fe557e025fbd8f3803334402b0aeea examples/src/main/java/kafka/examples/Producer.java 96e98933148d07564c1b30ba8e805e2433c2adc8 examples/src/main/java/kafka/examples/SimpleConsumerDemo.java 0d66fe5f8819194c8624aed4a21105733c20cc8e Diff: https://reviews.apache.org/r/31369/diff/ Testing --- Thanks, Ashish Singh
[jira] [Commented] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14500783#comment-14500783 ] Ashish K Singh commented on KAFKA-1982: --- Updated reviewboard https://reviews.apache.org/r/31369/ against branch trunk change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch, KAFKA-1982_2015-04-17_14:49:34.patch, KAFKA-1982_2015-04-17_15:08:54.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2130) Resource leakage in AppInfo.scala during initialization
Sebastien Zimmer created KAFKA-2130: --- Summary: Resource leakage in AppInfo.scala during initialization Key: KAFKA-2130 URL: https://issues.apache.org/jira/browse/KAFKA-2130 Project: Kafka Issue Type: Bug Reporter: Sebastien Zimmer Priority: Trivial Minor InputStream leakage during the server initialization in AppInfo.scala. Patch attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31369: Patch for KAFKA-1982
On March 25, 2015, 4:48 p.m., Mayuresh Gharat wrote: Thanks for the review. Addressed your comment. - Ashish --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31369/#review77747 ---
Re: Review Request 33168: Fix recovery of swap files after broker crash
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33168/#review80543 --- Thanks for the patch. Great finding and very comprehensive tests! Just a couple of minor comments below. core/src/main/scala/kafka/log/Log.scala https://reviews.apache.org/r/33168/#comment130557 It's not clear what this operation is. It seems to refer to replaceSegments(), but replaceSegments() doesn't create the new .cleaned suffix. So, it probably should refer to the cleaning process. core/src/main/scala/kafka/log/Log.scala https://reviews.apache.org/r/33168/#comment130550 What happens if the broker crashes just before the swapping in line 805? Then, when calling replaceSegments() during broker restart, you will have old segments already with the .delete suffix. In asyncDeleteSegment(), we will then try to rename the file by adding another .delete suffix. The deletion will probably still work, but this will be a bit weird. - Jun Rao On April 15, 2015, 9:44 a.m., Rajini Sivaram wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33168/ --- (Updated April 15, 2015, 9:44 a.m.) Review request for kafka. Bugs: KAFKA-2118 https://issues.apache.org/jira/browse/KAFKA-2118 Repository: kafka Description --- Patch for KAFKA-2118: Fix recovery of cleaned segments after broker crash Diffs - core/src/main/scala/kafka/log/Log.scala 5563f2de8113a0ece8929bec9c75dbf892abbb66 core/src/test/scala/unit/kafka/log/CleanerTest.scala 9792ed689033dbd4ad99809a4e566136d2b9fadf Diff: https://reviews.apache.org/r/33168/diff/ Testing --- Thanks, Rajini Sivaram
[jira] [Updated] (KAFKA-2118) Cleaner cannot clean after shutdown during replaceSegments
[ https://issues.apache.org/jira/browse/KAFKA-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2118: --- Status: In Progress (was: Patch Available) Cleaner cannot clean after shutdown during replaceSegments -- Key: KAFKA-2118 URL: https://issues.apache.org/jira/browse/KAFKA-2118 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Gian Merlino Assignee: Rajini Sivaram Attachments: KAFKA-2118.patch, KAFKA-2118_2015-04-15_09:43:51.patch If a broker shuts down after the cleaner calls replaceSegments with more than one segment, the partition can be left in an uncleanable state. We saw this on a few brokers after doing a rolling update. The sequence of things we saw is: 1) Cleaner cleaned segments with base offsets 0, 1094621529, and 1094831997 into a new segment 0. 2) Cleaner logged Swapping in cleaned segment 0 for segment(s) 0,1094621529,1094831997 in log xxx-15. and called replaceSegments. 3) 0.cleaned was renamed to 0.swap. 4) Broker shut down before deleting segments 1094621529 and 1094831997. 5) Broker started up and logged Found log file /mnt/persistent/kafka-logs/xxx-15/.log.swap from interrupted swap operation, repairing. 6) Cleaner thread died with the exception kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) to position 1003 no larger than the last offset appended (1095045873) to /mnt/persistent/kafka-logs/xxx-15/.index.cleaned. I think what's happening in #6 is that when the broker started back up and repaired the log, segment 0 ended up with a bunch of messages that were also in segment 1094621529 and 1094831997 (because the new segment 0 was created from cleaning all 3). But segments 1094621529 and 1094831997 were still on disk, so offsets on disk were no longer monotonically increasing, violating the assumption of OffsetIndex. 
We ended up fixing this by deleting segments 1094621529 and 1094831997 manually, and then removing the line for this partition from the cleaner-offset-checkpoint file (otherwise it would reference the non-existent segment 1094621529). This can happen even on a clean shutdown (the async deletes in replaceSegments might not happen). Cleaner logs post-startup: 2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Beginning cleaning of log xxx-15. 2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Building offset map for xxx-15... 2015-04-12 15:07:56,595 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Building offset map for log xxx-15 for 6 segments in offset range [1094621529, 1095924157). 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Offset map for log xxx-15 complete. 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning log xxx-15 (discarding tombstones prior to Sun Apr 12 14:05:37 UTC 2015)... 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 0 in log xxx-15 (last modified Sun Apr 12 14:05:38 UTC 2015) into 0, retaining deletes. 2015-04-12 15:08:04,283 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094621529 in log xxx-15 (last modified Sun Apr 12 13:49:27 UTC 2015) into 0, discarding deletes. 2015-04-12 15:08:05,079 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094831997 in log xxx-15 (last modified Sun Apr 12 14:04:28 UTC 2015) into 0, discarding deletes. 
2015-04-12 15:08:05,157 ERROR [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) to position 1003 no larger than the last offset appended (1095045873) to /mnt/persistent/kafka-logs/xxx-15/.index.cleaned. at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207) at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197) at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.log.OffsetIndex.append(OffsetIndex.scala:197) at kafka.log.LogSegment.append(LogSegment.scala:81) at kafka.log.Cleaner.cleanInto(LogCleaner.scala:427) at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:358) at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:354) at scala.collection.immutable.List.foreach(List.scala:318) at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:354) at
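The InvalidOffsetException above comes from the invariant that an offset index only accepts strictly increasing offsets, which the overlapping segments left behind by the interrupted swap violated. A minimal sketch of that invariant (an illustration only, not the real kafka.log.OffsetIndex, which works on memory-mapped files):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the monotonicity invariant an offset index enforces:
// each appended offset must be strictly larger than the previous one.
class ToyOffsetIndex {
    private final List<Long> offsets = new ArrayList<>();

    void append(long offset) {
        if (!offsets.isEmpty() && offset <= offsets.get(offsets.size() - 1)) {
            throw new IllegalStateException(
                "Attempt to append an offset (" + offset
                + ") no larger than the last offset appended ("
                + offsets.get(offsets.size() - 1) + ")");
        }
        offsets.add(offset);
    }

    int entries() { return offsets.size(); }
}

public class OffsetIndexDemo {
    public static void main(String[] args) {
        ToyOffsetIndex index = new ToyOffsetIndex();
        index.append(1094621529L);
        index.append(1094831997L);
        boolean rejected = false;
        try {
            // After the interrupted swap, the rebuilt segment 0 contained
            // offsets that overlapped the old on-disk segments, so cleaning
            // tried to append a smaller offset than the last one.
            index.append(1094621530L);
        } catch (IllegalStateException e) {
            rejected = true;
        }
        System.out.println("rejected=" + rejected + " entries=" + index.entries());
    }
}
```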
Re: Review Request 31369: Patch for KAFKA-1982
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31369/#review80555 --- Thanks for the patch. A few more minor comments. clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java https://reviews.apache.org/r/31369/#comment130586 Perhaps we can put (b & 0xFF) in brackets to make the precedence clear? clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java https://reviews.apache.org/r/31369/#comment130579 Perhaps we can combine the two tests. We don't need to test null twice. Instead of duplicating the test code for positive and negative values, we can probably just put the test code in a loop and iterate it twice. examples/src/main/java/kafka/examples/Producer.java https://reviews.apache.org/r/31369/#comment130583 record -> message - Jun Rao
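Jun's precedence comment concerns masking bytes when reassembling an int. A sketch of big-endian integer round-tripping with the parenthesized mask (an illustration of the idea, not the actual IntegerSerializer/IntegerDeserializer source under review):

```java
public class IntSerdeSketch {
    // Encode an int as 4 big-endian bytes.
    static byte[] serialize(int value) {
        return new byte[] {
            (byte) (value >>> 24),
            (byte) (value >>> 16),
            (byte) (value >>> 8),
            (byte) value
        };
    }

    // Decode 4 big-endian bytes back into an int. The parentheses around
    // (b & 0xFF) make the operator precedence explicit, as the review suggests;
    // the mask is needed because bytes are sign-extended when widened to int.
    static int deserialize(byte[] data) {
        int value = 0;
        for (byte b : data) {
            value <<= 8;
            value |= (b & 0xFF);
        }
        return value;
    }

    public static void main(String[] args) {
        int original = -123456789;
        int roundTripped = deserialize(serialize(original));
        System.out.println(original == roundTripped); // prints "true"
    }
}
```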
[jira] [Updated] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1982: --- Status: In Progress (was: Patch Available) change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch, KAFKA-1982_2015-04-17_14:49:34.patch, KAFKA-1982_2015-04-17_15:08:54.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/#review80130 --- clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java https://reviews.apache.org/r/33049/#comment129902 quotaEnforcementBlackoutMs may be a clearer name. core/src/main/scala/kafka/server/ClientQuotaMetrics.scala https://reviews.apache.org/r/33049/#comment129908 Is this necessary? core/src/main/scala/kafka/server/ClientQuotaMetrics.scala https://reviews.apache.org/r/33049/#comment130453 It would help readability a bit if you can indent the RHS for each argument. core/src/main/scala/kafka/server/ClientQuotaMetrics.scala https://reviews.apache.org/r/33049/#comment130576 Overall, I think this should work, but it seems slightly high touch no? i.e., do we really need to wrap everything? i.e., you definitely need a clientQuotaMetricsConfig, but it seems many of the wrapper routines here can be folded into the core metrics package. E.g., otherwise for every metric that we want to quota on, we are forced to add new record* methods; If I'm reading this right, a motivation for having the wrappers is to getOrCreate the sensors. Can we just pre-emptively (at the beginning) create the per-client sensors and then avoid the wrapper routines? This would also help avoid the need for the extra quota map and the synchronization logic in creating the sensors. core/src/main/scala/kafka/server/ClientQuotaMetrics.scala https://reviews.apache.org/r/33049/#comment129915 It is somewhat weird for the record* methods to return the delay time - ideally we should just propagate the quota violation. core/src/main/scala/kafka/server/ClientQuotaMetrics.scala https://reviews.apache.org/r/33049/#comment130584 Can we make this private to kafka.server? 
You should be able to access it from the unit tests if you need core/src/main/scala/kafka/server/ClientQuotaMetrics.scala https://reviews.apache.org/r/33049/#comment130585 This too core/src/main/scala/kafka/server/ClientQuotaMetrics.scala https://reviews.apache.org/r/33049/#comment130472 Why do we need to create an additional default metricconfig here? core/src/main/scala/kafka/server/ClientQuotaMetrics.scala https://reviews.apache.org/r/33049/#comment130473 and here? core/src/main/scala/kafka/server/KafkaConfig.scala https://reviews.apache.org/r/33049/#comment130589 Should we name this more specifically? E.g., ...QuotaBytesPerSecondOverrides ? i.e., in future if we want to quota on other metrics such as messages/sec or requests/sec core/src/main/scala/kafka/server/KafkaServer.scala https://reviews.apache.org/r/33049/#comment130590 We should probably expose various metric configs - reporters, window size, etc. in the server config. - Joel Koshy On April 15, 2015, 5:36 p.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated April 15, 2015, 5:36 p.m.) Review request for kafka and Joel Koshy. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description --- WIP: First patch for quotas. Changes are 1. Adding per-client throttle time and quota metrics in ClientQuotaMetrics.scala 2. Making changes in QuotaViolationException and Sensor to return delay time changes. 3. Added configuration needed so far for quotas in KafkaConfig. 4. Unit tests This is currently not being used anywhere in the code because I haven't yet figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a better idea once I look at the new purgatory implementation. Hopefully, this smaller patch is easier to review. 
Added more testcases Some locking changes for reading/creating the sensors Diffs - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 69b772c1941865fbe15b34bb2784c511f8ce519a core/src/main/scala/kafka/server/KafkaServer.scala
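Joel's comment that "it is somewhat weird for the record* methods to return the delay time" refers to the pattern in this patch where recording a metric yields how long to throttle the client. A toy sketch of that behavior (names and the delay formula are illustrative assumptions, not the actual ClientQuotaMetrics implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-client byte-rate quota: record() returns a delay in ms
// when the observed rate exceeds the quota, 0 otherwise. This mirrors the
// shape discussed in the review, not the real Kafka metrics package.
class ToyClientQuota {
    private final double quotaBytesPerSec;
    private final long windowMs;
    private final Map<String, Long> bytesInWindow = new HashMap<>();

    ToyClientQuota(double quotaBytesPerSec, long windowMs) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.windowMs = windowMs;
    }

    // Returns how long (ms) the response could be delayed to bring the
    // client's rate back under its quota; 0 if there is no violation.
    long record(String clientId, long bytes) {
        long total = bytesInWindow.merge(clientId, bytes, Long::sum);
        double observedRate = total * 1000.0 / windowMs;
        if (observedRate <= quotaBytesPerSec)
            return 0;
        // Delay chosen so that total / (windowMs + delay) == quota.
        return (long) (total * 1000.0 / quotaBytesPerSec) - windowMs;
    }
}

public class QuotaDemo {
    public static void main(String[] args) {
        ToyClientQuota quota = new ToyClientQuota(1000.0, 1000); // 1000 B/s, 1 s window
        System.out.println(quota.record("client-a", 500));  // under quota: 0
        System.out.println(quota.record("client-a", 1500)); // over quota: positive delay
    }
}
```

The alternative Joel suggests is to propagate the quota violation itself (e.g. as an exception carrying the metric value) and let the caller compute the delay.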
KIP hangout on Apr 21
Hi, We will have a KIP hangout at 3pm PST on Apr 21. The following is the tentative agenda. If you'd like to attend but haven't received an invite, please let me know. Agenda: KIP-4 (admin commands): wrap up any remaining issues KIP-11 (Authorization): KIP-12 (SSL/Kerberos): See if there is any blocker. jira backlog check Thanks, Jun
[jira] [Commented] (KAFKA-1884) Print metadata response errors
[ https://issues.apache.org/jira/browse/KAFKA-1884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14501112#comment-14501112 ] Manikumar Reddy commented on KAFKA-1884: [~guozhang] can you review this trivial patch? Print metadata response errors -- Key: KAFKA-1884 URL: https://issues.apache.org/jira/browse/KAFKA-1884 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Manikumar Reddy Assignee: Manikumar Reddy Fix For: 0.8.3 Attachments: KAFKA-1884.patch Print metadata response errors. producer logs: DEBUG [2015-01-20 12:46:13,406] NetworkClient: maybeUpdateMetadata(): Trying to send metadata request to node -1 DEBUG [2015-01-20 12:46:13,406] NetworkClient: maybeUpdateMetadata(): Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=50845,client_id=my-producer}, body={topics=[TOPIC=]})) to node -1 TRACE [2015-01-20 12:46:13,416] NetworkClient: handleMetadataResponse(): Ignoring empty metadata response with correlation id 50845. DEBUG [2015-01-20 12:46:13,417] NetworkClient: maybeUpdateMetadata(): Trying to send metadata request to node -1 DEBUG [2015-01-20 12:46:13,417] NetworkClient: maybeUpdateMetadata(): Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=50846,client_id=my-producer}, body={topics=[TOPIC=]})) to node -1 TRACE [2015-01-20 12:46:13,417] NetworkClient: handleMetadataResponse(): Ignoring empty metadata response with correlation id 50846. 
DEBUG [2015-01-20 12:46:13,417] NetworkClient: maybeUpdateMetadata(): Trying to send metadata request to node -1 DEBUG [2015-01-20 12:46:13,418] NetworkClient: maybeUpdateMetadata(): Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=50847,client_id=my-producer}, body={topics=[TOPIC=]})) to node -1 TRACE [2015-01-20 12:46:13,418] NetworkClient: handleMetadataResponse(): Ignoring empty metadata response with correlation id 50847. Broker logs: [2015-01-20 12:46:14,074] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 51020; ClientId: my-producer; Topics: TOPIC= (kafka.server.KafkaApis) kafka.common.InvalidTopicException: topic name TOPIC= is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-' at kafka.common.Topic$.validate(Topic.scala:42) at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:186) at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:177) at kafka.server.KafkaApis$$anonfun$5.apply(KafkaApis.scala:367) at kafka.server.KafkaApis$$anonfun$5.apply(KafkaApis.scala:350) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47) at scala.collection.SetLike$class.map(SetLike.scala:93) at scala.collection.AbstractSet.map(Set.scala:47) at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:350) at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:389) at kafka.server.KafkaApis.handle(KafkaApis.scala:57) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59) at java.lang.Thread.run(Thread.java:722) -- 
This message was sent by Atlassian JIRA (v6.3.4#6332)
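The broker-side InvalidTopicException above comes from topic-name validation. A sketch of the legality check implied by the error message ("a character other than ASCII alphanumerics, '.', '_' and '-'") — illustrative only, not the actual kafka.common.Topic code, which also enforces length limits and reserved names:

```java
import java.util.regex.Pattern;

public class TopicNameCheck {
    // Legal characters per the broker's error message: ASCII alphanumerics,
    // '.', '_' and '-'. An empty name is also rejected.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean isLegal(String topic) {
        return LEGAL.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isLegal("my-topic.v1")); // true
        System.out.println(isLegal("TOPIC="));      // false: '=' is illegal
    }
}
```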
Re: [DISCUSSION] KIP-11: ACL Management
I looked into the consumer offset storage and it seems like for acl storage we should not need something as complex. Consumer offset has different throughput requirements which is why I think it made sense to move away from zookeeper. Acls on the other hand seldom change and because of the caching layer will require very low read rate. Zookeeper seems like a perfect storage solution for small metadata like this. As far as mirror maker goes, given we are adding the storage APIs for acls (add/remove/get) it should be easy for mirror maker to use these APIs to read acls and add it back in the new cluster. Thanks Parth On 4/16/15, 6:13 PM, Jun Rao j...@confluent.io wrote: Hi, Gwen, What you suggested seems reasonable. I guess we will need the Principal, Privilege pair and the Resource in grant() and revoke()? Is the Hive authorization api the following? It's weird that it takes user in checkPermissions(), but not in authorize(). http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.html I was imagining that a default implementation could be similar to how we store offsets in Kafka. Basically, store all acls in a special topic with compact retention. Then, every broker will build an in-memory cache off that topic. Another thing that we haven't discussed so far is how to manage ACLs across different mirrored Kafka clusters. Let's say you use mirror maker to mirror all topics from cluster A to cluster B. You probably want to have exactly the same ACL on both A and B. It would be good if the ACL can be set up just once. If we use the above default implementation, since the ACL topic is mirrored too, the ACL will be propagated automatically.
Thanks, Jun On Thu, Apr 16, 2015 at 9:44 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Authorization Fans, I'm starting a new thread on a specific sub-topic of KIP-11, since this is a bit long :) Currently KIP-11, as I understand it, proposes: * Authorizers are pluggable, with Kafka providing DefaultAuthorizer. * Kafka tools allow adding / managing ACLs. * Those ACLs are stored in ZK and cached in a new TopicCache * Authorizers can either use the ACLs defined and stored in Kafka, or define and use their own. I am concerned of two possible issues with this design: 1. Separation of concerns - only authorizers should worry about ACLs, and therefore the less code for ACLs that exist in Kafka core, the better. 2. User confusion - It sounded like we can define ACLs in Kafka itself but authorizers can also define their own, so kafka-topics --describe may show an ACL different than the one in use. This can be super confusing for admins. My alternative suggestion: * Authorizer API will include: grantPrivilege(List<Principal>, List<Privilege>), revokePrivilege(List<Principal>, List<Privilege>), getPrivilegesByPrincipal(Principal, Resource) (The exact API can be discussed in detail, but you get the idea) * Kafka tools will simply invoke these APIs when topics are added / modified / described. * Each authorizer (including the default one) will be responsible for storing, caching and using those ACLs. This way, we keep almost all ACL code with the Authorizer, where it belongs and users get a nice unified interface that reflects what is actually getting used in the system. This is pretty much how Sqoop and Hive implement their authorization APIs. What do you think? Gwen
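Gwen's proposed API shape could be sketched as a Java interface with a trivial in-memory backing store (signatures and types are illustrative paraphrases of the proposal — strings stand in for Principal, Privilege and Resource — not a committed Kafka API):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the alternative Authorizer API: the authorizer owns storage,
// caching and retrieval of ACLs, and Kafka tools simply call these methods.
interface Authorizer {
    void grantPrivilege(List<String> principals, List<String> privileges, String resource);
    void revokePrivilege(List<String> principals, List<String> privileges, String resource);
    Set<String> getPrivilegesByPrincipal(String principal, String resource);
}

// Minimal in-memory implementation, standing in for a ZK- or topic-backed one.
class InMemoryAuthorizer implements Authorizer {
    private final Map<String, Set<String>> acls = new HashMap<>();

    private String key(String principal, String resource) {
        return principal + "@" + resource;
    }

    public void grantPrivilege(List<String> principals, List<String> privileges, String resource) {
        for (String p : principals)
            acls.computeIfAbsent(key(p, resource), k -> new HashSet<>()).addAll(privileges);
    }

    public void revokePrivilege(List<String> principals, List<String> privileges, String resource) {
        for (String p : principals)
            acls.getOrDefault(key(p, resource), new HashSet<>()).removeAll(privileges);
    }

    public Set<String> getPrivilegesByPrincipal(String principal, String resource) {
        return acls.getOrDefault(key(principal, resource), Collections.emptySet());
    }
}

public class AclDemo {
    public static void main(String[] args) {
        Authorizer auth = new InMemoryAuthorizer();
        auth.grantPrivilege(List.of("alice"), List.of("READ", "WRITE"), "topic-a");
        auth.revokePrivilege(List.of("alice"), List.of("WRITE"), "topic-a");
        System.out.println(auth.getPrivilegesByPrincipal("alice", "topic-a")); // [READ]
    }
}
```

Under this shape, `kafka-topics --describe` would report exactly what the authorizer enforces, addressing the user-confusion concern in the thread.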
[jira] [Updated] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish K Singh updated KAFKA-1982: -- Status: Patch Available (was: In Progress) change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch, KAFKA-1982_2015-04-17_14:49:34.patch, KAFKA-1982_2015-04-17_15:08:54.patch, KAFKA-1982_2015-04-17_17:00:40.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14500922#comment-14500922 ] Ashish K Singh commented on KAFKA-1982: --- Updated reviewboard https://reviews.apache.org/r/31369/ against branch trunk change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch, KAFKA-1982_2015-04-17_14:49:34.patch, KAFKA-1982_2015-04-17_15:08:54.patch, KAFKA-1982_2015-04-17_17:00:40.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1982) change kafka.examples.Producer to use the new java producer
[ https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish K Singh updated KAFKA-1982: -- Attachment: KAFKA-1982_2015-04-17_17:00:40.patch change kafka.examples.Producer to use the new java producer --- Key: KAFKA-1982 URL: https://issues.apache.org/jira/browse/KAFKA-1982 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.3 Reporter: Jun Rao Assignee: Ashish K Singh Labels: newbie Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch, KAFKA-1982_2015-04-17_14:49:34.patch, KAFKA-1982_2015-04-17_15:08:54.patch, KAFKA-1982_2015-04-17_17:00:40.patch We need to change the example to use the new java producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2119) ConsumerRecord key() and value() methods should not have throws Exception
[ https://issues.apache.org/jira/browse/KAFKA-2119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2119: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch. +1 and committed to trunk. ConsumerRecord key() and value() methods should not have throws Exception - Key: KAFKA-2119 URL: https://issues.apache.org/jira/browse/KAFKA-2119 Project: Kafka Issue Type: Improvement Components: clients, consumer Affects Versions: 0.8.2.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Trivial Fix For: 0.8.3 Attachments: KAFKA-2119.patch There were some leftover throws clauses in ConsumerRecord. It looks like the initial implementation removed errors from the ConsumerRecord but didn't clean up these clauses. Attaching a trivial patch to remove the clauses. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Build failed in Jenkins: Kafka-trunk #465
See https://builds.apache.org/job/Kafka-trunk/465/changes Changes: [junrao] kafka-2119; ConsumerRecord key() and value() methods should not have throws Exception; patched by Ewen Cheslack-Postava; reviewed by Jun Rao -- [...truncated 1249 lines...] kafka.api.ProducerFailureHandlingTest testWrongBrokerList PASSED kafka.api.ProducerFailureHandlingTest testNoResponse PASSED kafka.api.ProducerFailureHandlingTest testSendAfterClosed PASSED kafka.api.ProducerFailureHandlingTest testCannotSendToInternalTopic PASSED kafka.api.ProducerFailureHandlingTest testNotEnoughReplicasAfterBrokerShutdown PASSED kafka.api.ConsumerTest testSimpleConsumption PASSED kafka.api.ConsumerTest testAutoOffsetReset PASSED kafka.api.ConsumerTest testSeek PASSED kafka.api.ConsumerTest testGroupConsumption PASSED kafka.api.ConsumerTest testPositionAndCommit PASSED kafka.api.ConsumerTest testPartitionsFor PASSED kafka.api.ConsumerTest testPartitionReassignmentCallback PASSED kafka.api.ConsumerBounceTest testConsumptionWithBrokerFailures PASSED kafka.api.ConsumerBounceTest testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:1000 but was:554 at org.junit.Assert.fail(Assert.java:92) at org.junit.Assert.failNotEquals(Assert.java:689) at org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117) at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98) kafka.api.RequestResponseSerializationTest testSerializationAndDeserialization PASSED kafka.api.ProducerBounceTest testBrokerFailure PASSED kafka.api.ApiUtilsTest testShortStringNonASCII PASSED kafka.api.ApiUtilsTest testShortStringASCII PASSED kafka.api.test.ProducerCompressionTest testCompression[0] PASSED kafka.api.test.ProducerCompressionTest testCompression[1] PASSED kafka.api.test.ProducerCompressionTest 
testCompression[2] PASSED kafka.api.test.ProducerCompressionTest testCompression[3] PASSED kafka.cluster.BrokerEndPointTest testSerDe PASSED kafka.cluster.BrokerEndPointTest testHashAndEquals PASSED kafka.cluster.BrokerEndPointTest testFromJSON PASSED kafka.cluster.BrokerEndPointTest testFromOldJSON PASSED kafka.cluster.BrokerEndPointTest testBrokerEndpointFromURI PASSED kafka.cluster.BrokerEndPointTest testEndpointFromURI PASSED kafka.javaapi.consumer.ZookeeperConsumerConnectorTest testBasic PASSED kafka.javaapi.message.ByteBufferMessageSetTest testWrittenEqualsRead PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistent PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytes PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistentWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytesWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEquals PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEqualsWithCompression PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabled PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionDisabled PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabledByTopicOverride PASSED kafka.integration.UncleanLeaderElectionTest testCleanLeaderElectionDisabledByTopicOverride PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionInvalidTopicOverride PASSED kafka.integration.FetcherTest testFetcher PASSED kafka.integration.RollingBounceTest testRollingBounce PASSED kafka.integration.PrimitiveApiTest testFetchRequestCanProperlySerialize PASSED kafka.integration.PrimitiveApiTest testEmptyFetchRequest PASSED kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetch PASSED kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetchWithCompression PASSED kafka.integration.PrimitiveApiTest testProduceAndMultiFetch 
PASSED kafka.integration.PrimitiveApiTest testMultiProduce PASSED kafka.integration.PrimitiveApiTest testConsumerEmptyTopic PASSED kafka.integration.PrimitiveApiTest testPipelinedProduceRequests PASSED kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooHigh PASSED kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooLow PASSED kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooHigh PASSED kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooLow PASSED kafka.integration.TopicMetadataTest testAutoCreateTopic PASSED kafka.integration.TopicMetadataTest testTopicMetadataRequest PASSED kafka.integration.TopicMetadataTest testBasicTopicMetadata PASSED kafka.integration.TopicMetadataTest
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Agreed, we also need to change the code in Sender.java to indicate that it resembles REPLICATION_TIMEOUT and not the request timeout. Thanks, Mayuresh On Thu, Apr 16, 2015 at 1:08 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Guozhang, By implicit timeout for close() and flush(), I meant that currently we don't have an explicit timeout for close() or flush() when a broker is down, so they can take a long time, up to the TCP timeout, which is hours as you mentioned. With the client-side request timeout, the waiting time would be roughly bounded by the request timeout. And I agree we'd better rename TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG to avoid confusion. Thanks. Jiangjie (Becket) Qin On 4/15/15, 10:38 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks for the update Jiangjie, I think it is actually NOT expected that a hardware disconnection will be detected by the selector; rather, it will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki: 1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout." I am not very clear on what this means. 2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names, but it will reduce confusion moving forward. Guozhang On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we depend on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However, the Selector.disconnected set is only updated when: 1. A write/read/connect to the channel failed. 2. A key is canceled. However, when a broker goes down before it sends back the response, the client does not seem able to detect the failure.
I ran a simple test: 1. Run a selector on one machine and an echo server on another; connect the selector to the echo server. 2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds. 3. After the server received the message, unplug the cable on the echo server machine. 4. After waiting 45 minutes, the selector still had not detected the network failure. lsof on the selector machine showed that the TCP connection was still considered ESTABLISHED. I'm not sure what we should expect from java.nio.channels.Selector in this case. According to the documentation, the selector does not verify the status of the associated channel. In my test it looks even worse: the OS did not consider the socket disconnected either. Anyway, it seems adding the client-side request timeout is necessary. I've updated the KIP page to clarify the problem we want to solve, per Ewen's comments. Thanks. Jiangjie (Becket) Qin On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote: On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Ewen, thanks for the comments. Very good points! Please see replies inline. On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Jiangjie, Great start. I have a couple of comments. Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context. Yes, when the broker is completely down the request should be cleared as you said. The case we encountered, though, looked like the broker was just not responding while the TCP connection was still alive. Ok, that makes sense.
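The unplugged-cable experiment above is hard to reproduce in code, but a loopback sketch (an assumed stand-in for the two-machine setup, with hypothetical class and method names) shows the underlying behavior: an established connection whose peer is simply silent produces no selector events, so from the selector's point of view nothing distinguishes "slow" from "gone".

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SilentPeerDemo {
    /** Returns the number of ready keys after waiting timeoutMs on a silent peer. */
    public static int probeSilentPeer(long timeoutMs) throws Exception {
        // Stand-in for the remote echo server: it accepts the connection, then goes silent.
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 Selector selector = Selector.open();
                 SocketChannel accepted = server.accept()) {
                client.configureBlocking(false);
                SelectionKey key = client.register(selector, SelectionKey.OP_READ);
                // The connection is ESTABLISHED, but the peer sends nothing, so
                // select() just times out with zero ready keys: the selector
                // reports no failure, matching the behavior in the cable test.
                int ready = selector.select(timeoutMs);
                if (!key.isValid() || !client.isConnected()) {
                    throw new IllegalStateException("connection unexpectedly dropped");
                }
                return ready;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ready keys after 500 ms: " + probeSilentPeer(500));
    }
}
```

Since no read, write, or connect ever fails here, Selector.disconnected would stay empty too, which is why only an explicit request timeout can bound the wait.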
My second question is about whether this is the right level to tackle the issue and what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem, or any problems caused by lack of connectivity, since this would only apply to in-flight requests, which by definition must have been sent on an active connection. I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other
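To make the distinction above concrete: the timeout discussed in this thread only has to bound requests already written to a live connection. A minimal sketch of that bookkeeping follows; the class and method names are hypothetical, not NetworkClient's actual internals.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch of per-connection in-flight request expiry;
 * illustrative only, not the real NetworkClient implementation.
 */
public class InFlightExpirySketch {
    static final class InFlightRequest {
        final int correlationId;
        final long sendTimeMs;
        InFlightRequest(int correlationId, long sendTimeMs) {
            this.correlationId = correlationId;
            this.sendTimeMs = sendTimeMs;
        }
    }

    private final Deque<InFlightRequest> inFlight = new ArrayDeque<>();
    private final long requestTimeoutMs;

    public InFlightExpirySketch(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /** Record a request the moment it is written to the socket. */
    public void onSend(int correlationId, long nowMs) {
        inFlight.addLast(new InFlightRequest(correlationId, nowMs));
    }

    /** Drop the oldest entry when its response arrives. */
    public void onResponse() {
        inFlight.pollFirst();
    }

    /**
     * Requests on one connection are answered in order, so only the oldest
     * in-flight entry needs checking; once it has waited longer than the
     * request timeout, the connection can be closed and its requests failed.
     */
    public boolean hasExpired(long nowMs) {
        InFlightRequest oldest = inFlight.peekFirst();
        return oldest != null && nowMs - oldest.sendTimeMs > requestTimeoutMs;
    }
}
```

Records that never reach a connection at all (the KAFKA-1788 case) never enter this queue, which is why that problem needs its own separate timeout.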
Re: Review Request 31369: Patch for KAFKA-1982
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31369/ --- (Updated April 18, 2015, midnight) Review request for kafka. Bugs: KAFKA-1982 https://issues.apache.org/jira/browse/KAFKA-1982 Repository: kafka Description --- KAFKA-1982: change kafka.examples.Producer to use the new java producer Diffs (updated) - clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java f5cd61c1aa9433524da0b83826a766389de68a0b examples/README 53db6969b2e2d49e23ab13283b9146848e37434e examples/src/main/java/kafka/examples/Consumer.java 13135b954f3078eeb7394822b0db25470b746f03 examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java 1239394190fe557e025fbd8f3803334402b0aeea examples/src/main/java/kafka/examples/Producer.java 96e98933148d07564c1b30ba8e805e2433c2adc8 examples/src/main/java/kafka/examples/SimpleConsumerDemo.java 0d66fe5f8819194c8624aed4a21105733c20cc8e Diff: https://reviews.apache.org/r/31369/diff/ Testing --- Thanks, Ashish Singh
Re: Review Request 31369: Patch for KAFKA-1982
On April 17, 2015, 11:05 p.m., Jun Rao wrote: Thanks for the patch. A few more minor comments. Jun, thanks for the review again. Addressed your comments. - Ashish --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31369/#review80555 ---
Re: Review Request 33065: Patch for KAFKA-1928
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review80557 --- Took a quick look. Overall, looks good. A few comments below. clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java https://reviews.apache.org/r/33065/#comment130596 Do you know why the return type was changed from int to long? core/src/main/scala/kafka/api/FetchResponse.scala https://reviews.apache.org/r/33065/#comment130594 I am wondering if we need both completed() and remaining() in Send. It seems that one of the two is enough for our usage. Also, I'm not sure how useful reify() is; currently it's not used anywhere. core/src/main/scala/kafka/api/FetchResponse.scala https://reviews.apache.org/r/33065/#comment130592 This probably should be def reify() {}? core/src/main/scala/kafka/network/Transmission.scala https://reviews.apache.org/r/33065/#comment130609 Probably this can be moved to the client side? core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/33065/#comment130610 Are those needed since we are importing kafka.api._? - Jun Rao On April 10, 2015, 4:58 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated April 10, 2015, 4:58 a.m.) Review request for kafka.
Bugs: KAFKA-1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Diffs - clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3 clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7 clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5 clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a09e470166a1c33639cf0cab26a3bce98ec clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java PRE-CREATION core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b3802ac221d570e7610458e50518b4499e7ed core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78adb760eaeb029466b54f335a29caf3b0f core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 5be393ab8272a49437b5057ed098ccdc42f352e5 core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 2fad585f126699ba8d26c901a41bcf6b8198bf62 core/src/main/scala/kafka/api/OffsetCommitRequest.scala cf8e6acc426aef6eb19d862bf6a108a5fc37907a core/src/main/scala/kafka/api/OffsetFetchRequest.scala 67811a752a470bf9bdbc8c5419e8d6e20a006169 core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc7518ad76f9548772522751afb4d046b78 core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da1d865086f9830aa919a49063abbbe574d core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987c990fe561c01dac2909f5ed21a506e038 core/src/main/scala/kafka/api/TopicMetadataRequest.scala 7dca09ce637a40e125de05703dc42e8b611971ac core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 69f0397b187a737b4ddf50e390d3c2f418ce6b5d 
core/src/main/scala/kafka/client/ClientUtils.scala b66424b230463df6641a848b99bb73312ea66e33 core/src/main/scala/kafka/consumer/SimpleConsumer.scala cbef84ac76e62768981f74e71d451f2bda995275 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e250b94626c62b3b7f33ee4180ca3ab69a8821d6 core/src/main/scala/kafka/controller/ControllerChannelManager.scala 97acdb23f6e95554c3e0357aa112eddfc875efbc core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b0b7be14d494ae8c87f4443b52db69d273c20316 core/src/main/scala/kafka/network/BlockingChannel.scala 6e2a38eee8e568f9032f95c75fa5899e9715b433 core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala c0d77261353478232ab85591c182be57845b3f13 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b95b73b71252932867c60192b3d5b91efe99e122 core/src/main/scala/kafka/network/ByteBufferSend.scala af30042a4c713418ecd83b6c6c17dfcbdc101c62 core/src/main/scala/kafka/network/Handler.scala a0300336b8cb5a2d5be68b7b48bdbe045bf99324 core/src/main/scala/kafka/network/RequestChannel.scala 1d9c57b0b5a0ad31e4f3d7562f0266af83cc9024 core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION
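The review question about whether Send needs both completed() and remaining() can be illustrated with a sketch: if remaining() is the single primitive, completed() falls out as a derived method. This is illustrative only (names and signatures are assumptions, not the actual org.apache.kafka.common.network.Send interface), and it uses a Java 8 default method purely for brevity.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

/**
 * Illustrative sketch, not the real Send interface: with remaining() as the
 * one primitive, completed() is derivable, which is one way to collapse the
 * two methods the review asks about.
 */
interface SketchSend {
    /** Bytes still to be written. */
    long remaining();

    /** Derived: the send is complete exactly when nothing remains. */
    default boolean completed() {
        return remaining() <= 0;
    }

    /** Write some bytes to the channel; returns the number written. */
    long writeTo(GatheringByteChannel channel) throws IOException;
}

/** A trivial implementation backed by a single ByteBuffer. */
final class ByteBufferSketchSend implements SketchSend {
    private final ByteBuffer buffer;

    ByteBufferSketchSend(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public long remaining() {
        return buffer.remaining();
    }

    @Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        return channel.write(buffer);
    }
}
```

The trade-off is that keeping completed() as an explicit method lets implementations signal completion on conditions other than byte count; deriving it, as here, keeps the interface minimal.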
[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network
[ https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1928: --- Status: In Progress (was: Patch Available) Move kafka.network over to using the network classes in org.apache.kafka.common.network --- Key: KAFKA-1928 URL: https://issues.apache.org/jira/browse/KAFKA-1928 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Gwen Shapira Attachments: KAFKA-1928.patch As part of the common package we introduced a bunch of network related code and abstractions. We should look into replacing a lot of what is in kafka.network with this code. Duplicate classes include things like Receive, Send, etc. It is likely possible to also refactor the SocketServer to make use of Selector, which should significantly simplify its code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1690: --- Status: In Progress (was: Patch Available) [~sriharsha], your original patch in KAFKA-1684 only added the SSL support in the network code in scala. Rajini's patch added SSL in the network code in the client. Maybe it's possible for you to reuse some of the code. new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14501020#comment-14501020 ] Sriharsha Chintalapani commented on KAFKA-1690: --- [~junrao] I didn't send my patch earlier because we wanted to get the server side addressed before addressing the clients (see the last comment on KAFKA-1477). I am OK with reusing some of the code. Will send a new patch once KAFKA-1928 is addressed. new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)