[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297733#comment-15297733
 ] 

Greg Fodor commented on KAFKA-3745:
---

Yes, the join key needs to be added to the final joined record. I suppose, as 
you mention, a nicer approach may be to just emit the joined record with a 
null in place of where the key will go, and then do a simple .map after the 
join to fill in the key; this seems better than what I am doing now.
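
For reference, a minimal sketch of that workaround (topic names and the String values are hypothetical, written against the 0.10.0 Streams API):

{code}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> clicks = builder.stream("clicks");   // hypothetical topic
KTable<String, String> users = builder.table("users");       // hypothetical topic

KStream<String, String> joined = clicks
    // the ValueJoiner only sees the two values, so leave a placeholder where the key belongs
    .leftJoin(users, (click, user) -> "<key>," + click + "," + user)
    // then fill the placeholder in with the actual join key
    .map((key, value) -> new KeyValue<>(key, value.replace("<key>", key)));

joined.to("joined-output");                                   // hypothetical topic
{code}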

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297660#comment-15297660
 ] 

Guozhang Wang commented on KAFKA-3745:
--

Just wanted to make sure that we are on the same page: even if the join key is 
not accessible in {{ValueJoiner}}, the returned key-value pair will still 
inherit the join key, so it is not "lost". I guess your scenario requires you 
to get the join-key value while doing the join computation inside 
{{ValueJoiner}}, but I'd like to confirm that.

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-3749, fix "BOOSTRAP_SERVERS_DOC" typo

2016-05-23 Thread manuzhang
GitHub user manuzhang opened a pull request:

https://github.com/apache/kafka/pull/1420

KAFKA-3749, fix "BOOSTRAP_SERVERS_DOC" typo



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/manuzhang/kafka KAFKA-3749

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1420.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1420


commit 748f5f12bcbed5e57eb1ea6a38e8e86c2bc9a1b0
Author: manuzhang 
Date:   2016-05-24T01:54:22Z

KAFKA-3749, fix "BOOSTRAP_SERVERS_DOC" typo




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3749) "BOOSTRAP_SERVERS_DOC" typo in CommonClientConfigs

2016-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297519#comment-15297519
 ] 

ASF GitHub Bot commented on KAFKA-3749:
---

GitHub user manuzhang opened a pull request:

https://github.com/apache/kafka/pull/1420

KAFKA-3749, fix "BOOSTRAP_SERVERS_DOC" typo



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/manuzhang/kafka KAFKA-3749

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1420.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1420


commit 748f5f12bcbed5e57eb1ea6a38e8e86c2bc9a1b0
Author: manuzhang 
Date:   2016-05-24T01:54:22Z

KAFKA-3749, fix "BOOSTRAP_SERVERS_DOC" typo




> "BOOSTRAP_SERVERS_DOC" typo in CommonClientConfigs
> --
>
> Key: KAFKA-3749
> URL: https://issues.apache.org/jira/browse/KAFKA-3749
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.10.0.0
>Reporter: Manu Zhang
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3750) "BOOSTRAP_SERVERS_DOC" typo in CommonClientConfigs

2016-05-23 Thread Manu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manu Zhang resolved KAFKA-3750.
---
Resolution: Duplicate

sorry, only at-least-once is guaranteed. 

> "BOOSTRAP_SERVERS_DOC" typo in CommonClientConfigs
> --
>
> Key: KAFKA-3750
> URL: https://issues.apache.org/jira/browse/KAFKA-3750
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.10.0.0
>Reporter: Manu Zhang
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3750) "BOOSTRAP_SERVERS_DOC" typo in CommonClientConfigs

2016-05-23 Thread Manu Zhang (JIRA)
Manu Zhang created KAFKA-3750:
-

 Summary: "BOOSTRAP_SERVERS_DOC" typo in CommonClientConfigs
 Key: KAFKA-3750
 URL: https://issues.apache.org/jira/browse/KAFKA-3750
 Project: Kafka
  Issue Type: Improvement
  Components: config
Affects Versions: 0.10.0.0
Reporter: Manu Zhang
Priority: Trivial






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-05-23 Thread Jun Rao
Parth,

Thanks for the explanation. A couple more questions.

110. What does getDelegationTokenAs mean?

111. What's the typical rate of getting and renewing delegation tokens?
That may have an impact on whether they should be directed to the
controller.

Jun

On Mon, May 23, 2016 at 1:19 PM, parth brahmbhatt <
brahmbhatt.pa...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for reviewing.
>
> * We could add a Cluster action to add acls on who can request delegation
> tokens. I don't see the use case for that yet but down the line when we
> start supporting getDelegationTokenAs it will be necessary.
> * Yes, we recommend that tokens only be used/distributed over secure channels.
> * Depending on which design we end up choosing, invalidation will be the
> responsibility of either every broker or the controller.
> * I am not sure if I documented somewhere that invalidation will directly
> go through zookeeper, but that is not the intent. Invalidation will either
> be request-based or due to expiration. No direct zookeeper interaction from
> any client.
> * "Broker also stores the DelegationToken without the hmac in the
> zookeeper." : Sorry about the confusion. The sole purpose of zookeeper in
> this design is as distribution channel for tokens between all brokers and a
> layer that ensures only tokens that were generated by making a request to a
> broker will be accepted (more on this in second paragraph). The token
> consists of few elements (owner, renewer, uuid , expiration, hmac) , one of
> which is the finally generated hmac but hmac it self is derivable if you
> have all the other elements of the token + secret key to generate hmac.
> Given zookeeper does not provide SSL support we do not want the entire
> token to be wire transferred to zookeeper as that will be an insecure wire
> transfer. Instead we only store all the other elements of a delegation
> tokens. Brokers can read these elements and because they also have access
> to secret key they will be able to generate hmac on their end.
>
> One of the alternatives proposed is to avoid zookeeper altogether. A client
> will call a broker with the required information (owner, renewer, expiration)
> and get back (signed hmac, uuid). The broker won't store this in zookeeper.
> From this point a client can contact any broker with all the delegation token
> info (owner, renewer, expiration, hmac, uuid); the broker will regenerate the
> hmac and, as long as it matches the hmac presented by the client, the broker
> will allow the request to authenticate. The only problem with this approach is
> that if the secret key is compromised, any client can now generate random
> tokens and they will still be able to authenticate as any user they like. With
> zookeeper we guarantee that only tokens acquired via a broker (using some
> auth scheme other than delegation token) will be accepted. We need to
> discuss which proposal makes more sense and we can go over it in tomorrow's
> meeting.
>
> Also, can you forward the invite to me?
>
> Thanks
> Parth
>
>
>
> On Mon, May 23, 2016 at 10:35 AM, Jun Rao  wrote:
>
> > Thanks for the KIP. A few comments.
> >
> > 100. This potentially can be useful for Kafka Connect and Kafka rest
> proxy
> > where a worker agent will need to run a task on behalf of a client. We
> will
> > likely need to change how those services use Kafka clients
> > (producer/consumer). Instead of a shared client per worker, we will need
> a
> > client per user task since the authentication happens at the connection
> > level. For Kafka Connect, the renewer will be the workers. So, we
> probably
> > need to allow multiple renewers. For Kafka rest proxy, the renewer can
> > probably just be the creator of the token.
> >
> > 101. Do we need new acl on who can request delegation tokens?
> >
> > 102. Do we recommend people to send delegation tokens in an encrypted
> > channel?
> >
> > 103. Who is responsible for expiring tokens, every broker?
> >
> > 104. For invalidating tokens, would it be better to do it in a request
> > instead of going to ZK directly?
> >
> > 105. The terminology of client in the wiki sometimes refers to the end
> > client and some other times refers to the client using the delegation
> > tokens. It would be useful to distinguish between the two.
> >
> > 106. Could you explain the sentence "Broker also stores the
> DelegationToken
> > without the hmac in the zookeeper." a bit more? I thought the delegation
> > token is the hmac.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, May 23, 2016 at 9:22 AM, Jun Rao  wrote:
> >
> > > Hi, Harsha,
> > >
> > > Just sent out a KIP meeting invite. We can discuss this in the meeting
> > > tomorrow.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, May 19, 2016 at 8:47 AM, Harsha  wrote:
> > >
> > >> Hi All,
> > >>Can we have a KIP meeting around this. The KIP is up for
> > >>sometime and if there are any questions lets quickly hash
> out
> > >>details.
> > >>
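
For the hmac derivation Parth describes above (regenerating the hmac from the non-secret token fields plus the secret key, so the hmac itself never has to be stored in zookeeper), a minimal sketch might look like the following; the field serialization and the HmacSHA256 choice are illustrative assumptions, not the KIP's wire format:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class DelegationTokenHmacSketch {
    // Recompute the token hmac from the non-secret fields plus the shared secret key.
    // Any broker holding the secret can do this on its own.
    static String computeHmac(String owner, String renewer, String uuid, long expirationMs,
                              byte[] secretKey) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");            // algorithm is an assumption
        mac.init(new SecretKeySpec(secretKey, "HmacSHA256"));
        String tokenInfo = owner + "|" + renewer + "|" + uuid + "|" + expirationMs;  // illustrative layout
        return Base64.getEncoder().encodeToString(
            mac.doFinal(tokenInfo.getBytes(StandardCharsets.UTF_8)));
    }

    // A broker authenticating a client recomputes the hmac and compares it against the one presented.
    // (A real implementation would use a constant-time comparison.)
    static boolean verify(String presentedHmac, String owner, String renewer, String uuid,
                          long expirationMs, byte[] secretKey) throws Exception {
        return computeHmac(owner, renewer, uuid, expirationMs, secretKey).equals(presentedHmac);
    }
}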

[jira] [Created] (KAFKA-3749) "BOOSTRAP_SERVERS_DOC" typo in CommonClientConfigs

2016-05-23 Thread Manu Zhang (JIRA)
Manu Zhang created KAFKA-3749:
-

 Summary: "BOOSTRAP_SERVERS_DOC" typo in CommonClientConfigs
 Key: KAFKA-3749
 URL: https://issues.apache.org/jira/browse/KAFKA-3749
 Project: Kafka
  Issue Type: Improvement
  Components: config
Affects Versions: 0.10.0.0
Reporter: Manu Zhang
Priority: Trivial






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297477#comment-15297477
 ] 

Greg Fodor commented on KAFKA-3745:
---

Yep, I admit this is definitely not the most common case. But when it happens, 
the key is basically lost, so the workaround results in passing additional 
state through the system which seems undesirable.

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Becket Qin
It might be worth thinking a little further. We have discussed before that we
want to avoid holding all the group metadata in memory.

I am thinking about the following end state:

1. Enable compression on the offset topic.
2. Instead of holding the entire group metadata in memory on the brokers,
each broker only keeps a [group -> Offset] map, the offset points to the
message in the offset topic which holds the latest metadata of the group.
3. DescribeGroupResponse will read from the offset topic directly like a
normal consumption, except that only exactly one message will be returned.
4. SyncGroupResponse will read the message, extract the assignment part and
send back the partition assignment. We can compress the partition
assignment before sending it out if we want.
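
To make step 2 a bit more concrete, the bookkeeping could look roughly like the toy sketch below (the names and the LogReader hook are illustrative stand-ins, not broker internals):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GroupMetadataOffsetCacheSketch {
    // group -> offset of the latest group metadata message in the offsets topic
    private final Map<String, Long> latestMetadataOffset = new ConcurrentHashMap<>();

    // Called when a new group metadata message is appended to the offsets topic partition.
    public void onMetadataAppended(String groupId, long messageOffset) {
        latestMetadataOffset.put(groupId, messageOffset);
    }

    // Serving DescribeGroup: look up the offset and read exactly one message back from the log.
    public byte[] describeGroup(String groupId, LogReader offsetsTopicLog) {
        Long offset = latestMetadataOffset.get(groupId);
        return offset == null ? null : offsetsTopicLog.readOne(offset);
    }

    // Hypothetical stand-in for "read a single message at this offset from the offsets topic".
    public interface LogReader {
        byte[] readOne(long offset);
    }
}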

Jiangjie (Becket) Qin

On Mon, May 23, 2016 at 5:08 PM, Jason Gustafson  wrote:

> >
> > Jason, doesn't gzip (or other compression) basically do this? If the
> topic
> > is a string and the topic is repeated throughout, won't compression
> > basically replace all repeated instances of it with an index reference to
> > the full string?
>
>
> Hey James, yeah, that's probably true, but keep in mind that the
> compression happens on the broker side. It would be nice to have a more
> compact representation so that get some benefit over the wire as well. This
> seems to be less of a concern here, so the bigger gains are probably from
> reducing the number of partitions that need to be listed individually.
>
> -Jason
>
> On Mon, May 23, 2016 at 4:23 PM, Onur Karaman <
> onurkaraman.apa...@gmail.com>
> wrote:
>
> > When figuring out these optimizations, it's worth keeping in mind the
> > improvements when the message is uncompressed vs when it's compressed.
> >
> > When uncompressed:
> > Fixing the Assignment serialization to instead be a topic index into the
> > corresponding member's subscription list would usually be a good thing.
> >
> > I think the proposal is only worse when the topic names are small. The
> > Type.STRING we use in our protocol for the assignment's TOPIC_KEY_NAME is
> > limited in length to Short.MAX_VALUE, so our strings are first prepended
> > with 2 bytes to indicate the string size.
> >
> > The new proposal does worse when:
> > 2 + utf_encoded_string_payload_size < index_type_size
> > in other words when:
> > utf_encoded_string_payload_size < index_type_size - 2
> >
> > If the index type ends up being Type.INT32, then the proposal is worse
> when
> > the topic is length 1.
> > If the index type ends up being Type.INT64, then the proposal is worse
> when
> > the topic is less than length 6.
> >
> > When compressed:
> > As James Cheng brought up, I'm not sure how things change when
> compression
> > comes into the picture. This would be worth investigating.
> >
> > On Mon, May 23, 2016 at 4:05 PM, James Cheng 
> wrote:
> >
> > >
> > > > On May 23, 2016, at 10:59 AM, Jason Gustafson 
> > > wrote:
> > > >
> > > > 2. Maybe there's a better way to lay out the assignment without
> needing
> > > to
> > > > explicitly repeat the topic? For example, the leader could sort the
> > > topics
> > > > for each member and just use an integer to represent the index of
> each
> > > > topic within the sorted list (note this depends on the subscription
> > > > including the full topic list).
> > > >
> > > > Assignment -> [TopicIndex [Partition]]
> > > >
> > >
> > > Jason, doesn't gzip (or other compression) basically do this? If the
> > topic
> > > is a string and the topic is repeated throughout, won't compression
> > > basically replace all repeated instances of it with an index reference
> to
> > > the full string?
> > >
> > > -James
> > >
> > > > You could even combine these two options so that you have only 3
> > integers
> > > > for each topic assignment:
> > > >
> > > > Assignment -> [TopicIndex MinPartition MaxPartition]
> > > >
> > > > There may even be better options with a little more thought. All of
> > this
> > > is
> > > > just part of the client-side protocol, so it wouldn't require any
> > version
> > > > bumps on the broker. What do you think?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > >> The original concern is that regex may not be efficiently supported
> > > >> across-languages, but if there is a neat workaround I would love to
> > > learn.
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma 
> > wrote:
> > > >>
> > > >>> +1 to Jun's suggestion.
> > > >>>
> > > >>> Having said that, as a general point, I think we should consider
> > > >> supporting
> > > >>> topic patterns in the wire protocol. It requires some thinking for
> > > >>> cross-language support, but it seems surmountable and it could make
> > > >> certain
> > > >>> operations a lot more efficient (the fact that a 

[jira] [Commented] (KAFKA-3744) Message format needs to identify serializer

2016-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297473#comment-15297473
 ] 

ASF GitHub Bot commented on KAFKA-3744:
---

GitHub user davek2 opened a pull request:

https://github.com/apache/kafka/pull/1419

Allocate 2 attribute bits to signal payload format

This documentation update proposes a mechanism to signal the serialization 
used for the message payload, resolving issue 
https://issues.apache.org/jira/browse/KAFKA-3744.  No change is made to the 
message structure; two previously-reserved bits in the attribute byte now have 
defined values, and for one of four cases the key field is defined to be a JSON 
object.

No change is required to messaging software.   No change is required to 
existing producer and consumer clients that use pre-agreed payload 
serialization. 

Misc notes:
1) Only one attribute bit would be needed if serialization were always 
signalled using the key field.  But it seems preferable to define two common 
serializations that do not have any dependency on the key field.  Selection of 
the common formats is arbitrary; text and avro seem reasonable but any two 
could be used instead.
2) The compression attribute uses three bits but only two are defined.  If 
the intent is to use all three bits for compression the undefined values should 
be listed as reserved; if not, the timestamp attribute can slide down to bit 2 
and serialization to bits 3~4, leaving bits 5~7 reserved.
3) It's unclear why message field 6 should be called "key" - a 
variable-length field is more likely to be described as "attributes" or 
"metadata", and 1-byte field 3 would be called "flags" instead of "attributes".
4) Field 8 is called "payload" under message format and "value" under 
on-disk format.  Changed to payload in both places.
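
For illustration, reading and writing the two proposed bits might look like the sketch below; the bit positions (4~5) and the codec ids are assumptions made for the sketch, not a settled layout:

public class PayloadFormatBitsSketch {
    static final int SERIALIZATION_SHIFT = 4;                          // assumed position of the two bits
    static final int SERIALIZATION_MASK = 0x3 << SERIALIZATION_SHIFT;  // two bits -> four codec values

    static final int CODEC_TEXT = 0;        // example assignments only
    static final int CODEC_AVRO = 1;
    static final int CODEC_JSON_KEYED = 2;  // the "key field carries a JSON descriptor" case
    static final int CODEC_RESERVED = 3;

    // Write the serialization codec into the attribute byte without disturbing the other bits.
    static byte setSerialization(byte attributes, int codec) {
        return (byte) ((attributes & ~SERIALIZATION_MASK)
                | ((codec << SERIALIZATION_SHIFT) & SERIALIZATION_MASK));
    }

    // Read the serialization codec back out of the attribute byte.
    static int getSerialization(byte attributes) {
        return (attributes & SERIALIZATION_MASK) >> SERIALIZATION_SHIFT;
    }
}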

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/davek2/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1419.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1419


commit 1d88b8d48cdfe67989bebf239f7588ca24e961b6
Author: Joe 
Date:   2016-05-24T00:32:04Z

Allocate 2 attribute bits for payload format




> Message format needs to identify serializer
> ---
>
> Key: KAFKA-3744
> URL: https://issues.apache.org/jira/browse/KAFKA-3744
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Kay
>Priority: Minor
>
> https://issues.apache.org/jira/browse/KAFKA-3698 was recently resolved with 
> https://github.com/apache/kafka/commit/27a19b964af35390d78e1b3b50bc03d23327f4d0.
> But Kafka documentation on message formats needs to be more explicit for new 
> users. Section 1.3 Step 4 says: "Send some messages" and takes lines of text 
> from the command line. Beginner's guide 
> (http://www.slideshare.net/miguno/apache-kafka-08-basic-training-verisign 
> Slide 104 says:
> {noformat}
>Kafka does not care about data format of msg payload
>Up to developer to handle serialization/deserialization
>   Common choices: Avro, JSON
> {noformat}
> If one producer sends lines of console text, another producer sends Avro, a 
> third producer sends JSON, and a fourth sends CBOR, how does the consumer 
> identify which deserializer to use for the payload?  The commit includes an 
> opaque K byte Key that could potentially include a codec identifier, but 
> provides no guidance on how to use it:
> {quote}
> "Leaving the key and value opaque is the right decision: there is a great 
> deal of progress being made on serialization libraries right now, and any 
> particular choice is unlikely to be right for all uses. Needless to say a 
> particular application using Kafka would likely mandate a particular 
> serialization type as part of its usage."
> {quote}
> Mandating any particular serialization is as unrealistic as mandating a 
> single mime-type for all web content.  There must be a way to signal the 
> serialization used to produce this message's V byte payload, and documenting 
> the existence of even a rudimentary codec registry with a few values (text, 
> Avro, JSON, CBOR) would establish the pattern to be used for future 
> serialization libraries.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: Allocate 2 attribute bits to signal payload fo...

2016-05-23 Thread davek2
GitHub user davek2 opened a pull request:

https://github.com/apache/kafka/pull/1419

Allocate 2 attribute bits to signal payload format

This documentation update proposes a mechanism to signal the serialization 
used for the message payload, resolving issue 
https://issues.apache.org/jira/browse/KAFKA-3744.  No change is made to the 
message structure; two previously-reserved bits in the attribute byte now have 
defined values, and for one of four cases the key field is defined to be a JSON 
object.

No change is required to messaging software.   No change is required to 
existing producer and consumer clients that use pre-agreed payload 
serialization. 

Misc notes:
1) Only one attribute bit would be needed if serialization were always 
signalled using the key field.  But it seems preferable to define two common 
serializations that do not have any dependency on the key field.  Selection of 
the common formats is arbitrary; text and avro seem reasonable but any two 
could be used instead.
2) The compression attribute uses three bits but only two are defined.  If 
the intent is to use all three bits for compression the undefined values should 
be listed as reserved; if not, the timestamp attribute can slide down to bit 2 
and serialization to bits 3~4, leaving bits 5~7 reserved.
3) It's unclear why message field 6 should be called "key" - a 
variable-length field is more likely to be described as "attributes" or 
"metadata", and 1-byte field 3 would be called "flags" instead of "attributes".
4) Field 8 is called "payload" under message format and "value" under 
on-disk format.  Changed to payload in both places.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/davek2/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1419.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1419


commit 1d88b8d48cdfe67989bebf239f7588ca24e961b6
Author: Joe 
Date:   2016-05-24T00:32:04Z

Allocate 2 attribute bits for payload format




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-3396) Unauthorized topics are returned to the user

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3396:
---
Affects Version/s: 0.9.0.0
   0.10.0.0

> Unauthorized topics are returned to the user
> 
>
> Key: KAFKA-3396
> URL: https://issues.apache.org/jira/browse/KAFKA-3396
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.9.0.0, 0.10.0.0
>Reporter: Grant Henke
>Assignee: Edoardo Comar
> Fix For: 0.10.0.1
>
>
> Kafka's clients and protocol expose unauthorized topics to the end user. 
> This is often considered a security hole. To some, the topic name is 
> considered sensitive information. Those that do not consider the name 
> sensitive, still consider it more information that allows a user to try and 
> circumvent security.  Instead, if a user does not have access to the topic, 
> the servers should act as if the topic does not exist. 
> To solve this some of the changes could include:
>   - The broker should not return a TOPIC_AUTHORIZATION(29) error for 
> requests (metadata, produce, fetch, etc) that include a topic that the user 
> does not have DESCRIBE access to.
>   - A user should not receive a TopicAuthorizationException when they do 
> not have DESCRIBE access to a topic or the cluster.
>  - The client should not maintain and expose a list of unauthorized 
> topics in org.apache.kafka.common.Cluster. 
> Other changes may be required that are not listed here. Further analysis is 
> needed. 
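
As a rough illustration of the intended behavior, topics the principal cannot DESCRIBE would simply be dropped from responses instead of triggering TOPIC_AUTHORIZATION errors; the DescribeCheck hook below is a hypothetical stand-in for Kafka's Authorizer, not an existing API:

import java.util.Set;
import java.util.stream.Collectors;

public class MetadataFilterSketch {
    // Hypothetical authorization hook: can this principal DESCRIBE this topic?
    public interface DescribeCheck {
        boolean canDescribe(String principal, String topic);
    }

    // Return only the topics the principal is allowed to see; unauthorized topics are
    // silently omitted, as if they did not exist.
    public static Set<String> visibleTopics(String principal, Set<String> requestedTopics,
                                            DescribeCheck authorizer) {
        return requestedTopics.stream()
                .filter(topic -> authorizer.canDescribe(principal, topic))
                .collect(Collectors.toSet());
    }
}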



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3396) Unauthorized topics are returned to the user

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3396:
---
Component/s: security

> Unauthorized topics are returned to the user
> 
>
> Key: KAFKA-3396
> URL: https://issues.apache.org/jira/browse/KAFKA-3396
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Grant Henke
>Assignee: Edoardo Comar
> Fix For: 0.10.0.1
>
>
> Kafka's clients and protocol expose unauthorized topics to the end user. 
> This is often considered a security hole. To some, the topic name is 
> considered sensitive information. Those that do not consider the name 
> sensitive, still consider it more information that allows a user to try and 
> circumvent security.  Instead, if a user does not have access to the topic, 
> the servers should act as if the topic does not exist. 
> To solve this some of the changes could include:
>   - The broker should not return a TOPIC_AUTHORIZATION(29) error for 
> requests (metadata, produce, fetch, etc) that include a topic that the user 
> does not have DESCRIBE access to.
>   - A user should not receive a TopicAuthorizationException when they do 
> not have DESCRIBE access to a topic or the cluster.
>  - The client should not maintain and expose a list of unauthorized 
> topics in org.apache.kafka.common.Cluster. 
> Other changes may be required that are not listed here. Further analysis is 
> needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3396) Unauthorized topics are returned to the user

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3396:
---
Fix Version/s: 0.10.0.1

> Unauthorized topics are returned to the user
> 
>
> Key: KAFKA-3396
> URL: https://issues.apache.org/jira/browse/KAFKA-3396
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Grant Henke
>Assignee: Edoardo Comar
> Fix For: 0.10.0.1
>
>
> Kafka's clients and protocol expose unauthorized topics to the end user. 
> This is often considered a security hole. To some, the topic name is 
> considered sensitive information. Those that do not consider the name 
> sensitive, still consider it more information that allows a user to try and 
> circumvent security.  Instead, if a user does not have access to the topic, 
> the servers should act as if the topic does not exist. 
> To solve this some of the changes could include:
>   - The broker should not return a TOPIC_AUTHORIZATION(29) error for 
> requests (metadata, produce, fetch, etc) that include a topic that the user 
> does not have DESCRIBE access to.
>   - A user should not receive a TopicAuthorizationException when they do 
> not have DESCRIBE access to a topic or the cluster.
>  - The client should not maintain and expose a list of unauthorized 
> topics in org.apache.kafka.common.Cluster. 
> Other changes may be required that are not listed here. Further analysis is 
> needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Jason Gustafson
>
> Jason, doesn't gzip (or other compression) basically do this? If the topic
> is a string and the topic is repeated throughout, won't compression
> basically replace all repeated instances of it with an index reference to
> the full string?


Hey James, yeah, that's probably true, but keep in mind that the
compression happens on the broker side. It would be nice to have a more
compact representation so that we get some benefit over the wire as well. This
seems to be less of a concern here, so the bigger gains are probably from
reducing the number of partitions that need to be listed individually.

-Jason

On Mon, May 23, 2016 at 4:23 PM, Onur Karaman 
wrote:

> When figuring out these optimizations, it's worth keeping in mind the
> improvements when the message is uncompressed vs when it's compressed.
>
> When uncompressed:
> Fixing the Assignment serialization to instead be a topic index into the
> corresponding member's subscription list would usually be a good thing.
>
> I think the proposal is only worse when the topic names are small. The
> Type.STRING we use in our protocol for the assignment's TOPIC_KEY_NAME is
> limited in length to Short.MAX_VALUE, so our strings are first prepended
> with 2 bytes to indicate the string size.
>
> The new proposal does worse when:
> 2 + utf_encoded_string_payload_size < index_type_size
> in other words when:
> utf_encoded_string_payload_size < index_type_size - 2
>
> If the index type ends up being Type.INT32, then the proposal is worse when
> the topic is length 1.
> If the index type ends up being Type.INT64, then the proposal is worse when
> the topic is less than length 6.
>
> When compressed:
> As James Cheng brought up, I'm not sure how things change when compression
> comes into the picture. This would be worth investigating.
>
> On Mon, May 23, 2016 at 4:05 PM, James Cheng  wrote:
>
> >
> > > On May 23, 2016, at 10:59 AM, Jason Gustafson 
> > wrote:
> > >
> > > 2. Maybe there's a better way to lay out the assignment without needing
> > to
> > > explicitly repeat the topic? For example, the leader could sort the
> > topics
> > > for each member and just use an integer to represent the index of each
> > > topic within the sorted list (note this depends on the subscription
> > > including the full topic list).
> > >
> > > Assignment -> [TopicIndex [Partition]]
> > >
> >
> > Jason, doesn't gzip (or other compression) basically do this? If the
> topic
> > is a string and the topic is repeated throughout, won't compression
> > basically replace all repeated instances of it with an index reference to
> > the full string?
> >
> > -James
> >
> > > You could even combine these two options so that you have only 3
> integers
> > > for each topic assignment:
> > >
> > > Assignment -> [TopicIndex MinPartition MaxPartition]
> > >
> > > There may even be better options with a little more thought. All of
> this
> > is
> > > just part of the client-side protocol, so it wouldn't require any
> version
> > > bumps on the broker. What do you think?
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > >
> > > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang 
> > wrote:
> > >
> > >> The original concern is that regex may not be efficiently supported
> > >> across-languages, but if there is a neat workaround I would love to
> > learn.
> > >>
> > >> Guozhang
> > >>
> > >> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma 
> wrote:
> > >>
> > >>> +1 to Jun's suggestion.
> > >>>
> > >>> Having said that, as a general point, I think we should consider
> > >> supporting
> > >>> topic patterns in the wire protocol. It requires some thinking for
> > >>> cross-language support, but it seems surmountable and it could make
> > >> certain
> > >>> operations a lot more efficient (the fact that a basic regex
> > subscription
> > >>> causes the consumer to request metadata for all topics is not great).
> > >>>
> > >>> Ismael
> > >>>
> > >>> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang 
> > >>> wrote:
> > >>>
> >  I like Jun's suggestion in changing the handling logics of single
> > large
> >  message on the consumer side.
> > 
> >  As for the case of "a single group subscribing to 3000 topics", with
> > >> 100
> >  consumers the 2.5Mb Gzip size is reasonable to me (when storing in
> ZK,
> > >> we
> >  also have the znode limit which is set to 1Mb by default, though
> > >>> admittedly
> >  it is only for one consumer). And if we do the change as Jun
> > suggested,
> >  2.5Mb on follower's memory pressure is OK I think.
> > 
> > 
> >  Guozhang
> > 
> > 
> >  On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> >  onurkaraman.apa...@gmail.com
> > > wrote:
> > 
> > > Results without compression:
> > > 1 consumer 292383 bytes
> > > 5 consumers 1079579 bytes * the tipping point
> > > 10 

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Liquan Pei
It would be interesting to see the size with compression on.

On Mon, May 23, 2016 at 4:23 PM, Onur Karaman 
wrote:

> When figuring out these optimizations, it's worth keeping in mind the
> improvements when the message is uncompressed vs when it's compressed.
>
> When uncompressed:
> Fixing the Assignment serialization to instead be a topic index into the
> corresponding member's subscription list would usually be a good thing.
>
> I think the proposal is only worse when the topic names are small. The
> Type.STRING we use in our protocol for the assignment's TOPIC_KEY_NAME is
> limited in length to Short.MAX_VALUE, so our strings are first prepended
> with 2 bytes to indicate the string size.
>
> The new proposal does worse when:
> 2 + utf_encoded_string_payload_size < index_type_size
> in other words when:
> utf_encoded_string_payload_size < index_type_size - 2
>
> If the index type ends up being Type.INT32, then the proposal is worse when
> the topic is length 1.
> If the index type ends up being Type.INT64, then the proposal is worse when
> the topic is less than length 6.
>
> When compressed:
> As James Cheng brought up, I'm not sure how things change when compression
> comes into the picture. This would be worth investigating.
>
> On Mon, May 23, 2016 at 4:05 PM, James Cheng  wrote:
>
> >
> > > On May 23, 2016, at 10:59 AM, Jason Gustafson 
> > wrote:
> > >
> > > 2. Maybe there's a better way to lay out the assignment without needing
> > to
> > > explicitly repeat the topic? For example, the leader could sort the
> > topics
> > > for each member and just use an integer to represent the index of each
> > > topic within the sorted list (note this depends on the subscription
> > > including the full topic list).
> > >
> > > Assignment -> [TopicIndex [Partition]]
> > >
> >
> > Jason, doesn't gzip (or other compression) basically do this? If the
> topic
> > is a string and the topic is repeated throughout, won't compression
> > basically replace all repeated instances of it with an index reference to
> > the full string?
> >
> > -James
> >
> > > You could even combine these two options so that you have only 3
> integers
> > > for each topic assignment:
> > >
> > > Assignment -> [TopicIndex MinPartition MaxPartition]
> > >
> > > There may even be better options with a little more thought. All of
> this
> > is
> > > just part of the client-side protocol, so it wouldn't require any
> version
> > > bumps on the broker. What do you think?
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > >
> > > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang 
> > wrote:
> > >
> > >> The original concern is that regex may not be efficiently supported
> > >> across-languages, but if there is a neat workaround I would love to
> > learn.
> > >>
> > >> Guozhang
> > >>
> > >> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma 
> wrote:
> > >>
> > >>> +1 to Jun's suggestion.
> > >>>
> > >>> Having said that, as a general point, I think we should consider
> > >> supporting
> > >>> topic patterns in the wire protocol. It requires some thinking for
> > >>> cross-language support, but it seems surmountable and it could make
> > >> certain
> > >>> operations a lot more efficient (the fact that a basic regex
> > subscription
> > >>> causes the consumer to request metadata for all topics is not great).
> > >>>
> > >>> Ismael
> > >>>
> > >>> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang 
> > >>> wrote:
> > >>>
> >  I like Jun's suggestion in changing the handling logics of single
> > large
> >  message on the consumer side.
> > 
> >  As for the case of "a single group subscribing to 3000 topics", with
> > >> 100
> >  consumers the 2.5Mb Gzip size is reasonable to me (when storing in
> ZK,
> > >> we
> >  also have the znode limit which is set to 1Mb by default, though
> > >>> admittedly
> >  it is only for one consumer). And if we do the change as Jun
> > suggested,
> >  2.5Mb on follower's memory pressure is OK I think.
> > 
> > 
> >  Guozhang
> > 
> > 
> >  On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> >  onurkaraman.apa...@gmail.com
> > > wrote:
> > 
> > > Results without compression:
> > > 1 consumer 292383 bytes
> > > 5 consumers 1079579 bytes * the tipping point
> > > 10 consumers 1855018 bytes
> > > 20 consumers 2780220 bytes
> > > 30 consumers 3705422 bytes
> > > 40 consumers 4630624 bytes
> > > 50 consumers 826 bytes
> > > 60 consumers 6480788 bytes
> > > 70 consumers 7405750 bytes
> > > 80 consumers 8330712 bytes
> > > 90 consumers 9255674 bytes
> > > 100 consumers 10180636 bytes
> > >
> > > So it looks like gzip compression shrinks the message size by 4x.
> > >
> > > On Sat, May 21, 2016 at 9:47 AM, Jun Rao  wrote:
> > >
> > >> Onur,

[jira] [Created] (KAFKA-3748) Add consumer-property to console tools consumer (similar to --producer-property)

2016-05-23 Thread Bharat Viswanadham (JIRA)
Bharat Viswanadham created KAFKA-3748:
-

 Summary: Add consumer-property to console tools consumer (similar 
to --producer-property)
 Key: KAFKA-3748
 URL: https://issues.apache.org/jira/browse/KAFKA-3748
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.9.0.0
Reporter: Bharat Viswanadham
Assignee: Bharat Viswanadham
 Fix For: 0.9.0.1, 0.9.0.0


Add --consumer-property to the console consumer.
Creating this task from the comment given in KAFKA-3567.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3709) Create project security page

2016-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297378#comment-15297378
 ] 

ASF GitHub Bot commented on KAFKA-3709:
---

Github user ijuma commented on the pull request:

https://github.com/apache/kafka-site/pull/12#issuecomment-221129684
  
Thanks Jun. I emailed active committers informing them about the creation 
of the mailing list. I sent an email to the list from a different account and 
verified that messages are going through.

I'll merge this PR tomorrow morning if there are no objections.


> Create project security page
> 
>
> Key: KAFKA-3709
> URL: https://issues.apache.org/jira/browse/KAFKA-3709
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>
> We are creating a security@k.a.o mailing list to receive reports of potential 
> vulnerabilities. Now that Kafka has security in place, the community might 
> starts receiving vulnerability reports and we need to follow the guidelines 
> here:
> http://www.apache.org/security/
> Specifically, security issues are better handled in a project-specific list. 
> This jira is to create a web page that informs users and contributors of how 
> we are supposed to handle security issues. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka-site pull request: KAFKA-3709: Create a project security pag...

2016-05-23 Thread ijuma
Github user ijuma commented on the pull request:

https://github.com/apache/kafka-site/pull/12#issuecomment-221129684
  
Thanks Jun. I emailed active committers informing them about the creation 
of the mailing list. I sent an email to the list from a different account and 
verified that messages are going through.

I'll merge this PR tomorrow morning if there are no objections.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Onur Karaman
When figuring out these optimizations, it's worth keeping in mind the
improvements when the message is uncompressed vs when it's compressed.

When uncompressed:
Fixing the Assignment serialization to instead be a topic index into the
corresponding member's subscription list would usually be a good thing.

I think the proposal is only worse when the topic names are small. The
Type.STRING we use in our protocol for the assignment's TOPIC_KEY_NAME is
limited in length to Short.MAX_VALUE, so our strings are first prepended
with 2 bytes to indicate the string size.

The new proposal does worse when:
2 + utf_encoded_string_payload_size < index_type_size
in other words when:
utf_encoded_string_payload_size < index_type_size - 2

If the index type ends up being Type.INT32, then the proposal is worse when
the topic is length 1.
If the index type ends up being Type.INT64, then the proposal is worse when
the topic is less than length 6.

When compressed:
As James Cheng brought up, I'm not sure how things change when compression
comes into the picture. This would be worth investigating.
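
To make the break-even arithmetic concrete, here is a small back-of-the-envelope helper (assuming the 2-byte Type.STRING length prefix above and a fixed-size integer index):

import java.nio.charset.StandardCharsets;

public class AssignmentSizeSketch {
    // Size of a topic name encoded as Type.STRING: 2-byte length prefix + UTF-8 payload.
    static int stringEncodingBytes(String topic) {
        return 2 + topic.getBytes(StandardCharsets.UTF_8).length;
    }

    // Is a fixed-size index (4 bytes for INT32, 8 for INT64) smaller than the string encoding?
    static boolean indexIsSmaller(String topic, int indexTypeBytes) {
        return indexTypeBytes < stringEncodingBytes(topic);
    }

    public static void main(String[] args) {
        // A 1-character topic: 3 bytes as a string, so an INT32 index (4 bytes) is already worse.
        System.out.println(indexIsSmaller("a", 4));                 // false
        // A 6-character topic: 8 bytes as a string, so an INT64 index (8 bytes) only breaks even.
        System.out.println(indexIsSmaller("metric", 8));            // false (equal size)
        // Longer topic names are where the index representation starts to pay off.
        System.out.println(indexIsSmaller("page-view-events", 4));  // true
    }
}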

On Mon, May 23, 2016 at 4:05 PM, James Cheng  wrote:

>
> > On May 23, 2016, at 10:59 AM, Jason Gustafson 
> wrote:
> >
> > 2. Maybe there's a better way to lay out the assignment without needing
> to
> > explicitly repeat the topic? For example, the leader could sort the
> topics
> > for each member and just use an integer to represent the index of each
> > topic within the sorted list (note this depends on the subscription
> > including the full topic list).
> >
> > Assignment -> [TopicIndex [Partition]]
> >
>
> Jason, doesn't gzip (or other compression) basically do this? If the topic
> is a string and the topic is repeated throughout, won't compression
> basically replace all repeated instances of it with an index reference to
> the full string?
>
> -James
>
> > You could even combine these two options so that you have only 3 integers
> > for each topic assignment:
> >
> > Assignment -> [TopicIndex MinPartition MaxPartition]
> >
> > There may even be better options with a little more thought. All of this
> is
> > just part of the client-side protocol, so it wouldn't require any version
> > bumps on the broker. What do you think?
> >
> > Thanks,
> > Jason
> >
> >
> >
> >
> > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang 
> wrote:
> >
> >> The original concern is that regex may not be efficiently supported
> >> across-languages, but if there is a neat workaround I would love to
> learn.
> >>
> >> Guozhang
> >>
> >> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma  wrote:
> >>
> >>> +1 to Jun's suggestion.
> >>>
> >>> Having said that, as a general point, I think we should consider
> >> supporting
> >>> topic patterns in the wire protocol. It requires some thinking for
> >>> cross-language support, but it seems surmountable and it could make
> >> certain
> >>> operations a lot more efficient (the fact that a basic regex
> subscription
> >>> causes the consumer to request metadata for all topics is not great).
> >>>
> >>> Ismael
> >>>
> >>> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang 
> >>> wrote:
> >>>
>  I like Jun's suggestion in changing the handling logics of single
> large
>  message on the consumer side.
> 
>  As for the case of "a single group subscribing to 3000 topics", with
> >> 100
>  consumers the 2.5Mb Gzip size is reasonable to me (when storing in ZK,
> >> we
>  also have the znode limit which is set to 1Mb by default, though
> >>> admittedly
>  it is only for one consumer). And if we do the change as Jun
> suggested,
>  2.5Mb on follower's memory pressure is OK I think.
> 
> 
>  Guozhang
> 
> 
>  On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
>  onurkaraman.apa...@gmail.com
> > wrote:
> 
> > Results without compression:
> > 1 consumer 292383 bytes
> > 5 consumers 1079579 bytes * the tipping point
> > 10 consumers 1855018 bytes
> > 20 consumers 2780220 bytes
> > 30 consumers 3705422 bytes
> > 40 consumers 4630624 bytes
> > 50 consumers 826 bytes
> > 60 consumers 6480788 bytes
> > 70 consumers 7405750 bytes
> > 80 consumers 8330712 bytes
> > 90 consumers 9255674 bytes
> > 100 consumers 10180636 bytes
> >
> > So it looks like gzip compression shrinks the message size by 4x.
> >
> > On Sat, May 21, 2016 at 9:47 AM, Jun Rao  wrote:
> >
> >> Onur,
> >>
> >> Thanks for the investigation.
> >>
> >> Another option is to just fix how we deal with the case when a
> >>> message
>  is
> >> larger than the fetch size. Today, if the fetch size is smaller
> >> than
>  the
> >> fetch size, the consumer will get stuck. Instead, we can simply
> >>> return
> > the
> >> full message if it's larger than the fetch size w/o requiring 

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread James Cheng

> On May 23, 2016, at 10:59 AM, Jason Gustafson  wrote:
> 
> 2. Maybe there's a better way to lay out the assignment without needing to
> explicitly repeat the topic? For example, the leader could sort the topics
> for each member and just use an integer to represent the index of each
> topic within the sorted list (note this depends on the subscription
> including the full topic list).
> 
> Assignment -> [TopicIndex [Partition]]
> 

Jason, doesn't gzip (or other compression) basically do this? If the topic is a 
string and the topic is repeated throughout, won't compression basically 
replace all repeated instances of it with an index reference to the full string?

-James

> You could even combine these two options so that you have only 3 integers
> for each topic assignment:
> 
> Assignment -> [TopicIndex MinPartition MaxPartition]
> 
> There may even be better options with a little more thought. All of this is
> just part of the client-side protocol, so it wouldn't require any version
> bumps on the broker. What do you think?
> 
> Thanks,
> Jason
> 
> 
> 
> 
> On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang  wrote:
> 
>> The original concern is that regex may not be efficiently supported
>> across-languages, but if there is a neat workaround I would love to learn.
>> 
>> Guozhang
>> 
>> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma  wrote:
>> 
>>> +1 to Jun's suggestion.
>>> 
>>> Having said that, as a general point, I think we should consider
>> supporting
>>> topic patterns in the wire protocol. It requires some thinking for
>>> cross-language support, but it seems surmountable and it could make
>> certain
>>> operations a lot more efficient (the fact that a basic regex subscription
>>> causes the consumer to request metadata for all topics is not great).
>>> 
>>> Ismael
>>> 
>>> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang 
>>> wrote:
>>> 
 I like Jun's suggestion in changing the handling logics of single large
 message on the consumer side.
 
 As for the case of "a single group subscribing to 3000 topics", with
>> 100
 consumers the 2.5Mb Gzip size is reasonable to me (when storing in ZK,
>> we
 also have the znode limit which is set to 1Mb by default, though
>>> admittedly
 it is only for one consumer). And if we do the change as Jun suggested,
 2.5Mb on follower's memory pressure is OK I think.
 
 
 Guozhang
 
 
 On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
 onurkaraman.apa...@gmail.com
> wrote:
 
> Results without compression:
> 1 consumer 292383 bytes
> 5 consumers 1079579 bytes * the tipping point
> 10 consumers 1855018 bytes
> 20 consumers 2780220 bytes
> 30 consumers 3705422 bytes
> 40 consumers 4630624 bytes
> 50 consumers 826 bytes
> 60 consumers 6480788 bytes
> 70 consumers 7405750 bytes
> 80 consumers 8330712 bytes
> 90 consumers 9255674 bytes
> 100 consumers 10180636 bytes
> 
> So it looks like gzip compression shrinks the message size by 4x.
> 
> On Sat, May 21, 2016 at 9:47 AM, Jun Rao  wrote:
> 
>> Onur,
>> 
>> Thanks for the investigation.
>> 
>> Another option is to just fix how we deal with the case when a
>>> message
 is
>> larger than the fetch size. Today, if the fetch size is smaller
>> than
 the
>> fetch size, the consumer will get stuck. Instead, we can simply
>>> return
> the
>> full message if it's larger than the fetch size w/o requiring the
> consumer
>> to manually adjust the fetch size. On the broker side, to serve a
>>> fetch
>> request, we already do an index lookup and then scan the log a bit
>> to
> find
>> the message with the requested offset. We can just check the size
>> of
 that
>> message and return the full message if its size is larger than the
 fetch
>> size. This way, fetch size is really for performance optimization,
>>> i.e.
> in
>> the common case, we will not return more bytes than fetch size, but
>>> if
>> there is a large message, we will return more bytes than the
>>> specified
>> fetch size. In practice, large messages are rare. So, it shouldn't
> increase
>> the memory consumption on the client too much.
>> 
>> Jun
>> 
>> On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <
>> onurkaraman.apa...@gmail.com>
>> wrote:
>> 
>>> Hey everyone. So I started doing some tests on the new
>> consumer/coordinator
>>> to see if it could handle more strenuous use cases like mirroring
>> clusters
>>> with thousands of topics and thought I'd share whatever I have so
 far.
>>> 
>>> The scalability limit: the amount of group metadata we can fit
>> into
 one
>>> message
>>> 
>>> Some background:
>>> Client-side assignment is implemented in two 

[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297299#comment-15297299
 ] 

Guozhang Wang commented on KAFKA-3745:
--

I may be still getting your motivations wrong, so just for clarification: for 
{{ValueJoiner}} interface it has

{code}
R apply(V1 value1, V2 value2);
{code}

And hence the key is not provided in the function, since we thought for common 
cases it is not needed for the join computation itself. Are you suggesting to 
add this key as:

{code}
R apply(K key, V1 value1, V2 value2);
{code}

for your specific scenario? From the description above I am not sure if that is 
the case.
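
For illustration, with the key added the usage might look like this ({{ValueJoinerWithKey}} is a hypothetical name for the sketch, not an existing API):

{code}
// Hypothetical extension of the interface, with the join key passed through:
public interface ValueJoinerWithKey<K, V1, V2, R> {
    R apply(K key, V1 value1, V2 value2);
}

// Greg's case, roughly: the table-side value may be null (left/outer join), so the key itself
// has to supply part of the output value.
ValueJoinerWithKey<String, String, String, String> joiner =
    (key, streamValue, tableValue) ->
        key + "," + streamValue + "," + (tableValue == null ? "" : tableValue);
{code}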

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297250#comment-15297250
 ] 

Greg Fodor commented on KAFKA-3745:
---

Sure. We are left joining a KTable against a KStream. The entry in the KTable 
may be null since it's an outer join, so it can't be relied upon to provide the 
key, and the join key for the KStream is derived data from one of the fields 
(in particular, it's a parsed substring of one of the columns). The workaround 
right now is that we do that parsing in a previous step and then emit a record 
with the value.

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-0.10.0-jdk7 #104

2016-05-23 Thread Apache Jenkins Server
See 

Changes:

[ismael] KAFKA-3393; Updated the docs to reflect the deprecation of

[ismael] KAFKA-3258; Delete broker topic metrics of deleted topics

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on ubuntu3 (Ubuntu ubuntu legacy-ubuntu) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/0.10.0^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/0.10.0^{commit} # timeout=10
Checking out Revision a86ae26fcb18d307d5d54f7061df613ce148fc33 
(refs/remotes/origin/0.10.0)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f a86ae26fcb18d307d5d54f7061df613ce148fc33
 > git rev-list a1838755a206fc769ef1fe947eafe3ff245afeb6 # timeout=10
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-0.10.0-jdk7] $ /bin/bash -xe /tmp/hudson2127636917590360743.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 41.494 secs
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-0.10.0-jdk7] $ /bin/bash -xe /tmp/hudson2510383336211240427.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.13/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
Build file 
'/x1/jenkins/jenkins-slave/workspace/kafka-0.10.0-jdk7/build.gradle': line 231
useAnt has been deprecated and is scheduled to be removed in Gradle 3.0. The 
Ant-Based Scala compiler is deprecated, please see 
https://docs.gradle.org/current/userguide/scala_plugin.html.
:clean UP-TO-DATE
:clients:clean
:connect:clean UP-TO-DATE
:core:clean
:examples:clean
:log4j-appender:clean
:streams:clean
:tools:clean
:connect:api:clean
:connect:file:clean
:connect:json:clean
:connect:runtime:clean
:streams:examples:clean
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-0.10.0-jdk7:clients:compileJavaNote: Some input files use unchecked or 
unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:kafka-0.10.0-jdk7:clients:processResources UP-TO-DATE
:kafka-0.10.0-jdk7:clients:classes
:kafka-0.10.0-jdk7:clients:determineCommitId UP-TO-DATE
:kafka-0.10.0-jdk7:clients:createVersionFile
:kafka-0.10.0-jdk7:clients:jar
:kafka-0.10.0-jdk7:core:compileJava UP-TO-DATE
:kafka-0.10.0-jdk7:core:compileScala
/x1/jenkins/jenkins-slave/workspace/kafka-0.10.0-jdk7/core/src/main/scala/kafka/api/OffsetCommitRequest.scala:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
/x1/jenkins/jenkins-slave/workspace/kafka-0.10.0-jdk7/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
/x1/jenkins/jenkins-slave/workspace/kafka-0.10.0-jdk7/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 expireTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {

  ^

[jira] [Updated] (KAFKA-3258) BrokerTopicMetrics of deleted topics are never deleted

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3258:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.0.1

> BrokerTopicMetrics of deleted topics are never deleted
> --
>
> Key: KAFKA-3258
> URL: https://issues.apache.org/jira/browse/KAFKA-3258
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.10.0.1
>
>
> Per-topic BrokerTopicMetrics generated by brokers are not deleted even when 
> the topic is deleted. This shows misleading metrics in metrics reporters long 
> after a topic is deleted and is also a resource leak.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Jason Gustafson
>
> Assignments also can be optimized with some tricks like the ones Jason
> mentioned, but I think these end up being specific to the assignment
> strategy, making it hard to keep a generic ConsumerProtocol.


Leaving the protocol generic would be ideal since tools (such as
consumer-groups.sh) depend on it. That said, we specifically designed it to
allow assignors to use their own schemas, so I don't see a major problem
taking advantage of that and we could hide the detail from tools easily
enough. Changing the message format used by the coordinator for group
metadata seems doable as well as long as it only affects the value schema
(and it sounds like it would). I'm a little less optimistic about reversing
the member -> subscription mapping as a general solution, however, since
many assignors will probably have consumer-specific data (e.g. in sticky or
rack-based assignment), but I'm definitely in favor of laying out all the
options.

Thanks,
Jason

On Mon, May 23, 2016 at 1:37 PM, Guozhang Wang  wrote:

> Discussed with Jason about several optimization proposals, and summarize
> them here:
>
> ---
> Today the offset topic message value format is:
>
> [member subscription assignment]
>
> where subscription and assignment are just bytes to the brokers, and
> consumers know the schema to interpret them; usually subscriptions contains
> the topic list
>
> [Topic-name]
>
> , and assignment contains the topic-partitions list
>
> [Topic-name [Partition-Id]].
>
>
> Now assuming we have 100 consumer subscribing to 3000 topics with each 25
> bytes on average, and assuming each consumer gets two partitions of each of
> the subscribed topics (I know the second assumption maybe off, but just
> keep it as for illustration purposes), the subscription will take about
> 3000 * 25 * 100 bytes in total, and the assignment will take about 3000 *
> (25 + 8 * 2) * 100 bytes, before compressed.
>
>
> One proposal from Jason is that we can change the assignment bytes to
> [Index [Partition-Id]], where Index is the the index of the topic in the
> subscription bytes, that saves about (25 - 8) * 3000 * 100 bytes;
>
> Another proposal from Onur goes further, that we can change the format of
> the message value to:
>
> [subscription [member assignment]]
>
> , which, combining with Jason's approach, will further reduce the
> subscription size from 3000 * 25 * 100 to 3000 * 25 * 1 in the best case.
>
>
> I think Jason's proposal is a good start as it does not change the protocol
> at all (again, to brokers it is just bytes), but only upgrading the format
> protocol and the interpretation logic on the consumer-side; and hence a new
> (say 0.11 after this optimization) consumer client can even work with an
> older broker. And it's saving is already quite beneficial.
>
>
> Guozhang
>
>
> On Mon, May 23, 2016 at 11:10 AM, Ismael Juma  wrote:
>
> > Hi Jason,
> >
> > It would definitely be interesting to try a few of these optimisations
> on a
> > real world example to quantify the impact.
> >
> > Ismael
> >
> > On Mon, May 23, 2016 at 6:59 PM, Jason Gustafson 
> > wrote:
> >
> > > Hey Onur,
> > >
> > > Thanks for the investigation. I agree with Ismael that pushing regex or
> > > some kind of patterns into the protocol would help for communicating
> > > subscriptions and for avoiding unnecessary overhead when fetching topic
> > > metadata, but it doesn't seem like it would address the main issue here
> > > since the leader would still have to split the regex into the
> individual
> > > topics in order to do the partition assignment. It feels like we may
> > need a
> > > different approach for representing assignments at this scale.
> > >
> > > One minor question I had is which assignment strategy you used in these
> > > tests? Do you see any difference in overhead overall if you change
> > between
> > > "range" and "round-robin"? One thought that occurred to me is that you
> > > could reduce the redundancy in the leader sync group by trying to limit
> > the
> > > number of consumers that received partitions from each topic. Ideally,
> > each
> > > topic would be given to exactly one member so that its name was
> repeated
> > > only once in the leader's sync group. I guess the compression before
> > > writing the group metadata would end up removing this redundancy
> anyway,
> > > but still might be worth investigating.
> > >
> > > It actually seems to me that there's quite a bit of room for cleverness
> > > here without changing the protocol. Here are a couple ideas:
> > >
> > > 1. Currently we list all of the assigned partitions explicitly in the
> > > assignment:
> > >
> > > Assignment -> [Topic [Partition]]
> > >
> > > Alternatively, you could let the assignment contain just a minimum and
> > > maximum partition:
> > >
> > > Assignment -> [Topic MinPartition MaxPartition]
> > >
> > > Obviously this would only fit range-based assignment approaches, but if
> > you
> > > 

[jira] [Updated] (KAFKA-3393) Update site docs and javadoc based on max.block.ms changes

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3393:
---
Fix Version/s: (was: 0.10.1.0)
   0.10.0.1

> Update site docs and javadoc based on max.block.ms changes
> --
>
> Key: KAFKA-3393
> URL: https://issues.apache.org/jira/browse/KAFKA-3393
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>Assignee: Mayuresh Gharat
> Fix For: 0.10.0.1
>
>
> KAFKA-2120 deprecated block.on.buffer.full in favor of max.block.ms. This 
> change alters the behavior of the KafkaProducer. Users may not be expecting 
> that change when upgrading from the 0.8.x clients. We should:
> - Update the KafkaProducer javadoc
> - Update the ProducerConfig docs and the generated site docs
> - Add an entry to the 0.9 upgrade notes (if appropriate) 
> Related discussion can be seen here: https://github.com/apache/kafka/pull/1058



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Guozhang Wang
Discussed with Jason about several optimization proposals, and summarize
them here:

---
Today the offset topic message value format is:

[member subscription assignment]

where subscription and assignment are just bytes to the brokers, and
consumers know the schema to interpret them; usually subscriptions contains
the topic list

[Topic-name]

, and assignment contains the topic-partitions list

[Topic-name [Partition-Id]].


Now assuming we have 100 consumers subscribing to 3000 topics with topic
names of 25 bytes on average, and assuming each consumer gets two partitions
of each of the subscribed topics (I know the second assumption may be off,
but keep it as such for illustration purposes), the subscriptions will take
about 3000 * 25 * 100 bytes in total, and the assignments will take about
3000 * (25 + 8 * 2) * 100 bytes, before compression.


One proposal from Jason is that we can change the assignment bytes to
[Index [Partition-Id]], where Index is the index of the topic in the
subscription bytes; that saves about (25 - 8) * 3000 * 100 bytes.

Another proposal from Onur goes further: we can change the format of
the message value to

[subscription [member assignment]]

which, combined with Jason's approach, will further reduce the
subscription size from 3000 * 25 * 100 to 3000 * 25 * 1 bytes in the best case.


I think Jason's proposal is a good start as it does not change the protocol
at all (again, to brokers it is just bytes), but only upgrades the format
protocol and the interpretation logic on the consumer side; hence a new
(say 0.11, after this optimization) consumer client can even work with an
older broker. And its savings are already quite beneficial.
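
To make the arithmetic above easy to play with, here is a small illustrative
calculation (not Kafka code; the 8-byte-per-partition-entry figure is just the
rough number used in this thread):

{code}
public class GroupMetadataSizeEstimate {
    public static void main(String[] args) {
        int consumers = 100;
        int topics = 3000;
        int topicNameBytes = 25;            // average topic name length assumed above
        int partitionsPerTopic = 2;         // per consumer, as assumed above
        int bytesPerPartitionEntry = 8;     // rough per-partition figure used above

        // Current format: every member repeats the full topic list in both its
        // subscription and its assignment.
        long subscriptionBytes = (long) consumers * topics * topicNameBytes;
        long assignmentBytes = (long) consumers * topics
                * (topicNameBytes + partitionsPerTopic * bytesPerPartitionEntry);

        // Jason's proposal: refer to topics by index in the assignment instead of
        // repeating the ~25-byte topic names.
        long indexedAssignmentBytes = (long) consumers * topics
                * (bytesPerPartitionEntry + partitionsPerTopic * bytesPerPartitionEntry);

        System.out.printf("subscriptions       ~ %,d bytes%n", subscriptionBytes);
        System.out.printf("assignments         ~ %,d bytes%n", assignmentBytes);
        System.out.printf("indexed assignments ~ %,d bytes%n", indexedAssignmentBytes);
    }
}
{code}

(All of these are the uncompressed estimates discussed above.)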


Guozhang


On Mon, May 23, 2016 at 11:10 AM, Ismael Juma  wrote:

> Hi Jason,
>
> It would definitely be interesting to try a few of these optimisations on a
> real world example to quantify the impact.
>
> Ismael
>
> On Mon, May 23, 2016 at 6:59 PM, Jason Gustafson 
> wrote:
>
> > Hey Onur,
> >
> > Thanks for the investigation. I agree with Ismael that pushing regex or
> > some kind of patterns into the protocol would help for communicating
> > subscriptions and for avoiding unnecessary overhead when fetching topic
> > metadata, but it doesn't seem like it would address the main issue here
> > since the leader would still have to split the regex into the individual
> > topics in order to do the partition assignment. It feels like we may
> need a
> > different approach for representing assignments at this scale.
> >
> > One minor question I had is which assignment strategy you used in these
> > tests? Do you see any difference in overhead overall if you change
> between
> > "range" and "round-robin"? One thought that occurred to me is that you
> > could reduce the redundancy in the leader sync group by trying to limit
> the
> > number of consumers that received partitions from each topic. Ideally,
> each
> > topic would be given to exactly one member so that its name was repeated
> > only once in the leader's sync group. I guess the compression before
> > writing the group metadata would end up removing this redundancy anyway,
> > but still might be worth investigating.
> >
> > It actually seems to me that there's quite a bit of room for cleverness
> > here without changing the protocol. Here are a couple ideas:
> >
> > 1. Currently we list all of the assigned partitions explicitly in the
> > assignment:
> >
> > Assignment -> [Topic [Partition]]
> >
> > Alternatively, you could let the assignment contain just a minimum and
> > maximum partition:
> >
> > Assignment -> [Topic MinPartition MaxPartition]
> >
> > Obviously this would only fit range-based assignment approaches, but if
> you
> > have a lot of partitions per topic, this could be a big win.
> >
> > 2. Maybe there's a better way to lay out the assignment without needing
> to
> > explicitly repeat the topic? For example, the leader could sort the
> topics
> > for each member and just use an integer to represent the index of each
> > topic within the sorted list (note this depends on the subscription
> > including the full topic list).
> >
> > Assignment -> [TopicIndex [Partition]]
> >
> > You could even combine these two options so that you have only 3 integers
> > for each topic assignment:
> >
> > Assignment -> [TopicIndex MinPartition MaxPartition]
> >
> > There may even be better options with a little more thought. All of this
> is
> > just part of the client-side protocol, so it wouldn't require any version
> > bumps on the broker. What do you think?
> >
> > Thanks,
> > Jason
> >
> >
> >
> >
> > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang 
> wrote:
> >
> > > The original concern is that regex may not be efficiently supported
> > > across-languages, but if there is a neat workaround I would love to
> > learn.
> > >
> > > Guozhang
> > >
> > > On Mon, May 23, 2016 at 5:31 AM, Ismael Juma 
> wrote:
> > 

Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-05-23 Thread parth brahmbhatt
Hi Jun,

Thanks for reviewing.

* We could add a Cluster action to add ACLs on who can request delegation
tokens. I don't see the use case for that yet, but down the line when we
start supporting getDelegationTokenAs it will be necessary.
* Yes, we recommend that tokens only be used/distributed over secure channels.
* Depending on which design we end up choosing, invalidation will be the
responsibility of every broker or of the controller.
* I am not sure if I documented somewhere that invalidation will directly
go through ZooKeeper, but that is not the intent. Invalidation will either
be request based or due to expiration. There is no direct ZooKeeper
interaction from any client.
* "Broker also stores the DelegationToken without the hmac in the
zookeeper." : Sorry about the confusion. The sole purpose of zookeeper in
this design is as distribution channel for tokens between all brokers and a
layer that ensures only tokens that were generated by making a request to a
broker will be accepted (more on this in second paragraph). The token
consists of few elements (owner, renewer, uuid , expiration, hmac) , one of
which is the finally generated hmac but hmac it self is derivable if you
have all the other elements of the token + secret key to generate hmac.
Given zookeeper does not provide SSL support we do not want the entire
token to be wire transferred to zookeeper as that will be an insecure wire
transfer. Instead we only store all the other elements of a delegation
tokens. Brokers can read these elements and because they also have access
to secret key they will be able to generate hmac on their end.

One of the alternatives proposed is to avoid ZooKeeper altogether. A client
will call a broker with the required information (owner, renewer,
expiration) and get back (signed hmac, uuid). The broker won't store this in
ZooKeeper. From this point a client can contact any broker with all the
delegation token info (owner, renewer, expiration, hmac, uuid); the broker
will regenerate the hmac, and as long as it matches the hmac presented by
the client, the broker will allow the request to authenticate. The only
problem with this approach is that if the secret key is compromised, any
client can now generate arbitrary tokens and still authenticate as any user
they like. With ZooKeeper we guarantee that only tokens acquired via a
broker (using some auth scheme other than delegation tokens) will be
accepted. We need to discuss which proposal makes more sense, and we can go
over it in tomorrow's meeting.
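
To make the "hmac is derivable from the other token elements plus the secret
key" point concrete, here is a minimal sketch of how a broker could re-derive
and check it. This is not from the KIP; the field layout, the "|" separator and
the HmacSHA256 choice are illustrative assumptions:

{code}
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class DelegationTokenCheck {

    // Re-derive the hmac over the non-secret token elements using the shared secret key.
    static byte[] deriveHmac(String owner, String renewer, String uuid, long expiration,
                             byte[] secretKey) throws GeneralSecurityException {
        String material = owner + "|" + renewer + "|" + uuid + "|" + expiration;
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secretKey, "HmacSHA256"));
        return mac.doFinal(material.getBytes(StandardCharsets.UTF_8));
    }

    // A broker that knows the secret key can verify a token presented by a client
    // without the hmac ever having been written to ZooKeeper.
    static boolean verify(String owner, String renewer, String uuid, long expiration,
                          byte[] presentedHmac, byte[] secretKey)
            throws GeneralSecurityException {
        byte[] expected = deriveHmac(owner, renewer, uuid, expiration, secretKey);
        return MessageDigest.isEqual(expected, presentedHmac);  // constant-time compare
    }
}
{code}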

Also, can you forward the invite to me?

Thanks
Parth



On Mon, May 23, 2016 at 10:35 AM, Jun Rao  wrote:

> Thanks for the KIP. A few comments.
>
> 100. This potentially can be useful for Kafka Connect and Kafka rest proxy
> where a worker agent will need to run a task on behalf of a client. We will
> likely need to change how those services use Kafka clients
> (producer/consumer). Instead of a shared client per worker, we will need a
> client per user task since the authentication happens at the connection
> level. For Kafka Connect, the renewer will be the workers. So, we probably
> need to allow multiple renewers. For Kafka rest proxy, the renewer can
> probably just be the creator of the token.
>
> 101. Do we need new acl on who can request delegation tokens?
>
> 102. Do we recommend people to send delegation tokens in an encrypted
> channel?
>
> 103. Who is responsible for expiring tokens, every broker?
>
> 104. For invalidating tokens, would it be better to do it in a request
> instead of going to ZK directly?
>
> 105. The terminology of client in the wiki sometimes refers to the end
> client and some other times refers to the client using the delegation
> tokens. It would be useful to distinguish between the two.
>
> 106. Could you explain the sentence "Broker also stores the DelegationToken
> without the hmac in the zookeeper." a bit more? I thought the delegation
> token is the hmac.
>
> Thanks,
>
> Jun
>
>
> On Mon, May 23, 2016 at 9:22 AM, Jun Rao  wrote:
>
> > Hi, Harsha,
> >
> > Just sent out a KIP meeting invite. We can discuss this in the meeting
> > tomorrow.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, May 19, 2016 at 8:47 AM, Harsha  wrote:
> >
> >> Hi All,
> >>Can we have a KIP meeting around this. The KIP is up for
> >>sometime and if there are any questions lets quickly hash out
> >>details.
> >>
> >> Thanks,
> >> Harsha
> >>
> >> On Thu, May 19, 2016, at 08:40 AM, parth brahmbhatt wrote:
> >> > That is what the hadoop echo system uses so no good reason really. We
> >> > could
> >> > change it to whatever is the newest recommended standard is.
> >> >
> >> > Thanks
> >> > Parth
> >> >
> >> > On Thu, May 19, 2016 at 3:33 AM, Ismael Juma 
> wrote:
> >> >
> >> > > Hi Parth,
> >> > >
> >> > > Thanks for the KIP. I only started reviewing this and may have
> >> additional
> >> > > questions later. The immediate question that came to 

[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296991#comment-15296991
 ] 

Guozhang Wang commented on KAFKA-3745:
--

[~gfodor] Actually can you share your join use cases where the {{ValueJoiner}} 
itself needs to access the join key? Our original thought is that in many cases 
the joiners do not depend on the specific join key values.

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-05-23 Thread Gwen Shapira
Hi Jun,

A few of my answers are below (since these are things we discussed, or that I
thought about).

On Mon, May 23, 2016 at 10:35 AM, Jun Rao  wrote:

> Thanks for the KIP. A few comments.
>
> 100. This potentially can be useful for Kafka Connect and Kafka rest proxy
> where a worker agent will need to run a task on behalf of a client. We will
> likely need to change how those services use Kafka clients
> (producer/consumer). Instead of a shared client per worker, we will need a
> client per user task since the authentication happens at the connection
> level. For Kafka Connect, the renewer will be the workers. So, we probably
> need to allow multiple renewers. For Kafka rest proxy, the renewer can
> probably just be the creator of the token.
>

For the connector, the token for each connect task will be created by the
connector that manages the task. It can also be responsible for renewing.


>
> 101. Do we need new acl on who can request delegation tokens?
>

We could, but I'd prefer not to have that. I can't see a use case of
preventing certain users from delegating, since they can't delegate more
than the privileges they already have.


>
> 102. Do we recommend people to send delegation tokens in an encrypted
> channel?
>


Definitely. But just like SASL/PLAIN, we can leave both options open.


>
> 103. Who is responsible for expiring tokens, every broker?
>

I think token validity can be checked when tokens are used, like in SASL?


>
> 104. For invalidating tokens, would it be better to do it in a request
> instead of going to ZK directly?
>

+1 to this.


>
> 105. The terminology of client in the wiki sometimes refers to the end
> client and some other times refers to the client using the delegation
> tokens. It would be useful to distinguish between the two.
>
> 106. Could you explain the sentence "Broker also stores the DelegationToken
> without the hmac in the zookeeper." a bit more? I thought the delegation
> token is the hmac.
>
> Thanks,
>
> Jun
>
>
> On Mon, May 23, 2016 at 9:22 AM, Jun Rao  wrote:
>
> > Hi, Harsha,
> >
> > Just sent out a KIP meeting invite. We can discuss this in the meeting
> > tomorrow.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, May 19, 2016 at 8:47 AM, Harsha  wrote:
> >
> >> Hi All,
> >>Can we have a KIP meeting around this. The KIP is up for
> >>sometime and if there are any questions lets quickly hash out
> >>details.
> >>
> >> Thanks,
> >> Harsha
> >>
> >> On Thu, May 19, 2016, at 08:40 AM, parth brahmbhatt wrote:
> >> > That is what the hadoop echo system uses so no good reason really. We
> >> > could
> >> > change it to whatever is the newest recommended standard is.
> >> >
> >> > Thanks
> >> > Parth
> >> >
> >> > On Thu, May 19, 2016 at 3:33 AM, Ismael Juma 
> wrote:
> >> >
> >> > > Hi Parth,
> >> > >
> >> > > Thanks for the KIP. I only started reviewing this and may have
> >> additional
> >> > > questions later. The immediate question that came to mind is our
> >> choice of
> >> > > "DIGEST-MD5" even though it's marked as OBSOLETE in the IANA
> Registry
> >> of
> >> > > SASL mechanisms and the original RFC (2831) has been moved to
> Historic
> >> > > status:
> >> > >
> >> > > https://tools.ietf.org/html/rfc6331
> >> > >
> http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml
> >> > >
> >> > > What is the reasoning behind that choice?
> >> > >
> >> > > Thanks,
> >> > > Ismael
> >> > >
> >> > > On Fri, May 13, 2016 at 11:29 PM, Gwen Shapira 
> >> wrote:
> >> > >
> >> > > > Also comments inline :)
> >> > > >
> >> > > > > * I want to emphasize that even though delegation tokens are a
> >> Hadoop
> >> > > > > innovation, I feel very strongly about not adding dependency on
> >> Hadoop
> >> > > > > when implementing delegation tokens for Kafka. The KIP doesn't
> >> imply
> >> > > > > such dependency, but if you can clarify...
> >> > > > >
> >> > > > >
> >> > > > > *No hadoop dependency.*
> >> > > >
> >> > > > Yay! Just add this to the KIP so no one will read the KIP and
> panic
> >> > > > three weeks before the next release...
> >> > > >
> >> > > > > * Can we get delegation token at any time after authenticating?
> >> only
> >> > > > > immediately after?
> >> > > > >
> >> > > > >
> >> > > > > *As long as you are authenticated you can get delegation tokens.
> >> We
> >> > > need
> >> > > > to
> >> > > > > discuss if a client authenticated using delegation token, can
> also
> >> > > > acquire
> >> > > > > delegation token again or not. Also there is the question of do
> we
> >> > > allow
> >> > > > > anyone to acquire delegation token or we want specific ACLs (I
> >> think
> >> > > its
> >> > > > an
> >> > > > > overkill.)*
> >> > > >
> >> > > > I agree that ACLs is an overkill.
> >> > > >
> >> > > > I think we are debating two options: Either require Kerberos auth
> >> for
> >> > > > renewal or require non-owners to renew.
> >> > > 

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Onur Karaman
To get a better sense of the limit and what we should be optimizing for, it
helps to look at the message format:
private val MEMBER_METADATA_V0 = new Schema(
  new Field("member_id", STRING),
  new Field("client_id", STRING),
  new Field("client_host", STRING),
  new Field("session_timeout", INT32),
  new Field("subscription", BYTES),
  new Field("assignment", BYTES))

private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
  new Field("protocol_type", STRING),
  new Field("generation", INT32),
  new Field("protocol", STRING),
  new Field("leader", STRING),
  new Field("members", new ArrayOf(MEMBER_METADATA_V0)))

Subscription => Version Topics UserData
  Version  => Int16
  Topics   => [String]
  UserData => Bytes

Assignment => Version TopicPartitions
  Version         => Int16
  TopicPartitions => [Topic Partitions]
    Topic      => String
    Partitions => [int32]

The UserData isn't being used for range and roundrobin.

message size ~ sum(s_i + a_i for c_i in C)
C = the consumer group
c_i = the ith consumer in C
s_i = the subscription size for c_i
a_i = the assignment size for c_i

With today's implementation, trying out different assignment strategies
wouldn't make any difference since they all share the same subscription and
assignment serialization defined in ConsumerProtocol. As proof, I hit the
same limit with both range and roundrobin. My results shown in the original
post were using roundrobin.

There are basically two pieces we can optimize:
1. the subscriptions
2. the assignments

Subscriptions can be optimized in a couple of ways:
1. Allowing regex. This only helps consumers with pattern-based
subscriptions.
2. Option 7 that I had mentioned in the original post: broker-side
deduplication of subscriptions by mapping subscriptions to a list of
members.
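
A hypothetical illustration of what option 7's broker-side dedup could look
like (MemberInfo and the surrounding plumbing are made up for the example):

{code}
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SubscriptionDedup {

    static class MemberInfo {
        final String memberId;
        final byte[] subscriptionBytes;  // opaque subscription, as the broker sees it
        MemberInfo(String memberId, byte[] subscriptionBytes) {
            this.memberId = memberId;
            this.subscriptionBytes = subscriptionBytes;
        }
    }

    // Store each distinct subscription once, with the list of members that share it,
    // instead of repeating the subscription per member.
    static Map<ByteBuffer, List<String>> dedup(List<MemberInfo> members) {
        Map<ByteBuffer, List<String>> membersBySubscription = new HashMap<>();
        for (MemberInfo member : members) {
            ByteBuffer key = ByteBuffer.wrap(member.subscriptionBytes);
            List<String> ids = membersBySubscription.get(key);
            if (ids == null) {
                ids = new ArrayList<>();
                membersBySubscription.put(key, ids);
            }
            ids.add(member.memberId);
        }
        return membersBySubscription;
    }
}
{code}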

Assignments also can be optimized with some tricks like the ones Jason
mentioned, but I think these end up being specific to the assignment
strategy, making it hard to keep a generic ConsumerProtocol.

I think it's worth thinking through our options some more.

On Mon, May 23, 2016 at 11:10 AM, Ismael Juma  wrote:

> Hi Jason,
>
> It would definitely be interesting to try a few of these optimisations on a
> real world example to quantify the impact.
>
> Ismael
>
> On Mon, May 23, 2016 at 6:59 PM, Jason Gustafson 
> wrote:
>
> > Hey Onur,
> >
> > Thanks for the investigation. I agree with Ismael that pushing regex or
> > some kind of patterns into the protocol would help for communicating
> > subscriptions and for avoiding unnecessary overhead when fetching topic
> > metadata, but it doesn't seem like it would address the main issue here
> > since the leader would still have to split the regex into the individual
> > topics in order to do the partition assignment. It feels like we may
> need a
> > different approach for representing assignments at this scale.
> >
> > One minor question I had is which assignment strategy you used in these
> > tests? Do you see any difference in overhead overall if you change
> between
> > "range" and "round-robin"? One thought that occurred to me is that you
> > could reduce the redundancy in the leader sync group by trying to limit
> the
> > number of consumers that received partitions from each topic. Ideally,
> each
> > topic would be given to exactly one member so that its name was repeated
> > only once in the leader's sync group. I guess the compression before
> > writing the group metadata would end up removing this redundancy anyway,
> > but still might be worth investigating.
> >
> > It actually seems to me that there's quite a bit of room for cleverness
> > here without changing the protocol. Here are a couple ideas:
> >
> > 1. Currently we list all of the assigned partitions explicitly in the
> > assignment:
> >
> > Assignment -> [Topic [Partition]]
> >
> > Alternatively, you could let the assignment contain just a minimum and
> > maximum partition:
> >
> > Assignment -> [Topic MinPartition MaxPartition]
> >
> > Obviously this would only fit range-based assignment approaches, but if
> you
> > have a lot of partitions per topic, this could be a big win.
> >
> > 2. Maybe there's a better way to lay out the assignment without needing
> to
> > explicitly repeat the topic? For example, the leader could sort the
> topics
> > for each member and just use an integer to represent the index of each
> > topic within the sorted list (note this depends on the subscription
> > including the full topic list).
> >
> > Assignment -> [TopicIndex [Partition]]
> >
> > You could even combine these two options so that you have only 3 integers
> > for each topic assignment:
> >
> > Assignment -> [TopicIndex MinPartition MaxPartition]
> >
> > There may even be better options with a little more thought. All of this
> is
> > just part of the client-side protocol, so it wouldn't require any version
> > bumps on the broker. What do you think?
> >
> > Thanks,
> > Jason
> >
> >
> >

[jira] [Assigned] (KAFKA-3685) Auto-generate ZooKeeper data structure wiki

2016-05-23 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-3685:
--

Assignee: Vahid Hashemian

> Auto-generate ZooKeeper data structure wiki
> ---
>
> Key: KAFKA-3685
> URL: https://issues.apache.org/jira/browse/KAFKA-3685
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Minor
>
> The ZooKeeper data structure wiki page is located at 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper.
>  This should be auto-generated and versioned according to various releases. A 
> similar auto-generate has been previously done for protocol. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3158) ConsumerGroupCommand should tell whether group is actually dead

2016-05-23 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296928#comment-15296928
 ] 

Jason Gustafson commented on KAFKA-3158:


[~imandhan] The easiest way to reproduce the problem is to query a group which 
you know doesn't exist. If you do so, you'll get the message above that the 
group either doesn't exist or is rebalancing, but we really should be able 
to tell the user that the group doesn't exist in this case. If you look in 
{{AdminClient.describeConsumerGroup()}}, we return an empty list both when the 
group is dead (i.e. has no members) and when it is not stable (i.e. 
rebalancing). So the caller has no way to distinguish the cases. Maybe a simple 
option to fix this is to include the state in the response of that method? 
Either that, or we could return an {{Option}} where {{None}} indicates that the 
group doesn't exist?
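
A Java-flavored sketch of the shape such a change could take (the real
AdminClient here is Scala and all of these names are hypothetical), just to
show how a caller could tell the cases apart:

{code}
import java.util.List;
import java.util.Optional;

public interface GroupDescriber {

    enum GroupState { STABLE, REBALANCING, DEAD }

    final class GroupDescription {
        public final GroupState state;
        public final List<String> memberIds;
        public GroupDescription(GroupState state, List<String> memberIds) {
            this.state = state;
            this.memberIds = memberIds;
        }
    }

    // Optional.empty() would mean the group does not exist at all, while a present
    // description with an empty member list and state REBALANCING (or DEAD) lets the
    // tool print a precise message instead of "does not exist or is rebalancing".
    Optional<GroupDescription> describeConsumerGroup(String groupId);
}
{code}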

> ConsumerGroupCommand should tell whether group is actually dead
> ---
>
> Key: KAFKA-3158
> URL: https://issues.apache.org/jira/browse/KAFKA-3158
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, consumer
>Affects Versions: 0.9.0.0
>Reporter: Jason Gustafson
>Assignee: Ishita Mandhan
>Priority: Minor
>
> Currently the consumer group script reports the following when a group is 
> dead or rebalancing:
> {code}
> Consumer group `foo` does not exist or is rebalancing.
> {code}
> But it's annoying not to know which is actually the case. Since the group 
> state is exposed in the DescribeGroupRequest, we should be able to give 
> different messages for each case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-0.10.0-jdk7 #103

2016-05-23 Thread Apache Jenkins Server
See 

Changes:

[cshapi] Bump version to 0.10.0.0 for a release candidate

[cshapi] bumping version of branch from 0.10.0.0-SNAPSHOT to 0.10.0.1-SNAPSHOT,

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on ubuntu-5 (docker Ubuntu ubuntu5 ubuntu yahoo-not-h2) in 
workspace 
 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/0.10.0^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/0.10.0^{commit} # timeout=10
Checking out Revision a1838755a206fc769ef1fe947eafe3ff245afeb6 
(refs/remotes/origin/0.10.0)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f a1838755a206fc769ef1fe947eafe3ff245afeb6
 > git rev-list 2e456cb6167e33414ac88235701022e7b7d705bb # timeout=10
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-0.10.0-jdk7] $ /bin/bash -xe /tmp/hudson7514721831873686107.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 26.3 secs
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-0.10.0-jdk7] $ /bin/bash -xe /tmp/hudson704647589279926386.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.13/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
Build file ': 
line 231
useAnt has been deprecated and is scheduled to be removed in Gradle 3.0. The 
Ant-Based Scala compiler is deprecated, please see 
https://docs.gradle.org/current/userguide/scala_plugin.html.
:clean UP-TO-DATE
:clients:clean
:connect:clean UP-TO-DATE
:core:clean
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-0.10.0-jdk7:clients:compileJavaNote: Some input files use unchecked or 
unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:kafka-0.10.0-jdk7:clients:processResources UP-TO-DATE
:kafka-0.10.0-jdk7:clients:classes
:kafka-0.10.0-jdk7:clients:determineCommitId UP-TO-DATE
:kafka-0.10.0-jdk7:clients:createVersionFile
:kafka-0.10.0-jdk7:clients:jar
:kafka-0.10.0-jdk7:core:compileJava UP-TO-DATE
:kafka-0.10.0-jdk7:core:compileScala
:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 expireTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {

[jira] [Resolved] (KAFKA-3275) Replace 0.9.1.0 references with 0.10.0.0

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-3275.

Resolution: Fixed

Yes, thanks.

> Replace 0.9.1.0 references with 0.10.0.0
> 
>
> Key: KAFKA-3275
> URL: https://issues.apache.org/jira/browse/KAFKA-3275
> Project: Kafka
>  Issue Type: Task
>Reporter: Ismael Juma
>
> The next release will be 0.10.0.0 instead of 0.9.1.0 based on the mailing 
> list vote:
> http://search-hadoop.com/m/uyzND1kfh8g1RBuVm=Re+VOTE+Make+next+Kafka+release+0+10+0+0+instead+of+0+9+1+0
> We need to update a number of places to take this into account.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3275) Replace 0.9.1.0 references with 0.10.0.0

2016-05-23 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296898#comment-15296898
 ] 

Vahid Hashemian commented on KAFKA-3275:


[~ijuma] Can this JIRA be closed since its subtasks are complete? Thanks.

> Replace 0.9.1.0 references with 0.10.0.0
> 
>
> Key: KAFKA-3275
> URL: https://issues.apache.org/jira/browse/KAFKA-3275
> Project: Kafka
>  Issue Type: Task
>Reporter: Ismael Juma
>
> The next release will be 0.10.0.0 instead of 0.9.1.0 based on the mailing 
> list vote:
> http://search-hadoop.com/m/uyzND1kfh8g1RBuVm=Re+VOTE+Make+next+Kafka+release+0+10+0+0+instead+of+0+9+1+0
> We need to update a number of places to take this into account.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3264) Mark the old Scala consumer and related classes as deprecated

2016-05-23 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-3264:
--

Assignee: Vahid Hashemian

> Mark the old Scala consumer and related classes as deprecated
> -
>
> Key: KAFKA-3264
> URL: https://issues.apache.org/jira/browse/KAFKA-3264
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Vahid Hashemian
>
> Once the new consumer is out of beta, we should consider deprecating the old 
> Scala consumers to encourage use of the new consumer and facilitate the 
> removal of the old consumers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3158) ConsumerGroupCommand should tell whether group is actually dead

2016-05-23 Thread Ishita Mandhan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296877#comment-15296877
 ] 

Ishita Mandhan commented on KAFKA-3158:
---

[~jasong35] How can I reproduce the error? I'm having trouble killing a 
consumer group and running the describe statement at the same time. I've tried 
using watch and pipe with no luck.

> ConsumerGroupCommand should tell whether group is actually dead
> ---
>
> Key: KAFKA-3158
> URL: https://issues.apache.org/jira/browse/KAFKA-3158
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, consumer
>Affects Versions: 0.9.0.0
>Reporter: Jason Gustafson
>Assignee: Ishita Mandhan
>Priority: Minor
>
> Currently the consumer group script reports the following when a group is 
> dead or rebalancing:
> {code}
> Consumer group `foo` does not exist or is rebalancing.
> {code}
> But it's annoying not to know which is actually the case. Since the group 
> state is exposed in the DescribeGroupRequest, we should be able to give 
> different messages for each case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3120) Consumer doesn't get messages from some partitions after reassign

2016-05-23 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296845#comment-15296845
 ] 

Vahid Hashemian commented on KAFKA-3120:


I can't reproduce this using the current trunk.

> Consumer doesn't get messages from some partitions after reassign
> -
>
> Key: KAFKA-3120
> URL: https://issues.apache.org/jira/browse/KAFKA-3120
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
> Environment: Linux
>Reporter: Jakub Neubauer
>Priority: Critical
>
> I tested some scenario on one-node cluster with this setup:
> * A topic with 5 partitions, 1 replica.
> * One producer (new java client)
> * 2 consumers were started (let's say A and B). Using the new java client. 2 
> partitions to A, 3 partitions to B were assigned.
> Then I stopped one of the consumers (cleanly). The partitions were 
> re-assigned (the remaining consumer got all 5 partitions in the 
> "ConsumerRebalanceListener.onPartitionsAssigned" listener).
> But as messages were produced, the living consumer received only messages of 
> some partitions (magically those that belonged to the now-dead consumer).
> The messages were not lost. After I restarted the second consumer, it got the 
> messages that it previously didn't get. But without restarting, the messages 
> were not consumed by it.
> It is quite a serious issue, since there is no sign that something is wrong. 
> Everything seems to be working. So the administrator has no chance to realize 
> that (only some) messages are not being consumed on the "healthy" system.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Ismael Juma
Hi Jason,

It would definitely be interesting to try a few of these optimisations on a
real world example to quantify the impact.

Ismael

On Mon, May 23, 2016 at 6:59 PM, Jason Gustafson  wrote:

> Hey Onur,
>
> Thanks for the investigation. I agree with Ismael that pushing regex or
> some kind of patterns into the protocol would help for communicating
> subscriptions and for avoiding unnecessary overhead when fetching topic
> metadata, but it doesn't seem like it would address the main issue here
> since the leader would still have to split the regex into the individual
> topics in order to do the partition assignment. It feels like we may need a
> different approach for representing assignments at this scale.
>
> One minor question I had is which assignment strategy you used in these
> tests? Do you see any difference in overhead overall if you change between
> "range" and "round-robin"? One thought that occurred to me is that you
> could reduce the redundancy in the leader sync group by trying to limit the
> number of consumers that received partitions from each topic. Ideally, each
> topic would be given to exactly one member so that its name was repeated
> only once in the leader's sync group. I guess the compression before
> writing the group metadata would end up removing this redundancy anyway,
> but still might be worth investigating.
>
> It actually seems to me that there's quite a bit of room for cleverness
> here without changing the protocol. Here are a couple ideas:
>
> 1. Currently we list all of the assigned partitions explicitly in the
> assignment:
>
> Assignment -> [Topic [Partition]]
>
> Alternatively, you could let the assignment contain just a minimum and
> maximum partition:
>
> Assignment -> [Topic MinPartition MaxPartition]
>
> Obviously this would only fit range-based assignment approaches, but if you
> have a lot of partitions per topic, this could be a big win.
>
> 2. Maybe there's a better way to lay out the assignment without needing to
> explicitly repeat the topic? For example, the leader could sort the topics
> for each member and just use an integer to represent the index of each
> topic within the sorted list (note this depends on the subscription
> including the full topic list).
>
> Assignment -> [TopicIndex [Partition]]
>
> You could even combine these two options so that you have only 3 integers
> for each topic assignment:
>
> Assignment -> [TopicIndex MinPartition MaxPartition]
>
> There may even be better options with a little more thought. All of this is
> just part of the client-side protocol, so it wouldn't require any version
> bumps on the broker. What do you think?
>
> Thanks,
> Jason
>
>
>
>
> On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang  wrote:
>
> > The original concern is that regex may not be efficiently supported
> > across-languages, but if there is a neat workaround I would love to
> learn.
> >
> > Guozhang
> >
> > On Mon, May 23, 2016 at 5:31 AM, Ismael Juma  wrote:
> >
> > > +1 to Jun's suggestion.
> > >
> > > Having said that, as a general point, I think we should consider
> > supporting
> > > topic patterns in the wire protocol. It requires some thinking for
> > > cross-language support, but it seems surmountable and it could make
> > certain
> > > operations a lot more efficient (the fact that a basic regex
> subscription
> > > causes the consumer to request metadata for all topics is not great).
> > >
> > > Ismael
> > >
> > > On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang 
> > > wrote:
> > >
> > > > I like Jun's suggestion in changing the handling logics of single
> large
> > > > message on the consumer side.
> > > >
> > > > As for the case of "a single group subscribing to 3000 topics", with
> > 100
> > > > consumers the 2.5Mb Gzip size is reasonable to me (when storing in
> ZK,
> > we
> > > > also have the znode limit which is set to 1Mb by default, though
> > > admittedly
> > > > it is only for one consumer). And if we do the change as Jun
> suggested,
> > > > 2.5Mb on follower's memory pressure is OK I think.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> > > > onurkaraman.apa...@gmail.com
> > > > > wrote:
> > > >
> > > > > Results without compression:
> > > > > 1 consumer 292383 bytes
> > > > > 5 consumers 1079579 bytes * the tipping point
> > > > > 10 consumers 1855018 bytes
> > > > > 20 consumers 2780220 bytes
> > > > > 30 consumers 3705422 bytes
> > > > > 40 consumers 4630624 bytes
> > > > > 50 consumers 826 bytes
> > > > > 60 consumers 6480788 bytes
> > > > > 70 consumers 7405750 bytes
> > > > > 80 consumers 8330712 bytes
> > > > > 90 consumers 9255674 bytes
> > > > > 100 consumers 10180636 bytes
> > > > >
> > > > > So it looks like gzip compression shrinks the message size by 4x.
> > > > >
> > > > > On Sat, May 21, 2016 at 9:47 AM, Jun Rao  wrote:
> > > > >
> > > > > > 

Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Jason Gustafson
Hey Onur,

Thanks for the investigation. I agree with Ismael that pushing regex or
some kind of patterns into the protocol would help for communicating
subscriptions and for avoiding unnecessary overhead when fetching topic
metadata, but it doesn't seem like it would address the main issue here
since the leader would still have to split the regex into the individual
topics in order to do the partition assignment. It feels like we may need a
different approach for representing assignments at this scale.

One minor question I had is which assignment strategy you used in these
tests? Do you see any difference in overhead overall if you change between
"range" and "round-robin"? One thought that occurred to me is that you
could reduce the redundancy in the leader sync group by trying to limit the
number of consumers that received partitions from each topic. Ideally, each
topic would be given to exactly one member so that its name was repeated
only once in the leader's sync group. I guess the compression before
writing the group metadata would end up removing this redundancy anyway,
but still might be worth investigating.

It actually seems to me that there's quite a bit of room for cleverness
here without changing the protocol. Here are a couple ideas:

1. Currently we list all of the assigned partitions explicitly in the
assignment:

Assignment -> [Topic [Partition]]

Alternatively, you could let the assignment contain just a minimum and
maximum partition:

Assignment -> [Topic MinPartition MaxPartition]

Obviously this would only fit range-based assignment approaches, but if you
have a lot of partitions per topic, this could be a big win.

2. Maybe there's a better way to lay out the assignment without needing to
explicitly repeat the topic? For example, the leader could sort the topics
for each member and just use an integer to represent the index of each
topic within the sorted list (note this depends on the subscription
including the full topic list).

Assignment -> [TopicIndex [Partition]]

You could even combine these two options so that you have only 3 integers
for each topic assignment:

Assignment -> [TopicIndex MinPartition MaxPartition]

There may even be better options with a little more thought. All of this is
just part of the client-side protocol, so it wouldn't require any version
bumps on the broker. What do you think?
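
For illustration, a minimal sketch of an encoder for the most compact layout
above (Assignment -> [TopicIndex MinPartition MaxPartition]); the
TopicAssignment holder and the surrounding protocol plumbing are assumptions,
not existing Kafka code:

{code}
import java.nio.ByteBuffer;
import java.util.List;

public class CompactAssignmentEncoder {

    public static final class TopicAssignment {
        final int topicIndex;     // index into the member's sorted subscription list
        final int minPartition;
        final int maxPartition;   // inclusive range of assigned partitions
        public TopicAssignment(int topicIndex, int minPartition, int maxPartition) {
            this.topicIndex = topicIndex;
            this.minPartition = minPartition;
            this.maxPartition = maxPartition;
        }
    }

    // Three int32s per topic plus a leading count: 4 + 12 * topics bytes per member,
    // regardless of topic name length or partition count.
    public static ByteBuffer encode(List<TopicAssignment> assignments) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 12 * assignments.size());
        buf.putInt(assignments.size());
        for (TopicAssignment a : assignments) {
            buf.putInt(a.topicIndex);
            buf.putInt(a.minPartition);
            buf.putInt(a.maxPartition);
        }
        buf.flip();
        return buf;
    }
}
{code}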

Thanks,
Jason




On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang  wrote:

> The original concern is that regex may not be efficiently supported
> across-languages, but if there is a neat workaround I would love to learn.
>
> Guozhang
>
> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma  wrote:
>
> > +1 to Jun's suggestion.
> >
> > Having said that, as a general point, I think we should consider
> supporting
> > topic patterns in the wire protocol. It requires some thinking for
> > cross-language support, but it seems surmountable and it could make
> certain
> > operations a lot more efficient (the fact that a basic regex subscription
> > causes the consumer to request metadata for all topics is not great).
> >
> > Ismael
> >
> > On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang 
> > wrote:
> >
> > > I like Jun's suggestion in changing the handling logics of single large
> > > message on the consumer side.
> > >
> > > As for the case of "a single group subscribing to 3000 topics", with
> 100
> > > consumers the 2.5Mb Gzip size is reasonable to me (when storing in ZK,
> we
> > > also have the znode limit which is set to 1Mb by default, though
> > admittedly
> > > it is only for one consumer). And if we do the change as Jun suggested,
> > > 2.5Mb on follower's memory pressure is OK I think.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com
> > > > wrote:
> > >
> > > > Results without compression:
> > > > 1 consumer 292383 bytes
> > > > 5 consumers 1079579 bytes * the tipping point
> > > > 10 consumers 1855018 bytes
> > > > 20 consumers 2780220 bytes
> > > > 30 consumers 3705422 bytes
> > > > 40 consumers 4630624 bytes
> > > > 50 consumers 826 bytes
> > > > 60 consumers 6480788 bytes
> > > > 70 consumers 7405750 bytes
> > > > 80 consumers 8330712 bytes
> > > > 90 consumers 9255674 bytes
> > > > 100 consumers 10180636 bytes
> > > >
> > > > So it looks like gzip compression shrinks the message size by 4x.
> > > >
> > > > On Sat, May 21, 2016 at 9:47 AM, Jun Rao  wrote:
> > > >
> > > > > Onur,
> > > > >
> > > > > Thanks for the investigation.
> > > > >
> > > > > Another option is to just fix how we deal with the case when a
> > message
> > > is
> > > > > larger than the fetch size. Today, if the fetch size is smaller
> than
> > > the
> > > > > message size, the consumer will get stuck. Instead, we can simply
> > return
> > > > the
> > > > > full message if it's larger than the fetch size w/o requiring the
> > > > consumer
> > > > > to 

Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-05-23 Thread Jun Rao
Thanks for the KIP. A few comments.

100. This potentially can be useful for Kafka Connect and Kafka rest proxy
where a worker agent will need to run a task on behalf of a client. We will
likely need to change how those services use Kafka clients
(producer/consumer). Instead of a shared client per worker, we will need a
client per user task since the authentication happens at the connection
level. For Kafka Connect, the renewer will be the workers. So, we probably
need to allow multiple renewers. For Kafka rest proxy, the renewer can
probably just be the creator of the token.

101. Do we need new acl on who can request delegation tokens?

102. Do we recommend people to send delegation tokens in an encrypted
channel?

103. Who is responsible for expiring tokens, every broker?

104. For invalidating tokens, would it be better to do it in a request
instead of going to ZK directly?

105. The terminology of client in the wiki sometimes refers to the end
client and some other times refers to the client using the delegation
tokens. It would be useful to distinguish between the two.

106. Could you explain the sentence "Broker also stores the DelegationToken
without the hmac in the zookeeper." a bit more? I thought the delegation
token is the hmac.

Thanks,

Jun


On Mon, May 23, 2016 at 9:22 AM, Jun Rao  wrote:

> Hi, Harsha,
>
> Just sent out a KIP meeting invite. We can discuss this in the meeting
> tomorrow.
>
> Thanks,
>
> Jun
>
> On Thu, May 19, 2016 at 8:47 AM, Harsha  wrote:
>
>> Hi All,
>>Can we have a KIP meeting around this. The KIP is up for
>>sometime and if there are any questions lets quickly hash out
>>details.
>>
>> Thanks,
>> Harsha
>>
>> On Thu, May 19, 2016, at 08:40 AM, parth brahmbhatt wrote:
>> > That is what the hadoop echo system uses so no good reason really. We
>> > could
>> > change it to whatever is the newest recommended standard is.
>> >
>> > Thanks
>> > Parth
>> >
>> > On Thu, May 19, 2016 at 3:33 AM, Ismael Juma  wrote:
>> >
>> > > Hi Parth,
>> > >
>> > > Thanks for the KIP. I only started reviewing this and may have
>> additional
>> > > questions later. The immediate question that came to mind is our
>> choice of
>> > > "DIGEST-MD5" even though it's marked as OBSOLETE in the IANA Registry
>> of
>> > > SASL mechanisms and the original RFC (2831) has been moved to Historic
>> > > status:
>> > >
>> > > https://tools.ietf.org/html/rfc6331
>> > > http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml
>> > >
>> > > What is the reasoning behind that choice?
>> > >
>> > > Thanks,
>> > > Ismael
>> > >
>> > > On Fri, May 13, 2016 at 11:29 PM, Gwen Shapira 
>> wrote:
>> > >
>> > > > Also comments inline :)
>> > > >
>> > > > > * I want to emphasize that even though delegation tokens are a
>> Hadoop
>> > > > > innovation, I feel very strongly about not adding dependency on
>> Hadoop
>> > > > > when implementing delegation tokens for Kafka. The KIP doesn't
>> imply
>> > > > > such dependency, but if you can clarify...
>> > > > >
>> > > > >
>> > > > > *No hadoop dependency.*
>> > > >
>> > > > Yay! Just add this to the KIP so no one will read the KIP and panic
>> > > > three weeks before the next release...
>> > > >
>> > > > > * Can we get delegation token at any time after authenticating?
>> only
>> > > > > immediately after?
>> > > > >
>> > > > >
>> > > > > *As long as you are authenticated you can get delegation tokens.
>> We
>> > > need
>> > > > to
>> > > > > discuss if a client authenticated using delegation token, can also
>> > > > acquire
>> > > > > delegation token again or not. Also there is the question of do we
>> > > allow
>> > > > > anyone to acquire delegation token or we want specific ACLs (I
>> think
>> > > its
>> > > > an
>> > > > > overkill.)*
>> > > >
>> > > > I agree that ACLs is an overkill.
>> > > >
>> > > > I think we are debating two options: Either require Kerberos auth
>> for
>> > > > renewal or require non-owners to renew.
>> > > > I *think* the latter is simpler (it basically require a "job master"
>> > > > to take responsibility for the renewal, it will have its own
>> identity
>> > > > anyway and I think this is the correct design pattern anyway. For
>> > > > storm, I'd expect Nimbus to coordinate renewals?), but it is hard to
>> > > > debate simplicity without looking at the code changes required. If
>> you
>> > > > have a draft of how the "require Kerberos" will look in Kafka code,
>> > > > I'll be happy to take a look.
>> > > >
>> > > > > * My understanding is that tokens will propagate via ZK but
>> without
>> > > > > additional changes to UpdateMetadata protocol, correct? Clients
>> > > > > currently don't retry on SASL auth failure (IIRC), but since the
>> > > > > tokens propagate between brokers asynch, we will need to retry a
>> bit
>> > > > > to avoid clients failing auth due to timing issues.
>> > > > >
>> > > > > *I am considering 2 

Re: KAFKA-3722 : Discussion about custom PrincipalBuilder and Authorizer configs

2016-05-23 Thread Mayuresh Gharat
Hi Harsha and Ismael,

Option 2 sounds like a good idea if we want a quick fix, I think.
Option 4 might require a KIP since it's a public interface change. I can
resubmit a patch for option 2, or create a KIP if necessary for option 4.

From the previous conversation here, I think Ismael prefers option 2.
I don't have a strong opinion since I understand it's not easy to make
public API changes, but IMO I would go with option 4.
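
For concreteness, here is a rough sketch of what option 4 could look like. The
interface and types below only loosely mirror the real Kafka classes (the
actual PrincipalBuilder works against TransportLayer and Authenticator); the
extra protocol argument is the hypothetical change under discussion, not an
existing API.

import java.security.Principal;

// Illustrative stand-ins; the real builder works against Kafka's
// TransportLayer/Authenticator interfaces.
interface TransportLayer { }
interface Authenticator { }

enum SecurityProtocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

// Option 4 (sketch): the builder is told which protocol the connection uses,
// so an implementation can skip SSL-specific work on plaintext ports.
interface ProtocolAwarePrincipalBuilder {
    Principal buildPrincipal(SecurityProtocol protocol,
                             TransportLayer transportLayer,
                             Authenticator authenticator);
}

class ExamplePrincipalBuilder implements ProtocolAwarePrincipalBuilder {
    @Override
    public Principal buildPrincipal(SecurityProtocol protocol,
                                    TransportLayer transportLayer,
                                    Authenticator authenticator) {
        if (protocol == SecurityProtocol.SSL) {
            // Only here would we down-cast to the SSL transport layer and
            // inspect the client certificate chain.
            return named("ssl-cert-derived-principal");
        }
        // Plaintext (and other) connections fall back to a default principal.
        return named("ANONYMOUS");
    }

    private static Principal named(final String name) {
        return new Principal() {
            @Override
            public String getName() {
                return name;
            }
        };
    }
}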

Harsha what do you think on this?


Thanks,

Mayuresh

On Mon, May 23, 2016 at 5:45 AM, Ismael Juma  wrote:

> Hi Mayuresh and Harsha,
>
> If we were doing this from scratch, I would prefer option 4 too. However,
> users have their own custom principal builders now and option 2 with a
> suitably updated javadoc is the way to go in my opinion.
>
> Ismael
>
> On Sat, May 21, 2016 at 2:28 AM, Harsha  wrote:
>
> > Mayuresh,
> > Thanks for the write-up. With the principal builder, the idea is to
> > reuse a single principal builder across all the security protocols where
> > it is applicable. Given that the principal builder has access to the
> > transportLayer and the authenticator, it should be able to figure out
> > what type of transportLayer it is, construct the principal based on
> > that, and handle all the security protocols that we support.
> > Your options 1, 2 and 4 seem to be doing the same thing, i.e. checking
> > which security protocol a given transportLayer is using and building a
> > principal accordingly; correct me if I am wrong here. I like going with
> > 4, as others stated on the PR, since passing the security_protocol makes
> > it more specific to the method that needs to handle it. In the interest
> > of having fewer configs, I think option 4 seems better even though it
> > breaks the interface.
> >
> > Thanks,
> > Harsha
> > On Fri, May 20, 2016, at 05:00 PM, Mayuresh Gharat wrote:
> > > Hi All,
> > >
> > > I came across an issue with plugging in a custom PrincipalBuilder class
> > > using the config "principal.builder.class" along with a custom
> Authorizer
> > > class using the config "authorizer.class.name".
> > >
> > > Consider the following scenario :
> > >
> > > For PlainText we don't supply any PrincipalBuilder. For SSL we want to
> > > supply a PrincipalBuilder using the property "principal.builder.class".
> > >
> > > a) Now consider we have a broker running on these 2 ports and supply
> that
> > > custom principalBuilder class using that config.
> > >
> > > b) The interbroker communication is using PlainText. I am using a
> single
> > > broker cluster for testing.
> > >
> > > c) Now we issue a produce request on the SSL port of the broker.
> > >
> > > d) The controller tries to build a channel for plaintext with this
> broker
> > > for the new topic instructions.
> > >
> > > e) PlainText tries to use the principal builder specified in the
> > > "principal.builder.class" config, which was meant only for the SSL port,
> > > since the code path is the same:
> > > "ChannelBuilders.createPrincipalBuilder(configs)".
> > >
> > > f) In the custom principal builder, if we try to do some cert checks
> > > or down-convert the transportLayer to SSLTransportLayer so that we can
> > > use its functionality, we get an error/exception at runtime.
> > >
> > > The basic idea is the PlainText channel should not be using the
> > > PrincipalBuilder meant for other types of channels.
> > >
> > > Now there are few options/workarounds to avoid this :
> > >
> > > 1) Do instanceOf check in Authorizer.authorize() on TransportLayer
> > > instance
> > > passed in and do the correct handling. This is not intuitive and
> imposes
> > > a
> > > strict coding rule on the programmer.
> > >
> > > 2) TransportLayer should expose an API for telling the security
> protocol
> > > type. This is not too intuitive either.
> > >
> > > 3) Add extra configs for Authorizer and PrincipalBuilder for each
> channel
> > > type. This gives us a flexibility for the PrincipalBuilder and
> Authorizer
> > > handle requests on different types of ports in a different way.
> > >
> > > 4) PrincipalBuilder.buildPrincipal() should take in extra parameter for
> > > the
> > > type of protocol and we should document this in javadoc to use it to
> > > handle
> > > the type of request. This is a little better than 1) and 2) but again
> > > imposes
> > > a strict coding rule on the programmer.
> > >
> > > Just wanted to know what the community thinks about this and get any
> > > suggestions/feedback . There's some discussion about this here :
> > > https://github.com/apache/kafka/pull/1403
> > >
> > 

[jira] [Updated] (KAFKA-3747) Close `RecordBatch.records` when append to batch fails

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3747:
---
Status: Patch Available  (was: Open)

> Close `RecordBatch.records` when append to batch fails
> --
>
> Key: KAFKA-3747
> URL: https://issues.apache.org/jira/browse/KAFKA-3747
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.10.0.1
>
>
> We should close the existing `RecordBatch.records` when we create a new 
> `RecordBatch` for the `TopicPartition`.
> This would mean that we would only retain temporary resources like 
> compression stream buffers for one `RecordBatch` per partition, which can 
> have a significant impact when producers are dealing with slow brokers, see 
> KAFKA-3704 for more details.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3747) Close `RecordBatch.records` when append to batch fails

2016-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296641#comment-15296641
 ] 

ASF GitHub Bot commented on KAFKA-3747:
---

GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/1418

KAFKA-3747; Close `RecordBatch.records` when append to batch fails

With this change, `test_producer_throughput` with message_size=1, 
compression_type=snappy and a snappy buffer size of 32k can be executed in a 
heap of 192m in a local environment (768m is needed without this change).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka 
kafka-3747-close-record-batch-when-append-fails

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1418.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1418


commit 40b9076efc4f284bc0af2a83cb7754adc26ee362
Author: Ismael Juma 
Date:   2016-05-23T16:26:15Z

Close `RecordBatch` if `tryAppend` fails

commit cb495aaea03778dda1a579a399ce0bf85c03ecfa
Author: Ismael Juma 
Date:   2016-05-23T16:27:56Z

Use diamond operator in `RecordAccumulator` and prefer `Deque.isEmpty` over 
`size`




> Close `RecordBatch.records` when append to batch fails
> --
>
> Key: KAFKA-3747
> URL: https://issues.apache.org/jira/browse/KAFKA-3747
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.10.0.1
>
>
> We should close the existing `RecordBatch.records` when we create a new 
> `RecordBatch` for the `TopicPartition`.
> This would mean that we would only retain temporary resources like 
> compression stream buffers for one `RecordBatch` per partition, which can 
> have a significant impact when producers are dealing with slow brokers, see 
> KAFKA-3704 for more details.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-3747; Close `RecordBatch.records` when a...

2016-05-23 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/1418

KAFKA-3747; Close `RecordBatch.records` when append to batch fails

With this change, `test_producer_throughput` with message_size=1, 
compression_type=snappy and a snappy buffer size of 32k can be executed in a 
heap of 192m in a local environment (768m is needed without this change).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka 
kafka-3747-close-record-batch-when-append-fails

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1418.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1418


commit 40b9076efc4f284bc0af2a83cb7754adc26ee362
Author: Ismael Juma 
Date:   2016-05-23T16:26:15Z

Close `RecordBatch` if `tryAppend` fails

commit cb495aaea03778dda1a579a399ce0bf85c03ecfa
Author: Ismael Juma 
Date:   2016-05-23T16:27:56Z

Use diamond operator in `RecordAccumulator` and prefer `Deque.isEmpty` over 
`size`




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Jay Kreps
I think the concern was just that we didn't want to impose Java regex
semantics on non-Java clients, but I do think there are Perl-compatible regex
libraries (which are kind of more the standard) for Java. So that might be a
solution.
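
As a concrete reference point (this is the existing client-side behaviour, not
a proposed wire-protocol change), a regex subscription with the new consumer
looks like the snippet below; the Pattern is evaluated on the client against
metadata for all topics, which is why the pattern itself never crosses the
wire today. The topic pattern and group name are made up for the example.

import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "mirror-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// The pattern is matched locally against the full topic metadata.
consumer.subscribe(Pattern.compile("metrics\\..*"), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
});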

-Jay

On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang  wrote:

> The original concern is that regex may not be efficiently supported
> across-languages, but if there is a neat workaround I would love to learn.
>
> Guozhang
>
> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma  wrote:
>
> > +1 to Jun's suggestion.
> >
> > Having said that, as a general point, I think we should consider
> supporting
> > topic patterns in the wire protocol. It requires some thinking for
> > cross-language support, but it seems surmountable and it could make
> certain
> > operations a lot more efficient (the fact that a basic regex subscription
> > causes the consumer to request metadata for all topics is not great).
> >
> > Ismael
> >
> > On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang 
> > wrote:
> >
> > > I like Jun's suggestion in changing the handling logics of single large
> > > message on the consumer side.
> > >
> > > As for the case of "a single group subscribing to 3000 topics", with
> 100
> > > consumers the 2.5Mb Gzip size is reasonable to me (when storing in ZK,
> we
> > > also have the znode limit which is set to 1Mb by default, though
> > admittedly
> > > it is only for one consumer). And if we do the change as Jun suggested,
> > > 2.5Mb on follower's memory pressure is OK I think.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com
> > > > wrote:
> > >
> > > > Results without compression:
> > > > 1 consumer 292383 bytes
> > > > 5 consumers 1079579 bytes * the tipping point
> > > > 10 consumers 1855018 bytes
> > > > 20 consumers 2780220 bytes
> > > > 30 consumers 3705422 bytes
> > > > 40 consumers 4630624 bytes
> > > > 50 consumers 826 bytes
> > > > 60 consumers 6480788 bytes
> > > > 70 consumers 7405750 bytes
> > > > 80 consumers 8330712 bytes
> > > > 90 consumers 9255674 bytes
> > > > 100 consumers 10180636 bytes
> > > >
> > > > So it looks like gzip compression shrinks the message size by 4x.
> > > >
> > > > On Sat, May 21, 2016 at 9:47 AM, Jun Rao  wrote:
> > > >
> > > > > Onur,
> > > > >
> > > > > Thanks for the investigation.
> > > > >
> > > > > Another option is to just fix how we deal with the case when a
> > message
> > > is
> > > > > larger than the fetch size. Today, if the fetch size is smaller than
> > > > > the message size, the consumer will get stuck. Instead, we can simply
> > return
> > > > the
> > > > > full message if it's larger than the fetch size w/o requiring the
> > > > consumer
> > > > > to manually adjust the fetch size. On the broker side, to serve a
> > fetch
> > > > > request, we already do an index lookup and then scan the log a bit
> to
> > > > find
> > > > > the message with the requested offset. We can just check the size
> of
> > > that
> > > > > message and return the full message if its size is larger than the
> > > fetch
> > > > > size. This way, fetch size is really for performance optimization,
> > i.e.
> > > > in
> > > > > the common case, we will not return more bytes than fetch size, but
> > if
> > > > > there is a large message, we will return more bytes than the
> > specified
> > > > > fetch size. In practice, large messages are rare. So, it shouldn't
> > > > increase
> > > > > the memory consumption on the client too much.
> > > > >
> > > > > Jun
> > > > >
> > > > > On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <
> > > > > onurkaraman.apa...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey everyone. So I started doing some tests on the new
> > > > > consumer/coordinator
> > > > > > to see if it could handle more strenuous use cases like mirroring
> > > > > clusters
> > > > > > with thousands of topics and thought I'd share whatever I have so
> > > far.
> > > > > >
> > > > > > The scalability limit: the amount of group metadata we can fit
> into
> > > one
> > > > > > message
> > > > > >
> > > > > > Some background:
> > > > > > Client-side assignment is implemented in two phases
> > > > > > 1. a PreparingRebalance phase that identifies members of the
> group
> > > and
> > > > > > aggregates member subscriptions.
> > > > > > 2. an AwaitingSync phase that waits for the group leader to
> decide
> > > > member
> > > > > > assignments based on the member subscriptions across the group.
> > > > > >   - The leader announces this decision with a SyncGroupRequest.
> The
> > > > > > GroupCoordinator handles SyncGroupRequests by appending all group
> > > state
> > > > > > into a single message under the __consumer_offsets topic. This
> > > message
> > > > is
> > > > > > keyed on the group id and contains each member subscription as
> well
> > > as
> > > > > the
> > > > > > decided assignment for each member.
> 

[jira] [Assigned] (KAFKA-3747) Close `RecordBatch.records` when append to batch fails

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-3747:
--

Assignee: Ismael Juma

> Close `RecordBatch.records` when append to batch fails
> --
>
> Key: KAFKA-3747
> URL: https://issues.apache.org/jira/browse/KAFKA-3747
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.10.0.1
>
>
> We should close the existing `RecordBatch.records` when we create a new 
> `RecordBatch` for the `TopicPartition`.
> This would mean that we would only retain temporary resources like 
> compression stream buffers for one `RecordBatch` per partition, which can 
> have a significant impact when producers are dealing with slow brokers, see 
> KAFKA-3704 for more details.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Becket Qin
+1 on Jun's idea.
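
To make the suggestion quoted below concrete, the broker-side rule would be
roughly the following (a sketch of the intent only, not of the actual log
layer code):

// Never return less than one whole message: if the first message at the
// requested offset is bigger than the fetch size, return it anyway, so the
// fetch size becomes a performance hint rather than a hard limit.
int responseSizeFor(int requestedFetchSize, int firstMessageSizeAtOffset) {
    return Math.max(requestedFetchSize, firstMessageSizeAtOffset);
}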

Even without the new consumer, we may currently still face this issue of the
record size being too large in the offsets topic if a user commits offsets
with a big blob of metadata.

A topic pattern would help reduce the group metadata size. However, some use
cases may not be able to benefit from it. For example, one use case we have
for mirror maker is dynamically changing the topics to consume based on the
end user's requirements. In that case, even if we use a regex, it would
probably degenerate into a topic list.

Also, I think we should probably turn on compression for the offsets topic by
default. The metadata of JoinGroupRequests is likely similar, so the
aggregated metadata should be highly compressible.

Thanks,

Jiangjie (Becket) Qin

On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang  wrote:

> The original concern is that regex may not be efficiently supported
> across-languages, but if there is a neat workaround I would love to learn.
>
> Guozhang
>
> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma  wrote:
>
> > +1 to Jun's suggestion.
> >
> > Having said that, as a general point, I think we should consider
> supporting
> > topic patterns in the wire protocol. It requires some thinking for
> > cross-language support, but it seems surmountable and it could make
> certain
> > operations a lot more efficient (the fact that a basic regex subscription
> > causes the consumer to request metadata for all topics is not great).
> >
> > Ismael
> >
> > On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang 
> > wrote:
> >
> > > I like Jun's suggestion in changing the handling logics of single large
> > > message on the consumer side.
> > >
> > > As for the case of "a single group subscribing to 3000 topics", with
> 100
> > > consumers the 2.5Mb Gzip size is reasonable to me (when storing in ZK,
> we
> > > also have the znode limit which is set to 1Mb by default, though
> > admittedly
> > > it is only for one consumer). And if we do the change as Jun suggested,
> > > 2.5Mb on follower's memory pressure is OK I think.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com
> > > > wrote:
> > >
> > > > Results without compression:
> > > > 1 consumer 292383 bytes
> > > > 5 consumers 1079579 bytes * the tipping point
> > > > 10 consumers 1855018 bytes
> > > > 20 consumers 2780220 bytes
> > > > 30 consumers 3705422 bytes
> > > > 40 consumers 4630624 bytes
> > > > 50 consumers 826 bytes
> > > > 60 consumers 6480788 bytes
> > > > 70 consumers 7405750 bytes
> > > > 80 consumers 8330712 bytes
> > > > 90 consumers 9255674 bytes
> > > > 100 consumers 10180636 bytes
> > > >
> > > > So it looks like gzip compression shrinks the message size by 4x.
> > > >
> > > > On Sat, May 21, 2016 at 9:47 AM, Jun Rao  wrote:
> > > >
> > > > > Onur,
> > > > >
> > > > > Thanks for the investigation.
> > > > >
> > > > > Another option is to just fix how we deal with the case when a
> > message
> > > is
> > > > > larger than the fetch size. Today, if the fetch size is smaller than
> > > > > the message size, the consumer will get stuck. Instead, we can simply
> > return
> > > > the
> > > > > full message if it's larger than the fetch size w/o requiring the
> > > > consumer
> > > > > to manually adjust the fetch size. On the broker side, to serve a
> > fetch
> > > > > request, we already do an index lookup and then scan the log a bit
> to
> > > > find
> > > > > the message with the requested offset. We can just check the size
> of
> > > that
> > > > > message and return the full message if its size is larger than the
> > > fetch
> > > > > size. This way, fetch size is really for performance optimization,
> > i.e.
> > > > in
> > > > > the common case, we will not return more bytes than fetch size, but
> > if
> > > > > there is a large message, we will return more bytes than the
> > specified
> > > > > fetch size. In practice, large messages are rare. So, it shouldn't
> > > > increase
> > > > > the memory consumption on the client too much.
> > > > >
> > > > > Jun
> > > > >
> > > > > On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <
> > > > > onurkaraman.apa...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey everyone. So I started doing some tests on the new
> > > > > consumer/coordinator
> > > > > > to see if it could handle more strenuous use cases like mirroring
> > > > > clusters
> > > > > > with thousands of topics and thought I'd share whatever I have so
> > > far.
> > > > > >
> > > > > > The scalability limit: the amount of group metadata we can fit
> into
> > > one
> > > > > > message
> > > > > >
> > > > > > Some background:
> > > > > > Client-side assignment is implemented in two phases
> > > > > > 1. a PreparingRebalance phase that identifies members of the
> group
> > > and
> > > > > > aggregates member subscriptions.
> > > > > > 2. an AwaitingSync phase that waits for the group leader to
> 

[jira] [Commented] (KAFKA-3704) Improve mechanism for compression stream block size selection in KafkaProducer

2016-05-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296616#comment-15296616
 ] 

Ismael Juma commented on KAFKA-3704:


I filed KAFKA-3747 for 3).


> Improve mechanism for compression stream block size selection in KafkaProducer
> --
>
> Key: KAFKA-3704
> URL: https://issues.apache.org/jira/browse/KAFKA-3704
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Ismael Juma
> Fix For: 0.10.1.0
>
>
> As discovered in https://issues.apache.org/jira/browse/KAFKA-3565, the 
> current default block size (1K) used in Snappy and GZIP may cause a 
> sub-optimal compression ratio for Snappy, and hence reduce throughput. 
> Because we no longer recompress data in the broker, it also impacts what gets 
> stored on disk.
> A solution might be to use the default block size, which is 64K in LZ4, 32K 
> in Snappy and 0.5K in GZIP. The downside is that this solution will require 
> more memory allocated outside of the buffer pool and hence users may need to 
> bump up their JVM heap size, especially for MirrorMakers. Using Snappy as an 
> example, it's an additional 2x32k per batch (as Snappy uses two buffers) and 
> one would expect at least one batch per partition. However, the number of 
> batches per partition can be much higher if the broker is slow to acknowledge 
> producer requests (depending on `buffer.memory`, `batch.size`, message size, 
> etc.).
> Given the above, there are a few things that could be done (potentially more 
> than one):
> 1) A configuration for the producer compression stream buffer size.
> 2) Allocate buffers from the buffer pool and pass them to the compression 
> library. This is possible with Snappy and we could adapt our LZ4 code. It's 
> not possible with GZIP, but it uses a very small buffer by default.
> 3) Close the existing `RecordBatch.records` when we create a new 
> `RecordBatch` for the `TopicPartition` instead of doing it during 
> `RecordAccumulator.drain`. This would mean that we would only retain 
> resources for one `RecordBatch` per partition, which would improve the worst 
> case scenario significantly.
> Note that we decided that this change was too risky for 0.10.0.0 and reverted 
> the original attempt.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3704) Improve mechanism for compression stream block size selection in KafkaProducer

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3704:
---
Description: 
As discovered in https://issues.apache.org/jira/browse/KAFKA-3565, the current 
default block size (1K) used in Snappy and GZIP may cause a sub-optimal 
compression ratio for Snappy, and hence reduce throughput. Because we no longer 
recompress data in the broker, it also impacts what gets stored on disk.

A solution might be to use the default block size, which is 64K in LZ4, 32K in 
Snappy and 0.5K in GZIP. The downside is that this solution will require more 
memory allocated outside of the buffer pool and hence users may need to bump up 
their JVM heap size, especially for MirrorMakers. Using Snappy as an example, 
it's an additional 2x32k per batch (as Snappy uses two buffers) and one would 
expect at least one batch per partition. However, the number of batches per 
partition can be much higher if the broker is slow to acknowledge producer 
requests (depending on `buffer.memory`, `batch.size`, message size, etc.).
Given the above, there are a few things that could be done (potentially more 
than one):

1) A configuration for the producer compression stream buffer size.
2) Allocate buffers from the buffer pool and pass them to the compression 
library. This is possible with Snappy and we could adapt our LZ4 code. It's not 
possible with GZIP, but it uses a very small buffer by default.
3) Close the existing `RecordBatch.records` when we create a new `RecordBatch` 
for the `TopicPartition` instead of doing it during `RecordAccumulator.drain`. 
This would mean that we would only retain resources for one `RecordBatch` per 
partition, which would improve the worst case scenario significantly.

Note that we decided that this change was too risky for 0.10.0.0 and reverted 
the original attempt.

  was:
As discovered in https://issues.apache.org/jira/browse/KAFKA-3565, the current 
default block size (1K) used in Snappy and GZIP may cause a sub-optimal 
compression ratio for Snappy, and hence reduce throughput. Because we no longer 
recompress data in the broker, it also impacts what gets stored on disk.

A solution might be to use the default block size, which is 64K in LZ4, 32K in 
Snappy and 0.5K in GZIP. The downside is that this solution will require more 
memory allocated outside of the buffer pool and hence users may need to bump up 
their JVM heap size, especially for MirrorMakers. Using Snappy as an example, 
it's an additional 2x32k per batch (as Snappy uses two buffers) and one would 
expect at least one batch per partition. However, the number of batches per 
partition can be much higher if the broker is slow to acknowledge producer 
requests (depending on `buffer.memory`, `batch.size`, message size, etc.). This 

Given the above, there are a few things that could be done (potentially more 
than one):

1) A configuration for the producer compression stream buffer size.
2) Allocate buffers from the buffer pool and pass them to the compression 
library. This is possible with Snappy and we could adapt our LZ4 code. It's not 
possible with GZIP, but it uses a very small buffer by default.
3) Close the existing `RecordBatch.records` when we create a new `RecordBatch` 
for the `TopicPartition` instead of doing it during `RecordAccumulator.drain`. 
This would mean that we would only retain resources for one `RecordBatch` per 
partition, which would improve the worst case scenario significantly.

Note that we decided that this change was too risky for 0.10.0.0 and reverted 
the original attempt.


> Improve mechanism for compression stream block size selection in KafkaProducer
> --
>
> Key: KAFKA-3704
> URL: https://issues.apache.org/jira/browse/KAFKA-3704
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Ismael Juma
> Fix For: 0.10.1.0
>
>
> As discovered in https://issues.apache.org/jira/browse/KAFKA-3565, the 
> current default block size (1K) used in Snappy and GZIP may cause a 
> sub-optimal compression ratio for Snappy, and hence reduce throughput. 
> Because we no longer recompress data in the broker, it also impacts what gets 
> stored on disk.
> A solution might be to use the default block size, which is 64K in LZ4, 32K 
> in Snappy and 0.5K in GZIP. The downside is that this solution will require 
> more memory allocated outside of the buffer pool and hence users may need to 
> bump up their JVM heap size, especially for MirrorMakers. Using Snappy as an 
> example, it's an additional 2x32k per batch (as Snappy uses two buffers) and 
> one would expect at least one batch per partition. However, the number of 
> batches per partition can be much higher if the broker is slow to acknowledge 
> producer 

[jira] [Updated] (KAFKA-3704) Improve mechanism for compression stream block size selection in KafkaProducer

2016-05-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3704:
---
Description: 
As discovered in https://issues.apache.org/jira/browse/KAFKA-3565, the current 
default block size (1K) used in Snappy and GZIP may cause a sub-optimal 
compression ratio for Snappy, and hence reduce throughput. Because we no longer 
recompress data in the broker, it also impacts what gets stored on disk.

A solution might be to use the default block size, which is 64K in LZ4, 32K in 
Snappy and 0.5K in GZIP. The downside is that this solution will require more 
memory allocated outside of the buffer pool and hence users may need to bump up 
their JVM heap size, especially for MirrorMakers. Using Snappy as an example, 
it's an additional 2x32k per batch (as Snappy uses two buffers) and one would 
expect at least one batch per partition. However, the number of batches per 
partition can be much higher if the broker is slow to acknowledge producer 
requests (depending on `buffer.memory`, `batch.size`, message size, etc.). This 

Given the above, there are a few things that could be done (potentially more 
than one):

1) A configuration for the producer compression stream buffer size.
2) Allocate buffers from the buffer pool and pass them to the compression 
library. This is possible with Snappy and we could adapt our LZ4 code. It's not 
possible with GZIP, but it uses a very small buffer by default.
3) Close the existing `RecordBatch.records` when we create a new `RecordBatch` 
for the `TopicPartition` instead of doing it during `RecordAccumulator.drain`. 
This would mean that we would only retain resources for one `RecordBatch` per 
partition, which would improve the worst case scenario significantly.

Note that we decided that this change was too risky for 0.10.0.0 and reverted 
the original attempt.

  was:
As discovered in https://issues.apache.org/jira/browse/KAFKA-3565, the current 
default block size (1K) used in Snappy and GZIP may cause a sub-optimal 
compression ratio for Snappy, and hence reduce throughput. Because we no longer 
recompress data in the broker, it also impacts what gets stored on disk.

A solution might be to use the default block size, which is 64K in LZ4, 32K in 
Snappy and 0.5K in GZIP. The downside is that this solution will require more 
memory allocated outside of the buffer pool and hence users may need to bump up 
their JVM heap size, especially for MirrorMakers. Using Snappy as an example, 
it's an additional 2x32k per batch (as Snappy uses two buffers) and one would 
expect at least one batch per partition. However, the number of batches per 
partition can be much higher if the broker is slow to acknowledge producer 
requests (depending on `buffer.memory`, `batch.size`, message size, etc.).

Given the above, there are a few things that could be done (potentially more 
than one):

1) A configuration for the producer compression stream buffer size.
2) Allocate buffers from the buffer pool and pass them to the compression 
library. This is possible with Snappy and we could adapt our LZ4 code. It's not 
possible with GZIP, but it uses a very small buffer by default.
3) Close the existing `RecordBatch.records` when we create a new `RecordBatch` 
for the `TopicPartition` instead of doing it during `RecordAccumulator.drain`. 
This would mean that we would only retain resources for one `RecordBatch` per 
partition, which would improve the worst case scenario significantly.

Note that we decided that this change was too risky for 0.10.0.0 and reverted 
the original attempt.


> Improve mechanism for compression stream block size selection in KafkaProducer
> --
>
> Key: KAFKA-3704
> URL: https://issues.apache.org/jira/browse/KAFKA-3704
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Ismael Juma
> Fix For: 0.10.1.0
>
>
> As discovered in https://issues.apache.org/jira/browse/KAFKA-3565, the 
> current default block size (1K) used in Snappy and GZIP may cause a 
> sub-optimal compression ratio for Snappy, and hence reduce throughput. 
> Because we no longer recompress data in the broker, it also impacts what gets 
> stored on disk.
> A solution might be to use the default block size, which is 64K in LZ4, 32K 
> in Snappy and 0.5K in GZIP. The downside is that this solution will require 
> more memory allocated outside of the buffer pool and hence users may need to 
> bump up their JVM heap size, especially for MirrorMakers. Using Snappy as an 
> example, it's an additional 2x32k per batch (as Snappy uses two buffers) and 
> one would expect at least one batch per partition. However, the number of 
> batches per partition can be much higher if the broker is slow to acknowledge 
> producer 

[jira] [Created] (KAFKA-3747) Close `RecordBatch.records` when append to batch fails

2016-05-23 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-3747:
--

 Summary: Close `RecordBatch.records` when append to batch fails
 Key: KAFKA-3747
 URL: https://issues.apache.org/jira/browse/KAFKA-3747
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
 Fix For: 0.10.0.1


We should close the existing `RecordBatch.records` when we create a new 
`RecordBatch` for the `TopicPartition`.

This would mean that we would only retain temporary resources like compression 
stream buffers for one `RecordBatch` per partition, which can have a 
significant impact when producers are dealing with slow brokers, see KAFKA-3704 
for more details.
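
A minimal sketch of the pattern this describes, with simplified stand-in types
rather than the real producer internals: the accumulator closes the previous
batch for a partition as soon as an append no longer fits, instead of waiting
for `RecordAccumulator.drain`.

import java.util.ArrayDeque;
import java.util.Deque;

public class BatchPerPartitionSketch {

    static class Batch {
        private final StringBuilder buffer = new StringBuilder(); // stand-in for MemoryRecords
        private boolean writable = true;

        boolean tryAppend(String record) {
            if (!writable || buffer.length() + record.length() > 64) {
                return false; // batch is full or already closed
            }
            buffer.append(record);
            return true;
        }

        void close() {
            // In the real code this is where compression stream buffers are released.
            writable = false;
        }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    void append(String record) {
        Batch last = deque.peekLast();
        if (last != null && last.tryAppend(record)) {
            return;
        }
        if (last != null) {
            last.close(); // close eagerly when the append fails, not during drain
        }
        Batch fresh = new Batch();
        if (!fresh.tryAppend(record)) {
            throw new IllegalArgumentException("record larger than batch size");
        }
        deque.addLast(fresh);
    }

    public static void main(String[] args) {
        BatchPerPartitionSketch accumulator = new BatchPerPartitionSketch();
        for (int i = 0; i < 10; i++) {
            accumulator.append("record-" + i + "-payload-padding");
        }
        System.out.println("batches created: " + accumulator.deque.size());
    }
}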



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-55: Secure quotas for authenticated users

2016-05-23 Thread Jun Rao
Rajini,

Thanks for the KIP. When we first added quota support, the intention was to
be able to add a quota per application. Since we didn't have security at that
time, we essentially simulated users with client-ids. Now that we do have
security, it seems that we just need a way to set quotas at the user level.
Setting quotas on the combination of user and client-id seems more complicated
and I am not sure there is a good use case.

Also, the new config quota.secure.enable seems a bit weird. Would it be
better to add a new config quota.type? It would default to clientId for
backward compatibility. If one sets it to user, then the default broker-level
quota is for users w/o a customized quota. In this setting, brokers will also
only take quotas set at the user level (i.e., quotas set at the clientId level
will be ignored).
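
A rough sketch of the semantics I have in mind; the names below are
illustrative only, not actual broker code or final config keys:

enum QuotaEntityType { CLIENT_ID, USER }

// quota.type=clientId keeps today's behaviour; quota.type=user keys quotas on
// the authenticated principal and ignores client-id level overrides.
String quotaKey(QuotaEntityType quotaType, String authenticatedUser, String clientId) {
    return quotaType == QuotaEntityType.USER ? authenticatedUser : clientId;
}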

Thanks,

Jun

On Tue, May 3, 2016 at 4:32 AM, Rajini Sivaram  wrote:

> Ewen,
>
> Thank you for the review. I agree that ideally we would have one definition
> of quotas that handles all cases. But I couldn't quite fit all the
> combinations that are possible today with client-id-based quotas into the
> new configuration. I think upgrade path is not bad since quotas are
> per-broker. You can configure quotas based on the new configuration, set
> quota.secure.enable=true and restart the broker. Since there is no
> requirement for both insecure client-id based quotas and secure user-based
> quotas to co-exist in a cluster, isn't that sufficient? The implementation
> does use a unified approach, so if an alternative configuration can be
> defined (perhaps with some acceptable limitations?) which can express both,
> it will be easy to implement. Suggestions welcome :-)
>
> The cases that the new configuration cannot express, but the old one can
> are:
>
>1. SSL/SASL with multiple users, same client ids used by multiple users,
>client-id based quotas where quotas are shared between multiple users
>2. Default quotas for client-ids. In the new configuration, default
>quotas are defined for users and clients with no configured sub-quota
> share
>the user's quota.
>
>
>
> On Sat, Apr 30, 2016 at 6:21 AM, Ewen Cheslack-Postava 
> wrote:
>
> > Rajini,
> >
> > I'm admittedly not very familiar with a lot of this code or
> implementation,
> > so correct me if I'm making any incorrect assumptions.
> >
> > I've only scanned the KIP, but my main concern is the rejection of the
> > alternative -- unifying client-id and principal quotas. In particular,
> > doesn't this make an upgrade for brokers using those different approaches
> > difficult since you have to make a hard break between client-id and
> > principal quotas? If people adopt client-id quotas to begin with, it
> seems
> > like we might not be providing a clean upgrade path.
> >
> > As I said, I haven't kept up to date with the details of the security and
> > quota features, but I'd want to make sure we didn't suggest one path with
> > 0.9, then add another that we can't provide a clean upgrade path to.
> >
> > -Ewen
> >
> > On Fri, Apr 22, 2016 at 7:22 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> > > The PR for KAFKA-3492 (https://github.com/apache/kafka/pull/1256)
> > contains
> > > the code associated with KIP-55. I will keep it updated during the
> review
> > > process.
> > >
> > > Thanks,
> > >
> > > Rajini
> > >
> > > On Mon, Apr 18, 2016 at 4:41 PM, Rajini Sivaram <
> > > rajinisiva...@googlemail.com> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I have just created KIP-55 to support quotas based on authenticated
> > user
> > > > principals.
> > > >
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-55%3A+Secure+Quotas+for+Authenticated+Users
> > > >
> > > > Comments and feedback are appreciated.
> > > >
> > > > Thank you...
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > >
> > >
> > >
> > > --
> > > Regards,
> > >
> > > Rajini
> > >
> >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>
>
>
> --
> Regards,
>
> Rajini
>


[jira] [Updated] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3745:
-
Assignee: (was: Guozhang Wang)

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3745:
-
Labels: api  (was: )

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3745:
-
Labels: api newbie  (was: api)

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296597#comment-15296597
 ] 

Guozhang Wang commented on KAFKA-3745:
--

Thanks for reporting this [~gfodor].

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-05-23 Thread Jun Rao
Hi, Harsha,

Just sent out a KIP meeting invite. We can discuss this in the meeting
tomorrow.

Thanks,

Jun

On Thu, May 19, 2016 at 8:47 AM, Harsha  wrote:

> Hi All,
>Can we have a KIP meeting around this. The KIP is up for
>sometime and if there are any questions lets quickly hash out
>details.
>
> Thanks,
> Harsha
>
> On Thu, May 19, 2016, at 08:40 AM, parth brahmbhatt wrote:
> > That is what the Hadoop ecosystem uses, so no good reason really. We
> > could change it to whatever the newest recommended standard is.
> >
> > Thanks
> > Parth
> >
> > On Thu, May 19, 2016 at 3:33 AM, Ismael Juma  wrote:
> >
> > > Hi Parth,
> > >
> > > Thanks for the KIP. I only started reviewing this and may have
> additional
> > > questions later. The immediate question that came to mind is our
> choice of
> > > "DIGEST-MD5" even though it's marked as OBSOLETE in the IANA Registry
> of
> > > SASL mechanisms and the original RFC (2831) has been moved to Historic
> > > status:
> > >
> > > https://tools.ietf.org/html/rfc6331
> > > http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml
> > >
> > > What is the reasoning behind that choice?
> > >
> > > Thanks,
> > > Ismael
> > >
> > > On Fri, May 13, 2016 at 11:29 PM, Gwen Shapira 
> wrote:
> > >
> > > > Also comments inline :)
> > > >
> > > > > * I want to emphasize that even though delegation tokens are a
> Hadoop
> > > > > innovation, I feel very strongly about not adding dependency on
> Hadoop
> > > > > when implementing delegation tokens for Kafka. The KIP doesn't
> imply
> > > > > such dependency, but if you can clarify...
> > > > >
> > > > >
> > > > > *No hadoop dependency.*
> > > >
> > > > Yay! Just add this to the KIP so no one will read the KIP and panic
> > > > three weeks before the next release...
> > > >
> > > > > * Can we get delegation token at any time after authenticating?
> only
> > > > > immediately after?
> > > > >
> > > > >
> > > > > *As long as you are authenticated you can get delegation tokens. We
> > > need
> > > > to
> > > > > discuss if a client authenticated using delegation token, can also
> > > > acquire
> > > > > delegation token again or not. Also there is the question of do we
> > > allow
> > > > > anyone to acquire delegation token or we want specific ACLs (I
> think
> > > its
> > > > an
> > > > > overkill.)*
> > > >
> > > > I agree that ACLs is an overkill.
> > > >
> > > > I think we are debating two options: Either require Kerberos auth for
> > > > renewal or require non-owners to renew.
> > > > I *think* the latter is simpler (it basically require a "job master"
> > > > to take responsibility for the renewal, it will have its own identity
> > > > anyway and I think this is the correct design pattern anyway. For
> > > > storm, I'd expect Nimbus to coordinate renewals?), but it is hard to
> > > > debate simplicity without looking at the code changes required. If
> you
> > > > have a draft of how the "require Kerberos" will look in Kafka code,
> > > > I'll be happy to take a look.
> > > >
> > > > > * My understanding is that tokens will propagate via ZK but without
> > > > > additional changes to UpdateMetadata protocol, correct? Clients
> > > > > currently don't retry on SASL auth failure (IIRC), but since the
> > > > > tokens propagate between brokers asynch, we will need to retry a
> bit
> > > > > to avoid clients failing auth due to timing issues.
> > > > >
> > > > > *I am considering 2 alternatives right now. The current documented
> > > > approach
> > > > > is zookeeper based and it does not require any changes to
> > > UpdateMetadata
> > > > > protocol. An alternative approach can remove zookeeper dependency
> as
> > > well
> > > > > but we can discuss that in KIP discussion call.*
> > > >
> > > > Oooh! Sounds interesting. Do you want to ping Jun to arrange a call?
> > > >
> > > > > * I liked Ashish's suggestion of having just the controller issue
> the
> > > > > delegation tokens, to avoid syncing a shared secret. Not sure if we
> > > > > want to continue the discussion here or on the wiki. I think that
> we
> > > > > can decouple the problem of "token distribution" from "shared
> secret
> > > > > distribution" and use the controller as the only token generator to
> > > > > solve the second issue, while still using ZK async to distribute
> > > > > tokens.
> > > > >
> > > > >
> > > > > *As mentioned in the previous Email I am fine with that approach as
> > > long
> > > > as
> > > > > we agree that the extra complexity of adding/updating APIS adds
> enough
> > > > > value. The advantage with the controller approach is secret
> rotation
> > > can
> > > > be
> > > > > automated,frequent and would not require deployment. *
> > > >
> > > > Can you detail the extra complexity (or point me to the email I
> > > > missed?) - which Apis are required?
> > > > As far as I can tell, clients can already find the controller from
> > > > 

Kafka KIP meeting May 24 at 11:00am PST

2016-05-23 Thread Jun Rao
Hi, Everyone,

Now that Kafka 0.10.0.0 is released, we will have a Kafka KIP meeting tomorrow
at 11:00am PST. If you plan to attend but haven't received an invite,
please let me know. The following is the agenda.

Agenda:

KIP-48 - Delegation tokens
KIP-58 - Make Log Compaction Point Configurable
KIP-4   - Status check

Thanks,

Jun


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Guozhang Wang
The original concern is that regex may not be efficiently supported
across-languages, but if there is a neat workaround I would love to learn.

Guozhang

On Mon, May 23, 2016 at 5:31 AM, Ismael Juma  wrote:

> +1 to Jun's suggestion.
>
> Having said that, as a general point, I think we should consider supporting
> topic patterns in the wire protocol. It requires some thinking for
> cross-language support, but it seems surmountable and it could make certain
> operations a lot more efficient (the fact that a basic regex subscription
> causes the consumer to request metadata for all topics is not great).
>
> Ismael
>
> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang 
> wrote:
>
> > I like Jun's suggestion in changing the handling logics of single large
> > message on the consumer side.
> >
> > As for the case of "a single group subscribing to 3000 topics", with 100
> > consumers the 2.5Mb Gzip size is reasonable to me (when storing in ZK, we
> > also have the znode limit which is set to 1Mb by default, though
> admittedly
> > it is only for one consumer). And if we do the change as Jun suggested,
> > 2.5Mb on follower's memory pressure is OK I think.
> >
> >
> > Guozhang
> >
> >
> > On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> > onurkaraman.apa...@gmail.com
> > > wrote:
> >
> > > Results without compression:
> > > 1 consumer 292383 bytes
> > > 5 consumers 1079579 bytes * the tipping point
> > > 10 consumers 1855018 bytes
> > > 20 consumers 2780220 bytes
> > > 30 consumers 3705422 bytes
> > > 40 consumers 4630624 bytes
> > > 50 consumers 826 bytes
> > > 60 consumers 6480788 bytes
> > > 70 consumers 7405750 bytes
> > > 80 consumers 8330712 bytes
> > > 90 consumers 9255674 bytes
> > > 100 consumers 10180636 bytes
> > >
> > > So it looks like gzip compression shrinks the message size by 4x.
> > >
> > > On Sat, May 21, 2016 at 9:47 AM, Jun Rao  wrote:
> > >
> > > > Onur,
> > > >
> > > > Thanks for the investigation.
> > > >
> > > > Another option is to just fix how we deal with the case when a
> message
> > is
> > > > larger than the fetch size. Today, if the fetch size is smaller than
> > > > the message size, the consumer will get stuck. Instead, we can simply
> return
> > > the
> > > > full message if it's larger than the fetch size w/o requiring the
> > > consumer
> > > > to manually adjust the fetch size. On the broker side, to serve a
> fetch
> > > > request, we already do an index lookup and then scan the log a bit to
> > > find
> > > > the message with the requested offset. We can just check the size of
> > that
> > > > message and return the full message if its size is larger than the
> > fetch
> > > > size. This way, fetch size is really for performance optimization,
> i.e.
> > > in
> > > > the common case, we will not return more bytes than fetch size, but
> if
> > > > there is a large message, we will return more bytes than the
> specified
> > > > fetch size. In practice, large messages are rare. So, it shouldn't
> > > increase
> > > > the memory consumption on the client too much.
> > > >
> > > > Jun
> > > >
> > > > On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <
> > > > onurkaraman.apa...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hey everyone. So I started doing some tests on the new
> > > > consumer/coordinator
> > > > > to see if it could handle more strenuous use cases like mirroring
> > > > clusters
> > > > > with thousands of topics and thought I'd share whatever I have so
> > far.
> > > > >
> > > > > The scalability limit: the amount of group metadata we can fit into
> > one
> > > > > message
> > > > >
> > > > > Some background:
> > > > > Client-side assignment is implemented in two phases
> > > > > 1. a PreparingRebalance phase that identifies members of the group
> > and
> > > > > aggregates member subscriptions.
> > > > > 2. an AwaitingSync phase that waits for the group leader to decide
> > > member
> > > > > assignments based on the member subscriptions across the group.
> > > > >   - The leader announces this decision with a SyncGroupRequest. The
> > > > > GroupCoordinator handles SyncGroupRequests by appending all group
> > state
> > > > > into a single message under the __consumer_offsets topic. This
> > message
> > > is
> > > > > keyed on the group id and contains each member subscription as well
> > as
> > > > the
> > > > > decided assignment for each member.
> > > > >
> > > > > The environment:
> > > > > - one broker
> > > > > - one __consumer_offsets partition
> > > > > - offsets.topic.compression.codec=1 // this is gzip
> > > > > - broker has my pending KAFKA-3718 patch that actually makes use of
> > > > > offsets.topic.compression.codec:
> > > > https://github.com/apache/kafka/pull/1394
> > > > > - around 3000 topics. This is an actual subset of topics from one
> of
> > > our
> > > > > clusters.
> > > > > - topics have 8 partitions
> > > > > - topics are 25 characters long on average
> > > > > - one group 

[jira] [Updated] (KAFKA-3664) When subscription set changes on new consumer, the partitions may be removed without offset being committed.

2016-05-23 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-3664:
---
Status: Patch Available  (was: Open)

> When subscription set changes on new consumer, the partitions may be removed 
> without offset being committed.
> 
>
> Key: KAFKA-3664
> URL: https://issues.apache.org/jira/browse/KAFKA-3664
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Vahid Hashemian
>
> When users are using group management, if they call consumer.subscribe() to 
> change the subscription, the removed subscriptions will be immediately 
> removed and their offsets will not be committed. Also, the revoked partitions 
> passed to the ConsumerRebalanceListener.onPartitionsRevoked() will not 
> include those partitions. 
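
Until this is fixed, a minimal client-side workaround sketch is to commit offsets explicitly before changing the subscription; "consumer" and the topic list below are placeholders for whatever the application already has:

{code}
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ResubscribeWorkaround {
    // Hedged workaround sketch: commit offsets for the current assignment
    // before swapping the subscription, so partitions dropped by the new
    // subscription keep their committed positions. "consumer" is any
    // already-configured KafkaConsumer; the topic list is a placeholder.
    static void resubscribe(KafkaConsumer<byte[], byte[]> consumer, List<String> newTopics) {
        consumer.commitSync();          // synchronous commit for everything currently assigned
        consumer.subscribe(newTopics);  // only now change the subscription
    }
}
{code}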



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-23 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296538#comment-15296538
 ] 

Edoardo Comar commented on KAFKA-3727:
--

My expectation is that with either subscribe() or assign(), if the topic does 
not exist, the behaviour on poll() should be the same,
e.g. return an empty record set (as subscribe() currently does).

this defect may be similar to https://issues.apache.org/jira/browse/KAFKA-3503


> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3746) InvalidReceiveException when connecting to broker over SSL

2016-05-23 Thread Sergey Alaev (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Alaev resolved KAFKA-3746.
-
Resolution: Not A Problem

Configuration issue - attempting to establish an SSL connection to a plaintext 
broker port.

> InvalidReceiveException when connecting to broker over SSL
> --
>
> Key: KAFKA-3746
> URL: https://issues.apache.org/jira/browse/KAFKA-3746
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: 3-node cluster on localhost
>Reporter: Sergey Alaev
>
> When trying to do KafkaConsumer.poll(), server closes connection with 
> InvalidReceiveException. Strangely, it is reproduced only with SSL enabled 
> between consumer and broker. We do not use SSL for inter-broker communication.
> Consumer configuration:
> {code}
> [2016-05-23T15:07:14.806Z] [] [kafka-thread] [ConsumerConfig] [] [] [] 
> [INFO]: ConsumerConfig values: 
>   metric.reporters = []
>   metadata.max.age.ms = 30
>   value.deserializer = class com.confyrm.eps.disp.kafka.SignalDeserializer
>   group.id = sds
>   partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
>   reconnect.backoff.ms = 50
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   max.partition.fetch.bytes = 1048576
>   bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
>   retry.backoff.ms = 100
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   ssl.keystore.type = JKS
>   ssl.trustmanager.algorithm = PKIX
>   enable.auto.commit = false
>   ssl.key.password = [hidden]
>   fetch.max.wait.ms = 500
>   sasl.kerberos.min.time.before.relogin = 6
>   connections.max.idle.ms = 54
>   ssl.truststore.password = [hidden]
>   session.timeout.ms = 3
>   metrics.num.samples = 2
>   client.id = 
>   ssl.endpoint.identification.algorithm = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   ssl.protocol = TLS
>   check.crcs = true
>   request.timeout.ms = 4
>   ssl.provider = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.keystore.location = src/main/resources/ssl/kafka.client.keystore.jks
>   heartbeat.interval.ms = 3000
>   auto.commit.interval.ms = 1000
>   receive.buffer.bytes = 32768
>   ssl.cipher.suites = null
>   ssl.truststore.type = JKS
>   security.protocol = SSL
>   ssl.truststore.location = 
> src/main/resources/ssl/kafka.client.truststore.jks
>   ssl.keystore.password = [hidden]
>   ssl.keymanager.algorithm = SunX509
>   metrics.sample.window.ms = 3
>   fetch.min.bytes = 1
>   send.buffer.bytes = 131072
>   auto.offset.reset = earliest
> {code}
> Server configuration:
> {code}
> [2016-05-23 15:04:51,707] INFO KafkaConfig values:
> advertised.host.name = null
> metric.reporters = []
> quota.producer.default = 9223372036854775807
> offsets.topic.num.partitions = 50
> log.flush.interval.messages = 9223372036854775807
> auto.create.topics.enable = true
> controller.socket.timeout.ms = 3
> log.flush.interval.ms = null
> principal.builder.class = class 
> org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
> replica.socket.receive.buffer.bytes = 65536
> min.insync.replicas = 2
> replica.fetch.wait.max.ms = 500
> num.recovery.threads.per.data.dir = 1
> ssl.keystore.type = JKS
> default.replication.factor = 3
> ssl.truststore.password = [hidden]
> log.preallocate = false
> sasl.kerberos.principal.to.local.rules = [DEFAULT]
> fetch.purgatory.purge.interval.requests = 1000
> ssl.endpoint.identification.algorithm = null
> replica.socket.timeout.ms = 3
> message.max.bytes = 112
> num.io.threads = 10
> offsets.commit.required.acks = -1
> log.flush.offset.checkpoint.interval.ms = 6
> delete.topic.enable = true
> quota.window.size.seconds = 1
> ssl.truststore.type = JKS
> offsets.commit.timeout.ms = 5000
> quota.window.num = 11
> zookeeper.connect = 127.0.0.1:2181
> authorizer.class.name =
> num.replica.fetchers = 1
> log.retention.ms = null
> log.roll.jitter.hours = 0
> log.cleaner.enable = true
> offsets.load.buffer.size = 5242880
> log.cleaner.delete.retention.ms = 8640
> ssl.client.auth = none
> controlled.shutdown.max.retries = 3
> queued.max.requests = 500
> 

[jira] [Commented] (KAFKA-3746) InvalidReceiveException when connecting to broker over SSL

2016-05-23 Thread Sergey Alaev (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296517#comment-15296517
 ] 

Sergey Alaev commented on KAFKA-3746:
-

Ah. Yes. Sorry for being stupid. Closing it now.

> InvalidReceiveException when connecting to broker over SSL
> --
>
> Key: KAFKA-3746
> URL: https://issues.apache.org/jira/browse/KAFKA-3746
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: 3-node cluster on localhost
>Reporter: Sergey Alaev
>
> When trying to do KafkaConsumer.poll(), server closes connection with 
> InvalidReceiveException. Strangely, it is reproduced only with SSL enabled 
> between consumer and broker. We do not use SSL for inter-broker communication.
> Consumer configuration:
> {code}
> [2016-05-23T15:07:14.806Z] [] [kafka-thread] [ConsumerConfig] [] [] [] 
> [INFO]: ConsumerConfig values: 
>   metric.reporters = []
>   metadata.max.age.ms = 30
>   value.deserializer = class com.confyrm.eps.disp.kafka.SignalDeserializer
>   group.id = sds
>   partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
>   reconnect.backoff.ms = 50
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   max.partition.fetch.bytes = 1048576
>   bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
>   retry.backoff.ms = 100
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   ssl.keystore.type = JKS
>   ssl.trustmanager.algorithm = PKIX
>   enable.auto.commit = false
>   ssl.key.password = [hidden]
>   fetch.max.wait.ms = 500
>   sasl.kerberos.min.time.before.relogin = 6
>   connections.max.idle.ms = 54
>   ssl.truststore.password = [hidden]
>   session.timeout.ms = 3
>   metrics.num.samples = 2
>   client.id = 
>   ssl.endpoint.identification.algorithm = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   ssl.protocol = TLS
>   check.crcs = true
>   request.timeout.ms = 4
>   ssl.provider = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.keystore.location = src/main/resources/ssl/kafka.client.keystore.jks
>   heartbeat.interval.ms = 3000
>   auto.commit.interval.ms = 1000
>   receive.buffer.bytes = 32768
>   ssl.cipher.suites = null
>   ssl.truststore.type = JKS
>   security.protocol = SSL
>   ssl.truststore.location = 
> src/main/resources/ssl/kafka.client.truststore.jks
>   ssl.keystore.password = [hidden]
>   ssl.keymanager.algorithm = SunX509
>   metrics.sample.window.ms = 3
>   fetch.min.bytes = 1
>   send.buffer.bytes = 131072
>   auto.offset.reset = earliest
> {code}
> Server configuration:
> {code}
> [2016-05-23 15:04:51,707] INFO KafkaConfig values:
> advertised.host.name = null
> metric.reporters = []
> quota.producer.default = 9223372036854775807
> offsets.topic.num.partitions = 50
> log.flush.interval.messages = 9223372036854775807
> auto.create.topics.enable = true
> controller.socket.timeout.ms = 3
> log.flush.interval.ms = null
> principal.builder.class = class 
> org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
> replica.socket.receive.buffer.bytes = 65536
> min.insync.replicas = 2
> replica.fetch.wait.max.ms = 500
> num.recovery.threads.per.data.dir = 1
> ssl.keystore.type = JKS
> default.replication.factor = 3
> ssl.truststore.password = [hidden]
> log.preallocate = false
> sasl.kerberos.principal.to.local.rules = [DEFAULT]
> fetch.purgatory.purge.interval.requests = 1000
> ssl.endpoint.identification.algorithm = null
> replica.socket.timeout.ms = 3
> message.max.bytes = 112
> num.io.threads = 10
> offsets.commit.required.acks = -1
> log.flush.offset.checkpoint.interval.ms = 6
> delete.topic.enable = true
> quota.window.size.seconds = 1
> ssl.truststore.type = JKS
> offsets.commit.timeout.ms = 5000
> quota.window.num = 11
> zookeeper.connect = 127.0.0.1:2181
> authorizer.class.name =
> num.replica.fetchers = 1
> log.retention.ms = null
> log.roll.jitter.hours = 0
> log.cleaner.enable = true
> offsets.load.buffer.size = 5242880
> log.cleaner.delete.retention.ms = 8640
> ssl.client.auth = none
> controlled.shutdown.max.retries = 3
> queued.max.requests = 500
> offsets.topic.replication.factor = 3
> 

[jira] [Commented] (KAFKA-3746) InvalidReceiveException when connecting to broker over SSL

2016-05-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296514#comment-15296514
 ] 

Ismael Juma commented on KAFKA-3746:


In your server configuration you have:

listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093

In your consumer configuration, you have:

bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
security.protocol = SSL

So, your consumer is connecting to port 9092 with security protocol SSL even 
though the broker expects that port to be PLAINTEXT. That is likely to be the 
reason for the error.


> InvalidReceiveException when connecting to broker over SSL
> --
>
> Key: KAFKA-3746
> URL: https://issues.apache.org/jira/browse/KAFKA-3746
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: 3-node cluster on localhost
>Reporter: Sergey Alaev
>
> When trying to do KafkaConsumer.poll(), server closes connection with 
> InvalidReceiveException. Strangely, it is reproduced only with SSL enabled 
> between consumer and broker. We do not use SSL for inter-broker communication.
> Consumer configuration:
> {code}
> [2016-05-23T15:07:14.806Z] [] [kafka-thread] [ConsumerConfig] [] [] [] 
> [INFO]: ConsumerConfig values: 
>   metric.reporters = []
>   metadata.max.age.ms = 30
>   value.deserializer = class com.confyrm.eps.disp.kafka.SignalDeserializer
>   group.id = sds
>   partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
>   reconnect.backoff.ms = 50
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   max.partition.fetch.bytes = 1048576
>   bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
>   retry.backoff.ms = 100
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   ssl.keystore.type = JKS
>   ssl.trustmanager.algorithm = PKIX
>   enable.auto.commit = false
>   ssl.key.password = [hidden]
>   fetch.max.wait.ms = 500
>   sasl.kerberos.min.time.before.relogin = 6
>   connections.max.idle.ms = 54
>   ssl.truststore.password = [hidden]
>   session.timeout.ms = 3
>   metrics.num.samples = 2
>   client.id = 
>   ssl.endpoint.identification.algorithm = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   ssl.protocol = TLS
>   check.crcs = true
>   request.timeout.ms = 4
>   ssl.provider = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.keystore.location = src/main/resources/ssl/kafka.client.keystore.jks
>   heartbeat.interval.ms = 3000
>   auto.commit.interval.ms = 1000
>   receive.buffer.bytes = 32768
>   ssl.cipher.suites = null
>   ssl.truststore.type = JKS
>   security.protocol = SSL
>   ssl.truststore.location = 
> src/main/resources/ssl/kafka.client.truststore.jks
>   ssl.keystore.password = [hidden]
>   ssl.keymanager.algorithm = SunX509
>   metrics.sample.window.ms = 3
>   fetch.min.bytes = 1
>   send.buffer.bytes = 131072
>   auto.offset.reset = earliest
> {code}
> Server configuration:
> {code}
> [2016-05-23 15:04:51,707] INFO KafkaConfig values:
> advertised.host.name = null
> metric.reporters = []
> quota.producer.default = 9223372036854775807
> offsets.topic.num.partitions = 50
> log.flush.interval.messages = 9223372036854775807
> auto.create.topics.enable = true
> controller.socket.timeout.ms = 3
> log.flush.interval.ms = null
> principal.builder.class = class 
> org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
> replica.socket.receive.buffer.bytes = 65536
> min.insync.replicas = 2
> replica.fetch.wait.max.ms = 500
> num.recovery.threads.per.data.dir = 1
> ssl.keystore.type = JKS
> default.replication.factor = 3
> ssl.truststore.password = [hidden]
> log.preallocate = false
> sasl.kerberos.principal.to.local.rules = [DEFAULT]
> fetch.purgatory.purge.interval.requests = 1000
> ssl.endpoint.identification.algorithm = null
> replica.socket.timeout.ms = 3
> message.max.bytes = 112
> num.io.threads = 10
> offsets.commit.required.acks = -1
> log.flush.offset.checkpoint.interval.ms = 6
> delete.topic.enable = true
> quota.window.size.seconds = 1
> ssl.truststore.type = JKS
> offsets.commit.timeout.ms = 5000
> quota.window.num = 11
> zookeeper.connect = 127.0.0.1:2181
> authorizer.class.name =
> 

[jira] [Created] (KAFKA-3746) InvalidReceiveException when connecting to broker over SSL

2016-05-23 Thread Sergey Alaev (JIRA)
Sergey Alaev created KAFKA-3746:
---

 Summary: InvalidReceiveException when connecting to broker over SSL
 Key: KAFKA-3746
 URL: https://issues.apache.org/jira/browse/KAFKA-3746
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.1
 Environment: 3-node cluster on localhost
Reporter: Sergey Alaev


When trying to do KafkaConsumer.poll(), server closes connection with 
InvalidReceiveException. Strangely, it is reproduced only with SSL enabled 
between consumer and broker. We do not use SSL for inter-broker communication.

Consumer configuration:
{code}
[2016-05-23T15:07:14.806Z] [] [kafka-thread] [ConsumerConfig] [] [] [] [INFO]: 
ConsumerConfig values: 
metric.reporters = []
metadata.max.age.ms = 30
value.deserializer = class com.confyrm.eps.disp.kafka.SignalDeserializer
group.id = sds
partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = [hidden]
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
ssl.truststore.password = [hidden]
session.timeout.ms = 3
metrics.num.samples = 2
client.id = 
ssl.endpoint.identification.algorithm = null
key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 4
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = src/main/resources/ssl/kafka.client.keystore.jks
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = SSL
ssl.truststore.location = 
src/main/resources/ssl/kafka.client.truststore.jks
ssl.keystore.password = [hidden]
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = earliest
{code}

Server configuration:
{code}
[2016-05-23 15:04:51,707] INFO KafkaConfig values:
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
log.flush.interval.messages = 9223372036854775807
auto.create.topics.enable = true
controller.socket.timeout.ms = 3
log.flush.interval.ms = null
principal.builder.class = class 
org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
replica.socket.receive.buffer.bytes = 65536
min.insync.replicas = 2
replica.fetch.wait.max.ms = 500
num.recovery.threads.per.data.dir = 1
ssl.keystore.type = JKS
default.replication.factor = 3
ssl.truststore.password = [hidden]
log.preallocate = false
sasl.kerberos.principal.to.local.rules = [DEFAULT]
fetch.purgatory.purge.interval.requests = 1000
ssl.endpoint.identification.algorithm = null
replica.socket.timeout.ms = 3
message.max.bytes = 112
num.io.threads = 10
offsets.commit.required.acks = -1
log.flush.offset.checkpoint.interval.ms = 6
delete.topic.enable = true
quota.window.size.seconds = 1
ssl.truststore.type = JKS
offsets.commit.timeout.ms = 5000
quota.window.num = 11
zookeeper.connect = 127.0.0.1:2181
authorizer.class.name =
num.replica.fetchers = 1
log.retention.ms = null
log.roll.jitter.hours = 0
log.cleaner.enable = true
offsets.load.buffer.size = 5242880
log.cleaner.delete.retention.ms = 8640
ssl.client.auth = none
controlled.shutdown.max.retries = 3
queued.max.requests = 500
offsets.topic.replication.factor = 3
log.cleaner.threads = 1
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
socket.request.max.bytes = 104857600
ssl.trustmanager.algorithm = PKIX
zookeeper.session.timeout.ms = 6000
log.retention.bytes = -1
sasl.kerberos.min.time.before.relogin = 6
zookeeper.set.acl = false

[jira] [Commented] (KAFKA-3726) Enable cold storage option

2016-05-23 Thread Radoslaw Gruchalski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296495#comment-15296495
 ] 

Radoslaw Gruchalski commented on KAFKA-3726:


bq. So my interpretation of your use case, from your post, is: If you're 
ingesting data into Kafka, with the aim of getting into file based storage for 
offline processing, it would be simpler to just copy the Kafka data files 
directly, rather than consume them and recreate new files in cold storage.

Indeed. The goal is to provide a standard mechanism for doing so. I'd be happy to 
contribute such a thing, but it would be great to work out the direction first.
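
(For context, what operators typically script today without broker support looks roughly like the sketch below. Paths and the age cutoff are made-up placeholders, and the job has to race against the broker's retention thread, which is exactly the gap the notification / cleanup-policy proposals in this issue would close.)

{code}
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.stream.Stream;

public class SegmentCopyJob {
    // Hypothetical external backup sketch: copy closed .log segments older than
    // one hour from a partition directory to a staging area for cold storage.
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("/var/kafka-logs/my-topic-0");   // placeholder
        Path staging = Paths.get("/backup/staging/my-topic-0");  // placeholder
        Files.createDirectories(staging);
        FileTime cutoff = FileTime.from(Instant.now().minus(1, ChronoUnit.HOURS));

        try (Stream<Path> segments = Files.list(logDir)) {
            segments.filter(p -> p.toString().endsWith(".log"))
                    .filter(p -> isOlderThan(p, cutoff))
                    .forEach(p -> copy(p, staging.resolve(p.getFileName())));
        }
    }

    private static boolean isOlderThan(Path p, FileTime cutoff) {
        try {
            return Files.getLastModifiedTime(p).compareTo(cutoff) < 0;
        } catch (IOException e) {
            return false;
        }
    }

    private static void copy(Path src, Path dst) {
        try {
            Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            System.err.println("Failed to copy " + src + ": " + e);
        }
    }
}
{code}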

> Enable cold storage option
> --
>
> Key: KAFKA-3726
> URL: https://issues.apache.org/jira/browse/KAFKA-3726
> Project: Kafka
>  Issue Type: Wish
>Reporter: Radoslaw Gruchalski
> Attachments: kafka-cold-storage.txt
>
>
> This JIRA builds up on the cold storage article I have published on Medium. 
> A copy of the article is attached here.
> The need for cold storage or an "indefinite" log seems to be quite often 
> discussed on the user mailing list.
> The cold storage idea would give the operator the opportunity to keep the 
> raw Kafka offset files in third-party storage and allow retrieving the 
> data back for re-consumption.
> The two possible options for enabling such functionality are, from the 
> article:
> First approach: if Kafka provided a notification mechanism and could trigger 
> a program when a segment file is to be discarded, it would become feasible to 
> provide a standard method of moving data to cold storage in reaction to those 
> events. Once the program finishes backing the segments up, it could tell 
> Kafka “it is now safe to delete these segments”.
> The second option is to provide an additional value for the 
> log.cleanup.policy setting, call it cold-storage. In case of this value, 
> Kafka would move the segment files — which otherwise would be deleted — to 
> another destination on the server. They can be picked up from there and moved 
> to the cold storage.
> Both have their limitations. The former is simply a mechanism exposed to 
> allow the operator to build up the tooling necessary to enable this. Events could 
> be published in a manner similar to Mesos Event Bus 
> (https://mesosphere.github.io/marathon/docs/event-bus.html) or Kafka itself 
> could provide a control topic on which such info would be published. The 
> outcome is, the operator can subscribe to the event bus and get notified 
> about, at least, two events:
> - log segment is complete and can be backed up
> - partition leader changed
> These two, together with an option to keep the log segment safe from 
> compaction for a certain amount of time, would be sufficient to reliably 
> implement cold storage.
> The latter option, the {{log.cleanup.policy}} setting, would be a more complete 
> feature but it is also much more difficult to implement. All brokers would 
> have to keep a backup of the data in the cold storage, significantly increasing 
> the size requirements; also, the de-duplication of the replicated data would 
> be left completely to the operator.
> In any case, the thing to stay away from is having Kafka deal with the 
> physical aspect of moving the data to and back from the cold storage. This is 
> not Kafka's task. The intent is to provide a method for reliable cold storage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-23 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296398#comment-15296398
 ] 

Edoardo Comar commented on KAFKA-3727:
--

The consumer remains stuck in a loop because 
{quote}
Fetcher.listOffset(TopicPartition, long) line: 320 
Fetcher.resetOffset(TopicPartition) line: 294  
Fetcher.updateFetchPositions(Set) line: 170
KafkaConsumer.updateFetchPositions(Set) line: 1408 
KafkaConsumer.pollOnce(long) line: 982 
KafkaConsumer.poll(long) line: 937 
{quote}

that is, the consumer is stuck in awaitMetadataUpdate in Fetcher.java
{code}
    private long listOffset(TopicPartition partition, long timestamp) {
        while (true) {
            RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
            client.poll(future);

            if (future.succeeded())
                return future.value();

            if (!future.isRetriable())
                throw future.exception();

            if (future.exception() instanceof InvalidMetadataException)
                client.awaitMetadataUpdate();
            else
                time.sleep(retryBackoffMs);
        }
    }
{code}

Don't you think that the consumer should give up with an exception ?
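
In the meantime, a client-side guard along these lines avoids the spin; this is only a sketch, and the topic name and connection settings are placeholders:

{code}
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class AssignIfTopicExists {
    // Hedged sketch of a client-side guard: only assign() the partition when
    // the cluster metadata actually knows the topic, otherwise skip (or fail)
    // instead of letting poll() spin on metadata refreshes.
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            String topic = "topic-not-exists";               // placeholder
            List<PartitionInfo> partitions = consumer.listTopics().get(topic);
            if (partitions == null || partitions.isEmpty()) {
                System.out.println("Topic " + topic + " not found, skipping assign()");
                return;
            }
            consumer.assign(Collections.singletonList(new TopicPartition(topic, 0)));
            System.out.println(consumer.poll(1000L).count());
        }
    }
}
{code}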

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-23 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-3727:
-
Comment: was deleted

(was: To explain better, this is a stack trace when the consumer is stuck in 
the loop :
{quote}
Fetcher.listOffset(TopicPartition, long) line: 320 
Fetcher.resetOffset(TopicPartition) line: 294  
Fetcher.updateFetchPositions(Set) line: 170
KafkaConsumer.updateFetchPositions(Set) line: 1408 
KafkaConsumer.pollOnce(long) line: 982 
KafkaConsumer.poll(long) line: 937 
{quote}

that is, the ConsumerNetworkClient will keep invoking {{ awaitMetadataUpdate(); 
}}

{quote}
private long listOffset(TopicPartition partition, long timestamp) {
while (true) {
RequestFuture future = sendListOffsetRequest(partition, 
timestamp);
client.poll(future);

if (future.succeeded())
return future.value();

if (!future.isRetriable())
throw future.exception();

if (future.exception() instanceof InvalidMetadataException)
client.awaitMetadataUpdate();
else
time.sleep(retryBackoffMs);
}
}
{quote}

Don't you think the consumer should give up at some point ?)

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3727) Consumer.poll() stuck in loop on non-existent topic manually assigned

2016-05-23 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296394#comment-15296394
 ] 

Edoardo Comar commented on KAFKA-3727:
--

To explain better, this is a stack trace when the consumer is stuck in the loop 
:
{quote}
Fetcher.listOffset(TopicPartition, long) line: 320 
Fetcher.resetOffset(TopicPartition) line: 294  
Fetcher.updateFetchPositions(Set) line: 170
KafkaConsumer.updateFetchPositions(Set) line: 1408 
KafkaConsumer.pollOnce(long) line: 982 
KafkaConsumer.poll(long) line: 937 
{quote}

that is, the ConsumerNetworkClient will keep invoking {{ awaitMetadataUpdate(); 
}}

{quote}
private long listOffset(TopicPartition partition, long timestamp) {
while (true) {
RequestFuture future = sendListOffsetRequest(partition, 
timestamp);
client.poll(future);

if (future.succeeded())
return future.value();

if (!future.isRetriable())
throw future.exception();

if (future.exception() instanceof InvalidMetadataException)
client.awaitMetadataUpdate();
else
time.sleep(retryBackoffMs);
}
}
{quote}

Don't you think the consumer should give up at some point ?

> Consumer.poll() stuck in loop on non-existent topic manually assigned
> -
>
> Key: KAFKA-3727
> URL: https://issues.apache.org/jira/browse/KAFKA-3727
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Edoardo Comar
>
> The behavior of a consumer on poll() for a non-existing topic is surprisingly 
> different/inconsistent 
> between a consumer that subscribed to the topic and one that had the 
> topic-partition manually assigned.
> The "subscribed" consumer will return an empty collection.
> The "assigned" consumer will *loop forever* - this feels like a bug to me.
> sample snippet to reproduce:
> {quote}
> KafkaConsumer assignKc = new KafkaConsumer<>(props1);
> KafkaConsumer subsKc = new KafkaConsumer<>(props2);
> List tps = new ArrayList<>();
> tps.add(new TopicPartition("topic-not-exists", 0));
> assignKc.assign(tps);
> subsKc.subscribe(Arrays.asList("topic-not-exists"));
> System.out.println("* subscribe k consumer ");
> ConsumerRecords crs2 = subsKc.poll(1000L); 
> print("subscribeKc", crs2); // returns empty
> System.out.println("* assign k consumer ");
> ConsumerRecords crs1 = assignKc.poll(1000L); 
>// will loop forever ! 
> print("assignKc", crs1);
> {quote}
> the logs for the "assigned" consumer show:
> [2016-05-18 17:33:09,907] DEBUG Updated cluster metadata version 8 to 
> Cluster(nodes = [192.168.10.18:9093 (id: 0 rack: null)], partitions = []) 
> (org.apache.kafka.clients.Metadata)
> [2016-05-18 17:33:09,908] DEBUG Partition topic-not-exists-0 is unknown for 
> fetching offset, wait for metadata refresh 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-05-18 17:33:10,010] DEBUG Sending metadata request 
> {topics=[topic-not-exists]} to node 0 (org.apache.kafka.clients.NetworkClient)
> [2016-05-18 17:33:10,011] WARN Error while fetching metadata with correlation 
> id 9 : {topic-not-exists=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KAFKA-3722 : Discussion about custom PrincipalBuilder and Authorizer configs

2016-05-23 Thread Ismael Juma
Hi Mayuresh and Harsha,

If we were doing this from scratch, I would prefer option 4 too. However,
users have their own custom principal builders now and option 2 with a
suitably updated javadoc is the way to go in my opinion.

Ismael

On Sat, May 21, 2016 at 2:28 AM, Harsha  wrote:

> Mayuresh,
>  Thanks for the write-up. With the principal builder,
>  the idea is to reuse a single principal builder
>  across all the security protocols where it's
>  applicable, and given that the principal builder has
>  access to the transportLayer and authenticator it
>  should be able to figure out what type of
>  transportLayer it is, construct the principal based
>  on that, and handle all the security protocols that
>  we support.
> In your list, options 1, 2 & 4 seem to be doing the same
> thing, i.e. checking what security protocol a
> given transportLayer uses and building a principal;
> correct me if I am wrong here. I like going with 4,
> as others stated on the PR, since passing the
> security protocol makes it more explicit to the
> method how it needs to be handled. In the interest
> of having fewer configs I think option 4 seems to be
> better even though it breaks the interface.
>
> Thanks,
> Harsha
> On Fri, May 20, 2016, at 05:00 PM, Mayuresh Gharat wrote:
> > Hi All,
> >
> > I came across an issue with plugging in a custom PrincipalBuilder class
> > using the config "principal.builder.class" along with a custom Authorizer
> > class using the config "authorizer.class.name".
> >
> > Consider the following scenario :
> >
> > For PlainText we don't supply any PrincipalBuilder. For SSL we want to
> > supply a PrincipalBuilder using the property "principal.builder.class".
> >
> > a) Now consider we have a broker running on these 2 ports and supply that
> > custom principalBuilder class using that config.
> >
> > b) The interbroker communication is using PlainText. I am using a single
> > broker cluster for testing.
> >
> > c) Now we issue a produce request on the SSL port of the broker.
> >
> > d) The controller tries to build a channel for plaintext with this broker
> > for the new topic instructions.
> >
> > e) PlainText tries to use the principal builder specified in the
> > "principal.builder.class" config, which was meant only for the SSL port, since
> > the code path is the same: "ChannelBuilders.createPrincipalBuilder(configs)".
> >
> > f) In the custom principal builder, if we try to do some cert checks or a
> > down conversion of the transportLayer to SSLTransportLayer so that we can
> > use its functionality, we get an error/exception at runtime.
> >
> > The basic idea is the PlainText channel should not be using the
> > PrincipalBuilder meant for other types of channels.
> >
> > Now there are few options/workarounds to avoid this :
> >
> > 1) Do an instanceof check in Authorizer.authorize() on the TransportLayer
> > instance passed in and do the correct handling. This is not intuitive and
> > imposes a strict coding rule on the programmer.
> >
> > 2) TransportLayer should expose an API for telling the security protocol
> > type. This is not too intuitive either.
> >
> > 3) Add extra configs for Authorizer and PrincipalBuilder for each channel
> > type. This gives us a flexibility for the PrincipalBuilder and Authorizer
> > handle requests on different types of ports in a different way.
> >
> > 4) PrincipalBuilder.buildPrincipal() should take in an extra parameter for
> > the type of protocol, and we should document in the javadoc how to use it to
> > handle the type of request. This is a little better than 1) and 2) but again
> > imposes a strict coding rule on the programmer.
> >
> > Just wanted to know what the community thinks about this and get any
> > suggestions/feedback . There's some discussion about this here :
> > https://github.com/apache/kafka/pull/1403
> >
> > Thanks,
> >
> > Mayuresh
>
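
For reference, options 1 and 2 above amount to making a single builder branch on the transport layer type. A rough sketch against the 0.9/0.10-era interface as remembered here (buildPrincipal(TransportLayer, Authenticator)); class and constant names should be double-checked against the actual tree before use:

{code}
import java.security.Principal;
import java.util.Map;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PrincipalBuilder;

// Hedged sketch of options 1/2: one PrincipalBuilder shared by all listeners
// that branches on the transport layer type instead of assuming SSL.
public class ProtocolAwarePrincipalBuilder implements PrincipalBuilder {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public Principal buildPrincipal(TransportLayer transportLayer,
                                    Authenticator authenticator) throws KafkaException {
        try {
            // Only SSL connections carry a peer certificate worth inspecting;
            // plaintext connections fall back to an anonymous principal.
            if (transportLayer instanceof org.apache.kafka.common.network.SslTransportLayer)
                return transportLayer.peerPrincipal();
            return KafkaPrincipal.ANONYMOUS;
        } catch (Exception e) {
            throw new KafkaException("Failed to build principal", e);
        }
    }

    @Override
    public void close() throws KafkaException { }
}
{code}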


Re: [DISCUSS] scalability limits in the coordinator

2016-05-23 Thread Ismael Juma
+1 to Jun's suggestion.

Having said that, as a general point, I think we should consider supporting
topic patterns in the wire protocol. It requires some thinking for
cross-language support, but it seems surmountable and it could make certain
operations a lot more efficient (the fact that a basic regex subscription
causes the consumer to request metadata for all topics is not great).

Ismael

On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang  wrote:

> I like Jun's suggestion in changing the handling logics of single large
> message on the consumer side.
>
> As for the case of "a single group subscribing to 3000 topics", with 100
> consumers the 2.5Mb Gzip size is reasonable to me (when storing in ZK, we
> also have the znode limit which is set to 1Mb by default, though admittedly
> it is only for one consumer). And if we do the change as Jun suggested,
> 2.5Mb on follower's memory pressure is OK I think.
>
>
> Guozhang
>
>
> On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> onurkaraman.apa...@gmail.com
> > wrote:
>
> > Results without compression:
> > 1 consumer 292383 bytes
> > 5 consumers 1079579 bytes * the tipping point
> > 10 consumers 1855018 bytes
> > 20 consumers 2780220 bytes
> > 30 consumers 3705422 bytes
> > 40 consumers 4630624 bytes
> > 50 consumers 826 bytes
> > 60 consumers 6480788 bytes
> > 70 consumers 7405750 bytes
> > 80 consumers 8330712 bytes
> > 90 consumers 9255674 bytes
> > 100 consumers 10180636 bytes
> >
> > So it looks like gzip compression shrinks the message size by 4x.
> >
> > On Sat, May 21, 2016 at 9:47 AM, Jun Rao  wrote:
> >
> > > Onur,
> > >
> > > Thanks for the investigation.
> > >
> > > Another option is to just fix how we deal with the case when a message
> is
> > > larger than the fetch size. Today, if the fetch size is smaller than the
> > > message size, the consumer will get stuck. Instead, we can simply return
> > the
> > > full message if it's larger than the fetch size w/o requiring the
> > consumer
> > > to manually adjust the fetch size. On the broker side, to serve a fetch
> > > request, we already do an index lookup and then scan the log a bit to
> > find
> > > the message with the requested offset. We can just check the size of
> that
> > > message and return the full message if its size is larger than the
> fetch
> > > size. This way, fetch size is really for performance optimization, i.e.
> > in
> > > the common case, we will not return more bytes than fetch size, but if
> > > there is a large message, we will return more bytes than the specified
> > > fetch size. In practice, large messages are rare. So, it shouldn't
> > increase
> > > the memory consumption on the client too much.
> > >
> > > Jun
> > >
> > > On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com>
> > > wrote:
> > >
> > > > Hey everyone. So I started doing some tests on the new
> > > consumer/coordinator
> > > > to see if it could handle more strenuous use cases like mirroring
> > > clusters
> > > > with thousands of topics and thought I'd share whatever I have so
> far.
> > > >
> > > > The scalability limit: the amount of group metadata we can fit into
> one
> > > > message
> > > >
> > > > Some background:
> > > > Client-side assignment is implemented in two phases
> > > > 1. a PreparingRebalance phase that identifies members of the group
> and
> > > > aggregates member subscriptions.
> > > > 2. an AwaitingSync phase that waits for the group leader to decide
> > member
> > > > assignments based on the member subscriptions across the group.
> > > >   - The leader announces this decision with a SyncGroupRequest. The
> > > > GroupCoordinator handles SyncGroupRequests by appending all group
> state
> > > > into a single message under the __consumer_offsets topic. This
> message
> > is
> > > > keyed on the group id and contains each member subscription as well
> as
> > > the
> > > > decided assignment for each member.
> > > >
> > > > The environment:
> > > > - one broker
> > > > - one __consumer_offsets partition
> > > > - offsets.topic.compression.codec=1 // this is gzip
> > > > - broker has my pending KAFKA-3718 patch that actually makes use of
> > > > offsets.topic.compression.codec:
> > > https://github.com/apache/kafka/pull/1394
> > > > - around 3000 topics. This is an actual subset of topics from one of
> > our
> > > > clusters.
> > > > - topics have 8 partitions
> > > > - topics are 25 characters long on average
> > > > - one group with a varying number of consumers each hardcoded with
> all
> > > the
> > > > topics just to make the tests more consistent. wildcarding with .*
> > should
> > > > have the same effect once the subscription hits the coordinator as
> the
> > > > subscription has already been fully expanded out to the list of
> topics
> > by
> > > > the consumers.
> > > > - I added some log messages to Log.scala to print out the message
> sizes
> > > > after compression
> > > > - there are no producers 
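
To get a feel for the numbers in this thread (roughly 10 MB of uncompressed group metadata shrinking about 4x under gzip), a toy measurement along these lines can be run locally. The payload shape below is made up to loosely mirror the test (100 members, 3000 topics, ~25-character names); synthetic names compress better than real ones, so expect a higher ratio than the observed ~4x:

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GroupMetadataSizeEstimate {
    // Toy estimate only: builds a payload shaped like "100 members, each
    // subscribed to 3000 topics", then compares raw vs gzipped size. Real
    // group metadata has extra framing, so treat absolute numbers as rough.
    public static void main(String[] args) throws IOException {
        int members = 100, topics = 3000;
        StringBuilder sb = new StringBuilder();
        for (int m = 0; m < members; m++)
            for (int t = 0; t < topics; t++)
                sb.append(String.format("member-%03d:some.topic.name.%07d,", m, t));

        byte[] raw = sb.toString().getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        }
        byte[] compressed = bos.toByteArray();
        System.out.printf("raw=%d bytes, gzip=%d bytes, ratio=%.1fx%n",
                raw.length, compressed.length, (double) raw.length / compressed.length);
    }
}
{code}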

[jira] [Commented] (KAFKA-3726) Enable cold storage option

2016-05-23 Thread Ben Stopford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296178#comment-15296178
 ] 

Ben Stopford commented on KAFKA-3726:
-

Ah, OK. Yes I quite like this idea, particularly in conjunction with compacted 
topics. It would be subject to the disk format remaining backwards compatible, 
but considering consumers get the disk format directly that's already, to some 
extent, part of the contract. 

So my interpretation of your use case, from your post, is: if you're ingesting 
data into Kafka with the aim of getting it into file-based storage for offline 
processing, it would be simpler to just copy the Kafka data files directly, 
rather than consume them and recreate new files in cold storage. 

Is that correct? I mention this partially because you mention backing files up 
here, which makes me think of database backups etc, which is a slightly 
different use case. 


> Enable cold storage option
> --
>
> Key: KAFKA-3726
> URL: https://issues.apache.org/jira/browse/KAFKA-3726
> Project: Kafka
>  Issue Type: Wish
>Reporter: Radoslaw Gruchalski
> Attachments: kafka-cold-storage.txt
>
>
> This JIRA builds up on the cold storage article I have published on Medium. 
> A copy of the article is attached here.
> The need for cold storage or an "indefinite" log seems to be quite often 
> discussed on the user mailing list.
> The cold storage idea would give the operator the opportunity to keep the 
> raw Kafka offset files in third-party storage and allow retrieving the 
> data back for re-consumption.
> The two possible options for enabling such functionality are, from the 
> article:
> First approach: if Kafka provided a notification mechanism and could trigger 
> a program when a segment file is to be discarded, it would become feasible to 
> provide a standard method of moving data to cold storage in reaction to those 
> events. Once the program finishes backing the segments up, it could tell 
> Kafka “it is now safe to delete these segments”.
> The second option is to provide an additional value for the 
> log.cleanup.policy setting, call it cold-storage. In case of this value, 
> Kafka would move the segment files — which otherwise would be deleted — to 
> another destination on the server. They can be picked up from there and moved 
> to the cold storage.
> Both have their limitations. The former is simply a mechanism exposed to 
> allow the operator to build up the tooling necessary to enable this. Events could 
> be published in a manner similar to Mesos Event Bus 
> (https://mesosphere.github.io/marathon/docs/event-bus.html) or Kafka itself 
> could provide a control topic on which such info would be published. The 
> outcome is, the operator can subscribe to the event bus and get notified 
> about, at least, two events:
> - log segment is complete and can be backed up
> - partition leader changed
> These two, together with an option to keep the log segment safe from 
> compaction for a certain amount of time, would be sufficient to reliably 
> implement cold storage.
> The latter option, the {{log.cleanup.policy}} setting, would be a more complete 
> feature but it is also much more difficult to implement. All brokers would 
> have to keep a backup of the data in the cold storage, significantly increasing 
> the size requirements; also, the de-duplication of the replicated data would 
> be left completely to the operator.
> In any case, the thing to stay away from is having Kafka deal with the 
> physical aspect of moving the data to and back from the cold storage. This is 
> not Kafka's task. The intent is to provide a method for reliable cold storage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3567) Add --security-protocol option to console consumer and producer

2016-05-23 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296163#comment-15296163
 ] 

Edoardo Comar commented on KAFKA-3567:
--

Hi, the console producer has these options:

--producer-property   A mechanism to pass user-defined   
   properties in the form key=value to  
   the producer.
--producer.config   Producer config properties file. Note  
   that [producer-property] takes   
   precedence over this config. 

the console consumer has only

--consumer.config  Consumer config properties file.   

IMHO, it would make sense not to add a specific --security-protocol option to 
them, but just to add a
--consumer-property option
to the console consumer.

> Add --security-protocol option to console consumer and producer
> ---
>
> Key: KAFKA-3567
> URL: https://issues.apache.org/jira/browse/KAFKA-3567
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Sriharsha Chintalapani
>Assignee: Bharat Viswanadham
> Fix For: 0.9.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3647) Unable to set a ssl provider

2016-05-23 Thread Johan Abbors (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296098#comment-15296098
 ] 

Johan Abbors commented on KAFKA-3647:
-

Yeah, in my opinion this is enough.

The ssl.provider option should not require any updates. It was one of the 
things I (and probably also Edgar) started exploring and changing to find 
common cipher suites between clients/servers, as I believed the issue was in the 
JVM ssl configuration.

> Unable to set a ssl provider
> 
>
> Key: KAFKA-3647
> URL: https://issues.apache.org/jira/browse/KAFKA-3647
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.9.0.1
> Environment: Centos, OracleJRE 8, Vagrant
>Reporter: Elvar
>Priority: Minor
>
> When defining a ssl provider Kafka does not start because the provider was 
> not found.
> {code}
> [2016-05-02 13:48:48,252] FATAL [Kafka Server 11], Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: 
> org.apache.kafka.common.KafkaException: 
> java.security.NoSuchProviderException: no such provider: sun.security.ec.SunEC
> at 
> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
> {code}
> To test
> {code}
> /bin/kafka-server-start /etc/kafka/server.properties --override 
> ssl.provider=sun.security.ec.SunEC
> {code}
> This is stopping us from talking to Kafka with SSL from Go programs because 
> no common cipher suites are available.
> Using sslscan this is available from Kafka
> {code}
>  Supported Server Cipher(s):
>Accepted  TLSv1  256 bits  DHE-DSS-AES256-SHA
>Accepted  TLSv1  128 bits  DHE-DSS-AES128-SHA
>Accepted  TLSv1  128 bits  EDH-DSS-DES-CBC3-SHA
>Accepted  TLS11  256 bits  DHE-DSS-AES256-SHA
>Accepted  TLS11  128 bits  DHE-DSS-AES128-SHA
>Accepted  TLS11  128 bits  EDH-DSS-DES-CBC3-SHA
>Accepted  TLS12  256 bits  DHE-DSS-AES256-GCM-SHA384
>Accepted  TLS12  256 bits  DHE-DSS-AES256-SHA256
>Accepted  TLS12  256 bits  DHE-DSS-AES256-SHA
>Accepted  TLS12  128 bits  DHE-DSS-AES128-GCM-SHA256
>Accepted  TLS12  128 bits  DHE-DSS-AES128-SHA256
>Accepted  TLS12  128 bits  DHE-DSS-AES128-SHA
>Accepted  TLS12  128 bits  EDH-DSS-DES-CBC3-SHA
>  Preferred Server Cipher(s):
>SSLv2  0 bits(NONE)
>TLSv1  256 bits  DHE-DSS-AES256-SHA
>TLS11  256 bits  DHE-DSS-AES256-SHA
>TLS12  256 bits  DHE-DSS-AES256-GCM-SHA384
> {code}
> From the Golang documentation these are avilable there
> {code}
> TLS_RSA_WITH_RC4_128_SHAuint16 = 0x0005
> TLS_RSA_WITH_3DES_EDE_CBC_SHA   uint16 = 0x000a
> TLS_RSA_WITH_AES_128_CBC_SHAuint16 = 0x002f
> TLS_RSA_WITH_AES_256_CBC_SHAuint16 = 0x0035
> TLS_RSA_WITH_AES_128_GCM_SHA256 uint16 = 0x009c
> TLS_RSA_WITH_AES_256_GCM_SHA384 uint16 = 0x009d
> TLS_ECDHE_ECDSA_WITH_RC4_128_SHAuint16 = 0xc007
> TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHAuint16 = 0xc009
> TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHAuint16 = 0xc00a
> TLS_ECDHE_RSA_WITH_RC4_128_SHA  uint16 = 0xc011
> TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA uint16 = 0xc012
> TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA  uint16 = 0xc013
> TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA  uint16 = 0xc014
> TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256   uint16 = 0xc02f
> TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 uint16 = 0xc02b
> TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384   uint16 = 0xc030
> TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384 uint16 = 0xc02c
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3736) Add http metrics reporter

2016-05-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296075#comment-15296075
 ] 

Ismael Juma commented on KAFKA-3736:


[~amuraru], what is your Apache Wiki id? I can give you the required 
permissions.

> Add http metrics reporter
> -
>
> Key: KAFKA-3736
> URL: https://issues.apache.org/jira/browse/KAFKA-3736
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Adrian Muraru
> Fix For: 0.10.1.0
>
>
> The current built-in JMX metrics reporter is pretty heavy in terms of load and 
> collection. A new lightweight HTTP reporter is proposed to expose the metrics 
> via a local HTTP port.
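
As a sketch of the general idea (not the proposed reporter itself), the metrics that are already registered in JMX can be exposed over a local HTTP port with nothing but the JDK. Everything below (port, bean pattern, plain-text output format) is an assumption for illustration; a real reporter would plug into Kafka's metrics reporter mechanism instead of polling JMX:

{code}
import com.sun.net.httpserver.HttpServer;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxOverHttp {
    // Illustrative only: dumps all kafka.* MBean attributes as plain text on
    // http://localhost:8123/metrics.
    public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName kafkaBeans = new ObjectName("kafka.*:*");   // all Kafka domains
        HttpServer http = HttpServer.create(new InetSocketAddress(8123), 0);
        http.createContext("/metrics", exchange -> {
            StringBuilder out = new StringBuilder();
            for (ObjectName name : mbs.queryNames(kafkaBeans, null)) {
                try {
                    for (MBeanAttributeInfo attr : mbs.getMBeanInfo(name).getAttributes()) {
                        out.append(name).append(' ').append(attr.getName()).append('=')
                           .append(mbs.getAttribute(name, attr.getName())).append('\n');
                    }
                } catch (Exception e) {
                    // skip beans/attributes that cannot be read
                }
            }
            byte[] body = out.toString().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            exchange.getResponseBody().write(body);
            exchange.close();
        });
        http.start();
    }
}
{code}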



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3736) Add http metrics reporter

2016-05-23 Thread Adrian Muraru (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296063#comment-15296063
 ] 

Adrian Muraru commented on KAFKA-3736:
--

Ok, just noticed that "Monitoring" is considered a public-facing change, so I'm 
going to create a KIP.
For some reason I cannot add a new page on the wiki here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
Do I need special perms?
Thanks a lot.

> Add http metrics reporter
> -
>
> Key: KAFKA-3736
> URL: https://issues.apache.org/jira/browse/KAFKA-3736
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Adrian Muraru
> Fix For: 0.10.1.0
>
>
> The current built-in JMX metrics reporter is pretty heavy in terms of load and 
> collection. A new lightweight HTTP reporter is proposed to expose the metrics 
> via a local HTTP port.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)