Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Tom Graves
Hey everyone,
Sorry to jump in on the conversation so late. I'm new to Kafka. I'll apologize 
in advance if you have already covered some of my questions.  I read through 
the wiki and had some comments and questions.
1) public enum Operation needs EDIT changed to ALTER

2) Does the Authorizer class need a setAcls?  Rather than just add, to be able 
to set an explicit list and overwrite what was there?  I see that kafka-acl.sh 
lists a removeall, so I guess you could do removeall and then add.  I also don't 
see a removeall in the Authorizer class - is it going to loop through them all 
to remove each one?
3) Can someone tell me what the use case is for doing ACLs based on hosts?  I 
can see some possibilities; just wondering if we have concrete ones where one 
user is allowed from one host but not another.
4) I'm a bit unclear how the resource works in the Authorizer class.  From 
what I see we have 2 resources - topics and cluster.  If I want to add an ACL 
to allow joe to CREATE for the cluster, then I call addAcls with  Acl(user: 
joe, ALLOW, Set(*), Set(CREATE)) and cluster?  What if I want to call 
addAcls for DESCRIBE on a topic?  Is the resource then topic, or is it the 
topic name?
5) Is reassigning partitions a CLUSTER_ACTION or superuser?  It's not totally 
clear to me what the differences between these are.  What about increasing the 
# of partitions?
6) Groups are mentioned - are we supporting them right away or is that a 
follow-on item? (Is there going to be a kafka.supergroups?)
7) Are there config options for setting ACLs when I create my topic?  Or do I 
have to create my topic and then run the kafka-acl.sh script to set them?  
Although it's very small, there would be a possible race there: someone could 
start producing to the topic before the ACLs are set.
8) Are there configs for cluster-level ACL defaults?  Or does it default to 
superusers on bringing up a new cluster, which you then have to modify with 
the CLI?
Thanks,
Tom

 On Tuesday, April 21, 2015 7:10 PM, Parth Brahmbhatt 
pbrahmbh...@hortonworks.com wrote:

 I have added the notes to the KIP-11 Open Questions section.

Thanks
Parth

On 4/21/15, 4:49 PM, Gwen Shapira gshap...@cloudera.com wrote:

Adding my notes from today's call to the thread:

** Deny or Allow all by default? We will add a configuration to
control this. The configuration will default to “allow” for backward
compatibility. Security admins can set it to deny.

** Storing ACLs for default authorizers: We'll store them in ZK. We'll
support pointing the authorizer to any ZK.
The use of ZK will be internal to the default authorizer. The authorizer
reads ACLs from a cache that is refreshed every hour. We proposed having a
mechanism (possibly via a new ZK node) to tell brokers to refresh the cache
immediately.

** Support deny as permission type - we agreed to keep this.

** Mapping operations to API: We may need to add Group as a resource,
with JoinGroup and OffsetCommit requiring a privilege on the consumer
group.
This can be something we pass now and authorizers can support in
the future. - Jay will write specifics to the mailing list discussion.
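
For concreteness, the first two notes above might combine inside an authorizer roughly as follows. This is a hypothetical Java sketch; every type, field, and helper here is an assumption, not KIP-11's actual (Scala) interface:

{code}
import java.util.Set;

// Hypothetical sketch only: a configurable default when a resource has no
// ACLs, plus DENY entries overriding ALLOW. All names are assumptions.
class SketchAuthorizer {
    enum PermissionType { ALLOW, DENY }
    record Acl(String principal, PermissionType type, Set<String> hosts, Set<String> operations) {}

    // Proposed config switch; defaults to true ("allow") for backward compatibility.
    private final boolean allowEveryoneIfNoAcl;

    SketchAuthorizer(boolean allowEveryoneIfNoAcl) { this.allowEveryoneIfNoAcl = allowEveryoneIfNoAcl; }

    boolean authorize(String principal, String host, String operation, Set<Acl> aclsForResource) {
        // aclsForResource would come from the hourly-refreshed ZK-backed cache.
        if (aclsForResource.isEmpty()) return allowEveryoneIfNoAcl;
        if (matches(aclsForResource, PermissionType.DENY, principal, host, operation)) return false; // deny wins
        return matches(aclsForResource, PermissionType.ALLOW, principal, host, operation);
    }

    private boolean matches(Set<Acl> acls, PermissionType t, String principal, String host, String op) {
        return acls.stream().anyMatch(a -> a.type() == t
                && (a.principal().equals("*") || a.principal().equals(principal))
                && (a.hosts().contains("*") || a.hosts().contains(host))
                && (a.operations().contains("*") || a.operations().contains(op)));
    }
}
{code}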

On Tue, Apr 21, 2015 at 4:32 PM, Jay Kreps jay.kr...@gmail.com wrote:
 Following up on the KIP discussion. Two options for authorizing consumers
 to read topic t as part of group g:
 1. READ permission on resource /topic/t
 2. READ permission on resource /topic/t AND WRITE permission on /group/g

 The advantage of (1) is that it is simpler. The disadvantage is that any
 member of any group that reads from t can commit offsets as any other
 member of a different group. This doesn't affect data security (who can
 access what) but it is a bit of a management issue--a malicious person
 can cause data loss or duplicates for another consumer by committing offsets.

 I think I favor (2) but it's worth thinking it through.

 -Jay

 On Tue, Apr 21, 2015 at 2:43 PM, Parth Brahmbhatt 
 pbrahmbh...@hortonworks.com wrote:

 Hey Jun,

 Yes, and we support wildcards for all ACL entities: principal, hosts and
 operation.

 Thanks
 Parth

 On 4/21/15, 9:06 AM, Jun Rao j...@confluent.io wrote:

 Harsha, Parth,
 
 Thanks for the clarification. This makes sense. Perhaps we can clarify the
 meaning of those rules in the wiki.
 
Related to this, it seems that we need to support wildcards in the
cli/request protocol for topics?
 
 Jun
 
 On Mon, Apr 20, 2015 at 9:07 PM, Parth Brahmbhatt 
 pbrahmbh...@hortonworks.com wrote:
 
  iptables on Unix supports the DENY operator, not that it should
  matter. The DENY operator can also be used to specify “allow user1 to
  READ from topic1 from all hosts but host1,host2”. Again, we could add a
  host-group semantic and extra complexity around that; not sure if it's
  worth it.
  In addition, with the DENY operator you are not forced to create a
  special group just to support the authorization use case. I am not
  convinced that the operator itself is really all that confusing. There
  are 3 practical use cases:
  - Resource with no ACL whatsoever - allow access to everyone (just
just
 

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Parth Brahmbhatt

FYI, I have modified the KIP to include group as a resource. In order to
access the “joinGroup” and “commitOffset” APIs the user will need READ
permission on the topic and WRITE permission on the group.
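
In code terms, the agreed rule might look roughly like this (illustrative Java; the authorizer shape and the "Topic"/"Group" resource-type strings are assumptions, not the KIP's actual API):

{code}
// Illustrative only: the agreed consumer-API check. The interface shape
// and the resource-type strings are assumptions.
interface AuthorizerView {
    boolean authorize(String principal, String operation, String resourceType, String resourceName);
}

final class GroupChecks {
    // joinGroup/commitOffset: READ on the topic AND WRITE on the group.
    static boolean canCommitOffsets(AuthorizerView auth, String principal, String topic, String groupId) {
        return auth.authorize(principal, "READ", "Topic", topic)
            && auth.authorize(principal, "WRITE", "Group", groupId);
    }
}
{code}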

I plan to open a VOTE thread by noon if there are no more concerns.

Thanks
Parth

On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote:

Hey everyone,
Sorry to jump in on the conversation so late. I'm new to Kafka. I'll
apologize in advance if you have already covered some of my questions.  I
read through the wiki and had some comments and questions.
1) public enum Operation needs EDIT changed to ALTER

   Done.

2) Does the Authorizer class need a setAcls?  Rather than just add, to be
able to set an explicit list and overwrite what was there?  I see that
kafka-acl.sh lists a removeall, so I guess you could do removeall and then
add.  I also don't see a removeall in the Authorizer class - is it going
to loop through them all to remove each one?

There is an overloaded version of removeAcls in the interface that takes
in resource as the only input, and as described in the javadoc all the ACLs
attached to that resource will be deleted. To cover the setAcl use case
the caller can first call remove and then add.
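
For reference, a "set" operation composed from those two calls might look like the following hypothetical Java sketch (the actual interface in the patch is Scala, and these signatures are assumptions). Note the composition is not atomic: a request authorized between the remove and the add would see an empty ACL set.

{code}
import java.util.Set;

// Hypothetical sketch: "set" semantics built from the two calls described
// above. The authorizer shape here is an assumption, not the patch's API.
interface AuthorizerSketch {
    void addAcls(Set<String> acls, String resource);
    void removeAcls(String resource); // overload: drops every ACL attached to the resource
}

final class AclAdmin {
    static void setAcls(AuthorizerSketch auth, String resource, Set<String> acls) {
        auth.removeAcls(resource);    // wipe whatever was there
        auth.addAcls(acls, resource); // then install the explicit list
    }
}
{code}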

3) Can someone tell me what the use case is for doing ACLs based on hosts?
I can see some possibilities; just wondering if we have concrete ones where
one user is allowed from one host but not another.

I am not sure I understand the question, given that the use case you
described is what we are trying to cover with the use of hosts in an Acl.
There are some additional use cases, like “allow access to any user from
host1,host2”, but I think primarily it gives admins the ability to define
ACLs at a more granular level.

4) I'm a bit unclear how the resource works in the Authorizer class.
From what I see we have 2 resources - topics and cluster.  If I want to
add an ACL to allow joe to CREATE for the cluster, then I call addAcls
with  Acl(user: joe, ALLOW, Set(*), Set(CREATE)) and cluster?  What
if I want to call addAcls for DESCRIBE on a topic?  Is the resource then
topic, or is it the topic name?

We now have 3 resources (group was added); please see the updated doc. The
CREATE acl that you described is correct. For any topic operation you
should use the topic name as the resource name, and for a group the user
will provide the groupId as the resource name.
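
To make the resource naming concrete, the three cases might look like this (illustrative Java; the record types and the addAcls hook are assumptions, not the KIP's types):

{code}
import java.util.Set;
import java.util.function.BiConsumer;

// Illustrative only: how the resource name differs per resource type.
record ResourceRef(String resourceType, String name) {}
record AclEntry(String principal, String permission, Set<String> hosts, Set<String> operations) {}

class ResourceNamingExample {
    static void addExamples(BiConsumer<Set<AclEntry>, ResourceRef> addAcls) {
        // Cluster-level CREATE for joe, from any host.
        addAcls.accept(Set.of(new AclEntry("user:joe", "ALLOW", Set.of("*"), Set.of("CREATE"))),
                       new ResourceRef("Cluster", "kafka-cluster"));
        // Topic-level DESCRIBE: the resource name is the topic name.
        addAcls.accept(Set.of(new AclEntry("user:joe", "ALLOW", Set.of("*"), Set.of("DESCRIBE"))),
                       new ResourceRef("Topic", "testtopic"));
        // Group-level WRITE: the resource name is the groupId.
        addAcls.accept(Set.of(new AclEntry("user:joe", "ALLOW", Set.of("*"), Set.of("WRITE"))),
                       new ResourceRef("Group", "my-consumer-group"));
    }
}
{code}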

5) Is reassigning partitions a CLUSTER_ACTION or superuser?  It's not
totally clear to me what the differences between these are.  What about
increasing the # of partitions?

I see this as an alter-topic operation, so it is at the topic level and the
user must have ALTER permissions on the topic.

6) Groups are mentioned - are we supporting them right away or is that a
follow-on item? (Is there going to be a kafka.supergroups?)

I think it can be a separate jira, just for breaking down the code review
into smaller chunks. We will support it in the first version, but if we
cannot do it for any reason, that should not block a release with all the
other authZ work. We made deliberate design choices (like introducing a
principalType in KafkaPrincipal) to allow supporting groups as an
incremental change.

7) Are there config options for setting ACLs when I create my topic?  Or
do I have to create my topic and then run the kafka-acl.sh script to set
them?  Although it's very small, there would be a possible race there:
someone could start producing to the topic before the ACLs are set.

We discussed this yesterday and we agreed to go with kafka-acl.sh. Yes,
there is a very, very small window of vulnerability, but I think that
really does not warrant changing the decision in this case.

8) Are there configs for cluster-level ACL defaults?  Or does it default
to superusers on bringing up a new cluster, which you then have to modify
with the CLI?
Thanks,
Tom

No defaults; the default is that superusers will have full access. I don’t
think making assumptions about one's security requirements should be our
burden.


 On Tuesday, April 21, 2015 7:10 PM, Parth Brahmbhatt
pbrahmbh...@hortonworks.com wrote:

 I have added the notes to the KIP-11 Open Questions section.

Thanks
Parth

On 4/21/15, 4:49 PM, Gwen Shapira gshap...@cloudera.com wrote:

Adding my notes from today's call to the thread:

** Deny or Allow all by default? We will add a configuration to
control this. The configuration will default to “allow” for backward
compatibility. Security admins can set it to deny.

** Storing ACLs for default authorizers: We'll store them in ZK. We'll
support pointing the authorizer to any ZK.
The use of ZK will be internal to the default authorizer. The authorizer
reads ACLs from a cache that is refreshed every hour. We proposed having a
mechanism (possibly via a new ZK node) to tell brokers to refresh the cache
immediately.

** Support deny as permission type - we agreed to keep this.

** Mapping operations to API: We may need to add Group as a resource,
with JoinGroup and OffsetCommit requiring a privilege 

Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-22 Thread Sriharsha Chintalapani
Hi Rajini,
       Thanks for the details. I did go through your code. There was a 
discussion before about not having selector-related code in the channel or 
extending the selector itself. 

1. *Support for running potentially long-running delegated tasks outside 
the network thread*: It is recommended that delegated tasks indicated by 
a handshake status of NEED_TASK are run on a separate thread since they may 
block ( 
http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). 
It is easier to encapsulate this in SSLChannel without any changes to 
common code if selection keys are managed within the Channel. 


 This makes sense; I can change the code to not do it on the network thread.
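
For reference, moving NEED_TASK work off the network thread could look roughly like the sketch below. Only getDelegatedTask()/NEED_TASK come from the standard JSSE API; the executor and the resume hook are assumptions:

{code}
import java.util.concurrent.ExecutorService;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;

// Sketch: run SSLEngine delegated tasks on a separate (single-threaded)
// executor, then poke the channel to resume the handshake. FIFO ordering
// on a single-threaded executor guarantees tasks finish before the resume.
final class DelegatedTasks {
    static void runOffNetworkThread(SSLEngine engine, ExecutorService executor, Runnable resumeHandshake) {
        if (engine.getHandshakeStatus() != HandshakeStatus.NEED_TASK) return;
        Runnable task;
        while ((task = engine.getDelegatedTask()) != null) {
            executor.submit(task); // may block; safe off the network thread
        }
        executor.submit(resumeHandshake); // re-enter the handshake once all tasks have run
    }
}
{code}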



2. *Renegotiation handshake*: During a read operation, handshake status 
may indicate that renegotiation is required. It will be good to encapsulate 
this state change (and any knowledge of these SSL-specific state 
transitions) within SSLChannel. Our experience was that managing keys and 
state within the SSLChannel rather than in Selector made this code neater. 
Do we even want to support renegotiation? This is a case where a user/client 
handshakes with the server anonymously

but later wants to change and present their identity and establish a new SSL 
session. Our producers and consumers either present their identity (two-way 
auth) or not.  Since these are long-running processes I don’t see a case where 
they initially establish the session and later present their identity.  



*Graceful shutdown of the SSL connection*s: Our experience was that 
we could encapsulate all of the logic for shutting down SSLEngine 
gracefully within SSLChannel when the selection key and state are owned and 
managed by SSLChannel. 

Can’t this be done when channel.close() is called? Any reason to own the 
selection key?

4. *And finally a minor point:* We found that by managing selection key 
and selection interests within SSLChannel, protocol-independent Selector 
didn't need the concept of handshake at all and all channel state 
management and handshake related code could be held in protocol-specific 
classes. This may be worth taking into consideration since it makes it 
easier for common network layer code to be maintained without any 
understanding of the details of individual security protocols. 
The only thing the network code (SocketServer) is aware of is whether the 
channel's handshake is complete; if it isn't, it does the handshake,

or else it goes about reading/writing from the channel. Yes, SocketServer needs 
to be aware of whether a channel is ready to read or not. But on the other hand

there aren’t too many details of the handshake leaked into SocketServer.  Either 
we let the server know that a channel needs a handshake, or we keep the 
selection-key state in the channel, which means we are adding selector-related 
code into the channel. 



Thanks,
Harsha


On April 22, 2015 at 3:56:04 AM, Rajini Sivaram (rajinisiva...@googlemail.com) 
wrote:

When we were working on the client-side SSL implementation for Kafka, we  
found that returning selection interest from handshake() method wasn't  
sufficient to handle some of the SSL sequences. We resorted to managing the  
selection key and interest state within SSLChannel to avoid SSL-specific  
knowledge escaping out of SSL classes into protocol-independent network  
code. The current server-side SSL patch doesn't address these scenarios  
yet, but we may want to take these into account while designing the common  
Channel class/interface.  

1. *Support for running potentially long-running delegated tasks outside  
the network thread*: It is recommended that delegated tasks indicated by  
a handshake status of NEED_TASK are run on a separate thread since they may  
block (  
http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html).  
It is easier to encapsulate this in SSLChannel without any changes to  
common code if selection keys are managed within the Channel.  
2. *Renegotiation handshake*: During a read operation, handshake status  
may indicate that renegotiation is required. It will be good to encapsulate  
this state change (and any knowledge of these SSL-specific state  
transitions) within SSLChannel. Our experience was that managing keys and  
state within the SSLChannel rather than in Selector made this code neater.  
3. *Graceful shutdown of the SSL connection*s: Our experience was that  
we could encapsulate all of the logic for shutting down SSLEngine  
gracefully within SSLChannel when the selection key and state are owned and  
managed by SSLChannel.  
4. *And finally a minor point:* We found that by managing selection key  
and selection interests within SSLChannel, protocol-independent Selector  
didn't need the concept of handshake at all and all channel state  
management and handshake related code could be held in protocol-specific  
classes. This may be worth taking into consideration since it makes it  
easier for common network layer code to be maintained without any  

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)

2015-04-22 Thread Andrii Biletskyi
As said above, I spent some time thinking about AlterTopicRequest
semantics and batching.

Firstly, about AlterTopicRequest. Our goal here is to see whether we
can suggest some simple semantics and at the same time let users
change different things in one instruction (hereinafter, an instruction is
one of the entries in a batch request).
We can resolve arguments according to this schema:
1) If ReplicaAssignment is specified:
it's a reassign-partitions request
2) If either Partitions or ReplicationFactor is specified:
   a) If Partitions is specified - this is the increase-partitions case
   b) If ReplicationFactor is specified - this means we need to automatically
   regenerate the replica assignment and treat it as a reassign-partitions request
Note: this algorithm is a bit inconsistent with the CreateTopicRequest - with
ReplicaAssignment specified there, the user can implicitly define Partitions
and ReplicationFactor; in AlterTopicRequest those are completely different
things, i.e. you can't include new partitions in the ReplicaAssignment to
implicitly ask the controller to increase partitions - the controller will
simply return InvalidReplicaAssignment, because you included unknown partitions.
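
In code, the resolution schema above might read as follows (illustrative field and type names, not the KIP-4 wire format):

{code}
// Illustrative resolution of one AlterTopicRequest instruction following
// the schema above; field names are assumptions, not the KIP-4 wire format.
final class AlterTopicInstruction {
    String topic;
    Integer partitions;        // nullable: present only if set in the instruction
    Integer replicationFactor; // nullable
    String replicaAssignment;  // nullable; kept opaque here

    enum Kind { REASSIGN_PARTITIONS, INCREASE_PARTITIONS }

    Kind resolve() {
        if (replicaAssignment != null) return Kind.REASSIGN_PARTITIONS;  // case 1)
        if (partitions != null)        return Kind.INCREASE_PARTITIONS;  // case 2a)
        if (replicationFactor != null) return Kind.REASSIGN_PARTITIONS;  // case 2b): regenerate assignment first
        throw new IllegalArgumentException("empty instruction for topic " + topic);
    }
}
{code}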

Secondly, multiple instructions for one topic in a batch request. I have a
feeling it becomes a really big mess now, so suggestions are highly
appreciated here!
Our goal is to consider whether we can let users add multiple instructions
for one topic in one batch, but at the same time make it transparent enough
that we can support blocking on request completion; for that we need to
determine from the request what the final expected state of the topic is.
And the latter seems to me a tough issue.
Consider the following AlterTopicRequest:
[1) topic1: change ReplicationFactor from 2 to 3,
 2) topic1: change ReplicaAssignment (taking into account RF is 3 now),
 3) topic2: change ReplicaAssignment (just to include multiple topics)
 4) topic1: change ReplicationFactor from 3 to 1,
 5) topic1: change ReplicaAssignment again (taking into account RF is 1 now)
]
As we discussed earlier, the controller will handle it as an alter-topic
command and reassign-partitions. First of all, it will scan all
ReplicaAssignments and assemble them into one JSON to create the admin path
/reassign_partitions once needed.
Now, the user would expect us to execute instructions sequentially, but we
can't, because only one reassign-partitions procedure can be in progress -
when should we trigger reassign-partitions - after 1) or after 4)? And what
about topic2 - we would break the order, though it was supposed that we
execute instructions sequentially.
Overall, the logic seems to be very sophisticated, which is a bad sign.
Conceptually, I think the root problem is that we imply there is an order
in sequential instructions, but the instructions themselves are
asynchronous, so really you can't guarantee any order.
I'm thinking about such solutions now:
1) Prohibit multiple instructions per topic (this seems reasonable if we
let users change multiple things in scope of one instruction - see the
first item and the sketch below)
2) Break apart again AlterTopic and ReassignPartitions - if the
reassignment case is the only problem here, which I'm not sure about.
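
A minimal sketch of what option 1) could look like server-side, reusing the illustrative instruction class from the earlier sketch (all names are assumptions):

{code}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of option 1): reject a batch containing more than one instruction
// for the same topic, so the final expected state per topic is unambiguous.
final class BatchValidator {
    static void validate(List<AlterTopicInstruction> batch) {
        Set<String> seen = new HashSet<>();
        for (AlterTopicInstruction in : batch) {
            if (!seen.add(in.topic)) {
                throw new IllegalArgumentException("Multiple instructions for topic: " + in.topic);
            }
        }
    }
}
{code}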

Thoughts?

Thanks,
Andrii Biletskyi




On Wed, Apr 22, 2015 at 2:59 AM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Guys,

 Thank you for your time. A short summary of our discussion.
 Answering previous items:

 1. 2. I will double check existing error codes to align the list of
 errors that need to be added.

 3. We agreed to think again about the batch requests semantics.
 The main concern is that users would expect we allow executing
 multiple instructions for one topic in one batch.
 I will start implementation and check whether there are any impediments
 to handle it this way.

 The same for AlterTopicRequest - I will try to make the request semantics
 as easy as possible and allow users to change different things at one
 time - e.g. change the number of partitions and replicas in one instruction.

 4. We agreed not to add lag information to TMR.

 5. We discussed the preferred replica command and it was pointed out
 that generally users shouldn't call this command manually now, since
 it is automatically handled by the cluster.
 If there are no objections (especially from devops people) I will remove
 respective request.

 6. As discussed AdminClient API is a phase 2 and will go after Wire
 Protocol extensions. It will be finalized as java-doc after I complete
 patch for phase 1 - Wire Protocol + server-side code handling requests.

 Thanks,
 Andrii Biletskyi

 On Wed, Apr 22, 2015 at 12:36 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Andrii, thanks for all the hard work on this, it has come a long way.

 A couple questions and comments on this.

 For the errors, can we do the following:
 1. Remove IllegalArgument from the name, we haven't used that convention
 for other errors.
 2. Normalize this list with the existing errors. For example, elsewhere
 when you give an invalid 

Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-22 Thread Sriharsha Chintalapani
1. *Support for running potentially long-running delegated tasks outside 
the network thread*: It is recommended that delegated tasks indicated by 
a handshake status of NEED_TASK are run on a separate thread since they may 
block ( 
http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). 
It is easier to encapsulate this in SSLChannel without any changes to 
common code if selection keys are managed within the Channel. 


 This makes sense; I can change the code to not do it on the network thread.

Right now we are doing the handshake as part of the processor (it shouldn’t be 
in the acceptor) and we have multiple processor threads. Do we still see this 
as an issue if it happens on the same thread as the processor?





-- 
Harsha
Sent with Airmail

On April 22, 2015 at 7:18:17 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) 
wrote:

Hi Rajini,
       Thanks for the details. I did go through your code. There was a 
discussion before about not having selector-related code in the channel or 
extending the selector itself. 

1. *Support for running potentially long-running delegated tasks outside 
the network thread*: It is recommended that delegated tasks indicated by 
a handshake status of NEED_TASK are run on a separate thread since they may 
block ( 
http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). 
It is easier to encapsulate this in SSLChannel without any changes to 
common code if selection keys are managed within the Channel. 


 This makes sense; I can change the code to not do it on the network thread.



2. *Renegotiation handshake*: During a read operation, handshake status 
may indicate that renegotiation is required. It will be good to encapsulate 
this state change (and any knowledge of these SSL-specific state 
transitions) within SSLChannel. Our experience was that managing keys and 
state within the SSLChannel rather than in Selector made this code neater. 
Do we even want to support renegotiation? This is a case where a user/client 
handshakes with the server anonymously

but later wants to change and present their identity and establish a new SSL 
session. Our producers and consumers either present their identity (two-way 
auth) or not.  Since these are long-running processes I don’t see a case where 
they initially establish the session and later present their identity.  



*Graceful shutdown of the SSL connection*s: Our experience was that 
we could encapsulate all of the logic for shutting down SSLEngine 
gracefully within SSLChannel when the selection key and state are owned and 
managed by SSLChannel. 

Can’t this be done when channel.close() is called? Any reason to own the 
selection key?

4. *And finally a minor point:* We found that by managing selection key 
and selection interests within SSLChannel, protocol-independent Selector 
didn't need the concept of handshake at all and all channel state 
management and handshake related code could be held in protocol-specific 
classes. This may be worth taking into consideration since it makes it 
easier for common network layer code to be maintained without any 
understanding of the details of individual security protocols. 
The only thing the network code (SocketServer) is aware of is whether the 
channel's handshake is complete; if it isn't, it does the handshake,

or else it goes about reading/writing from the channel. Yes, SocketServer needs 
to be aware of whether a channel is ready to read or not. But on the other hand

there aren’t too many details of the handshake leaked into SocketServer.  Either 
we let the server know that a channel needs a handshake, or we keep the 
selection-key state in the channel, which means we are adding selector-related 
code into the channel. 



Thanks,
Harsha


On April 22, 2015 at 3:56:04 AM, Rajini Sivaram (rajinisiva...@googlemail.com) 
wrote:

When we were working on the client-side SSL implementation for Kafka, we
found that returning selection interest from handshake() method wasn't
sufficient to handle some of the SSL sequences. We resorted to managing the
selection key and interest state within SSLChannel to avoid SSL-specific
knowledge escaping out of SSL classes into protocol-independent network
code. The current server-side SSL patch doesn't address these scenarios
yet, but we may want to take these into account while designing the common
Channel class/interface.

1. *Support for running potentially long-running delegated tasks outside
the network thread*: It is recommended that delegated tasks indicated by
a handshake status of NEED_TASK are run on a separate thread since they may
block (
http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html).
It is easier to encapsulate this in SSLChannel without any changes to
common code if selection keys are managed within the Channel.
2. *Renegotiation handshake*: During a read operation, handshake status
may indicate that renegotiation is required. It will be good to encapsulate
this state change (and any knowledge of these SSL-specific state
transitions) within 

[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-04-22 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506489#comment-14506489
 ] 

Parth Brahmbhatt commented on KAFKA-1688:
-

Created reviewboard https://reviews.apache.org/r/33431/diff/
 against branch origin/trunk

 Add authorization interface and naive implementation
 

 Key: KAFKA-1688
 URL: https://issues.apache.org/jira/browse/KAFKA-1688
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Parth Brahmbhatt
 Fix For: 0.8.3

 Attachments: KAFKA-1688.patch, KAFKA-1688_2015-04-10_11:08:39.patch


 Add a PermissionManager interface as described here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security
 (possibly there is a better name?)
 Implement calls to the PermissionsManager in KafkaApis for the main requests 
 (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
 exception to the protocol to indicate permission denied.
 Add a server configuration to give the class you want to instantiate that 
 implements that interface. That class can define its own configuration 
 properties from the main config file.
 Provide a simple implementation of this interface which just takes a user and 
 ip whitelist and permits those in either of the whitelists to do anything, 
 and denies all others.
 Rather than writing an integration test for this class we can probably just 
 use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1688) Add authorization interface and naive implementation

2015-04-22 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-1688:

Attachment: KAFKA-1688.patch

 Add authorization interface and naive implementation
 

 Key: KAFKA-1688
 URL: https://issues.apache.org/jira/browse/KAFKA-1688
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Parth Brahmbhatt
 Fix For: 0.8.3

 Attachments: KAFKA-1688.patch, KAFKA-1688_2015-04-10_11:08:39.patch


 Add a PermissionManager interface as described here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security
 (possibly there is a better name?)
 Implement calls to the PermissionsManager in KafkaApis for the main requests 
 (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
 exception to the protocol to indicate permission denied.
 Add a server configuration to give the class you want to instantiate that 
 implements that interface. That class can define its own configuration 
 properties from the main config file.
 Provide a simple implementation of this interface which just takes a user and 
 ip whitelist and permits those in either of the whitelists to do anything, 
 and denies all others.
 Rather than writing an integration test for this class we can probably just 
 use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 33431: Patch for KAFKA-1688

2015-04-22 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33431/
---

Review request for kafka.


Bugs: KAFKA-1688
https://issues.apache.org/jira/browse/KAFKA-1688


Repository: kafka


Description
---

KAFKA-1688: initial check in.


Merge remote-tracking branch 'origin/trunk' into trunk


Merge remote-tracking branch 'origin/trunk' into 2032


Merge branch '2032' into trunk


KAFKA-1688: Add authorization.


Merge remote-tracking branch 'origin/trunk' into trunk

Conflicts:
core/src/main/scala/kafka/admin/AdminUtils.scala
core/src/main/scala/kafka/admin/TopicCommand.scala
core/src/main/scala/kafka/network/RequestChannel.scala
core/src/main/scala/kafka/server/KafkaApis.scala
core/src/main/scala/kafka/server/KafkaServer.scala
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala

Fixing some merge errors.


Merge remote-tracking branch 'origin/trunk' into trunk

Conflicts:
core/src/main/scala/kafka/common/ErrorMapping.scala

Converted some code to idomatic scala.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


Removing some unintended changes.


Reverting the topic config related changes


public classes and interfaces to support pluggable authorizer implementation 
for kafka


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


all public entities for pluggable authorizer support in kafka.


Diffs
-

  core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
c75c68589681b2c9d6eba2b440ce5e58cddf6370 
  core/src/main/scala/kafka/network/RequestChannel.scala 
1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 
  core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION 
  core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION 
  core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION 
  core/src/main/scala/kafka/security/auth/Operation.java PRE-CREATION 
  core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 
b4004aa3a1456d337199aa1245fb0ae61f6add46 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
cfbbd2be550947dd2b3c8c2cab981fa08fb6d859 
  core/src/main/scala/kafka/server/KafkaServer.scala 
c63f4ba9d622817ea8636d4e6135fba917ce085a 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
62d183248e3be4c83d2c768e762f61f92448c6a6 

Diff: https://reviews.apache.org/r/33431/diff/


Testing
---


Thanks,

Parth Brahmbhatt



[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-04-22 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506527#comment-14506527
 ] 

Dmitry Bugaychenko commented on KAFKA-2029:
---

The problem is that the state is distributed - not only the controller, but 
each broker has it, considering itself either a leader or a follower. When 
doing a transition we need to make sure all parties have completed it before 
switching to the next partition. Right now the transition is implemented via 
asynchronous messages to brokers without waiting for replies - in this case 
even a single-threaded controller might easily bring the system into an 
inconsistent state (having multiple brokers treating themselves as leaders for 
the same partition)

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
 Attachments: KAFKA-2029.patch, KAFKA-2029.patch


 Controlled shutdown as implemented currently can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely using 
 kill -9 and start again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
 queue size makes things much worse (see the discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds and it is worth 
 considering a global refactoring of the controller (make it single-threaded, 
 or even get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes): the broker sends 
 multiple shutdown requests, finally considers the shutdown failed and proceeds 
 to an unclean shutdown, and the controller gets stuck for a while (holding a 
 lock while waiting for free space in the controller-to-broker queue). After 
 the broker starts back up it receives a followers request and erases its 
 highwatermarks (with a message that the replica does not exist - the 
 controller hadn't yet sent a request with the replica assignment); then, when 
 the controller starts replicas on the broker, it deletes all local data (due 
 to the missing highwatermarks). Furthermore, the controller starts processing 
 the pending shutdown request and stops replicas on the broker, leaving it in a 
 non-functional state. A solution might be to increase the time the broker 
 waits for the controller's response to the shutdown request, but this timeout 
 is taken from controller.socket.timeout.ms, which is global for all 
 broker-controller communication, and setting it to 30 minutes is dangerous. 
 *Proposed solution: introduce a dedicated config parameter for this timeout 
 with a high default*.
 # If a broker goes down during controlled shutdown and does not come back, the 
 controller gets stuck in a deadlock (one thread owns the lock and tries to add 
 a message to the dead broker's queue, the send thread is in an infinite loop 
 trying to retry the message to the dead broker, and the broker failure handler 
 is waiting for the lock). There are numerous partitions without a leader and 
 the only way out is to kill -9 the controller. *Proposed solution: add a 
 timeout for adding messages to the broker's queue*. 
 ControllerChannelManager.sendRequest:
 {code}
   def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
           // TODO: Move timeout to config
           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
             error("Timed out trying to send message to broker " + brokerId.toString)
             // Do not throw, as it brings controller into completely non-functional state:
             // "Controller to broker state change requests batch is not empty" while creating a new one
             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
           }
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
     }
   }
 {code}
 # When the broker which is the controller starts shutting down, if auto 
 leader rebalance is running it deadlocks in the end (the shutdown thread owns 
 the lock and waits for the rebalance thread to 

[jira] [Issue Comment Deleted] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-04-22 Thread Dmitry Bugaychenko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Bugaychenko updated KAFKA-2029:
--
Comment: was deleted

(was: The problem is that the state is distributed - not only the controller, 
but each broker has it, considering itself either a leader or a follower. When 
doing a transition we need to make sure all parties have completed it before 
switching to the next partition. Right now the transition is implemented via 
asynchronous messages to brokers without waiting for replies - in this case 
even a single-threaded controller might easily bring the system into an 
inconsistent state (having multiple brokers treating themselves as leaders for 
the same partition))

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
 Attachments: KAFKA-2029.patch, KAFKA-2029.patch


 Controlled shutdown as implemented currently can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely using 
 kill -9 and start again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
 queue size makes things much worse (see the discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds and it is worth 
 considering a global refactoring of the controller (make it single-threaded, 
 or even get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes): the broker sends 
 multiple shutdown requests, finally considers the shutdown failed and proceeds 
 to an unclean shutdown, and the controller gets stuck for a while (holding a 
 lock while waiting for free space in the controller-to-broker queue). After 
 the broker starts back up it receives a followers request and erases its 
 highwatermarks (with a message that the replica does not exist - the 
 controller hadn't yet sent a request with the replica assignment); then, when 
 the controller starts replicas on the broker, it deletes all local data (due 
 to the missing highwatermarks). Furthermore, the controller starts processing 
 the pending shutdown request and stops replicas on the broker, leaving it in a 
 non-functional state. A solution might be to increase the time the broker 
 waits for the controller's response to the shutdown request, but this timeout 
 is taken from controller.socket.timeout.ms, which is global for all 
 broker-controller communication, and setting it to 30 minutes is dangerous. 
 *Proposed solution: introduce a dedicated config parameter for this timeout 
 with a high default*.
 # If a broker goes down during controlled shutdown and does not come back, the 
 controller gets stuck in a deadlock (one thread owns the lock and tries to add 
 a message to the dead broker's queue, the send thread is in an infinite loop 
 trying to retry the message to the dead broker, and the broker failure handler 
 is waiting for the lock). There are numerous partitions without a leader and 
 the only way out is to kill -9 the controller. *Proposed solution: add a 
 timeout for adding messages to the broker's queue*. 
 ControllerChannelManager.sendRequest:
 {code}
   def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
           // TODO: Move timeout to config
           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
             error("Timed out trying to send message to broker " + brokerId.toString)
             // Do not throw, as it brings controller into completely non-functional state:
             // "Controller to broker state change requests batch is not empty" while creating a new one
             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
           }
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
     }
   }
 {code}
 # When the broker which is the controller starts shutting down, if auto 
 leader rebalance is running it deadlocks in the end (the shutdown thread owns 
 the lock and waits for the rebalance thread to exit, and the rebalance 

[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-04-22 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506528#comment-14506528
 ] 

Dmitry Bugaychenko commented on KAFKA-2029:
---

The problem is that the state is distributed - not only the controller, but 
each broker has it, considering itself either a leader or a follower. When 
doing a transition we need to make sure all parties have completed it before 
switching to the next partition. Right now the transition is implemented via 
asynchronous messages to brokers without waiting for replies - in this case 
even a single-threaded controller might easily bring the system into an 
inconsistent state (having multiple brokers treating themselves as leaders for 
the same partition)

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
 Attachments: KAFKA-2029.patch, KAFKA-2029.patch


 Controlled shutdown as implemented currently can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely using 
 kill -9 and start again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase 
 queue size makes things much worse (see the discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds and it is worth 
 considering a global refactoring of the controller (make it single-threaded, 
 or even get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes): the broker sends 
 multiple shutdown requests, finally considers the shutdown failed and proceeds 
 to an unclean shutdown, and the controller gets stuck for a while (holding a 
 lock while waiting for free space in the controller-to-broker queue). After 
 the broker starts back up it receives a followers request and erases its 
 highwatermarks (with a message that the replica does not exist - the 
 controller hadn't yet sent a request with the replica assignment); then, when 
 the controller starts replicas on the broker, it deletes all local data (due 
 to the missing highwatermarks). Furthermore, the controller starts processing 
 the pending shutdown request and stops replicas on the broker, leaving it in a 
 non-functional state. A solution might be to increase the time the broker 
 waits for the controller's response to the shutdown request, but this timeout 
 is taken from controller.socket.timeout.ms, which is global for all 
 broker-controller communication, and setting it to 30 minutes is dangerous. 
 *Proposed solution: introduce a dedicated config parameter for this timeout 
 with a high default*.
 # If a broker goes down during controlled shutdown and does not come back, the 
 controller gets stuck in a deadlock (one thread owns the lock and tries to add 
 a message to the dead broker's queue, the send thread is in an infinite loop 
 trying to retry the message to the dead broker, and the broker failure handler 
 is waiting for the lock). There are numerous partitions without a leader and 
 the only way out is to kill -9 the controller. *Proposed solution: add a 
 timeout for adding messages to the broker's queue*. 
 ControllerChannelManager.sendRequest:
 {code}
   def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
           // TODO: Move timeout to config
           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
             error("Timed out trying to send message to broker " + brokerId.toString)
             // Do not throw, as it brings controller into completely non-functional state:
             // "Controller to broker state change requests batch is not empty" while creating a new one
             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
           }
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
     }
   }
 {code}
 # When the broker which is the controller starts shutting down, if auto 
 leader rebalance is running it deadlocks in the end (the shutdown thread owns 
 the lock and waits for the rebalance thread to 

[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-22 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507518#comment-14507518
 ] 

Ewen Cheslack-Postava commented on KAFKA-2121:
--

[~stevenz3wu] Yes, it'll be released with 0.8.3.

 prevent potential resource leak in KafkaProducer and KafkaConsumer
 --

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
 Fix For: 0.8.3

 Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, 
 KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, 
 KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, 
 KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, 
 KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, 
 KAFKA-2121_2015-04-20_22:48:31.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
   Additionally, the code creating the producer would have to be more
   complicated since it would have to deal with the cleanup carefully, whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that let you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
    Here is the resource leak problem that we have encountered when the 0.8.2
    java KafkaProducer failed in its constructor. Here is a code snippet of
    KafkaProducer to illustrate the problem.
  
    ---
    public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
        Serializer<V> valueSerializer) {

      // create metrics reporter via reflection
      List<MetricsReporter> reporters =
          config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
              MetricsReporter.class);

      // validate bootstrap servers
      List<InetSocketAddress> addresses =
          ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

    }
    ---
  
    let's say MyMetricsReporter creates a thread in its constructor. if hostname
    validation threw an exception, the constructor won't call the close method of
    MyMetricsReporter to clean up the resource. as a result, we have created a
    thread leak issue. this becomes worse when we try to auto-recover (i.e. keep
    creating KafkaProducer again - failing again - more thread leaks).
  
    there are multiple options for fixing this.
   
    1) just move the hostname validation to the beginning. but this only fixes
    one symptom; it doesn't fix the fundamental problem. what if some other
    lines throw an exception?
   
    2) use try-catch. in the catch section, try to call close methods for any
    non-null objects constructed so far.
   
    3) explicitly declare the dependencies in the constructor. this way, when
    KafkaProducer throws an exception, I can call the close method of the
    metrics reporters to release resources.
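
Of these options, Ewen's reply above favors a variant of 2). A minimal, self-contained sketch of that pattern (a stand-in class, not the actual KafkaProducer code):

{code}
// Sketch of option 2) with a null-safe close(), per the discussion above.
// LeakSafeClient and its reporter field are stand-ins, not Kafka classes.
public class LeakSafeClient implements AutoCloseable {
    private AutoCloseable reporter; // non-final: may stay null if construction fails early

    public LeakSafeClient(boolean failAfterReporter) {
        try {
            reporter = () -> System.out.println("reporter closed"); // stands in for a MetricsReporter
            if (failAfterReporter) {
                throw new IllegalArgumentException("invalid bootstrap servers"); // a later step fails
            }
        } catch (Throwable t) {
            close(); // release whatever was allocated before rethrowing
            throw new RuntimeException("Failed to construct client", t);
        }
    }

    @Override public void close() {
        if (reporter == null) return; // null-safe: the field may never have been assigned
        try { reporter.close(); } catch (Exception ignored) { }
    }
}
{code}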
   

Jenkins build is back to normal : KafkaPreCommit #83

2015-04-22 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/83/changes



[jira] [Commented] (KAFKA-2139) Add a separate controller messge queue with higher priority on broker side

2015-04-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507480#comment-14507480
 ] 

Jiangjie Qin commented on KAFKA-2139:
-

Sure, I will be glad to write a wiki and put the stuff I'm thinking about in 
it. I can do it this weekend. It will be a significant change and definitely 
needs a lot of discussion.
In terms of this proposal, messages from the same TCP connection will still be 
processed in order (originally I was proposing to add controller messages at 
the queue head; that won't work). The current proposal is to have a separate 
queue for controller requests so the requests on the controller-to-broker 
connection are still in order. I like the idea of prioritizing messages at the 
network layer and probably we can do that.

 Add a separate controller messge queue with higher priority on broker side 
 ---

 Key: KAFKA-2139
 URL: https://issues.apache.org/jira/browse/KAFKA-2139
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin

 This ticket is supposed to work together with KAFKA-2029. 
 There are two issues with the current controller-to-broker messages.
 1. On the controller side the messages are sent without synchronization.
 2. On the broker side the controller messages share the same queue as client 
 messages.
 The problem here is that brokers process the controller messages for the same 
 partition at different times, and the variation can be big. This causes 
 unnecessary data loss and prolongs preferred leader election / controlled 
 shutdown / partition reassignment, etc.
 KAFKA-2029 was trying to add a boundary between messages for different 
 partitions. For example, before the leader migration for the previous 
 partition finishes, the leader migration for the next partition won't begin.
 This ticket is trying to let the broker process controller messages faster. 
 The idea is to have a separate queue to hold controller messages; if there are 
 controller messages, the KafkaApi thread will first take care of those 
 messages, otherwise it will process messages from clients.
 These two tickets are not the ultimate solution to the current controller 
 problems, but just mitigate them with minor code changes. Moving forward, we 
 still need to think about rewriting the controller in a cleaner way.
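
The two-queue idea might be sketched as follows (queue types and the polling strategy are assumptions, not the ticket's implementation):

{code}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the proposal: a handler thread drains controller requests first
// and falls back to client requests. All shapes here are assumptions.
final class PrioritizedRequestQueues<R> {
    private final BlockingQueue<R> controllerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<R> clientQueue = new LinkedBlockingQueue<>();

    void addControllerRequest(R r) { controllerQueue.add(r); }
    void addClientRequest(R r) { clientQueue.add(r); }

    // Returns the next request, preferring controller traffic. May return
    // null after a short wait so the caller loops and re-checks the
    // controller queue.
    R poll() throws InterruptedException {
        R next = controllerQueue.poll();
        if (next != null) return next;
        return clientQueue.poll(300, TimeUnit.MILLISECONDS);
    }
}
{code}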



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-22 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507509#comment-14507509
 ] 

Steven Zhen Wu commented on KAFKA-2121:
---

[~guozhang] thanks. would it go in next 0.8.3 release?

 prevent potential resource leak in KafkaProducer and KafkaConsumer
 --

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
 Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, 
 KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, 
 KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, 
 KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, 
 KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, 
 KAFKA-2121_2015-04-20_22:48:31.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
   Additionally, the code creating the producer would have to be more
   complicated since it would have to deal with the cleanup carefully, whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that let you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
    Here is the resource leak problem that we have encountered when the 0.8.2
    java KafkaProducer failed in its constructor. Here is a code snippet of
    KafkaProducer to illustrate the problem.
  
    ---
    public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
        Serializer<V> valueSerializer) {

      // create metrics reporter via reflection
      List<MetricsReporter> reporters =
          config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
              MetricsReporter.class);

      // validate bootstrap servers
      List<InetSocketAddress> addresses =
          ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

    }
    ---
  
   let's say MyMetricsReporter creates a thread in its constructor. if hostname
   validation throws an exception, the constructor won't call the close method of
   MyMetricsReporter to clean up the resource. as a result, we created a thread
   leak issue. this becomes worse when we try to auto-recover (i.e. keep
   creating KafkaProducer again - failing again - more thread leaks).

   there are multiple options for fixing this.

   1) just move the hostname validation to the beginning. but this only fixes
   one symptom; it doesn't fix the fundamental problem. what if some other
   lines throw an exception?

   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.

   3) explicitly declare the dependency in the constructor. this way, when
   KafkaProducer throws an exception, I can call the close method of the metrics
   reporters to release resources.
   KafkaProducer(..., List<MetricsReporter>
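
For illustration, here is a minimal, self-contained sketch of the option (2) fix Ewen recommends above: wrap construction in try/catch and delegate cleanup to a close() that tolerates partially initialized state. The LeakSafeClient class below is a hypothetical stand-in, not the actual KafkaProducer code.

---
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaProducer, illustrating Ewen's option (2).
public class LeakSafeClient implements AutoCloseable {
    private List<AutoCloseable> reporters; // non-final so close() can null-check
    private Thread ioThread;

    public LeakSafeClient(boolean failAfterReporters) {
        try {
            reporters = new ArrayList<>();
            reporters.add(() -> System.out.println("reporter closed")); // may own threads
            if (failAfterReporters) {
                // stands in for e.g. bootstrap server validation throwing
                throw new IllegalArgumentException("invalid bootstrap servers");
            }
            ioThread = new Thread(() -> { /* IO loop elided */ });
            ioThread.start();
        } catch (RuntimeException e) {
            close(); // safe even when some fields are still null
            throw e;
        }
    }

    @Override
    public void close() {
        if (ioThread != null) ioThread.interrupt();
        if (reporters != null) {
            for (AutoCloseable r : reporters) {
                try { r.close(); } catch (Exception e) { /* log and continue */ }
            }
        }
    }

    public static void main(String[] args) {
        try {
            new LeakSafeClient(true);
        } catch (RuntimeException e) {
            System.out.println("constructor failed cleanly: " + e.getMessage());
        }
    }
}
---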

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Tom Graves
Thanks for the explanations Parth.
On the configs question, the way I see it, it's more likely to accidentally 
give everyone access, especially since you have to run a separate command to 
change the ACLs. If there was some config for defaults, a cluster admin could 
change that to be nobody or a certain set of users, then grant others 
permissions. This would also remove the race between commands. This is 
something you can always add later though if people request it.

So in kafka-acl.sh how do I actually tell it what the operation is?
kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate
Where does READ, WRITE, etc. go? Can I specify a list so I don't have to run 
this a bunch of times for each?

Do you want to have a --host option for --list so that admins could see what 
ACLs apply to specific host(s)?

Tom


 On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt 
pbrahmbh...@hortonworks.com wrote:

FYI, I have modified the KIP to include group as a resource. In order to
access the “joinGroup” and “commitOffset” APIs the user will need READ
permission on the topic and WRITE permission on the group.

I plan to open a VOTE thread by noon if there are no more concerns.

Thanks
Parth

On 4/22/15, 9:03 AM, Tom Graves tgraves...@yahoo.com.INVALID wrote:

Hey everyone,
Sorry to jump in on the conversation so late. I'm new to Kafka. I'll
apologize in advance if you have already covered some of my questions.  I
read through the wiki and had some comments and questions.
1) public enum Operation needs EDIT changed to ALTER

    Done.

2) Does the Authorizer class need a setAcls?  Rather than just add, to be
able to set an explicit list and overwrite what was there?  I see that
kafka-acl.sh lists a removeall, so I guess you could do removeall and then
add.  I also don't see a removeall in the Authorizer class; is it going
to loop through them all to remove each one?

    There is an overloaded version of removeAcls in the interface that takes
a resource as the only input, and as described in the javadoc all the ACLs
attached to that resource will be deleted. To cover the setAcls use case
the caller can first call remove and then add.

3) Can someone tell me what the use case is for doing ACLs based on hosts?
I can see some possibilities, just wondering if we have concrete ones where
one user is allowed from one host but not another.

    I am not sure I understand the question, given the use case you
described is exactly what we are trying to cover with the use of hosts in an
Acl. There are some additional use cases like “allow access to any user from
host1,host2”, but primarily it gives admins the ability to define ACLs at a
more granular level.

4) I'm a bit unclear how the resource works in the Authorizer class.
From what I see we have 2 resources - topics and cluster.  If I want to
add an ACL to allow joe to CREATE for the cluster, do I then call addAcls
with Acl(user: joe, ALLOW, Set(*), Set(CREATE)) and cluster?  What
if I want to call addAcls for DESCRIBE on a topic?  Is the resource then
topic, or is it the topic name?

    We now have 3 resources (group was added); please see the updated doc.
The CREATE ACL that you described is correct. For any topic operation you
should use the topic name as the resource name, and for group the user will
provide the groupId as the resource name.
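
To make the resource semantics concrete, here is a small, runnable sketch of the addAcls calls discussed above. The Acl and Resource shapes mirror the KIP examples, but the Java types here are illustrative stand-ins, not the KIP's actual interface.

---
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AclSketch {
    enum Operation { READ, WRITE, CREATE, DESCRIBE }
    enum PermissionType { ALLOW, DENY }
    record Acl(String principal, PermissionType type, Set<String> hosts, Set<Operation> ops) {}
    record Resource(String resourceType, String name) {}

    // toy in-memory store keyed by resource, mimicking addAcls(acls, resource)
    static final Map<Resource, Set<Acl>> store = new HashMap<>();

    static void addAcls(Set<Acl> acls, Resource resource) {
        store.computeIfAbsent(resource, r -> new HashSet<>()).addAll(acls);
    }

    public static void main(String[] args) {
        // "allow joe to CREATE for the cluster": the resource is the cluster itself
        addAcls(Set.of(new Acl("user:joe", PermissionType.ALLOW, Set.of("*"), Set.of(Operation.CREATE))),
                new Resource("cluster", "kafka-cluster"));

        // DESCRIBE on a topic: the resource name is the topic name
        addAcls(Set.of(new Acl("user:joe", PermissionType.ALLOW, Set.of("*"), Set.of(Operation.DESCRIBE))),
                new Resource("topic", "testtopic"));

        // consumer case: READ on the topic plus WRITE on the group (groupId as name)
        addAcls(Set.of(new Acl("user:joe", PermissionType.ALLOW, Set.of("*"), Set.of(Operation.READ))),
                new Resource("topic", "testtopic"));
        addAcls(Set.of(new Acl("user:joe", PermissionType.ALLOW, Set.of("*"), Set.of(Operation.WRITE))),
                new Resource("group", "my-consumer-group"));

        System.out.println(store);
    }
}
---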

5) reassigning partitions is a CLUSTER_ACTION or superuser?  It's not
totally clear to me what the differences between these are.  What about
increasing the # of partitions?

    I see this as an alter-topic operation, so it is at the topic level and
the user must have ALTER permissions on the topic.

6) groups are mentioned, are we supporting right away or is that a follow
on item? (is there going to be a kafka.supergroups)

    I think it can be a separate jira, just for breaking the code review
down into smaller chunks. We will support it in the first version, but if we
cannot do it for any reason that should not block a release with all the
other authZ work. We made deliberate design choices (like introducing a
principalType in KafkaPrincipal) to allow supporting groups as an
incremental change.

7) Are there config options for setting acls when I create my topic?  Or
do I have to create my topic and then run the kafka-acl.sh script to set
them?  Although it's very small, there would be a possible race there where
someone could start producing to the topic before the ACLs are set.

    We discussed this yesterday and we agreed to go with kafka-acl.sh. Yes,
there is a very, very small window of vulnerability, but I think that really
does not warrant changing the decision in this case.

8) are there configs for cluster level acl defaults?  Or does it default
to superusers on bringing up a new cluster and you have to modify with the cli?
thanks, Tom

    No defaults; the default is that superusers will have full access. I
don’t think making assumptions about one's security requirements should be
our burden.


    On Tuesday, April 21, 2015 7:10 PM, Parth Brahmbhatt

[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507450#comment-14507450
 ] 

Guozhang Wang commented on KAFKA-2121:
--

Thanks for the patch Steven, committed to trunk.


[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-22 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2121:
-
Resolution: Fixed
  Assignee: Steven Zhen Wu  (was: Jun Rao)
Status: Resolved  (was: Patch Available)


[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-04-22 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507513#comment-14507513
 ] 

Gwen Shapira commented on KAFKA-1688:
-

I agree this will be easier to review with subtasks.

Maybe take KAFKA-1688 out of KAFKA-1682, open these as subtasks and link to 
KAFKA-1682 as related?

 Add authorization interface and naive implementation
 

 Key: KAFKA-1688
 URL: https://issues.apache.org/jira/browse/KAFKA-1688
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Parth Brahmbhatt
 Fix For: 0.8.3

 Attachments: KAFKA-1688.patch, KAFKA-1688_2015-04-10_11:08:39.patch


 Add a PermissionManager interface as described here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security
 (possibly there is a better name?)
 Implement calls to the PermissionManager in KafkaApis for the main requests 
 (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
 exception to the protocol to indicate permission denied.
 Add a server configuration to give the class you want to instantiate that 
 implements that interface. That class can define its own configuration 
 properties from the main config file.
 Provide a simple implementation of this interface which just takes a user and 
 ip whitelist and permits those in either of the whitelists to do anything, 
 and denies all others.
 Rather than writing an integration test for this class we can probably just 
 use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-22 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2121:
-
Fix Version/s: 0.8.3


Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Parth Brahmbhatt
Please see all the available options here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI)
I think it covers both hosts and operations, and allows specifying a list for 
both.

Thanks
Parth


Re: [DISCUSS] New consumer offset commit API

2015-04-22 Thread Guozhang Wang
Hi Ewen,

I share the same concern you have about 2): with the new API, the sync
commit implementation is a bit awkward since we have a single-threaded
design in the new consumer. The reason we need to mute other nodes while
doing coordinator sync operations like join-group / offset commits / offset
fetches is to avoid long blocking due to possible starvation on the network
selector, so I think they still need to be done.

On the other hand, I think users using the commit API will usually fall
into three categories:

1) I really care that the offsets are committed before moving on to fetch
more data, so I will wait FOREVER for it to complete.

2) I do not really care about whether it succeeds or not, so just fire the
commit and let's move on; if it fails it fails (and it will be logged).

3) I care whether it succeeds or not, but I do not want to wait indefinitely;
so let me know if it does not finish within some timeout or fails (i.e. give
me the exceptions / error codes) and I will handle it.

The current API does not handle case 3) above, which sits between BLOCK
FOREVER and DO NOT CARE AT ALL. Most times people would not be very
explicit about the exact timeout, but just knowing it is bounded and
reasonably short is good enough. I think for this we probably do not need
extra timeout / retry settings, but can rely on the general request retry
settings; similarly, we probably do not need cancel.

So I wonder if we can do a slightly different modification to the API, like
this:

void commit(Map<TopicPartition, Long> offsets, CommitType type,
ConsumerCommitCallback callback);

For case 1) people call commit(offsets), which will block forever until it
succeeds;

For case 2) people call commit(offsets, async), which will return
immediately, with no callback upon finishing;

For case 3) people call commit(offsets, async, callback), and the
callback will be executed when it finishes or when the request retries have
been exhausted.

This API will make much smaller changes to the current implementations as
well. Of course if we have a common scenario where users would really care
about the exact timeout for async commits, then Future may be a good
approach.

Guozhang
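
For illustration, a runnable sketch of the three call patterns under Guozhang's proposed signature. The consumer types below are minimal stand-ins, not the real client:

---
import java.util.Map;

public class CommitPatterns {
    enum CommitType { SYNC, ASYNC }
    interface ConsumerCommitCallback { void onCompletion(Exception exception); }
    record TopicPartition(String topic, int partition) {}

    // stand-in for the proposed consumer method; a real implementation would
    // send an OffsetCommitRequest and, for SYNC, block until the response
    static void commit(Map<TopicPartition, Long> offsets, CommitType type,
                       ConsumerCommitCallback callback) {
        if (callback != null) callback.onCompletion(null);
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> offsets = Map.of(new TopicPartition("t", 0), 42L);

        // case 1: sync commit; the real client would block until it succeeds
        commit(offsets, CommitType.SYNC, null);

        // case 2: async fire and forget (no callback)
        commit(offsets, CommitType.ASYNC, null);

        // case 3: async with a callback invoked on completion or exhausted retries
        commit(offsets, CommitType.ASYNC,
               e -> System.out.println(e == null ? "committed" : "failed: " + e));
    }
}
---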


On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Ewen,

 This makes sense. People usually do not want to stop consuming when
 committing offsets.

 One corner case about async commit with retries I am thinking of is that it
 is possible for two offset commits to interleave with each other, and that
 might create a problem. Like you said, maybe we can cancel the previous one.

 Another thing is whether the future mechanism will only be applied to
 auto commit or will also be used in manual commit. Because in the new
 consumer we allow the user to provide an offset map for the offset commit,
 simply canceling a previous pending offset commit does not seem ideal in
 this case because the two commits could be for different partitions.
 Thanks.

 Jiangjie (Becket) Qin

 On 4/14/15, 4:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 I'd like to get some feedback on changing the offset commit API in the new
 consumer. Since this is user-facing API I wanted to make sure this gets
 better visibility than the JIRA (
 https://issues.apache.org/jira/browse/KAFKA-2123) might.
 
 The motivation is to make it possible to do async commits but be able to
 tell when the commit completes/fails. I'm suggesting changing the API from
 
  void commit(Map<TopicPartition, Long> offsets, CommitType type)
 
 to
 
  Future<Void> commit(Map<TopicPartition, Long> offsets,
  ConsumerCommitCallback callback);
 
 which matches the approach used for the producer. The
 ConsumerCommitCallback only has one method:
 
 public void onCompletion(Exception exception);
 
 This enables a few different use cases:
 
 * Blocking commit via Future.get(), and blocking with timeouts via
 Future.get(long, TimeUnit)
 * See exceptions via the future (see discussion of retries below)
 * Callback-based notification so you can keep processing messages and only
 take action if something goes wrong, takes too long, etc. This is the use
 case that motivated
 * Fire and forget commits via a shorthand commit() API and ignoring the
 resulting future.
 
 One big difference between this and the producer API is that there isn't
 any result (except maybe an exception) from commitOffsets. This leads to
  the somewhat awkward Future<Void> signature. I personally prefer that to
 the sync/async flag, especially since it also provides a non-blocking
 interface for checking whether the commit is complete.
 
  I posted a WIP patch to the JIRA. In the process of making it I found a
 few issues that might be worth discussing:
 
 1. Retries. In the old approach, this was trivial since it only applied to
 synchronous calls, so we could just loop until the request was successful.
 Do we want to start introducing a retries mechanism here, and should it
 apply to all types of requests or are we going to end up with a couple of
 different retry settings 
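
For comparison, a minimal sketch of what the Future-based signature enables on the caller side. The commit stub below is illustrative, not the real consumer (and, as Jay notes later in this thread, blocking on the future interacts with the consumer's poll loop):

---
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureCommitPatterns {
    interface ConsumerCommitCallback { void onCompletion(Exception exception); }
    record TopicPartition(String topic, int partition) {}

    // stand-in for the proposed Future<Void> commit(...) method
    static Future<Void> commit(Map<TopicPartition, Long> offsets,
                               ConsumerCommitCallback callback) {
        if (callback != null) callback.onCompletion(null);
        return CompletableFuture.completedFuture(null); // real client completes on response
    }

    public static void main(String[] args) throws Exception {
        Map<TopicPartition, Long> offsets = Map.of(new TopicPartition("t", 0), 42L);

        commit(offsets, null).get();                    // blocking commit
        commit(offsets, null).get(5, TimeUnit.SECONDS); // blocking with timeout
        Future<Void> f = commit(offsets, null);         // fire and forget: ignore f
        System.out.println("complete: " + f.isDone());  // non-blocking completion check
    }
}
---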

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Tom Graves
Sorry, I must be missing it; that is the doc I'm looking at.
I don't see how you specify the operation.
I don't see a normal --host that goes with --list. I see --revokehost and 
--granthost, neither of which I would think applies to --list, since only 
--principal says it applies to --list.
Copying the text here in case I'm looking at the wrong thing:
   
kafka-acl.sh --help
A tool to manage ACLs for a Kafka cluster and topics. Following are the options:
--topic topicname: name of the topic whose ACLs you want to add/remove.
--cluster: add/edit cluster-level ACLs, where you can control which users can 
create topics, list all topics for the cluster, or send control messages to 
brokers.
--add: indicates you are trying to add an ACL.
--remove: indicates you are trying to remove an ACL.
--grantprincipal principalType:principalName: a comma-separated list of 
principalType:principalName, where the currently supported principalType is 
user. When this option is not present but --granthost is specified, it 
defaults to *:*, a wildcard that matches all types and all users.
--granthost host1,host2: a comma-separated list of hosts from which the 
--grantprincipal is allowed the actions. When this option is not present but 
--grantprincipal is specified, it defaults to *, a wildcard matching all 
hosts.
--revokeprincipal principalType:principalName: a comma-separated list of 
principalType:principalName, where the currently supported principalType is 
user. When this option is not present but --revokehost is specified, it 
defaults to *:*, a wildcard that matches all types and all users. Revoke 
should only be used to limit the access added by some other grant; a revoke 
without an accompanying grant is meaningless, as by default only principals 
in grant lists are allowed access based on their ACLs and everyone else is 
denied.
--revokehost host1,host2: a comma-separated list of hosts from which the 
--revokeprincipal will be denied actions. When this option is not present but 
--revokeprincipal is specified, it defaults to *, a wildcard matching all 
hosts.
--removeAll: removes all ACLs for a topic; only works with topic, not with 
cluster.
--list: list all ACLs.
--principal principalType:principalName: used only with --list, to list all 
ACLs for a principal.
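
Pulling the pieces together, a hypothetical invocation might look like the 
following. The --operation flag's exact syntax is an assumption here (Parth 
notes further down that it had only just been added to the wiki), so treat 
this as a sketch rather than the final CLI:

---
# hypothetical; assumes --operation takes a comma-separated list
kafka-acl.sh --topic testtopic --add \
  --operation READ,WRITE \
  --grantprincipal user:joe,user:kate \
  --granthost host1,host2
---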
 



 On Wednesday, April 22, 2015 1:08 PM, Parth Brahmbhatt 
pbrahmbh...@hortonworks.com wrote:
   

 Please see all the available options here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI)
I think it covers both hosts and operations, and allows specifying a list for 
both.
Thanks
Parth

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Parth Brahmbhatt
Sorry, I missed your last questions. I am +0 on adding a --host option for
--list; we could add it for symmetry. Again, if this is only a CLI change it
can be added later; if you mean adding this to the authorizer interface then
we should make a decision now.

Given a choice I would actually like to keep only one option, which is the
resource-based get (and remove even the get based on principal). I see those
(getAcls for a principal or host) as special filtering cases which can easily
be achieved by a third-party tool by listing all topics, calling
getAcls for each topic, and applying filtering logic on that. I really
don’t see the need to make those first-class citizens of the authorizer
interface: these kinds of queries will be issued outside the broker JVM,
so they will not benefit from the caching, and because the storage will be
indexed on resource, both these options even as first-class APIs will just
scan all topic ACLs and apply filtering logic.

Thanks
Parth


Re: [DISCUSS] New consumer offset commit API

2015-04-22 Thread Jay Kreps
I second Guozhang's proposal. I do think we need the callback. The current
state is that for async commits you don't actually know whether it succeeded.
However, there is a totally valid case where you need to know whether it
succeeded but don't need to block, and without the callback you are stuck.
I think the futures will likely cause problems, since blocking on the future
precludes the polling that would allow it to complete.

-Jay


Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Tom Graves
Thanks!  Makes sense.
Tom 


 On Wednesday, April 22, 2015 1:25 PM, Parth Brahmbhatt 
pbrahmbh...@hortonworks.com wrote:

You are right, I forgot to mention the --operation option in the CLI; I just
added it. Sorry about the confusion.

Thanks
Parth

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Jeff Holoman
Parth,

This is a long thread, so trying to keep up here, sorry if this has been
covered before. First, great job on the KIP proposal and work so far.

Are we sure that we want to tie host level access to a given user? My
understanding is that the ACL will be (omitting some fields)

user_a, host1, host2, host3
user_b, host1, host2, host3

So there would potentially be a lot of redundancy in the configs. Does it
make sense to have hosts at the same level as principal in the
hierarchy? That way you could just blanket the allowed/denied hosts and
only have to worry about the users. If you follow this, then
we can wildcard the user so we can have a separate list of just host-based
access. What's the order in which the perms would be evaluated if there was
more than one match on a principal?

Is the thought that there wouldn't usually be much overlap on hosts? I
guess I can imagine a scenario where I want to take access offline/online for
a particular host or set of hosts, and if there was overlap, I'm doing a
bunch of alter commands for just a single host. Maybe this is too contrived
an example?

I agree that having this level of granularity gives flexibility, but I
wonder if people will actually use it rather than just * the hosts for a
given user and create a separate global list as I mentioned above.

The only other system I know of that ties users to hosts for access is
MySQL, and I don't love that model. Companies usually standardize on group
authorization anyway; are we complicating that issue with the inclusion of
hosts attached to users? Additionally, I worry about the debt of big JSON
configs in the first place; most non-developers find them non-intuitive
already, so anything to ease this would be beneficial.


Thanks

Jeff
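
To make the redundancy point above concrete, here is a hypothetical ACL
layout; the JSON shape is purely illustrative, not the KIP's actual storage
format. The first two entries repeat the same host list for every user,
while the third shows the wildcard-principal alternative that expresses the
host list once:

---
{"acls": [
  {"principal": "user:a", "hosts": ["host1", "host2", "host3"], "operations": ["READ"]},
  {"principal": "user:b", "hosts": ["host1", "host2", "host3"], "operations": ["READ"]},
  {"principal": "user:*", "hosts": ["host1", "host2", "host3"], "operations": ["READ"]}
]}
---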

On Wed, Apr 22, 2015 at 2:22 PM, Parth Brahmbhatt 
pbrahmbh...@hortonworks.com wrote:

 Sorry I missed your last questions. I am +0 on adding ―host option for
 ―list, we could add it for symmetry. Again if this is only a CLI change it
 can be added later if you mean adding this in authorizer interface then we
 should make a decision now.

 Given a choice I would like to actually keep only one option which is
 resource based get (remove even the get based on principal). I see those
 (getAcl for principal or host) as special filtering case which can easily
 be achieved by a third party tool by doing list all topics and calling
 getAcls for each topic and applying filtering logic on that.  I really
 don’t see the need to make those first class citizens of the authorizer
 interface given these kind of queries will be issued outside of broker JVM
 so they will not benefit from the caching and because the storage will be
 indexed on resource both these options even as a first class API will just
 scan all topic acls and apply filtering logic.

 Thanks
 Parth

 On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
 wrote:

 Please see all the available options here
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+I
 nterface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it
 covers both hosts and operations and allows to specify a list for both.
 
 Thanks
 Parth
 
 From: Tom Graves tgraves...@yahoo.commailto:tgraves...@yahoo.com
 Reply-To: Tom Graves tgraves...@yahoo.commailto:tgraves...@yahoo.com
 Date: Wednesday, April 22, 2015 at 11:02 AM
 To: Parth Brahmbhatt
 pbrahmbh...@hortonworks.commailto:pbrahmbh...@hortonworks.com,
 dev@kafka.apache.orgmailto:dev@kafka.apache.org
 dev@kafka.apache.orgmailto:dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security
 
 Thanks for the explanations Parth.
 
  On the configs questions, the way I see it, it's more likely to
  accidentally give everyone access, especially since you have to run a
  separate command to change the acls. If there was some config for
  defaults, a cluster admin could change that to be nobody or a certain set
  of users, then grant others permissions.  This would also remove the race
  between commands.  This is something you can always add later though if
  people request it.
 
  So in kafka-acl.sh how do I actually tell it what the operation is?
  kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate
  
  where does READ, WRITE, etc. go?  Can I specify them as a list so I don't have to
  run this a bunch of times for each?
 
 Do you want to have a --host option for --list so that admins could see
 what acls apply to specific host(s)?
 
 Tom
 
 
 
 On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt
  pbrahmbh...@hortonworks.com wrote:
 
 
 
  FYI, I have modified the KIP to include group as a resource. In order to
  access the “joinGroup” and “commitOffset” APIs the user will need READ
  permission on the topic and WRITE permission on the group.
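  
  To make the intended checks concrete, a broker handling those APIs would do
  something like this (a sketch with assumed names, not the KIP's exact code):
  
      // JoinGroup / OffsetCommit: READ on the topic AND WRITE on the group.
      boolean allowed =
          authorizer.authorize(session, Operation.READ, new Resource("topic", topic))
              && authorizer.authorize(session, Operation.WRITE, new Resource("group", groupId));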
 
 I plan to open a VOTE thread by noon if there are no more concerns.
 
 Thanks
 Parth
 
 On 4/22/15, 9:03 AM, Tom Graves
 

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Parth Brahmbhatt
You are right, I forgot to mention the --operation option in the CLI; I just
added it. Sorry about the confusion.

Thanks
Parth

On 4/22/15, 11:22 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
wrote:

Sorry I missed your last questions. I am +0 on adding a --host option for
--list; we could add it for symmetry. If this is only a CLI change it
can be added later, but if you mean adding this to the authorizer interface
then we should make a decision now.

Given a choice I would like to actually keep only one option, which is
resource-based get (and remove even the get based on principal). I see those
(getAcl for principal or host) as a special filtering case which can easily
be achieved by a third-party tool by listing all topics, calling
getAcls for each topic, and applying filtering logic on that. I really
don't see the need to make those first-class citizens of the authorizer
interface: these kinds of queries will be issued outside of the broker JVM
so they will not benefit from the caching, and because the storage will be
indexed on resource, both these options even as a first-class API will just
scan all topic acls and apply filtering logic.

Thanks
Parth

On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
wrote:

Please see all the available options here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it
covers both hosts and operations and allows one to specify a list for both.

Thanks
Parth

From: Tom Graves tgraves...@yahoo.com
Reply-To: Tom Graves tgraves...@yahoo.com
Date: Wednesday, April 22, 2015 at 11:02 AM
To: Parth Brahmbhatt pbrahmbh...@hortonworks.com,
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security

Thanks for the explanations Parth.

On the configs questions, the way I see it, it's more likely to
accidentally give everyone access, especially since you have to run a
separate command to change the acls. If there was some config for
defaults, a cluster admin could change that to be nobody or a certain set
of users, then grant others permissions.  This would also remove the race
between commands.  This is something you can always add later though if
people request it.

So in kafka-acl.sh how do I actually tell it what the operation is?
kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate

where does READ, WRITE, etc. go?  Can I specify them as a list so I don't have to
run this a bunch of times for each?

Do you want to have a --host option for --list so that admins could see
what acls apply to specific host(s)?

Tom



On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt
pbrahmbh...@hortonworks.com wrote:



FYI, I have modified the KIP to include group as a resource. In order to
access the “joinGroup” and “commitOffset” APIs the user will need READ
permission on the topic and WRITE permission on the group.

I plan to open a VOTE thread by noon if there are no more concerns.

Thanks
Parth

On 4/22/15, 9:03 AM, Tom Graves
tgraves...@yahoo.com.INVALID
wrote:

Hey everyone,
Sorry to jump in on the conversation so late. I'm new to Kafka. I'll
apologize in advance if you have already covered some of my questions.
I
read through the wiki and had some comments and questions.
1) public enum Operation needs EDIT changed to ALTER

Done.

2) Does the Authorizer class need a setAcls?  Rather than just add, to be
able to set an explicit list and overwrite what was there?  I see the
kafka-acl.sh lists a removeall, so I guess you could do removeall and
then add.  I also don't see a removeall in the Authorizer class; is it going
to loop through them all to remove each one?

There is an overloaded version of removeAcls in the interface that takes
in resource as the only input; as described in the javadoc, all the acls
attached to that resource will be deleted. To cover the setAcl use case
the caller can first call remove and then add.
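
In other words, setAcls can be emulated by the caller like this (a sketch
with assumed signatures):

    authorizer.removeAcls(resource);           // drop every acl on the resource
    authorizer.addAcls(desiredAcls, resource); // then install the explicit new set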

3) Can someone tell me what the use case is to do acls based on hosts?
I can see some possibilities, just wondering if we can come up with concrete
ones where one user is allowed from one host but not another.

I am not sure I understand the question, given that the use case you
described in your question is what we are trying to cover with the use of
hosts in the Acl. There are some additional use cases like “allow access to
any user from host1,host2”, but I think primarily it gives admins the
ability to define acls at a more granular level.

4) I'm a bit unclear how the resource works in the Authorizer class.
From what I see we have 2 resources - topics and cluster.  If I want to
add an acl to allow joe to CREATE for the cluster, then I call addAcls
with Acl(user: joe, ALLOW, Set("*"), Set(CREATE)) and "cluster"?

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Tom Graves

What about specifying the operation when I go to add or remove?
Tom 


 On Wednesday, April 22, 2015 1:23 PM, Parth Brahmbhatt 
pbrahmbh...@hortonworks.com wrote:
   

 Sorry I missed your last questions. I am +0 on adding a --host option for
 --list; we could add it for symmetry. If this is only a CLI change it
 can be added later, but if you mean adding this to the authorizer interface
 then we should make a decision now.

Given a choice I would like to actually keep only one option, which is
resource-based get (and remove even the get based on principal). I see those
(getAcl for principal or host) as a special filtering case which can easily
be achieved by a third-party tool by listing all topics, calling
getAcls for each topic, and applying filtering logic on that. I really
don't see the need to make those first-class citizens of the authorizer
interface: these kinds of queries will be issued outside of the broker JVM
so they will not benefit from the caching, and because the storage will be
indexed on resource, both these options even as a first-class API will just
scan all topic acls and apply filtering logic.

Thanks
Parth

On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
wrote:

Please see all the available options here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it
covers both hosts and operations and allows one to specify a list for both.

Thanks
Parth

From: Tom Graves tgraves...@yahoo.com
Reply-To: Tom Graves tgraves...@yahoo.com
Date: Wednesday, April 22, 2015 at 11:02 AM
To: Parth Brahmbhatt pbrahmbh...@hortonworks.com,
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security

Thanks for the explanations Parth.

On the configs questions, the way I see it, it's more likely to
accidentally give everyone access, especially since you have to run a
separate command to change the acls. If there was some config for
defaults, a cluster admin could change that to be nobody or a certain set
of users, then grant others permissions.  This would also remove the race
between commands.  This is something you can always add later though if
people request it.

So in kafka-acl.sh how do I actually tell it what the operation is?
kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate

where does READ, WRITE, etc. go?  Can I specify them as a list so I don't have to
run this a bunch of times for each?

Do you want to have a --host option for --list so that admins could see
what acls apply to specific host(s)?

Tom



On Wednesday, April 22, 2015 11:38 AM, Parth Brahmbhatt
pbrahmbh...@hortonworks.com wrote:



FYI, I have modified the KIP to include group as a resource. In order to
access the “joinGroup” and “commitOffset” APIs the user will need READ
permission on the topic and WRITE permission on the group.

I plan to open a VOTE thread by noon if there are no more concerns.

Thanks
Parth

On 4/22/15, 9:03 AM, Tom Graves
tgraves...@yahoo.com.INVALID wrote:

Hey everyone,
Sorry to jump in on the conversation so late. I'm new to Kafka. I'll
apologize in advance if you have already covered some of my questions.  I
read through the wiki and had some comments and questions.
1) public enum Operation needs EDIT changed to ALTER

    Done.

2) Does the Authorizer class need a setAcls?  Rather than just add, to be
able to set an explicit list and overwrite what was there?  I see the
kafka-acl.sh lists a removeall, so I guess you could do removeall and then
add.  I also don't see a removeall in the Authorizer class; is it going
to loop through them all to remove each one?

    There is an overloaded version of removeAcls in the interface that takes
in resource as the only input; as described in the javadoc, all the acls
attached to that resource will be deleted. To cover the setAcl use case
the caller can first call remove and then add.

3) Can someone tell me what the use case is to do acls based on hosts?
I can see some possibilities, just wondering if we can come up with concrete
ones where one user is allowed from one host but not another.

    I am not sure I understand the question, given that the use case you
described in your question is what we are trying to cover with the use of
hosts in the Acl. There are some additional use cases like “allow access to
any user from host1,host2”, but I think primarily it gives admins the
ability to define acls at a more granular level.

4) I'm a bit unclear how the resource works in the Authorizer class.
From what I see we have 2 resources - topics and cluster.  If I want to
add an acl to allow joe to CREATE for the cluster, then I call addAcls
with Acl(user: joe, ALLOW, Set("*"), Set(CREATE)) and "cluster"?  What
if I want to call addAcls 

Re: [DISCUSS] New consumer offset commit API

2015-04-22 Thread Bhavesh Mistry
Hi Ewen,

The only time I can think of where the application needs to know whether the offset
was committed or not is during graceful shutdown and/or
Runtime.addShutdownHook(), so the consumer application does not get duplicated
records upon restart or does not have to deal with eliminating already
processed offsets.  The only thing the consumer application will have to handle
is a failure to commit the offset after XX retries.  Or would we prefer the
application to manage this last offset commit when the offset cannot be committed
due to failure, connection timeout or any other failure case?

Thanks,
Bhavesh



On Wed, Apr 22, 2015 at 11:20 AM, Jay Kreps jay.kr...@gmail.com wrote:

 I second Guozhang's proposal. I do think we need the callback. The current
 state is that for async commits you actually don't know if it succeeded.
 However there is a totally valid case where you do need to know if it
 succeeded but don't need to block, and without the callback you are stuck.
 I think the futures will likely cause problems since blocking on the future
 precludes polling which would allow it to complete.
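 To sketch the hazard (a hypothetical API shape, single consumer thread):

     Future<Void> f = consumer.commit(offsets, callback);
     f.get(); // blocks this thread, but nothing is left to call consumer.poll(),
              // so the commit response is never processed and get() never returns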

 -Jay

 On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang wangg...@gmail.com
 wrote:

  Hi Ewen,
 
   I share the same concern you have about 2): with the new API, the sync
   commit implementation is a bit awkward since we have a single-threaded
   design in the new consumer. The reason we need to mute other nodes while
   doing coordinator sync operations like join-group / offset commits /
   offset fetches is to avoid long blocking due to possible starvation on
   the network selector, so I think they still need to be done.
 
  On the other hand, I think users using the commit API will usually fall
  into three categories:
 
   1) I really care that the offsets be committed before moving on to
   fetch more data, so I will wait FOREVER for it to complete.
 
  2) I do not really care about whether it succeeds or not, so just fire
  commit and let's move on; if it fails it fails (and it will be logged).
 
  3) I care if it succeeds or not, but I do not want to wait indefinitely;
 so
  let me know if it does not finish within some timeout or failed (i.e.
 give
  me the exceptions / error codes) and I will handle it.
 
   The current APIs do not handle case 3) above, which sits between BLOCK
   FOREVER and DO NOT CARE AT ALL. Most times people would not be very
   explicit about the exact timeout; just knowing it is definite and
   reasonably short is good enough. I think for this we probably do not need
   an extra timeout / retry setting, but can rely on the general request retry
   settings; similarly we probably do not need cancel.
 
  So I wonder if we can do a slightly different modification to API like
  this:
 
   void commit(Map<TopicPartition, Long> offsets, CommitType type,
   ConsumerCommitCallback callback);
 
   For case 1) people call commit(offsets), which will block forever until
   it succeeds;
  
   For case 2) people call commit(offsets, async), which will return
   immediately, with no callback upon finish;
  
   For case 3) people call commit(offsets, async, callback), and the
   callback will be executed when it finishes or the request retries have
   been exhausted.
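  
   To illustrate the three usage patterns (a sketch; the exact enum and
   callback shapes here are assumptions, not the final API):
  
       // case 1: block until the commit succeeds
       consumer.commit(offsets, CommitType.SYNC, null);
  
       // case 2: fire and forget
       consumer.commit(offsets, CommitType.ASYNC, null);
  
       // case 3: async, but get notified on completion or exhausted retries
       consumer.commit(offsets, CommitType.ASYNC, exception -> {
           if (exception != null)
               log.error("offset commit failed", exception);
       });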
 
  This API will make much smaller changes to the current implementations as
  well. Of course if we have a common scenario where users would really
 care
  about the exact timeout for async commits, then Future may be a good
  approach.
 
  Guozhang
 
 
  On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
 
   Hey Ewen,
  
   This makes sense. People usually do not want to stop consuming when
   committing offsets.
  
    One corner case about async commit with retries I am thinking of is that
    it is possible for two offset commits to interleave with each other, and
    that might create a problem. Like you said, maybe we can cancel the
    previous one.
   
    Another question is whether the future mechanism will only be applied
    to auto commit or will also be used in manual commit, because in the new
    consumer we allow the user to provide an offset map for the offset commit.
    Simply canceling a previous pending offset commit does not seem ideal in
    this case because the two commits could be for different partitions.
  
   Thanks.
  
   Jiangjie (Becket) Qin
  
   On 4/14/15, 4:31 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  
   I'd like to get some feedback on changing the offset commit API in the
  new
   consumer. Since this is user-facing API I wanted to make sure this
 gets
   better visibility than the JIRA (
   https://issues.apache.org/jira/browse/KAFKA-2123) might.
   
   The motivation is to make it possible to do async commits but be able
 to
   tell when the commit completes/fails. I'm suggesting changing the API
  from
   
    void commit(Map<TopicPartition, Long> offsets, CommitType)
   
   to
   
    Future<Void> commit(Map<TopicPartition, Long> offsets,
    ConsumerCommitCallback callback);
   
   which matches the approach used for the producer. The
   

[jira] [Commented] (KAFKA-2138) KafkaProducer does not honor the retry backoff time.

2015-04-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14508203#comment-14508203
 ] 

Jiangjie Qin commented on KAFKA-2138:
-

Updated reviewboard https://reviews.apache.org/r/33417/diff/
 against branch origin/trunk

 KafkaProducer does not honor the retry backoff time.
 

 Key: KAFKA-2138
 URL: https://issues.apache.org/jira/browse/KAFKA-2138
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Critical
 Attachments: KAFKA-2138.patch, KAFKA-2138_2015-04-22_17:19:33.patch


 In KafkaProducer, we only check the batch.lastAttemptMs in ready(), but we are 
 not checking it in drain().
 The problem is that if we have two partitions both on the same node, and 
 partition 1 should back off while partition 2 should not, currently partition 
 1's backoff time will be ignored.
 We should check the lastAttemptMs in drain() as well.
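 A sketch of the intended drain() check (mirroring the existing ready() logic;
 not the exact patch):
 
     // Skip batches that are still inside their retry backoff window.
     boolean backingOff = batch.attempts > 0
         && batch.lastAttemptMs + retryBackoffMs > nowMs;
     if (!backingOff)
         drained.add(batch); // only drain batches whose backoff has elapsed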



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33417: Patch for KAFKA-2138

2015-04-22 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33417/
---

(Updated April 23, 2015, 12:19 a.m.)


Review request for kafka.


Bugs: KAFKA-2138
https://issues.apache.org/jira/browse/KAFKA-2138


Repository: kafka


Description (updated)
---

Incorporated Joel's comments.


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 

Diff: https://reviews.apache.org/r/33417/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-2138) KafkaProducer does not honor the retry backoff time.

2015-04-22 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2138:

Attachment: KAFKA-2138_2015-04-22_17:19:33.patch

 KafkaProducer does not honor the retry backoff time.
 

 Key: KAFKA-2138
 URL: https://issues.apache.org/jira/browse/KAFKA-2138
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Critical
 Attachments: KAFKA-2138.patch, KAFKA-2138_2015-04-22_17:19:33.patch


 In KafkaProducer, we only check the batch.lastAttemptMs in ready(), but we are 
 not checking it in drain().
 The problem is that if we have two partitions both on the same node, and 
 partition 1 should back off while partition 2 should not, currently partition 
 1's backoff time will be ignored.
 We should check the lastAttemptMs in drain() as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-22 Thread Honghai Chen
Hi Roshan,
Using the 'auto' value may break the rule and mess up the 
configuration. @Jay, any thoughts?

Thanks, Honghai Chen 

-Original Message-
From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm] 
Sent: Thursday, April 23, 2015 6:27 AM
To: dev@kafka.apache.org; Roshan Naik
Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
performance under windows and some old Linux file system

+1 (non-binding).

-- 
Harsha


On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com) wrote:

I see that it is safe to keep this off by default due to some concerns.
Eventually, for settings such as this whose 'preferred' value is platform
specific (or based on other criteria), it might be worth considering
having a default value that is not a constant but an 'auto' value. When
Kafka boots up it can automatically use the preferred value. Of course it
would have to be documented as to what auto means for a given platform.

-roshan  


On 4/22/15 1:21 PM, Jakob Homan jgho...@gmail.com wrote:  

+1. This is an important performance fix for Windows-based clusters.  
  
-Jakob  
  
On 22 April 2015 at 03:25, Honghai Chen honghai.c...@microsoft.com  
wrote:  
 Fix the issue Sriram mentioned. Code review and jira/KIP updated.  
  
 Below are detailed descriptions of the scenarios:
 1. If we do a clean shutdown, the last log file will be truncated to its
real size since the close() function of FileMessageSet will call trim().
 2. If we crash, then on restart we will go through the process of
recover() and the last log file will be truncated to its real size (and
the position will be moved to the end of the file).
 3. When the service starts and opens an existing file:
 a. It will run the LogSegment constructor which has NO preallocate
parameter.
 b. Then in FileMessageSet, the end will be Int.MaxValue, and
channel.position(math.min(channel.size().toInt, end)) will make the
position be the end of the file.
 c. If recovery is needed, the recover function will truncate the file to
the end of valid data, and also move the position to it.

 4. When the service is running and needs to create a new log segment and
new FileMessageSet:

 a. If preallocate = true: the end in FileMessageSet will be 0, the
file size will be initFileSize, and then
channel.position(math.min(channel.size().toInt, end)) will make the
position be 0.

 b. Else if preallocate = false: backward compatible, the end in
FileMessageSet will be Int.MaxValue, the file size will be 0, and
then channel.position(math.min(channel.size().toInt, end)) will make
the position be 0.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
 https://issues.apache.org/jira/browse/KAFKA-1646  
 https://reviews.apache.org/r/33204/diff/2/  
  
 Thanks, Honghai Chen  
 http://aka.ms/kafka  
 http://aka.ms/manifold  
  
 -Original Message-  
 From: Honghai Chen  
 Sent: Wednesday, April 22, 2015 11:12 AM  
 To: dev@kafka.apache.org  
 Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume  
performance under windows and some old Linux file system  
  
 Hi Sriram,  
 One sentence of code missed, will update code review board and  
KIP soon.  
 For LogSegment and FileMessageSet, must use different  
constructor function for existing file and new file, then the code   
channel.position(math.min(channel.size().toInt, end))  will make sure  
the position at end of existing file.  
  
 Thanks, Honghai Chen  
  
 -Original Message-  
 From: Jay Kreps [mailto:jay.kr...@gmail.com]  
 Sent: Wednesday, April 22, 2015 5:22 AM  
 To: dev@kafka.apache.org  
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume  
performance under windows and some old Linux file system  
  
 My understanding of the patch is that clean shutdown truncates the file
back to its true size (and reallocates it on startup). Hard crash is
handled by the normal recovery, which should truncate off the empty
portion of the file.
  
 On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian   
srsubraman...@linkedin.com.invalid wrote:  
  
 Could you describe how recovery works in this mode? Say, we had a 250  
 MB preallocated segment and we wrote till 50MB and crashed. Till what  
 point do we recover? Also, on startup, how is the append end pointer  
 set even on a clean shutdown? How does the FileChannel end position  
 get set to 50 MB instead of 250 MB? The existing code might just work  
 for it but explaining that would be useful.  
  
 On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote:  
  
+1. I've tried this on Linux and it helps reduce the spikes in append (and
hence producer) latency for high throughput writes. I am not entirely
sure why, but my suspicion is that in the absence of preallocation,
you see spikes when writes need to happen faster

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Parth Brahmbhatt
Hi Jeff,

Thanks a lot for the review. I think you have a valid point about acls
being duplicated, and the simplest solution would be to modify the Acl class
so it holds a set of principals instead of a single principal, i.e.

user_a, user_b have READ, WRITE, DESCRIBE permissions on Topic1 from
Host1, Host2, Host3.

I think the evaluation order only matters for the permissionType:
deny acls should be evaluated before allow acls. To give you an example,
suppose we have the following acls

acl1 - user1 is allowed to READ from all hosts.
acl2 - host1 is allowed to READ regardless of who is the user.
acl3 - host2 is allowed to READ regardless of who is the user.

acl4 - user1 is denied to READ from host1.

As stated in the KIP we first evaluate DENY, so if user1 tries to access
from host1 he will be denied (acl4), even though both user1 and host1 have
allow acls with wildcards (acl1, acl2).
If user1 tried to READ from host2, the action will be allowed, and it does
not matter whether we match acl3 or acl1, so I don’t think the evaluation
order matters here.
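
A compact sketch of that deny-before-allow evaluation (assumed types, not the
KIP's exact interface):

    boolean authorize(Session session, Operation op, Resource resource) {
        Set<Acl> acls = getAcls(resource);  // served from the broker-side cache
        for (Acl acl : acls)                // deny is evaluated first...
            if (acl.isDeny() && acl.matches(session.principal(), session.host(), op))
                return false;               // ...and short-circuits everything else
        for (Acl acl : acls)
            if (acl.isAllow() && acl.matches(session.principal(), session.host(), op))
                return true;
        return defaultToAllow;              // configurable default, per the KIP discussion
    }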

“Will people actually use hosts with users?” I really don’t know, but given
acls are part of our public APIs I thought it better to try and cover
more use cases. If others think this extra complexity is not worth the
value it’s adding, please raise your concerns so we can discuss whether it
should be removed from the acl structure. Note that even in the absence of
hosts from the ACL, users will still be able to whitelist/blacklist hosts as
long as we start supporting principalType = “host”; this is easy to add and
can be an incremental improvement. They will however lose the ability to
restrict access to users just from a set of hosts.

We agreed to offer a CLI to overcome the JSON acl config:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI). I still like
JSON, but that probably has something to do with me being a developer :-).

Thanks
Parth

On 4/22/15, 11:38 AM, Jeff Holoman jholo...@cloudera.com wrote:

Parth,

This is a long thread, so trying to keep up here, sorry if this has been
covered before. First, great job on the KIP proposal and work so far.

Are we sure that we want to tie host level access to a given user? My
understanding is that the ACL will be (omitting some fields)

user_a, host1, host2, host3
user_b, host1, host2, host3

So there would potentially be a lot of redundancy in the configs. Does it
make sense to have hosts be at the same level as principal in the
hierarchy? This way you could just blanket the allowed / denied hosts and
only have to worry about the users. So if you follow this, then

we can wildcard the user so we can have a separate list of just host-based
access. What's the order in which the perms would be evaluated if there was
more than one match on a principal?

Is the thought that there wouldn't usually be much overlap on hosts? I
guess I can imagine a scenario where I want to offline/online access to a
particular host or set of hosts, and if there was overlap, I'm doing a
bunch of alter commands for just a single host. Maybe this is too contrived
an example?

I agree that having this level of granularity gives flexibility, but I
wonder if people will actually use it rather than just wildcard (*) the hosts
for a given user and create a separate global list as I mentioned above?

The only other system I know of that ties users with hosts for access is
MySQL, and I don't love that model. Companies usually standardize on group
authorization anyway; are we complicating that issue with the inclusion of
hosts attached to users? Additionally I worry about the debt of big JSON
configs in the first place; most non-developers find them non-intuitive
already, so anything to ease this I think would be beneficial.


Thanks

Jeff

On Wed, Apr 22, 2015 at 2:22 PM, Parth Brahmbhatt 
pbrahmbh...@hortonworks.com wrote:

 Sorry I missed your last questions. I am +0 on adding a --host option for
 --list; we could add it for symmetry. If this is only a CLI change it
 can be added later, but if you mean adding this to the authorizer interface
 then we should make a decision now.

 Given a choice I would like to actually keep only one option, which is
 resource-based get (and remove even the get based on principal). I see those
 (getAcl for principal or host) as a special filtering case which can easily
 be achieved by a third-party tool by listing all topics, calling
 getAcls for each topic, and applying filtering logic on that. I really
 don't see the need to make those first-class citizens of the authorizer
 interface: these kinds of queries will be issued outside of the broker JVM
 so they will not benefit from the caching, and because the storage will be
 indexed on resource, both these options even as a first-class API will just
 scan all topic acls and apply filtering logic.

 Thanks
 Parth

 On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
 wrote:

 Please see all the available options here
 
 

[jira] [Commented] (KAFKA-2139) Add a separate controller message queue with higher priority on broker side

2015-04-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507933#comment-14507933
 ] 

Jay Kreps commented on KAFKA-2139:
--

Awesome, greatly appreciated. I agree that a separate queue with a dedicated 
worker thread sounds more promising if we can come up with a clean way to 
distinguish controller vs non-controller traffic at the network layer.

 Add a separate controller message queue with higher priority on broker side 
 ---

 Key: KAFKA-2139
 URL: https://issues.apache.org/jira/browse/KAFKA-2139
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin

 This ticket is supposed to work together with KAFKA-2029. 
 There are two issues with the current controller-to-broker messages.
 1. On the controller side the messages are sent without synchronization.
 2. On the broker side the controller messages share the same queue as client 
 messages.
 The problem here is that brokers process the controller messages for the same 
 partition at different times and the variation could be big. This causes 
 unnecessary data loss and prolongs the preferred leader election / controlled 
 shutdown / partition reassignment, etc.
 KAFKA-2029 was trying to add a boundary between messages for different 
 partitions. For example, before leader migration for the previous partition 
 finishes, the leader migration for the next partition won't begin.
 This ticket is trying to let the broker process controller messages faster. So 
 the idea is to have a separate queue to hold controller messages; if there are 
 controller messages, the KafkaApi thread will first take care of those messages, 
 otherwise it will process messages from clients.
 Those two tickets are not the ultimate solution to current controller problems, 
 but just mitigate them with minor code changes. Moving forward, we still need 
 to think about rewriting the controller in a cleaner way.
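 For illustration, the proposed prioritization in the request handler could
 look like this (hypothetical queue names; a sketch, not the patch):
 
     // Always drain controller messages before serving client requests.
     Request next = controllerQueue.poll();
     if (next == null)
         next = clientQueue.take(); // block only when no controller work is pending
     handle(next);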



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33417: Patch for KAFKA-2138

2015-04-22 Thread Joel Koshy


 On April 21, 2015, 11:56 p.m., Guozhang Wang wrote:
  This piece of logic has been quite complex and awkward to me now, for 
  example in ready() a node will only not be considered if ALL of its 
  partitions are either not sendable or are in the backoff period, and the 
  reason we want to get ready nodes before drain is to check if they are 
  really ready or not. This is mainly because 1) we need to be careful when 
  calling client.poll() later about the timeout value in order to avoid busy 
  waiting, 2) we need to make sure if metadata refresh is needed, it needs to 
  be sent as higher priority than other requests.
  
  I suggest re-writing this fraction of code to make it clearer, in the 
  following process:
  
  0. while handling the metadata response and updating the metadata, check 
  whether ANY partition's leader is not known; if so, set 
  metadata.requestUpdate. So we do not need to do this step anymore at the 
  start of run().
  
  1. get all the ready nodes based on their connection state only (i.e. no 
  peeking into RecordAccumulator), and record the node_backoff as min 
  (reconnection_backoff - time_waited) of all nodes; if one of these nodes is 
  connected or connecting, this backoff should be 0.
  
  2. for each of the ready nodes, try to drain their corresponding partitions in 
  RecordAccumulator while considering all kinds of conditions (full, expired, 
  exhausted, etc...), and record the data_backoff as min (retry_backoff - 
  time_waited) of all partitions; if one of the partitions is immediately 
  sendable, this backoff should be 0.
  
  3. formulate the produce request and call client.poll() with timeout = 
  reconnection_backoff > 0 ? reconnection_backoff : retry_backoff.
  
  4. in NetworkClient.poll(), the logic of maybeUpdateMetadata while updating 
  metadataTimeout can also be simplified.
  
  This may contain some flaw, Jiangjie / Ewen let me know if you see any 
  issues.
 
 Jiangjie Qin wrote:
  Hi Guozhang, I think that makes sense. We should swap the checking 
  order to check connection readiness first, then data readiness. I'll try to 
  submit a refactored patch and will raise questions if anything comes up. Thanks.

I agree with the suggestions to rewrite but I really think that is orthogonal 
to this simple (and correct) fix. I think we should commit this and keep the 
jira open for further refactoring.


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33417/#review81097
---


On April 21, 2015, 10:51 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33417/
 ---
 
 (Updated April 21, 2015, 10:51 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2138
 https://issues.apache.org/jira/browse/KAFKA-2138
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Patch for KAFKA-2138 honor retry backoff in KafkaProducer
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
 
 Diff: https://reviews.apache.org/r/33417/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 33417: Patch for KAFKA-2138

2015-04-22 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33417/#review81238
---

Ship it!



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
https://reviews.apache.org/r/33417/#comment131504

typo



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
https://reviews.apache.org/r/33417/#comment131513

Node1 should be the only ready node



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
https://reviews.apache.org/r/33417/#comment131506

batche - batch



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
https://reviews.apache.org/r/33417/#comment131508

paritition - partition



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
https://reviews.apache.org/r/33417/#comment131509

tp1 should backoff while tp2 should not



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
https://reviews.apache.org/r/33417/#comment131514

Node1 should be the only ready node


- Joel Koshy


On April 21, 2015, 10:51 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33417/
 ---
 
 (Updated April 21, 2015, 10:51 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2138
 https://issues.apache.org/jira/browse/KAFKA-2138
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Patch for KAFKA-2138 honor retry backoff in KafkaProducer
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
 
 Diff: https://reviews.apache.org/r/33417/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-22 Thread Sriharsha Chintalapani
+1 (non-binding).

-- 
Harsha


On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com) wrote:

I see that it is safe to keep this off by default due to some concerns.
Eventually, for settings such as this whose 'preferred' value is platform
specific (or based on other criteria), it might be worth considering
having a default value that is not a constant but an 'auto' value. When
Kafka boots up it can automatically use the preferred value. Of course it
would have to be documented as to what auto means for a given platform.

-roshan  


On 4/22/15 1:21 PM, Jakob Homan jgho...@gmail.com wrote:  

+1. This is an important performance fix for Windows-based clusters.  
  
-Jakob  
  
On 22 April 2015 at 03:25, Honghai Chen honghai.c...@microsoft.com  
wrote:  
 Fix the issue Sriram mentioned. Code review and jira/KIP updated.  
  
 Below are detailed descriptions of the scenarios:
 1. If we do a clean shutdown, the last log file will be truncated to its
real size since the close() function of FileMessageSet will call trim().
 2. If we crash, then on restart we will go through the process of
recover() and the last log file will be truncated to its real size (and
the position will be moved to the end of the file).
 3. When the service starts and opens an existing file:
 a. It will run the LogSegment constructor which has NO preallocate
parameter.
 b. Then in FileMessageSet, the end will be Int.MaxValue, and
channel.position(math.min(channel.size().toInt, end)) will make the
position be the end of the file.
 c. If recovery is needed, the recover function will truncate the file to
the end of valid data, and also move the position to it.

 4. When the service is running and needs to create a new log segment and
new FileMessageSet:

 a. If preallocate = true: the end in FileMessageSet will be 0, the
file size will be initFileSize, and then
channel.position(math.min(channel.size().toInt, end)) will make the
position be 0.

 b. Else if preallocate = false: backward compatible, the end in
FileMessageSet will be Int.MaxValue, the file size will be 0, and
then channel.position(math.min(channel.size().toInt, end)) will make
the position be 0.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
 https://issues.apache.org/jira/browse/KAFKA-1646  
 https://reviews.apache.org/r/33204/diff/2/  
  
 Thanks, Honghai Chen  
 http://aka.ms/kafka  
 http://aka.ms/manifold  
  
 -Original Message-  
 From: Honghai Chen  
 Sent: Wednesday, April 22, 2015 11:12 AM  
 To: dev@kafka.apache.org  
 Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume  
performance under windows and some old Linux file system  
  
 Hi Sriram,  
 One sentence of code missed, will update code review board and  
KIP soon.  
 For LogSegment and FileMessageSet, must use different  
constructor function for existing file and new file, then the code   
channel.position(math.min(channel.size().toInt, end))  will make sure  
the position at end of existing file.  
  
 Thanks, Honghai Chen  
  
 -Original Message-  
 From: Jay Kreps [mailto:jay.kr...@gmail.com]  
 Sent: Wednesday, April 22, 2015 5:22 AM  
 To: dev@kafka.apache.org  
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume  
performance under windows and some old Linux file system  
  
 My understanding of the patch is that clean shutdown truncates the file
back to its true size (and reallocates it on startup). Hard crash is
handled by the normal recovery, which should truncate off the empty
portion of the file.
  
 On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian   
srsubraman...@linkedin.com.invalid wrote:  
  
 Could you describe how recovery works in this mode? Say, we had a 250  
 MB preallocated segment and we wrote till 50MB and crashed. Till what  
 point do we recover? Also, on startup, how is the append end pointer  
 set even on a clean shutdown? How does the FileChannel end position  
 get set to 50 MB instead of 250 MB? The existing code might just work  
 for it but explaining that would be useful.  
  
 On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote:  
  
+1. I've tried this on Linux and it helps reduce the spikes in append (and
hence producer) latency for high throughput writes. I am not entirely
sure why, but my suspicion is that in the absence of preallocation,
you see spikes when writes need to happen faster than the time it takes
Linux to allocate the next block to the file.
   
 It will be great to see some performance test results too.  
   
 On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com  
wrote:  
   
  I'm also +1 on this. The change is quite small and may actually  
 help perf on Linux as well (we've never tried this).  
   
  I have a lot of concerns on testing the various failure conditions  
  but I think since it 

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-22 Thread Jakob Homan
+1.  This is an important performance fix for Windows-based clusters.

-Jakob

On 22 April 2015 at 03:25, Honghai Chen honghai.c...@microsoft.com wrote:
 Fix the issue Sriram mentioned. Code review and jira/KIP updated.

 Below are detailed descriptions of the scenarios:
 1. If we do a clean shutdown, the last log file will be truncated to its real size 
 since the close() function of FileMessageSet will call trim().
 2. If we crash, then on restart, we will go through the process of recover() and 
 the last log file will be truncated to its real size (and the position will 
 be moved to the end of the file).
 3. When the service starts and opens an existing file:
 a. It will run the LogSegment constructor which has NO preallocate parameter.
 b. Then in FileMessageSet, the end will be Int.MaxValue, 
 and then channel.position(math.min(channel.size().toInt, end)) will make 
 the position be the end of the file.
 c. If recovery is needed, the recover function will truncate the file to the end of 
 valid data, and also move the position to it.

 4. When the service is running and needs to create a new log segment and new FileMessageSet:

 a. If preallocate = true: the end in FileMessageSet will be 0, the file 
 size will be initFileSize, and then 
 channel.position(math.min(channel.size().toInt, end)) will make the 
 position be 0.

 b. Else if preallocate = false: backward compatible, the end in 
 FileMessageSet will be Int.MaxValue, the file size will be 0, and then 
 channel.position(math.min(channel.size().toInt, end)) will make the 
 position be 0.
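
 The position logic common to all these cases boils down to one call; in Java
 terms (the actual FileMessageSet is Scala, and 'end' is assumed to be
 Int.MaxValue for existing files and 0 for new preallocated ones):
 
     // End of valid data for an existing file, start of file when preallocated.
     channel.position(Math.min(channel.size(), (long) end));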

 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
 https://issues.apache.org/jira/browse/KAFKA-1646
 https://reviews.apache.org/r/33204/diff/2/

 Thanks, Honghai Chen
 http://aka.ms/kafka
 http://aka.ms/manifold

 -Original Message-
 From: Honghai Chen
 Sent: Wednesday, April 22, 2015 11:12 AM
 To: dev@kafka.apache.org
 Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
 performance under windows and some old Linux file system

 Hi Sriram,
 One sentence of code missed, will update code review board and KIP 
 soon.
 For LogSegment and FileMessageSet, must use different constructor 
 function for existing file and new file, then the code  
 channel.position(math.min(channel.size().toInt, end))  will make sure the 
 position at end of existing file.

 Thanks, Honghai Chen

 -Original Message-
 From: Jay Kreps [mailto:jay.kr...@gmail.com]
 Sent: Wednesday, April 22, 2015 5:22 AM
 To: dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
 performance under windows and some old Linux file system

 My understanding of the patch is that clean shutdown truncates the file back 
 to its true size (and reallocates it on startup). Hard crash is handled by 
 the normal recovery, which should truncate off the empty portion of the file.

 On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian  
 srsubraman...@linkedin.com.invalid wrote:

 Could you describe how recovery works in this mode? Say, we had a 250
 MB preallocated segment and we wrote till 50MB and crashed. Till what
 point do we recover? Also, on startup, how is the append end pointer
 set even on a clean shutdown? How does the FileChannel end position
 get set to 50 MB instead of 250 MB? The existing code might just work
 for it but explaining that would be useful.

 On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote:

 +1. I've tried this on Linux and it helps reduce the spikes in append (and
 hence producer) latency for high throughput writes. I am not entirely
 sure why, but my suspicion is that in the absence of preallocation,
 you see spikes when writes need to happen faster than the time it takes
 Linux to allocate the next block to the file.
 
 It will be great to see some performance test results too.
 
 On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
  I'm also +1 on this. The change is quite small and may actually
 help perf  on Linux as well (we've never tried this).
 
  I have a lot of concerns on testing the various failure conditions
  but I think since it will be off by default the risk is not too high.
 
  -Jay
 
  On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen
 honghai.c...@microsoft.com
  wrote:
 
   I wrote a KIP for this after some discussion on KAFKA-1646.
   https://issues.apache.org/jira/browse/KAFKA-1646
  
  
 
 
  https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
   The RB is here: https://reviews.apache.org/r/33204/diff/
  
   Thanks, Honghai
  
  
 
 
 
 
 --
 Thanks,
 Neha




Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-22 Thread Tong Li
Same question here as well on hosts. If I would like to secure a topic,
regardless of where the topic is, I would like it to be secured. It is hard to
imagine that one would want a topic to be secured on one host but not on
another, unless a topic spread over multiple hosts is really meant to be
totally different things; then it would really be needed.

Thanks.

Tong Li
OpenStack  Kafka Community Development
Building 501/B205
liton...@us.ibm.com



From:   Jeff Holoman jholo...@cloudera.com
To: dev@kafka.apache.org
Cc: Tom Graves tgraves...@yahoo.com
Date:   04/22/2015 02:40 PM
Subject:Re: [DISCUSS] KIP-11- Authorization design for kafka security



Parth,

This is a long thread, so trying to keep up here, sorry if this has been
covered before. First, great job on the KIP proposal and work so far.

Are we sure that we want to tie host level access to a given user? My
understanding is that the ACL will be (omitting some fields)

user_a, host1, host2, host3
user_b, host1, host2, host3

So there would potentially be a lot of redundancy in the configs. Does it
make sense to have hosts be at the same level as principal in the
hierarchy? This way you could just blanket the allowed / denied hosts and
only have to worry about the users. So if you follow this, then

we can wildcard the user so we can have a separate list of just host-based
access. What's the order in which the perms would be evaluated if there was
more than one match on a principal?

Is the thought that there wouldn't usually be much overlap on hosts? I
guess I can imagine a scenario where I want to offline/online access to a
particular host or set of hosts, and if there was overlap, I'm doing a
bunch of alter commands for just a single host. Maybe this is too contrived
an example?

I agree that having this level of granularity gives flexibility, but I
wonder if people will actually use it rather than just wildcard (*) the hosts
for a given user and create a separate global list as I mentioned above?

The only other system I know of that ties users with hosts for access is
MySQL, and I don't love that model. Companies usually standardize on group
authorization anyway; are we complicating that issue with the inclusion of
hosts attached to users? Additionally I worry about the debt of big JSON
configs in the first place; most non-developers find them non-intuitive
already, so anything to ease this I think would be beneficial.


Thanks

Jeff

On Wed, Apr 22, 2015 at 2:22 PM, Parth Brahmbhatt 
pbrahmbh...@hortonworks.com wrote:

 Sorry I missed your last questions. I am +0 on adding a --host option for
 --list; we could add it for symmetry. If this is only a CLI change it
 can be added later, but if you mean adding this to the authorizer interface
 then we should make a decision now.

 Given a choice I would like to actually keep only one option, which is
 resource-based get (and remove even the get based on principal). I see those
 (getAcl for principal or host) as a special filtering case which can easily
 be achieved by a third-party tool by listing all topics, calling
 getAcls for each topic, and applying filtering logic on that. I really
 don't see the need to make those first-class citizens of the authorizer
 interface: these kinds of queries will be issued outside of the broker JVM
 so they will not benefit from the caching, and because the storage will be
 indexed on resource, both these options even as a first-class API will just
 scan all topic acls and apply filtering logic.

 Thanks
 Parth

 On 4/22/15, 11:08 AM, Parth Brahmbhatt pbrahmbh...@hortonworks.com
 wrote:

 Please see all the available options here
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface#KIP-11-AuthorizationInterface-AclManagement(CLI) . I think it
 covers both hosts and operations and allows one to specify a list for both.
 
 Thanks
 Parth
 
 From: Tom Graves tgraves...@yahoo.com
 Reply-To: Tom Graves tgraves...@yahoo.com
 Date: Wednesday, April 22, 2015 at 11:02 AM
 To: Parth Brahmbhatt pbrahmbh...@hortonworks.com,
 dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP-11- Authorization design for kafka security
 
 Thanks for the explanations Parth.
 
 On the configs questions, the way I see it, it's more likely to
 accidentally give everyone access, especially since you have to run a
 separate command to change the acls. If there was some config for
 defaults, a cluster admin could change that to be nobody or a certain set
 of users, then grant others permissions.  This would also remove the race
 between commands.  This is something you can always add later though if
 people request it.
 
 So in kafka-acl.sh how do I actually tell it what the operation is?
 kafka-acl.sh --topic testtopic --add --grantprincipal user:joe,user:kate
 
 where does READ, WRITE, etc. go?  Can I specify them as a list so I 

Re: Review Request 31850: Patch for KAFKA-1660

2015-04-22 Thread Jiangjie Qin


 On April 20, 2015, 5:30 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java,
   line 157
  https://reviews.apache.org/r/31850/diff/9/?file=931821#file931821line157
 
  Read locks are very expensive. I am pretty worried about this. If we 
  want to do this we need to do a pretty detailed examination of the perf 
  impact.
 
 Jiangjie Qin wrote:
  Hi Jay, I looked into the ReentrantReadWriteLock implementation and it 
  seems under the hood it uses compare-and-set, which should provide similar 
  performance to an atomic integer. But I agree this largely depends on the 
  implementation.
  I modified o.a.k.clients.tools.ProducerPerformance a little bit to make 
  it multi-threaded. The performance in the following test settings is very 
  similar: all ~1M messages/second when the target is 10M messages/sec.
  1. 10 threads with latest trunk
  2. 10 threads using AtomicInteger
  3. 10 threads using ReadWriteLock
  When I increase the thread number to 50, it drops to about 0.82M 
  messages/second in all cases.
  It seems the reader lock did not introduce a performance issue.

Hey Jay, do you have any other performance tests in mind that we should run?
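
For reference, the read-lock pattern being measured looks roughly like this
(an assumed shape, not the actual patch):

    private final ReentrantReadWriteLock appendLock = new ReentrantReadWriteLock();

    // Many producer threads may hold the read lock concurrently while appending.
    appendLock.readLock().lock();
    try {
        dequeFor(tp).add(batch); // hypothetical per-partition deque lookup
    } finally {
        appendLock.readLock().unlock();
    }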


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review80753
---


On April 21, 2015, 12:38 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31850/
 ---
 
 (Updated April 21, 2015, 12:38 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1660
 https://issues.apache.org/jira/browse/KAFKA-1660
 
 
 Repository: kafka
 
 
 Description
 ---
 
 A minor fix.
 
 
 Incorporated Guozhang's comments.
 
 
 Modify according to the latest conclusion.
 
 
 Patch for the finally passed KIP-15git status
 
 
 Addressed Joel and Guozhang's comments.
 
 
 rebased on trunk
 
 
 Rebase on trunk
 
 
 Addressed Joel's comments.
 
 
 Addressed Joel's comments
 
 
 Addressed Jay's comments
 
 
 Changed javadoc per Jay's suggestion
 
 
 Change java doc as Jay suggested.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b70e1a3d406338d4b9ddd6188d2820e87545a9b6 
   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
 6913090af03a455452b0b5c3df78f266126b3854 
   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
   
 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
 fee322fa0dd9704374db4a6964246a7d2918d3e4 
   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
 c2fdc23239bd2196cd912c3d121b591f21393eab 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
 
 Diff: https://reviews.apache.org/r/31850/diff/
 
 
 Testing
 ---
 
 Unit tests passed.
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 33088: add heartbeat to coordinator

2015-04-22 Thread Onur Karaman


 On April 22, 2015, 2:33 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/api/RequestKeys.scala, lines 55-58
  https://reviews.apache.org/r/33088/diff/1/?file=923567#file923567line55
 
  Can we just add these two request into keyToNameAndDeserializerMap?

I undid this change since Gwen's patch addressed the ApiKeys issue: 
https://issues.apache.org/jira/browse/KAFKA-2115


 On April 22, 2015, 2:33 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/network/RequestChannel.scala, line 29
  https://reviews.apache.org/r/33088/diff/1/?file=923576#file923576line29
 
  Seems ApiKeys are not used?

I undid this change since Gwen's patch addressed the ApiKeys issue: 
https://issues.apache.org/jira/browse/KAFKA-2115


 On April 22, 2015, 2:33 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 414
  https://reviews.apache.org/r/33088/diff/1/?file=923568#file923568line414
 
  We do not need to cut the socket connection from the coordinator.

The comment has been removed in a later review board update.


- Onur


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33088/#review79762
---


On April 18, 2015, 7:16 p.m., Onur Karaman wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33088/
 ---
 
 (Updated April 18, 2015, 7:16 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1334
 https://issues.apache.org/jira/browse/KAFKA-1334
 
 
 Repository: kafka
 
 
 Description
 ---
 
 add heartbeat to coordinator
 
 todo:
 - see how it performs under real load
 - add error code handling on the consumer side
 - implement the partition assignors
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 
   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
 456b602245e111880e1b8b361319cabff38ee0e9 
   core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 
 2f5797064d4131ecfc9d2750d9345a9fa3972a9a 
   core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 
 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 
   core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala 
 df60cbc35d09937b4e9c737c67229889c69d8698 
   core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 
 8defa2e41c92f1ebe255177679d275c70dae5b3e 
   core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION 
   core/src/main/scala/kafka/coordinator/GroupRegistry.scala 
 94ef5829b3a616c90018af1db7627bfe42e259e5 
   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 
 821e26e97eaa97b5f4520474fff0fedbf406c82a 
   core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION 
   core/src/main/scala/kafka/server/DelayedOperationKey.scala 
 b673e43b0ba401b2e22f27aef550e3ab0ef4323c 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 b4004aa3a1456d337199aa1245fb0ae61f6add46 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 c63f4ba9d622817ea8636d4e6135fba917ce085a 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 18680ce100f10035175cc0263ba7787ab0f6a17a 
   core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33088/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Onur Karaman
 




Re: Review Request 33204: Patch for KAFKA-1646

2015-04-22 Thread Honghai Chen

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33204/
---

(Updated April 22, 2015, 10:13 a.m.)


Review request for kafka.


Bugs: KAFKA-1646
https://issues.apache.org/jira/browse/KAFKA-1646


Repository: kafka


Description (updated)
---

KAFKA-1646 fix: improve setting the position correctly.


Diffs (updated)
-

  core/src/main/scala/kafka/log/FileMessageSet.scala 
2522604bd985c513527fa0c863a7df677ff7a503 
  core/src/main/scala/kafka/log/Log.scala 
5563f2de8113a0ece8929bec9c75dbf892abbb66 
  core/src/main/scala/kafka/log/LogConfig.scala 
da55a348f37a3d6d99032c39398f7ccb11068f42 
  core/src/main/scala/kafka/log/LogSegment.scala 
ed039539ac18ea4d65144073915cf112f7374631 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
cfbbd2be550947dd2b3c8c2cab981fa08fb6d859 
  core/src/main/scala/kafka/server/KafkaServer.scala 
c63f4ba9d622817ea8636d4e6135fba917ce085a 
  core/src/main/scala/kafka/utils/CoreUtils.scala 
c473a034bc3a00ccddbbc0506388a5a7763df950 

Diff: https://reviews.apache.org/r/33204/diff/


Testing
---


Thanks,

Honghai Chen



[jira] [Updated] (KAFKA-1646) Improve consumer read performance for Windows

2015-04-22 Thread Honghai Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Honghai Chen updated KAFKA-1646:

Attachment: KAFKA-1646.patch

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, 
 KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, 
 KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch


 This patch is for the Windows platform only. On Windows, if there is more 
 than one replica writing to disk, the segment log files will not be laid 
 out contiguously on disk, and consumer read performance drops 
 greatly. This fix allocates more disk space when rolling a new segment, 
 which improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, as it only 
 adds statements like 'if(Os.isWindows)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-04-22 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506767#comment-14506767
 ] 

Honghai Chen commented on KAFKA-1646:
-

Created reviewboard against branch origin/trunk

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, 
 KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, 
 KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch


 This patch is for the Windows platform only. On Windows, if there is more 
 than one replica writing to disk, the segment log files will not be laid 
 out contiguously on disk, and consumer read performance drops 
 greatly. This fix allocates more disk space when rolling a new segment, 
 which improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, as it only 
 adds statements like 'if(Os.isWindows)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1646) Improve consumer read performance for Windows

2015-04-22 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506767#comment-14506767
 ] 

Honghai Chen edited comment on KAFKA-1646 at 4/22/15 10:15 AM:
---

New code review board: https://reviews.apache.org/r/33204/diff/2/
A patch against trunk is also attached.


was (Author: waldenchen):
Created reviewboard against branch origin/trunk

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646.patch, KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, 
 KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, 
 KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch


 This patch is for the Windows platform only. On Windows, if there is more 
 than one replica writing to disk, the segment log files will not be laid 
 out contiguously on disk, and consumer read performance drops 
 greatly. This fix allocates more disk space when rolling a new segment, 
 which improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, as it only 
 adds statements like 'if(Os.isWindows)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-22 Thread Honghai Chen
Fixed the issue Sriram mentioned. The code review and JIRA/KIP have been updated.

Below is a detailed description of the scenarios:

1. On clean shutdown, the last log file will be truncated to its real size,
since the close() function of FileMessageSet calls trim().

2. On crash, the restart will go through recover() and the last log file
will be truncated to its real size (and the position will be moved to the
end of the file).

3. When the service starts and opens an existing file:
   a. It runs the LogSegment constructor which has NO preallocate parameter.
   b. Then in FileMessageSet, end will be Int.MaxValue, and
      channel.position(math.min(channel.size().toInt, end)) will set the
      position to the end of the file.
   c. If recovery is needed, the recover function will truncate the file to
      the end of valid data and also move the position there.

4. When the service is running and needs to create a new log segment and a
new FileMessageSet (a sketch of the resulting positioning follows this list):
   a. If preallocate = true:
      i. end in FileMessageSet will be 0, the file size will be
         initFileSize, and channel.position(math.min(channel.size().toInt,
         end)) will set the position to 0.
   b. Else if preallocate = false (backward compatible):
      i. end in FileMessageSet will be Int.MaxValue, the file size will be
         0, and channel.position(math.min(channel.size().toInt, end)) will
         set the position to 0.
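
To make the cases above concrete, here is a simplified, hypothetical sketch
of the constructor-time positioning (not the actual FileMessageSet code; the
class name and the isNew flag are made up, while initFileSize and preallocate
mirror the KIP-20 names):

    import java.io.{File, RandomAccessFile}
    import java.nio.channels.FileChannel

    class FileMessageSetSketch(file: File, isNew: Boolean,
                               initFileSize: Int, preallocate: Boolean) {
      // For a new preallocated file only [0, end) holds valid data, so end
      // starts at 0; for existing or plain new files end is unbounded.
      private val end: Int = if (isNew && preallocate) 0 else Int.MaxValue

      val channel: FileChannel = {
        val raf = new RandomAccessFile(file, "rw")
        if (isNew && preallocate)
          raf.setLength(initFileSize) // reserve the disk space up front
        raf.getChannel
      }

      // The positioning rule quoted above:
      //  - existing file:         min(size, MaxValue) = size -> append at end of data
      //  - new preallocated file: min(initFileSize, 0) = 0   -> append at offset 0
      //  - new plain file:        min(0, MaxValue)     = 0   -> append at offset 0
      channel.position(math.min(channel.size().toInt, end))
    }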

https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
https://issues.apache.org/jira/browse/KAFKA-1646 
https://reviews.apache.org/r/33204/diff/2/ 

Thanks, Honghai Chen
http://aka.ms/kafka 
http://aka.ms/manifold 

-Original Message-
From: Honghai Chen 
Sent: Wednesday, April 22, 2015 11:12 AM
To: dev@kafka.apache.org
Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
performance under windows and some old Linux file system

Hi Sriram,
One line of code was missing; I will update the code review board and KIP soon.
    LogSegment and FileMessageSet must use different constructors 
for an existing file and a new file; then the code 
channel.position(math.min(channel.size().toInt, end)) will make sure the 
position is at the end of an existing file.

Thanks, Honghai Chen 

-Original Message-
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Wednesday, April 22, 2015 5:22 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
performance under windows and some old Linux file system

My understanding of the patch is that clean shutdown truncates the file back to 
its true size (and reallocates it on startup). Hard crash is handled by the 
normal recovery which should truncate off the empty portion of the file.
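
(As a hypothetical illustration of that recovery path, not the actual
Log.recover code: once the scan has found validBytes of real data, the empty
preallocated tail is dropped and the position reset.)

    import java.nio.channels.FileChannel

    def recoverTail(channel: FileChannel, validBytes: Long): Unit = {
      channel.truncate(validBytes) // cut off the empty preallocated region
      channel.position(validBytes) // next append lands right after the last valid byte
    }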

On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian  
srsubraman...@linkedin.com.invalid wrote:

 Could you describe how recovery works in this mode? Say, we had a 250 
 MB preallocated segment and we wrote till 50MB and crashed. Till what 
 point do we recover? Also, on startup, how is the append end pointer 
 set even on a clean shutdown? How does the FileChannel end position 
 get set to 50 MB instead of 250 MB? The existing code might just work 
 for it but explaining that would be useful.

 On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote:

 +1. I've tried this on Linux and it helps reduce the spikes in append (and 
 hence producer) latency for high throughput writes. I am not entirely 
 sure why, but my suspicion is that in the absence of preallocation, 
 you see spikes when writes need to happen faster than the time it takes 
 Linux to allocate the next block to the file.
 
 It will be great to see some performance test results too.
 
 On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
  I'm also +1 on this. The change is quite small and may actually 
 help perf  on Linux as well (we've never tried this).
 
  I have a lot of concerns on testing the various failure conditions 
  but I think since it will be off by default the risk is not too high.
 
  -Jay
 
  On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen 
 honghai.c...@microsoft.com
  wrote:
 
   I wrote a KIP for this after some discussion on KAFKA-1646.
   https://issues.apache.org/jira/browse/KAFKA-1646
  
  
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
   The RB is here: https://reviews.apache.org/r/33204/diff/
  
   Thanks, Honghai
  
  
 
 
 
 
 --
 Thanks,
 Neha




[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-04-22 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506795#comment-14506795
 ] 

Honghai Chen commented on KAFKA-1646:
-

Created reviewboard against branch origin/trunk

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, 
 KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, 
 KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch, 
 KAFKA-1646_20150422.patch


 This patch is for the Windows platform only. On Windows, if there is more 
 than one replica writing to disk, the segment log files will not be laid 
 out contiguously on disk, and consumer read performance drops 
 greatly. This fix allocates more disk space when rolling a new segment, 
 which improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, as it only 
 adds statements like 'if(Os.isWindows)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-1646) Improve consumer read performance for Windows

2015-04-22 Thread Honghai Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Honghai Chen updated KAFKA-1646:

Comment: was deleted

(was: Created reviewboard against branch origin/trunk)

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, 
 KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, 
 KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch, 
 KAFKA-1646_20150422.patch


 This patch is for the Windows platform only. On Windows, if there is more 
 than one replica writing to disk, the segment log files will not be laid 
 out contiguously on disk, and consumer read performance drops 
 greatly. This fix allocates more disk space when rolling a new segment, 
 which improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, as it only 
 adds statements like 'if(Os.isWindows)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1646) Improve consumer read performance for Windows

2015-04-22 Thread Honghai Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Honghai Chen updated KAFKA-1646:

Attachment: KAFKA-1646_20150422.patch

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, 
 KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, 
 KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch, 
 KAFKA-1646_20150422.patch


 This patch is for the Windows platform only. On Windows, if there is more 
 than one replica writing to disk, the segment log files will not be laid 
 out contiguously on disk, and consumer read performance drops 
 greatly. This fix allocates more disk space when rolling a new segment, 
 which improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, as it only 
 adds statements like 'if(Os.isWindows)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-22 Thread Rajini Sivaram
When we were working on the client-side SSL implementation for Kafka, we
found that returning selection interest from handshake() method wasn't
sufficient to handle some of the SSL sequences. We resorted to managing the
selection key and interest state within SSLChannel to avoid SSL-specific
knowledge escaping out of SSL classes into protocol-independent network
code. The current server-side SSL patch doesn't address these scenarios
yet, but we may want to take these into account while designing the common
Channel class/interface.

   1. *Support for running potentially long-running delegated tasks outside
   the network thread*: It is recommended that delegated tasks indicated by
   a handshake status of NEED_TASK are run on a separate thread since they may
   block (
   http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html).
   It is easier to encapsulate this in SSLChannel without any changes to
   common code if selection keys are managed within the Channel (see the
   sketch after this list).
   2. *Renegotiation handshake*: During a read operation, handshake status
   may indicate that renegotiation is required. It will be good to encapsulate
   this state change (and any knowledge of these SSL-specific state
   transitions) within SSLChannel. Our experience was that managing keys and
   state within the SSLChannel rather than in Selector made this code neater.
   3. *Graceful shutdown of the SSL connections*: Our experience was that
   we could encapsulate all of the logic for shutting down SSLEngine
   gracefully within SSLChannel when the selection key and state are owned and
   managed by SSLChannel.
   4. *And finally a minor point:* We found that by managing selection key
   and selection interests within SSLChannel, protocol-independent Selector
   didn't need the concept of handshake at all and all channel state
   management and handshake related code could be held in protocol-specific
   classes. This may be worth taking into consideration since it makes it
   easier for common network layer code to be maintained without any
   understanding of the details of individual security protocols.
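
As a minimal sketch of point 1 above (the object name, executor choice, and
onDone callback are illustrative assumptions, not code from the KAFKA-1690
patch):

    import java.util.concurrent.Executors
    import javax.net.ssl.SSLEngine
    import javax.net.ssl.SSLEngineResult.HandshakeStatus

    object DelegatedTasks {
      private val taskExecutor = Executors.newCachedThreadPool()

      // Drain NEED_TASK work onto a separate thread so the network
      // thread never blocks on a long-running handshake task.
      def runDelegatedTasks(engine: SSLEngine)(onDone: () => Unit): Unit = {
        if (engine.getHandshakeStatus == HandshakeStatus.NEED_TASK) {
          taskExecutor.submit(new Runnable {
            override def run(): Unit = {
              var task = engine.getDelegatedTask
              while (task != null) { task.run(); task = engine.getDelegatedTask }
              onDone() // e.g. re-enable selection interest for this channel
            }
          })
        }
      }
    }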

The channel classes we used are included in the patch in
https://issues.apache.org/jira/browse/KAFKA-1690. The patch contains unit
tests to validate these scenarios as well as other buffer overflow
conditions which may be useful for server-side code when the scenarios
described above are implemented.
Regards,

Rajini



On Tue, Apr 21, 2015 at 11:13 PM, Sriharsha Chintalapani 
harsh...@fastmail.fm wrote:

 Hi Jay,
   Thanks for the review.

1. Isn't the blocking handshake going to be a performance concern? Can
 we
 do the handshake non-blocking instead? If anything that causes connections
 to drop can incur blocking network roundtrips won't that eat up all the
 network threads immediately? I guess I would have to look at that code to
 know...
 I have a non-blocking handshake on the server side as well as for the new
 producer client. A blocking handshake is only done for BlockingChannel.scala,
 and it just loops over the non-blocking handshake until the context is
 established. On the server side (SocketServer.scala), it goes through
 the steps and returns a "READ" or "WRITE" signal for the next step. For
 BlockingChannel the worst case I consider is the connection timeout, but most
 of the time this handshake will finish much more quickly. I am cleaning up the
 code and will send a patch in the next few days.
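
As a rough sketch of that loop (a hypothetical shape; handshakeStep stands in
for the real per-step handshake call, assumed here to return the next
SelectionKey interest, or 0 once the context is established):

    import java.nio.channels.{Selector, SocketChannel}

    def blockingHandshake(channel: SocketChannel,
                          handshakeStep: () => Int, // OP_READ / OP_WRITE, or 0 when done
                          timeoutMs: Long): Unit = {
      channel.configureBlocking(false) // each handshake step is non-blocking
      val selector = Selector.open()
      val key = channel.register(selector, 0)
      try {
        val deadline = System.currentTimeMillis() + timeoutMs
        var interest = handshakeStep()
        while (interest != 0) { // loop until the context is established
          val remaining = deadline - System.currentTimeMillis()
          if (remaining <= 0)
            throw new java.net.SocketTimeoutException("handshake timed out")
          key.interestOps(interest)       // wait for the signalled READ/WRITE
          selector.select(remaining)
          selector.selectedKeys().clear() // reset readiness for the next round
          interest = handshakeStep()      // drive the next non-blocking step
        }
      } finally selector.close()
    }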

 2. Do we need to support blocking channel at all? That is just for the old
 clients, and I think we should probably just leave those be to reduce
 scope
 here.
 The blocking channel is used not only by the simple consumer but also by
 ControllerChannelManager and controlled shutdown. Are we planning on
 deprecating it? I think at least for ControllerChannelManager it makes
 sense to have a blocking channel. If users want to lock down the
 cluster, i.e. no PLAINTEXT channels are allowed, then all the communication
 has to go through either SSL or KERBEROS, so in this case we need to add this
 capability to BlockingChannel.



 3. Can we change the APIs to drop the getters when that is not required by
 the API being implemented. In general we don't use setters and getters as
 a
 naming convention.

 My bad on adding getters and setters :). I'll work on removing them and
 change the KIP accordingly. I still need some accessor methods, though.

 Thanks,

 Harsha



 On April 21, 2015 at 2:51:15 PM, Jay Kreps (jay.kr...@gmail.com) wrote:

 Hey Sriharsha,

 Thanks for the excellent write-up.

 Couple of minor questions:

 1. Isn't the blocking handshake going to be a performance concern? Can we
 do the handshake non-blocking instead? If anything that causes connections
 to drop can incur blocking network roundtrips won't that eat up all the
 network threads immediately? I guess I would have to look at that code to
 know...

 2. Do we need to support blocking channel at all? That is just for the old
 clients, and I think we should