Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hey everyone, Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize in advance if you have already covered some of my questions. I read through the wiki and had some comments and questions.
1) public enum Operation needs EDIT changed to ALTER
2) Does the Authorizer class need a setAcls? Rather than just add, to be able to set an explicit list and overwrite what was there? I see kafka-acl.sh lists a removeall, so I guess you could do removeall and then add. I also don't see a removeall in the Authorizer class; is it going to loop through them all to remove each one?
3) Can someone tell me what the use case is for doing acls based on hosts? I can see some possibilities, just wondering if we have concrete ones where one user is allowed from one host but not another.
4) I'm a bit unclear how the resource works in the Authorizer class. From what I see we have 2 resources - topics and cluster. If I want to add an acl to allow joe to CREATE for the cluster, then I call addAcls with Acl(user: joe, ALLOW, Set(*), Set(CREATE)) and cluster? What if I want to call addAcls for DESCRIBE on a topic? Is the resource then topic or is it the topic name?
5) Is reassigning partitions a CLUSTER_ACTION or superuser? It's not totally clear to me the differences between these. What about increasing the # of partitions?
6) Groups are mentioned; are we supporting them right away or is that a follow-on item? (Is there going to be a kafka.supergroups?)
7) Are there config options for setting acls when I create my topic? Or do I have to create my topic and then run the kafka-acl.sh script to set them? Although it's very small, there would be a possible race there where someone could start producing to the topic before acls are set.
8) Are there configs for cluster-level acl defaults? Or does it default to superusers on bringing up a new cluster and you have to modify with the cli?
thanks, Tom

On Tuesday, April 21, 2015 7:10 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
I have added the notes to the KIP-11 Open questions section. Thanks Parth

On 4/21/15, 4:49 PM, Gwen Shapira gshap...@cloudera.com wrote:
Adding my notes from today's call to the thread:
** Deny or Allow all by default? We will add a configuration to control this. The configuration will default to “allow” for backward compatibility. Security admins can set it to deny.
** Storing ACLs for default authorizers: We'll store them in ZK. We'll support pointing the authorizer to any ZK. The use of ZK will be internal to the default authorizer. The authorizer reads ACLs from cache every hour. We proposed having a mechanism (possibly via a new ZK node) to tell brokers to refresh the cache immediately.
** Support deny as a permission type - we agreed to keep this.
** Mapping operations to APIs: We may need to add Group as a resource, with JoinGroup and OffsetCommit requiring privilege on the consumer group. This can be something we pass now and authorizers can support in future. - Jay will write specifics to the mailing list discussion.

On Tue, Apr 21, 2015 at 4:32 PM, Jay Kreps jay.kr...@gmail.com wrote:
Following up on the KIP discussion. Two options for authorizing consumers to read topic t as part of group g:
1. READ permission on resource /topic/t
2. READ permission on resource /topic/t AND WRITE permission on /group/g
The advantage of (1) is that it is simpler. The disadvantage is that any member of any group that reads from t can commit offsets as any other member of a different group.
This doesn't affect data security (who can access what) but it is a bit of a management issue - a malicious person can cause data loss or duplicates for another consumer by committing offsets. I think I favor (2) but it's worth it to think it through. -Jay

On Tue, Apr 21, 2015 at 2:43 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
Hey Jun, Yes, and we support wildcards for all acl entities: principal, hosts and operation. Thanks Parth

On 4/21/15, 9:06 AM, Jun Rao j...@confluent.io wrote:
Harsha, Parth, Thanks for the clarification. This makes sense. Perhaps we can clarify the meaning of those rules in the wiki. Related to this, it seems that we need to support wildcards in the cli/request protocol for topics? Jun

On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
The iptables on unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify “allow user1 to READ from topic1 from all hosts but host1,host2”. Again, we could add a host group semantic and extra complexity around that; not sure if it's worth it. In addition, with the DENY operator you are now not forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases:
- Resource with no acl whatsoever - allow access to everyone (just
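To make the trade-off concrete, here is a minimal, self-contained Scala sketch of Jay's two options. All type and method names here are assumed for illustration; the actual KIP-11 interface may differ:
{code}
// Minimal sketch (assumed names; the actual KIP-11 interface may differ).
object AuthzOptionsSketch {
  sealed trait Operation
  case object Read extends Operation
  case object Write extends Operation

  sealed trait ResourceType
  case object Topic extends ResourceType
  case object Group extends ResourceType
  case class Resource(resourceType: ResourceType, name: String)

  trait Authorizer {
    def authorize(principal: String, op: Operation, resource: Resource): Boolean
  }

  // Option 1: READ on the topic is sufficient.
  def canConsume1(auth: Authorizer, user: String, topic: String): Boolean =
    auth.authorize(user, Read, Resource(Topic, topic))

  // Option 2: READ on the topic AND WRITE on the group.
  def canConsume2(auth: Authorizer, user: String, topic: String, group: String): Boolean =
    auth.authorize(user, Read, Resource(Topic, topic)) &&
    auth.authorize(user, Write, Resource(Group, group))
}
{code}
Under option (2), committing offsets for group g requires an explicit WRITE grant on /group/g, which closes the cross-group offset-commit hole described above.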
Re: [DISCUSS] KIP-11- Authorization design for kafka security
FYI, I have modified the KIP to include group as a resource. In order to access the “joinGroup” and “commitOffset” APIs, the user will need READ permission on the topic and WRITE permission on the group. I plan to open a VOTE thread by noon if there are no more concerns. Thanks Parth

On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote:
Hey everyone, Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize in advance if you have already covered some of my questions. I read through the wiki and had some comments and questions.

1) public enum Operation needs EDIT changed to ALTER

Done.

2) Does the Authorizer class need a setAcls? Rather than just add, to be able to set an explicit list and overwrite what was there? I see kafka-acl.sh lists a removeall, so I guess you could do removeall and then add. I also don't see a removeall in the Authorizer class; is it going to loop through them all to remove each one?

There is an overloaded version of removeAcls in the interface that takes in the resource as the only input, and as described in the javadoc all the acls attached to that resource will be deleted. To cover the setAcl use case the caller can first call remove and then add.

3) Can someone tell me what the use case is for doing acls based on hosts? I can see some possibilities, just wondering if we have concrete ones where one user is allowed from one host but not another.

I am not sure I understand the question, given that the use case you described is what we are trying to cover with the use of hosts in an Acl. There are some additional use cases like “allow access to any user from host1,host2”, but I think primarily it gives the admins the ability to define acls at a more granular level.

4) I'm a bit unclear how the resource works in the Authorizer class. From what I see we have 2 resources - topics and cluster. If I want to add an acl to allow joe to CREATE for the cluster, then I call addAcls with Acl(user: joe, ALLOW, Set(*), Set(CREATE)) and cluster? What if I want to call addAcls for DESCRIBE on a topic? Is the resource then topic or is it the topic name?

We now have 3 resources (added group); please see the updated doc. The CREATE acl that you described is correct. For any topic operation you should use the topic name as the resource name, and for group the user will provide the groupId as the resource name.

5) Is reassigning partitions a CLUSTER_ACTION or superuser? It's not totally clear to me the differences between these. What about increasing the # of partitions?

I see this as an alter-topic operation so it is at topic level and the user must have alter permissions on the topic.

6) Groups are mentioned; are we supporting them right away or is that a follow-on item? (Is there going to be a kafka.supergroups?)

I think it can be a separate jira, just for breaking the code review down into smaller chunks. We will support it in the first version, but I think if we cannot do it for any reason that should not block a release with all the other authZ work. We made deliberate design choices (like introducing a principalType in KafkaPrincipal) to allow supporting groups as an incremental change.

7) Are there config options for setting acls when I create my topic? Or do I have to create my topic and then run the kafka-acl.sh script to set them? Although it's very small, there would be a possible race there where someone could start producing to the topic before acls are set.

We discussed this yesterday and we agreed to go with kafka-acl.sh.
Yes, there is a very very small window of vulnerability but I think that really does not warrant changing the decision in this case.

8) Are there configs for cluster-level acl defaults? Or does it default to superusers on bringing up a new cluster and you have to modify with the cli? thanks, Tom

No defaults; the default is that superusers will have full access. I don't think making assumptions about one's security requirements should be our burden.

On Tuesday, April 21, 2015 7:10 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
I have added the notes to the KIP-11 Open questions section. Thanks Parth

On 4/21/15, 4:49 PM, Gwen Shapira gshap...@cloudera.com wrote:
Adding my notes from today's call to the thread:
** Deny or Allow all by default? We will add a configuration to control this. The configuration will default to “allow” for backward compatibility. Security admins can set it to deny.
** Storing ACLs for default authorizers: We'll store them in ZK. We'll support pointing the authorizer to any ZK. The use of ZK will be internal to the default authorizer. The authorizer reads ACLs from cache every hour. We proposed having a mechanism (possibly via a new ZK node) to tell brokers to refresh the cache immediately.
** Support deny as a permission type - we agreed to keep this.
** Mapping operations to APIs: We may need to add Group as a resource, with JoinGroup and OffsetCommit requiring privilege
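To illustrate the resource naming Parth describes in his answer to question 4 above, here is a small Scala sketch. The constructor shapes are assumed, simplified from the Acl example quoted in the thread; the final KIP-11 signatures may differ:
{code}
object AclNamingSketch {
  // Assumed, simplified shapes -- illustration only.
  case class KafkaPrincipal(principalType: String, name: String)
  case class Resource(resourceType: String, name: String)
  case class Acl(principal: KafkaPrincipal, permissionType: String,
                 hosts: Set[String], operations: Set[String])

  val joe = KafkaPrincipal("user", "joe")

  // CREATE on the cluster: the resource name is the single cluster resource.
  val createOnCluster = (Acl(joe, "ALLOW", Set("*"), Set("CREATE")), Resource("cluster", "kafka-cluster"))

  // DESCRIBE on a topic: the resource name is the topic name itself.
  val describeOnTopic = (Acl(joe, "ALLOW", Set("*"), Set("DESCRIBE")), Resource("topic", "testtopic"))

  // WRITE on a consumer group: the resource name is the groupId.
  val writeOnGroup = (Acl(joe, "ALLOW", Set("*"), Set("WRITE")), Resource("group", "my-consumer-group"))
}
{code}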
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Hi Rajini, Thanks for the details. I did go through your code. There was a discussion before about not having selector-related code in the channel or extending the selector itself.

1. *Support for running potentially long-running delegated tasks outside the network thread*: It is recommended that delegated tasks indicated by a handshake status of NEED_TASK are run on a separate thread since they may block (http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). It is easier to encapsulate this in SSLChannel without any changes to common code if selection keys are managed within the Channel.

This makes sense. I can change the code to not do it on the network thread.

2. *Renegotiation handshake*: During a read operation, handshake status may indicate that renegotiation is required. It will be good to encapsulate this state change (and any knowledge of these SSL-specific state transitions) within SSLChannel. Our experience was that managing keys and state within the SSLChannel rather than in Selector made this code neater.

Do we even want to support renegotiation? This is the case where a user/client handshakes with the server anonymously but later wants to change and present their identity and establish a new SSL session. Our producers and consumers either present their identity (two-way auth) or not. Since these are long-running processes, I don't see a case where they initially establish the session and later present their identity.

3. *Graceful shutdown of the SSL connections*: Our experience was that we could encapsulate all of the logic for shutting down SSLEngine gracefully within SSLChannel when the selection key and state are owned and managed by SSLChannel.

Can't this be done when channel.close() is called? Any reason to own the selection key?

4. *And finally a minor point:* We found that by managing selection key and selection interests within SSLChannel, the protocol-independent Selector didn't need the concept of handshake at all, and all channel state management and handshake-related code could be held in protocol-specific classes. This may be worth taking into consideration since it makes it easier for common network layer code to be maintained without any understanding of the details of individual security protocols.

The only thing the network code (SocketServer) is aware of is whether the channel's handshake is complete; if it's not, do the handshake, otherwise go about reading/writing from the channel. Yes, the SocketServer needs to be aware of whether the channel is ready to read or not. But on the other hand, there aren't too many details of the handshake leaked into the SocketServer. Either we let the server know that a channel needs a handshake, or we keep the selectionKey state in the channel, which means we are adding selector-related code into the channel.

Thanks, Harsha

On April 22, 2015 at 3:56:04 AM, Rajini Sivaram (rajinisiva...@googlemail.com) wrote:
When we were working on the client-side SSL implementation for Kafka, we found that returning selection interest from the handshake() method wasn't sufficient to handle some of the SSL sequences. We resorted to managing the selection key and interest state within SSLChannel to avoid SSL-specific knowledge escaping out of SSL classes into protocol-independent network code. The current server-side SSL patch doesn't address these scenarios yet, but we may want to take these into account while designing the common Channel class/interface.

1.
*Support for running potentially long-running delegated tasks outside the network thread*: It is recommended that delegated tasks indicated by a handshake status of NEED_TASK are run on a separate thread since they may block (http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). It is easier to encapsulate this in SSLChannel without any changes to common code if selection keys are managed within the Channel.
2. *Renegotiation handshake*: During a read operation, handshake status may indicate that renegotiation is required. It will be good to encapsulate this state change (and any knowledge of these SSL-specific state transitions) within SSLChannel. Our experience was that managing keys and state within the SSLChannel rather than in Selector made this code neater.
3. *Graceful shutdown of the SSL connections*: Our experience was that we could encapsulate all of the logic for shutting down SSLEngine gracefully within SSLChannel when the selection key and state are owned and managed by SSLChannel.
4. *And finally a minor point:* We found that by managing selection key and selection interests within SSLChannel, the protocol-independent Selector didn't need the concept of handshake at all and all channel state management and handshake-related code could be held in protocol-specific classes. This may be worth taking into consideration since it makes it easier for common network layer code to be maintained without any
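For reference, here is a minimal Scala sketch of what running NEED_TASK delegated tasks off the network thread might look like. The executor and callback wiring are assumptions for illustration, not the actual patch under discussion:
{code}
import java.util.concurrent.Executors
import javax.net.ssl.{SSLEngine, SSLEngineResult}

object DelegatedTaskSketch {
  // Shared executor for potentially blocking handshake tasks.
  val delegatedTaskExecutor = Executors.newSingleThreadExecutor()

  // Drain the engine's delegated tasks off the network thread, then invoke
  // a callback (e.g. to re-register interest ops and resume the handshake).
  def runDelegatedTasks(engine: SSLEngine)(onDone: () => Unit): Unit = {
    if (engine.getHandshakeStatus == SSLEngineResult.HandshakeStatus.NEED_TASK) {
      delegatedTaskExecutor.submit(new Runnable {
        override def run(): Unit = {
          var task = engine.getDelegatedTask
          while (task != null) { // tasks may block, hence the separate thread
            task.run()
            task = engine.getDelegatedTask
          }
          onDone()
        }
      })
    }
  }
}
{code}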
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
As said above, I spent some time thinking about AlterTopicRequest semantics and batching.

Firstly, about AlterTopicRequest. Our goal here is to see whether we can suggest some simple semantics and at the same time let users change different things in one instruction (hereinafter, an instruction is one of the entries in a batch request). We can resolve arguments according to this schema:
1) If ReplicaAssignment is specified: it's a reassign-partitions request
2) If either Partitions or ReplicationFactor is specified:
a) If Partitions is specified - this is the increase-partitions case
b) If ReplicationFactor is specified - this means we need to automatically regenerate the replica assignment and treat it as a reassign-partitions request
Note: this algorithm is a bit inconsistent with the CreateTopicRequest - with ReplicaAssignment specified there, the user can implicitly define Partitions and ReplicationFactor; in AlterTopicRequest those are completely different things, i.e. you can't include new partitions in the ReplicaAssignment to implicitly ask the controller to increase partitions - the controller will simply return InvalidReplicaAssignment, because you included unknown partitions.

Secondly, multiple instructions for one topic in a batch request. I have a feeling it becomes a really big mess now, so suggestions are highly appreciated here! Our goal is to consider whether we can let users add multiple instructions for one topic in one batch but at the same time make it transparent enough that we can support blocking on request completion; for that we need to determine from the request what the final expected state of the topic is. And the latter seems to me a tough issue. Consider the following AlterTopicRequest:
[1) topic1: change ReplicationFactor from 2 to 3,
2) topic1: change ReplicaAssignment (taking into account RF is 3 now),
3) topic2: change ReplicaAssignment (just to include multiple topics),
4) topic1: change ReplicationFactor from 3 to 1,
5) topic1: change ReplicaAssignment again (taking into account RF is 1 now)]
As we discussed earlier, the controller will handle it as an alter-topic command plus reassign-partitions. First of all, it will scan all ReplicaAssignments and assemble those into one json to create the admin path /reassign_partitions once needed. Now, the user would expect we execute instructions sequentially, but we can't do that because only one reassign-partitions procedure can be in progress - when should we trigger reassign-partitions - after 1) or after 4)? And what about topic2 - we will break the order, but it was supposed that we execute instructions sequentially. Overall, the logic seems to be very sophisticated, which is a bad sign. Conceptually, I think the root problem is that we imply there is an order in sequential instructions, but the instructions themselves are asynchronous, so really you can't guarantee any order. I'm thinking about such solutions now:
1) Prohibit multiple instructions for one topic (this seems reasonable if we let users change multiple things in scope of one instruction - see the first item)
2) Break apart again AlterTopic and ReassignPartitions - if the reassignment case is the only problem here, which I'm not sure about.
Thoughts? Thanks, Andrii Biletskyi

On Wed, Apr 22, 2015 at 2:59 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:
Guys, Thank you for your time. A short summary of our discussion. Answering previous items:
1., 2. I will double check existing error codes to align the list of errors that needs to be added.
3. We agreed to think again about the batch request semantics.
The main concern is that users would expect us to allow executing multiple instructions for one topic in one batch. I will start implementation and check whether there are any impediments to handling it this way. The same for AlterTopicRequest - I will try to make the request semantics as easy as possible and allow users to change different things at one time - e.g. change the number of partitions and replicas in one instruction.
4. We agreed not to add lag information to TMR.
5. We discussed the preferred replica command, and it was pointed out that generally users shouldn't call this command manually now since this is automatically handled by the cluster. If there are no objections (especially from devops people) I will remove the respective request.
6. As discussed, the AdminClient API is a phase 2 and will go after the Wire Protocol extensions. It will be finalized as java-doc after I complete the patch for phase 1 - Wire Protocol + server-side code handling requests.
Thanks, Andrii Biletskyi

On Wed, Apr 22, 2015 at 12:36 AM, Jay Kreps jay.kr...@gmail.com wrote:
Hey Andrii, thanks for all the hard work on this, it has come a long way. A couple questions and comments on this. For the errors, can we do the following:
1. Remove IllegalArgument from the name, we haven't used that convention for other errors.
2. Normalize this list with the existing errors. For example, elsewhere when you give an invalid
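A Scala sketch of the argument-resolution schema from the first part of Andrii's mail. The field and type names here are invented for illustration; the real AlterTopicRequest wire schema is defined in KIP-4:
{code}
object AlterTopicResolutionSketch {
  // Illustrative types only -- not the actual KIP-4 request classes.
  case class AlterTopicArgs(replicaAssignment: Option[Map[Int, Seq[Int]]],
                            partitions: Option[Int],
                            replicationFactor: Option[Int])

  sealed trait AlterAction
  case class ReassignPartitions(assignment: Map[Int, Seq[Int]]) extends AlterAction
  case class IncreasePartitions(count: Int) extends AlterAction
  case class RegenerateAndReassign(replicationFactor: Int) extends AlterAction

  def resolve(args: AlterTopicArgs): Option[AlterAction] = args match {
    // 1) ReplicaAssignment specified: treat as a reassign-partitions request.
    case AlterTopicArgs(Some(assignment), _, _) => Some(ReassignPartitions(assignment))
    // 2a) Partitions specified: the increase-partitions case.
    case AlterTopicArgs(None, Some(p), _) => Some(IncreasePartitions(p))
    // 2b) ReplicationFactor specified: regenerate the assignment, then reassign.
    case AlterTopicArgs(None, None, Some(rf)) => Some(RegenerateAndReassign(rf))
    case _ => None
  }
}
{code}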
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
1. *Support for running potentially long-running delegated tasks outside the network thread*: It is recommended that delegated tasks indicated by a handshake status of NEED_TASK are run on a separate thread since they may block (http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). It is easier to encapsulate this in SSLChannel without any changes to common code if selection keys are managed within the Channel.

This makes sense. I can change the code to not do it on the network thread. Right now we are doing the handshake as part of the processor (it shouldn't be in the acceptor) and we have multiple processor threads. Do we still see this as an issue if it happens on the same thread as the processor?

-- Harsha Sent with Airmail

On April 22, 2015 at 7:18:17 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) wrote:
Hi Rajini, Thanks for the details. I did go through your code. There was a discussion before about not having selector-related code in the channel or extending the selector itself.

1. *Support for running potentially long-running delegated tasks outside the network thread*: It is recommended that delegated tasks indicated by a handshake status of NEED_TASK are run on a separate thread since they may block (http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). It is easier to encapsulate this in SSLChannel without any changes to common code if selection keys are managed within the Channel.

This makes sense. I can change the code to not do it on the network thread.

2. *Renegotiation handshake*: During a read operation, handshake status may indicate that renegotiation is required. It will be good to encapsulate this state change (and any knowledge of these SSL-specific state transitions) within SSLChannel. Our experience was that managing keys and state within the SSLChannel rather than in Selector made this code neater.

Do we even want to support renegotiation? This is the case where a user/client handshakes with the server anonymously but later wants to change and present their identity and establish a new SSL session. Our producers and consumers either present their identity (two-way auth) or not. Since these are long-running processes, I don't see a case where they initially establish the session and later present their identity.

3. *Graceful shutdown of the SSL connections*: Our experience was that we could encapsulate all of the logic for shutting down SSLEngine gracefully within SSLChannel when the selection key and state are owned and managed by SSLChannel.

Can't this be done when channel.close() is called? Any reason to own the selection key?

4. *And finally a minor point:* We found that by managing selection key and selection interests within SSLChannel, the protocol-independent Selector didn't need the concept of handshake at all, and all channel state management and handshake-related code could be held in protocol-specific classes. This may be worth taking into consideration since it makes it easier for common network layer code to be maintained without any understanding of the details of individual security protocols.

The only thing the network code (SocketServer) is aware of is whether the channel's handshake is complete; if it's not, do the handshake, otherwise go about reading/writing from the channel. Yes, the SocketServer needs to be aware of whether the channel is ready to read or not. But on the other hand, there aren't too many details of the handshake leaked into the SocketServer.
Either we let the server know that a channel needs a handshake, or we keep the selectionKey state in the channel, which means we are adding selector-related code into the channel. Thanks, Harsha

On April 22, 2015 at 3:56:04 AM, Rajini Sivaram (rajinisiva...@googlemail.com) wrote:
When we were working on the client-side SSL implementation for Kafka, we found that returning selection interest from the handshake() method wasn't sufficient to handle some of the SSL sequences. We resorted to managing the selection key and interest state within SSLChannel to avoid SSL-specific knowledge escaping out of SSL classes into protocol-independent network code. The current server-side SSL patch doesn't address these scenarios yet, but we may want to take these into account while designing the common Channel class/interface.

1. *Support for running potentially long-running delegated tasks outside the network thread*: It is recommended that delegated tasks indicated by a handshake status of NEED_TASK are run on a separate thread since they may block (http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). It is easier to encapsulate this in SSLChannel without any changes to common code if selection keys are managed within the Channel.
2. *Renegotiation handshake*: During a read operation, handshake status may indicate that renegotiation is required. It will be good to encapsulate this state change (and any knowledge of these SSL-specific state transitions) within
[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506489#comment-14506489 ]

Parth Brahmbhatt commented on KAFKA-1688:
-----------------------------------------
Created reviewboard https://reviews.apache.org/r/33431/diff/ against branch origin/trunk

Add authorization interface and naive implementation
----------------------------------------------------
Key: KAFKA-1688
URL: https://issues.apache.org/jira/browse/KAFKA-1688
Project: Kafka
Issue Type: Sub-task
Components: security
Reporter: Jay Kreps
Assignee: Parth Brahmbhatt
Fix For: 0.8.3
Attachments: KAFKA-1688.patch, KAFKA-1688_2015-04-10_11:08:39.patch

Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
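The "naive implementation" the ticket asks for could be as small as the following Scala sketch. This is a hypothetical illustration of the user/ip whitelist semantics described above (allow if either whitelist matches, deny everyone else), not the actual KAFKA-1688 patch:
{code}
object WhitelistAuthorizerSketch {
  // Hypothetical whitelist authorizer -- illustration only.
  class WhitelistAuthorizer(userWhitelist: Set[String], ipWhitelist: Set[String]) {
    // Permit principals on the user whitelist, or connections from whitelisted
    // IPs, to do anything; deny everyone else.
    def authorize(user: String, host: String): Boolean =
      userWhitelist.contains(user) || ipWhitelist.contains(host)
  }

  def main(args: Array[String]): Unit = {
    // joe is allowed from anywhere; any user connecting from 10.0.0.5 is allowed.
    val auth = new WhitelistAuthorizer(Set("joe"), Set("10.0.0.5"))
    assert(auth.authorize("joe", "10.1.2.3"))
    assert(auth.authorize("kate", "10.0.0.5"))
    assert(!auth.authorize("kate", "10.1.2.3"))
  }
}
{code}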
[jira] [Updated] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Parth Brahmbhatt updated KAFKA-1688:
------------------------------------
Attachment: KAFKA-1688.patch

Add authorization interface and naive implementation
----------------------------------------------------
Key: KAFKA-1688
URL: https://issues.apache.org/jira/browse/KAFKA-1688
Project: Kafka
Issue Type: Sub-task
Components: security
Reporter: Jay Kreps
Assignee: Parth Brahmbhatt
Fix For: 0.8.3
Attachments: KAFKA-1688.patch, KAFKA-1688_2015-04-10_11:08:39.patch

Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 33431: Patch for KAFKA-1688
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33431/ ---

Review request for kafka.

Bugs: KAFKA-1688
https://issues.apache.org/jira/browse/KAFKA-1688

Repository: kafka

Description
---
KAFKA-1688: initial check in.
Merge remote-tracking branch 'origin/trunk' into trunk
Merge remote-tracking branch 'origin/trunk' into 2032
Merge branch '2032' into trunk
KAFKA-1688: Add authorization.
Merge remote-tracking branch 'origin/trunk' into trunk
Conflicts: core/src/main/scala/kafka/admin/AdminUtils.scala core/src/main/scala/kafka/admin/TopicCommand.scala core/src/main/scala/kafka/network/RequestChannel.scala core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaServer.scala core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
Fixing some merge errors.
Merge remote-tracking branch 'origin/trunk' into trunk
Conflicts: core/src/main/scala/kafka/common/ErrorMapping.scala
Converted some code to idiomatic scala.
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk
Removing some unintended changes.
Reverting the topic config related changes.
Public classes and interfaces to support pluggable authorizer implementation for kafka.
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk
All public entities for pluggable authorizer support in kafka.

Diffs
---
core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION
core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370
core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026
core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION
core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION
core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION
core/src/main/scala/kafka/security/auth/Operation.java PRE-CREATION
core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION
core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46
core/src/main/scala/kafka/server/KafkaConfig.scala cfbbd2be550947dd2b3c8c2cab981fa08fb6d859
core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a
core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 62d183248e3be4c83d2c768e762f61f92448c6a6

Diff: https://reviews.apache.org/r/33431/diff/

Testing
---

Thanks,
Parth Brahmbhatt
[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506527#comment-14506527 ]

Dmitry Bugaychenko commented on KAFKA-2029:
-------------------------------------------

The problem is that the state is distributed - not only the controller, but each broker has it, considering itself either as leader or as follower. When doing a transition we need to make sure all parties completed it before switching to the next partition. Right now the transition is implemented via asynchronous messages to brokers without waiting for replies - in this case even a single-threaded controller might easily bring the system into an inconsistent state (having multiple brokers treating themselves as leaders for the same partition).

Improving controlled shutdown for rolling updates
-------------------------------------------------
Key: KAFKA-2029
URL: https://issues.apache.org/jira/browse/KAFKA-2029
Project: Kafka
Issue Type: Improvement
Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
Attachments: KAFKA-2029.patch, KAFKA-2029.patch

Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start it again.
Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see the discussion there).
Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case).
Note 3: These improvements are actually workarounds and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions).
The problems and improvements are:
# Controlled shutdown takes a long time (10+ minutes); the broker sends multiple shutdown requests, finally considers it failed and proceeds to unclean shutdown, while the controller gets stuck for a while (holding a lock waiting for free space in the controller-to-broker queue). After the broker starts back up it receives follower requests and erases its highwatermarks (with a message that the replica does not exist - the controller hadn't yet sent a request with the replica assignment); then when the controller starts replicas on the broker it deletes all local data (due to the missing highwatermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*.
# If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding messages to the broker's queue*.
ControllerChannelManager.sendRequest:
{code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
        // TODO: Move timeout to config
        if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
          error("Timed out trying to send message to broker " + brokerId.toString)
          // Do not throw, as it brings the controller into a completely non-functional state:
          // "Controller to broker state change requests batch is not empty while creating a new one"
          // throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
        }
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
{code}
# When the broker that is the controller starts shutting down, if auto leader rebalance is running it deadlocks in the end (the shutdown thread owns the lock and waits for the rebalance thread to
[jira] [Issue Comment Deleted] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitry Bugaychenko updated KAFKA-2029:
--------------------------------------
Comment: was deleted
(was: The problem is that the state is distributed - not only the controller, but each broker has it, considering itself either as leader or as follower. When doing a transition we need to make sure all parties completed it before switching to the next partition. Right now the transition is implemented via asynchronous messages to brokers without waiting for replies - in this case even a single-threaded controller might easily bring the system into an inconsistent state (having multiple brokers treating themselves as leaders for the same partition))

Improving controlled shutdown for rolling updates
-------------------------------------------------
Key: KAFKA-2029
URL: https://issues.apache.org/jira/browse/KAFKA-2029
Project: Kafka
Issue Type: Improvement
Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
Attachments: KAFKA-2029.patch, KAFKA-2029.patch

Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start it again.
Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see the discussion there).
Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case).
Note 3: These improvements are actually workarounds and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions).
The problems and improvements are:
# Controlled shutdown takes a long time (10+ minutes); the broker sends multiple shutdown requests, finally considers it failed and proceeds to unclean shutdown, while the controller gets stuck for a while (holding a lock waiting for free space in the controller-to-broker queue). After the broker starts back up it receives follower requests and erases its highwatermarks (with a message that the replica does not exist - the controller hadn't yet sent a request with the replica assignment); then when the controller starts replicas on the broker it deletes all local data (due to the missing highwatermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*.
# If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding messages to the broker's queue*.
ControllerChannelManager.sendRequest:
{code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
        // TODO: Move timeout to config
        if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
          error("Timed out trying to send message to broker " + brokerId.toString)
          // Do not throw, as it brings the controller into a completely non-functional state:
          // "Controller to broker state change requests batch is not empty while creating a new one"
          // throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
        }
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
{code}
# When the broker that is the controller starts shutting down, if auto leader rebalance is running it deadlocks in the end (the shutdown thread owns the lock and waits for the rebalance thread to exit, and the rebalance
[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506528#comment-14506528 ]

Dmitry Bugaychenko commented on KAFKA-2029:
-------------------------------------------

The problem is that the state is distributed - not only the controller, but each broker has it, considering itself either as leader or as follower. When doing a transition we need to make sure all parties completed it before switching to the next partition. Right now the transition is implemented via asynchronous messages to brokers without waiting for replies - in this case even a single-threaded controller might easily bring the system into an inconsistent state (having multiple brokers treating themselves as leaders for the same partition).

Improving controlled shutdown for rolling updates
-------------------------------------------------
Key: KAFKA-2029
URL: https://issues.apache.org/jira/browse/KAFKA-2029
Project: Kafka
Issue Type: Improvement
Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
Attachments: KAFKA-2029.patch, KAFKA-2029.patch

Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start it again.
Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see the discussion there).
Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case).
Note 3: These improvements are actually workarounds and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions).
The problems and improvements are:
# Controlled shutdown takes a long time (10+ minutes); the broker sends multiple shutdown requests, finally considers it failed and proceeds to unclean shutdown, while the controller gets stuck for a while (holding a lock waiting for free space in the controller-to-broker queue). After the broker starts back up it receives follower requests and erases its highwatermarks (with a message that the replica does not exist - the controller hadn't yet sent a request with the replica assignment); then when the controller starts replicas on the broker it deletes all local data (due to the missing highwatermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*.
# If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding messages to the broker's queue*.
ControllerChannelManager.sendRequest:
{code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
        // TODO: Move timeout to config
        if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
          error("Timed out trying to send message to broker " + brokerId.toString)
          // Do not throw, as it brings the controller into a completely non-functional state:
          // "Controller to broker state change requests batch is not empty while creating a new one"
          // throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
        }
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
{code}
# When the broker that is the controller starts shutting down, if auto leader rebalance is running it deadlocks in the end (the shutdown thread owns the lock and waits for the rebalance thread to
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507518#comment-14507518 ]

Ewen Cheslack-Postava commented on KAFKA-2121:
----------------------------------------------

[~stevenz3wu] Yes, it'll be released with 0.8.3.

prevent potential resource leak in KafkaProducer and KafkaConsumer
------------------------------------------------------------------
Key: KAFKA-2121
URL: https://issues.apache.org/jira/browse/KAFKA-2121
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
Fix For: 0.8.3
Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch

On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
It is a valid problem and we should correct it as soon as possible; I'm with Ewen regarding the solution.

On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated, since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called.) Second, add the try/catch as you suggested, but just use close(). -Ewen

On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in its constructor. Here is a code snippet of KafkaProducer to illustrate the problem.
---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    ...
    // create metrics reporter via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    ...
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
    ...
}
---
Let's say MyMetricsReporter creates a thread in its constructor. If hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we've created a thread leak issue. This becomes worse when we try to auto recover (i.e. keep creating KafkaProducer again - failing again - more thread leaks).
There are multiple options for fixing this.
1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem - what if some other line throws an exception?
2) Use try-catch. In the catch section, try to call the close methods of any non-null objects constructed so far.
3) Explicitly declare the dependencies in the constructor. This way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources.
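As a rough illustration of Ewen's preferred variant of option (2) - a close() that tolerates partially-initialized state plus a try/catch in the constructor - here is a self-contained Scala sketch. The class and factory names are invented; this shows the pattern, not the actual KafkaProducer fix:
{code}
import java.io.Closeable

// Hypothetical client owning two closeable resources; names are invented.
class LeakFreeClient(mkReporter: () => Closeable, mkNetwork: () => Closeable) extends Closeable {
  private var reporter: Closeable = null
  private var network: Closeable = null
  try {
    reporter = mkReporter()
    network = mkNetwork() // may throw, e.g. on bad bootstrap servers
  } catch {
    case t: Throwable =>
      close() // release whatever was constructed so far
      throw t
  }

  // Safe to call even when construction failed partway: null fields are skipped.
  override def close(): Unit = {
    if (network != null) network.close()
    if (reporter != null) reporter.close()
  }
}
{code}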
Jenkins build is back to normal : KafkaPreCommit #83
See https://builds.apache.org/job/KafkaPreCommit/83/changes
[jira] [Commented] (KAFKA-2139) Add a separate controller message queue with higher priority on broker side
[ https://issues.apache.org/jira/browse/KAFKA-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507480#comment-14507480 ]

Jiangjie Qin commented on KAFKA-2139:
-------------------------------------

Sure, I will be glad to write a wiki and put the stuff I'm thinking of in it. I can do it this weekend. It will be a significant change and definitely needs a lot of discussion. In terms of this proposal, messages from the same TCP connection will still be processed in order (originally I was proposing to add controller messages to the queue head; that won't work). The current proposal is to have a separate queue for controller requests, so the requests on the controller-to-broker connection are still in order. I like the idea of prioritizing messages at the network layer and probably we can do that.

Add a separate controller message queue with higher priority on broker side
----------------------------------------------------------------------------
Key: KAFKA-2139
URL: https://issues.apache.org/jira/browse/KAFKA-2139
Project: Kafka
Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin

This ticket is supposed to work together with KAFKA-2029. There are two issues with current controller-to-broker messages. 1. On the controller side the messages are sent without synchronization. 2. On the broker side the controller messages share the same queue as client messages. The problem here is that brokers process the controller messages for the same partition at different times and the variation can be big. This causes unnecessary data loss and prolongs preferred leader election / controlled shutdown / partition reassignment, etc. KAFKA-2029 was trying to add a boundary between messages for different partitions. For example, before the leader migration for the previous partition finishes, the leader migration for the next partition won't begin. This ticket is trying to let the broker process controller messages faster. So the idea is to have a separate queue to hold controller messages; if there are controller messages, the KafkaApi thread will first take care of those messages, otherwise it will process messages from clients. Those two tickets are not the ultimate solution to the current controller problems, but just mitigate them with minor code changes. Moving forward, we still need to think about rewriting the controller in a cleaner way.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
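A minimal sketch of the separate-queue idea in this proposal: request-handler threads drain controller requests before client requests, so controller messages no longer queue behind a backlog of client traffic. The types and names here are illustrative, not the actual RequestChannel change:
{code}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Illustrative request channel with a higher-priority controller queue.
class TwoQueueChannel[R] {
  private val controllerQueue = new LinkedBlockingQueue[R]()
  private val clientQueue = new LinkedBlockingQueue[R]()

  def sendControllerRequest(r: R): Unit = controllerQueue.put(r)
  def sendClientRequest(r: R): Unit = clientQueue.put(r)

  // Handler (KafkaApi) threads call this in a loop: controller requests are
  // served first; the short poll timeout on the client queue ensures newly
  // arrived controller requests are picked up promptly.
  def nextRequest(): R = {
    while (true) {
      val fromController = controllerQueue.poll()
      if (fromController != null) return fromController
      val fromClient = clientQueue.poll(300, TimeUnit.MILLISECONDS)
      if (fromClient != null) return fromClient
    }
    throw new IllegalStateException("unreachable")
  }
}
{code}
Note that because requests from a single controller-to-broker connection land in one queue, their relative order is preserved, which is the in-order guarantee the comment above relies on.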
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507509#comment-14507509 ]

Steven Zhen Wu commented on KAFKA-2121:
---------------------------------------

[~guozhang] thanks. Would it go into the next 0.8.3 release?

prevent potential resource leak in KafkaProducer and KafkaConsumer
------------------------------------------------------------------
Key: KAFKA-2121
URL: https://issues.apache.org/jira/browse/KAFKA-2121
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch

On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
It is a valid problem and we should correct it as soon as possible; I'm with Ewen regarding the solution.

On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated, since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called.) Second, add the try/catch as you suggested, but just use close(). -Ewen

On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in its constructor. Here is a code snippet of KafkaProducer to illustrate the problem.
---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    ...
    // create metrics reporter via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    ...
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
    ...
}
---
Let's say MyMetricsReporter creates a thread in its constructor. If hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we've created a thread leak issue. This becomes worse when we try to auto recover (i.e. keep creating KafkaProducer again - failing again - more thread leaks).
There are multiple options for fixing this.
1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem - what if some other line throws an exception?
2) Use try-catch. In the catch section, try to call the close methods of any non-null objects constructed so far.
3) Explicitly declare the dependencies in the constructor. This way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter>
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Thanks for the explanations Parth.

On the configs question, the way I see it is that it's more likely to accidentally give everyone access, especially since you have to run a separate command to change the acls. If there was some config for defaults, a cluster admin could change that to be nobody or a certain set of users, then grant others permissions. This would also remove the race between commands. This is something you can always add later though, if people request it.

So in kafka-acl.sh, how do I actually tell it what the operation is? kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate - where does READ, WRITE, etc. go? Can I specify them as a list so I don't have to run this a bunch of times for each?

Do you want to have a --host option for --list so that admins could see what acls apply to specific host(s)?

Tom

On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote:
FYI, I have modified the KIP to include group as a resource. In order to access the “joinGroup” and “commitOffset” APIs, the user will need READ permission on the topic and WRITE permission on the group. I plan to open a VOTE thread by noon if there are no more concerns. Thanks Parth

On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote:
Hey everyone, Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize in advance if you have already covered some of my questions. I read through the wiki and had some comments and questions.

1) public enum Operation needs EDIT changed to ALTER

Done.

2) Does the Authorizer class need a setAcls? Rather than just add, to be able to set an explicit list and overwrite what was there? I see kafka-acl.sh lists a removeall, so I guess you could do removeall and then add. I also don't see a removeall in the Authorizer class; is it going to loop through them all to remove each one?

There is an overloaded version of removeAcls in the interface that takes in the resource as the only input, and as described in the javadoc all the acls attached to that resource will be deleted. To cover the setAcl use case the caller can first call remove and then add.

3) Can someone tell me what the use case is for doing acls based on hosts? I can see some possibilities, just wondering if we have concrete ones where one user is allowed from one host but not another.

I am not sure I understand the question, given that the use case you described is what we are trying to cover with the use of hosts in an Acl. There are some additional use cases like “allow access to any user from host1,host2”, but I think primarily it gives the admins the ability to define acls at a more granular level.

4) I'm a bit unclear how the resource works in the Authorizer class. From what I see we have 2 resources - topics and cluster. If I want to add an acl to allow joe to CREATE for the cluster, then I call addAcls with Acl(user: joe, ALLOW, Set(*), Set(CREATE)) and cluster? What if I want to call addAcls for DESCRIBE on a topic? Is the resource then topic or is it the topic name?

We now have 3 resources (added group); please see the updated doc. The CREATE acl that you described is correct. For any topic operation you should use the topic name as the resource name, and for group the user will provide the groupId as the resource name.

5) Is reassigning partitions a CLUSTER_ACTION or superuser? It's not totally clear to me the differences between these. What about increasing the # of partitions?
I see this as an alter topic operation so it is at topic level and the user must have ALTER permissions on the topic. 6) groups are mentioned, are we supporting them right away or is that a follow on item? (is there going to be a kafka.supergroups) I think it can be a separate jira just for breaking down the code review into smaller chunks. We will support it in the first version but I think if we cannot do it for any reason that should not block a release with all the other authZ work. We made deliberate design choices (like introducing a principalType in KafkaPrincipal) to allow supporting groups as an incremental change. 7) Are there config options for setting acls when I create my topic? Or do I have to create my topic and then run the kafka-acl.sh script to set them? Although it's very small, there would be a possible race there that someone could start producing to the topic before acls are set. We discussed this yesterday and we agreed to go with kafka-acl.sh. Yes there is a very very small window of vulnerability but I think that really does not warrant changing the decision in this case. 8) are there configs for cluster level acl defaults? Or does it default to superusers on bringing up a new cluster and you have to modify with the cli. thanks, Tom No defaults, the default is superusers will have full access. I don’t think making assumptions about one's security requirements should be our burden. On Tuesday, April 21, 2015 7:10 PM, Parth Brahmbhatt
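For illustration, the addAcls calls discussed in question 4 might look like the following Java-flavored sketch. The KIP defines the actual interface (in Scala) on the wiki, so every type, name, and signature below is an assumption made for this example only:
---
import java.util.Set;

// Assumed shapes for illustration; the KIP's actual (Scala) definitions differ.
enum Operation { READ, WRITE, CREATE, ALTER, DESCRIBE, CLUSTER_ACTION }
enum PermissionType { ALLOW, DENY }
enum ResourceType { CLUSTER, TOPIC, GROUP }
record Resource(ResourceType type, String name) {}
record Acl(String principal, PermissionType permission,
           Set<String> hosts, Set<Operation> operations) {}

interface Authorizer {
    void addAcls(Set<Acl> acls, Resource resource);
    boolean removeAcls(Resource resource); // deletes every acl on the resource
}

class AclExamples {
    static void grant(Authorizer authorizer) {
        // Q4, cluster case: "allow joe to CREATE" targets the cluster resource.
        authorizer.addAcls(
            Set.of(new Acl("user:joe", PermissionType.ALLOW,
                           Set.of("*"), Set.of(Operation.CREATE))),
            new Resource(ResourceType.CLUSTER, "kafka-cluster"));

        // Q4, topic case: for DESCRIBE the resource name is the topic name.
        authorizer.addAcls(
            Set.of(new Acl("user:joe", PermissionType.ALLOW,
                           Set.of("*"), Set.of(Operation.DESCRIBE))),
            new Resource(ResourceType.TOPIC, "testtopic"));

        // The setAcls question (Q2): emulate "set" as remove-all followed by add.
        authorizer.removeAcls(new Resource(ResourceType.TOPIC, "testtopic"));
    }
}
---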
[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507450#comment-14507450 ] Guozhang Wang commented on KAFKA-2121: -- Thanks for the patch Steven, committed to trunk. prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Jun Rao Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible; I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in its constructor. Here is the code snippet of KafkaProducer to illustrate the problem.
---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // create metrics reporters via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
---
let's say MyMetricsReporter creates a thread in its constructor. If hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we have created a thread leak. This becomes worse when we try to auto-recover (i.e. keep creating the KafkaProducer again - failing again - more thread leaks). There are multiple options for fixing this. 1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem. What if some other line throws an exception? 2) Use try-catch. In the catch section, try to call the close methods of any non-null objects constructed so far. 3) Explicitly declare the dependency in the constructor. This way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter> reporters) we
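A minimal self-contained Java sketch of option 2, in the form Ewen suggests above (a null-safe, idempotent close() plus a constructor-wide try/catch); the class and method names here are invented for the example and this is not the actual KAFKA-2121 patch:
---
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Illustration only: release whatever was constructed so far if the
// constructor fails, so repeated construction attempts cannot leak threads.
public class LeakSafeClient {
    private final List<Closeable> resources = new ArrayList<>();

    public LeakSafeClient(List<Closeable> reporters, String bootstrapServers) {
        try {
            resources.addAll(reporters);   // e.g. MyMetricsReporter and its thread
            validate(bootstrapServers);    // may throw, like address validation
        } catch (RuntimeException e) {
            close();                       // release everything created so far
            throw e;
        }
    }

    private static void validate(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.isEmpty())
            throw new IllegalArgumentException("no bootstrap servers given");
    }

    // Safe to call on a partially constructed instance, and idempotent.
    public void close() {
        for (Closeable c : resources) {
            try { if (c != null) c.close(); } catch (Exception ignored) { }
        }
        resources.clear();
    }
}
---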
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2121: - Resolution: Fixed Assignee: Steven Zhen Wu (was: Jun Rao) Status: Resolved (was: Patch Available) prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Steven Zhen Wu Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible; I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in its constructor. Here is the code snippet of KafkaProducer to illustrate the problem.
---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // create metrics reporters via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
---
let's say MyMetricsReporter creates a thread in its constructor. If hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we have created a thread leak. This becomes worse when we try to auto-recover (i.e. keep creating the KafkaProducer again - failing again - more thread leaks). There are multiple options for fixing this. 1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem. What if some other line throws an exception? 2) Use try-catch. In the catch section, try to call the close methods of any non-null objects constructed so far. 3) Explicitly declare the dependency in the constructor. This way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter>
[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14507513#comment-14507513 ] Gwen Shapira commented on KAFKA-1688: - I agree this will be easier to review with subtasks. Maybe take KAFKA-1688 out of KAFKA-1682, open these as subtasks and link to KAFKA-1682 as related? Add authorization interface and naive implementation Key: KAFKA-1688 URL: https://issues.apache.org/jira/browse/KAFKA-1688 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Parth Brahmbhatt Fix For: 0.8.3 Attachments: KAFKA-1688.patch, KAFKA-1688_2015-04-10_11:08:39.patch Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
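For illustration, the naive user/IP whitelist implementation described in the issue might be sketched as follows; the class and method names are assumptions, not the eventual Kafka interface:
---
import java.util.Set;

// Rough sketch of the naive implementation the issue asks for: permit a
// request if either the user or the client IP appears in a whitelist, and
// deny everyone else. Names are assumptions, not the eventual Kafka API.
public class WhitelistPermissionManager {
    private final Set<String> userWhitelist;
    private final Set<String> ipWhitelist;

    public WhitelistPermissionManager(Set<String> users, Set<String> ips) {
        this.userWhitelist = users;
        this.ipWhitelist = ips;
    }

    public boolean authorize(String user, String clientIp) {
        return userWhitelist.contains(user) || ipWhitelist.contains(clientIp);
    }
}
---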
[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-2121: - Fix Version/s: 0.8.3 prevent potential resource leak in KafkaProducer and KafkaConsumer -- Key: KAFKA-2121 URL: https://issues.apache.org/jira/browse/KAFKA-2121 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Reporter: Steven Zhen Wu Assignee: Steven Zhen Wu Fix For: 0.8.3 Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, KAFKA-2121_2015-04-20_22:48:31.patch On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible; I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields. (You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in its constructor. Here is the code snippet of KafkaProducer to illustrate the problem.
---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // create metrics reporters via reflection
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    // validate bootstrap servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
---
let's say MyMetricsReporter creates a thread in its constructor. If hostname validation throws an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. As a result, we have created a thread leak. This becomes worse when we try to auto-recover (i.e. keep creating the KafkaProducer again - failing again - more thread leaks). There are multiple options for fixing this. 1) Just move the hostname validation to the beginning. But this only fixes one symptom; it doesn't fix the fundamental problem. What if some other line throws an exception? 2) Use try-catch. In the catch section, try to call the close methods of any non-null objects constructed so far. 3) Explicitly declare the dependency in the constructor. This way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter> reporters) we don't have to dependency
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Please see all the available options here https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it covers both hosts and operations and allows specifying a list for both. Thanks Parth From: Tom Graves tgraves...@yahoo.com Reply-To: Tom Graves tgraves...@yahoo.com Date: Wednesday, April 22, 2015 at 11:02 AM To: Parth Brahmbhatt pbrahmbh...@hortonworks.com, dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security Thanks for the explanations Parth. On the configs questions, the way I see it is it's more likely to accidentally give everyone access, especially since you have to run a separate command to change the acls. If there was some config for defaults, a cluster admin could change that to be nobody or a certain set of users, then grant others permissions. This would also remove the race between commands. This is something you can always add later though if people request it. So in kafka-acl.sh how do I actually tell it what the operation is? kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate where do READ, WRITE, etc. go? Can I specify them as a list so I don't have to run this a bunch of times for each? Do you want to have a --host option for --list so that admins could see what acls apply to specific host(s)? Tom On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: FYI, I have modified the KIP to include group as a resource. In order to access the “joinGroup” and “commitOffset” APIs the user will need READ permission on the topic and WRITE permission on the group. I plan to open a VOTE thread by noon if there are no more concerns. Thanks Parth On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote: Hey everyone, Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize in advance if you have already covered some of my questions. I read through the wiki and had some comments and questions. 1) public enum Operation needs EDIT changed to ALTER Done. 2) Does the Authorizer class need a setAcls? Rather than just add, to be able to set an explicit list and overwrite what was there? I see kafka-acl.sh lists a removeall so I guess you could do removeall and then add. I also don't see a removeall in the Authorizer class; is it going to loop through them all to remove each one? There is an overloaded version of removeAcls in the interface that takes in the resource as the only input and, as described in the javadoc, all the acls attached to that resource will be deleted. To cover the setAcl use case the caller can first call remove and then add. 3) Can someone tell me what the use case is for doing acls based on hosts? I can see some possibilities, just wondering if we have concrete ones where one user is allowed from one host but not another. I am not sure if I understand the question, given that the use case you described is exactly what we are trying to cover with the use of hosts in an Acl. There are some additional use cases like “allow access to any user from host1,host2” but I think primarily it gives the admins the ability to define acls at a more granular level. 4) I'm a bit unclear how the resource works in the Authorizer class.
From what I see we have 2 resources - topics and cluster. If I want to add an acl to allow joe to CREATE for the cluster then I call addAcls with Acl(user: joe, ALLOW, Set(*), Set(CREATE)) and cluster? What if I want to call addAcls for DESCRIBE on a topic? Is the resource then topic or is it the topic name? We now have 3 resources (added group), please see the updated doc. The CREATE acl that you described is correct. For any topic operation you should use the topic name as the resource name and for group the user will provide the groupId as the resource name. 5) reassigning partitions is a CLUSTER_ACTION or superuser? It's not totally clear to me the differences between these. What about increasing the # of partitions? I see this as an alter topic operation so it is at topic level and the user must have ALTER permissions on the topic. 6) groups are mentioned, are we supporting them right away or is that a follow on item? (is there going to be a kafka.supergroups) I think it can be a separate jira just for breaking down the code review into smaller chunks. We will support it in the first version but I think if we cannot do it for any reason that should not block a release with all the other authZ work. We made deliberate design choices (like introducing a principalType in KafkaPrincipal) to allow supporting groups as an incremental change. 7) Are there config options for setting acls when I create my topic? Or do I have
Re: [DISCUSS] New consumer offset commit API
Hi Ewen, I share the same concern you have about 2), that with the new API the sync commit implementation is a bit awkward since we have a single-threaded design in the new consumer. The reason that we need to mute other nodes for doing coordinator sync operations like join-group / offset commits / offset fetches is to avoid long blocking due to possible starvation on the network selector, so I think they need to be done still. On the other hand, I think users using the commit API will usually fall into three categories: 1) I really care that the offsets are committed before moving on to fetch more data, so I will wait FOREVER for it to complete. 2) I do not really care about whether it succeeds or not, so just fire the commit and let's move on; if it fails it fails (and it will be logged). 3) I care if it succeeds or not, but I do not want to wait indefinitely; so let me know if it does not finish within some timeout or failed (i.e. give me the exceptions / error codes) and I will handle it. The current APIs do not handle case 3) above, which sits between BLOCK FOREVER and DO NOT CARE AT ALL, but most times people would not be very explicit about the exact timeout; just knowing it is definite and reasonably short is good enough. I think for this we probably do not need extra timeout / retry settings, but can rely on the general request retry settings; similarly we probably do not need cancel. So I wonder if we can do a slightly different modification to the API like this: void commit(Map<TopicPartition, Long> offsets, CommitType type, ConsumerCommitCallback callback); For case 1) people call commit(offsets), which will block forever until it succeeds; For case 2) people call commit(offsets, async), which will return immediately, with no callback upon finishing; For case 3) people call commit(offsets, async, callback), and the callback will be executed when it finishes or the number of request retries has been exhausted. This API will make much smaller changes to the current implementations as well. Of course if we have a common scenario where users would really care about the exact timeout for async commits, then Future may be a good approach. Guozhang On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Ewen, This makes sense. People usually do not want to stop consuming when committing offsets. One corner case about async commit with retries I am thinking of is that it is possible that two offset commits interleave with each other and that might create a problem. Like you said, maybe we can cancel the previous one. Another thing is whether the future mechanism will only be applied to auto commit or will also be used in manual commit? Because in the new consumer we allow the user to provide an offset map for offset commit. Simply canceling a previous pending offset commit does not seem to be ideal in this case because the two commits could be for different partitions. Thanks. Jiangjie (Becket) Qin On 4/14/15, 4:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote: I'd like to get some feedback on changing the offset commit API in the new consumer. Since this is a user-facing API I wanted to make sure this gets better visibility than the JIRA ( https://issues.apache.org/jira/browse/KAFKA-2123) might. The motivation is to make it possible to do async commits but be able to tell when the commit completes/fails.
I'm suggesting changing the API from void commit(Map offsets, CommitType) to Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); which matches the approach used for the producer. The ConsumerCommitCallback only has one method: public void onCompletion(Exception exception); This enables a few different use cases:
* Blocking commit via Future.get(), and blocking with timeouts via Future.get(long, TimeUnit)
* See exceptions via the future (see discussion of retries below)
* Callback-based notification so you can keep processing messages and only take action if something goes wrong, takes too long, etc. This is the use case that motivated the change.
* Fire and forget commits via a shorthand commit() API, ignoring the resulting future.
One big difference between this and the producer API is that there isn't any result (except maybe an exception) from commitOffsets. This leads to the somewhat awkward Future<Void> signature. I personally prefer that to the sync/async flag, especially since it also provides a non-blocking interface for checking whether the commit is complete. I posted a WIP patch to the JIRA. In the process of making it I found a few issues that might be worth discussing: 1. Retries. In the old approach, this was trivial since it only applied to synchronous calls, so we could just loop until the request was successful. Do we want to start introducing a retries mechanism here, and should it apply to all types of requests or are we going to end up with a couple of different retry settings
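From the caller's side, the proposed Future-based API would support those use cases roughly as below. The commit signature and ConsumerCommitCallback follow the proposal above; the stub interface and variable names are placeholders added for illustration only:
---
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;

// Stubbed-out shapes so the example stands alone; TopicPartition is the
// real Kafka class, everything else mirrors the proposal in this thread.
interface ConsumerCommitCallback { void onCompletion(Exception exception); }
interface CommitApi {
    Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);
}

class CommitUsage {
    static void examples(CommitApi consumer, Map<TopicPartition, Long> offsets) throws Exception {
        // Old-style sync commit: block until the commit finishes
        consumer.commit(offsets, null).get();

        // Fire and forget: just drop the returned future
        consumer.commit(offsets, null);

        // Callback-based: keep processing, act only on failure
        Future<Void> pending = consumer.commit(offsets,
            exception -> { if (exception != null) exception.printStackTrace(); });

        // Bounded wait, Guozhang's case 3
        pending.get(5, TimeUnit.SECONDS);
    }
}
---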
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Sorry, I must be missing it; that is the doc I'm looking at. I don't see how you specify the operation. I don't see a normal --host that goes with --list. I see revokehost and granthost, neither of which I would think apply to list, since --principal says it applies to --list. Copying text here in case I'm looking at the wrong thing:
kafka-acl.sh --help
A tool to manage acls for kafka cluster and topics. Following are the options:
--topic <topicname>: name of the topic whose acls you want to add/remove.
--cluster: this will add/edit cluster level acls where you can control which users can create topics or list all topics for the cluster or send control messages to brokers.
--add: indicates you are trying to add an Acl.
--remove: indicates you are trying to remove an acl.
--grantprincipal <principalType:principalName>: a comma separated list of principalType:principalName where the currently supported principalType is user. When this option is not present but --granthost is specified it will default to *:*, which is a wild card that matches all types and all users.
--granthost <host1,host2>: a comma separated list of hosts from which the --grantprincipal is allowed the actions. When this option is not present but --grantprincipal is specified, it defaults to *, which is a wildcard matching all hosts.
--revokeprincipal <principalType:principalName>: a comma separated list of principalType:principalName where the currently supported principalType is user. When this option is not present but --revokehost is specified, it defaults to *:*, which is a wild card that matches all types and all users. Revoke should only be used to limit the access added by some other grants. A revoke without an accompanying grant is meaningless, as by default only principals in grant lists are allowed access based on their acls and everyone else is denied.
--revokehost <host1,host2>: a comma separated list of hosts from which the --revokeprincipal will be denied actions. When this option is not present but --revokeprincipal is specified, it defaults to *, which is a wildcard matching all hosts.
--removeAll: removes all acls for a topic; only works with topic and not with cluster.
--list: list all acls.
--principal <principalType:principalName>: used only with --list to list all acls for a principal.
On Wednesday, April 22, 2015 1:08 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Please see all the available options here https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it covers both hosts and operations and allows specifying a list for both. Thanks Parth From: Tom Graves tgraves...@yahoo.com Reply-To: Tom Graves tgraves...@yahoo.com Date: Wednesday, April 22, 2015 at 11:02 AM To: Parth Brahmbhatt pbrahmbh...@hortonworks.com, dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security Thanks for the explanations Parth. On the configs questions, the way I see it is it's more likely to accidentally give everyone access, especially since you have to run a separate command to change the acls. If there was some config for defaults, a cluster admin could change that to be nobody or a certain set of users, then grant others permissions. This would also remove the race between commands. This is something you can always add later though if people request it. So in kafka-acl.sh how do I actually tell it what the operation is? kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate where do READ, WRITE, etc. go?
Can I specify them as a list so I don't have to run this a bunch of times for each? Do you want to have a --host option for --list so that admins could see what acls apply to specific host(s)? Tom On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: FYI, I have modified the KIP to include group as a resource. In order to access the “joinGroup” and “commitOffset” APIs the user will need READ permission on the topic and WRITE permission on the group. I plan to open a VOTE thread by noon if there are no more concerns. Thanks Parth On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote: Hey everyone, Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize in advance if you have already covered some of my questions. I read through the wiki and had some comments and questions. 1) public enum Operation needs EDIT changed to ALTER Done. 2) Does the Authorizer class need a setAcls? Rather than just add, to be able to set an explicit list and overwrite what was there? I see kafka-acl.sh lists a removeall so I guess you could do removeall and then add. I also don't see a removeall in the Authorizer class; is it going to loop through them all to remove each one? There is an overloaded version of removeAcls in the interface that takes in resource as the only
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Sorry I missed your last questions. I am +0 on adding a --host option for --list; we could add it for symmetry. Again, if this is only a CLI change it can be added later; if you mean adding this to the authorizer interface then we should make a decision now. Given a choice I would like to actually keep only one option, which is resource based get (remove even the get based on principal). I see those (getAcl for principal or host) as special filtering cases which can easily be achieved by a third party tool by listing all topics and calling getAcls for each topic and applying filtering logic on that. I really don’t see the need to make those first class citizens of the authorizer interface given that these kinds of queries will be issued outside of the broker JVM, so they will not benefit from the caching, and because the storage will be indexed on resource, both these options even as first class APIs will just scan all topic acls and apply filtering logic. Thanks Parth On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Please see all the available options here https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it covers both hosts and operations and allows specifying a list for both. Thanks Parth From: Tom Graves tgraves...@yahoo.com Reply-To: Tom Graves tgraves...@yahoo.com Date: Wednesday, April 22, 2015 at 11:02 AM To: Parth Brahmbhatt pbrahmbh...@hortonworks.com, dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security Thanks for the explanations Parth. On the configs questions, the way I see it is it's more likely to accidentally give everyone access, especially since you have to run a separate command to change the acls. If there was some config for defaults, a cluster admin could change that to be nobody or a certain set of users, then grant others permissions. This would also remove the race between commands. This is something you can always add later though if people request it. So in kafka-acl.sh how do I actually tell it what the operation is? kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate where do READ, WRITE, etc. go? Can I specify them as a list so I don't have to run this a bunch of times for each? Do you want to have a --host option for --list so that admins could see what acls apply to specific host(s)? Tom On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: FYI, I have modified the KIP to include group as a resource. In order to access the “joinGroup” and “commitOffset” APIs the user will need READ permission on the topic and WRITE permission on the group. I plan to open a VOTE thread by noon if there are no more concerns. Thanks Parth On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote: Hey everyone, Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize in advance if you have already covered some of my questions. I read through the wiki and had some comments and questions. 1) public enum Operation needs EDIT changed to ALTER Done. 2) Does the Authorizer class need a setAcls? Rather than just add, to be able to set an explicit list and overwrite what was there?
I see kafka-acl.sh lists a removeall so I guess you could do removeall and then add. I also don't see a removeall in the Authorizer class; is it going to loop through them all to remove each one? There is an overloaded version of removeAcls in the interface that takes in the resource as the only input and, as described in the javadoc, all the acls attached to that resource will be deleted. To cover the setAcl use case the caller can first call remove and then add. 3) Can someone tell me what the use case is for doing acls based on hosts? I can see some possibilities, just wondering if we have concrete ones where one user is allowed from one host but not another. I am not sure if I understand the question, given that the use case you described is exactly what we are trying to cover with the use of hosts in an Acl. There are some additional use cases like “allow access to any user from host1,host2” but I think primarily it gives the admins the ability to define acls at a more granular level. 4) I'm a bit unclear how the resource works in the Authorizer class. From what I see we have 2 resources - topics and cluster. If I want to add an acl to allow joe to CREATE for the cluster then I call addAcls with Acl(user: joe, ALLOW, Set(*), Set(CREATE)) and cluster? What if I want to call addAcls for DESCRIBE on a topic? Is the resource then topic or is it the topic name? We now have 3 resources (added group), please see the updated doc. The CREATE acl that you
Re: [DISCUSS] New consumer offset commit API
I second Guozhang's proposal. I do think we need the callback. The current state is that for async commits you actually don't know if it succeeded. However there is a totally valid case where you do need to know if it succeeded but don't need to block, and without the callback you are stuck. I think the futures will likely cause problems since blocking on the future precludes polling, which would allow it to complete. -Jay On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Ewen, I share the same concern you have about 2), that with the new API the sync commit implementation is a bit awkward since we have a single-threaded design in the new consumer. The reason that we need to mute other nodes for doing coordinator sync operations like join-group / offset commits / offset fetches is to avoid long blocking due to possible starvation on the network selector, so I think they need to be done still. On the other hand, I think users using the commit API will usually fall into three categories: 1) I really care that the offsets are committed before moving on to fetch more data, so I will wait FOREVER for it to complete. 2) I do not really care about whether it succeeds or not, so just fire the commit and let's move on; if it fails it fails (and it will be logged). 3) I care if it succeeds or not, but I do not want to wait indefinitely; so let me know if it does not finish within some timeout or failed (i.e. give me the exceptions / error codes) and I will handle it. The current APIs do not handle case 3) above, which sits between BLOCK FOREVER and DO NOT CARE AT ALL, but most times people would not be very explicit about the exact timeout; just knowing it is definite and reasonably short is good enough. I think for this we probably do not need extra timeout / retry settings, but can rely on the general request retry settings; similarly we probably do not need cancel. So I wonder if we can do a slightly different modification to the API like this: void commit(Map<TopicPartition, Long> offsets, CommitType type, ConsumerCommitCallback callback); For case 1) people call commit(offsets), which will block forever until it succeeds; For case 2) people call commit(offsets, async), which will return immediately, with no callback upon finishing; For case 3) people call commit(offsets, async, callback), and the callback will be executed when it finishes or the number of request retries has been exhausted. This API will make much smaller changes to the current implementations as well. Of course if we have a common scenario where users would really care about the exact timeout for async commits, then Future may be a good approach. Guozhang On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Ewen, This makes sense. People usually do not want to stop consuming when committing offsets. One corner case about async commit with retries I am thinking of is that it is possible that two offset commits interleave with each other and that might create a problem. Like you said, maybe we can cancel the previous one. Another thing is whether the future mechanism will only be applied to auto commit or will also be used in manual commit? Because in the new consumer we allow the user to provide an offset map for offset commit. Simply canceling a previous pending offset commit does not seem to be ideal in this case because the two commits could be for different partitions. Thanks.
Jiangjie (Becket) Qin On 4/14/15, 4:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote: I'd like to get some feedback on changing the offset commit API in the new consumer. Since this is a user-facing API I wanted to make sure this gets better visibility than the JIRA ( https://issues.apache.org/jira/browse/KAFKA-2123) might. The motivation is to make it possible to do async commits but be able to tell when the commit completes/fails. I'm suggesting changing the API from void commit(Map offsets, CommitType) to Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); which matches the approach used for the producer. The ConsumerCommitCallback only has one method: public void onCompletion(Exception exception); This enables a few different use cases:
* Blocking commit via Future.get(), and blocking with timeouts via Future.get(long, TimeUnit)
* See exceptions via the future (see discussion of retries below)
* Callback-based notification so you can keep processing messages and only take action if something goes wrong, takes too long, etc. This is the use case that motivated the change.
* Fire and forget commits via a shorthand commit() API, ignoring the resulting future.
One big difference between this and the producer API is that there isn't any result (except maybe an exception) from commitOffsets. This leads to the somewhat awkward Future<Void> signature.
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Thanks! Makes sense. Tom On Wednesday, April 22, 2015 1:25 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: You are right, I forgot to mention the --operation option in the CLI, I just added it. Sorry about the confusion. Thanks Parth On 4/22/15, 11:22 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Sorry I missed your last questions. I am +0 on adding a --host option for --list; we could add it for symmetry. Again, if this is only a CLI change it can be added later; if you mean adding this to the authorizer interface then we should make a decision now. Given a choice I would like to actually keep only one option, which is resource based get (remove even the get based on principal). I see those (getAcl for principal or host) as special filtering cases which can easily be achieved by a third party tool by listing all topics and calling getAcls for each topic and applying filtering logic on that. I really don’t see the need to make those first class citizens of the authorizer interface given that these kinds of queries will be issued outside of the broker JVM, so they will not benefit from the caching, and because the storage will be indexed on resource, both these options even as first class APIs will just scan all topic acls and apply filtering logic. Thanks Parth On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Please see all the available options here https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it covers both hosts and operations and allows specifying a list for both. Thanks Parth From: Tom Graves tgraves...@yahoo.com Reply-To: Tom Graves tgraves...@yahoo.com Date: Wednesday, April 22, 2015 at 11:02 AM To: Parth Brahmbhatt pbrahmbh...@hortonworks.com, dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security Thanks for the explanations Parth. On the configs questions, the way I see it is it's more likely to accidentally give everyone access, especially since you have to run a separate command to change the acls. If there was some config for defaults, a cluster admin could change that to be nobody or a certain set of users, then grant others permissions. This would also remove the race between commands. This is something you can always add later though if people request it. So in kafka-acl.sh how do I actually tell it what the operation is? kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate where do READ, WRITE, etc. go? Can I specify them as a list so I don't have to run this a bunch of times for each? Do you want to have a --host option for --list so that admins could see what acls apply to specific host(s)? Tom On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: FYI, I have modified the KIP to include group as a resource. In order to access the “joinGroup” and “commitOffset” APIs the user will need READ permission on the topic and WRITE permission on the group. I plan to open a VOTE thread by noon if there are no more concerns. Thanks Parth On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote: Hey everyone, Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize in advance if you have already covered some of my questions.
I read through the wiki and had some comments and questions. 1) public enum Operation needs EDIT changed to ALTER Done. 2) Does the Authorizer class need a setAcls? Rather than just add, to be able to set an explicit list and overwrite what was there? I see kafka-acl.sh lists a removeall so I guess you could do removeall and then add. I also don't see a removeall in the Authorizer class; is it going to loop through them all to remove each one? There is an overloaded version of removeAcls in the interface that takes in the resource as the only input and, as described in the javadoc, all the acls attached to that resource will be deleted. To cover the setAcl use case the caller can first call remove and then add. 3) Can someone tell me what the use case is for doing acls based on hosts? I can see some possibilities, just wondering if we have concrete ones where one user is allowed from one host but not another. I am not sure if I understand the question, given that the use case you described is exactly what we are trying to cover with the use of hosts in an Acl. There are some additional use cases like “allow access to any user from host1,host2” but I think primarily it gives the admins the ability to define acls at a more granular level. 4) I'm a bit unclear how the resource works in the Authorizer class. From what I see we have 2 resources - topics and cluster. If I want to
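Putting the quoted CLI options together with the --operation flag Parth mentions adding above, a grant might look like the line below; the thread never shows the exact --operation value syntax, so the comma-separated operation list here is an assumption:
kafka-acl.sh --topic testtopic --add --operation READ,WRITE --grantprincipal user:joe,user:kate --granthost host1,host2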
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Parth, This is a long thread, so trying to keep up here; sorry if this has been covered before. First, great job on the KIP proposal and work so far. Are we sure that we want to tie host level access to a given user? My understanding is that the ACL will be (omitting some fields):
user_a, host1, host2, host3
user_b, host1, host2, host3
So there would potentially be a lot of redundancy in the configs. Does it make sense to have hosts be at the same level as principal in the hierarchy? This way you could just blanket the allowed / denied hosts and only have to worry about the users. So if you follow this, then we can wildcard the user so we can have a separate list of just host-based access. What's the order in which the perms would be evaluated if there was more than one match on a principal? Is the thought that there wouldn't usually be much overlap on hosts? I guess I can imagine a scenario where I want to offline/online access to a particular host or set of hosts, and if there was overlap, I'm doing a bunch of alter commands for just a single host. Maybe this is too contrived an example? I agree that having this level of granularity gives flexibility but I wonder if people will actually use it and not just * the hosts for a given user and create a separate global list as I mentioned above? The only other system I know of that ties users with hosts for access is MySQL and I don't love that model. Companies usually standardize on group authorization anyway; are we complicating that issue with the inclusion of hosts attached to users? Additionally I worry about the debt of big JSON configs in the first place; most non-developers find them non-intuitive already, so anything to ease this I think would be beneficial. Thanks Jeff On Wed, Apr 22, 2015 at 2:22 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Sorry I missed your last questions. I am +0 on adding a --host option for --list; we could add it for symmetry. Again, if this is only a CLI change it can be added later; if you mean adding this to the authorizer interface then we should make a decision now. Given a choice I would like to actually keep only one option, which is resource based get (remove even the get based on principal). I see those (getAcl for principal or host) as special filtering cases which can easily be achieved by a third party tool by listing all topics and calling getAcls for each topic and applying filtering logic on that. I really don’t see the need to make those first class citizens of the authorizer interface given that these kinds of queries will be issued outside of the broker JVM, so they will not benefit from the caching, and because the storage will be indexed on resource, both these options even as first class APIs will just scan all topic acls and apply filtering logic. Thanks Parth On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Please see all the available options here https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it covers both hosts and operations and allows specifying a list for both.
Thanks Parth From: Tom Graves tgraves...@yahoo.com Reply-To: Tom Graves tgraves...@yahoo.com Date: Wednesday, April 22, 2015 at 11:02 AM To: Parth Brahmbhatt pbrahmbh...@hortonworks.com, dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security Thanks for the explanations Parth. On the configs questions, the way I see it is it's more likely to accidentally give everyone access, especially since you have to run a separate command to change the acls. If there was some config for defaults, a cluster admin could change that to be nobody or a certain set of users, then grant others permissions. This would also remove the race between commands. This is something you can always add later though if people request it. So in kafka-acl.sh how do I actually tell it what the operation is? kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate where do READ, WRITE, etc. go? Can I specify them as a list so I don't have to run this a bunch of times for each? Do you want to have a --host option for --list so that admins could see what acls apply to specific host(s)? Tom On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: FYI, I have modified the KIP to include group as a resource. In order to access the “joinGroup” and “commitOffset” APIs the user will need READ permission on the topic and WRITE permission on the group. I plan to open a VOTE thread by noon if there are no more concerns. Thanks Parth On 4/22/15, 9:03 AM, Tom Graves
Re: [DISCUSS] KIP-11- Authorization design for kafka security
You are right, I forgot to mention the --operation option in the CLI, I just added it. Sorry about the confusion. Thanks Parth On 4/22/15, 11:22 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Sorry I missed your last questions. I am +0 on adding a --host option for --list; we could add it for symmetry. Again, if this is only a CLI change it can be added later; if you mean adding this to the authorizer interface then we should make a decision now. Given a choice I would like to actually keep only one option, which is resource based get (remove even the get based on principal). I see those (getAcl for principal or host) as special filtering cases which can easily be achieved by a third party tool by listing all topics and calling getAcls for each topic and applying filtering logic on that. I really don’t see the need to make those first class citizens of the authorizer interface given that these kinds of queries will be issued outside of the broker JVM, so they will not benefit from the caching, and because the storage will be indexed on resource, both these options even as first class APIs will just scan all topic acls and apply filtering logic. Thanks Parth On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Please see all the available options here https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it covers both hosts and operations and allows specifying a list for both. Thanks Parth From: Tom Graves tgraves...@yahoo.com Reply-To: Tom Graves tgraves...@yahoo.com Date: Wednesday, April 22, 2015 at 11:02 AM To: Parth Brahmbhatt pbrahmbh...@hortonworks.com, dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security Thanks for the explanations Parth. On the configs questions, the way I see it is it's more likely to accidentally give everyone access, especially since you have to run a separate command to change the acls. If there was some config for defaults, a cluster admin could change that to be nobody or a certain set of users, then grant others permissions. This would also remove the race between commands. This is something you can always add later though if people request it. So in kafka-acl.sh how do I actually tell it what the operation is? kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate where do READ, WRITE, etc. go? Can I specify them as a list so I don't have to run this a bunch of times for each? Do you want to have a --host option for --list so that admins could see what acls apply to specific host(s)? Tom On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: FYI, I have modified the KIP to include group as a resource. In order to access the “joinGroup” and “commitOffset” APIs the user will need READ permission on the topic and WRITE permission on the group. I plan to open a VOTE thread by noon if there are no more concerns. Thanks Parth On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote: Hey everyone, Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize in advance if you have already covered some of my questions. I read through the wiki and had some comments and questions. 1) public enum Operation needs EDIT changed to ALTER Done.
2) Does the Authorizer class need a setAcls? Rather than just add, to be able to set an explicit list and overwrite what was there? I see kafka-acl.sh lists a removeall so I guess you could do removeall and then add. I also don't see a removeall in the Authorizer class; is it going to loop through them all to remove each one? There is an overloaded version of removeAcls in the interface that takes in the resource as the only input and, as described in the javadoc, all the acls attached to that resource will be deleted. To cover the setAcl use case the caller can first call remove and then add. 3) Can someone tell me what the use case is for doing acls based on hosts? I can see some possibilities, just wondering if we have concrete ones where one user is allowed from one host but not another. I am not sure if I understand the question, given that the use case you described is exactly what we are trying to cover with the use of hosts in an Acl. There are some additional use cases like “allow access to any user from host1,host2” but I think primarily it gives the admins the ability to define acls at a more granular level. 4) I'm a bit unclear how the resource works in the Authorizer class. From what I see we have 2 resources - topics and cluster. If I want to add an acl to allow joe to CREATE for the cluster then I call addAcls with Acl(user: joe, ALLOW, Set(*), Set(CREATE)) and cluster?
Re: [DISCUSS] KIP-11- Authorization design for kafka security
What about specifying the operation when I go to add or remove? Tom On Wednesday, April 22, 2015 1:23 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Sorry I missed your last questions. I am +0 on adding a --host option for --list; we could add it for symmetry. Again, if this is only a CLI change it can be added later; if you mean adding this to the authorizer interface then we should make a decision now. Given a choice I would like to actually keep only one option, which is resource based get (remove even the get based on principal). I see those (getAcl for principal or host) as special filtering cases which can easily be achieved by a third party tool by listing all topics and calling getAcls for each topic and applying filtering logic on that. I really don’t see the need to make those first class citizens of the authorizer interface given that these kinds of queries will be issued outside of the broker JVM, so they will not benefit from the caching, and because the storage will be indexed on resource, both these options even as first class APIs will just scan all topic acls and apply filtering logic. Thanks Parth On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Please see all the available options here https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it covers both hosts and operations and allows specifying a list for both. Thanks Parth From: Tom Graves tgraves...@yahoo.com Reply-To: Tom Graves tgraves...@yahoo.com Date: Wednesday, April 22, 2015 at 11:02 AM To: Parth Brahmbhatt pbrahmbh...@hortonworks.com, dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security Thanks for the explanations Parth. On the configs questions, the way I see it is it's more likely to accidentally give everyone access, especially since you have to run a separate command to change the acls. If there was some config for defaults, a cluster admin could change that to be nobody or a certain set of users, then grant others permissions. This would also remove the race between commands. This is something you can always add later though if people request it. So in kafka-acl.sh how do I actually tell it what the operation is? kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate where do READ, WRITE, etc. go? Can I specify them as a list so I don't have to run this a bunch of times for each? Do you want to have a --host option for --list so that admins could see what acls apply to specific host(s)? Tom On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: FYI, I have modified the KIP to include group as a resource. In order to access the “joinGroup” and “commitOffset” APIs the user will need READ permission on the topic and WRITE permission on the group. I plan to open a VOTE thread by noon if there are no more concerns. Thanks Parth On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote: Hey everyone, Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize in advance if you have already covered some of my questions. I read through the wiki and had some comments and questions. 1) public enum Operation needs EDIT changed to ALTER Done. 2) Does the Authorizer class need a setAcls?
Rather than just add, to be able to set an explicit list and overwrite what was there? I see the kafka-acl.sh lists a removeall, so I guess you could do removeall and then add. I also don't see a removeall in the Authorizer class; is it going to loop through them all to remove each one? There is an overloaded version of removeAcls in the interface that takes in the resource as the only input, and as described in the javadoc all the acls attached to that resource will be deleted. To cover the setAcl use case the caller can first call remove and then add (see the sketch after this message). 3) Can someone tell me what the use case is for doing acls based on hosts? I can see some possibilities, just wondering if we have concrete ones where one user is allowed from one host but not another. I am not sure I understand the question, given the use case you described in your question is what we are trying to cover with the use of hosts in an Acl. There are some additional use cases like “allow access to any user from host1,host2”, but I think primarily it gives the admins the ability to define acls at a more granular level. 4) I'm a bit unclear how the resource works in the Authorizer class. From what I see we have 2 resources - topics and cluster. If I want to add an acl to allow joe to CREATE for the cluster then I call addAcls with Acl(user: joe, ALLOW, Set(*), Set(CREATE)) and cluster? What if I want to call addAcls
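To make the remove-then-add composition concrete, here is a minimal sketch in Java. The method names follow the Authorizer discussion above, but the Acl and Resource shapes are simplified assumptions for illustration, not the actual KIP-11 types.

    import java.util.Set;

    // Simplified stand-ins for the KIP-11 types (assumed shapes, not the real API).
    record Acl(String principal, String permissionType, Set<String> hosts, Set<String> operations) {}
    record Resource(String resourceType, String name) {}

    interface Authorizer {
        void addAcls(Set<Acl> acls, Resource resource);
        // Overload described above: drops every acl attached to the resource.
        void removeAcls(Resource resource);
    }

    final class AclAdmin {
        // "setAcls" composed from the two primitives, as described above:
        // clear whatever is there, then install the explicit list.
        static void setAcls(Authorizer authorizer, Resource resource, Set<Acl> acls) {
            authorizer.removeAcls(resource);
            authorizer.addAcls(acls, resource);
        }
    }

Note that the composition is not atomic: between the two calls the resource briefly has no acls at all, the same kind of window as the create-topic-then-set-acls race raised earlier in the thread.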
Re: [DISCUSS] New consumer offset commit API
Hi Ewen, The only time I can think of where the application needs to know whether the offset was committed or not is during graceful shutdown and/or Runtime.addShutdownHook(), so the consumer application does not get duplicated records upon restart and does not have to deal with eliminating already processed offsets. The only thing the consumer application will then have to handle is a failure to commit the offset after XX retries. Or would you prefer the application to manage this last offset commit when the offset cannot be committed due to failure, connection timeout or any other failure case? Thanks, Bhavesh On Wed, Apr 22, 2015 at 11:20 AM, Jay Kreps jay.kr...@gmail.com wrote: I second Guozhang's proposal. I do think we need the callback. The current state is that for async commits you actually don't know if it succeeded. However there is a totally valid case where you do need to know if it succeeded but don't need to block, and without the callback you are stuck. I think the futures will likely cause problems since blocking on the future precludes polling, which would allow it to complete. -Jay On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Ewen, I share the same concern you have about 2), that with the new API the sync commit implementation is a bit awkward since we have a single-threaded design in the new consumer. The reason that we need to mute other nodes for doing coordinator sync operations like join-group / offset commits / offset fetches is to avoid long blocking due to possible starvation on the network selector, so I think they still need to be done. On the other hand, I think users using the commit API will usually fall into three categories: 1) I really care that the offsets be committed before moving on to fetch more data, so I will wait FOREVER for it to complete. 2) I do not really care about whether it succeeds or not, so just fire the commit and let's move on; if it fails it fails (and it will be logged). 3) I care if it succeeds or not, but I do not want to wait indefinitely; so let me know if it does not finish within some timeout or failed (i.e. give me the exceptions / error codes) and I will handle it. The current APIs do not handle case 3) above, which sits between BLOCK FOREVER and DO NOT CARE AT ALL; but most times people are not very explicit about the exact timeout, and just knowing it is definite and reasonably short is good enough. I think for this we probably do not need extra timeout / retry settings, but can rely on the general request retry settings; similarly we probably do not need cancel. So I wonder if we can do a slightly different modification to the API like this: void commit(Map<TopicPartition, Long> offsets, CommitType type, ConsumerCommitCallback callback); For case 1) people call commit(offsets), which will block forever until it succeeds; for case 2) people call commit(offsets, async), which will return immediately, with no callback upon finish; for case 3) people call commit(offsets, async, callback), and the callback will be executed when it finishes or the number of request retries has been exhausted. This API will also make much smaller changes to the current implementation. Of course if we have a common scenario where users would really care about the exact timeout for async commits, then Future may be a good approach. Guozhang On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Ewen, This makes sense. People usually do not want to stop consuming when committing offsets.
One corner case about async commit with retries that I am thinking of is that two offset commits could interleave with each other, and that might create problems. Like you said, maybe we can cancel the previous one. Another question is whether the future mechanism will only be applied to auto commit or will also be used in manual commit, because in the new consumer we allow the user to provide an offset map for offset commit. Simply canceling a previous pending offset commit does not seem ideal in this case because the two commits could be for different partitions. Thanks. Jiangjie (Becket) Qin On 4/14/15, 4:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote: I'd like to get some feedback on changing the offset commit API in the new consumer. Since this is user-facing API I wanted to make sure this gets better visibility than the JIRA ( https://issues.apache.org/jira/browse/KAFKA-2123) might. The motivation is to make it possible to do async commits but be able to tell when the commit completes/fails. I'm suggesting changing the API from void commit(Map<TopicPartition, Long> offsets, CommitType type) to Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); which matches the approach used for the producer. The
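For reference, the two API shapes being debated look roughly like this side by side. The type and method names are taken from the emails above; the enclosing interface and the callback signature are simplified assumptions.

    import java.util.Map;
    import java.util.concurrent.Future;

    // Simplified stand-ins, assumed for illustration only.
    interface TopicPartition {}
    enum CommitType { SYNC, ASYNC }
    interface ConsumerCommitCallback {
        void onComplete(Exception exception); // assumed shape: null on success
    }

    interface CommitApiSketch {
        // Ewen's proposal: Future-based, mirroring the producer API.
        Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);

        // Guozhang's alternative: commit type plus an optional callback.
        void commit(Map<TopicPartition, Long> offsets, CommitType type, ConsumerCommitCallback callback);
    }

Under the second shape, Guozhang's case 1 is commit(offsets, CommitType.SYNC, null), case 2 is commit(offsets, CommitType.ASYNC, null), and case 3 is commit(offsets, CommitType.ASYNC, callback), with the callback firing on success or once the request retries are exhausted.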
[jira] [Commented] (KAFKA-2138) KafkaProducer does not honor the retry backoff time.
[ https://issues.apache.org/jira/browse/KAFKA-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14508203#comment-14508203 ] Jiangjie Qin commented on KAFKA-2138: - Updated reviewboard https://reviews.apache.org/r/33417/diff/ against branch origin/trunk KafkaProducer does not honor the retry backoff time. Key: KAFKA-2138 URL: https://issues.apache.org/jira/browse/KAFKA-2138 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Critical Attachments: KAFKA-2138.patch, KAFKA-2138_2015-04-22_17:19:33.patch In KafkaProducer, we only check batch.lastAttemptMs in ready(), but we do not check it in drain(). The problem is that if we have two partitions on the same node, where partition 1 should back off while partition 2 should not, partition 1's backoff time will currently be ignored. We should check lastAttemptMs in drain() as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
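The fix can be illustrated with the backoff predicate alone; the names here (attempts, lastAttemptMs, retryBackoffMs) are modeled on the JIRA text rather than copied from the Kafka source, so treat this as a sketch of the check, not the actual patch.

    // Minimal illustration of the KAFKA-2138 fix: the same backoff test used
    // in ready() must also gate drain().
    final class BackoffCheck {
        static boolean shouldBackOff(int attempts, long lastAttemptMs,
                                     long retryBackoffMs, long nowMs) {
            // A batch that has been tried before must wait out the backoff
            // window before it may be drained again.
            return attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
        }

        public static void main(String[] args) {
            long now = 1_000L;
            // Partition 1: attempted 50 ms ago with a 100 ms backoff -> skip it.
            System.out.println(shouldBackOff(1, 950L, 100L, now)); // true
            // Partition 2: never attempted -> drainable immediately.
            System.out.println(shouldBackOff(0, 0L, 100L, now));   // false
        }
    }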
Re: Review Request 33417: Patch for KAFKA-2138
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/ --- (Updated April 23, 2015, 12:19 a.m.) Review request for kafka. Bugs: KAFKA-2138 https://issues.apache.org/jira/browse/KAFKA-2138 Repository: kafka Description (updated) --- Incorporated Joel's comments. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c Diff: https://reviews.apache.org/r/33417/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Updated] (KAFKA-2138) KafkaProducer does not honor the retry backoff time.
[ https://issues.apache.org/jira/browse/KAFKA-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-2138: Attachment: KAFKA-2138_2015-04-22_17:19:33.patch KafkaProducer does not honor the retry backoff time. Key: KAFKA-2138 URL: https://issues.apache.org/jira/browse/KAFKA-2138 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Priority: Critical Attachments: KAFKA-2138.patch, KAFKA-2138_2015-04-22_17:19:33.patch In KafkaProducer, we only check batch.lastAttemptMs in ready(), but we do not check it in drain(). The problem is that if we have two partitions on the same node, where partition 1 should back off while partition 2 should not, partition 1's backoff time will currently be ignored. We should check lastAttemptMs in drain() as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Hi Roshan, Using the 'auto' value may break the rule and mess up the configuration. @Jay, any thoughts? Thanks, Honghai Chen -Original Message- From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm] Sent: Thursday, April 23, 2015 6:27 AM To: dev@kafka.apache.org; Roshan Naik Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system +1 (non-binding). -- Harsha On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com) wrote: I see that it is safe to keep this off by default due to some concerns. Eventually, for settings such as this whose 'preferred' value is platform specific (or based on other criteria), it might be worth considering having a default value that is not a constant but an 'auto' value. When Kafka boots up it can automatically use the preferred value. Of course it would have to be documented as to what auto means for a given platform. -roshan On 4/22/15 1:21 PM, Jakob Homan jgho...@gmail.com wrote: +1. This is an important performance fix for Windows-based clusters. -Jakob On 22 April 2015 at 03:25, Honghai Chen honghai.c...@microsoft.com wrote: Fixed the issue Sriram mentioned. Code review and jira/KIP updated. Below is a detailed description of the scenarios: 1. If a clean shutdown is done, the last log file will be truncated to its real size, since the close() function of FileMessageSet will call trim(). 2. If there is a crash, then on restart the broker will go through the process of recover() and the last log file will be truncated to its real size (and the position will be moved to the end of the file). 3. When the service starts and opens an existing file: a. It will run the LogSegment constructor which has NO preallocate parameter. b. Then the end in FileMessageSet will be Int.MaxValue, and channel.position(math.min(channel.size().toInt, end)) will set the position to the end of the file. c. If recovery is needed, the recover function will truncate the file to the end of the valid data, and also move the position to it. 4. When the service is running and needs to create a new log segment and a new FileMessageSet: a. If preallocate = true, the end in FileMessageSet will be 0, the file size will be initFileSize, and channel.position(math.min(channel.size().toInt, end)) will set the position to 0. b. Else if preallocate = false (backward compatible), the end in FileMessageSet will be Int.MaxValue, the file size will be 0, and channel.position(math.min(channel.size().toInt, end)) will set the position to 0. https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system https://issues.apache.org/jira/browse/KAFKA-1646 https://reviews.apache.org/r/33204/diff/2/ Thanks, Honghai Chen http://aka.ms/kafka http://aka.ms/manifold -Original Message- From: Honghai Chen Sent: Wednesday, April 22, 2015 11:12 AM To: dev@kafka.apache.org Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system Hi Sriram, One line of code was missed; will update the code review board and KIP soon. For LogSegment and FileMessageSet, we must use different constructor functions for an existing file and a new file; then the code channel.position(math.min(channel.size().toInt, end)) will make sure the position is at the end of an existing file.
Thanks, Honghai Chen -Original Message- From: Jay Kreps [mailto:jay.kr...@gmail.com] Sent: Wednesday, April 22, 2015 5:22 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system My understanding of the patch is that clean shutdown truncates the file back to its true size (and reallocates it on startup). Hard crash is handled by the normal recovery, which should truncate off the empty portion of the file. On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote: Could you describe how recovery works in this mode? Say we had a 250 MB preallocated segment and we wrote till 50 MB and crashed. Till what point do we recover? Also, on startup, how is the append end pointer set even on a clean shutdown? How does the FileChannel end position get set to 50 MB instead of 250 MB? The existing code might just work for it but explaining that would be useful. On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote: +1. I've tried this on Linux and it helps reduce the spikes in append (and hence producer) latency for high throughput writes. I am not entirely sure why, but my suspicion is that in the absence of preallocation, you see spikes when writes need to happen faster
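The positioning logic Honghai describes can be illustrated in plain Java NIO (the real code lives in the Scala FileMessageSet; this is an assumed simplification). Preallocation reserves the file at its target size up front, and on open the channel position is clamped to min(size, end) so that appends land after existing data rather than at the preallocated tail.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;

    final class PreallocateSketch {
        // Open a log segment file, optionally preallocating it to initFileSize.
        // Mirrors the channel.position(math.min(channel.size().toInt, end))
        // logic quoted above: end is 0 for a brand-new preallocated file and
        // effectively unbounded (Int.MaxValue in the Scala code) for an
        // existing file.
        static FileChannel openSegment(String path, boolean preallocate,
                                       long initFileSize, long end) throws IOException {
            RandomAccessFile file = new RandomAccessFile(path, "rw");
            if (preallocate && file.length() == 0)
                file.setLength(initFileSize); // reserve contiguous space up front
            FileChannel channel = file.getChannel();
            // New preallocated file: end == 0, so writes start at the head.
            // Existing file: end is huge, so the position is the current size
            // and appends go after the valid data.
            channel.position(Math.min(channel.size(), end));
            return channel;
        }
    }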
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hi Jeff, Thanks a lot for the review. I think you have a valid point about acls being duplicated, and the simplest solution would be to modify the acls class so they hold a set of principals instead of a single principal, i.e. user_a,user_b have READ,WRITE,DESCRIBE permissions on Topic1 from Host1, Host2, Host3. I think the evaluation order only matters for the permissionType, which is: deny acls should be evaluated before allow acls. To give you an example, suppose we have the following acls: acl1 - user1 is allowed to READ from all hosts. acl2 - host1 is allowed to READ regardless of who the user is. acl3 - host2 is allowed to READ regardless of who the user is. acl4 - user1 is denied READ from host1. As stated in the KIP, we first evaluate DENY, so if user1 tries to access from host1 he will be denied (acl4), even though both user1 and host1 have allow acls with wildcards (acl1, acl2). If user1 tries to READ from host2, the action will be allowed, and it does not matter whether we match acl3 or acl1, so I don’t think the evaluation order matters here. “Will people actually use hosts with users?” I really don’t know, but given ACLs are part of our public APIs I thought it is better to try and cover more use cases. If others think this extra complexity is not worth the value it's adding, please raise your concerns so we can discuss whether it should be removed from the acl structure. Note that even in the absence of hosts in the ACL, users will still be able to whitelist/blacklist hosts as long as we start supporting principalType = “host”; this is easy to add and can be an incremental improvement. They will however lose the ability to restrict access for users to just a set of hosts. We agreed to offer a CLI to overcome the JSON acl config https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI). I still like JSON but that probably has something to do with me being a developer :-). Thanks Parth On 4/22/15, 11:38 AM, Jeff Holoman jholo...@cloudera.com wrote: Parth, This is a long thread, so trying to keep up here, sorry if this has been covered before. First, great job on the KIP proposal and work so far. Are we sure that we want to tie host-level access to a given user? My understanding is that the ACL will be (omitting some fields) user_a, host1, host2, host3 user_b, host1, host2, host3 So there would potentially be a lot of redundancy in the configs. Does it make sense to have hosts be at the same level as principal in the hierarchy? This way you could just blanket the allowed / denied hosts and only have to worry about the users. So if you follow this, then we can wildcard the user so we can have a separate list of just host-based access. What's the order that the perms would be evaluated in if there was more than one match on a principal? Is the thought that there wouldn't usually be much overlap on hosts? I guess I can imagine a scenario where I want to offline/online access to a particular host or set of hosts, and if there was overlap, I'm doing a bunch of alter commands for just a single host. Maybe this is too contrived an example? I agree that having this level of granularity gives flexibility, but I wonder if people will actually use it and not just * the hosts for a given user and create a separate global list as I mentioned above? The only other system I know of that ties users with hosts for access is MySQL, and I don't love that model.
Companies usually standardize on group authorization anyway; are we complicating that issue with the inclusion of hosts attached to users? Additionally I worry about the debt of big JSON configs in the first place; most non-developers find them non-intuitive already, so anything to ease this I think would be beneficial. Thanks Jeff On Wed, Apr 22, 2015 at 2:22 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Sorry I missed your last questions. I am +0 on adding a --host option for --list; we could add it for symmetry. Again, if this is only a CLI change it can be added later; if you mean adding this to the authorizer interface, then we should make a decision now. Given a choice I would like to actually keep only one option, which is the resource-based get (remove even the get based on principal). I see those (getAcls for principal or host) as special filtering cases which can easily be achieved by a third-party tool by listing all topics, calling getAcls for each topic and applying filtering logic on that. I really don’t see the need to make those first-class citizens of the authorizer interface: these kinds of queries will be issued outside the broker JVM, so they will not benefit from the caching, and because the storage will be indexed on resource, both these options even as a first-class API will just scan all topic acls and apply filtering logic. Thanks Parth On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Please see all the available options here
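Parth's deny-before-allow evaluation order (acl1 through acl4 above) can be sketched as follows. The AclEntry shape and the matching rules are simplified assumptions for illustration, not the KIP-11 implementation.

    import java.util.List;

    // Simplified acl entry; "*" acts as the wildcard for principal and host.
    record AclEntry(String principal, String host, boolean deny, String operation) {
        boolean matches(String p, String h, String op) {
            return (principal.equals("*") || principal.equals(p))
                && (host.equals("*") || host.equals(h))
                && operation.equals(op);
        }
    }

    final class DenyFirstAuthorizer {
        static boolean authorize(List<AclEntry> acls, String principal, String host, String op) {
            // DENY entries are evaluated first: any match short-circuits to a
            // refusal, which is why acl4 wins over the wildcard allows acl1 and acl2.
            for (AclEntry acl : acls)
                if (acl.deny() && acl.matches(principal, host, op)) return false;
            // Otherwise any matching ALLOW entry grants access; order among the
            // allows is irrelevant, as noted for acl1 vs acl3.
            for (AclEntry acl : acls)
                if (!acl.deny() && acl.matches(principal, host, op)) return true;
            return false; // no match at all: the outcome is config-dependent in the KIP
        }
    }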
[jira] [Commented] (KAFKA-2139) Add a separate controller message queue with higher priority on broker side
[ https://issues.apache.org/jira/browse/KAFKA-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507933#comment-14507933 ] Jay Kreps commented on KAFKA-2139: -- Awesome, greatly appreciated. I agree that a separate queue with a dedicated worker thread sounds more promising if we can come up with a clean way to distinguish controller vs non-controller traffic at the network layer. Add a separate controller message queue with higher priority on broker side --- Key: KAFKA-2139 URL: https://issues.apache.org/jira/browse/KAFKA-2139 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin This ticket is supposed to work together with KAFKA-2029. There are two issues with the current controller-to-broker messages. 1. On the controller side the messages are sent without synchronization. 2. On the broker side the controller messages share the same queue as client messages. The problem here is that brokers process the controller messages for the same partition at different times, and the variation can be big. This causes unnecessary data loss and prolongs the preferred leader election / controlled shutdown / partition reassignment, etc. KAFKA-2029 was trying to add a boundary between messages for different partitions. For example, before the leader migration for the previous partition finishes, the leader migration for the next partition won't begin. This ticket is trying to let the broker process controller messages faster. So the idea is to have a separate queue to hold controller messages; if there are controller messages, the KafkaApi thread will first take care of those, otherwise it will process messages from clients. These two tickets are not the ultimate solution to the current controller problems, but just mitigate them with minor code changes. Moving forward, we still need to think about rewriting the controller in a cleaner way. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
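The mitigation described in the ticket (drain controller messages before client messages) can be sketched with two queues. The class and method names here are illustrative assumptions, not the broker's actual RequestChannel.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    final class PriorityRequestChannelSketch<R> {
        private final BlockingQueue<R> controllerQueue = new LinkedBlockingQueue<>();
        private final BlockingQueue<R> clientQueue = new LinkedBlockingQueue<>();

        void sendControllerRequest(R request) { controllerQueue.add(request); }
        void sendClientRequest(R request) { clientQueue.add(request); }

        // Worker loop: always prefer pending controller messages and fall back
        // to client messages, re-checking often enough that a controller
        // message never waits behind a long client backlog.
        R receive() throws InterruptedException {
            while (true) {
                R controller = controllerQueue.poll();
                if (controller != null) return controller;
                R client = clientQueue.poll(10, TimeUnit.MILLISECONDS);
                if (client != null) return client;
            }
        }
    }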
Re: Review Request 33417: Patch for KAFKA-2138
On April 21, 2015, 11:56 p.m., Guozhang Wang wrote: This piece of logic has become quite complex and awkward to me now; for example, in ready() a node will only not be considered if ALL of its partitions are either not sendable or in the backoff period, and the reason we want to get ready nodes before drain is to check if they are really ready or not. This is mainly because 1) we need to be careful about the timeout value when calling client.poll() later in order to avoid busy waiting, and 2) we need to make sure that if a metadata refresh is needed, it is sent at higher priority than other requests. I suggest rewriting this fragment of code to make it clearer, with the following process: 0. While handling the metadata response and updating the metadata, check whether ANY partition's leader is not known; if there is one, set metadata.requestUpdate. Then we no longer need to do this step at the start of run(). 1. Get all the ready nodes based on their connection state only (i.e. no peeking in RecordAccumulator), and record the node_backoff as min (reconnection_backoff - time_waited) over all nodes; if one of these nodes is connected or connecting, this backoff should be 0. 2. For each of the ready nodes, try to drain their corresponding partitions in RecordAccumulator while considering all kinds of conditions (full, expired, exhausted, etc.), and record the data_backoff as min (retry_backoff - time_waited) over all partitions; if one of the partitions is immediately sendable, this backoff should be 0. 3. Formulate the produce request and call client.poll() with timeout = reconnection_backoff > 0 ? reconnection_backoff : retry_backoff. 4. In NetworkClient.poll(), the logic of maybeUpdateMetadata and updating the metadataTimeout can also be simplified. This may contain some flaws; Jiangjie / Ewen, let me know if you see any issues. Jiangjie Qin wrote: Hi Guozhang, I think that makes sense. We should swap the checking order to check connection readiness first, then data readiness. I'll try to submit a refactored patch and will raise questions if there is anything. Thanks. I agree with the suggestions to rewrite, but I really think that is orthogonal to this simple (and correct) fix. I think we should commit this and keep the jira open for further refactoring. - Joel --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/#review81097 --- On April 21, 2015, 10:51 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/ --- (Updated April 21, 2015, 10:51 p.m.) Review request for kafka. Bugs: KAFKA-2138 https://issues.apache.org/jira/browse/KAFKA-2138 Repository: kafka Description --- Patch for KAFKA-2138 honor retry backoff in KafkaProducer Diffs - clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c Diff: https://reviews.apache.org/r/33417/diff/ Testing --- Thanks, Jiangjie Qin
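Steps 1 through 3 of Guozhang's proposal reduce to choosing a poll timeout from the two backoff values. A minimal sketch, with names taken from the email rather than from NetworkClient:

    final class PollTimeoutSketch {
        // reconnectionBackoffMs: min (reconnection_backoff - time_waited) over
        // all nodes, 0 if any node is connected or connecting (step 1).
        // retryBackoffMs: min (retry_backoff - time_waited) over all
        // partitions, 0 if any partition is immediately sendable (step 2).
        static long pollTimeout(long reconnectionBackoffMs, long retryBackoffMs) {
            // Step 3: wait out a pending connection backoff first, otherwise
            // wait on the retry backoff; 0 means poll() should return promptly.
            return reconnectionBackoffMs > 0 ? reconnectionBackoffMs : retryBackoffMs;
        }
    }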
Re: Review Request 33417: Patch for KAFKA-2138
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/#review81238 --- Ship it! clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java https://reviews.apache.org/r/33417/#comment131504 typo clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java https://reviews.apache.org/r/33417/#comment131513 Node1 should be the only ready node clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java https://reviews.apache.org/r/33417/#comment131506 batche -> batch clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java https://reviews.apache.org/r/33417/#comment131508 paritition -> partition clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java https://reviews.apache.org/r/33417/#comment131509 tp1 should backoff while tp2 should not clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java https://reviews.apache.org/r/33417/#comment131514 Node1 should be the only ready node - Joel Koshy On April 21, 2015, 10:51 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33417/ --- (Updated April 21, 2015, 10:51 p.m.) Review request for kafka. Bugs: KAFKA-2138 https://issues.apache.org/jira/browse/KAFKA-2138 Repository: kafka Description --- Patch for KAFKA-2138 honor retry backoff in KafkaProducer Diffs - clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c Diff: https://reviews.apache.org/r/33417/diff/ Testing --- Thanks, Jiangjie Qin
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
+1 (non-binding). -- Harsha On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com) wrote: I see that it is safe to keep this off by default due to some concerns. Eventually, for settings such as this whose 'preferred' value is platform specific (or based on other criteria), it might be worth considering having a default value that is not a constant but an 'auto' value. When Kafka boots up it can automatically use the preferred value. Of course it would have to be documented as to what auto means for a given platform. -roshan On 4/22/15 1:21 PM, Jakob Homan jgho...@gmail.com wrote: +1. This is an important performance fix for Windows-based clusters. -Jakob On 22 April 2015 at 03:25, Honghai Chen honghai.c...@microsoft.com wrote: Fixed the issue Sriram mentioned. Code review and jira/KIP updated. Below is a detailed description of the scenarios: 1. If a clean shutdown is done, the last log file will be truncated to its real size, since the close() function of FileMessageSet will call trim(). 2. If there is a crash, then on restart the broker will go through the process of recover() and the last log file will be truncated to its real size (and the position will be moved to the end of the file). 3. When the service starts and opens an existing file: a. It will run the LogSegment constructor which has NO preallocate parameter. b. Then the end in FileMessageSet will be Int.MaxValue, and channel.position(math.min(channel.size().toInt, end)) will set the position to the end of the file. c. If recovery is needed, the recover function will truncate the file to the end of the valid data, and also move the position to it. 4. When the service is running and needs to create a new log segment and a new FileMessageSet: a. If preallocate = true, the end in FileMessageSet will be 0, the file size will be initFileSize, and channel.position(math.min(channel.size().toInt, end)) will set the position to 0. b. Else if preallocate = false (backward compatible), the end in FileMessageSet will be Int.MaxValue, the file size will be 0, and channel.position(math.min(channel.size().toInt, end)) will set the position to 0. https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system https://issues.apache.org/jira/browse/KAFKA-1646 https://reviews.apache.org/r/33204/diff/2/ Thanks, Honghai Chen http://aka.ms/kafka http://aka.ms/manifold -Original Message- From: Honghai Chen Sent: Wednesday, April 22, 2015 11:12 AM To: dev@kafka.apache.org Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system Hi Sriram, One line of code was missed; will update the code review board and KIP soon. For LogSegment and FileMessageSet, we must use different constructor functions for an existing file and a new file; then the code channel.position(math.min(channel.size().toInt, end)) will make sure the position is at the end of an existing file. Thanks, Honghai Chen -Original Message- From: Jay Kreps [mailto:jay.kr...@gmail.com] Sent: Wednesday, April 22, 2015 5:22 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system My understanding of the patch is that clean shutdown truncates the file back to its true size (and reallocates it on startup). Hard crash is handled by the normal recovery, which should truncate off the empty portion of the file.
On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote: Could you describe how recovery works in this mode? Say we had a 250 MB preallocated segment and we wrote till 50 MB and crashed. Till what point do we recover? Also, on startup, how is the append end pointer set even on a clean shutdown? How does the FileChannel end position get set to 50 MB instead of 250 MB? The existing code might just work for it but explaining that would be useful. On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote: +1. I've tried this on Linux and it helps reduce the spikes in append (and hence producer) latency for high throughput writes. I am not entirely sure why, but my suspicion is that in the absence of preallocation, you see spikes when writes need to happen faster than the time it takes Linux to allocate the next block to the file. It will be great to see some performance test results too. On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote: I'm also +1 on this. The change is quite small and may actually help perf on Linux as well (we've never tried this). I have a lot of concerns on testing the various failure conditions but I think since it
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
+1. This is an important performance fix for Windows-based clusters. -Jakob On 22 April 2015 at 03:25, Honghai Chen honghai.c...@microsoft.com wrote: Fixed the issue Sriram mentioned. Code review and jira/KIP updated. Below is a detailed description of the scenarios: 1. If a clean shutdown is done, the last log file will be truncated to its real size, since the close() function of FileMessageSet will call trim(). 2. If there is a crash, then on restart the broker will go through the process of recover() and the last log file will be truncated to its real size (and the position will be moved to the end of the file). 3. When the service starts and opens an existing file: a. It will run the LogSegment constructor which has NO preallocate parameter. b. Then the end in FileMessageSet will be Int.MaxValue, and channel.position(math.min(channel.size().toInt, end)) will set the position to the end of the file. c. If recovery is needed, the recover function will truncate the file to the end of the valid data, and also move the position to it. 4. When the service is running and needs to create a new log segment and a new FileMessageSet: a. If preallocate = true, the end in FileMessageSet will be 0, the file size will be initFileSize, and channel.position(math.min(channel.size().toInt, end)) will set the position to 0. b. Else if preallocate = false (backward compatible), the end in FileMessageSet will be Int.MaxValue, the file size will be 0, and channel.position(math.min(channel.size().toInt, end)) will set the position to 0. https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system https://issues.apache.org/jira/browse/KAFKA-1646 https://reviews.apache.org/r/33204/diff/2/ Thanks, Honghai Chen http://aka.ms/kafka http://aka.ms/manifold -Original Message- From: Honghai Chen Sent: Wednesday, April 22, 2015 11:12 AM To: dev@kafka.apache.org Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system Hi Sriram, One line of code was missed; will update the code review board and KIP soon. For LogSegment and FileMessageSet, we must use different constructor functions for an existing file and a new file; then the code channel.position(math.min(channel.size().toInt, end)) will make sure the position is at the end of an existing file. Thanks, Honghai Chen -Original Message- From: Jay Kreps [mailto:jay.kr...@gmail.com] Sent: Wednesday, April 22, 2015 5:22 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system My understanding of the patch is that clean shutdown truncates the file back to its true size (and reallocates it on startup). Hard crash is handled by the normal recovery, which should truncate off the empty portion of the file. On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote: Could you describe how recovery works in this mode? Say we had a 250 MB preallocated segment and we wrote till 50 MB and crashed. Till what point do we recover? Also, on startup, how is the append end pointer set even on a clean shutdown? How does the FileChannel end position get set to 50 MB instead of 250 MB? The existing code might just work for it but explaining that would be useful. On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote: +1. I've tried this on Linux and it helps reduce the spikes in append (and hence producer) latency for high throughput writes.
I am not entirely sure why, but my suspicion is that in the absence of preallocation, you see spikes when writes need to happen faster than the time it takes Linux to allocate the next block to the file. It will be great to see some performance test results too. On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote: I'm also +1 on this. The change is quite small and may actually help perf on Linux as well (we've never tried this). I have a lot of concerns on testing the various failure conditions, but I think since it will be off by default the risk is not too high. -Jay On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen honghai.c...@microsoft.com wrote: I wrote a KIP for this after some discussion on KAFKA-1646. https://issues.apache.org/jira/browse/KAFKA-1646 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system The RB is here: https://reviews.apache.org/r/33204/diff/ Thanks, Honghai -- Thanks, Neha
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Same question here as well on hosts. If I would like to secure a topic, regardless of where the topic is, I would like it to be secured. It is hard to imagine that one would want a topic to be secured on one host but not on another, unless a topic spread over multiple hosts is really meant to be totally different things; then it would really be needed. Thanks. Tong Li OpenStack Kafka Community Development Building 501/B205 liton...@us.ibm.com From: Jeff Holoman jholo...@cloudera.com To: dev@kafka.apache.org Cc: Tom Graves tgraves...@yahoo.com Date: 04/22/2015 02:40 PM Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security Parth, This is a long thread, so trying to keep up here, sorry if this has been covered before. First, great job on the KIP proposal and work so far. Are we sure that we want to tie host-level access to a given user? My understanding is that the ACL will be (omitting some fields) user_a, host1, host2, host3 user_b, host1, host2, host3 So there would potentially be a lot of redundancy in the configs. Does it make sense to have hosts be at the same level as principal in the hierarchy? This way you could just blanket the allowed / denied hosts and only have to worry about the users. So if you follow this, then we can wildcard the user so we can have a separate list of just host-based access. What's the order that the perms would be evaluated in if there was more than one match on a principal? Is the thought that there wouldn't usually be much overlap on hosts? I guess I can imagine a scenario where I want to offline/online access to a particular host or set of hosts, and if there was overlap, I'm doing a bunch of alter commands for just a single host. Maybe this is too contrived an example? I agree that having this level of granularity gives flexibility, but I wonder if people will actually use it and not just * the hosts for a given user and create a separate global list as I mentioned above? The only other system I know of that ties users with hosts for access is MySQL, and I don't love that model. Companies usually standardize on group authorization anyway; are we complicating that issue with the inclusion of hosts attached to users? Additionally I worry about the debt of big JSON configs in the first place; most non-developers find them non-intuitive already, so anything to ease this I think would be beneficial. Thanks Jeff On Wed, Apr 22, 2015 at 2:22 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Sorry I missed your last questions. I am +0 on adding a --host option for --list; we could add it for symmetry. Again, if this is only a CLI change it can be added later; if you mean adding this to the authorizer interface, then we should make a decision now. Given a choice I would like to actually keep only one option, which is the resource-based get (remove even the get based on principal). I see those (getAcls for principal or host) as special filtering cases which can easily be achieved by a third-party tool by listing all topics, calling getAcls for each topic and applying filtering logic on that. I really don’t see the need to make those first-class citizens of the authorizer interface: these kinds of queries will be issued outside the broker JVM, so they will not benefit from the caching, and because the storage will be indexed on resource, both these options even as a first-class API will just scan all topic acls and apply filtering logic.
Thanks Parth On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: Please see all the available options here https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it covers both hosts and operations and allows specifying a list for both. Thanks Parth From: Tom Graves tgraves...@yahoo.com Reply-To: Tom Graves tgraves...@yahoo.com Date: Wednesday, April 22, 2015 at 11:02 AM To: Parth Brahmbhatt pbrahmbh...@hortonworks.com, dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security Thanks for the explanations Parth. On the configs questions, the way I see it is that it's more likely to accidentally give everyone access, especially since you have to run a separate command to change the acls. If there was some config for defaults, a cluster admin could change that to be nobody or a certain set of users, then grant others permissions. This would also remove the race between commands. This is something you can always add later though if people request it. So in kafka-acl.sh how do I actually tell it what the operation is? kafka-acl.sh --topic testtopic --add --grandprincipal user:joe,user:kate, where does READ, WRITE, etc. go? Can I specify them as a list so I
Re: Review Request 31850: Patch for KAFKA-1660
On April 20, 2015, 5:30 p.m., Jay Kreps wrote: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 157 https://reviews.apache.org/r/31850/diff/9/?file=931821#file931821line157 Read locks are very expensive. I am pretty worried about this. If we want to do this we need to do a pretty detailed examination of the perf impact. Jiangjie Qin wrote: Hi Jay, I looked into the ReentrantReadWriteLock implementation and it seems that under the hood it uses compare-and-set, which should provide performance similar to an atomic integer. But I agree this definitely largely depends on the implementation. I modified o.a.k.clients.tools.ProducerPerformance a little bit to make it multi-threaded. The performance in the following test settings is very similar, all ~1M messages/second when the target is 10M messages/sec: 1. 10 threads with latest trunk 2. 10 threads using AtomicInteger 3. 10 threads using ReadWriteLock When I increase the thread number to 50, it drops to about 0.82M messages/second in all cases. It seems the read lock did not introduce a performance issue. Hey Jay, do you have any other performance tests in mind that we should run? - Jiangjie --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review80753 --- On April 21, 2015, 12:38 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/ --- (Updated April 21, 2015, 12:38 a.m.) Review request for kafka. Bugs: KAFKA-1660 https://issues.apache.org/jira/browse/KAFKA-1660 Repository: kafka Description --- A minor fix. Incorporated Guozhang's comments. Modify according to the latest conclusion. Patch for the finally passed KIP-15. Addressed Joel and Guozhang's comments. rebased on trunk Rebase on trunk Addressed Joel's comments. Addressed Joel's comments Addressed Jay's comments Changed javadoc per Jay's suggestion Change java doc as Jay suggested. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b70e1a3d406338d4b9ddd6188d2820e87545a9b6 clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db Diff: https://reviews.apache.org/r/31850/diff/ Testing --- Unit tests passed. Thanks, Jiangjie Qin
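Jiangjie's comparison can be reproduced with a small harness like the one below: a rough sketch, not the modified ProducerPerformance tool. N threads hammer a counter guarded either by an AtomicInteger or by the read side of a ReentrantReadWriteLock.

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    final class LockThroughputSketch {
        public static void main(String[] args) throws InterruptedException {
            int threads = 10, iterations = 1_000_000;

            AtomicInteger atomic = new AtomicInteger();
            long atomicNanos = run(threads, iterations, atomic::incrementAndGet);

            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
            int[] counter = {0};
            long lockNanos = run(threads, iterations, () -> {
                lock.readLock().lock();   // read lock only, matching the append path
                try { counter[0]++; }     // racy under a read lock; we only measure
                finally { lock.readLock().unlock(); } // the cost of the lock itself
            });

            System.out.printf("atomic: %d ms, read lock: %d ms%n",
                    atomicNanos / 1_000_000, lockNanos / 1_000_000);
        }

        static long run(int threads, int iterations, Runnable op) throws InterruptedException {
            Thread[] workers = new Thread[threads];
            long start = System.nanoTime();
            for (int i = 0; i < threads; i++) {
                workers[i] = new Thread(() -> {
                    for (int j = 0; j < iterations; j++) op.run();
                });
                workers[i].start();
            }
            for (Thread worker : workers) worker.join();
            return System.nanoTime() - start;
        }
    }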
Re: Review Request 33088: add heartbeat to coordinator
On April 22, 2015, 2:33 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/api/RequestKeys.scala, lines 55-58 https://reviews.apache.org/r/33088/diff/1/?file=923567#file923567line55 Can we just add these two requests into keyToNameAndDeserializerMap? I undid this change since Gwen's patch addressed the ApiKeys issue: https://issues.apache.org/jira/browse/KAFKA-2115 On April 22, 2015, 2:33 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/network/RequestChannel.scala, line 29 https://reviews.apache.org/r/33088/diff/1/?file=923576#file923576line29 Seems ApiKeys is not used? I undid this change since Gwen's patch addressed the ApiKeys issue: https://issues.apache.org/jira/browse/KAFKA-2115 On April 22, 2015, 2:33 a.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 414 https://reviews.apache.org/r/33088/diff/1/?file=923568#file923568line414 We do not need to cut the socket connection from the coordinator. The comment has been removed in a later rb update. - Onur --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33088/#review79762 --- On April 18, 2015, 7:16 p.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33088/ --- (Updated April 18, 2015, 7:16 p.m.) Review request for kafka. Bugs: KAFKA-1334 https://issues.apache.org/jira/browse/KAFKA-1334 Repository: kafka Description --- add heartbeat to coordinator todo: - see how it performs under real load - add error code handling on the consumer side - implement the partition assignors Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 456b602245e111880e1b8b361319cabff38ee0e9 core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 2f5797064d4131ecfc9d2750d9345a9fa3972a9a core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala df60cbc35d09937b4e9c737c67229889c69d8698 core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 8defa2e41c92f1ebe255177679d275c70dae5b3e core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION core/src/main/scala/kafka/coordinator/GroupRegistry.scala 94ef5829b3a616c90018af1db7627bfe42e259e5 core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 821e26e97eaa97b5f4520474fff0fedbf406c82a core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION core/src/main/scala/kafka/server/DelayedOperationKey.scala b673e43b0ba401b2e22f27aef550e3ab0ef4323c core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46 core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33088/diff/ Testing --- Thanks, Onur Karaman
Re: Review Request 33204: Patch for KAFKA-1646
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33204/ --- (Updated April 22, 2015, 10:13 a.m.) Review request for kafka. Bugs: KAFKA-1646 https://issues.apache.org/jira/browse/KAFKA-1646 Repository: kafka Description (updated) --- Kafka 1646 fix: improve setting the position correctly. Diffs (updated) - core/src/main/scala/kafka/log/FileMessageSet.scala 2522604bd985c513527fa0c863a7df677ff7a503 core/src/main/scala/kafka/log/Log.scala 5563f2de8113a0ece8929bec9c75dbf892abbb66 core/src/main/scala/kafka/log/LogConfig.scala da55a348f37a3d6d99032c39398f7ccb11068f42 core/src/main/scala/kafka/log/LogSegment.scala ed039539ac18ea4d65144073915cf112f7374631 core/src/main/scala/kafka/server/KafkaConfig.scala cfbbd2be550947dd2b3c8c2cab981fa08fb6d859 core/src/main/scala/kafka/server/KafkaServer.scala c63f4ba9d622817ea8636d4e6135fba917ce085a core/src/main/scala/kafka/utils/CoreUtils.scala c473a034bc3a00ccddbbc0506388a5a7763df950 Diff: https://reviews.apache.org/r/33204/diff/ Testing --- Thanks, Honghai Chen
[jira] [Updated] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Honghai Chen updated KAFKA-1646: Attachment: KAFKA-1646.patch Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch This patch is for the Windows platform only. On Windows, if there is more than one replica writing to disk, the segment log files will not be consistent on disk and consumer reading performance will drop greatly. This fix allocates more disk space when rolling a new segment, which improves consumer reading performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, as it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506767#comment-14506767 ] Honghai Chen commented on KAFKA-1646: - Created reviewboard against branch origin/trunk Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch This patch is for the Windows platform only. On Windows, if there is more than one replica writing to disk, the segment log files will not be consistent on disk and consumer reading performance will drop greatly. This fix allocates more disk space when rolling a new segment, which improves consumer reading performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, as it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506767#comment-14506767 ] Honghai Chen edited comment on KAFKA-1646 at 4/22/15 10:15 AM: --- New code review board https://reviews.apache.org/r/33204/diff/2/; patch against trunk is also attached. was (Author: waldenchen): Created reviewboard against branch origin/trunk Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch This patch is for the Windows platform only. On Windows, if there is more than one replica writing to disk, the segment log files will not be consistent on disk and consumer reading performance will drop greatly. This fix allocates more disk space when rolling a new segment, which improves consumer reading performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, as it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Fixed the issue Sriram mentioned. Code review and jira/KIP updated. Below is a detailed description of the scenarios: 1. If a clean shutdown is done, the last log file will be truncated to its real size, since the close() function of FileMessageSet will call trim(). 2. If there is a crash, then on restart the broker will go through the process of recover() and the last log file will be truncated to its real size (and the position will be moved to the end of the file). 3. When the service starts and opens an existing file: a. It will run the LogSegment constructor which has NO preallocate parameter. b. Then the end in FileMessageSet will be Int.MaxValue, and channel.position(math.min(channel.size().toInt, end)) will set the position to the end of the file. c. If recovery is needed, the recover function will truncate the file to the end of the valid data, and also move the position to it. 4. When the service is running and needs to create a new log segment and a new FileMessageSet: a. If preallocate = true, the end in FileMessageSet will be 0, the file size will be initFileSize, and channel.position(math.min(channel.size().toInt, end)) will set the position to 0. b. Else if preallocate = false (backward compatible), the end in FileMessageSet will be Int.MaxValue, the file size will be 0, and channel.position(math.min(channel.size().toInt, end)) will set the position to 0. https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system https://issues.apache.org/jira/browse/KAFKA-1646 https://reviews.apache.org/r/33204/diff/2/ Thanks, Honghai Chen http://aka.ms/kafka http://aka.ms/manifold -Original Message- From: Honghai Chen Sent: Wednesday, April 22, 2015 11:12 AM To: dev@kafka.apache.org Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system Hi Sriram, One line of code was missed; will update the code review board and KIP soon. For LogSegment and FileMessageSet, we must use different constructor functions for an existing file and a new file; then the code channel.position(math.min(channel.size().toInt, end)) will make sure the position is at the end of an existing file. Thanks, Honghai Chen -Original Message- From: Jay Kreps [mailto:jay.kr...@gmail.com] Sent: Wednesday, April 22, 2015 5:22 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system My understanding of the patch is that clean shutdown truncates the file back to its true size (and reallocates it on startup). Hard crash is handled by the normal recovery, which should truncate off the empty portion of the file. On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote: Could you describe how recovery works in this mode? Say we had a 250 MB preallocated segment and we wrote till 50 MB and crashed. Till what point do we recover? Also, on startup, how is the append end pointer set even on a clean shutdown? How does the FileChannel end position get set to 50 MB instead of 250 MB? The existing code might just work for it but explaining that would be useful. On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote: +1. I've tried this on Linux and it helps reduce the spikes in append (and hence producer) latency for high throughput writes.
I am not entirely sure why, but my suspicion is that in the absence of preallocation, you see spikes when writes need to happen faster than the time it takes Linux to allocate the next block to the file. It will be great to see some performance test results too. On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote: I'm also +1 on this. The change is quite small and may actually help perf on Linux as well (we've never tried this). I have a lot of concerns on testing the various failure conditions, but I think since it will be off by default the risk is not too high. -Jay On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen honghai.c...@microsoft.com wrote: I wrote a KIP for this after some discussion on KAFKA-1646. https://issues.apache.org/jira/browse/KAFKA-1646 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system The RB is here: https://reviews.apache.org/r/33204/diff/ Thanks, Honghai -- Thanks, Neha
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506795#comment-14506795 ]
Honghai Chen commented on KAFKA-1646: Created reviewboard against branch origin/trunk

Improve consumer read performance for Windows
Key: KAFKA-1646
URL: https://issues.apache.org/jira/browse/KAFKA-1646
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.1.1
Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
Labels: newbie, patch
Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch, KAFKA-1646_20150422.patch

This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be consistent on disk and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.iswindow)' or methods used on Windows.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
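A rough Scala sketch of the kind of 'if(Os.iswindow)' gating the description above refers to; the Os object, the shouldPreallocate helper, and whether the OS check is combined with a config flag are all assumptions here, not the patch's actual code:

    // Sketch: OS check used to gate preallocation (names are assumptions).
    object Os {
      private val osName = System.getProperty("os.name").toLowerCase
      val isWindows: Boolean = osName.startsWith("windows")
    }

    object SegmentRollSketch {
      // When rolling a new segment, only preallocate where it helps
      // (Windows/NTFS per this JIRA) and where configuration asks for it.
      def shouldPreallocate(configPreallocate: Boolean): Boolean =
        configPreallocate && Os.isWindows
    }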
[jira] [Issue Comment Deleted] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Honghai Chen updated KAFKA-1646: Comment: was deleted (was: Created reviewboard against branch origin/trunk)
[jira] [Updated] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Honghai Chen updated KAFKA-1646: Attachment: KAFKA-1646_20150422.patch
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
When we were working on the client-side SSL implementation for Kafka, we found that returning selection interest from the handshake() method wasn't sufficient to handle some of the SSL sequences. We resorted to managing the selection key and interest state within SSLChannel to avoid SSL-specific knowledge escaping out of SSL classes into protocol-independent network code. The current server-side SSL patch doesn't address these scenarios yet, but we may want to take them into account while designing the common Channel class/interface.
1. *Support for running potentially long-running delegated tasks outside the network thread*: It is recommended that delegated tasks indicated by a handshake status of NEED_TASK are run on a separate thread since they may block (http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). It is easier to encapsulate this in SSLChannel without any changes to common code if selection keys are managed within the Channel.
2. *Renegotiation handshake*: During a read operation, the handshake status may indicate that renegotiation is required. It will be good to encapsulate this state change (and any knowledge of these SSL-specific state transitions) within SSLChannel. Our experience was that managing keys and state within the SSLChannel rather than in Selector made this code neater.
3. *Graceful shutdown of the SSL connections*: Our experience was that we could encapsulate all of the logic for shutting down SSLEngine gracefully within SSLChannel when the selection key and state are owned and managed by SSLChannel.
4. *And finally a minor point*: We found that by managing the selection key and selection interests within SSLChannel, the protocol-independent Selector didn't need the concept of handshake at all, and all channel state management and handshake-related code could be held in protocol-specific classes. This may be worth taking into consideration since it makes it easier for common network layer code to be maintained without any understanding of the details of individual security protocols.
The channel classes we used are included in the patch in https://issues.apache.org/jira/browse/KAFKA-1690. The patch contains unit tests to validate these scenarios as well as other buffer overflow conditions, which may be useful for server-side code when the scenarios described above are implemented. Regards, Rajini
On Tue, Apr 21, 2015 at 11:13 PM, Sriharsha Chintalapani harsh...@fastmail.fm wrote: Hi Jay, Thanks for the review.
1. Isn't the blocking handshake going to be a performance concern? Can we do the handshake non-blocking instead? If anything that causes connections to drop can incur blocking network roundtrips, won't that eat up all the network threads immediately? I guess I would have to look at that code to know...
I have a non-blocking handshake on the server side as well as for the new producer client. A blocking handshake is only done for BlockingChannel.scala, and it just loops over the non-blocking handshake until the context is established. On the server side (SocketServer.scala) it goes through the steps and returns a "READ or WRITE" signal for the next step. For BlockingChannel the worst case I look at is the connection timeout, but most times this handshake will finish up much quicker. I am cleaning up the code and will send a patch in the next few days.
2. Do we need to support blocking channel at all? That is just for the old clients, and I think we should probably just leave those be to reduce scope here.
So the blocking channel is used not only by the simple consumer but also by ControllerChannelManager and controlled shutdown. Are we planning on deprecating it? I think at least for ControllerChannelManager it makes sense to have a blocking channel. If users want to lock down the cluster, i.e. no PLAINTEXT channels are allowed, then all communication has to go through either SSL or KERBEROS, so in this case we need to add this capability to BlockingChannel.
3. Can we change the APIs to drop the getters when that is not required by the API being implemented? In general we don't use setters and getters as a naming convention.
My bad on adding getters and setters :). I'll work on removing them and change the KIP accordingly. I still need some accessor methods though.
Thanks, Harsha
On April 21, 2015 at 2:51:15 PM, Jay Kreps (jay.kr...@gmail.com) wrote: Hey Sriharsha, Thanks for the excellent write-up. Couple of minor questions:
1. Isn't the blocking handshake going to be a performance concern? Can we do the handshake non-blocking instead? If anything that causes connections to drop can incur blocking network roundtrips, won't that eat up all the network threads immediately? I guess I would have to look at that code to know...
2. Do we need to support blocking channel at all? That is just for the old clients, and I think we should
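A minimal Scala sketch of the SSLEngine handshake-status dispatch Rajini describes above, with NEED_TASK handed off to a separate executor per the SSLEngine javadoc recommendation; the class shape and wiring are assumptions for illustration, not the KAFKA-1690 patch code:

    import java.nio.channels.SelectionKey
    import java.util.concurrent.{ExecutorService, Executors}
    import javax.net.ssl.SSLEngine
    import javax.net.ssl.SSLEngineResult.HandshakeStatus

    // Sketch only: maps SSLEngine handshake states to selection interests.
    class HandshakeSketch(engine: SSLEngine) {
      private val taskExecutor: ExecutorService = Executors.newCachedThreadPool()

      // Returns the selection interest to register for the next handshake step,
      // or 0 when there is nothing to register yet.
      def nextInterestOps(): Int = engine.getHandshakeStatus match {
        case HandshakeStatus.NEED_TASK =>
          // Delegated tasks may block, so run them off the network thread
          // and re-check the handshake status once they complete.
          var task = engine.getDelegatedTask
          while (task != null) {
            taskExecutor.submit(task)
            task = engine.getDelegatedTask
          }
          0
        case HandshakeStatus.NEED_WRAP   => SelectionKey.OP_WRITE // engine has data to send
        case HandshakeStatus.NEED_UNWRAP => SelectionKey.OP_READ  // engine needs data from the peer
        case _                           => 0 // FINISHED or NOT_HANDSHAKING
      }
    }

Keeping this dispatch inside an SSL-specific channel class means the protocol-independent Selector never needs to know about handshake states, which is the design point Rajini is making.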