[jira] [Created] (KAFKA-6559) Iterate record sets before calling Log.append

2018-02-13 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-6559:
--

 Summary: Iterate record sets before calling Log.append
 Key: KAFKA-6559
 URL: https://issues.apache.org/jira/browse/KAFKA-6559
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 1.0.0
Reporter: Todd Palino
Assignee: Todd Palino


If a Produce request contains multiple record sets for a single 
topic-partition, it is better to iterate these before calling Log.append. This 
is because append will process all the sets together, and therefore will need 
to reassign offsets even if the offsets for an individual record set are 
properly formed. By iterating the record sets before calling append, each set 
can be considered on its own and potentially be appended without reassigning 
offsets.

While the core Java producer client does not currently operate this way, it is 
permitted by the protocol and may be used by other clients that aggregate 
multiple batches together to produce them.
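To sketch the intent (an illustration in Python only; the real change would be in the Scala append path, and none of these names are Kafka's): each record set is considered on its own, and offsets are reassigned only for the sets whose offsets are not already properly formed.

# Rough sketch of the idea, not the actual Log.append code.
def append_record_sets(record_sets, log_end_offset):
    for record_set in record_sets:
        expected = list(range(log_end_offset, log_end_offset + len(record_set)))
        if list(record_set) != expected:
            record_set = expected          # reassign offsets for this set only
        # ... append record_set to the log here ...
        log_end_offset += len(record_set)
    return log_end_offset

# Example: the first set already starts at the log end offset and is left alone;
# only the second set would get new offsets.
print(append_record_sets([[100, 101, 102], [0, 1]], 100))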



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Question Kafka Reassign partitions tool

2017-12-22 Thread Todd Palino
Yes, the replicas are stored in Zookeeper, so you can iterate over the
information there to build a view of the cluster that you can use. If you
want an example for this, take a look at the code for kafka-assigner in
https://github.com/linkedin/kafka-tools. Or you can just use that tool to
adjust replication factors and balance partitions.
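If it helps, here is a rough sketch of that approach (Python with the kazoo client, which is an assumption on my part and not the kafka-tools code; the topic and broker IDs are just the ones from your example). It reads the current leader for each partition from Zookeeper and emits a reassignment JSON with the leader listed first:

# Sketch only: build a reassignment plan from Zookeeper, current leader first.
import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="10.0.4.165:2181")
zk.start()

new_replicas = [92, 51, 101]                      # desired replica set (example brokers)
plan = {"version": 1, "partitions": []}

for topic in ["cric-engine.engine.eng_fow"]:      # loop over all of your topics here
    assignment = json.loads(zk.get("/brokers/topics/%s" % topic)[0])
    for partition in assignment["partitions"]:
        state_path = "/brokers/topics/%s/partitions/%s/state" % (topic, partition)
        leader = json.loads(zk.get(state_path)[0])["leader"]
        # Put the current leader first so preferred replica election keeps it.
        replicas = [leader] + [b for b in new_replicas if b != leader]
        plan["partitions"].append(
            {"topic": topic, "partition": int(partition), "replicas": replicas})

print(json.dumps(plan))
zk.stop()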

-Todd


On Fri, Dec 22, 2017 at 9:21 AM, Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi Todd,
>
> Thanks for the reply. The problem is I have about 160 topics (5 partitions
> each) for which I need to increase the replication factor. So, I would
> have to find the current leader for each of the partitions and hand-code
> the json, which would become tedious.
>
> The partition leader info is stored in zookeeper? If yes, then can I
> probably use that to get the current json and then build the json which can be
> fed to the tool? I tried to search using the zk shell but couldn't find it...
>
> Sagar.
>
> On Fri, Dec 22, 2017 at 7:45 PM, Todd Palino <tpal...@gmail.com> wrote:
>
> > Preferred replica election is naive. It will always follow the order of
> the
> > replicas as they are set. So if you want to set the default leader, just
> > make it the first replica in the list for the partition. We build the
> JSON
> > this way all the time.
> >
> > -Todd
> >
> >
> > On Dec 22, 2017 6:46 AM, "Sagar" <sagarmeansoc...@gmail.com> wrote:
> >
> > Hi,
> >
> > Had a question on Kafka reassign partitions tool.
> >
> > We have a 3 node cluster but our replication factor is set to 1. So we
> have
> > been looking to increase it to 3 for HA.
> >
> > I tried the tool on a couple of topics and it increases the replication
> > factor alright. Also it doesn't change the leader as the leader still is
> in
> > the RAR.
> >
> > This is how I run it:
> >
> > Json which is used:
> >
> > {"version":1,"partitions":[
> >
> > {"topic":"cric-engine.engine.eng_fow","partition":3,"
> > replicas":[92,51,101]}]}
> >
> > Earlier config for the topic
> >
> > kafka-topics --describe --topic cric-engine.engine.eng_fow --zookeeper
> > 10.0.4.165:2181,10.0.5.139:2181,10.0.6.106:2181
> > Topic:cric-engine.engine.eng_fow PartitionCount:5 ReplicationFactor:3 Configs:
> > Topic: cric-engine.engine.eng_fow Partition: 0 Leader: 101 Replicas: 92,51,101 Isr: 101,51,92
> > Topic: cric-engine.engine.eng_fow Partition: 1 Leader: 51 Replicas: 92,51,101 Isr: 51,101,92
> > Topic: cric-engine.engine.eng_fow Partition: 2 Leader: 92 Replicas: 92 Isr: 92
> > Topic: cric-engine.engine.eng_fow Partition: 3 Leader: 101 Replicas: 101 Isr: 101
> > Topic: cric-engine.engine.eng_fow Partition: 4 Leader: 51 Replicas: 51 Isr: 51
> >
> > After running:
> >
> >  kafka-reassign-partitions --reassignment-json-file
> > increase-replication-factor.json --execute --zookeeper 10.0.4.165:2181,
> > 10.0.5.139:2181,10.0.6.106:2181
> >
> > Partition 3's replicas increased:
> >
> > kafka-topics --describe --topic cric-engine.engine.eng_fow --zookeeper
> > 10.0.4.165:2181,10.0.5.139:2181,10.0.6.106:2181
> > Topic:cric-engine.engine.eng_fow PartitionCount:5 ReplicationFactor:3 Configs:
> > Topic: cric-engine.engine.eng_fow Partition: 0 Leader: 101 Replicas: 92,51,101 Isr: 101,51,92
> > Topic: cric-engine.engine.eng_fow Partition: 1 Leader: 51 Replicas: 92,51,101 Isr: 51,101,92
> > Topic: cric-engine.engine.eng_fow Partition: 2 Leader: 92 Replicas: 92 Isr: 92
> > Topic: cric-engine.engine.eng_fow Partition: 3 Leader: 101 Replicas: 92,51,101 Isr: 101,51,92
> > Topic: cric-engine.engine.eng_fow Partition: 4 Leader: 51 Replicas: 51 Isr: 51
> >
> > What I wanted to know is: does it affect the preferred replica? If you
> > see the Replicas, all of them are now 92,51,101 even though the leader has
> > remained the same as before. So, if any of the brokers goes down or we
> > run kafka-preferred-replica-election.sh, wouldn't it move all the leaders
> > to broker 92? Is my assessment correct?
> >
> > If yes, then is there a way I can still do this operation by getting the
> > leader for a partition first, then adding it to the replica list and then
> > building the json dynamically?
> >
> > Thanks!
> > Sagar.
> >
>



-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Question Kafka Reassign partitions tool

2017-12-22 Thread Todd Palino
Preferred replica election is naive. It will always follow the order of the
replicas as they are set. So if you want to set the default leader, just
make it the first replica in the list for the partition. We build the JSON
this way all the time.
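For example, taking partition 3 from the output quoted below, where broker 101 is the current leader, listing 101 first keeps it as the preferred leader:

{"version":1,"partitions":[
  {"topic":"cric-engine.engine.eng_fow","partition":3,"replicas":[101,92,51]}]}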

-Todd


On Dec 22, 2017 6:46 AM, "Sagar"  wrote:

Hi,

Had a question on Kafka reassign partitions tool.

We have a 3 node cluster but our replication factor is set to 1. So we have
been looking to increase it to 3 for HA.

I tried the tool on a couple of topics and it increases the replication
factor alright. Also it doesn't change the leader as the leader still is in
the RAR.

This is how I run it:

Json which is used:

{"version":1,"partitions":[

{"topic":"cric-engine.engine.eng_fow","partition":3,"
replicas":[92,51,101]}]}

Earlier config for the topic

kafka-topics --describe --topic cric-engine.engine.eng_fow --zookeeper
10.0.4.165:2181,10.0.5.139:2181,10.0.6.106:2181
Topic:cric-engine.engine.eng_fow PartitionCount:5 ReplicationFactor:3 Configs:
Topic: cric-engine.engine.eng_fow Partition: 0 Leader: 101 Replicas: 92,51,101 Isr: 101,51,92
Topic: cric-engine.engine.eng_fow Partition: 1 Leader: 51 Replicas: 92,51,101 Isr: 51,101,92
Topic: cric-engine.engine.eng_fow Partition: 2 Leader: 92 Replicas: 92 Isr: 92
Topic: cric-engine.engine.eng_fow Partition: 3 Leader: 101 Replicas: 101 Isr: 101
Topic: cric-engine.engine.eng_fow Partition: 4 Leader: 51 Replicas: 51 Isr: 51

After running:

 kafka-reassign-partitions --reassignment-json-file
increase-replication-factor.json --execute --zookeeper 10.0.4.165:2181,
10.0.5.139:2181,10.0.6.106:2181

Partition 3's replicas increased:

kafka-topics --describe --topic cric-engine.engine.eng_fow --zookeeper
10.0.4.165:2181,10.0.5.139:2181,10.0.6.106:2181
Topic:cric-engine.engine.eng_fow PartitionCount:5 ReplicationFactor:3 Configs:
Topic: cric-engine.engine.eng_fow Partition: 0 Leader: 101 Replicas: 92,51,101 Isr: 101,51,92
Topic: cric-engine.engine.eng_fow Partition: 1 Leader: 51 Replicas: 92,51,101 Isr: 51,101,92
Topic: cric-engine.engine.eng_fow Partition: 2 Leader: 92 Replicas: 92 Isr: 92
Topic: cric-engine.engine.eng_fow Partition: 3 Leader: 101 Replicas: 92,51,101 Isr: 101,51,92
Topic: cric-engine.engine.eng_fow Partition: 4 Leader: 51 Replicas: 51 Isr: 51

What I wanted to know is: does it affect the preferred replica? If you
see the Replicas, all of them are now 92,51,101 even though the leader has
remained the same as before. So, if any of the brokers goes down or we
run kafka-preferred-replica-election.sh, wouldn't it move all the leaders
to broker 92? Is my assessment correct?

If yes, then is there a way I can still do this operation by getting the leader
for a partition first, then adding it to the replica list and then building
the json dynamically?

Thanks!
Sagar.


Re: Plans to improve SSL performance in Kafka, for 0.10.x?

2017-09-04 Thread Todd Palino
I will say that we've been turning on TLS for consume traffic lately (including
using it for the inter-broker protocol, which we've been doing for a while) and we
haven't seen any of the performance concerns that we originally did. Very little
hit at all.
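For reference, the broker side of that is just the standard TLS settings, something like the following (illustrative values only; paths and passwords are placeholders for your environment):

listeners=PLAINTEXT://:9092,SSL://:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=<truststore-password>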

-Todd

On Sep 4, 2017 12:45 AM, "Ismael Juma" <ism...@juma.me.uk> wrote:

> By the way, in-kernel TLS has now landed in the Linux kernel:
>
> https://github.com/torvalds/linux/blob/master/Documentation/networking/tls.txt
>
> There is work in progress to take advantage of that in OpenSSL:
>
> https://github.com/Mellanox/tls-openssl
>
> Ismael
>
>
> On Tue, Sep 6, 2016 at 1:48 PM, Todd Palino <tpal...@gmail.com> wrote:
>
> > Yeah, that's why I mentioned it with a caveat :) Someone (I can't recall
> > who, but it was someone I consider reasonably knowledgable as I actually
> > gave it some weight) mentioned it, but I haven't looked into it further
> > than that. I agree that I don't see how this is going to help us at the
> app
> > layer.
> >
> > -Todd
> >
> > On Tuesday, September 6, 2016, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Hi Todd,
> > >
> > > Thanks for sharing your experience enabling TLS in your clusters. Very
> > > helpful. One comment below.
> > >
> > > On Sun, Sep 4, 2016 at 6:28 PM, Todd Palino <tpal...@gmail.com
> > > <javascript:;>> wrote:
> > > >
> > > > Right now, we're specifically avoiding moving consume traffic to SSL,
> > due
> > > > to the zero copy send issue. Now I've been told (but I have not
> > > > investigated) that OpenSSL can solve this. It would probably be a
> good
> > > use
> > > > of time to look into that further.
> > > >
> > >
> > > As far as I know, OpenSSL can reduce the TLS overhead, but we will still
> > > lose the zero-copy optimisation. There are some attempts at making it
> > > possible to retain zero-copy with TLS in the kernel[1][2], but it's
> > > probably too early for us to consider that for Kafka.
> > >
> > > Ismael
> > >
> > > [1] https://lwn.net/Articles/666509/
> > > [2]
> > > http://techblog.netflix.com/2016/08/protecting-netflix-viewing-privacy-at.html
> > >
> >
> >
> > --
> > *Todd Palino*
> > Staff Site Reliability Engineer
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
> >
>


[jira] [Updated] (KAFKA-5056) Shuffling of partitions in old consumer fetch requests removed

2017-04-11 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino updated KAFKA-5056:
---
Summary: Shuffling of partitions in old consumer fetch requests removed  
(was: Shuffling of partitions in old consume fetch requests removed)

> Shuffling of partitions in old consumer fetch requests removed
> --
>
> Key: KAFKA-5056
> URL: https://issues.apache.org/jira/browse/KAFKA-5056
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>    Reporter: Todd Palino
>    Assignee: Todd Palino
>
> [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes]
>  deprecated the constructor to {{FetchRequest}} which shuffles the 
> {{requestInfo}} parameter, in favor of round robin reordering logic added to 
> the replica fetcher and the consumer API. However, this was not added to the 
> old consumer {{ConsumerFetcherThread}}, which has resulted in unfair 
> partition fetching since 0.10.1.
> In order to maintain the old consumer, we need to add the removed shuffle to 
> {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} 
> is composed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed

2017-04-11 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino updated KAFKA-5056:
---
Reviewer: Joel Koshy
  Status: Patch Available  (was: In Progress)

> Shuffling of partitions in old consume fetch requests removed
> -
>
> Key: KAFKA-5056
> URL: https://issues.apache.org/jira/browse/KAFKA-5056
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>    Reporter: Todd Palino
>    Assignee: Todd Palino
>
> [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes]
>  deprecated the constructor to {{FetchRequest}} which shuffles the 
> {{requestInfo}} parameter, in favor of round robin reordering logic added to 
> the replica fetcher and the consumer API. However, this was not added to the 
> old consumer {{ConsumerFetcherThread}}, which has resulted in unfair 
> partition fetching since 0.10.1.
> In order to maintain the old consumer, we need to add the removed shuffle to 
> {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} 
> is composed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed

2017-04-11 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5056 started by Todd Palino.
--
> Shuffling of partitions in old consume fetch requests removed
> -
>
> Key: KAFKA-5056
> URL: https://issues.apache.org/jira/browse/KAFKA-5056
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>    Reporter: Todd Palino
>    Assignee: Todd Palino
>
> [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes]
>  deprecated the constructor to {{FetchRequest}} which shuffles the 
> {{requestInfo}} parameter, in favor of round robin reordering logic added to 
> the replica fetcher and the consumer API. However, this was not added to the 
> old consumer {{ConsumerFetcherThread}}, which has resulted in unfair 
> partition fetching since 0.10.1.
> In order to maintain the old consumer, we need to add the removed shuffle to 
> {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} 
> is composed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed

2017-04-11 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-5056:
--

 Summary: Shuffling of partitions in old consume fetch requests 
removed
 Key: KAFKA-5056
 URL: https://issues.apache.org/jira/browse/KAFKA-5056
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Todd Palino
Assignee: Todd Palino


[KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes]
 deprecated the constructor to {{FetchRequest}} which shuffles the 
{{requestInfo}} parameter, in favor of round robin reordering logic added to 
the replica fetcher and the consumer API. However, this was not added to the 
old consumer {{ConsumerFetcherThread}}, which has resulted in unfair partition 
fetching since 0.10.1.

In order to maintain the old consumer, we need to add the removed shuffle to 
{{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} is 
composed.
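To illustrate why the shuffle matters (a toy Python sketch, not the Scala change itself; all names here are made up): with a fixed partition order and a bounded fetch, the partitions at the front of the list are always served first and the tail can be starved.

import random

partitions = ["topicA-%d" % p for p in range(10)]
budget = 3   # pretend each fetch only has room for three partitions' worth of data

def build_fetch_request(partition_list):
    # stand-in for buildFetchRequest: only the first few partitions fit
    return partition_list[:budget]

# Without the shuffle, the same three partitions win every time.
print(build_fetch_request(partitions))

# With the shuffle restored, every partition gets a fair chance across fetches.
shuffled = list(partitions)
random.shuffle(shuffled)
print(build_fetch_request(shuffled))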



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-124: Request rate quotas

2017-03-08 Thread Todd Palino
Rajini -

I understand what you’re saying, but the point I’m making is that I don’t
believe we need to take it into account directly. The CPU utilization of
the network threads is directly proportional to the number of bytes being
sent. The more bytes, the more CPU that is required for SSL (or other
tasks). This is opposed to the request handler threads, where there are a
number of factors that affect CPU utilization. This means that it’s not
necessary to separately quota network thread byte usage and CPU - if we
quota byte usage (which we already do), we have fixed the CPU usage at a
proportional amount.

Jun -

Thanks for the clarification there. I was thinking of the utilization
percentage as being fixed, not what the percentage reflects. I’m not tied
to either way of doing it, provided that we do not lock clients to a single
thread. For example, if I specify that a given client can use 10% of a
single thread, that should also mean they can use 1% on 10 threads.
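For what it's worth, that would end up being expressed the same way the existing byte-rate quotas are, something like the following (a hypothetical example; the config name request_percentage assumes the form proposed in the KIP):

bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'request_percentage=10' \
  --entity-type clients --entity-name clientA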

-Todd



On Wed, Mar 8, 2017 at 8:57 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Todd,
>
> Thanks for the feedback.
>
> I just want to clarify your second point. If the limit percentage is per
> thread and the thread counts are changed, the absolute processing limit for
> existing users hasn't changed and there is no need to adjust them. On the
> other hand, if the limit percentage is of total thread pool capacity and
> the thread counts are changed, the effective processing limit for a user
> will change. So, to preserve the current processing limit, existing user
> limits have to be adjusted. If there is a hardware change, the effective
> processing limit for a user will change in either approach and the existing
> limit may need to be adjusted. However, hardware changes are less common
> than thread pool configuration changes.
>
> Thanks,
>
> Jun
>
> On Tue, Mar 7, 2017 at 4:45 PM, Todd Palino <tpal...@gmail.com> wrote:
>
> > I’ve been following this one on and off, and overall it sounds good to
> me.
> >
> > - The SSL question is a good one. However, that type of overhead should
> be
> > proportional to the bytes rate, so I think that a bytes rate quota would
> > still be a suitable way to address it.
> >
> > - I think it’s better to make the quota percentage of total thread pool
> > capacity, and not percentage of an individual thread. That way you don’t
> > have to adjust it when you adjust thread counts (tuning, hardware
> changes,
> > etc.)
> >
> >
> > -Todd
> >
> >
> >
> > On Tue, Mar 7, 2017 at 2:38 PM, Becket Qin <becket@gmail.com> wrote:
> >
> > > I see. Good point about SSL.
> > >
> > > I just asked Todd to take a look.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Mar 7, 2017 at 2:17 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Jiangjie,
> > > >
> > > > Yes, I agree that byte rate already protects the network threads
> > > > indirectly. I am not sure if byte rate fully captures the CPU
> overhead
> > in
> > > > network due to SSL. So, at the high level, we can use request time
> > limit
> > > to
> > > > protect CPU and use byte rate to protect storage and network.
> > > >
> > > > Also, do you think you can get Todd to comment on this KIP?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Mar 7, 2017 at 11:21 AM, Becket Qin <becket@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Rajini/Jun,
> > > > >
> > > > > The percentage based reasoning sounds good.
> > > > > One thing I am wondering is that if we assume the network thread
> are
> > > just
> > > > > doing the network IO, can we say bytes rate quota is already sort
> of
> > > > > network threads quota?
> > > > > If we take network threads into the consideration here, would that
> be
> > > > > somewhat overlapping with the bytes rate quota?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Tue, Mar 7, 2017 at 11:04 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Jun,
> > > > > >
> > > > > > Thank you for the explanation, I hadn't realized you meant
> > percentage

Re: [DISCUSS] KIP-124: Request rate quotas

2017-03-07 Thread Todd Palino
I’ve been following this one on and off, and overall it sounds good to me.

- The SSL question is a good one. However, that type of overhead should be
proportional to the bytes rate, so I think that a bytes rate quota would
still be a suitable way to address it.

- I think it’s better to make the quota percentage of total thread pool
capacity, and not percentage of an individual thread. That way you don’t
have to adjust it when you adjust thread counts (tuning, hardware changes,
etc.)


-Todd



On Tue, Mar 7, 2017 at 2:38 PM, Becket Qin  wrote:

> I see. Good point about SSL.
>
> I just asked Todd to take a look.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 7, 2017 at 2:17 PM, Jun Rao  wrote:
>
> > Hi, Jiangjie,
> >
> > Yes, I agree that byte rate already protects the network threads
> > indirectly. I am not sure if byte rate fully captures the CPU overhead in
> > network due to SSL. So, at the high level, we can use request time limit
> to
> > protect CPU and use byte rate to protect storage and network.
> >
> > Also, do you think you can get Todd to comment on this KIP?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Mar 7, 2017 at 11:21 AM, Becket Qin 
> wrote:
> >
> > > Hi Rajini/Jun,
> > >
> > > The percentage based reasoning sounds good.
> > > One thing I am wondering is that if we assume the network thread are
> just
> > > doing the network IO, can we say bytes rate quota is already sort of
> > > network threads quota?
> > > If we take network threads into the consideration here, would that be
> > > somewhat overlapping with the bytes rate quota?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Mar 7, 2017 at 11:04 AM, Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Jun,
> > > >
> > > > Thank you for the explanation, I hadn't realized you meant percentage
> > of
> > > > the total thread pool. If everyone is OK with Jun's suggestion, I
> will
> > > > update the KIP.
> > > >
> > > > Thanks,
> > > >
> > > > Rajini
> > > >
> > > > On Tue, Mar 7, 2017 at 5:08 PM, Jun Rao  wrote:
> > > >
> > > > > Hi, Rajini,
> > > > >
> > > > > Let's take your example. Let's say a user sets the limit to 50%. I
> am
> > > not
> > > > > sure if it's better to apply the same percentage separately to
> > network
> > > > and
> > > > > io thread pool. For example, for produce requests, most of the time
> > > will
> > > > be
> > > > > spent in the io threads whereas for fetch requests, most of the
> time
> > > will
> > > > > be in the network threads. So, using the same percentage in both
> > thread
> > > > > pools means one of the pools' resource will be over allocated.
> > > > >
> > > > > An alternative way is to simply model network and io thread pool
> > > > together.
> > > > > If you get 10 io threads and 5 network threads, you get 1500%
> request
> > > > > processing power. A 50% limit means a total of 750% processing
> power.
> > > We
> > > > > just add up the time a user request spent in either network or io
> > > thread.
> > > > > If that total exceeds 750% (doesn't matter whether it's spent more
> in
> > > > > network or io thread), the request will be throttled. This seems
> more
> > > > > general and is not sensitive to the current implementation detail
> of
> > > > having
> > > > > a separate network and io thread pool. In the future, if the
> > threading
> > > > > model changes, the same concept of quota can still be applied. For
> > now,
> > > > > since it's a bit tricky to add the delay logic in the network
> thread
> > > > pool,
> > > > > we could probably just do the delaying only in the io threads as
> you
> > > > > suggested earlier.
> > > > >
> > > > > There is still the orthogonal question of whether a quota of 50% is
> > out
> > > > of
> > > > > 100% or 100% * #total processing threads. My feeling is that the
> > latter
> > > > is
> > > > > slightly better based on my explanation earlier. The way to
> describe
> > > this
> > > > > quota to the users can be "share of elapsed request processing time
> > on
> > > a
> > > > > single CPU" (similar to top).
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Mar 3, 2017 at 4:22 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Jun,
> > > > > >
> > > > > > Agree about the two scenarios.
> > > > > >
> > > > > > But still not sure about a single quota covering both network
> > threads
> > > > and
> > > > > > I/O threads with per-thread quota. If there are 10 I/O threads
> and
> > 5
> > > > > > network threads and I want to assign half the quota to userA, the
> > > quota
> > > > > > would be 750%. I imagine, internally, we would convert this to
> 500%
> > > for
> > > > > I/O
> > > > > > and 250% for network threads to allocate 50% of each pool.
> > > > > >
> > > > > > A couple of scenarios:
> > > > > >
> > > > > > 1. Admin adds 1 extra network 

[jira] [Commented] (KAFKA-1342) Slow controlled shutdowns can result in stale shutdown requests

2017-03-02 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893728#comment-15893728
 ] 

Todd Palino commented on KAFKA-1342:


[~wushujames], I'm not sure about increasing the number of requests, but a lot 
of this should have been resolved recently with some updates that I believe 
[~becket_qin] made. To make it better, we had bumped 
controller.socket.timeout.ms significantly (to 30). We didn't see any side 
effects from doing that.

> Slow controlled shutdowns can result in stale shutdown requests
> ---
>
> Key: KAFKA-1342
> URL: https://issues.apache.org/jira/browse/KAFKA-1342
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: newbie++, newbiee, reliability
> Fix For: 0.11.0.0
>
>
> I don't think this is a bug introduced in 0.8.1, but triggered by the fact
> that controlled shutdown seems to have become slower in 0.8.1 (will file a
> separate ticket to investigate that). When doing a rolling bounce, it is
> possible for a bounced broker to stop all its replica fetchers since the
> previous PID's shutdown requests are still being handled.
> - 515 is the controller
> - Controlled shutdown initiated for 503
> - Controller starts controlled shutdown for 503
> - The controlled shutdown takes a long time in moving leaders and moving
>   follower replicas on 503 to the offline state.
> - So 503's read from the shutdown channel times out and a new channel is
>   created. It issues another shutdown request.  This request (since it is a
>   new channel) is accepted at the controller's socket server but then waits
>   on the broker shutdown lock held by the previous controlled shutdown which
>   is still in progress.
> - The above step repeats for the remaining retries (six more requests).
> - 503 hits SocketTimeout exception on reading the response of the last
>   shutdown request and proceeds to do an unclean shutdown.
> - The controller's onBrokerFailure call-back fires and moves 503's replicas
>   to offline (not too important in this sequence).
> - 503 is brought back up.
> - The controller's onBrokerStartup call-back fires and moves its replicas
>   (and partitions) to online state. 503 starts its replica fetchers.
> - Unfortunately, the (phantom) shutdown requests are still being handled and
>   the controller sends StopReplica requests to 503.
> - The first shutdown request finally finishes (after 76 minutes in my case!).
> - The remaining shutdown requests also execute and do the same thing (sends
>   StopReplica requests for all partitions to
>   503).
> - The remaining requests complete quickly because they end up not having to
>   touch zookeeper paths - no leaders left on the broker and no need to
>   shrink ISR in zookeeper since it has already been done by the first
>   shutdown request.
> - So in the end-state 503 is up, but effectively idle due to the previous
>   PID's shutdown requests.
> There are some obvious fixes that can be made to controlled shutdown to help
> address the above issue. E.g., we don't really need to move follower
> partitions to Offline. We did that as an "optimization" so the broker falls
> out of ISR sooner - which is helpful when producers set required.acks to -1.
> However it adds a lot of latency to controlled shutdown. Also, (more
> importantly) we should have a mechanism to abort any stale shutdown process.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-28 Thread Todd Palino
re and it takes time to rebuild disk after one disk
> > > > failure. RAID 5 implementations are susceptible to system failures
> > > because
> > > > of trends regarding array rebuild time and the chance of drive
> failure
> > > > during rebuild. There is no such performance degradation for JBOD and
> > > JBOD
> > > > can support multiple log directory failure without reducing
> performance
> > > of
> > > > good log directories. Would this be a reasonable reason for using
> JBOD
> > > > instead of RAID-5/6?
> > > >
> > > > Previously we discussed whether broker should remove offline replica
> > from
> > > > replica fetcher thread. I still think it should do it instead of
> > > printing a
> > > > lot of error in the log4j log. We can still let controller send
> > > > StopReplicaRequest to the broker. I am not sure I understand why
> > allowing
> > > > broker to remove offline replica from fetcher thread will increase
> > churns
> > > > in ISR. Do you think this is concern with this approach?
> > > >
> > > > I have updated the KIP to remove created flag from ZK and change the
> > > field
> > > > name to isNewReplica. Can you check if there is any issue with the
> > latest
> > > > KIP? Thanks for your time!
> > > >
> > > > Regards,
> > > > Dong
> > > >
> > > >
> > > > On Sat, Feb 25, 2017 at 9:11 AM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > Personally, I'd prefer not to write the created flag per replica in
> > ZK.
> > > > > Your suggestion of disabling replica creation if there is a bad log
> > > > > directory on the broker could work. The only thing is that it may
> > delay
> > > > the
> > > > > creation of new replicas. I was thinking that an alternative is to
> > > extend
> > > > > LeaderAndIsrRequest by adding a isNewReplica field per replica.
> That
> > > > field
> > > > > will be set when a replica is transitioning from the NewReplica
> state
> > > to
> > > > > Online state. Then, when a broker receives a LeaderAndIsrRequest,
> if
> > a
> > > > > replica is marked as the new replica, it will be created on a good
> > log
> > > > > directory, if not already present. Otherwise, it only creates the
> > > replica
> > > > > if all log directories are good and the replica is not already
> > present.
> > > > > This way, we don't delay the processing of new replicas in the
> common
> > > > case.
> > > > >
> > > > > I am ok with not persisting the offline replicas in ZK and just
> > > > discovering
> > > > > them through the LeaderAndIsrRequest. It handles the cases when a
> > > broker
> > > > > starts up with bad log directories better. So, the additional
> > overhead
> > > of
> > > > > rediscovering the offline replicas is justified.
> > > > >
> > > > >
> > > > > Another high level question. The proposal rejected RAID5/6 since it
> > > adds
> > > > > additional I/Os. The main issue with RAID5 is that to write a block
> > > that
> > > > > doesn't match the RAID stripe size, we have to first read the old
> > > parity
> > > > to
> > > > > compute the new one, which increases the number of I/Os (
> > > > > http://rickardnobel.se/raid-5-write-penalty/). I am wondering if
> you
> > > > have
> > > > > tested RAID5's performance by creating a file system whose block
> size
> > > > > matches the RAID stripe size (https://www.percona.com/blog/
> > > > > 2011/12/16/setting-up-xfs-the-simple-edition/). This way, writing
> a
> > > > block
> > > > > doesn't require a read first. A large block size may increase the
> > > amount
> > > > of
> > > > > data writes, when the same block has to be written to disk multiple
> > > > times.
> > > > > However, this is probably ok in Kafka's use case since we batch the
> > I/O
> > > > > flush already. As you can see, we will be adding some complexity to
> > > > support
> > > > > JBOD in Kafka one way or another. If we can tune the performance of
> > > RAID5
> > > > > to match that of RAID10, perhaps using RAID5 is a simpler solution.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Feb 24, 2017 at 10:17 AM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > I don't think we should allow failed replicas to be re-created on
> > the
> > > > > good
> > > > > > disks. Say there are 2 disks and each of them is 51% loaded. If
> any
> > > > disk
> > > > > > fail, and we allow replicas to be re-created on the other disks,
> > both
> > > > > disks
> > > > > > will fail. Alternatively we can disable replica creation if there
> > is
> > > > bad
> > > > > > disk on a broker. I personally think it is worth the additional
> > > > > complexity
> > > > > > in the broker to store created replicas in ZK so that we allow
> new
> > > > > replicas
> > > > > > to be created on the broker even when there is bad log directory.
> > > This
> > > > > > approach won't add complexity in the controller. But I am fine
> with
> > > > > > disabling replica creation when there is bad log directory that
> if
> > it
> > > > is
> > > > > > the only blocking issue for this KIP.
> > > > > >
> > > > > > Whether we store created flags is independent of whether/how we
> > store
> > > > > > offline replicas. Per our previous discussion, do you think it is
> > OK
> > > > not
> > > > > > store offline replicas in ZK and propagate the offline replicas
> > from
> > > > > broker
> > > > > > to controller via LeaderAndIsrRequest?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > >
> > > >
> > >
> >
>



-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
Come on, I’ve done at least 2 talks on this one :)

Producing counts to a topic is part of it, but that’s only part. Say you
count 100 messages in topic A. When you mirror topic A to another
cluster, you have 99 messages. Where was your problem? Or worse, you have
100 messages, but one producer duplicated messages and another one lost
messages. You need details about where the message came from in order to
pinpoint problems when they happen. Source producer info, where it was
produced into your infrastructure, and when it was produced. This requires
you to add the information to the message.
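Concretely, the kind of per-message metadata I mean looks something like this (the field names are just illustrative, not a proposed standard):

audit.producer_host = app123.example.com
audit.origin_cluster = cluster-a
audit.tier = producer
audit.produce_time_ms = 1480636800000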

And yes, you still need to maintain your clients. So maybe my original
example was not the best. My thoughts on not wanting to be responsible for
message formats stand, because that’s very much separate from the client.
As you know, we have our own internal client library that can insert the
right headers, and right now inserts the right audit information into the
message fields. If they exist, and assuming the message is Avro encoded.
What if someone wants to use JSON instead for a good reason? What if user X
wants to encrypt messages, but user Y does not? Maintaining the client
library is still much easier than maintaining the message formats.

-Todd


On Thu, Dec 1, 2016 at 6:21 PM, Gwen Shapira <g...@confluent.io> wrote:

> Based on your last sentence, consider me convinced :)
>
> I get why headers are critical for Mirroring (you need tags to prevent
> loops and sometimes to route messages to the correct destination).
> But why do you need headers to audit? We are auditing by producing
> counts to a side topic (and I was under the impression you do the
> same), so we never need to modify the message.
>
> Another thing - after we added headers, wouldn't you be in the
> business of making sure everyone uses them properly? Making sure
> everyone includes the right headers you need, not using the header
> names you intend to use, etc. I don't think the "policing" business
> will ever go away.
>
> On Thu, Dec 1, 2016 at 5:25 PM, Todd Palino <tpal...@gmail.com> wrote:
> > Got it. As an ops guy, I'm not very happy with the workaround. Avro means
> > that I have to be concerned with the format of the messages in order to
> run
> > the infrastructure (audit, mirroring, etc.). That means that I have to
> > handle the schemas, and I have to enforce rules about good formats. This
> is
> > not something I want to be in the business of, because I should be able
> to
> > run a service infrastructure without needing to be in the weeds of
> dealing
> > with customer data formats.
> >
> > Trust me, a sizable portion of my support time is spent dealing with
> schema
> > issues. I really would like to get away from that. Maybe I'd have more
> time
> > for other hobbies. Like writing. ;)
> >
> > -Todd
> >
> > On Thu, Dec 1, 2016 at 4:04 PM Gwen Shapira <g...@confluent.io> wrote:
> >
> >> I'm pretty satisfied with the current workarounds (Avro container
> >> format), so I'm not too excited about the extra work required to do
> >> headers in Kafka. I absolutely don't mind it if you do it...
> >> I think the Apache convention for "good idea, but not willing to put
> >> any work toward it" is +0.5? anyway, that's what I was trying to
> >> convey :)
> >>
> >> On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino <tpal...@gmail.com> wrote:
> >> > Well I guess my question for you, then, is what is holding you back
> from
> >> > full support for headers? What’s the bit that you’re missing that has
> you
> >> > under a full +1?
> >> >
> >> > -Todd
> >> >
> >> >
> >> > On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira <g...@confluent.io>
> wrote:
> >> >
> >> >> I know why people who support headers support them, and I've seen
> what
> >> >> the discussion is like.
> >> >>
> >> >> This is why I'm asking people who are against headers (especially
> >> >> committers) what will make them change their mind - so we can get
> this
> >> >> part over one way or another.
> >> >>
> >> >> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
> >> >> just looking for something concrete we can do to move the discussion
> >> >> along to the yummy design details (which is the argument I really am
> >> >> looking forward to).
> >> >>
> >> >> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino <tpal...@gmail.com>
> wrote:
> >> >> > So, Gwen, to yo

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
Got it. As an ops guy, I'm not very happy with the workaround. Avro means
that I have to be concerned with the format of the messages in order to run
the infrastructure (audit, mirroring, etc.). That means that I have to
handle the schemas, and I have to enforce rules about good formats. This is
not something I want to be in the business of, because I should be able to
run a service infrastructure without needing to be in the weeds of dealing
with customer data formats.

Trust me, a sizable portion of my support time is spent dealing with schema
issues. I really would like to get away from that. Maybe I'd have more time
for other hobbies. Like writing. ;)

-Todd

On Thu, Dec 1, 2016 at 4:04 PM Gwen Shapira <g...@confluent.io> wrote:

> I'm pretty satisfied with the current workarounds (Avro container
> format), so I'm not too excited about the extra work required to do
> headers in Kafka. I absolutely don't mind it if you do it...
> I think the Apache convention for "good idea, but not willing to put
> any work toward it" is +0.5? anyway, that's what I was trying to
> convey :)
>
> On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino <tpal...@gmail.com> wrote:
> > Well I guess my question for you, then, is what is holding you back from
> > full support for headers? What’s the bit that you’re missing that has you
> > under a full +1?
> >
> > -Todd
> >
> >
> > On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira <g...@confluent.io> wrote:
> >
> >> I know why people who support headers support them, and I've seen what
> >> the discussion is like.
> >>
> >> This is why I'm asking people who are against headers (especially
> >> committers) what will make them change their mind - so we can get this
> >> part over one way or another.
> >>
> >> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
> >> just looking for something concrete we can do to move the discussion
> >> along to the yummy design details (which is the argument I really am
> >> looking forward to).
> >>
> >> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino <tpal...@gmail.com> wrote:
> >> > So, Gwen, to your question (even though I’m not a committer)...
> >> >
> >> > I have always been a strong supporter of introducing the concept of an
> >> > envelope to messages, which headers accomplishes. The message key is
> >> > already an example of a piece of envelope information. By providing a
> >> means
> >> > to do this within Kafka itself, and not relying on use-case specific
> >> > implementations, you make it much easier for components to
> interoperate.
> >> It
> >> > simplifies development of all these things (message routing, auditing,
> >> > encryption, etc.) because each one does not have to reinvent the
> wheel.
> >> >
> >> > It also makes it much easier from a client point of view if the
> headers
> >> are
> >> > defined as part of the protocol and/or message format in general
> because
> >> > you can easily produce and consume messages without having to take
> into
> >> > account specific cases. For example, I want to route messages, but
> >> client A
> >> > doesn’t support the way audit implemented headers, and client B
> doesn’t
> >> > support the way encryption or routing implemented headers, so now my
> >> > application has to create some really fragile (my autocorrect just
> tried
> >> to
> >> > make that “tragic”, which is probably appropriate too) code to strip
> >> > everything off, rather than just consuming the messages, picking out
> the
> >> 1
> >> > or 2 headers it’s interested in, and performing its function.
> >> >
> >> > Honestly, this discussion has been going on for a long time, and it’s
> >> > always “Oh, you came up with 2 use cases, and yeah, those use cases
> are
> >> > real things that someone would want to do. Here’s an alternate way to
> >> > implement them so let’s not do headers.” If we have a few use cases
> that
> >> we
> >> > actually came up with, you can be sure that over the next year
> there’s a
> >> > dozen others that we didn’t think of that someone would like to do. I
> >> > really think it’s time to stop rehashing this discussion and instead
> >> focus
> >> > on a workable standard that we can adopt.
> >> >
> >> > -Todd
> >> >
> >> >
> >> > On Thu, Dec 1, 2016 at 1:39 PM,

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
Well I guess my question for you, then, is what is holding you back from
full support for headers? What’s the bit that you’re missing that has you
under a full +1?

-Todd


On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira <g...@confluent.io> wrote:

> I know why people who support headers support them, and I've seen what
> the discussion is like.
>
> This is why I'm asking people who are against headers (especially
> committers) what will make them change their mind - so we can get this
> part over one way or another.
>
> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
> just looking for something concrete we can do to move the discussion
> along to the yummy design details (which is the argument I really am
> looking forward to).
>
> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino <tpal...@gmail.com> wrote:
> > So, Gwen, to your question (even though I’m not a committer)...
> >
> > I have always been a strong supporter of introducing the concept of an
> > envelope to messages, which headers accomplishes. The message key is
> > already an example of a piece of envelope information. By providing a
> means
> > to do this within Kafka itself, and not relying on use-case specific
> > implementations, you make it much easier for components to interoperate.
> It
> > simplifies development of all these things (message routing, auditing,
> > encryption, etc.) because each one does not have to reinvent the wheel.
> >
> > It also makes it much easier from a client point of view if the headers
> are
> > defined as part of the protocol and/or message format in general because
> > you can easily produce and consume messages without having to take into
> > account specific cases. For example, I want to route messages, but
> client A
> > doesn’t support the way audit implemented headers, and client B doesn’t
> > support the way encryption or routing implemented headers, so now my
> > application has to create some really fragile (my autocorrect just tried
> to
> > make that “tragic”, which is probably appropriate too) code to strip
> > everything off, rather than just consuming the messages, picking out the
> 1
> > or 2 headers it’s interested in, and performing its function.
> >
> > Honestly, this discussion has been going on for a long time, and it’s
> > always “Oh, you came up with 2 use cases, and yeah, those use cases are
> > real things that someone would want to do. Here’s an alternate way to
> > implement them so let’s not do headers.” If we have a few use cases that
> we
> > actually came up with, you can be sure that over the next year there’s a
> > dozen others that we didn’t think of that someone would like to do. I
> > really think it’s time to stop rehashing this discussion and instead
> focus
> > on a workable standard that we can adopt.
> >
> > -Todd
> >
> >
> > On Thu, Dec 1, 2016 at 1:39 PM, Todd Palino <tpal...@gmail.com> wrote:
> >
> >> C. per message encryption
> >>> One drawback of this approach is that this significantly reduce the
> >>> effectiveness of compression, which happens on a set of serialized
> >>> messages. An alternative is to enable SSL for wire encryption and rely
> on
> >>> the storage system (e.g. LUKS) for at rest encryption.
> >>
> >>
> >> Jun, this is not sufficient. While this does cover the case of removing
> a
> >> drive from the system, it will not satisfy most compliance requirements
> for
> >> encryption of data as whoever has access to the broker itself still has
> >> access to the unencrypted data. For end-to-end encryption you need to
> >> encrypt at the producer, before it enters the system, and decrypt at the
> >> consumer, after it exits the system.
> >>
> >> -Todd
> >>
> >>
> >> On Thu, Dec 1, 2016 at 1:03 PM, radai <radai.rosenbl...@gmail.com>
> wrote:
> >>
> >>> another big plus of headers in the protocol is that it would enable
> rapid
> >>> iteration on ideas outside of core kafka and would reduce the number of
> >>> future wire format changes required.
> >>>
> >>> a lot of what is currently a KIP represents use cases that are not 100%
> >>> relevant to all users, and some of them require rather invasive wire
> >>> protocol changes. i think a good recent example of this is kip-98.
> >>> tx-utilizing traffic is expected to be a very small fraction of total
> >>> traffic and yet the changes are invasive.
> >>>
> >>> every such wire

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
So, Gwen, to your question (even though I’m not a committer)...

I have always been a strong supporter of introducing the concept of an
envelope to messages, which headers accomplishes. The message key is
already an example of a piece of envelope information. By providing a means
to do this within Kafka itself, and not relying on use-case specific
implementations, you make it much easier for components to interoperate. It
simplifies development of all these things (message routing, auditing,
encryption, etc.) because each one does not have to reinvent the wheel.

It also makes it much easier from a client point of view if the headers are
defined as part of the protocol and/or message format in general because
you can easily produce and consume messages without having to take into
account specific cases. For example, I want to route messages, but client A
doesn’t support the way audit implemented headers, and client B doesn’t
support the way encryption or routing implemented headers, so now my
application has to create some really fragile (my autocorrect just tried to
make that “tragic”, which is probably appropriate too) code to strip
everything off, rather than just consuming the messages, picking out the 1
or 2 headers it’s interested in, and performing its function.
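As a sketch of that last point (plain Python, no particular client library, and the header names are made up for illustration): a routing step that picks out the one header it cares about and never touches the payload or anyone else's headers.

def route(headers, payload, producers):
    destination = dict(headers).get("router.destination")
    if destination is None:
        return None                              # no routing header; not our concern
    producers[destination].append(payload)       # stand-in for producing to another cluster
    return destination

producers = {"cluster-b": []}
route([("router.destination", "cluster-b"), ("audit.origin", "cluster-a")],
      b"opaque payload bytes", producers)
print(producers)   # {'cluster-b': [b'opaque payload bytes']}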

Honestly, this discussion has been going on for a long time, and it’s
always “Oh, you came up with 2 use cases, and yeah, those use cases are
real things that someone would want to do. Here’s an alternate way to
implement them so let’s not do headers.” If we have a few use cases that we
actually came up with, you can be sure that over the next year there’s a
dozen others that we didn’t think of that someone would like to do. I
really think it’s time to stop rehashing this discussion and instead focus
on a workable standard that we can adopt.

-Todd


On Thu, Dec 1, 2016 at 1:39 PM, Todd Palino <tpal...@gmail.com> wrote:

> C. per message encryption
>> One drawback of this approach is that this significantly reduce the
>> effectiveness of compression, which happens on a set of serialized
>> messages. An alternative is to enable SSL for wire encryption and rely on
>> the storage system (e.g. LUKS) for at rest encryption.
>
>
> Jun, this is not sufficient. While this does cover the case of removing a
> drive from the system, it will not satisfy most compliance requirements for
> encryption of data as whoever has access to the broker itself still has
> access to the unencrypted data. For end-to-end encryption you need to
> encrypt at the producer, before it enters the system, and decrypt at the
> consumer, after it exits the system.
>
> -Todd
>
>
> On Thu, Dec 1, 2016 at 1:03 PM, radai <radai.rosenbl...@gmail.com> wrote:
>
>> another big plus of headers in the protocol is that it would enable rapid
>> iteration on ideas outside of core kafka and would reduce the number of
>> future wire format changes required.
>>
>> a lot of what is currently a KIP represents use cases that are not 100%
>> relevant to all users, and some of them require rather invasive wire
> >> protocol changes. i think a good recent example of this is kip-98.
>> tx-utilizing traffic is expected to be a very small fraction of total
>> traffic and yet the changes are invasive.
>>
>> every such wire format change translates into painful and slow adoption of
>> new versions.
>>
>> i think a lot of functionality currently in KIPs could be "spun out" and
>> implemented as opt-in plugins transmitting data over headers. this would
>> keep the core wire format stable(r), core codebase smaller, and avoid the
>> "burden of proof" thats sometimes required to prove a certain feature is
>> useful enough for a wide-enough audience to warrant a wire format change
>> and code complexity additions.
>>
>> (to be clear - kip-98 goes beyond "mere" wire format changes and im not
>> saying it could have been completely done with headers, but exactly-once
>> delivery certainly could)
>>
>> On Thu, Dec 1, 2016 at 11:20 AM, Gwen Shapira <g...@confluent.io> wrote:
>>
>> > On Thu, Dec 1, 2016 at 10:24 AM, radai <radai.rosenbl...@gmail.com>
>> wrote:
>> > > "For use cases within an organization, one could always use other
>> > > approaches such as company-wise containers"
>> > > this is what linkedin has traditionally done but there are now cases
>> > (read
>> > > - topics) where this is not acceptable. this makes headers useful even
>> > > within single orgs for cases where one-container-fits-all cannot
>> apply.
>> > >
>> > > as for the particular use cases listed, i dont want this to devo

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
>
> C. per message encryption
> One drawback of this approach is that this significantly reduce the
> effectiveness of compression, which happens on a set of serialized
> messages. An alternative is to enable SSL for wire encryption and rely on
> the storage system (e.g. LUKS) for at rest encryption.


Jun, this is not sufficient. While this does cover the case of removing a
drive from the system, it will not satisfy most compliance requirements for
encryption of data as whoever has access to the broker itself still has
access to the unencrypted data. For end-to-end encryption you need to
encrypt at the producer, before it enters the system, and decrypt at the
consumer, after it exits the system.

-Todd


On Thu, Dec 1, 2016 at 1:03 PM, radai  wrote:

> another big plus of headers in the protocol is that it would enable rapid
> iteration on ideas outside of core kafka and would reduce the number of
> future wire format changes required.
>
> a lot of what is currently a KIP represents use cases that are not 100%
> relevant to all users, and some of them require rather invasive wire
> protocol changes. i think a good recent example of this is kip-98.
> tx-utilizing traffic is expected to be a very small fraction of total
> traffic and yet the changes are invasive.
>
> every such wire format change translates into painful and slow adoption of
> new versions.
>
> i think a lot of functionality currently in KIPs could be "spun out" and
> implemented as opt-in plugins transmitting data over headers. this would
> keep the core wire format stable(r), core codebase smaller, and avoid the
> "burden of proof" thats sometimes required to prove a certain feature is
> useful enough for a wide-enough audience to warrant a wire format change
> and code complexity additions.
>
> (to be clear - kip-98 goes beyond "mere" wire format changes and im not
> saying it could have been completely done with headers, but exactly-once
> delivery certainly could)
>
> On Thu, Dec 1, 2016 at 11:20 AM, Gwen Shapira  wrote:
>
> > On Thu, Dec 1, 2016 at 10:24 AM, radai 
> wrote:
> > > "For use cases within an organization, one could always use other
> > > approaches such as company-wise containers"
> > > this is what linkedin has traditionally done but there are now cases
> > (read
> > > - topics) where this is not acceptable. this makes headers useful even
> > > within single orgs for cases where one-container-fits-all cannot apply.
> > >
> > > as for the particular use cases listed, i dont want this to devolve to
> a
> > > discussion of particular use cases - i think its enough that some of
> them
> >
> > I think a main point of contention is that: We identified few
> > use-cases where headers are useful, do we want Kafka to be a system
> > that supports those use-cases?
> >
> > For example, Jun said:
> > "Not sure how widely useful record-level lineage is though since the
> > overhead could
> > be significant."
> >
> > We know NiFi supports record level lineage. I don't think it was
> > developed for lols, I think it is safe to assume that the NSA needed
> > that functionality. We also know that certain financial institutes
> > need to track tampering with records at a record level and there are
> > federal regulations that absolutely require this.  They also need to
> > prove that routing apps that "touches" the messages and either reads
> > or updates headers couldn't have possibly modified the payload itself.
> > They use record level encryption to do that - apps can read and
> > (sometimes) modify headers but can't touch the payload.
> >
> > We can totally say "those are corner cases and not worth adding
> > headers to Kafka for", they should use a different pubsub message for
> > that (Nifi or one of the other 1000 that cater specifically to the
> > financial industry).
> >
> > But this gets us into a catch 22:
> > If we discuss a specific use-case, someone can always say it isn't
> > interesting enough for Kafka. If we discuss more general trends,
> > others can say "well, we are not sure any of them really needs headers
> > specifically. This is just hand waving and not interesting.".
> >
> > I think discussing use-cases in specifics is super important to decide
> > implementation details for headers (my use-cases lean toward numerical
> > keys with namespaces and object values, others differ), but I think we
> > need to answer the general "Are we going to have headers" question
> > first.
> >
> > I'd love to hear from the other committers in the discussion:
> > What would it take to convince you that headers in Kafka are a good
> > idea in general, so we can move ahead and try to agree on the details?
> >
> > I feel like we keep moving the goal posts and this is truly exhausting.
> >
> > For the record, I mildly support adding headers to Kafka (+0.5?).
> > The community can continue to find workarounds to the issue and there
> > are some benefits to 

[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-11-27 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15700938#comment-15700938
 ] 

Todd Palino commented on KAFKA-3959:


+1 as well. I think keeping the config/server.properties file as a quickstart 
that gets you going with a standalone broker is a good plan.

> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>
> When creating a stack of 3 kafka brokers, the consumer is starting faster 
> than kafka nodes and when trying to read a topic, only one kafka node is 
> available.
> So the __consumer_offsets is created with a replication factor set to 1 
> (instead of configured 3) :
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, other kafka nodes go up and we have exceptions because the replicas # 
> for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown.
> What I missed is : Why the __consumer_offsets is created with replication to 
> 1 (when 1 broker is running) whereas in server.properties it is set to 3 ?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-11-15 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668010#comment-15668010
 ] 

Todd Palino commented on KAFKA-3959:


As noted, I just want the config enforced. If RF=3 is configured, that's what 
we should get. If you need RF=1 for testing, or for specific use cases, set it. 
Even make the default 1 if that's really what we want. But if I explicitly set 
RF=3, that's what I should get. And if it causes errors, and I've explicitly 
set it, that's on me as the user.

> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>
> When creating a stack of 3 kafka brokers, the consumer is starting faster 
> than kafka nodes and when trying to read a topic, only one kafka node is 
> available.
> So the __consumer_offsets is created with a replication factor set to 1 
> (instead of configured 3) :
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, other kafka nodes go up and we have exceptions because the replicas # 
> for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown.
> What I missed is : Why the __consumer_offsets is created with replication to 
> 1 (when 1 broker is running) whereas in server.properties it is set to 3 ?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Plans to improve SSL performance in Kafka, for 0.10.x?

2016-09-06 Thread Todd Palino
Yeah, that's why I mentioned it with a caveat :) Someone (I can't recall
who, but it was someone I consider reasonably knowledgeable as I actually
gave it some weight) mentioned it, but I haven't looked into it further
than that. I agree that I don't see how this is going to help us at the app
layer.

-Todd

On Tuesday, September 6, 2016, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Todd,
>
> Thanks for sharing your experience enabling TLS in your clusters. Very
> helpful. One comment below.
>
> On Sun, Sep 4, 2016 at 6:28 PM, Todd Palino <tpal...@gmail.com
> <javascript:;>> wrote:
> >
> > Right now, we're specifically avoiding moving consume traffic to SSL, due
> > to the zero copy send issue. Now I've been told (but I have not
> > investigated) that OpenSSL can solve this. It would probably be a good
> use
> > of time to look into that further.
> >
>
> As far as I know, OpenSSL can reduce the TLS overhead, but we will still
> lose the zero-copy optimisation. There are some attempts at making it
> possible to retain zero-copy with TLS in the kernel[1][2], but it's
> probably too early for us to consider that for Kafka.
>
> Ismael
>
> [1] https://lwn.net/Articles/666509/
> [2]
> http://techblog.netflix.com/2016/08/protecting-netflix-
> viewing-privacy-at.html
>


-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Plans to improve SSL performance in Kafka, for 0.10.x?

2016-09-04 Thread Todd Palino
We've been using SSL for produce traffic (mirror makers only right now, but
that's a very large percentage of traffic for us), and we're in the process
of turning it on for inter broker traffic as well. Our experience is that
this does not present a significant amount of overhead to the brokers.
Specifically with switching over the IBP, we were expecting a lot more of a
hit, and it really only ended up being something like a 5% increase in
system load, and no reduction in the cluster capacity, in our test cluster.
Note that this relies on the fix in KAFKA-4050 and switching the PRNG to
SHA1PRNG.
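
(For reference, the broker-side settings involved look roughly like the sketch
below. The ssl.secure.random.implementation entry is the config name proposed
in KAFKA-4050; the listener ports and keystore paths are placeholders, not our
actual values.)

    listeners=PLAINTEXT://:9092,SSL://:9093
    security.inter.broker.protocol=SSL
    ssl.keystore.location=/path/to/broker.keystore.jks
    ssl.keystore.password=changeit
    ssl.truststore.location=/path/to/broker.truststore.jks
    ssl.truststore.password=changeit
    # PRNG override from KAFKA-4050; leaving it unset keeps the JRE default
    ssl.secure.random.implementation=SHA1PRNG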

Right now, we're specifically avoiding moving consume traffic to SSL, due
to the zero copy send issue. Now I've been told (but I have not
investigated) that OpenSSL can solve this. It would probably be a good use
of time to look into that further.

That said, switching the message format to the newer option (KIP-31 I
believe?) will result in the brokers not needing to recompress message
batches that are produced. This should result in a significant reduction in
CPU usage, which may offset the cost of SSL. We haven't had a chance to
fully investigate this, however, as changing that config depends on the
clients being updated to support the new format.

-Todd

On Sunday, September 4, 2016, Jaikiran Pai <jai.forums2...@gmail.com> wrote:

> We are using 0.10.0.1 of Kafka and (Java) client libraries. We recently
> decided to start using SSL for Kafka communication between broker and
> clients. Right now, we have a pretty basic setup with just 1 broker with
> SSL keystore setup and the Java client(s) communicate using the
> Producer/Consumer APIs against this single broker. There's no client auth
> (intentionally) right now. We also have plain text enabled for the initial
> testing.
>
> What we have noticed is that the consumer/producer performance when SSL is
> enabled is noticeably poor when compared to plain text. I understand that
> there are expected to be performance impacts when SSL is enabled but the
> order of magnitude is too high and in fact it shows up in a very noticeable
> fashion in our product. I do have the numbers, but I haven't been able to
> narrow it down yet (because there's nothing that stands out, except that
> it's slow). Our application code is exactly the same between non-SSL and
> SSL usage.
>
> Furthermore, I'm aware of this specific JIRA in Kafka
> https://issues.apache.org/jira/browse/KAFKA-2561 which acknowledges a
> similar issue. So what I would like to know is, in context of Kafka 0.10.x
> releases and Java 8 support, are there any timelines that the dev team is
> looking for in terms of improving this performance issue (which I believe
> requires usage of OpenSSL or other non-JDK implementations of SSLEngine)?
> We would like to go for GA of our product in the next couple of months and
> in order to do that, we do plan to have Kafka over SSL working with
> reasonably good performance, but the current performance isn't promising.
> Expecting this to be fixed in the next couple of months and have it
> available in 0.10.x is probably too much to expect, but if we know the
> plans around this, we should be able to come up with a plan of our own for
> our product.
>
>
> -Jaikiran
>


-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: [ANNOUNCE] New Kafka PMC member Gwen Shapira

2016-08-18 Thread Todd Palino
Congratulations, Gwen! Well deserved :)

-Todd


On Thu, Aug 18, 2016 at 10:27 AM, Gwen Shapira <g...@confluent.io> wrote:

> Thanks team Kafka :) Very excited and happy to contribute and be part
> of this fantastic community.
>
>
>
> On Thu, Aug 18, 2016 at 9:52 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Congrats Gwen!
> >
> > On Thu, Aug 18, 2016 at 9:27 AM, Ashish Singh <asi...@cloudera.com>
> wrote:
> >
> >> Congrats Gwen!
> >>
> >> On Thursday, August 18, 2016, Grant Henke <ghe...@cloudera.com> wrote:
> >>
> >> > Congratulations Gwen!
> >> >
> >> >
> >> >
> >> > On Thu, Aug 18, 2016 at 2:36 AM, Ismael Juma <ism...@juma.me.uk
> >> > <javascript:;>> wrote:
> >> >
> >> > > Congratulations Gwen! Great news.
> >> > >
> >> > > Ismael
> >> > >
> >> > > On 18 Aug 2016 2:44 am, "Jun Rao" <j...@confluent.io <javascript:;>>
> >> > wrote:
> >> > >
> >> > > > Hi, Everyone,
> >> > > >
> >> > > > Gwen Shapira has been active in the Kafka community since she
> became
> >> a
> >> > > > Kafka committer
> >> > > > about a year ago. I am glad to announce that Gwen is now a member
> of
> >> > > Kafka
> >> > > > PMC.
> >> > > >
> >> > > > Congratulations, Gwen!
> >> > > >
> >> > > > Jun
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > Grant Henke
> >> > Software Engineer | Cloudera
> >> > gr...@cloudera.com <javascript:;> | twitter.com/gchenke |
> >> > linkedin.com/in/granthenke
> >> >
> >>
> >>
> >> --
> >> Ashish h
> >>
> >
> >
> >
> > --
> > -- Guozhang
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>



-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: [DISCUSS] KIP-73: Replication Quotas

2016-08-18 Thread Todd Palino
Yeah, I’m good with where we are right now on this KIP. It’s a workable
solution that we can add tooling to support. I would prefer to have soft
quotas, but given that it is more complex to implement, we can go with hard
quotas for the time being and consider it as an improvement later.
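
(For anyone building tooling around this later: the manual procedure being
discussed comes down to something like the sketch below, assuming the
broker-level throttle configs as they appear in the final KIP; the names under
discussion may still change, and the broker id, rate, and ZooKeeper string are
placeholders.)

    # throttle replication on broker 0 to ~10 MB/sec while a reassignment runs
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type brokers --entity-name 0 \
      --add-config 'leader.replication.throttled.rate=10000000,follower.replication.throttled.rate=10000000'

    # and remove the throttle once the reassignment has finished
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type brokers --entity-name 0 \
      --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate'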

-Todd


On Thu, Aug 18, 2016 at 11:13 AM, Jun Rao <j...@confluent.io> wrote:

> Todd,
>
> Thanks for the detailed reply. So, it sounds like that you are ok with the
> current proposal in the KIP for now and we can brainstorm on more automated
> stuff separately? Are you comfortable with starting the vote on the current
> proposal?
>
> Jun
>
> On Thu, Aug 18, 2016 at 11:00 AM, Todd Palino <tpal...@gmail.com> wrote:
>
> > Joel just reminded me to take another look at this one :) So first off,
> > this is great. It’s something that we definitely need to have, especially
> > as we get into the realm of moving partitions around more often.
> >
> > I do prefer to have the cluster handle this automatically. What I
> envision
> > is a single configuration for “bootstrap replication quota” that is
> applied
> > when we have a replica that is in this situation. There’s 2 legitimate
> > cases that I’m aware of right now:
> > 1 - We are moving a partition to a new replica. We know about this (at
> > least the controller does), so we should be able to apply the quota
> without
> > too much trouble here
> > 2 - We have a broker that lost its disk and has to recover the partition
> > from the cluster. Harder to detect, but in this case, I’m not sure I even
> > want to throttle it because this is recovery activity.
> >
> > The problem with this becomes the question of “why”. Why are you moving a
> > partition? Are you doing it because you want to balance traffic? Or are
> you
> > doing it because you lost a piece of hardware and you need to get the RF
> > for the partition back up to the desired level? As an admin, these have
> > different priorities. I may be perfectly fine with having the replication
> > traffic saturate the cluster in the latter case, because reliability and
> > availability is more important than performance.
> >
> > Given the complexity of trying to determine intent, I’m going to agree
> with
> > implementing a manual procedure for now. We definitely need to have a
> > discussion about automating it as much as possible, but I think it’s part
> > of a larger conversation about how much automation should be built into
> the
> > broker itself, and how much should be part of a bolt-on “cluster
> manager”.
> > I’m not sure putting all that complexity into the broker is the right
> > choice.
> >
> > I do agree with Joel here that while a hard quota is typically better
> from
> > a client point of view, in the case of replication traffic a soft quota
> is
> > appropriate, and desirable. Probably a combination of both, as I think we
> > still want a hard limit that stops short of saturating the entire cluster
> > with replication traffic.
> >
> > -Todd
> >
> >
> > On Thu, Aug 18, 2016 at 10:21 AM, Joel Koshy <jjkosh...@gmail.com>
> wrote:
> >
> > > > For your first comment. We thought about determining "effect"
> replicas
> > > > automatically as well. First, there are some tricky stuff that one
> has
> > to
> > > >
> > >
> > > Auto-detection of effect traffic: i'm fairly certain it's doable but
> > > definitely tricky. I'm also not sure it is something worth tackling at
> > the
> > > outset. If we want to spend more time thinking over it even if it's
> just
> > an
> > > academic exercise I would be happy to brainstorm offline.
> > >
> > >
> > > > For your second comment, we discussed that in the client quotas
> > design. A
> > > > down side of that for client quotas is that a client may be surprised
> > > that
> > > > its traffic is not throttled at one time, but throttled as another
> with
> > > the
> > > > same quota (basically, less predicability). You can imaging setting a
> > > quota
> > > > for all replication traffic and only slow down the "effect" replicas
> if
> > > > needed. The thought is more or less the same as the above. It
> requires
> > > more
> > > >
> > >
> > > For clients, this is true. I think this is much less of an issue for
> > > server-side replication since the "users" here are the Kafka SREs who
> > > generally know these internal details.
> 

Re: [DISCUSS] KIP-73: Replication Quotas

2016-08-18 Thread Todd Palino
on
> >> > > >>>>>>>> the
> >> > > >>>>>>>>>>>> leader, but we can leave the replica threading model
> >> > > >>>> unchanged.
> >> > > >>>>>> So,
> >> > > >>>>>>>>>>>> overall, this still seems to be a simpler approach.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Thanks,
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Jun
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> On Tue, Aug 9, 2016 at 11:57 AM, Mayuresh Gharat <
> >> > > >>>>>>>>>>>> gharatmayures...@gmail.com <javascript:;>
> >> > > >>>>>>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Nice write up Ben.
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> I agree with Joel for keeping this simple by excluding
> >> > > >> the
> >> > > >>>>>>>> partitions
> >> > > >>>>>>>>>>>> from
> >> > > >>>>>>>>>>>>> the fetch request/response when the quota is violated
> at
> >> > > >>> the
> >> > > >>>>>>>> follower
> >> > > >>>>>>>>>>> or
> >> > > >>>>>>>>>>>>> leader instead of having a separate set of threads for
> >> > > >>>>> handling
> >> > > >>>>>>> the
> >> > > >>>>>>>>>>> quota
> >> > > >>>>>>>>>>>>> and non quota cases. Even though its different from
> the
> >> > > >>>>> current
> >> > > >>>>>>>> quota
> >> > > >>>>>>>>>>>>> implementation it should be OK since its internal to
> >> > > >>> brokers
> >> > > >>>>> and
> >> > > >>>>>>> can
> >> > > >>>>>>>>> be
> >> > > >>>>>>>>>>>>> handled by tuning the quota configs for it
> appropriately
> >> > > >>> by
> >> > > >>>>> the
> >> > > >>>>>>>>> admins.
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Also can you elaborate with an example how this would
> be
> >> > > >>>>>> handled :
> >> > > >>>>>>>>>>>>> *guaranteeing
> >> > > >>>>>>>>>>>>> ordering of updates when replicas shift threads*
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Thanks,
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Mayuresh
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> On Tue, Aug 9, 2016 at 10:49 AM, Joel Koshy <
> >> > > >>>>>> jjkosh...@gmail.com <javascript:;>>
> >> > > >>>>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>>
&g

Re: [DISCUSS] KIP-73: Replication Quotas

2016-08-18 Thread Todd Palino
;>>>>>>>>>>>>> for clarifying. For completeness, can we add this
> > > >> detail
> > > >>> to
> > > >>>>> the
> > > >>>>>>>> doc -
> > > >>>>>>>>>>>>> say,
> > > >>>>>>>>>>>>>> after the quote that I pasted earlier?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> From an implementation perspective though: I’m still
> > > >>>>> interested
> > > >>>>>>> in
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>> simplicity of not having to add separate replica
> > > >>> fetchers,
> > > >>>>>> delay
> > > >>>>>>>>>>> queue
> > > >>>>>>>>>>>> on
> > > >>>>>>>>>>>>>> the leader, and “move” partitions from the throttled
> > > >>>> replica
> > > >>>>>>>> fetchers
> > > >>>>>>>>>>>> to
> &g

[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-17 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425756#comment-15425756
 ] 

Todd Palino commented on KAFKA-4050:


So first off, yes, the thread dump (which [~jjkoshy] posted) shows that the 
offending line of code is "NativePRNG.java:481". I checked, and that's very 
clearly in the non-blocking NativePRNG variant that explicitly uses /dev/urandom.

I had considered changing the default, [~ijuma], and I actually thought about 
adding a note to this ticket about it earlier today. Despite the fact that the 
default clearly has performance issues, I don't think we should change the 
default behavior, which is to let the JRE pick the PRNG implementation. The 
reason is that we can't be sure that on any given system, in any given JRE, 
that the new one we set explicitly will exist, and that would cause the default 
behavior to break. The SHA1PRNG implementation should exist everywhere, but I'd 
rather not take the risk. I think it's better to leave the default as is, and 
call out the issue very clearly in the documentation.

> Allow configuration of the PRNG used for SSL
> 
>
> Key: KAFKA-4050
> URL: https://issues.apache.org/jira/browse/KAFKA-4050
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.0.1
>Reporter: Todd Palino
>Assignee: Todd Palino
>  Labels: security, ssl
>
> This change will make the pseudo-random number generator (PRNG) 
> implementation used by the SSLContext configurable. The configuration is not 
> required, and the default is to use whatever the default PRNG for the JDK/JRE 
> is. Providing a string, such as "SHA1PRNG", will cause that specific 
> SecureRandom implementation to get passed to the SSLContext.
> When enabling inter-broker SSL in our certification cluster, we observed 
> severe performance issues. For reference, this cluster can take up to 600 
> MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close 
> to saturation, and the mirror maker normally produces about 400 MB/sec 
> (unless it is lagging). When we enabled inter-broker SSL, we saw persistent 
> replication problems in the cluster at any inbound rate of more than about 6 
> or 7 MB/sec per-broker. This was narrowed down to all the network threads 
> blocking on a single lock in the SecureRandom code.
> It turns out that the default PRNG implementation on Linux is NativePRNG. 
> This uses randomness from /dev/urandom (which, by itself, is a non-blocking 
> read) and mixes it with randomness from SHA1. The problem is that the entire 
> application shares a single SecureRandom instance, and NativePRNG has a 
> global lock within the implNextBytes method. Switching to another 
> implementation (SHA1PRNG, which has better performance characteristics and is 
> still considered secure) completely eliminated the bottleneck and allowed the 
> cluster to work properly at saturation.
> The SSLContext initialization has an optional argument to provide a 
> SecureRandom instance, which the code currently sets to null. This change 
> creates a new config to specify an implementation, and instantiates that and 
> passes it to SSLContext if provided. This will also let someone select a 
> stronger source of randomness (obviously at a performance cost) if desired.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-16 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15423400#comment-15423400
 ] 

Todd Palino commented on KAFKA-4050:


It appears to be called every time something needs to be encrypted (have to get 
randomness to run the crypto routines), so yeah, every single packet being sent 
would need a call to get random bytes.

> Allow configuration of the PRNG used for SSL
> 
>
> Key: KAFKA-4050
> URL: https://issues.apache.org/jira/browse/KAFKA-4050
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.0.1
>    Reporter: Todd Palino
>Assignee: Todd Palino
>  Labels: security, ssl
>
> This change will make the pseudo-random number generator (PRNG) 
> implementation used by the SSLContext configurable. The configuration is not 
> required, and the default is to use whatever the default PRNG for the JDK/JRE 
> is. Providing a string, such as "SHA1PRNG", will cause that specific 
> SecureRandom implementation to get passed to the SSLContext.
> When enabling inter-broker SSL in our certification cluster, we observed 
> severe performance issues. For reference, this cluster can take up to 600 
> MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close 
> to saturation, and the mirror maker normally produces about 400 MB/sec 
> (unless it is lagging). When we enabled inter-broker SSL, we saw persistent 
> replication problems in the cluster at any inbound rate of more than about 6 
> or 7 MB/sec per-broker. This was narrowed down to all the network threads 
> blocking on a single lock in the SecureRandom code.
> It turns out that the default PRNG implementation on Linux is NativePRNG. 
> This uses randomness from /dev/urandom (which, by itself, is a non-blocking 
> read) and mixes it with randomness from SHA1. The problem is that the entire 
> application shares a single SecureRandom instance, and NativePRNG has a 
> global lock within the implNextBytes method. Switching to another 
> implementation (SHA1PRNG, which has better performance characteristics and is 
> still considered secure) completely eliminated the bottleneck and allowed the 
> cluster to work properly at saturation.
> The SSLContext initialization has an optional argument to provide a 
> SecureRandom instance, which the code currently sets to null. This change 
> creates a new config to specify an implementation, and instantiates that and 
> passes it to SSLContext if provided. This will also let someone select a 
> stronger source of randomness (obviously at a performance cost) if desired.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-16 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-4050:
--

 Summary: Allow configuration of the PRNG used for SSL
 Key: KAFKA-4050
 URL: https://issues.apache.org/jira/browse/KAFKA-4050
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 0.10.0.1
Reporter: Todd Palino
Assignee: Todd Palino


This change will make the pseudo-random number generator (PRNG) implementation 
used by the SSLContext configurable. The configuration is not required, and the 
default is to use whatever the default PRNG for the JDK/JRE is. Providing a 
string, such as "SHA1PRNG", will cause that specific SecureRandom 
implementation to get passed to the SSLContext.

When enabling inter-broker SSL in our certification cluster, we observed severe 
performance issues. For reference, this cluster can take up to 600 MB/sec of 
inbound produce traffic over SSL, with RF=2, before it gets close to 
saturation, and the mirror maker normally produces about 400 MB/sec (unless it 
is lagging). When we enabled inter-broker SSL, we saw persistent replication 
problems in the cluster at any inbound rate of more than about 6 or 7 MB/sec 
per-broker. This was narrowed down to all the network threads blocking on a 
single lock in the SecureRandom code.

It turns out that the default PRNG implementation on Linux is NativePRNG. This 
uses randomness from /dev/urandom (which, by itself, is a non-blocking read) 
and mixes it with randomness from SHA1. The problem is that the entire 
application shares a single SecureRandom instance, and NativePRNG has a global 
lock within the implNextBytes method. Switching to another implementation 
(SHA1PRNG, which has better performance characteristics and is still considered 
secure) completely eliminated the bottleneck and allowed the cluster to work 
properly at saturation.

The SSLContext initialization has an optional argument to provide a 
SecureRandom instance, which the code currently sets to null. This change 
creates a new config to specify an implementation, and instantiates that and 
passes it to SSLContext if provided. This will also let someone select a 
stronger source of randomness (obviously at a performance cost) if desired.
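
A rough sketch of the mechanism described above (illustrative only, not the
actual patch): the SecureRandom implementation named by the new config is
instantiated and handed to SSLContext.init() in place of the current null.

    import javax.net.ssl.KeyManager;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.TrustManager;
    import java.security.SecureRandom;

    public class PrngConfigSketch {
        // prngImpl comes from the new config, e.g. "SHA1PRNG"; null keeps the
        // old behavior of letting the JRE pick the default implementation
        public static SSLContext buildSslContext(KeyManager[] keyManagers,
                                                 TrustManager[] trustManagers,
                                                 String prngImpl) throws Exception {
            SecureRandom random =
                (prngImpl == null) ? null : SecureRandom.getInstance(prngImpl);
            SSLContext ctx = SSLContext.getInstance("TLS");
            ctx.init(keyManagers, trustManagers, random);
            return ctx;
        }
    }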



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-08-12 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15419365#comment-15419365
 ] 

Todd Palino commented on KAFKA-3959:


Agree with Onur 100% here. We've been running into this a lot lately, and we 
never know about it until we do a rolling restart of the cluster and consumers 
break when the topic goes offline.

> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>
> When creating a stack of 3 kafka brokers, the consumer is starting faster 
> than kafka nodes and when trying to read a topic, only one kafka node is 
> available.
> So the __consumer_offsets is created with a replication factor set to 1 
> (instead of configured 3) :
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, other kafka nodes go up and we have exceptions because the replicas # 
> for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown.
> What I missed is : Why the __consumer_offsets is created with replication to 
> 1 (when 1 broker is running) whereas in server.properties it is set to 3 ?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3797) Improve security of __consumer_offsets topic

2016-06-10 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323930#comment-15323930
 ] 

Todd Palino commented on KAFKA-3797:


Obviously we can't do something like that with a running consumer. But we do it 
all the time with inactive consumers.

I haven't looked at the most recent changes to the offset API yet, so I'm not 
sure whether or not there are restrictions in place that would allow a client 
that is not part of the group to commit offsets for the group.

> Improve security of __consumer_offsets topic
> 
>
> Key: KAFKA-3797
> URL: https://issues.apache.org/jira/browse/KAFKA-3797
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> By default, we allow clients to override committed offsets and group metadata 
> using the Produce API as long as they have Write access to the 
> __consumer_offsets topic. From one perspective, this is fine: administrators 
> can restrict access to this topic to trusted users. From another, it seems 
> less than ideal for Write permission on that topic to subsume Group-level 
> permissions for the full cluster. With this access, a user can cause all 
> kinds of mischief including making the group "lose" data by setting offsets 
> ahead of the actual position. This is probably not obvious to administrators 
> who grant access to topics using a wildcard and it increases the risk from 
> incorrectly applying topic patterns (if we ever add support for them). It 
> seems reasonable to consider safer default behavior:
> 1. A simple option to fix this would be to prevent wildcard topic rules from 
> applying to internal topics. To write to an internal topic, you need a 
> separate rule which explicitly grants authorization to that topic.
> 2. A more extreme and perhaps safer option might be to prevent all writes to 
> this topic (and potentially other internal topics) through the Produce API. 
> Do we have any use cases which actually require writing to 
> __consumer_offsets? The only potential case that comes to mind is replication.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3797) Improve security of __consumer_offsets topic

2016-06-07 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318854#comment-15318854
 ] 

Todd Palino commented on KAFKA-3797:


I think the first option is more reasonable, and provides sufficient security. 
I can see cases where you might want to have an external tool for changing a 
group's committed offsets.

> Improve security of __consumer_offsets topic
> 
>
> Key: KAFKA-3797
> URL: https://issues.apache.org/jira/browse/KAFKA-3797
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>
> By default, we allow clients to override committed offsets and group metadata 
> using the Produce API as long as they have Write access to the 
> __consumer_offsets topic. From one perspective, this is fine: administrators 
> can restrict access to this topic to trusted users. From another, it seems 
> less than ideal for Write permission on that topic to subsume Group-level 
> permissions for the full cluster. With this access, a user can cause all 
> kinds of mischief including making the group "lose" data by setting offsets 
> ahead of the actual position. This is probably not obvious to administrators 
> who grant access to topics using a wildcard and it increases the risk from 
> incorrectly applying topic patterns (if we ever add support for them). It 
> seems reasonable to consider safer default behavior:
> 1. A simple option to fix this would be to prevent wildcard topic rules from 
> applying to internal topics. To write to an internal topic, you need a 
> separate rule which explicitly grants authorization to that topic.
> 2. A more extreme and perhaps safer option might be to prevent all writes to 
> this topic (and potentially other internal topics) through the Produce API. 
> Do we have any use cases which actually require writing to 
> __consumer_offsets? The only potential case that comes to mind is replication.
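
To make the first option concrete: wildcard topic rules would simply not match
internal topics, and an administrator who genuinely needs a producer to write
to the offsets topic would grant it by name, roughly like the sketch below
(the principal and ZooKeeper string are placeholders).

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
      --add --allow-principal User:offset-replicator \
      --operation Write --topic __consumer_offsets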



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3725) Update documentation with regards to XFS

2016-06-02 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312288#comment-15312288
 ] 

Todd Palino commented on KAFKA-3725:


I'll take a look at this and put together a PR with some updates.

> Update documentation with regards to XFS
> 
>
> Key: KAFKA-3725
> URL: https://issues.apache.org/jira/browse/KAFKA-3725
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>    Assignee: Todd Palino
>
> Our documentation currently states that only Ext4 has been tried (by 
> LinkedIn):
> "Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS 
> supposedly handle locking during fsync better. We have only tried Ext4, 
> though."
> http://kafka.apache.org/documentation.html#ext4
> I think this is no longer true, so we should update the documentation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3725) Update documentation with regards to XFS

2016-06-02 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino reassigned KAFKA-3725:
--

Assignee: Todd Palino

> Update documentation with regards to XFS
> 
>
> Key: KAFKA-3725
> URL: https://issues.apache.org/jira/browse/KAFKA-3725
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>    Assignee: Todd Palino
>
> Our documentation currently states that only Ext4 has been tried (by 
> LinkedIn):
> "Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS 
> supposedly handle locking during fsync better. We have only tried Ext4, 
> though."
> http://kafka.apache.org/documentation.html#ext4
> I think this is no longer true, so we should update the documentation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: reading the consumer offsets topic

2016-05-09 Thread Todd Palino
The GroupMetadataManager one should be working for you with 0.9. I don’t
have a 0.9 KCC set up at the moment, so I’m using an 0.8 version where it’s
different (it’s the other class for me). The only thing I can offer now is
did you put quotes around the arg to --formatter so you don’t get weird
shell interference?
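
One other thing worth trying if the formatter is right but nothing prints: the
old consumer skips internal topics by default (assuming the 0.9 default for
exclude.internal.topics hasn't changed), so the override has to go in a
consumer config file, roughly:

    echo "exclude.internal.topics=false" > /tmp/offsets-consumer.properties
    ./kafka-console-consumer.sh --zookeeper localhost:2181 \
      --topic __consumer_offsets \
      --consumer.config /tmp/offsets-consumer.properties \
      --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"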

-Todd


On Mon, May 9, 2016 at 8:18 AM, Cliff Rhyne <crh...@signal.co> wrote:

> Thanks, Todd.  It's still not working unfortunately.
>
> This results in nothing getting printed to the console and requires kill -9
> in another window to stop (ctrl-c doesn't work):
>
> ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper
>  --topic __consumer_offsets --formatter
> kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter
>
> This results in a stack trace because it can't find the class:
>
> ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper
>  --topic __consumer_offsets --formatter
> kafka.server.OffsetManager\$OffsetsMessageFormatter
>
> Exception in thread "main" java.lang.ClassNotFoundException:
> kafka.server.OffsetManager$OffsetsMessageFormatter
>
>
> I'm on 0.9.0.1. "broker-list" is invalid and zookeeper is required
> regardless of the bootstrap-server parameter.
>
>
> Thanks,
>
> Cliff
>
> On Sun, May 8, 2016 at 7:35 PM, Todd Palino <tpal...@gmail.com> wrote:
>
> > It looks like you’re just missing the proper message formatter. Of
> course,
> > that largely depends on your version of the broker. Try:
> >
> > ./kafka-console-consumer.sh --broker-list localhost:9092 --topic
> > __consumer_offsets
> > --formatter
> kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter
> >
> >
> > If for some reason that doesn’t work, you can try
> > "kafka.server.OffsetManager\$OffsetsMessageFormatter” instead.
> >
> > -Todd
> >
> >
> >
> >
> > On Sun, May 8, 2016 at 1:26 PM, Cliff Rhyne <crh...@signal.co> wrote:
> >
> > > I'm having difficulty reading the consumer offsets topic from the
> command
> > > line.  I try the following but it doesn't seem to work (along with a
> few
> > > related variants including specifying the zookeeper hosts):
> > >
> > > ./kafka-console-consumer.sh --broker-list localhost:9092 --topic
> > > __consumer_offsets
> > >
> > > Is there a special way to read the consumer offsets topic?
> > >
> > > Thanks,
> > > Cliff
> > >
> > > --
> > > Cliff Rhyne
> > > Software Engineering Manager
> > > e: crh...@signal.co
> > > signal.co
> > > 
> > >
> > > Cut Through the Noise
> > >
> > > This e-mail and any files transmitted with it are for the sole use of
> the
> > > intended recipient(s) and may contain confidential and privileged
> > > information. Any unauthorized use of this email is strictly prohibited.
> > > ©2016 Signal. All rights reserved.
> > >
> >
> >
> >
> > --
> > *—-*
> > *Todd Palino*
> > Staff Site Reliability Engineer
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
> >
>
>
>
> --
> Cliff Rhyne
> Software Engineering Manager
> e: crh...@signal.co
> signal.co
> 
>
> Cut Through the Noise
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. Any unauthorized use of this email is strictly prohibited.
> ©2016 Signal. All rights reserved.
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: reading the consumer offsets topic

2016-05-08 Thread Todd Palino
It looks like you’re just missing the proper message formatter. Of course,
that largely depends on your version of the broker. Try:

./kafka-console-consumer.sh --broker-list localhost:9092 --topic
__consumer_offsets
--formatter kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter


If for some reason that doesn’t work, you can try
"kafka.server.OffsetManager\$OffsetsMessageFormatter” instead.

-Todd




On Sun, May 8, 2016 at 1:26 PM, Cliff Rhyne <crh...@signal.co> wrote:

> I'm having difficulty reading the consumer offsets topic from the command
> line.  I try the following but it doesn't seem to work (along with a few
> related variants including specifying the zookeeper hosts):
>
> ./kafka-console-consumer.sh --broker-list localhost:9092 --topic
> __consumer_offsets
>
> Is there a special way to read the consumer offsets topic?
>
> Thanks,
> Cliff
>
> --
> Cliff Rhyne
> Software Engineering Manager
> e: crh...@signal.co
> signal.co
> 
>
> Cut Through the Noise
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. Any unauthorized use of this email is strictly prohibited.
> ©2016 Signal. All rights reserved.
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


[jira] [Commented] (KAFKA-3174) Re-evaluate the CRC32 class performance.

2016-02-01 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15126292#comment-15126292
 ] 

Todd Palino commented on KAFKA-3174:


Yeah, definitely no problems with Java 1.8. We've been running 1.8 u5 for quite 
some time, and we're in the process of updating to u40. It's worth noting that 
we have been running into a number of SEGVs with mirror maker (but not the 
broker) under u5, but the problem is supposedly fixed in u40.

> Re-evaluate the CRC32 class performance.
> 
>
> Key: KAFKA-3174
> URL: https://issues.apache.org/jira/browse/KAFKA-3174
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.9.0.1
>
>
> We used org.apache.kafka.common.utils.CRC32 in clients because it has better 
> performance than java.util.zip.CRC32 in Java 1.6.
> In a recent test I ran, it looks like in Java 1.8 the CRC32 class is 2x as fast as 
> the Crc32 class we are using now. We may want to re-evaluate the performance 
> of the Crc32 class and see if it makes sense to simply use the Java CRC32 instead.
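
A rough sketch of the kind of measurement being described, using only the JDK
class so it stands alone (loop shape, message size, and class name here are
illustrative; timing the Kafka-side class mentioned above would be the
analogous loop):

    import java.util.Random;
    import java.util.zip.CRC32;

    public class CrcTimingSketch {
        public static void main(String[] args) {
            byte[] payload = new byte[1024];
            new Random(42).nextBytes(payload);

            long mix = 0;                       // keep the JIT from dropping the work
            long start = System.nanoTime();
            for (int i = 0; i < 1_000_000; i++) {
                CRC32 crc = new CRC32();        // one checksum per simulated message
                crc.update(payload, 0, payload.length);
                mix ^= crc.getValue();
            }
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println("java.util.zip.CRC32: " + elapsedMs + " ms (" + mix + ")");
        }
    }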



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-28 Thread Todd Palino
 > returned from the interceptor.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Please let me know if you have any suggestions or objections to
> the
> > > > > above.
> > > > > >
> > > > > > Thanks,
> > > > > > Anna
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner <a...@confluent.io
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Mayuresh,
> > > > > > >
> > > > > > > I see why you would want to check for messages left in the
> > > > > > > RecordAccumulator. However, I don't think this will completely
> > > solve
> > > > > the
> > > > > > > problem. Messages could be in-flight somewhere else, like in
> the
> > > > > socket,
> > > > > > or
> > > > > > > there maybe in-flight messages on the consumer side of the
> > > > MirrorMaker.
> > > > > > So,
> > > > > > > if we go the route of checking whether there are any in-flight
> > > > messages
> > > > > > for
> > > > > > > topic deletion use-case, maybe it is better count them with
> > > onSend()
> > > > > and
> > > > > > > onAcknowledge() -- whether all messages sent were
> acknowledged. I
> > > > also
> > > > > > > think that it would be better to solve this without
> interceptors,
> > > > such
> > > > > as
> > > > > > > fix error handling in this scenario. However, I do not have any
> > > good
> > > > > > > proposal right now, so these are just general thoughts.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Anna
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <
> > > > > > > gharatmayures...@gmail.com> wrote:
> > > > > > >
> > > > > > >> Calling producer.flush(), flushes all the data. So this is OK.
> > But
> > > > > when
> > > > > > >> you
> > > > > > >> are running Mirror maker, I am not sure there is a way to
> > flush()
> > > > from
> > > > > > >> outside.
> > > > > > >>
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >>
> > > > > > >> Mayuresh
> > > > > > >>
> > > > > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin <
> > > becket@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Mayuresh,
> > > > > > >> >
> > > > > > >> > Regarding your use case about mirror maker. Is it good
> enough
> > as
> > > > > long
> > > > > > >> as we
> > > > > > >> > know there is no message for the topic in the producer
> > anymore?
> > > If
> > > > > > that
> > > > > > >> is
> > > > > > >> > the case, call producer.flush() is sufficient.
> > > > > > >> >
> > > > > > >> > Thanks,
> > > > > > >> >
> > > > > > >> > Jiangjie (Becket) Qin
> > > > > > >> >
> > > > > > >> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <
> > > > > > >> > gharatmayures...@gmail.com
> > > > > > >> > > wrote:
> > > > > > >> >
> > > > > > >> > > Hi Anna,
> > > > > > >> > >
> > > > > > >> > > Thanks a lot for summarizing the discussion on this kip.
> > > > > > >> > >
> > > > > > >> > > It LGTM.
> > > > > > >> > > This is really nice :
> > > > > > >> > > We decided not to add any callbacks to producer and
> consumer
> > > > > > >> > > interceptors t

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-26 Thread Todd Palino
> > > > and we won't need a console auditor.
> > > >
> > > > One potential issue in this approach and any elaborate on-arrival
> > > > processing for that matter is that you may need to deserialize the
> > > message
> > > > as well which can drive up produce request handling times. However
> I'm
> > > not
> > > > terribly concerned about that especially if the audit header can be
> > > > separated out easily or even deserialized partially as this Avro
> thread
> > > > touches on
> > > >
> > > >
> > >
> >
> http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
> > > > gharatmayures...@gmail.com> wrote:
> > > >
> > > > > Nice KIP. Excellent idea.
> > > > > Was just thinking if we can add onDequeued() to the
> ProducerInterceptor
> > > > > interface. Since we have the onEnqueued(), it will help the client
> or
> > > the
> > > > > tools to know how much time the message spent in the
> > RecordAccumulator.
> > > > > Also an API to check if there are any messages left for a
> particular
> > > > topic
> > > > > in the RecordAccumulator would help.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mayuresh
> > > > >
> > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Great idea. I’ve been talking about this for 2 years, and I’m
> glad
> > > > > someone
> > > > > > is finally picking it up. Will take a look at the KIP at some
> point
> > > > > > shortly.
> > > > > >
> > > > > > -Todd
> > > > > >
> > > > > >
> > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io>
> > > wrote:
> > > > > >
> > > > > > > Hey Becket,
> > > > > > >
> > > > > > > Yeah this is really similar to the callback. The difference is
> > > really
> > > > > in
> > > > > > > who sets the behavior. The idea of the interceptor is that it
> > > doesn't
> > > > > > > require any code change in apps so you can globally add
> behavior
> > to
> > > > > your
> > > > > > > Kafka usage without changing app code. Whereas the callback is
> > > added
> > > > by
> > > > > > the
> > > > > > > app. The idea is to kind of obviate the need for the wrapper
> code
> > > > that
> > > > > > e.g.
> > > > > > > LinkedIn maintains to hold this kind of stuff.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <
> > becket@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > This could be a useful feature. And I think there are some
> use
> > > > cases
> > > > > to
> > > > > > > > mutate the data like rejected alternative one mentioned.
> > > > > > > >
> > > > > > > > I am wondering if there is functional overlapping between
> > > > > > > > ProducerInterceptor.onAcknowledgement() and the producer
> > > callback?
> > > > I
> > > > > > can
> > > > > > > > see that the Callback could be a per record setting while
> > > > > > > > onAcknowledgement() is a producer level setting. Other than
> > that,
> > > > is
> > > > > > > there
> > > > > > > > any difference between them?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <
> > > n...@confluent.io>
> > > > > > > wrote:
> > > > > >

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-26 Thread Todd Palino
,
> >
> > Mayuresh
> >
> > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner <a...@confluent.io>
> wrote:
> >
> > > Thanks Ismael and Todd for your feedback!
> > >
> > > I agree about coming up with lean, but useful interfaces that will be
> > easy
> > > to extend later.
> > >
> > > When we discuss the minimal set of producer and consumer interceptor
> API
> > in
> > > today’s KIP meeting (discussion item #2 in my previous email), lets
> > compare
> > > two options:
> > >
> > > *1. Minimal set of immutable API for producer and consumer
> interceptors*
> > >
> > > ProducerInterceptor:
> > > public void onSend(ProducerRecord<K, V> record);
> > > public void onAcknowledgement(RecordMetadata metadata, Exception
> > > exception);
> > >
> > > ConsumerInterceptor:
> > > public void onConsume(ConsumerRecords<K, V> records);
> > > public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
> > >
> > > Use-cases:
> > > — end-to-end monitoring; custom tracing and logging
> > >
> > >
> > > *2. Minimal set of mutable API for producer and consumer interceptors*
> > >
> > > ProducerInterceptor:
> > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > void onAcknowledgement(RecordMetadata metadata, Exception exception);
> > >
> > > ConsumerInterceptor:
> > > void onConsume(ConsumerRecords<K, V> records);
> > > void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
> > >
> > > Additional use-cases to #1:
> > > — Ability to add metadata to a message or fill in standard fields for
> > audit
> > > and routing.
> > >
> > > Implications
> > > — Partition assignment will be done based on modified key/value instead
> > of
> > > original key/value. If key/value transformation is not consistent (same
> > key
> > > and value does not mutate to the same, but modified, key/value), then
> log
> > > compaction would not work. However, audit and routing use-cases from
> Todd
> > > will likely do consistent transformation.
> > >
> > >
> > > *Additional callbacks (discussion item #3 in my previous email):*
> > >
> > > If we want to support encryption, we would want to be able to modify
> > > serialized key/value, rather than key and value objects. This will add
> > the
> > > following API to producer and consumer interceptors:
> > >
> > > ProducerInterceptor:
> > > SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord<K, V>
> > > record, SerializedKeyValue serializedKeyValue);
> > >
> > > ConsumerInterceptor:
> > > SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue
> > > serializedKeyValue);
> > >
> > >
> > > I am leaning towards implementing the minimal set of immutable or
> mutable
> > > interfaces, making sure that we have a compatibility plan that allows
> us
> > to
> > > add more callbacks in the future (per Ismael comment), and add more
> APIs
> > > later. E.g., for encryption use-case, there could be an argument in
> doing
> > > encryption after message compression vs. per-record encryption that
> could
> > > be done using the above additional API. There is also more implications
> > for
> > > every API that modifies records: modifying serialized key/value will
> > again
> > > impact partition assignment (we will likely do that after partition
> > > assignment), which may impact log compaction and mirror maker
> > partitioning.
> > >
> > >
> > > Thanks,
> > > Anna
> > >
> > > On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <tpal...@gmail.com>
> wrote:
> > >
> > > > Finally got a chance to take a look at this. I won’t be able to make
> > the
> > > > KIP meeting due to a conflict.
> > > >
> > > > I’m somewhat disappointed in this proposal. I think that the explicit
> > > > exclusion of modification of the messages is short-sighted, and not
> > > > accounting for it now is going to bite us later. Jay, aren’t you the
> > one
> > > > railing against public interfaces and how difficult they are to work
> > with
> > > > when you don’t get them right? The “simple” change to one of these
> > > >

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Todd Palino
Great idea. I’ve been talking about this for 2 years, and I’m glad someone
is finally picking it up. Will take a look at the KIP at some point shortly.

-Todd
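
(To make the onSend/onAcknowledgement hooks being discussed in this thread a
bit more concrete, here is a minimal sketch of the audit-count use case. The
class name and counting behavior are purely illustrative, and the method
shapes follow the mutable-onSend variant proposed in the thread rather than
any shipped interface.)

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class AuditCountInterceptorSketch<K, V> {
        private final AtomicLong sent = new AtomicLong();
        private final AtomicLong acked = new AtomicLong();
        private final AtomicLong failed = new AtomicLong();

        // may return a modified record; here we only count and pass it through
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
            sent.incrementAndGet();
            return record;
        }

        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                acked.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
        }

        // e.g. polled periodically and published to an audit side-topic
        public long inFlight() {
            return sent.get() - acked.get() - failed.get();
        }
    }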


On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Becket,
>
> Yeah this is really similar to the callback. The difference is really in
> who sets the behavior. The idea of the interceptor is that it doesn't
> require any code change in apps so you can globally add behavior to your
> Kafka usage without changing app code. Whereas the callback is added by the
> app. The idea is to kind of obviate the need for the wrapper code that e.g.
> LinkedIn maintains to hold this kind of stuff.
>
> -Jay
>
> On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket@gmail.com> wrote:
>
> > This could be a useful feature. And I think there are some use cases to
> > mutate the data like rejected alternative one mentioned.
> >
> > I am wondering if there is functional overlapping between
> > ProducerInterceptor.onAcknowledgement() and the producer callback? I can
> > see that the Callback could be a per record setting while
> > onAcknowledgement() is a producer level setting. Other than that, is
> there
> > any difference between them?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io>
> wrote:
> >
> > > James,
> > >
> > > That is one of the many monitoring use cases for the interceptor
> > interface.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com> wrote:
> > >
> > > > Anna,
> > > >
> > > > I'm trying to understand a concrete use case. It sounds like producer
> > > > interceptors could be used to implement part of LinkedIn's Kafak
> Audit
> > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale
> > > >
> > > > Part of that is done by a wrapper library around the kafka producer
> > that
> > > > keeps a count of the number of messages produced, and then sends that
> > > count
> > > > to a side-topic. It sounds like the producer interceptors could
> > possibly
> > > be
> > > > used to implement that?
> > > >
> > > > -James
> > > >
> > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io>
> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > I just created a KIP-42 for adding producer and consumer
> interceptors
> > > for
> > > > > intercepting messages at different points on producer and consumer.
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > > >
> > > > > Comments and suggestions are welcome!
> > > > >
> > > > > Thanks,
> > > > > Anna
> > > >
> > > >
> > > > 
> > > >
> > > > This email and any attachments may contain confidential and
> privileged
> > > > material for the sole use of the intended recipient. Any review,
> > copying,
> > > > or distribution of this email (or any attachments) by others is
> > > prohibited.
> > > > If you are not the intended recipient, please contact the sender
> > > > immediately and permanently delete this email and any attachments. No
> > > > employee or agent of TiVo Inc. is authorized to conclude any binding
> > > > agreement on behalf of TiVo Inc. by email. Binding agreements with
> TiVo
> > > > Inc. may only be made by a signed written agreement.
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


[jira] [Commented] (KAFKA-3015) Improve JBOD data balancing

2015-12-19 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15065636#comment-15065636
 ] 

Todd Palino commented on KAFKA-3015:


[~jkreps] So yes, I'm essentially saying that I would prefer to see 
optimizations to the current partitioning scheme, and the addition of being 
able to handle single disk failures without terminating the entire broker. I 
would argue that this would have a higher payoff for people who do not have the 
ability to easily swap in new machines (as we do, or those in AWS would), 
because it will allow for more granular failures. Disks tend to fail more than 
any other component, so the ability to survive a disk failure should be 
attractive to anyone.

As noted, there are a lot of benefits of using JBOD, and I would not argue 
against having good support for it. Especially as we've been moving to new 
hardware, we now can't take advantage of all the network we have with 10gig 
interfaces primarily due to limitations on disk capacity and throughput. 
Additionally, we'd like to move to RF=3, but can't stomach it because of the 
cost. If we drop the RAID 10 we will significantly increase throughput and 
double our storage capacity. Then we can easily move to RF=3 (using 50% more 
disk) with little impact.
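
To put rough numbers on that (assuming today's setup is RAID 10 with RF=2; the
per-byte accounting below is just illustrative arithmetic):

    Physical copies written per logical byte of data:

      RAID 10 + RF=2:  2 (mirror) x 2 (replication) = 4 copies
      JBOD    + RF=2:  1          x 2               = 2 copies  (storage capacity doubles)
      JBOD    + RF=3:  1          x 3               = 3 copies  (50% more disk than JBOD
                                                      at RF=2, but still 25% less than
                                                      the current RAID 10 + RF=2 setup)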

> Improve JBOD data balancing
> ---
>
> Key: KAFKA-3015
> URL: https://issues.apache.org/jira/browse/KAFKA-3015
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>
> When running with multiple data directories (i.e. JBOD) we currently place 
> partitions entirely within one data directory. This tends to lead to poor 
> balancing across disks as some topics have more throughput/retention and not 
> all disks get data from all topics. You can't fix this problem with smarter 
> partition placement strategies because ultimately you don't know when a 
> partition is created when or how heavily it will be used (this is a subtle 
> point, and the tendency is to try to think of some more sophisticated way to 
> place partitions based on current data size but this is actually 
> exceptionally dangerous and can lead to much worse imbalance when creating 
> many partitions at once as they would all go to the disk with the least 
> data). We don't support online rebalancing across directories/disks so this 
> imbalance is a big problem and limits the usefulness of this configuration. 
> Implementing online rebalancing of data across disks without downtime is 
> actually quite hard and requires lots of I/O since you have to actually 
> rewrite full partitions of data.
> An alternative would be to place each partition in *all* directories/drives 
> and round-robin *segments* within the partition across the directories. So 
> the layout would be something like:
>   drive-a/mytopic-0/
>   000.data
>   000.index
>   0024680.data
>   0024680.index
> >   drive-b/mytopic-0/
>   0012345.data
>   0012345.index
>   0036912.data
>   0036912.index
> This is a little harder to implement than the current approach but not very 
> hard, and it is a lot easier than implementing online data balancing across 
> disks while retaining the current approach. I think this could easily be done 
> in a backwards compatible way.
> I think the balancing you would get from this in most cases would be good 
> enough to make JBOD the default configuration. Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3015) Improve JBOD data balancing

2015-12-18 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15065239#comment-15065239
 ] 

Todd Palino commented on KAFKA-3015:


While this seems good on the surface, it makes it impossible to continue 
running the broker on a single disk failure. This is one of our primary 
complaints about JBOD, and one of the main reasons we cannot use it (as much as 
we would like to).

> Improve JBOD data balancing
> ---
>
> Key: KAFKA-3015
> URL: https://issues.apache.org/jira/browse/KAFKA-3015
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>
> When running with multiple data directories (i.e. JBOD) we currently place 
> partitions entirely within one data directory. This tends to lead to poor 
> balancing across disks as some topics have more throughput/retention and not 
> all disks get data from all topics. You can't fix this problem with smarter 
> partition placement strategies because ultimately you don't know when a 
> partition is created when or how heavily it will be used (this is a subtle 
> point, and the tendency is to try to think of some more sophisticated way to 
> place partitions based on current data size but this is actually 
> exceptionally dangerous and can lead to much worse imbalance when creating 
> many partitions at once as they would all go to the disk with the least 
> data). We don't support online rebalancing across directories/disks so this 
> imbalance is a big problem and limits the usefulness of this configuration. 
> Implementing online rebalancing of data across disks without downtime is 
> actually quite hard and requires lots of I/O since you have to actually 
> rewrite full partitions of data.
> An alternative would be to place each partition in *all* directories/drives 
> and round-robin *segments* within the partition across the directories. So 
> the layout would be something like:
>   drive-a/mytopic-0/
>   000.data
>   000.index
>   0024680.data
>   0024680.index
> >   drive-b/mytopic-0/
>   0012345.data
>   0012345.index
>   0036912.data
>   0036912.index
> This is a little harder to implement than the current approach but not very 
> hard, and it is a lot easier than implementing online data balancing across 
> disks while retaining the current approach. I think this could easily be done 
> in a backwards compatible way.
> I think the balancing you would get from this in most cases would be good 
> enough to make JBOD the default configuration. Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-30 Allow for brokers to have plug-able consensus and meta data storage sub systems

2015-12-03 Thread Todd Palino
more pain. One of the things that
> > > worked with ZK was to have a community and a group of people
> responsible
> > > for thinking about it and maintaining it.
> > >
> > > >
> > > > I don't have a proposal for how this would work and it's some effort
> to
> > > > scope it out. The obvious thing to do would just be to keep the
> > existing
> > > > ISR/Controller setup and rebuild the controller etc on a RAFT/Paxos
> > impl
> > > > using the Kafka network/log/etc and have a replicated config database
> > > > (maybe rocksdb) that was fed off the log and shared by all nodes.
> > > >
> > >
> > > In principle, you need mechanisms like watches, ephemerals, and just
> pure
> > > storage. Just implement a replicated state machine with those
> operations.
> > > Watches and ephemerals make it a bit hard to do using a REST API, but
> it
> > is
> > > fine if you don't care about doing long polls. Perhaps this is too much
> > > detail for the status of this discussion.
> > >
> > >
> > > > If done well this could have the advantage of potentially allowing us
> > to
> > > > scale the number of partitions quite significantly (the k/v store
> would
> > > not
> > > > need to be all in memory), though you would likely still have limits
> on
> > > the
> > > > number of partitions per machine. This would make the minimum Kafka
> > > cluster
> > > > size be just your replication factor.
> > > >
> > >
> > > The idea of not having it in memory to scale isn't a bad one, specially
> > if
> > > you SSDs to cache data and such.
> > >
> > >
> > > > People tend to feel that implementing things like RAFT or Paxos is
> too
> > > hard
> > > > for mere mortals. But I actually think it is within our capabilities,
> > and
> > > > our testing capabilities as well as experience with this type of
> thing
> > > have
> > > > improved to the point where we should not be scared off if it is the
> > > right
> > > > path.
> > >
> > > It is hard, but certainly not impossible. I have implemented quite a
> few
> > > Paxos prototypes over the years for experiments, and getting off the
> > ground
> > > isn't hard, but getting an implementation that really works and that it
> > > maintainable is difficult. I've seen more than one instance where
> people
> > > got it wrong and caused pain. But again, maybe times have changed and
> > that
> > > won't happen here.
> > >
> > > -Flavio
> > >
> > > >
> > > > This approach is likely more work then plugins (though maybe not,
> once
> > > you
> > > > factor in all the docs, testing, etc) but if done correctly it would
> be
> > > an
> > > > unambiguous step forward--a simpler, more scalable implementation
> with
> > no
> > > > operational dependencies.
> > > >
> > > > Thoughts?
> > > >
> > > > -Jay
> > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Dec 1, 2015 at 11:12 AM, Joe Stein <joe.st...@stealth.ly>
> > wrote:
> > > >
> > > >> I would like to start a discussion around the work that has started
> in
> > > >> regards to KIP-30
> > > >>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-30+-+Allow+for+brokers+to+have+plug-able+consensus+and+meta+data+storage+sub+systems
> > > >>
> > > >> The impetus for working on this came a lot from the community. For
> the
> > > last
> > > >> year(~+) it has been the most asked question at any talk I have
> given
> > > >> (personally speaking). It has come up a bit also on the mailing list
> > > >> talking about zkclient vs currator. A lot of folks want to use Kafka
> > but
> > > >> introducing dependencies are hard for the enterprise so the goals
> > behind
> > > >> this is making it so that using Kafka can be done as easy as
> possible
> > > for
> > > >> the operations teams to-do when they do. If they are already
> > supporting
> > > >> ZooKeeper they can keep doing that but if not they want (users) to
> use
> > > >> something else they are already supporting that can plug-in to-do
> the
> > > same
> > > >> things.
> > > >>
> > > >> For the core project I think we should leave in upstream what we
> have.
> > > This
> > > >> gives a great baseline regression for folks and makes the work for
> > > "making
> > > >> what we have plug-able work" a good defined task (carve out, layer
> in
> > > API
> > > >> impl, push back tests pass). From there then when folks want their
> > > >> implementation to be something besides ZooKeeper they can develop,
> > test
> > > and
> > > >> support that if they choose.
> > > >>
> > > >> We would like to suggest that we have the plugin interface be Java
> > based
> > > >> for minimizing depends for JVM impl. This could be in another
> > directory
> > > >> something TBD /.
> > > >>
> > > >> If you have a server you want to try to get it working but you
> aren't
> > on
> > > >> the JVM don't be afraid just think about a REST impl and if you can
> > work
> > > >> inside of that you have some light RPC layers (this was the first
> pass
> > > >> prototype we did to flush-out the public api presented on the KIP).
> > > >>
> > > >> There are a lot of parts to working on this and the more
> > > implementations we
> > > >> have the better we can flush out the public interface. I will leave
> > the
> > > >> technical details and design to JIRA tickets that are linked through
> > the
> > > >> confluence page as these decisions come about and code starts for
> > > reviews
> > > >> and we can target the specific modules having the context separate
> is
> > > >> helpful especially if multiple folks are working on it.
> > > >> https://issues.apache.org/jira/browse/KAFKA-2916
> > > >>
> > > >> Do other folks want to build implementations? Maybe we should start
> a
> > > >> confluence page for those or use an existing one and add to it so we
> > can
> > > >> coordinate some there to.
> > > >>
> > > >> Thanks!
> > > >>
> > > >> ~ Joe Stein
> > > >> - - - - - - - - - - - - - - - - - - -
> > > >> [image: Logo-Black.jpg]
> > > >>  http://www.elodina.net
> > > >>http://www.stealth.ly
> > > >> - - - - - - - - - - - - - - - - - - -
> > > >>
> > >
> > >
> >
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: [DISCUSS] KIP-30 Allow for brokers to have plug-able consensus and meta data storage sub systems

2015-12-03 Thread Todd Palino
Come on, Jay. Anyone can get up in the morning and run if they have the
willpower :)

Granted I do have some bias here, since we have tooling in place that makes
deployments and monitoring easier. But even at that, I would not say
Zookeeper is difficult to run or monitor. I’m not denying that there are
complaints, but my experience has always been that complaints of that type
are either related more to the specific way the dependency is implemented
(that is, Kafka’s not using it correctly or is otherwise generating errors
that say “Zookeeper” in them), or it’s related to a bias against the
dependency (I don’t like Zookeeper, I have XYZ installed, etc.).

The point that for a small installation Zookeeper can represent a large
footprint is well made. I wonder how many of these people are being
ill-served by recommendations from people like me that you should not run
Kafka and Zookeeper on the same systems. Sure, we’d never do that at
LinkedIn just because we are looking for high performance and a few more
systems isn’t a big deal. But for a lower performance environment, it’s
really not a problem to colocate the applications.

As far as the controller goes, I’m perfectly willing to accept that my
desire to get rid of it is from my bias against it because of how many
problems we’ve run into with that code. We can probably both agree that the
controller code needs an overhaul regardless. It’s stood up well, but as
the clusters get larger it definitely shows cracks.

-Todd


On Thu, Dec 3, 2015 at 11:37 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Todd,
>
> I actually agree on both counts.
>
> I would summarize the first comment as "Zookeeper is not hard to
> operationalize if you are Todd Palino"--also in that category of
> things that are not hard: running 13 miles at 5:00 am. Basically I
> totally agree that ZK is now a solved problem at LinkedIn. :-)
>
> Empirically, though, it is really hard for a lot of our users. It is
> one of the largest sources of problems we see in people's clusters. We
> could perhaps get part of the way by improving our zk usage and
> documentation, and it is certainly the case that we could potentially
> make things worse in trying to make them better, but I don't think
> that is the same as saying there isn't a problem.
>
> I totally agree with your second comment. In some sense what I was
> sketching out is just replacing ZK. But part of the design of Kafka
> was because we already had ZK. So there might be a way to further
> rationalize the metadata log and the data logs if you kind of went
> back to first principles and thought about it. I don't have any idea
> how, but I share that intuition.
>
> I do think having the controller, though, is quite useful. I think
> this pattern of avoiding many rounds of consensus by just doing one
> round to pick a leader is a good one. If you think about it Paxos =>
> Multi-paxos is basically optimizing by lumping together consensus
> rounds on a per message basis into a leader which then handles many
> messages, and what Kafka does is kind of like Multi-multi-paxos in
> that it lumps together many leadership elections into one central
> controller election which then picks all the leaders. In some ways
> having central decision makers seems inelegant (aren't we supposed to
> be distributed?) but it does allow you to be both very very fast in
> making lots of decisions (vs doing thousands of independent leadership
> elections) and also to do things that require global knowledge (like
> balancing leadership).
>
> Cheers,
>
> -Jay
>
>
>
> On Thu, Dec 3, 2015 at 10:05 AM, Todd Palino <tpal...@gmail.com> wrote:
> > This kind of discussion always puts me in mind of stories that start “So
> I
> > wrote my own encryption. How hard can it be?” :)
> >
> > Joking aside, I do have a few thoughts on this. First I have to echo
> Joel’s
> > perspective on Zookeeper. Honestly, it is one of the few applications we
> > can forget about, so I have a hard time understanding pain around running
> > it. You set it up, and unless you have a hardware failure to deal with,
> > that’s it. Yes, there are ways to abusively use it, just like with any
> > application, but Kafka is definitely not one of those use cases. I also
> > disagree that it’s hard to share the ZK cluster safely. We do it all the
> > time. We share most often with other Kafka clusters, but we also share
> with
> > other applications. This goes back to the “abusive use pattern”. Yes, we
> > don’t share with them. Nobody should. Yes, it requires monitoring and you
> > have to configure it correctly. But configuration is a one-time charge
> > (both in terms of capital expenses and knowledge acquisition) and as
> > applications 

[jira] [Commented] (KAFKA-2759) Mirror maker can leave gaps of missing messages if the process dies after a partition is reset and before the first offset commit

2015-11-06 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14994590#comment-14994590
 ] 

Todd Palino commented on KAFKA-2759:


The thought we had internally on the situation in KAFKA-1006 was that we should 
differentiate the behavior between what happens if a consumer has no offsets at 
all for a partition and what happens if the consumer thinks it has offsets, but 
is out of range. These are two different situations. In the case of mirror 
maker, we would want "earliest" behavior for the first situation (whether 
default or configured, doesn't matter), as we never want to lose data from new 
topics or partitions. For the second situation we want "closest". That is, if 
your out of range offset is closer to the earliest offset, start there. 
Otherwise, if your out of range offset is closer to the latest offset, start 
there. There's a little more detail involved on this, as there are intricacies 
with the fetch response. [~jjkoshy] has more information on that discussion.
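
To make the distinction concrete, a rough sketch of the two cases (hypothetical
helper code, not the actual mirror maker or consumer implementation; it assumes
the valid earliest/latest offsets have already been fetched from the broker):

    // Hypothetical illustration of the two reset cases described above.
    public final class OffsetResetExample {

        // Case 1: no committed offset at all for the partition -- always start at
        // earliest so a mirror doesn't drop data from new topics/partitions.
        public static long resetForNewPartition(long earliestOffset) {
            return earliestOffset;
        }

        // Case 2: a committed offset exists but is out of range -- jump to whichever
        // end of the log is closer to where the consumer thought it was ("closest").
        public static long resetForOutOfRange(long outOfRangeOffset,
                                              long earliestOffset,
                                              long latestOffset) {
            long distanceToEarliest = Math.abs(outOfRangeOffset - earliestOffset);
            long distanceToLatest = Math.abs(latestOffset - outOfRangeOffset);
            return (distanceToEarliest <= distanceToLatest) ? earliestOffset : latestOffset;
        }
    }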


> Mirror maker can leave gaps of missing messages if the process dies after a 
> partition is reset and before the first offset commit
> -
>
> Key: KAFKA-2759
> URL: https://issues.apache.org/jira/browse/KAFKA-2759
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.2
>Reporter: Ewen Cheslack-Postava
>Priority: Minor
>
> Based on investigation of KAFKA-2747. When mirror maker first starts or if it 
> picks up new topics/partitions, it will use the reset policy to choose where 
> to start. By default this uses 'latest'. If it starts reading messages and 
> then dies before committing offsets for the first time, then the mirror maker 
> that takes over that partition will also reset. This can result in some 
> messages making it to the consumer, then a gap that were skipped, and then 
> messages that get processed by the new MM process.
> One solution to this problem would be to make sure that offsets are committed 
> after they are reset but before the first message is passed to the producer. 
> In other words, in the case of a reset, MM should record where it's going to 
> start reading data from before processing any messages. This guarantees all 
> messages after the first one delivered by MM will appear at least once.
> This is minor since it should be very rare, but it does break an assumption 
> that people probably make about the output -- once you start receiving data, 
> you aren't guaranteed all subsequent messages will appear at least once.
> This same issue could affect Copycat as well. In fact, it may be generally 
> useful to allow consumers to know when the offset was reset so they can 
> handle cases like this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2235) LogCleaner offset map overflow

2015-10-23 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14971806#comment-14971806
 ] 

Todd Palino commented on KAFKA-2235:


I don't think we can. I have already increased it from 512MB to 1GB, and we 
still hit the same problems. That only provides a 2x increase in the size of 
the map, and I would need almost a 10x increase to solve the problem.
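
For reference, the rough math behind that (assuming the offset map stores a
16-byte MD5 hash plus an 8-byte offset per key, i.e. 24 bytes per entry, and the
default 0.9 load factor):

      512 MB buffer:   536,870,912 / 24 = ~22M slots, ~20M usable keys
        1 GB buffer: 1,073,741,824 / 24 = ~44M slots, ~40M usable keys

    The problem segments contain a few hundred million small messages, which is
    why something close to a 10x larger map would be needed.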

> LogCleaner offset map overflow
> --
>
> Key: KAFKA-2235
> URL: https://issues.apache.org/jira/browse/KAFKA-2235
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 0.8.1, 0.8.2.0
>Reporter: Ivan Simoneko
>Assignee: Ivan Simoneko
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2235_v1.patch, KAFKA-2235_v2.patch
>
>
> We've seen log cleaning generating an error for a topic with lots of small 
> messages. It seems that cleanup map overflow is possible if a log segment 
> contains more unique keys than empty slots in offsetMap. Check for baseOffset 
> and map utilization before processing segment seems to be not enough because 
> it doesn't take into account segment size (number of unique messages in the 
> segment).
> I suggest to estimate upper bound of keys in a segment as a number of 
> messages in the segment and compare it with the number of available slots in 
> the map (keeping in mind desired load factor). It should work in cases where 
> an empty map is capable to hold all the keys for a single segment. If even a 
> single segment no able to fit into an empty map cleanup process will still 
> fail. Probably there should be a limit on the log segment entries count?
> Here is the stack trace for this error:
> 2015-05-19 16:52:48,758 ERROR [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
> java.lang.IllegalArgumentException: requirement failed: Attempt to add a new 
> entry to a full offset map.
>at scala.Predef$.require(Predef.scala:233)
>at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:79)
>at 
> kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:543)
>at 
> kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:538)
>at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>at kafka.message.MessageSet.foreach(MessageSet.scala:67)
>at 
> kafka.log.Cleaner.kafka$log$Cleaner$$buildOffsetMapForSegment(LogCleaner.scala:538)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:515)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:512)
>at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:512)
>at kafka.log.Cleaner.clean(LogCleaner.scala:307)
>at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
>at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
>at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2235) LogCleaner offset map overflow

2015-10-23 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14971740#comment-14971740
 ] 

Todd Palino commented on KAFKA-2235:


I'm sure [~jjkoshy] will follow along with more detail on this, but we've run 
into a serious problem with this check. Basically, it's impossible to perform 
this kind of check accurately before the offset map is built. We now have 
partitions that should be able to be compacted as the total number of unique 
keys is far below the size of the offset map (currently at ~39 million for our 
configuration) but the messages are very frequent and very small. Even at a 
segment size of 64 MB, we have over 300 million messages in those segments. So 
this check creates a situation where log compaction should succeed, but fails 
because of a speculative check.

While I can play the game of trying to walk back segment sizes, there's no way 
to size segments by number of messages, so it's a guessing game. In addition, 
the check is clearly wrong in that case, so I shouldn't have to config around 
it. Lastly, the check causes the log cleaner thread to exit, which means log 
compaction on the broker fails entirely, rather than just skipping that 
partition.

A better way to handle this would be to cleanly catch the original error you 
are seeing, generate a clear error message in the logs as to what the failure 
is, and allow the log cleaner to continue and handle other partitions. You 
could also maintain a blacklist of partitions in memory in the log cleaner to 
make sure you don't come back around and try and compact the partition again.
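
Roughly the shape I have in mind (purely illustrative; the class and method names
here are hypothetical and this is not the actual LogCleaner code):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative only: skip partitions whose offset map overflows instead of
    // letting the exception kill the cleaner thread.
    public final class UncleanablePartitions {

        private final Set<String> blacklist = ConcurrentHashMap.newKeySet();

        public interface PartitionCleaner {
            void clean(String topicPartition);
        }

        public void cleanOrSkip(String topicPartition, PartitionCleaner cleaner) {
            if (blacklist.contains(topicPartition)) {
                return;  // known to overflow the offset map; don't retry every pass
            }
            try {
                cleaner.clean(topicPartition);
            } catch (IllegalArgumentException e) {
                // e.g. "Attempt to add a new entry to a full offset map": log a clear
                // message, blacklist the partition, and keep the cleaner thread alive.
                blacklist.add(topicPartition);
            }
        }
    }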

> LogCleaner offset map overflow
> --
>
> Key: KAFKA-2235
> URL: https://issues.apache.org/jira/browse/KAFKA-2235
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 0.8.1, 0.8.2.0
>Reporter: Ivan Simoneko
>Assignee: Ivan Simoneko
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2235_v1.patch, KAFKA-2235_v2.patch
>
>
> We've seen log cleaning generating an error for a topic with lots of small 
> messages. It seems that cleanup map overflow is possible if a log segment 
> contains more unique keys than empty slots in offsetMap. Check for baseOffset 
> and map utilization before processing segment seems to be not enough because 
> it doesn't take into account segment size (number of unique messages in the 
> segment).
> I suggest to estimate upper bound of keys in a segment as a number of 
> messages in the segment and compare it with the number of available slots in 
> the map (keeping in mind desired load factor). It should work in cases where 
> an empty map is capable to hold all the keys for a single segment. If even a 
> single segment no able to fit into an empty map cleanup process will still 
> fail. Probably there should be a limit on the log segment entries count?
> Here is the stack trace for this error:
> 2015-05-19 16:52:48,758 ERROR [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
> java.lang.IllegalArgumentException: requirement failed: Attempt to add a new 
> entry to a full offset map.
>at scala.Predef$.require(Predef.scala:233)
>at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:79)
>at 
> kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:543)
>at 
> kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:538)
>at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>at kafka.message.MessageSet.foreach(MessageSet.scala:67)
>at 
> kafka.log.Cleaner.kafka$log$Cleaner$$buildOffsetMapForSegment(LogCleaner.scala:538)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:515)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:512)
>at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:512)
>at kafka.log.Cleaner.clean(LogCleaner.scala:307)
>at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
>at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
>at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-38: ZooKeeper Authentication

2015-10-21 Thread Todd Palino
There seems to be a bit of detail lacking in the KIP. Specifically, I'd
like to understand:

1) What znodes are the brokers going to secure? Is this configurable? How?
2) What ACL is the broker going to apply? Is this configurable?
3) How will the admin tools (such as preferred replica election and
partition reassignment) interact with this?

-Todd


On Wed, Oct 21, 2015 at 9:16 AM, Ismael Juma  wrote:

> On Wed, Oct 21, 2015 at 5:04 PM, Flavio Junqueira  wrote:
>
> > Bringing the points Grant brought to this thread:
> >
> > > Is it worth mentioning the follow up steps that were discussed in the
> KIP
> > > call in this KIP document? Some of them were:
> > >
> > >   - Adding SSL support for Zookeeper
> > >   - Removing the "world readable" assumption
> > >
> >
> > Grant, how would you do it? I see three options:
> >
> > 1- Add to the existing KIP, but then the functionality we should be
> > checking in soon won't include it, so the KIP will remain incomplete
> >
>
> A "Future work" section would make sense to me, but I don't know how this
> is normally handled.
>
> Ismael
>


Re: [VOTE] KIP-38: ZooKeeper authentication

2015-10-21 Thread Todd Palino
I've added a reply on the discuss thread already. However, the point is
that if there were changes as a result of the KIP call (which I often can't
make on Tuesdays), it should be updated on the wiki page so everyone is
aware of what is being voted on.

-Todd


On Wed, Oct 21, 2015 at 9:47 AM, Flavio Junqueira <f...@apache.org> wrote:

> Todd,
>
> There is a discuss thread for this KIP and we talked about it during the
> KIP call yesterday. I'm more than happy to add detail to the KIP if you
> bring it up in the discuss thread. I'd rather not stop the vote thread,
> though.
>
> Thanks,
> -Flavio
>
> > On 21 Oct 2015, at 17:41, Todd Palino <tpal...@gmail.com> wrote:
> >
> > While this is a great idea, is it really ready for vote? I don't see any
> > detail in the wiki about what trees will be secured, and whether or not
> > that is configurable. I also don't see anything about how the use of
> admin
> > tools is going to be addressed.
> >
> > -Todd
> >
> > On Wed, Oct 21, 2015 at 8:48 AM, Grant Henke <ghe...@cloudera.com>
> wrote:
> >
> >> +1
> >>
> >> Is it worth mentioning the follow up steps that were discussed in the
> KIP
> >> call in this KIP document? Some of them were:
> >>
> >>   - Adding SSL support for Zookeeper
> >>   - Removing the "world readable" assumption
> >>
> >> Thank you,
> >> Grant
> >>
> >> On Wed, Oct 21, 2015 at 10:23 AM, Onur Karaman <
> >> okara...@linkedin.com.invalid> wrote:
> >>
> >>> +1
> >>>
> >>> On Wed, Oct 21, 2015 at 8:17 AM, Flavio Junqueira <f...@apache.org>
> >> wrote:
> >>>
> >>>> Thanks everyone for the feedback so far. At this point, I'd like to
> >> start
> >>>> a vote for KIP-38.
> >>>>
> >>>> Summary: Add support for ZooKeeper authentication
> >>>> KIP page:
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-38%3A+ZooKeeper+Authentication
> >>>> <
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-38:+ZooKeeper+Authentication
> >>>>>
> >>>>
> >>>> Thanks,
> >>>> -Flavio
> >>>
> >>
> >>
> >>
> >> --
> >> Grant Henke
> >> Software Engineer | Cloudera
> >> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> >>
>
>


Re: [VOTE] KIP-38: ZooKeeper authentication

2015-10-21 Thread Todd Palino
While this is a great idea, is it really ready for vote? I don't see any
detail in the wiki about what trees will be secured, and whether or not
that is configurable. I also don't see anything about how the use of admin
tools is going to be addressed.

-Todd

On Wed, Oct 21, 2015 at 8:48 AM, Grant Henke  wrote:

> +1
>
> Is it worth mentioning the follow up steps that were discussed in the KIP
> call in this KIP document? Some of them were:
>
>- Adding SSL support for Zookeeper
>- Removing the "world readable" assumption
>
> Thank you,
> Grant
>
> On Wed, Oct 21, 2015 at 10:23 AM, Onur Karaman <
> okara...@linkedin.com.invalid> wrote:
>
> > +1
> >
> > On Wed, Oct 21, 2015 at 8:17 AM, Flavio Junqueira 
> wrote:
> >
> > > Thanks everyone for the feedback so far. At this point, I'd like to
> start
> > > a vote for KIP-38.
> > >
> > > Summary: Add support for ZooKeeper authentication
> > > KIP page:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-38%3A+ZooKeeper+Authentication
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-38:+ZooKeeper+Authentication
> > > >
> > >
> > > Thanks,
> > > -Flavio
> >
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>


Re: [DISCUSS] KIP-38: ZooKeeper Authentication

2015-10-21 Thread Todd Palino
Comments inline.

In addition, the documentation on the migration path is good, but do we
really need a separate utility? Would it be better to have checking and
setting the ACLs be a function of the controller, possibly as a separate
thread that runs either once at controller startup or periodically?
Information about when the check runs and when it completes should be
available (I would like to see this in a metric - we already have problems
determining whether or not the log compaction thread is running). This would
provide continuous coverage of the ACLs.
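
For illustration, a rough sketch of that kind of periodic check (hypothetical
code against the plain ZooKeeper client API; the path list, scheduling, and
metric fields are assumptions on my part, not anything from the KIP):

    import java.util.List;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Stat;

    // Illustrative periodic ACL verification; not actual controller code.
    public final class ZkAclChecker {
        private final ZooKeeper zk;
        private final List<String> securedPaths;
        private final List<ACL> expectedAcl;

        // Expose these as metrics so operators can tell the check is actually running.
        public volatile long lastCheckStartedMs = -1L;
        public volatile long lastCheckCompletedMs = -1L;

        public ZkAclChecker(ZooKeeper zk, List<String> securedPaths, List<ACL> expectedAcl) {
            this.zk = zk;
            this.securedPaths = securedPaths;
            this.expectedAcl = expectedAcl;
        }

        public void start(ScheduledExecutorService scheduler, long periodMinutes) {
            scheduler.scheduleAtFixedRate(this::checkOnce, 0L, periodMinutes, TimeUnit.MINUTES);
        }

        private void checkOnce() {
            lastCheckStartedMs = System.currentTimeMillis();
            for (String path : securedPaths) {
                try {
                    Stat stat = new Stat();
                    List<ACL> current = zk.getACL(path, stat);
                    if (!current.equals(expectedAcl)) {
                        zk.setACL(path, expectedAcl, stat.getAversion());
                    }
                } catch (Exception e) {
                    // log and continue with the remaining paths
                }
            }
            lastCheckCompletedMs = System.currentTimeMillis();
        }
    }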

Also, what is the downgrade plan?


On Wed, Oct 21, 2015 at 9:56 AM, Flavio Junqueira <f...@apache.org> wrote:

>
> > On 21 Oct 2015, at 17:47, Todd Palino <tpal...@gmail.com> wrote:
> >
> > There seems to be a bit of detail lacking in the KIP. Specifically, I'd
> > like to understand:
> >
> > 1) What znodes are the brokers going to secure? Is this configurable?
> How?
>
> Currently it is securing all paths here except the consumers one:
>
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L56
> <
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L56
> >
>
> This isn't configurable at the moment.
>

That's fine. As long as the consumers tree is exempted, and the admin tools
continue to work properly, I don't see any problems with this not being
configurable. All of those paths are specific to the brokers.



> > 2) What ACL is the broker going to apply? Is this configurable?
>
> The default is CREATOR_ALL_ACL + READ_ACL_UNSAFE, which means that an
> authenticated client can manipulate secured znodes and everyone can read
> znodes. The API of ZkUtils accommodates other ACLs, but the current code is
> using the default.
>

I think we should consider making this configurable. A specific use case I
can see is that in an environment where you have multiple users of the
Zookeeper that your Kafka cluster is in, you will want to separately
protect those applications and Kafka. We may not want to do this as part of
the initial KIP work, as I think it's important to get this in as soon as
possible in some form (we have problems that this will address), but what
do you think about making this improvement shortly thereafter?
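
To make that concrete for anyone following along, this is roughly what the
default looks like against the plain ZooKeeper client API (a minimal sketch;
the connect string, path, and digest credentials are made up for illustration,
and Kafka itself uses SASL rather than digest auth):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.ACL;

    public final class SecuredZnodeExample {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
            // Authenticate this session; the "creator" in CREATOR_ALL_ACL is this identity.
            zk.addAuthInfo("digest", "broker:broker-secret".getBytes(StandardCharsets.UTF_8));

            // CREATOR_ALL_ACL: full permissions for the authenticated creator.
            // READ_ACL_UNSAFE: read-only access for everyone else (world-readable).
            List<ACL> acl = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
            acl.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);

            zk.create("/brokers/example", new byte[0], acl, CreateMode.PERSISTENT);
            zk.close();
        }
    }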


> > 3) How will the admin tools (such as preferred replica election and
> > partition reassignment) interact with this?
> >
>
> Currently, you need to set a system property passing the login config file
> to be able to authenticate the client and perform writes to ZK.
>

OK. I'll assume that your changes will include modifying the tools and
script helpers to make this easy to do :)

All of this should be clearly documented in the KIP wiki as well.


> -Flavio
>
> > -Todd
> >
> >
> > On Wed, Oct 21, 2015 at 9:16 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> >> On Wed, Oct 21, 2015 at 5:04 PM, Flavio Junqueira <f...@apache.org>
> wrote:
> >>
> >>> Bringing the points Grant brought to this thread:
> >>>
> >>>> Is it worth mentioning the follow up steps that were discussed in the
> >> KIP
> >>>> call in this KIP document? Some of them were:
> >>>>
> >>>>  - Adding SSL support for Zookeeper
> >>>>  - Removing the "world readable" assumption
> >>>>
> >>>
> >>> Grant, how would you do it? I see three options:
> >>>
> >>> 1- Add to the existing KIP, but then the functionality we should be
> >>> checking in soon won't include it, so the KIP will remain incomplete
> >>>
> >>
> >> A "Future work" section would make sense to me, but I don't know how
> this
> >> is normally handled.
> >>
> >> Ismael
> >>
>
>


Re: [DISCUSS] KIP-38: ZooKeeper Authentication

2015-10-21 Thread Todd Palino
Thanks for the clarification on that, Jun. Obviously, we haven't been doing
much with ZK authentication around here yet. There is still a small concern
there, mostly in that you should not share credentials any more than is
necessary, which would argue for being able to use a different ACL than the
default. I don't really like the idea of having to use the exact same
credentials for executing the admin tools as we do for running the brokers.
Given that we don't need to share the credentials with all consumers, I
think we can work around it.

This does bring up another good question, however. What will be the process
for having to rotate the credentials? That is, if the credentials are
compromised and need to be changed, how can that be accomplished with the
cluster online. I'm guessing some combination of using skipAcl on the
Zookeeper ensemble and config changes to the brokers will be required, but
this is an important enough operation that we should make sure it's
reasonable to perform and that it is documented.

-Todd


On Wed, Oct 21, 2015 at 1:23 PM, Jun Rao <j...@confluent.io> wrote:

> Parth,
>
> For 2), in your approach, the broker/controller will then always have the
> overhead of resetting the ACL on startup after zookeeper.set.acl is set to
> true. The benefit of using a separate migration tool is that you paid the
> cost only once during upgrade. It is an extra step during the upgrade.
> However, given the other things that you need to do to upgrade to 0.9.0
> (e.g. two rounds of rolling upgrades on all brokers, etc), I am not sure if
> it's worth to optimize away of this step. We probably just need to document
> this clearly.
>
> Todd,
>
> Just to be clear about the shared ZK usage. Once you set CREATOR_ALL_ACL +
> READ_ACL_UNSAFE on a path, only ZK clients with the same user as the
> creator can modify the path. Other ZK clients authenticated with a
> different user can read, but not modify the path. Are you concerned about
> the reads or the writes to ZK?
>
> Thanks,
>
> Jun
>
>
>
> On Wed, Oct 21, 2015 at 10:46 AM, Flavio Junqueira <f...@apache.org> wrote:
>
> >
> > > On 21 Oct 2015, at 18:07, Parth Brahmbhatt <
> pbrahmbh...@hortonworks.com>
> > wrote:
> > >
> > > I have 2 suggestions:
> > >
> > > 1) We need to document how does one move from secure to non secure
> > > environment:
> > >   1) change the config on all brokers to zookeeper.set.acl = false
> > and do a
> > > rolling upgrade.
> > >   2) Run the migration script with the jass config file so it is
> sasl
> > > authenticated with zookeeper and change the acls on all subtrees back
> to
> > > World modifiable.
> > >   3) remove the jaas config / or only the zookeeper section from
> the
> > jaas,
> > > and restart all brokers.
> > >
> >
> > Thanks for bringing it up, it makes sense to have a downgrade path and
> > document it.
> >
> >
> > > 2) I am not sure if we should force users trying to move from unsecure
> to
> > > secure environment to execute the migration script. In the second step
> > > once the zookeeper.set.acl is set to true, we can secure all the
> subtrees
> > > by calling ensureCorrectAcls as part of broker initialization (just
> after
> > > makesurePersistentPathExists). Not sure why we want to add one more
> > > manual/admin step when it can be automated. This also has the added
> > > advantage that migration script will not have to take a flag as input
> to
> > > figure out if it should set the acls to secure or unsecure given it
> will
> > > always be used to move from secure to unsecure.
> > >
> >
> > The advantage of the third step is to make a single traversal to change
> > any remaining znodes with the open ACL. As you suggest, each broker would
> > do it, so the overhead is much higher. I do agree that eliminating a step
> > is an advantage, though.
> >
> > > Given we are assuming all the information in zookeeper is world
> readable
> > ,
> > > I don¹t see SSL support as a must have or a blocker for this KIP.
> >
> > OK, but keep in mind that SSL is only available in the 3.5 branch of ZK.
> >
> > -Flavio
> >
> > >
> > > Thanks
> > > Parth
> > >
> > >
> > >
> > > On 10/21/15, 9:56 AM, "Flavio Junqueira" <f...@apache.org> wrote:
> > >
> > >>
> > >>> On 21 Oct 2015, at 17:47, Todd Palino <tpal...@gmail.com> wrote:
> > >>>
> > >>> There seems to be a bit of detail la

Re: [DISCUSS] KIP-38: ZooKeeper Authentication

2015-10-21 Thread Todd Palino
On Wed, Oct 21, 2015 at 3:38 PM, Flavio Junqueira <f...@apache.org> wrote:

>
> > On 21 Oct 2015, at 21:54, Todd Palino <tpal...@gmail.com> wrote:
> >
> > Thanks for the clarification on that, Jun. Obviously, we haven't been
> doing
> > much with ZK authentication around here yet. There is still a small
> concern
> > there, mostly in that you should not share credentials any more than is
> > necessary, which would argue for being able to use a different ACL than
> the
> > default. I don't really like the idea of having to use the exact same
> > credentials for executing the admin tools as we do for running the
> brokers.
> > Given that we don't need to share the credentials with all consumers, I
> > think we can work around it.
> >
>
> Let me add that a feature to separate the sub-trees of users sharing an
> ensemble is chroot.
>
> On different credentials for admin tools, this sounds doable by setting
> the ACLs of znodes. For example, there could be an admin id and a broker
> id, both with the ability of changing znodes, but different credentials.
> Would something like that work for you?
>

It would be a nice option to have, as the credentials can be protected
differently. I would consider this a nice to have, and not an "absolutely
must have" feature at this point.


> This does bring up another good question, however. What will be the
> process
> > for having to rotate the credentials? That is, if the credentials are
> > compromised and need to be changed, how can that be accomplished with the
> > cluster online. I'm guessing some combination of using skipAcl on the
> > Zookeeper ensemble and config changes to the brokers will be required,
> but
> > this is an important enough operation that we should make sure it's
> > reasonable to perform and that it is documented.
>
> Right now there is no kafka support in the plan for this. But this is
> doable directly through the zk api. Would it be sufficient to write down
> how to perform such an operation via the zk api or do we need a tool to do
> it?
>

I think as long as there is a documented procedure for how to do it, that
will be good enough. It's mostly about making sure that we can, and that we
don't put something in place that would require downtime to a cluster in
order to change credentials. We can always develop a tool later if it is a
requested item.

Thanks!

-Todd



>
> -Flavio
>
> >
> >
> > On Wed, Oct 21, 2015 at 1:23 PM, Jun Rao <j...@confluent.io> wrote:
> >
> >> Parth,
> >>
> >> For 2), in your approach, the broker/controller will then always have
> the
> >> overhead of resetting the ACL on startup after zookeeper.set.acl is set
> to
> >> true. The benefit of using a separate migration tool is that you paid
> the
> >> cost only once during upgrade. It is an extra step during the upgrade.
> >> However, given the other things that you need to do to upgrade to 0.9.0
> >> (e.g. two rounds of rolling upgrades on all brokers, etc), I am not
> sure if
> >> it's worth to optimize away of this step. We probably just need to
> document
> >> this clearly.
> >>
> >> Todd,
> >>
> >> Just to be clear about the shared ZK usage. Once you set
> CREATOR_ALL_ACL +
> >> READ_ACL_UNSAFE on a path, only ZK clients with the same user as the
> >> creator can modify the path. Other ZK clients authenticated with a
> >> different user can read, but not modify the path. Are you concerned
> about
> >> the reads or the writes to ZK?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >>
> >> On Wed, Oct 21, 2015 at 10:46 AM, Flavio Junqueira <f...@apache.org>
> wrote:
> >>
> >>>
> >>>> On 21 Oct 2015, at 18:07, Parth Brahmbhatt <
> >> pbrahmbh...@hortonworks.com>
> >>> wrote:
> >>>>
> >>>> I have 2 suggestions:
> >>>>
> >>>> 1) We need to document how does one move from secure to non secure
> >>>> environment:
> >>>>  1) change the config on all brokers to zookeeper.set.acl = false
> >>> and do a
> >>>> rolling upgrade.
> >>>>  2) Run the migration script with the jass config file so it is
> >> sasl
> >>>> authenticated with zookeeper and change the acls on all subtrees back
> >> to
> >>>> World modifiable.
> >>>>  3) remove the jaas config / or only the zookeeper section from
> >> the
> >>> jaas,
&g

Re: [VOTE] KIP-38: ZooKeeper authentication

2015-10-21 Thread Todd Palino
+1 (non-binding)

On Wed, Oct 21, 2015 at 6:53 PM, Jiangjie Qin 
wrote:

> +1 (non-binding)
>
> On Wed, Oct 21, 2015 at 3:40 PM, Joel Koshy  wrote:
>
> > +1 binding
> >
> > On Wed, Oct 21, 2015 at 8:17 AM, Flavio Junqueira 
> wrote:
> >
> > > Thanks everyone for the feedback so far. At this point, I'd like to
> start
> > > a vote for KIP-38.
> > >
> > > Summary: Add support for ZooKeeper authentication
> > > KIP page:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-38%3A+ZooKeeper+Authentication
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-38:+ZooKeeper+Authentication
> > > >
> > >
> > > Thanks,
> > > -Flavio
> >
>


[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-10-19 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14964003#comment-14964003
 ] 

Todd Palino commented on KAFKA-2580:


It's about as graceful as an OOM, which is to say "not very". Essentially, it 
hits the limit and falls over and dies with an exception. We've run into it a 
bit with both leaking FDs from an implementation issue, and with runaway 
clients that don't do the right thing. In both situations, you are correct that 
you will generally end up seeing it as a cascading failure through the cluster.

> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>Assignee: Grant Henke
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I could find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-10-19 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14963986#comment-14963986
 ] 

Todd Palino commented on KAFKA-2580:


I agree with [~jkreps] here, that having a high FD limit is not a bad thing. As 
[~jjkoshy] noted, we're already running at 400k internally (recently increased 
from 200k). Part of that is to handle growth, and part of that is to have a 
good bit of headroom if something starts to leak FDs so we have some time to 
address it before it kills the process (we alert at 50% utilization).

The LRU cache option is probably the best. You can set it to an arbitrarily 
high number (the best option here might be to cap it near, but below, your 
per-process limit) if you want to effectively disable it, and it would 
generally avoid the process of having to check and act on expiring the FDs in 
the timer option. I can see arguments for setting the default either high or 
low (and I consider 10k to be low). Regardless, as long as it's configurable 
and documented it will be fine.
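
For illustration, a minimal sketch of that kind of LRU handle cache
(hypothetical code, not what the broker does today; the cap would map to
something like the per-process FD limit minus headroom):

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustrative LRU cache of open file handles; evicted handles are closed.
    public final class FileHandleCache {
        private final LinkedHashMap<Path, FileChannel> cache;

        public FileHandleCache(int maxOpenFiles) {
            // accessOrder=true keeps least-recently-used entries at the head.
            this.cache = new LinkedHashMap<Path, FileChannel>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<Path, FileChannel> eldest) {
                    if (size() > maxOpenFiles) {
                        try {
                            eldest.getValue().close();
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                        return true;
                    }
                    return false;
                }
            };
        }

        public synchronized FileChannel get(Path path) throws IOException {
            FileChannel channel = cache.get(path);
            if (channel == null || !channel.isOpen()) {
                channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE);
                cache.put(path, channel);
            }
            return channel;
        }
    }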

> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>Assignee: Grant Henke
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I could find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-19 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14964228#comment-14964228
 ] 

Todd Palino commented on KAFKA-2017:


I think we definitely need to maintain the ability to get that type of 
information, whether it's within the protocol or via an admin endpoint. Being 
able to tell what consumers exist in a group, as well as what partitions each 
of them owns, is important information to have. And it should be available not 
just from the consumers, but from the coordinator itself. That way you can 
debug issues where they have gone out of sync, and you can provide tools (such 
as burrow) to provide consumer status information independently.

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-16 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960809#comment-14960809
 ] 

Todd Palino commented on KAFKA-2017:


Just to throw in my 2 cents here, I don't think that persisting this state in a 
special topic in Kafka is a bad idea. My only concern is that we have seen 
issues with the offsets already from time to time, and we'll want to make sure 
we take those lessons learned and handle them from the start. The ones I am 
aware of are:

1) Creation of the special topic at cluster initialization. If we specify an RF 
of N for the special topic, then the brokers must make this happen. The first 
broker that comes up can't create it with an RF of 1 and own all the 
partitions. Either it must reject all operations that would use the special 
topic until N brokers are members of the cluster and it can be created, or 
it must create the topic in such a way that as soon as there are N brokers 
available the RF is corrected to the configured number.

2) Load of the special topic into local cache. Whenever a coordinator loads the 
special topic, there is a period of time while it is loading state where it 
cannot service requests. We've seen problems with this related to log 
compaction, where the partitions were excessively large, but I can see as we 
move an increasing number of (group, partition) tuples over to Kafka-committed 
offsets it could become a scale issue very easily. This should not be a big 
deal for group state information, as that should always be smaller than the 
offset information for the group, but we may want to create a longer term plan 
for handling auto-scaling of the special topics (the ability to increase the 
number of partitions and move group information from the partition it used to 
hash to to the one it hashes to after scaling).
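
On that last point, the re-mapping happens simply because the group-to-partition
assignment is a hash modulo the partition count; a simplified sketch (the exact
hash function used by the broker may differ):

    // Simplified illustration of mapping a group to a partition of a special topic.
    public final class GroupPartitionerExample {

        // Mask off the sign bit so the result is always non-negative.
        private static int positiveHash(String s) {
            return s.hashCode() & 0x7fffffff;
        }

        public static int partitionFor(String groupId, int numPartitions) {
            return positiveHash(groupId) % numPartitions;
        }

        public static void main(String[] args) {
            String group = "my-consumer-group";
            // Changing the partition count of the special topic re-maps the group,
            // which is why existing state would need to be moved after scaling.
            System.out.println(partitionFor(group, 50));   // partition before scaling
            System.out.println(partitionFor(group, 100));  // partition after scaling
        }
    }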

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-35 - Retrieve protocol version

2015-09-29 Thread Todd Palino
We should also consider what else should be negotiated between the broker
and the client as this comes together. The version is definitely first, but
there are other things, such as the max message size, that should not need
to be replicated on both the broker and the client. Granted, max message
size has per-topic overrides as well, but that should also be considered
(possibly as an addition to the topic metadata response).

Ideally you never want a requirement that is enforced by the broker to be a
surprise to the client, whether that's a supported version or a
configuration parameter. The client should not have to know it in advance
(except for the most basic of connection parameters), and even if it does
have it as a configuration option, it should be able to know before it even
starts running that what it has configured is in conflict with the server.
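
As a concrete illustration of the negotiation idea (a sketch only; the method
and class names here are not from the KIP), the client would take the highest
version in the overlap of the two supported ranges and fail fast if there is
none:

    // Illustrative only: choose the highest request version both sides support.
    public final class VersionNegotiationExample {

        /** Returns the highest mutually supported version, or -1 if the ranges don't overlap. */
        public static short pickVersion(short clientMin, short clientMax,
                                        short brokerMin, short brokerMax) {
            int highestCommon = Math.min(clientMax, brokerMax);
            int lowestCommon = Math.max(clientMin, brokerMin);
            return highestCommon >= lowestCommon ? (short) highestCommon : -1;
        }

        public static void main(String[] args) {
            // Hypothetical ranges: fail fast at startup instead of discovering the
            // mismatch only when the broker rejects (or ignores) a request.
            short version = pickVersion((short) 0, (short) 2, (short) 1, (short) 3);
            if (version < 0) {
                throw new IllegalStateException("No mutually supported protocol version");
            }
            System.out.println("Using request version " + version);
        }
    }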

-Todd


On Tue, Sep 29, 2015 at 11:08 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Right. But there should be a max old version that the broker should support
> to avoid these incompatibility issues.
> For example, if the broker is at version X, it should be able to support
> the versions (clients and interbroker) till X-2. In case we have brokers
> and clients older than that it can send a response warning them to upgrade
> till X-2 minimum.
> The backward compatibility limit can be discussed further. This will help
> for rolling upgrades.
>
> Thanks,
>
> Mayuresh
>
> On Tue, Sep 29, 2015 at 8:25 AM, Grant Henke  wrote:
>
> > If we create a protocol version negotiation api for clients, can we use
> it
> > to replace or improve the ease of upgrades that break inter-broker
> > messaging?
> >
> > Currently upgrades that break the wire protocol take 2 rolling restarts.
> > The first restart we set inter.broker.protocol.version telling all
> brokers
> > to communicate on the old version, and then we restart again removing the
> > inter.broker.protocol.version. With this api the brokers could agree on a
> > version to communicate with, and when bounced re-negotiate to the new
> > version.
> >
> >
> > On Mon, Sep 28, 2015 at 10:26 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Nice write-up.
> > >
> > > Just had a question, instead of returning an empty response back to the
> > > client, would it be better for the broker to return a response that
> gives
> > > some more info to the client regarding the min version they need to
> > upgrade
> > > to in order to communicate with the broker.
> > >
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Mon, Sep 28, 2015 at 6:36 PM, Jiangjie Qin
>  > >
> > > wrote:
> > >
> > > > Thanks for the writeup. I also think having a specific protocol for
> > > > client-broker version negotiation is better.
> > > >
> > > > I'm wondering is it better to let the broker to decide the version to
> > > use?
> > > > It might have some value If brokers have preference for a particular
> > > > version.
> > > > Using a global version is a good approach. For the client-broker
> > > > negotiation, I am thinking about something like:
> > > >
> > > > ProtocolSyncRequest => ClientType [ProtocolVersion]
> > > > ClientType => int8
> > > > ProtocolVersion => int16
> > > >
> > > > ProtocolSyncResponse => PreferredVersion
> > > > PreferredVersion => int16
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Mon, Sep 28, 2015 at 11:59 AM, Jun Rao  wrote:
> > > >
> > > > > I agree with Ewen that having the keys explicitly specified in the
> > > > response
> > > > > is better.
> > > > >
> > > > > In addition to the supported protocol version, there are other
> > > > interesting
> > > > > metadata at the broker level that could be of interest to things
> like
> > > > admin
> > > > > tools (e.g., used disk space, remaining disk space, etc). I am
> > > wondering
> > > > if
> > > > > we should separate those into different requests. For inquiring the
> > > > > protocol version, we can have a separate BrokerProtocolRequest. The
> > > > > response will just include the broker version and perhaps a list of
> > > > > supported requests and versions?
> > > > >
> > > > > As for sending an empty response for unrecognized requests, do you
> > > > > know how that is handled in other similar systems?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Sep 28, 2015 at 10:47 AM, Jason Gustafson <
> > ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Having the version API can make clients more robust, so I'm in
> > favor.
> > > > One
> > > > > > note on the addition of the "rack" field. Since this is a
> > > > broker-specific
> > > > > > setting, the client would have to query BrokerMetadata for every
> > new
> > > > > broker
> > > > > > it connects to (unless we also expose rack in TopicMetadata).
> This
> > is
> > > > > also
> > > > > > kind of unfortunate for admin 

Re: [DISCUSS] KIP-36 - Rack aware replica assignment

2015-09-28 Thread Todd Palino
I tend to like the idea of a pluggable locator. For example, we already
have an interface for discovering information about the physical location
of servers. I don't relish the idea of having to maintain data in multiple
places.
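
For illustration only, the pluggable piece could be as small as something like
this (made-up names, not the interface from the KIP), with one implementation
reading static broker config and another backed by whatever machine-location
service a site already runs:

trait RackLocator {
  // Resolve the rack (or other fault domain) for a broker, if known.
  def rackOf(brokerId: Int): Option[String]
}

// Simplest possible implementation: a static map loaded from configuration.
class StaticRackLocator(assignments: Map[Int, String]) extends RackLocator {
  override def rackOf(brokerId: Int): Option[String] = assignments.get(brokerId)
}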

-Todd

On Mon, Sep 28, 2015 at 4:48 PM, Aditya Auradkar <
aaurad...@linkedin.com.invalid> wrote:

> Thanks for starting this KIP Allen.
>
> I agree with Gwen that having a RackLocator class that is pluggable seems
> to be too complex. The KIP refers to potentially non-ZK storage for the
> rack info which I don't think is necessary.
>
> Perhaps we can persist this info in zk under /brokers/ids/
> similar to other broker properties and add a config in KafkaConfig called
> "rack".
> {"jmx_port":-1,"endpoints":[...],"host":"xxx","port":yyy, "rack": "abc"}
>
> Aditya
>
> On Mon, Sep 28, 2015 at 2:30 PM, Gwen Shapira  wrote:
>
> > Hi,
> >
> > First, thanks for putting out a KIP for this. This is super important for
> > production deployments of Kafka.
> >
> > Few questions:
> >
> > 1) Are we sure we want "as many racks as possible"? I'd want to balance
> > between safety (more racks) and network utilization (traffic within a
> rack
> > uses the high-bandwidth TOR switch). One replica on a different rack and
> > the rest on same rack (if possible) sounds better to me.
> >
> > 2) Rack-locator class seems overly complex compared to adding a
> rack.number
> > property to the broker properties file. Why do we want that?
> >
> > Gwen
> >
> >
> >
> > On Mon, Sep 28, 2015 at 12:15 PM, Allen Wang 
> wrote:
> >
> > > Hello Kafka Developers,
> > >
> > > I just created KIP-36 for rack aware replica assignment.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment
> > >
> > > The goal is to utilize the isolation provided by the racks in data
> center
> > > and distribute replicas to racks to provide fault tolerance.
> > >
> > > Comments are welcome.
> > >
> > > Thanks,
> > > Allen
> > >
> >
>


Re: [VOTE] KIP-31 - Move to relative offsets in compressed message sets.

2015-09-23 Thread Todd Palino
+1000

!

-Todd

On Wednesday, September 23, 2015, Jiangjie Qin 
wrote:

> Hi,
>
> Thanks a lot for the reviews and feedback on KIP-31. It looks all the
> concerns of the KIP has been addressed. I would like to start the voting
> process.
>
> The short summary for the KIP:
> We are going to use the relative offset in the message format to avoid
> server side recompression.
>
> In case you haven't got a chance to check, here is the KIP link.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
>
> Thanks,
>
> Jiangjie (Becket) Qin
>


Re: [DISCUSS] KIP-31 - Message format change proposal

2015-09-06 Thread Todd Palino
So, with regards to why you want to search by timestamp, the biggest
problem I've seen is with consumers who want to reset their offsets to a specific
point in time, whether it is to replay a certain amount of messages, or to
rewind to before some problem state existed. This happens more often than
anyone would like.

To handle this now we need to constantly export the broker's offset for
every partition to a time-series database and then use external processes
to query this. I know we're not the only ones doing this. The way the
broker handles requests for offsets by timestamp is a little obtuse
(try explaining it to anyone without intimate knowledge of the internal workings
of the broker - every time I do, I see the confusion). In addition, as Becket pointed
out, it causes problems specifically with retention of messages by time
when you move partitions around.
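
As a rough sketch of that workaround (names made up for illustration, and an
in-memory vector standing in for the external time-series database), the
export process records (timestamp, log-end offset) pairs and a consumer later
looks up the offset to rewind to:

object OffsetHistory {
  final case class Sample(timestampMs: Long, logEndOffset: Long)

  // Appended in time order by a poller that reads the partition's log-end
  // offset from the broker's metrics.
  private var samples = Vector.empty[Sample]

  def record(timestampMs: Long, logEndOffset: Long): Unit =
    samples :+= Sample(timestampMs, logEndOffset)

  // The last recorded log-end offset at or before `sinceMs`; seeking a
  // consumer there replays (approximately) everything produced since then.
  def offsetFor(sinceMs: Long): Option[Long] =
    samples.takeWhile(_.timestampMs <= sinceMs).lastOption.map(_.logEndOffset)
}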

I'm deliberately avoiding the discussion of what timestamp to use. I can
see the argument either way, though I tend to lean towards the idea that
the broker timestamp is the only viable source of truth in this situation.

-Todd


On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava 
wrote:

> On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps  wrote:
>
> >
> > 2. Nobody cares what time it is on the server.
> >
>
> This is a good way of summarizing the issue I was trying to get at, from an
> app's perspective. Of the 3 stated goals of the KIP, #2 (log retention) is
> reasonably handled by a server-side timestamp. I really just care that a
> message is there long enough that I have a chance to process it. #3
> (searching by timestamp) only seems useful if we can guarantee the
> server-side timestamp is close enough to the original client-side
> timestamp, and any mirror maker step seems to break that (even ignoring any
> issues with broker availability).
>
> I'm also wondering whether optimizing for search-by-timestamp on the broker
> is really something we want to do given that messages aren't really
> guaranteed to be ordered by application-level timestamps on the broker. Is
> part of the need for this just due to the current consumer APIs being
> difficult to work with? For example, could you implement this pretty easily
> client side just the way you would broker-side? I'd imagine a couple of
> random seeks + reads during very rare occasions (i.e. when the app starts
> up) wouldn't be a problem performance-wise. Or is it also that you need the
> broker to enforce things like monotonically increasing timestamps since you
> can't do the query properly and efficiently without that guarantee, and
> therefore what applications are actually looking for *is* broker-side
> timestamps?
>
> -Ewen
>
>
>
> > Consider cases where data is being copied from a database or from log
> > files. In steady-state the server time is very close to the client time
> if
> > their clocks are sync'd (see 1) but there will be times of large
> divergence
> > when the copying process is stopped or falls behind. When this occurs it
> is
> > clear that the time the data arrived on the server is irrelevant, it is
> the
> > source timestamp that matters. This is the problem you are trying to fix
> by
> > retaining the mm timestamp but really the client should always set the
> time
> > with the use of server-side time as a fallback. It would be worth talking
> > to the Samza folks and reading through this blog post (
> >
> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
> > )
> > on this subject since we went through similar learnings on the stream
> > processing side.
> >
> > I think the implication of these two is that we need a proposal that
> > handles potentially very out-of-order timestamps in some kind of sanish
> way
> > (buggy clients will set something totally wrong as the time).
> >
> > -Jay
> >
> > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps  wrote:
> >
> > > The magic byte is used to version message format so we'll need to make
> > > sure that check is in place--I actually don't see it in the current
> > > consumer code which I think is a bug we should fix for the next release
> > > (filed KAFKA-2523). The purpose of that field is so there is a clear
> > check
> > > on the format rather than the scrambled scenarios Becket describes.
> > >
> > > Also, Becket, I don't think just fixing the java client is sufficient
> as
> > > that would break other clients--i.e. if anyone writes a v1 messages,
> even
> > > by accident, any non-v1-capable consumer will break. I think we
> probably
> > > need a way to have the server ensure a particular message format either
> > at
> > > read or write time.
> > >
> > > -Jay
> > >
> > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin  >
> > > wrote:
> > >
> > >> Hi Guozhang,
> > >>
> > >> I checked the code again. Actually CRC check probably won't fail. The
> > >> newly
> > >> added timestamp field might be treated as keyLength instead, so we are
> > >> likely 

[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2015-08-22 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708282#comment-14708282
 ] 

Todd Palino commented on KAFKA-1566:


This makes a lot of sense. We don't use any of the standard start/stop 
components or scripts, just the admin CLI tools, so I don't see this having an 
effect on us.

 Kafka environment configuration (kafka-env.sh)
 --

 Key: KAFKA-1566
 URL: https://issues.apache.org/jira/browse/KAFKA-1566
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Cosmin Lehene
Assignee: Sriharsha Chintalapani
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, 
 KAFKA-1566_2015-03-17_17:01:38.patch, KAFKA-1566_2015-03-17_17:19:23.patch


 It would be useful (especially for automated deployments) to have an 
 environment configuration file that could be sourced from the launcher files 
 (e.g. kafka-run-server.sh). 
 This is how this could look like kafka-env.sh 
 {code}
 export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops -XX:+DisableExplicitGC \
   -Djava.awt.headless=true -XX:+UseG1GC -XX:PermSize=48m -XX:MaxPermSize=48m \
   -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
 export KAFKA_HEAP_OPTS='-Xmx1G -Xms1G'
 export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka"
 {code} 
 kafka-server-start.sh 
 {code} 
 ... 
 source $base_dir/config/kafka-env.sh 
 ... 
 {code} 
 This approach is consistent with Hadoop and HBase. However the idea here is 
 to be able to set these values in a single place without having to edit 
 startup scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: kafka - tcp ports

2015-08-02 Thread Todd Palino
It does not. That last connection, the one in CLOSE_WAIT, is an outbound
connection from the broker you are looking at to one of the other brokers.
57821 is the source TCP port, and it is selected (somewhat) randomly from a
range of high port numbers. Note that the other end of the connection is
9092, which is the destination port on the remote host.

This is likely one of the replica fetchers.
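
To illustrate the ephemeral-port behavior (this is just the JVM and OS at
work, nothing Kafka-specific, and it assumes something is listening on
localhost:9092):

object EphemeralPortDemo extends App {
  import java.net.Socket
  // Any outbound TCP connection gets an OS-assigned ephemeral local port,
  // which is what shows up as the high-numbered side of the broker's
  // replica fetcher connections.
  val s = new Socket("localhost", 9092)
  println(s.getLocalPort) // e.g. 57821 - chosen by the OS, not configured in Kafka
  s.close()
}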

-Todd


On Sun, Aug 2, 2015 at 6:58 AM, madhavan kumar madhavan020...@gmail.com
wrote:

 hi all,
i am monitoring the ports that are used by kafka server on my machine.
 When i do,

 sudo netstat -ntp | grep 9092

 it shows,

 tcp0  0 192.168.122.1:9092  192.168.122.6:59158
 ESTABLISHED 27413/java
 tcp0  0 192.168.122.1:9092  192.168.122.1:58338
 ESTABLISHED 27413/java
 tcp1  0 192.168.122.1:57821 192.168.122.1:9092  CLOSE_WAIT
 27413/java


 The last entry in CLOSE_WAIT state surprises me because

 sudo lsof -i tcp:57821
 shows


 COMMAND   PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
 java27413 root   42u  IPv4 3266648  0t0  TCP
 sk-box:57821-sk-box:9092 (CLOSE_WAIT)

  indicates that the port 57821 is owned by the kafka process (pid 27413).
  Does kafka use extra ports other than the listening port, 9092, to talk to
  itself?

 thanks,
 saravana



Re: Gauging Interest in adding Encryption to Kafka

2015-07-31 Thread Todd Palino
1 - Yes, authorization combined with encryption does get us most of the way
there. However, depending on the auditor it might not be good enough. The
problem is that if you are encrypting at the broker, then by definition
anyone who has access to the broker (i.e. operations staff) has access to
the data. Consider the case where you are passing salary and other
information through the system, and those people do not need a view of it.
I admit, the 90% solution might be better here than going for a perfect
solution, but it is something to think about.

2 - My worry is people wanting to integrate with different key systems. For
example, one person may be fine with providing it in a config file, while
someone else may want to use the solution from vendor A, someone else wants
vendor B, and yet another person wants this obscure hardware-based solution
that exists elsewhere.

The compaction concern is definitely a good one I hadn't thought of. I'm
wondering if it's reasonable to just say that compaction will not work
properly with encrypted keys if you do not have consistent encryption (that
is, the same string encrypts to the same string every time).

Ultimately I don't like the idea of the broker doing any encrypt/decrypt
steps OR compression/decompression. This is all CPU overhead that you're
concentrating in one place instead of distributing the load out to the
clients. Now yes, I know that the broker decompresses to check the CRC and
assign offsets and then compresses, and we can potentially avoid the
compression step with assigning the batch an offset and a count instead but
we still need to consider the CRC. Adding encrypt/decrypt steps adds even
more overhead and it's going to get very difficult to handle even 2 Gbits
worth of traffic at that rate.
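
For what client-side encryption could look like in practice, here is a minimal
sketch of a wrapping serializer for the new producer (names are made up; key
loading, rotation, and cipher choice are deliberately glossed over, and as
noted above compacted topics would additionally need deterministic encryption
of keys):

import java.security.SecureRandom
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
import org.apache.kafka.common.serialization.Serializer

// Wraps an existing serializer and encrypts the serialized bytes before they
// are handed to the producer, so the broker only ever sees ciphertext.
class EncryptingSerializer[T](inner: Serializer[T], aesKey: Array[Byte]) extends Serializer[T] {
  private val random = new SecureRandom()
  private val keySpec = new SecretKeySpec(aesKey, "AES")

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit =
    inner.configure(configs, isKey)

  override def serialize(topic: String, data: T): Array[Byte] = {
    val plain = inner.serialize(topic, data)
    val iv = new Array[Byte](16)
    random.nextBytes(iv) // random IV: the same plaintext encrypts differently every time
    val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
    cipher.init(Cipher.ENCRYPT_MODE, keySpec, new IvParameterSpec(iv))
    iv ++ cipher.doFinal(plain) // prepend the IV so consumers can decrypt
  }

  override def close(): Unit = inner.close()
}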

There are other situations that concern me, such as revocation of keys, and
I'm not sure whether it is better with client-based or server-based
encryption. For example, if I want to revoke a key with client-based
encryption it becomes similar to how we handle Avro schemas (internally)
now - you change keys, and depending on what your desire is you either
expire out the data for some period of time with the older keys, or you
just let it sit there and your consuming clients won't have an issue. With
broker-based encryption, the broker has to work with the multiple keys
per-topic.

-Todd


On Fri, Jul 31, 2015 at 2:38 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Good points :)

 1) Kafka already (pending commit) has an authorization layer, so
 theoretically we are good for SOX, HIPAA, PCI, etc. Transparent broker
 encryption will support PCI
 never-let-unencrypted-card-number-hit-disk.

 2) Agree on Key Management being complete PITA. It may better to
 centralize this pain in the broker rather than distributing it to
 clients. Any reason you think its better to let the clients handle it?
 The way I see it, we'll need to handle key management the way we did
 authorization - give an API for interfacing with existing systems.

 More important, we need the broker to be able to decrypt and encrypt
 in order to support compaction (unless we can find a cool
 key-uniqueness-preserving encryption algorithm, but this may not be as
 secure). I think we also need the broker to be able to re-compress
 data, and since we always encrypt compressed bits (compressing
 encrypted bits doesn't compress), we need the broker to decrypt before
 re-compressing.



 On Fri, Jul 31, 2015 at 2:27 PM, Todd Palino tpal...@gmail.com wrote:
  It does limit it to clients that have an implementation for encryption,
  however encryption on the client side is better from an auditing point of
  view (whether that is SOX, HIPAA, PCI, or something else). Most of those
  types of standards are based around allowing visibility of data to just
 the
  people who need it. That includes the admins of the system (who are often
  not the people who use the data).
 
  Additionally, key management is a royal pain, and there are lots of
  different types of systems that one may want to use. This is a pretty big
  complication for the brokers.
 
  -Todd
 
 
  On Fri, Jul 31, 2015 at 2:21 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  I've seen interest in HDFS-like encryption zones in Kafka.
 
  This has the advantage of magically encrypting data at rest regardless
  of which client is used as a producer.
  Adding it on the client side limits the feature to the java client.
 
  Gwen
 
  On Fri, Jul 31, 2015 at 1:20 PM, eugene miretsky
  eugene.miret...@gmail.com wrote:
   I think that Hadoop and Cassandra do [1] (Transparent Encryption)
  
   We're doing [2] (on a side note, for [2] you still need
 authentication on
   the producer side - you don't want an unauthorized user writing
 garbage).
   Right now we have the 'user' doing the  encryption and submitting raw
  bytes
   to the producer. I was suggesting implementing an encryptor in the
   producer itself - I think it's cleaner and can be reused

Re: Gauging Interest in adding Encryption to Kafka

2015-07-31 Thread Todd Palino
It does limit it to clients that have an implementation for encryption,
however encryption on the client side is better from an auditing point of
view (whether that is SOX, HIPAA, PCI, or something else). Most of those
types of standards are based around allowing visibility of data to just the
people who need it. That includes the admins of the system (who are often
not the people who use the data).

Additionally, key management is a royal pain, and there are lots of
different types of systems that one may want to use. This is a pretty big
complication for the brokers.

-Todd


On Fri, Jul 31, 2015 at 2:21 PM, Gwen Shapira gshap...@cloudera.com wrote:

 I've seen interest in HDFS-like encryption zones in Kafka.

 This has the advantage of magically encrypting data at rest regardless
 of which client is used as a producer.
 Adding it on the client side limits the feature to the java client.

 Gwen

 On Fri, Jul 31, 2015 at 1:20 PM, eugene miretsky
 eugene.miret...@gmail.com wrote:
  I think that Hadoop and Cassandra do [1] (Transparent Encryption)
 
  We're doing [2] (on a side note, for [2] you still need authentication on
  the producer side - you don't want an unauthorized user writing garbage).
  Right now we have the 'user' doing the  encryption and submitting raw
 bytes
  to the producer. I was suggesting implementing an encryptor in the
  producer itself - I think it's cleaner and can be reused by other users
  (instead of having to do their own encryption)
 
  Cheers,
  Eugene
 
  On Fri, Jul 31, 2015 at 4:04 PM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
 
  I think the goal here is to make the actual message stored on broker to
 be
  encrypted, because after we have SSL, the transmission would be
 encrypted.
 
  In general there might be tow approaches:
  1. Broker do the encryption/decryption
  2. Client do the encryption/decryption
 
  From performance point of view, I would prefer [2]. It is just in that
  case, maybe user does not necessarily need to use SSL anymore because
 the
  data would be encrypted anyway.
 
  If we let client do the encryption, there are also two ways to do so -
  either we let producer take an encryptor or users can do
  serialization/encryption outside the producer and send raw bytes. The
 only
  difference between the two might be flexibility. For example, if someone
  wants to know the actual bytes of a message that got sent over the wire,
  doing it outside the producer would probably more preferable.
 
  Jiangjie (Becket) Qin
 
  On Thu, Jul 30, 2015 at 12:16 PM, eugene miretsky 
  eugene.miret...@gmail.com
   wrote:
 
   Hi,
  
   Based on the security wiki page
   https://cwiki.apache.org/confluence/display/KAFKA/Security
 encryption
  of
   data at rest is out of scope for the time being. However, we are
implementing  encryption in Kafka and would like to see if there is
    interest in submitting a patch for it.
  
   I suppose that one way to implement  encryption would be to add an
   'encrypted key' field to the Message/MessageSet  structures in the
   wire protocole - however, this is a very big and fundamental change.
  
   A simpler way to add encryption support would be:
   1) Custom Serializer, but it wouldn't be compatible with other  custom
   serializers (Avro, etc. )
   2)  Add a step in KafkaProducer after serialization to encrypt the
 data
   before it's being submitted to the accumulator (encryption is done in
 the
   submitting thread, not in the producer io thread)
  
   Is there interest in adding #2 to Kafka?
  
   Cheers,
   Eugene
  
 



Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Todd Palino
Since I've been dealing with the fallout of this particular problem all
week, I'll add a few thoughts...


On Wed, Jul 22, 2015 at 10:51 AM, Gwen Shapira gshap...@cloudera.com
wrote:



  On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
   core/src/main/scala/kafka/network/SocketServer.scala, line 465
   
 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465
 
  
   Turns out that catching Throwable is a really bad idea:
 https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
 
  Jiangjie Qin wrote:
  Ah... Didn't know that before. I explicitly listed the exceptions.
 
  Guozhang Wang wrote:
  Searching : Throwable gives me 180+ cases in code base :P Though
 many of them are from unit tests (which, arguably maybe OK) there are still
 a lot in the core package. I agree that we should avoid catching Throwable
 whenever possible, which will also help enforcing the developers to think
 about possible checked exceptions in the calling trace.

 I know :(
 I'm not sure if going over and converting everything is worth the effort.
 Although it can be a nice newbie jira.


  On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
   core/src/main/scala/kafka/network/SocketServer.scala, line 400
   
 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400
 
  
   So in case of unexpected exception, we log an error and keep
 running?
  
   Isn't it better to kill the processor, since we don't know what's
 the state of the system? If the acceptor keeps placing messages in the
 queue for a dead processor, isn't it a separate issue?
 
  Jiangjie Qin wrote:
  This part I'm not quite sure. I am not very experienced in the error
 handling in such case, so please correct me if I missed something.
  Here is what I thought.
 
  The way it currently works is that the acceptor will
  1. accept new connection request and create new socket channel
  2. choose a processor and put the socket channel into the
 processor's new connection queue
 
  The processor will just take the socket channels from the queue and
 register it to the selector.
  If the processor runs and get an uncaught exception, there are
 several possibilities.
  Case 1: The exception was from one socket channel.
  Case 2: The exception was associated with a bad request.
  In case 1, ideally we should just disconnect that socket channel
 without affecting other socket channels.
  In case 2, I think we should log the error and skip the message -
 assuming client will retry sending data if no response was received for a
 given peoriod of time.
 
  I am not sure if letting processor exit is a good idea because this
 will lead to the result of a badly behaving client screw the entire cluster
 - it might screw processors one by one. Comparing with that, I kind of
 leaning towards keeping the processor running and serving other normal TCP
 connections if possible, but log the error so monitoring system can detect
 and see if human intervention is needed.
 
  Also, I don't know what to do here to prevent the thread from
 exiting without catching all the throwables.
  According to this blog
 http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
  I guess I can rethrow all the ControlThrowables, but intercept the
 rests?
 
  Guozhang Wang wrote:
  I would also prefer not to close the Processor thread upon
 exceptions, mainly for avoid one bad client killing a shared Kafka cluster.
 In the past we have seen such issues like DDoS MetadataRequest killing the
 cluster and all other clients gets affected, etc, and the quota work is
 towards preventing it. Since Processor threads are shared (8 by default on
 a broker), it should not be closed by a single socket / bad client request.

 I like your thinking around cases #1 and #2. I think this should go as a
 code comment somewhere, so when people improve / extend SocketServer they
 will keep this logic in mind. Maybe even specify in specific catch clauses
 if they are handling possible errors in request level or channel level.

 My concern is with possible case #3: Each processor has an
 o.a.k.common.network.Selector. I'm concerned about the possibility of
 something going wrong in the state of the selector, which will possibly be
 an issue for all channels. For example failure to register could be an
 issue with the channel.register call, but also perhaps an issue with
 keys.put (just an example - I'm not sure something can actually break keys
 table).

 I'd like to be able to identify cases where the Selector state may have
 gone wrong and close the processor in that case. Does that make any sense?
 Or am I being too paranoid?


If there are error cases that are not associated with a specific connection
or request, then I agree that we should handle that a little differently.
What we need to keep in mind is that close the processor is actually
terminate the 

Re: [Discussion] Limitations on topic names

2015-07-11 Thread Todd Palino
 find dots more common in my customer base, so I will definitely feel
 the pain of removing them.
 
 However, . are already used in metrics, file names, directories, etc
 - so if we keep the dots, we need to keep code that translates them
 and document the translation. Just banning . seems more natural.
 Also, as Grant mentioned, we'll probably have our own special usage
 for . down the line.
 
 On Fri, Jul 10, 2015 at 2:12 PM, Todd Palino tpal...@gmail.com wrote:
 I absolutely disagree with #2, Neha. That will break a lot of
 infrastructure within LinkedIn. That said, removing . might break
 other
 people as well, but I think we should have a clearer idea of how much
 usage
 there is on either side.
 
 -Todd
 
 
 On Fri, Jul 10, 2015 at 2:08 PM, Neha Narkhede n...@confluent.io
 wrote:
 
 . seems natural for grouping topic names. +1 for 2) going forward
 only
 without breaking previously created topics with _ though that might
 require us to patch the code somewhat awkwardly till we phase it out
 a
 couple (purposely left vague to stay out of Ewen's wrath :-))
 versions
 later.
 
 On Fri, Jul 10, 2015 at 2:02 PM, Gwen Shapira gshap...@cloudera.com
 
 wrote:
 
 I don't think we should break existing topics. Just disallow new
 topics going forward.
 
 Agree that having both is horrible, but we should have a solution
 that
 fails when you run kafka_topics.sh --create, not when you
 configure
 Ganglia.
 
 Gwen
 
 On Fri, Jul 10, 2015 at 1:53 PM, Jay Kreps j...@confluent.io
 wrote:
 Unfortunately '.' is pretty common too. I agree that it is
 perverse,
 but
 people seem to do it. Breaking all the topics with '.' in the
 name
 seems
 like it could be worse than combining metrics for people who
 have a
 'foo_bar' AND 'foo.bar' (and after all, having both is DEEPLY
 perverse,
 no?).
 
 Where is our Dean of Compatibility, Ewen, on this?
 
 -Jay
 
 On Fri, Jul 10, 2015 at 1:32 PM, Todd Palino tpal...@gmail.com
 wrote:
 
 My selfish point of view is that we do #1, as we use _
 extensively
 in
 topic names here :) I also happen to think it's the right
 choice,
 specifically because . has more special meanings, as you
 noted.
 
 -Todd
 
 
 On Fri, Jul 10, 2015 at 1:30 PM, Gwen Shapira 
 gshap...@cloudera.com
 wrote:
 
 Unintentional side effect from allowing IP addresses in
 consumer
 client
 IDs :)
 
 So the question is, what do we do now?
 
 1) disallow .
 2) disallow _
 3) find a reversible way to encode . and _ that won't
 break
 existing
 metrics
 4) all of the above?
 
 btw. it looks like . and .. are currently valid. Topic
 names
 are
 used for directories, right? this sounds like fun :)
 
 I vote for option #1, although if someone has a good idea for
 #3
 it
 will be even better.
 
 Gwen
 
 
 
 On Fri, Jul 10, 2015 at 1:22 PM, Grant Henke 
 ghe...@cloudera.com
 wrote:
 Found it was added here:
 https://issues.apache.org/jira/browse/KAFKA-697
 
 On Fri, Jul 10, 2015 at 3:18 PM, Todd Palino 
 tpal...@gmail.com
 wrote:
 
 This was definitely changed at some point after KAFKA-495.
 The
 question
 is
 when and why.
 
 Here's the relevant code from that patch:
 ===
 --- core/src/main/scala/kafka/utils/Topic.scala (revision
 1390178)
 +++ core/src/main/scala/kafka/utils/Topic.scala (working
 copy)
 @@ -21,24 +21,21 @@
 import util.matching.Regex
 
 object Topic {
 +  val legalChars = [a-zA-Z0-9_-]
 
 
 
 -Todd
 
 
 On Fri, Jul 10, 2015 at 1:02 PM, Grant Henke 
 ghe...@cloudera.com
 wrote:
 
 kafka.common.Topic shows that currently period is a valid
 character
 and I
 have verified I can use kafka-topics.sh to create a new
 topic
 with a
 period.
 
 
 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK
 currently
 uses
 Topic.validate before writing to Zookeeper.
 
 Should period character support be removed? I was under
 the
 same
 impression
 as Gwen, that a period was used by many as a way to
 group
 topics.
 
 The code is pasted below since its small:
 
 object Topic {
  val legalChars = [a-zA-Z0-9\\._\\-]
  private val maxNameLength = 255
  private val rgx = new Regex(legalChars + +)
 
  val InternalTopics =
 Set(OffsetManager.OffsetsTopicName)
 
  def validate(topic: String) {
if (topic.length = 0)
  throw new InvalidTopicException(topic name is
 illegal,
 can't
 be
 empty)
else if (topic.equals(.) || topic.equals(..))
  throw new InvalidTopicException(topic name cannot
 be
 \.\ or
 \..\)
else if (topic.length  maxNameLength)
  throw new InvalidTopicException(topic name is
 illegal,
 can't
 be
 longer than  + maxNameLength +  characters)
 
rgx.findFirstIn(topic) match {
  case Some(t) =
if (!t.equals(topic))
  throw new InvalidTopicException(topic name  +
 topic
 + 
 is
 illegal, contains a character other than ASCII
 alphanumerics,
 '.',
 '_'
 and
 '-')
  case None = throw new InvalidTopicException(topic
 name
 
 +
 topic
 +
  is illegal,  contains a character other than ASCII
 alphanumerics

Re: [Discussion] Limitations on topic names

2015-07-10 Thread Todd Palino
I had to go look this one up again to make sure -
https://issues.apache.org/jira/browse/KAFKA-495

The only valid characters for topic names are alphanumeric, underscore, and
dash. A period is not supposed to be a valid character to use. If you're
seeing them, then one of two things have happened:

1) You have topic names that are grandfathered in from before that patch
2) The patch is not working properly and there is somewhere in the broker
that the standard is not being enforced.

-Todd


On Fri, Jul 10, 2015 at 12:13 PM, Brock Noland br...@apache.org wrote:

 On Fri, Jul 10, 2015 at 11:34 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
  Hi Kafka Fans,
 
  If you have one topic named kafka_lab_2 and the other named
  kafka.lab.2, the topic level metrics will be named kafka_lab_2 for
  both, effectively making it impossible to monitor them properly.
 
  The reason this happens is that using . in topic names is pretty
  common, especially as a way to group topics into data centers,
  relevant apps, etc - basically a work-around to our current lack of
  name spaces. However, most metric monitoring systems using . to
  annotate hierarchy, so to avoid issues around metric names, Kafka
  replaces the . in the name with an underscore.
 
  This generates good metric names, but creates the problem with name
 collisions.
 
  I'm wondering if it makes sense to simply limit the range of
  characters permitted in a topic name and disallow _? Obviously
  existing topics will need to remain as is, which is a bit awkward.

 Interesting problem! Many if not most users I personally am aware of
 use _ as a separator in topic names. I am sure that many users would
 be quite surprised by this limitation. With that said, I am sure
 they'd transition accordingly.

 
  If anyone has better backward-compatible solutions to this, I'm all ears
 :)
 
  Gwen



Re: [Discussion] Limitations on topic names

2015-07-10 Thread Todd Palino
This was definitely changed at some point after KAFKA-495. The question is
when and why.

Here's the relevant code from that patch:

===
--- core/src/main/scala/kafka/utils/Topic.scala (revision 1390178)
+++ core/src/main/scala/kafka/utils/Topic.scala (working copy)
@@ -21,24 +21,21 @@
 import util.matching.Regex

 object Topic {
+  val legalChars = "[a-zA-Z0-9_-]"



-Todd


On Fri, Jul 10, 2015 at 1:02 PM, Grant Henke ghe...@cloudera.com wrote:

 kafka.common.Topic shows that currently period is a valid character and I
 have verified I can use kafka-topics.sh to create a new topic with a
 period.


 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK currently uses
 Topic.validate before writing to Zookeeper.

 Should period character support be removed? I was under the same impression
 as Gwen, that a period was used by many as a way to group topics.

 The code is pasted below since its small:

 object Topic {
   val legalChars = "[a-zA-Z0-9\\._\\-]"
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")

   val InternalTopics = Set(OffsetManager.OffsetsTopicName)

   def validate(topic: String) {
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name is illegal, can't be empty")
     else if (topic.equals(".") || topic.equals(".."))
       throw new InvalidTopicException("topic name cannot be \".\" or \"..\"")
     else if (topic.length > maxNameLength)
       throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")

     rgx.findFirstIn(topic) match {
       case Some(t) =>
         if (!t.equals(topic))
           throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
       case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
     }
   }
 }

 On Fri, Jul 10, 2015 at 2:50 PM, Todd Palino tpal...@gmail.com wrote:

  I had to go look this one up again to make sure -
  https://issues.apache.org/jira/browse/KAFKA-495
 
  The only valid character names for topics are alphanumeric, underscore,
 and
  dash. A period is not supposed to be a valid character to use. If you're
  seeing them, then one of two things have happened:
 
  1) You have topic names that are grandfathered in from before that patch
  2) The patch is not working properly and there is somewhere in the broker
  that the standard is not being enforced.
 
  -Todd
 
 
  On Fri, Jul 10, 2015 at 12:13 PM, Brock Noland br...@apache.org wrote:
 
   On Fri, Jul 10, 2015 at 11:34 AM, Gwen Shapira gshap...@cloudera.com
   wrote:
Hi Kafka Fans,
   
If you have one topic named kafka_lab_2 and the other named
kafka.lab.2, the topic level metrics will be named kafka_lab_2 for
both, effectively making it impossible to monitor them properly.
   
The reason this happens is that using . in topic names is pretty
common, especially as a way to group topics into data centers,
relevant apps, etc - basically a work-around to our current lack of
name spaces. However, most metric monitoring systems using . to
annotate hierarchy, so to avoid issues around metric names, Kafka
replaces the . in the name with an underscore.
   
This generates good metric names, but creates the problem with name
   collisions.
   
I'm wondering if it makes sense to simply limit the range of
characters permitted in a topic name and disallow _? Obviously
existing topics will need to remain as is, which is a bit awkward.
  
   Interesting problem! Many if not most users I personally am aware of
   use _ as a separator in topic names. I am sure that many users would
   be quite surprised by this limitation. With that said, I am sure
   they'd transition accordingly.
  
   
If anyone has better backward-compatible solutions to this, I'm all
  ears
   :)
   
Gwen
  
 



 --
 Grant Henke
 Solutions Consultant | Cloudera
 ghe...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke



Re: [Discussion] Limitations on topic names

2015-07-10 Thread Todd Palino
Thanks, Grant. That seems like a bad solution to the problem that John ran
into in that ticket. It's entirely reasonable to have separate validators
for separate things, but it seems like the choice was made to try and mash
it all into a single validator. And it appears that despite the commentary
in the ticket at the time, Gwen's identified a very good reason to be
restrictive about topic naming.

-Todd



On Fri, Jul 10, 2015 at 1:22 PM, Grant Henke ghe...@cloudera.com wrote:

 Found it was added here: https://issues.apache.org/jira/browse/KAFKA-697

 On Fri, Jul 10, 2015 at 3:18 PM, Todd Palino tpal...@gmail.com wrote:

  This was definitely changed at some point after KAFKA-495. The question
 is
  when and why.
 
  Here's the relevant code from that patch:
 
  ===
  --- core/src/main/scala/kafka/utils/Topic.scala (revision 1390178)
  +++ core/src/main/scala/kafka/utils/Topic.scala (working copy)
  @@ -21,24 +21,21 @@
   import util.matching.Regex
 
   object Topic {
  +  val legalChars = [a-zA-Z0-9_-]
 
 
 
  -Todd
 
 
  On Fri, Jul 10, 2015 at 1:02 PM, Grant Henke ghe...@cloudera.com
 wrote:
 
   kafka.common.Topic shows that currently period is a valid character
 and I
   have verified I can use kafka-topics.sh to create a new topic with a
   period.
  
  
   AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK currently
 uses
   Topic.validate before writing to Zookeeper.
  
   Should period character support be removed? I was under the same
  impression
   as Gwen, that a period was used by many as a way to group topics.
  
   The code is pasted below since its small:
  
   object Topic {
 val legalChars = [a-zA-Z0-9\\._\\-]
 private val maxNameLength = 255
 private val rgx = new Regex(legalChars + +)
  
 val InternalTopics = Set(OffsetManager.OffsetsTopicName)
  
 def validate(topic: String) {
   if (topic.length = 0)
 throw new InvalidTopicException(topic name is illegal, can't be
   empty)
   else if (topic.equals(.) || topic.equals(..))
 throw new InvalidTopicException(topic name cannot be \.\ or
   \..\)
   else if (topic.length  maxNameLength)
 throw new InvalidTopicException(topic name is illegal, can't be
   longer than  + maxNameLength +  characters)
  
   rgx.findFirstIn(topic) match {
 case Some(t) =
   if (!t.equals(topic))
 throw new InvalidTopicException(topic name  + topic +  is
   illegal, contains a character other than ASCII alphanumerics, '.', '_'
  and
   '-')
 case None = throw new InvalidTopicException(topic name  +
 topic
  +
is illegal,  contains a character other than ASCII alphanumerics,
 '.',
   '_' and '-')
   }
 }
   }
  
   On Fri, Jul 10, 2015 at 2:50 PM, Todd Palino tpal...@gmail.com
 wrote:
  
I had to go look this one up again to make sure -
https://issues.apache.org/jira/browse/KAFKA-495
   
The only valid character names for topics are alphanumeric,
 underscore,
   and
dash. A period is not supposed to be a valid character to use. If
  you're
seeing them, then one of two things have happened:
   
1) You have topic names that are grandfathered in from before that
  patch
2) The patch is not working properly and there is somewhere in the
  broker
that the standard is not being enforced.
   
-Todd
   
   
On Fri, Jul 10, 2015 at 12:13 PM, Brock Noland br...@apache.org
  wrote:
   
 On Fri, Jul 10, 2015 at 11:34 AM, Gwen Shapira 
  gshap...@cloudera.com
 wrote:
  Hi Kafka Fans,
 
  If you have one topic named kafka_lab_2 and the other named
  kafka.lab.2, the topic level metrics will be named kafka_lab_2
  for
  both, effectively making it impossible to monitor them properly.
 
  The reason this happens is that using . in topic names is
 pretty
  common, especially as a way to group topics into data centers,
  relevant apps, etc - basically a work-around to our current lack
 of
  name spaces. However, most metric monitoring systems using . to
  annotate hierarchy, so to avoid issues around metric names, Kafka
  replaces the . in the name with an underscore.
 
  This generates good metric names, but creates the problem with
 name
 collisions.
 
  I'm wondering if it makes sense to simply limit the range of
  characters permitted in a topic name and disallow _? Obviously
  existing topics will need to remain as is, which is a bit
 awkward.

 Interesting problem! Many if not most users I personally am aware
 of
 use _ as a separator in topic names. I am sure that many users
  would
 be quite surprised by this limitation. With that said, I am sure
 they'd transition accordingly.

 
  If anyone has better backward-compatible solutions to this, I'm
 all
ears
 :)
 
  Gwen

   
  
  
  
   --
   Grant Henke
   Solutions

Re: [Discussion] Limitations on topic names

2015-07-10 Thread Todd Palino
I absolutely disagree with #2, Neha. That will break a lot of
infrastructure within LinkedIn. That said, removing "." might break other
people as well, but I think we should have a clearer idea of how much usage
there is on either side.

-Todd


On Fri, Jul 10, 2015 at 2:08 PM, Neha Narkhede n...@confluent.io wrote:

 . seems natural for grouping topic names. +1 for 2) going forward only
 without breaking previously created topics with _ though that might
 require us to patch the code somewhat awkwardly till we phase it out a
 couple (purposely left vague to stay out of Ewen's wrath :-)) versions
 later.

 On Fri, Jul 10, 2015 at 2:02 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  I don't think we should break existing topics. Just disallow new
  topics going forward.
 
  Agree that having both is horrible, but we should have a solution that
  fails when you run kafka_topics.sh --create, not when you configure
  Ganglia.
 
  Gwen
 
  On Fri, Jul 10, 2015 at 1:53 PM, Jay Kreps j...@confluent.io wrote:
   Unfortunately '.' is pretty common too. I agree that it is perverse,
 but
   people seem to do it. Breaking all the topics with '.' in the name
 seems
   like it could be worse than combining metrics for people who have a
   'foo_bar' AND 'foo.bar' (and after all, having both is DEEPLY perverse,
   no?).
  
   Where is our Dean of Compatibility, Ewen, on this?
  
   -Jay
  
   On Fri, Jul 10, 2015 at 1:32 PM, Todd Palino tpal...@gmail.com
 wrote:
  
   My selfish point of view is that we do #1, as we use _ extensively
 in
   topic names here :) I also happen to think it's the right choice,
   specifically because . has more special meanings, as you noted.
  
   -Todd
  
  
   On Fri, Jul 10, 2015 at 1:30 PM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
Unintentional side effect from allowing IP addresses in consumer
  client
IDs :)
   
So the question is, what do we do now?
   
1) disallow .
2) disallow _
3) find a reversible way to encode . and _ that won't break
  existing
metrics
4) all of the above?
   
btw. it looks like . and .. are currently valid. Topic names are
used for directories, right? this sounds like fun :)
   
I vote for option #1, although if someone has a good idea for #3 it
will be even better.
   
Gwen
   
   
   
On Fri, Jul 10, 2015 at 1:22 PM, Grant Henke ghe...@cloudera.com
   wrote:
 Found it was added here:
   https://issues.apache.org/jira/browse/KAFKA-697

 On Fri, Jul 10, 2015 at 3:18 PM, Todd Palino tpal...@gmail.com
   wrote:

 This was definitely changed at some point after KAFKA-495. The
   question
is
 when and why.

 Here's the relevant code from that patch:


 ===
 --- core/src/main/scala/kafka/utils/Topic.scala (revision
 1390178)
 +++ core/src/main/scala/kafka/utils/Topic.scala (working copy)
 @@ -21,24 +21,21 @@
  import util.matching.Regex

  object Topic {
 +  val legalChars = [a-zA-Z0-9_-]



 -Todd


 On Fri, Jul 10, 2015 at 1:02 PM, Grant Henke 
 ghe...@cloudera.com
wrote:

  kafka.common.Topic shows that currently period is a valid
  character
and I
  have verified I can use kafka-topics.sh to create a new topic
  with a
  period.
 
 
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK
  currently
uses
  Topic.validate before writing to Zookeeper.
 
  Should period character support be removed? I was under the
 same
 impression
  as Gwen, that a period was used by many as a way to group
  topics.
 
  The code is pasted below since its small:
 
  object Topic {
val legalChars = [a-zA-Z0-9\\._\\-]
private val maxNameLength = 255
private val rgx = new Regex(legalChars + +)
 
val InternalTopics = Set(OffsetManager.OffsetsTopicName)
 
def validate(topic: String) {
  if (topic.length = 0)
throw new InvalidTopicException(topic name is illegal,
  can't
   be
  empty)
  else if (topic.equals(.) || topic.equals(..))
throw new InvalidTopicException(topic name cannot be
  \.\ or
  \..\)
  else if (topic.length  maxNameLength)
throw new InvalidTopicException(topic name is illegal,
  can't
   be
  longer than  + maxNameLength +  characters)
 
  rgx.findFirstIn(topic) match {
case Some(t) =
  if (!t.equals(topic))
throw new InvalidTopicException(topic name  + topic
  + 
   is
  illegal, contains a character other than ASCII alphanumerics,
  '.',
   '_'
 and
  '-')
case None = throw new InvalidTopicException(topic name
 
  +
topic
 +
   is illegal,  contains a character other than ASCII
  alphanumerics

Re: [Discussion] Limitations on topic names

2015-07-10 Thread Todd Palino
My selfish point of view is that we do #1, as we use "_" extensively in
topic names here :) I also happen to think it's the right choice,
specifically because "." has more special meanings, as you noted.

-Todd


On Fri, Jul 10, 2015 at 1:30 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Unintentional side effect from allowing IP addresses in consumer client
 IDs :)

 So the question is, what do we do now?

 1) disallow .
 2) disallow _
 3) find a reversible way to encode . and _ that won't break existing
 metrics
 4) all of the above?

 btw. it looks like . and .. are currently valid. Topic names are
 used for directories, right? this sounds like fun :)

 I vote for option #1, although if someone has a good idea for #3 it
 will be even better.

 Gwen



 On Fri, Jul 10, 2015 at 1:22 PM, Grant Henke ghe...@cloudera.com wrote:
  Found it was added here: https://issues.apache.org/jira/browse/KAFKA-697
 
  On Fri, Jul 10, 2015 at 3:18 PM, Todd Palino tpal...@gmail.com wrote:
 
  This was definitely changed at some point after KAFKA-495. The question
 is
  when and why.
 
  Here's the relevant code from that patch:
 
  ===
  --- core/src/main/scala/kafka/utils/Topic.scala (revision 1390178)
  +++ core/src/main/scala/kafka/utils/Topic.scala (working copy)
  @@ -21,24 +21,21 @@
   import util.matching.Regex
 
   object Topic {
  +  val legalChars = [a-zA-Z0-9_-]
 
 
 
  -Todd
 
 
  On Fri, Jul 10, 2015 at 1:02 PM, Grant Henke ghe...@cloudera.com
 wrote:
 
   kafka.common.Topic shows that currently period is a valid character
 and I
   have verified I can use kafka-topics.sh to create a new topic with a
   period.
  
  
   AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK currently
 uses
   Topic.validate before writing to Zookeeper.
  
   Should period character support be removed? I was under the same
  impression
   as Gwen, that a period was used by many as a way to group topics.
  
   The code is pasted below since its small:
  
   object Topic {
 val legalChars = [a-zA-Z0-9\\._\\-]
 private val maxNameLength = 255
 private val rgx = new Regex(legalChars + +)
  
 val InternalTopics = Set(OffsetManager.OffsetsTopicName)
  
 def validate(topic: String) {
   if (topic.length = 0)
 throw new InvalidTopicException(topic name is illegal, can't be
   empty)
   else if (topic.equals(.) || topic.equals(..))
 throw new InvalidTopicException(topic name cannot be \.\ or
   \..\)
   else if (topic.length  maxNameLength)
 throw new InvalidTopicException(topic name is illegal, can't be
   longer than  + maxNameLength +  characters)
  
   rgx.findFirstIn(topic) match {
 case Some(t) =
   if (!t.equals(topic))
 throw new InvalidTopicException(topic name  + topic +  is
   illegal, contains a character other than ASCII alphanumerics, '.', '_'
  and
   '-')
 case None = throw new InvalidTopicException(topic name  +
 topic
  +
is illegal,  contains a character other than ASCII alphanumerics,
 '.',
   '_' and '-')
   }
 }
   }
  
   On Fri, Jul 10, 2015 at 2:50 PM, Todd Palino tpal...@gmail.com
 wrote:
  
I had to go look this one up again to make sure -
https://issues.apache.org/jira/browse/KAFKA-495
   
The only valid character names for topics are alphanumeric,
 underscore,
   and
dash. A period is not supposed to be a valid character to use. If
  you're
seeing them, then one of two things have happened:
   
1) You have topic names that are grandfathered in from before that
  patch
2) The patch is not working properly and there is somewhere in the
  broker
that the standard is not being enforced.
   
-Todd
   
   
On Fri, Jul 10, 2015 at 12:13 PM, Brock Noland br...@apache.org
  wrote:
   
 On Fri, Jul 10, 2015 at 11:34 AM, Gwen Shapira 
  gshap...@cloudera.com
 wrote:
  Hi Kafka Fans,
 
  If you have one topic named kafka_lab_2 and the other named
  kafka.lab.2, the topic level metrics will be named kafka_lab_2
  for
  both, effectively making it impossible to monitor them properly.
 
  The reason this happens is that using . in topic names is
 pretty
  common, especially as a way to group topics into data centers,
  relevant apps, etc - basically a work-around to our current
 lack of
  name spaces. However, most metric monitoring systems using .
 to
  annotate hierarchy, so to avoid issues around metric names,
 Kafka
  replaces the . in the name with an underscore.
 
  This generates good metric names, but creates the problem with
 name
 collisions.
 
  I'm wondering if it makes sense to simply limit the range of
  characters permitted in a topic name and disallow _? Obviously
  existing topics will need to remain as is, which is a bit
 awkward.

 Interesting problem! Many if not most users I personally am aware

Re: [Discussion] Limitations on topic names

2015-07-10 Thread Todd Palino
Yes, agree here. While it can be a little confusing, I think it's better to
just disallow the character for all creation steps so you can't create more
bad topic names, but not try and enforce it for topics that already
exist. Anyone who is in that situation is already there with regards to
metrics, and so they are probably making sure they don't collide names that
only differ in the use of "_" and ".". However, we don't want a new user to
accidentally do it.
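
To spell out the collision for anyone skimming the thread, a tiny sketch
(illustrative only) of the '.'-to-'_' substitution that happens when topic
names become metric names:

object MetricNameCollision extends App {
  // Per-topic metric names replace '.' with '_', so two legal topic names can
  // map to the same metric and become indistinguishable in monitoring.
  def metricTopicName(topic: String): String = topic.replace('.', '_')

  println(metricTopicName("kafka.lab.2"))  // kafka_lab_2
  println(metricTopicName("kafka_lab_2"))  // kafka_lab_2 - same metric, different topic
}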

-Todd


On Fri, Jul 10, 2015 at 2:02 PM, Gwen Shapira gshap...@cloudera.com wrote:

 I don't think we should break existing topics. Just disallow new
 topics going forward.

 Agree that having both is horrible, but we should have a solution that
 fails when you run kafka_topics.sh --create, not when you configure
 Ganglia.

 Gwen

 On Fri, Jul 10, 2015 at 1:53 PM, Jay Kreps j...@confluent.io wrote:
  Unfortunately '.' is pretty common too. I agree that it is perverse, but
  people seem to do it. Breaking all the topics with '.' in the name seems
  like it could be worse than combining metrics for people who have a
  'foo_bar' AND 'foo.bar' (and after all, having both is DEEPLY perverse,
  no?).
 
  Where is our Dean of Compatibility, Ewen, on this?
 
  -Jay
 
  On Fri, Jul 10, 2015 at 1:32 PM, Todd Palino tpal...@gmail.com wrote:
 
  My selfish point of view is that we do #1, as we use _ extensively in
  topic names here :) I also happen to think it's the right choice,
  specifically because . has more special meanings, as you noted.
 
  -Todd
 
 
  On Fri, Jul 10, 2015 at 1:30 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   Unintentional side effect from allowing IP addresses in consumer
 client
   IDs :)
  
   So the question is, what do we do now?
  
   1) disallow .
   2) disallow _
   3) find a reversible way to encode . and _ that won't break
 existing
   metrics
   4) all of the above?
  
   btw. it looks like . and .. are currently valid. Topic names are
   used for directories, right? this sounds like fun :)
  
   I vote for option #1, although if someone has a good idea for #3 it
   will be even better.
  
   Gwen
  
  
  
   On Fri, Jul 10, 2015 at 1:22 PM, Grant Henke ghe...@cloudera.com
  wrote:
Found it was added here:
  https://issues.apache.org/jira/browse/KAFKA-697
   
On Fri, Jul 10, 2015 at 3:18 PM, Todd Palino tpal...@gmail.com
  wrote:
   
This was definitely changed at some point after KAFKA-495. The
  question
   is
when and why.
   
Here's the relevant code from that patch:
   
===
--- core/src/main/scala/kafka/utils/Topic.scala (revision 1390178)
+++ core/src/main/scala/kafka/utils/Topic.scala (working copy)
@@ -21,24 +21,21 @@
 import util.matching.Regex
   
 object Topic {
+  val legalChars = [a-zA-Z0-9_-]
   
   
   
-Todd
   
   
On Fri, Jul 10, 2015 at 1:02 PM, Grant Henke ghe...@cloudera.com
   wrote:
   
 kafka.common.Topic shows that currently period is a valid
 character
   and I
 have verified I can use kafka-topics.sh to create a new topic
 with a
 period.


 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK
 currently
   uses
 Topic.validate before writing to Zookeeper.

 Should period character support be removed? I was under the same
impression
 as Gwen, that a period was used by many as a way to group
 topics.

 The code is pasted below since its small:

 object Topic {
   val legalChars = [a-zA-Z0-9\\._\\-]
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + +)

   val InternalTopics = Set(OffsetManager.OffsetsTopicName)

   def validate(topic: String) {
 if (topic.length = 0)
   throw new InvalidTopicException(topic name is illegal,
 can't
  be
 empty)
 else if (topic.equals(.) || topic.equals(..))
   throw new InvalidTopicException(topic name cannot be
 \.\ or
 \..\)
 else if (topic.length  maxNameLength)
   throw new InvalidTopicException(topic name is illegal,
 can't
  be
 longer than  + maxNameLength +  characters)

 rgx.findFirstIn(topic) match {
   case Some(t) =
 if (!t.equals(topic))
   throw new InvalidTopicException(topic name  + topic
 + 
  is
 illegal, contains a character other than ASCII alphanumerics,
 '.',
  '_'
and
 '-')
   case None = throw new InvalidTopicException(topic name 
 +
   topic
+
  is illegal,  contains a character other than ASCII
 alphanumerics,
   '.',
 '_' and '-')
 }
   }
 }

 On Fri, Jul 10, 2015 at 2:50 PM, Todd Palino tpal...@gmail.com
   wrote:

  I had to go look this one up again to make sure -
  https://issues.apache.org/jira/browse/KAFKA-495
 
  The only valid character names for topics are alphanumeric,
   underscore,
 and
  dash. A period

Re: [ANNOUNCE] New Committer

2015-07-06 Thread Todd Palino
Congrats, Gwen! It's definitely deserved.

-Todd


 On Jul 6, 2015, at 6:08 PM, Joe Stein joe.st...@stealth.ly wrote:
 
 I am pleased to announce that the Apache Kafka PMC has voted to invite Gwen
 Shapira as a committer and Gwen has accepted.
 
 Please join me on welcoming and congratulating Gwen.
 
 Thanks for the contribution both in the project (code, email, etc, etc,
 etc) and throughout the community too (other projects, conferences, etc,
 etc, etc). I look forward to your continued contributions and much more to
 come!
 
 ~ Joe Stein
 - - - - - - - - - - - - - - - - - - -
  http://www.elodina.net
http://www.stealth.ly
 - - - - - - - - - - - - - - - - - - -


[jira] [Commented] (KAFKA-2252) Socket connection closing is logged, but not corresponding opening of socket

2015-06-15 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14586437#comment-14586437
 ] 

Todd Palino commented on KAFKA-2252:


I moved the normal connection closed message to DEBUG level in KAFKA-2175. In 
general, we should not log either normal connection close or open at INFO 
level. There is far too much noise due to these messages.

 Socket connection closing is logged, but not corresponding opening of socket
 

 Key: KAFKA-2252
 URL: https://issues.apache.org/jira/browse/KAFKA-2252
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg

 (using 0.8.2.1)
 We see a large number of Closing socket connection logging to the broker 
 logs, e.g.:
 {code}
 2015-06-04 16:49:30,262  INFO [kafka-network-thread-27330-2] 
 network.Processor - Closing socket connection to /1.2.3.4.
 2015-06-04 16:49:30,262  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /5.6.7.8.
 2015-06-04 16:49:30,695  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /9.10.11.12.
 2015-06-04 16:49:31,465  INFO [kafka-network-thread-27330-1] 
 network.Processor - Closing socket connection to /13.14.15.16.
 2015-06-04 16:49:31,806  INFO [kafka-network-thread-27330-0] 
 network.Processor - Closing socket connection to /17.18.19.20.
 2015-06-04 16:49:31,842  INFO [kafka-network-thread-27330-2] 
 network.Processor - Closing socket connection to /21.22.23.24.
 {code}
 However, we have no corresponding logging for when these connections are 
 established.  Consequently, it's not very useful to see a flood of closed 
 connections, etc.  I'd think we'd want to see the corresponding 'connection 
 established' messages, also logged as INFO.
 Occasionally, we see a flood of the above messages, and have no idea as to 
 whether it indicates a problem, etc.  (Sometimes it might be due to an 
 ongoing rolling restart, or a change in the Zookeeper cluster).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-21 Dynamic Configuration

2015-05-28 Thread Todd Palino
 TopicConfigManager to
 be
 ConfigOverrideManager and have it handle all the override types we will
 have? I think I may just be unclear on what you are proposing...
 
 -Jay
 
 On Mon, May 18, 2015 at 1:34 PM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:
 
 Yeah, that was just a typo. I've fixed it. Thanks for calling it out.
 
 In KIP-4, I believe we have 3 types of requests: CreateTopic,
 AlterTopic
 and DeleteTopic. The topic configs are a sub-type of the Create and
 Alter
 commands. I think it would be nice to simply have a AlterConfig
 command
 that can alter any type of config rather than having a specific
 ClientConfig.
 
 AlterConfig = [ConfigType [AddedConfigEntry] [DeletedConfig]]
 ConfigType = string
 AddedConfigEntry = ConfigKey ConfigValue
ConfigKey = string
ConfigValue = string
 DeletedConfig = string
 
 The downside of this approach is that we will have 2 separate ways of
 changing topic configs (AlterTopic and AlterConfig). While a general
 AlterConfig only makes sense if we plan to have more than two types
 of
 entity configs.. it's definitely more future proof. Thoughts?
 
 Aditya
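
For illustration, the AlterConfig grammar above could map to something like the following case classes; this is a hedged sketch of the proposal's shape, not an implemented Kafka API, and the quota key in the client example is illustrative only:

case class AlterConfig(configType: String,
                       addedConfigs: Map[String, String],
                       deletedConfigs: Seq[String])

object AlterConfigExample {
  def main(args: Array[String]): Unit = {
    // One request type covering both topic and client overrides:
    val topicChange  = AlterConfig("topic",  Map("retention.ms" -> "86400000"), Seq("segment.bytes"))
    val clientChange = AlterConfig("client", Map("producer_byte_rate" -> "1048576"), Seq.empty)
    println(Seq(topicChange, clientChange).mkString("\n"))
  }
}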
 
 
 From: Todd Palino [tpal...@gmail.com]
 Sent: Monday, May 18, 2015 12:39 PM
 To: dev@kafka.apache.org
 Subject: Re: [VOTE] KIP-21 Dynamic Configuration
 
 Agree with Jun here on the JSON format. I think your intention was
 likely
 to have actual JSON here and it was just a typo in the wiki?
 
 -Todd
 
 On Mon, May 18, 2015 at 12:07 PM, Jun Rao j...@confluent.io wrote:
 
 Aditya,
 
 Another thing to consider. In KIP-4, we are adding a new RPC
 request
 to
 change and retrieve topic configs. Do we want to add a similar RPC
 request
 to change configs per client id? If so, do we want to introduce a
 separate
 new request or have a combined new request for both topic and
 client
 id
 level config changes?
 
 A minor point in the wiki, for the json format in ZK, we should
 change
 {X1=Y1,
 X2=Y2..} to a json map, right?
 
 Thanks,
 
 Jun
 
 
 On Mon, May 18, 2015 at 9:48 AM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration
 
 Aditya
 


Re: Can we throw away ResponsesBeingSent Gauge?

2015-05-14 Thread Todd Palino
I don't believe we're using the ResponsesBeingSent information at all. I
know we use NetworkProcessorAvgIdlePercent to keep track of the utilization
of the pool.

-Todd


On Thu, May 14, 2015 at 11:36 AM, Joel Koshy jjkosh...@gmail.com wrote:

 I'm also not sure how useful it is, but there is some discussion on it
 here:

 https://issues.apache.org/jira/browse/KAFKA-1597

 On Thu, May 14, 2015 at 09:26:02PM +0300, Gwen Shapira wrote:
  Hi,
 
  As part of KAFKA-1928, we need to consolidate existing SocketServer
 metrics
  into Selector metrics.
 
  The two metrics are:
  1. NetworkProcessorAvgIdlePercent
  2. ResponsesBeingSent
 
  NetworkProcessorAvgIdlePercent  is currently implemented as Meter, which
  seems to translate nicely into KafkaMetrics Rate.
 
  ResponsesBeingSent is a gauge, which doesn't have a direct KafkaMetrics
  equivalent, but should be easy to add.
 
  However, I'm considering dropping ResponsesBeingSent gauge completely - I
  find it useless.
  Selector currently has Rates for requests-sent, responses-sent,
 bytes-sent,
  bytes-received.
 
  Anyone finds ResponsesBeingSent useful and wants to make a passionate
 plea
  to keep it around?
 
  (I CCed the people I think will care, in case they are not watching the
  list)
 
  Gwen




Re: [DISCUSS] KIP-21 Configuration Management

2015-05-10 Thread Todd Palino
I've been watching this discussion for a while, and I have to jump in and
side with Gwen here. I see no benefit to putting the configs into Zookeeper
entirely, and a lot of downside. The two biggest problems I have with this
are:

1) Configuration management. OK, so you can write glue for Chef to put
configs into Zookeeper. You also need to write glue for Puppet. And
Cfengine. And everything else out there. Files are an industry standard
practice, they're how just about everyone handles it, and there are reasons
for that, not just "it's the way it's always been done."

2) Auditing. Configuration files can easily be managed in a source
repository system which tracks what changes were made and who made them. It
also easily allows for rolling back to a previous version. Zookeeper does
not.

I see absolutely nothing wrong with putting the quota (client) configs and
the topic config overrides in Zookeeper, and keeping everything else
exactly where it is, in the configuration file. To handle configurations
for the broker that can be changed at runtime without a restart, you can
use the industry standard practice of catching SIGHUP and rereading the
configuration file at that point.

-Todd
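
A hedged sketch of that SIGHUP-reload pattern is below; it is not an existing Kafka feature, sun.misc.Signal is a JVM-internal API, and the properties path is an assumption for illustration:

import java.util.Properties
import java.io.FileInputStream
import sun.misc.{Signal, SignalHandler}

object ConfigReloadOnHup {
  @volatile private var config: Properties = load("/etc/kafka/server.properties")

  private def load(path: String): Properties = {
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    props
  }

  def install(): Unit =
    Signal.handle(new Signal("HUP"), new SignalHandler {
      override def handle(sig: Signal): Unit = {
        // Re-read the file and apply whatever settings are safe to change at runtime.
        config = load("/etc/kafka/server.properties")
        println("Configuration reloaded on SIGHUP")
      }
    })
}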


On Sun, May 10, 2015 at 4:00 AM, Gwen Shapira gshap...@cloudera.com wrote:

 I am still not clear about the benefits of managing configuration in
 ZooKeeper vs. keeping the local file and adding a refresh mechanism
 (signal, protocol, zookeeper, or other).

 Benefits of staying with configuration file:
 1. In line with pretty much any Linux service that exists, so admins have a
 lot of related experience.
 2. Much smaller change to our code-base, so easier to patch, review and
 test. Lower risk overall.

 Can you walk me over the benefits of using Zookeeper? Especially since it
 looks like we can't get rid of the file entirely?

 Gwen

 On Thu, May 7, 2015 at 3:33 AM, Jun Rao j...@confluent.io wrote:

  One of the Chef users confirmed that Chef integration could still work if
  all configs are moved to ZK. My rough understanding of how Chef works is
  that a user first registers a service host with a Chef server. After
 that,
  a Chef client will be run on the service host. The user can then push
  config changes intended for a service/host to the Chef server. The server
  is then responsible for pushing the changes to Chef clients. Chef clients
  support pluggable logic. For example, it can generate a config file that
  Kafka broker will take. If we move all configs to ZK, we can customize
 the
  Chef client to use our config CLI to make the config changes in Kafka. In
  this model, one probably doesn't need to register every broker in Chef
 for
  the config push. Not sure if Puppet works in a similar way.
 
  Also for storing the configs, we probably can't store the broker/global
  level configs in Kafka itself (e.g. in a special topic). The reason is
 that
  in order to start a broker, we likely need to make some broker level
 config
  changes (e.g., the default log.dir may not be present, the default port
 may
  not be available, etc). If we need a broker to be up to make those
 changes,
  we get into this chicken and egg problem.
 
  Thanks,
 
  Jun
 
  On Tue, May 5, 2015 at 4:14 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   Sorry I missed the call today :)
  
   I think an additional requirement would be:
   Make sure that traditional deployment tools (Puppet, Chef, etc) are
 still
   capable of managing Kafka configuration.
  
   For this reason, I'd like the configuration refresh to be pretty close
 to
   what most Linux services are doing to force a reload of configuration.
   AFAIK, this involves handling HUP signal in the main thread to reload
   configuration. Then packaging scripts can add something nice like
  service
   kafka reload.
  
   (See Apache web server:
   https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101)
  
   Gwen
  
  
   On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com
 wrote:
  
Good discussion. Since we will be talking about this at 11am, I
 wanted
to organize these comments into requirements to see if we are all on
the same page.
   
REQUIREMENT 1: Needs to accept dynamic config changes. This needs to
be general enough to work for all configs that we envision may need
 to
accept changes at runtime. e.g., log (topic), broker, client
 (quotas),
etc.. possible options include:
- ZooKeeper watcher
- Kafka topic
- Direct RPC to controller (or config coordinator)
   
The current KIP is really focused on REQUIREMENT 1 and I think that
 is
reasonable as long as we don't come up with something that requires
significant re-engineering to support the other requirements.
   
REQUIREMENT 2: Provide consistency of configs across brokers (modulo
per-broker overrides) or at least be able to verify consistency.
 What
this effectively means is that config changes must be seen by all
brokers eventually and 

[jira] [Updated] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino updated KAFKA-2175:
---
Status: Patch Available  (was: Open)

 Reduce server log verbosity at info level
 -

 Key: KAFKA-2175
 URL: https://issues.apache.org/jira/browse/KAFKA-2175
 Project: Kafka
  Issue Type: Improvement
  Components: controller, zkclient
Affects Versions: 0.8.3
Reporter: Todd Palino
Assignee: Todd Palino
Priority: Minor
  Labels: newbie
 Attachments: KAFKA-2175.patch


 Currently, the broker logs two messages at INFO level that should be at a 
 lower level. This serves only to fill up log files on disk, and can cause 
 performance issues due to synchronous logging as well.
 The first is the "Closing socket connection" message when there is no error. 
 This should be reduced to debug level. The second is the message that ZkUtil 
 writes when updating the partition reassignment JSON. This message contains 
 the entire JSON blob and should never be written at info level. In addition, 
 there is already a message in the controller log stating that the ZK node has 
 been updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino reassigned KAFKA-2175:
--

Assignee: Todd Palino  (was: Neha Narkhede)

 Reduce server log verbosity at info level
 -

 Key: KAFKA-2175
 URL: https://issues.apache.org/jira/browse/KAFKA-2175
 Project: Kafka
  Issue Type: Improvement
  Components: controller, zkclient
Affects Versions: 0.8.3
Reporter: Todd Palino
Assignee: Todd Palino
Priority: Minor
  Labels: newbie
 Attachments: KAFKA-2175.patch


 Currently, the broker logs two messages at INFO level that should be at a 
 lower level. This serves only to fill up log files on disk, and can cause 
 performance issues due to synchronous logging as well.
 The first is the "Closing socket connection" message when there is no error. 
 This should be reduced to debug level. The second is the message that ZkUtil 
 writes when updating the partition reassignment JSON. This message contains 
 the entire JSON blob and should never be written at info level. In addition, 
 there is already a message in the controller log stating that the ZK node has 
 been updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-2175:
--

 Summary: Reduce server log verbosity at info level
 Key: KAFKA-2175
 URL: https://issues.apache.org/jira/browse/KAFKA-2175
 Project: Kafka
  Issue Type: Improvement
  Components: controller, zkclient
Affects Versions: 0.8.3
Reporter: Todd Palino
Assignee: Neha Narkhede
Priority: Minor


Currently, the broker logs two messages at INFO level that should be at a lower 
level. This serves only to fill up log files on disk, and can cause performance 
issues due to synchronous logging as well.

The first is the "Closing socket connection" message when there is no error. 
This should be reduced to debug level. The second is the message that ZkUtil 
writes when updating the partition reassignment JSON. This message contains the 
entire JSON blob and should never be written at info level. In addition, there 
is already a message in the controller log stating that the ZK node has been 
updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-04-20 Thread Todd Palino
I tend to agree with Parth's point here. Most ACL systems I run into have deny 
and allow. In general, you have a default policy, and then you follow your 
rules, stopping at the first line that matches. If you would like a default deny 
policy, you have a bunch of allow rules and your last rule is deny all. This 
says everyone listed is allowed. Everyone else is denied. If you instead want 
a default allow, you have a list of deny rules and the last rule is allow 
all. This says everyone listed is denied. Everyone else is allowed.

I think leaving out a full rule set would be a mistake, as it makes the 
assumption that you know what all the use cases are. I think all it will really 
mean is that we will see a KIP before long to fix it.

-Todd
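
A hedged sketch of that ordered rule-list semantics is below (not the KIP-11 API itself); the rule fields are simplified and the resource dimension is omitted for brevity:

// Rules are evaluated in order, the first matching rule decides, and a trailing
// catch-all rule sets the default policy (default deny in the example).
sealed trait Decision
case object Allow extends Decision
case object Deny extends Decision

case class AclRule(principal: String, host: String, operation: String, decision: Decision) {
  def matches(p: String, h: String, op: String): Boolean =
    (principal == "*" || principal == p) &&
      (host == "*" || host == h) &&
      (operation == "*" || operation == op)
}

object AclEvaluator {
  // Fall back to Deny if no rule matches at all (fail closed).
  def authorize(rules: Seq[AclRule], principal: String, host: String, operation: String): Decision =
    rules.find(_.matches(principal, host, operation)).map(_.decision).getOrElse(Deny)

  def main(args: Array[String]): Unit = {
    // "Allow user1 to READ from all hosts but host1; everyone else is denied."
    val rules = Seq(
      AclRule("user1", "host1", "READ", Deny),
      AclRule("user1", "*", "READ", Allow),
      AclRule("*", "*", "*", Deny))
    println(authorize(rules, "user1", "host7", "READ")) // Allow
    println(authorize(rules, "user1", "host1", "READ")) // Deny
  }
}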

 On Apr 20, 2015, at 7:13 PM, Gwen Shapira gshap...@cloudera.com wrote:
 
 Thanks for clarifying the logic.
 
 I'm +0 on the deny thing.
 IMO, its not really needed, but if you think its important, I don't
 object to having it in.
 
 Gwen
 
 On Mon, Apr 20, 2015 at 7:07 PM, Parth Brahmbhatt
 pbrahmbh...@hortonworks.com wrote:
 The iptables on unix supports the DENY operator, not that it should
 matter. The deny operator can also be used to specify "allow user1 to READ
 from topic1 from all hosts but host1,host2". Again we could add a host
 group semantic and extra complexity around that, not sure if it's worth it.
 In addition with DENY operator you are now not forced to create a special
 group just to support the authorization use case. I am not convinced that
 the operator itself is really all that confusing. There are 3 practical
 use cases:
 - Resource with no acl whatsoever - allow access to everyone (just for
 backward compatibility; I would much rather fail closed and force users to
 explicitly grant acls that allow access to all users.)
 - Resource with some acl attached - only users that have a matching allow
 acl are allowed (i.e. "allow READ access to topic1 to user1 from all
 hosts"; only user1 has READ access and no other user has access of any
 kind)
 - Resource with some allow and some deny acl attached - users are allowed
 to perform an operation only when they satisfy an allow acl and do not have a
 conflicting deny acl. Users that have no acl (allow or deny) will still not
 have any access. (i.e. "allow READ access to topic1 to user1 from all
 hosts except host1 and host2"; only user1 has access, but not from host1 and
 host2)
 
 I think we need to make a decision on deny primarily because with the
 introduction of the acl management API, Acl is now a public class that will be
 used by Ranger/Sentry and other authorization providers. In the current design
 the acl has a permissionType enum field with possible values of Allow and
 Deny. If we choose to remove deny we can assume all acls to be of allow
 type and remove the permissionType field completely.
 
 Thanks
 Parth
 
 On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote:
 
 I think thats how its done in pretty much any system I can think of.
 


Re: [DISCUSS] KIP-18 - JBOD Support

2015-04-11 Thread Todd Palino
I'm with you on that, Jay, although I don't consider your case common. More
common is that there are things on all the disks at their normal retention,
and you add something new. That said, it doesn't really matter because what
you're illustrating is a valid concern. Automatic balancing would probably
alleviate any issues coming from a bad initial placement.

Jumping back an email, yes, it is a really big deal that the entire broker
fails when one mount point fails. It is much better to run with degraded
performance than it is to run with degraded replication, and disks fail
constantly. If I have 10% of my machines offline, Kafka's not going to last
very long at LinkedIn ;)

-Todd


On Sat, Apr 11, 2015 at 11:58 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Todd,

 The problem you pointed out is real. Unfortunately, placing by available
 size at creation time actually makes things worse. The original plan was to
 place new partitions on the disk with the most space, but consider a common
 case:
  disk 1: 500M
  disk 2: 0M
 Now say you are creating 10 partitions for what will be a massively large
 topic. You will place them all on disk 2 as it has the most space, but then
 immediately you will discover that that was a bad idea as those partitions
 get huge. I think the current balancing by number of partitions is better
 than this because at least you get basically random assignment.

 I think to solve the problem you describe we need to do active
 rebalancing--predicting the size of partitions ahead of time is basically
 impossible.

 I think having the controller be unaware of disks is probably a good thing.
 So the controller would balance partitions over servers and the server
 would be responsible for balancing over disks.

 I think this kind of balancing is possible though not totally trivial.
 Basically a background thread in LogManager would decide that there was too
 much skew in data assignment (actually debatable whether it is data size or
 I/O throughput that you should optimize) and would then try to rebalance.
 To do the rebalance it would do a background copy of the log from the
 current disk to the new disk, then it would take the partition offline and
 delete the old log, then bring the partition back using the new log and
 catch it back up off the leader.

 -Jay
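
A hedged sketch contrasting the two placement strategies discussed above is below. Picking the log directory with the fewest partition directories approximates the current count-balanced behavior, while picking the directory with the most free space is the alternative that clusters every new partition of a large topic onto the emptiest disk; the mount points in the example are assumptions:

import java.io.File

object DiskPlacement {
  def fewestPartitions(logDirs: Seq[File]): File =
    logDirs.minBy(dir => Option(dir.listFiles()).map(_.count(_.isDirectory)).getOrElse(0))

  def mostFreeSpace(logDirs: Seq[File]): File =
    logDirs.maxBy(_.getUsableSpace)

  def main(args: Array[String]): Unit = {
    val dirs = Seq(new File("/data/disk1/kafka-logs"), new File("/data/disk2/kafka-logs"))
    println(fewestPartitions(dirs))
    println(mostFreeSpace(dirs))
  }
}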

 On Thu, Apr 9, 2015 at 8:19 AM, Todd Palino tpal...@gmail.com wrote:

  I think this is a good start. We've been discussing JBOD internally, so
  it's good to see a discussion going externally about it as well.
 
  The other big blocker to using JBOD is the lack of intelligent partition
  assignment logic, and the lack of tools to adjust it. The controller is
 not
  smart enough to take into account disk usage when deciding to place a
  partition, which may not be a big concern (at the controller level, you
  worry about broker usage, not individual mounts). However, the broker is
  not smart enough to do it either, when looking at the local directories.
 It
  just round robins.
 
  In addition, there is no tool available to move a partition from one
 mount
  point to another. So in the case that you do have a hot disk, you cannot
 do
  anything about it without shutting down the broker and doing a manual
 move
  of the log segments.
 
  -Todd
 
 
  On Thu, Apr 9, 2015 at 5:36 AM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Hi,
  
   Let me start discussion thread for KIP-18 - JBOD Support.
  
   Link to wiki:
  
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-18+-+JBOD+Support
  
  
   Thanks,
   Andrii Biletskyi
  
 



Re: [DISCUSS] KIP-18 - JBOD Support

2015-04-09 Thread Todd Palino
I think this is a good start. We've been discussing JBOD internally, so
it's good to see a discussion going externally about it as well.

The other big blocker to using JBOD is the lack of intelligent partition
assignment logic, and the lack of tools to adjust it. The controller is not
smart enough to take into account disk usage when deciding to place a
partition, which may not be a big concern (at the controller level, you
worry about broker usage, not individual mounts). However, the broker is
not smart enough to do it either, when looking at the local directories. It
just round robins.

In addition, there is no tool available to move a partition from one mount
point to another. So in the case that you do have a hot disk, you cannot do
anything about it without shutting down the broker and doing a manual move
of the log segments.

-Todd


On Thu, Apr 9, 2015 at 5:36 AM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Hi,

 Let me start discussion thread for KIP-18 - JBOD Support.

 Link to wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-18+-+JBOD+Support


 Thanks,
 Andrii Biletskyi



[jira] [Commented] (KAFKA-1342) Slow controlled shutdowns can result in stale shutdown requests

2015-04-07 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14484278#comment-14484278
 ] 

Todd Palino commented on KAFKA-1342:


Bump

I think we need to revive this. We have a safe shutdown bit of wrapper code 
we use which relies on an external resource that we should eliminate. It would 
be better to provide a safe shutdown option within Kafka itself without that 
wrapper (i.e. do not shut down unless the under-replicated partition count is 0). 
However, this is not possible without serialized shutdown at the controller 
level. We can't allow a second broker to shut down until the first broker has 
completed its shutdown process. Then the second broker can check the URP count 
and be allowed to proceed.
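
A hedged sketch of that external "safe shutdown" check is below: poll the broker's UnderReplicatedPartitions JMX gauge and only proceed with controlled shutdown once it reaches zero. The JMX host and port are assumptions, and the MBean name is the ReplicaManager gauge name as I understand it.

import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object SafeShutdownCheck {
  def underReplicated(host: String, port: Int): Int = {
    val url = new JMXServiceURL(s"service:jmx:rmi:///jndi/rmi://$host:$port/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    try {
      val conn = connector.getMBeanServerConnection
      val name = new ObjectName("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions")
      conn.getAttribute(name, "Value").asInstanceOf[Number].intValue()
    } finally connector.close()
  }

  def main(args: Array[String]): Unit = {
    while (underReplicated("localhost", 9999) != 0) {
      println("Under-replicated partitions present; delaying shutdown")
      Thread.sleep(10000)
    }
    println("URP count is 0; safe to proceed with controlled shutdown")
  }
}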

 Slow controlled shutdowns can result in stale shutdown requests
 ---

 Key: KAFKA-1342
 URL: https://issues.apache.org/jira/browse/KAFKA-1342
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Joel Koshy
Priority: Blocker
 Fix For: 0.9.0


 I don't think this is a bug introduced in 0.8.1., but triggered by the fact
 that controlled shutdown seems to have become slower in 0.8.1 (will file a
 separate ticket to investigate that). When doing a rolling bounce, it is
 possible for a bounced broker to stop all its replica fetchers since the
 previous PID's shutdown requests are still being processed.
 - 515 is the controller
 - Controlled shutdown initiated for 503
 - Controller starts controlled shutdown for 503
 - The controlled shutdown takes a long time in moving leaders and moving
   follower replicas on 503 to the offline state.
 - So 503's read from the shutdown channel times out and a new channel is
   created. It issues another shutdown request.  This request (since it is a
   new channel) is accepted at the controller's socket server but then waits
   on the broker shutdown lock held by the previous controlled shutdown which
   is still in progress.
 - The above step repeats for the remaining retries (six more requests).
 - 503 hits SocketTimeout exception on reading the response of the last
   shutdown request and proceeds to do an unclean shutdown.
 - The controller's onBrokerFailure call-back fires and moves 503's replicas
   to offline (not too important in this sequence).
 - 503 is brought back up.
 - The controller's onBrokerStartup call-back fires and moves its replicas
   (and partitions) to online state. 503 starts its replica fetchers.
 - Unfortunately, the (phantom) shutdown requests are still being handled and
   the controller sends StopReplica requests to 503.
 - The first shutdown request finally finishes (after 76 minutes in my case!).
 - The remaining shutdown requests also execute and do the same thing (sends
   StopReplica requests for all partitions to
   503).
 - The remaining requests complete quickly because they end up not having to
   touch zookeeper paths - no leaders left on the broker and no need to
   shrink ISR in zookeeper since it has already been done by the first
   shutdown request.
 - So in the end-state 503 is up, but effectively idle due to the previous
   PID's shutdown requests.
 There are some obvious fixes that can be made to controlled shutdown to help
 address the above issue. E.g., we don't really need to move follower
 partitions to Offline. We did that as an optimization so the broker falls
 out of ISR sooner - which is helpful when producers set required.acks to -1.
 However it adds a lot of latency to controlled shutdown. Also, (more
 importantly) we should have a mechanism to abort any stale shutdown process.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-17 - Add HighwaterMarkOffset to OffsetFetchResponse

2015-03-26 Thread Todd Palino
I agree with Jun here, that it would make it easier to do lag checking.
However, for individual checks it's really not that much trouble to do the
second request. If you're doing a lot of lag checking (like every consumer
and every topic) where the scale would start to make a difference, I would
argue that you should not be using individual OffsetFetchRequests to do it.
You should instead consume the __consumer_offsets topic to get the
committed offsets, in which case you're still back to getting the high
watermark another way (either through an OffsetRequest or through JMX).

-Todd
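
For reference, the lag arithmetic this thread is about is just the high watermark (latest available offset) minus the last committed offset, regardless of whether the high watermark comes from an OffsetRequest, JMX, or a new field in OffsetFetchResponse. A minimal sketch with made-up numbers:

case class PartitionLag(topic: String, partition: Int, committedOffset: Long, highWatermark: Long) {
  def lag: Long = math.max(0L, highWatermark - committedOffset)
}

object LagExample {
  def main(args: Array[String]): Unit = {
    val p = PartitionLag("events", partition = 0, committedOffset = 1200L, highWatermark = 1500L)
    println(s"${p.topic}-${p.partition} lag = ${p.lag}") // 300
  }
}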


On Thu, Mar 26, 2015 at 7:54 AM, Jun Rao j...@confluent.io wrote:

 Grant,

 In addition to FetchRequest, currently we have another way to get the high
 watermark through OffsetRequest (

 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
 ).
 OffsetRequest is a read-only request and is much lighter than FetchRequest.
 This is what monitoring tools like ConsumerOffsetChecker is using now.

 By returning the high watermark in the OffsetFetchRequest, we can implement
 tools like ConsumerOffsetChecker a bit simpler: instead of making two
 requests, the tool just needs to make one request. However, I am not sure
 if it makes a big difference.

 Thanks,

 Jun


 On Tue, Mar 24, 2015 at 8:13 PM, Grant Henke ghe...@cloudera.com wrote:

  Here is an initial proposal to add HighwaterMarkOffset to the
  OffsetFetchResponse:
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-17+-+Add+HighwaterMarkOffset+to+OffsetFetchResponse
 
  I can add a jira and more implementation details if the
  initial proposal has interest.
 
  Thanks,
  Grant
  --
  Grant Henke
  Solutions Consultant | Cloudera
  ghe...@cloudera.com | 920-980-8979
  twitter.com/ghenke http://twitter.com/gchenke |
  linkedin.com/in/granthenke
 



Re: [DISCUSS] KIP-6 - New reassignment partition logic for re-balancing

2015-03-12 Thread Todd Palino
I understand the desire to not bloat this one change with too much more
work, and it's a good change to start with. That said, I have one note on
your comments:

I don't agree with this because right now you get back the current state of
the partitions so you can (today) write whatever logic you want (with the
information that is there). (with regards to pluggable schemes)

I think this is a really bad place to be. While we're in agreement that
reshuffling the cluster from one scheme to another should not be automated,
the creation and placement of new topics and partitions must be, and you
can't rely on an external process to handle that for you. That leaves gaps
in what is getting done, and a large failure point. Where to put a
partition has to be a decision that the controller makes correctly (where
correctly is defined as the way I want the cluster balanced) upon
creation, and not something that we come in and fix after the fact.

We're in agreement that this should be a new KIP, and that the sourcing and
handling of metadata for something like rack-awareness is non-trivial and
is going to require a lot of discussion. Plus, we're going to not only want
to be rack-aware but also balanced by partition size and/or count. That's
going to be somewhat tricky to get right.

-Todd
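
A hedged sketch of the pluggable placement idea is below; the trait and class names are illustrative, not an actual Kafka interface. A cluster could configure one default assignor (count-balanced, size-balanced, rack-aware) used both for new topics and for reassignments.

trait ReplicaAssignor {
  def assign(partitions: Int, replicationFactor: Int, brokers: Seq[Int]): Map[Int, Seq[Int]]
}

class CountBalancedAssignor extends ReplicaAssignor {
  override def assign(partitions: Int, replicationFactor: Int, brokers: Seq[Int]): Map[Int, Seq[Int]] = {
    require(replicationFactor <= brokers.size, "replication factor cannot exceed broker count")
    (0 until partitions).map { p =>
      // Spread leaders round-robin and place followers on the next brokers in the list.
      p -> (0 until replicationFactor).map(r => brokers((p + r) % brokers.size))
    }.toMap
  }
}

object AssignorExample {
  def main(args: Array[String]): Unit =
    new CountBalancedAssignor().assign(partitions = 4, replicationFactor = 2, brokers = Seq(101, 102, 103))
      .toSeq.sortBy(_._1).foreach { case (p, replicas) => println(s"partition $p -> $replicas") }
}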


On Wed, Mar 11, 2015 at 12:12 PM, Joe Stein joe.st...@stealth.ly wrote:

 Sorry for not catching up on this thread earlier, I wanted to-do this
 before the KIP got its updates so we could discuss if need be and not waste
 more time re-writing/working things that folks have issues with or such. I
 captured all the comments so far here with responses.

  So fair assignment by count (taking into account the current partition
 count of each broker) is very good. However, it's worth noting that all
 partitions are not created equal. We have actually been performing more
 rebalance work based on the partition size on disk, as given equal
 retention of all topics, the size on disk is a better indicator of the
 amount of traffic a partition gets, both in terms of storage and network
 traffic. Overall, this seems to be a better balance.

 Agreed though this is out of scope (imho) for what the motivations for the
 KIP were. The motivations section is blank (that is on me) but honestly it
 is because we did all the development, went back and forth with Neha on the
 testing and then had to back it all into the KIP process... Its a
 time/resource/scheduling and hope to update this soon on the KIP ... all of
 this is in the JIRA and code patch so its not like it is not there just not
 in the place maybe were folks are looking since we changed where folks
 should look.

 Initial cut at Motivations: the --generate is not used by a lot of folks
 because they don't trust it. Issues such as giving different results
 sometimes when you run it. Also other feedback from the community that it
 does not account for specific use cases like adding new brokers and
 removing brokers (which is where that patch started
 https://issues.apache.org/jira/browse/KAFKA-1678 but then we changed it
 after review into just --rebalance
 https://issues.apache.org/jira/browse/KAFKA-1792). The use case for add
 and
 remove brokers is one that happens in AWS and auto scailing. There are
 other reasons for this too of course.  The goal originally was to make what
 folks are already coding today (with the output of  available in the
 project for the community. Based on the discussion in the JIRA with Neha we
 all agreed that making it a fair rebalance would fulfill both use cases.

  In addition to this, I think there is very much a need to have Kafka be
 rack-aware. That is, to be able to assure that for a given cluster, you
 never assign all replicas for a given partition in the same rack. This
 would allow us to guard against maintenances or power failures that affect
 a full rack of systems (or a given switch).

 Agreed, though I think this is out of scope for this change and something
 we can also do in the future. There is more that we have to figure out for
 rack awareness, specifically how we know what rack a broker is on. I really
 really (really) worry that when we keep trying to put too much into a single
 change, the discussions go into rabbit holes, and good, important
 community-driven features that could get out there will get bogged down with
 different use cases and scope creep. So, I think
 rack awareness is its own KIP that has two parts... setting broker rack and
 rebalancing for that. That features doesn't invalidate the need for
 --rebalance but can be built on top of it.

  I think it would make sense to implement the reassignment logic as a
 pluggable component. That way it would be easy to select a scheme when
 performing a reassignment (count, size, rack aware). Configuring a default
 scheme for a cluster would allow for the brokers to create new topics and
 partitions in compliance with the requested policy.

 I don't agree with 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-12 Thread Todd Palino
We're getting off the KIP a little bit here, but while I understand the
idea of not honoring the request timeout, I think that the broker ignoring
it without telling the client is not a good implementation. It gets back to
a larger discussion, which I was mentioning, of just negotiating with (or
at least notifying) the client of important configuration details for the
cluster. Any configuration, like a minimum timeout value or the maximum
message size, which have to be configured the same on both the broker and
the client, or where the client has a required minimum value for a setting
(like fetch size), should be clearly stated by the broker in a handshake.

Don't you get frustrated when you sign up for an account on a website,
select a nice secure password, and then get told after submission Your
password is invalid - it must be between 6 and 14 characters. Then on the
next submission you get told Your password is invalid - it can only
contain certain symbols, and they don't tell you what symbols are allowed?
Why didn't they just tell you all that up front so you could get it right
the first time?

-Todd


On Wed, Mar 11, 2015 at 10:22 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Todd,

 Yeah it is kind of weird to do the quota check after taking a request, but
 since the penalty is applied during that request and it just delays you to
 the right rate, I think it isn't exactly wrong. I admit it is weird,
 though.

 What you say about closing the connection makes sense. The issue is that
 our current model for connections is totally transient. The clients are
 supposed to handle any kind of transient connection loss and just
 re-establish. So basically all existing clients would likely just retry all
 the same whether you closed the connection or not, so at the moment there
 would be no way to know a retried request is actually a retry.

 Your point about the REST proxy is a good one, I don't think we had
 considered that. Currently the java producer just has a single client.id
 for all requests so the rest proxy would be a single client. But actually
 what you want is the original sender to be the client. This is technically
 very hard to do because the client will actually be batching records from
 all senders together into one request so the only way to get the client id
 right would be to make a new producer for each rest proxy client and this
 would mean a lot of memory and connections. This needs thought, not sure
 what solution there is.

 I am not 100% convinced we need to obey the request timeout. The
 configuration issue actually isn't a problem because the request timeout is
 sent with the request so the broker actually knows it now even without a
 handshake. However the question is, if someone sets a pathologically low
 request timeout do we need to obey it? and if so won't that mean we can't
 quota them? I claim the answer is no! I think we should redefine request
 timeout to mean replication timeout, which is actually what it is today.
 Even today if you interact with a slow server it may take longer than that
 timeout (say because the fs write queues up for a long-ass time). I think
 we need a separate client timeout which should be fairly long and unlikely
 to be hit (default to 30 secs or something).

 -Jay

 On Tue, Mar 10, 2015 at 10:12 AM, Todd Palino tpal...@gmail.com wrote:

  Thanks, Jay. On the interface, I agree with Aditya (and you, I believe)
  that we don't need to expose the public API contract at this time, but
  structuring the internal logic to allow for it later with low cost is a
  good idea.
 
  Glad you explained the thoughts on where to hold requests. While my gut
  reaction is to not like processing a produce request that is over quota,
 it
  makes sense to do it that way if you are going to have your quota action
 be
  a delay.
 
  On the delay, I see your point on the bootstrap cases. However, one of
 the
  places I differ, and part of the reason that I prefer the error, is that
 I
  would never allow a producer who is over quota to resend a produce
 request.
  A producer should identify itself at the start of its connection, and at
  that point if it is over quota, the broker would return an error and
 close
  the connection. The same goes for a consumer. I'm a fan, in general, of
  pushing all error cases and handling down to the client and doing as
 little
  special work to accommodate those cases on the broker side as possible.
 
  A case to consider here is what does this mean for REST endpoints to
 Kafka?
  Are you going to hold the HTTP connection open as well? Is the endpoint
  going to queue and hold requests?
 
  I think the point that we can only delay as long as the producer's
 timeout
  is a valid one, especially given that we do not have any means for the
  broker and client to negotiate settings, whether that is timeouts or
  message sizes or anything else. There are a lot of things that you have
 to
  know when setting up a Kafka client about what

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-09 Thread Todd Palino
First, a couple notes on this...

3 - I generally agree with the direction of not pre-optimizing. However, in
this case I'm concerned about the calculation of the cost of doing plugins
now vs. trying to refactor the code to do it later. It would seem to me
that doing it up front will have less friction. If we wait to do plugins
later, it will probably mean changing a lot of related code which will be
significantly more work. We've spent a lot of time talking about various
implementations, and I think it not unreasonable to believe that what one
group wants initially is not going to solve even most cases, as it will
vary by use case.

4 - I really disagree with this. Slowing down a request means that you're
going to hold onto it in the broker. This takes up resources and time, and
is generally not the way other services handle quota violations. In
addition you are causing potential problems with the clients by taking a
call that's supposed to return as quickly as possible and making it take a
long time. This increases latency and deprives the client of the ability to
make good decisions about what to do. By sending an error back to the
client you inform them of what the problem is, and you allow the client to
make an intelligent decision, such as queuing to send later, sending to
another resource, or handling anything from their upstreams differently.

You're absolutely right that throwing back an immediate error has the
potential to turn a quota violation into a different problem for a badly
behaved client. But OS and upstream networking tools can see a problem
based on a layer 4 issue (rapidly reconnecting client) rather than layers
above. Out of the options provided, I think A is the correct choice. B
seems to be the most work (you have the delay, and the client still has to
handle errors and backoff), and C is what I disagree with doing.

I would also like to see a provision for allowing the client to query its
quota status within the protocol. I think we should allow for a request (or
information within an existing response) where the client can ask what its
current quota status is. This will allow for the clients to manage their
quotas, and it will allow for emitting metrics on the client side for quota
status (rather than relying on the server-side metrics, which tends to put
the responsibility in the wrong place).


-Todd
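
A hedged sketch of the "slow down vs. error" trade-off under discussion is below; it is not the KIP-13 design itself, just the arithmetic: given the bytes a client sent in the current window and its byte-rate quota, either reject immediately or compute how long to hold the response so the client's effective rate drops back to its quota.

object QuotaCheck {
  sealed trait Action
  case object WithinQuota extends Action
  case object RejectWithError extends Action             // the immediate-error option
  case class DelayResponse(millis: Long) extends Action  // the transparent slow-down option

  def check(observedBytes: Long, windowMs: Long, quotaBytesPerSec: Double, useDelay: Boolean): Action = {
    val observedRate = observedBytes * 1000.0 / windowMs
    if (observedRate <= quotaBytesPerSec) WithinQuota
    else if (!useDelay) RejectWithError
    else {
      // Stretch the window until observedBytes / (windowMs + delay) equals the quota.
      val targetWindowMs = observedBytes * 1000.0 / quotaBytesPerSec
      DelayResponse(math.round(targetWindowMs - windowMs))
    }
  }

  def main(args: Array[String]): Unit = {
    println(check(observedBytes = 2000000, windowMs = 1000, quotaBytesPerSec = 1000000.0, useDelay = true))
    // DelayResponse(1000): holding the response one extra second halves the effective rate.
  }
}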


On Sun, Mar 8, 2015 at 10:52 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Adi,

 Great write-up. Here are some comments:

 1. I don't think you need a way to disable quotas on a per-client basis,
 that is just the equivalent of setting the quota to be infinite, right?

 2. I agree that the configuration problem is a general part of doing
 dynamic configuration, and it is right to separate that into the config
 KIP. But Joe's proposal currently doesn't provide nearly what you need in
 its current form--it doesn't even handle client-id based configuration, let
 alone the notification mechanism you would need to update your quota--so we
 really need to give completely explicitly how that KIP is going to solve
 this problem.

 3. Custom quota implementations: let's do this later. Pluggability comes
 with a high cost and we want to try really hard to avoid it. So in the
 future if we have a really solid case for an alternative quota approach
 let's see if we can't improve the current approach and stick with one good
 implementation. If we really can't then let's add a plugin system. I think
 doing it now is premature.

 4. I think the ideal quota action from the users point of view is just to
 slow down the writer or reader transparently to match their capacity
 allocation. Let's try to see if we can make that work.

 I think immediate error can be ruled out entirely because it depends on the
 client properly backing off. In cases where they don't we may actually make
 things worse. Given the diversity of clients I think this is probably not
 going to happen.

 The only downside to just delaying the request that was pointed out was
 that if the delay exceeded the request timeout the user might retry. This
 is true but it applies to any approach that delays requests (both B and C).
 I think with any sane request timeout and quota the per request delay we
 induce will be way lower (otherwise you would be hitting the timeout all
 the time just due to linux I/O variance, in which case you can't really
 complain).

 5. We need to explain the relationship between the quota stuff in the
 metrics package and this. We need to either remove that stuff or use it. We
 can't have two quota things. Since quota fundamentally apply to windowed
 metrics, I would suggest doing whatever improvements to that to make it
 usable for quotas.

 6. I don't think the quota manager interface is really what we need if I'm
 understanding it correctly. You give a method
   T extends RequestOrResponse boolean check(T request);
 But how would you implement this method? It seems like it would basically
 internally 

[jira] [Created] (KAFKA-2012) Broker should automatically handle corrupt index files

2015-03-09 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-2012:
--

 Summary: Broker should automatically handle corrupt index files
 Key: KAFKA-2012
 URL: https://issues.apache.org/jira/browse/KAFKA-2012
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Todd Palino


We had a bunch of unclean system shutdowns (power failure), which caused 
corruption on our disks holding log segments in many cases. While the broker is 
handling the log segment corruption properly (truncation), it is having 
problems with corruption in the index files. Additionally, this only seems to 
be happening on some startups (while we are upgrading).

The broker should just do what I do when I hit a corrupt index file - remove it 
and rebuild it.

2015/03/09 17:58:53.873 FATAL [KafkaServerStartable] [main] [kafka-server] [] 
Fatal error during KafkaServerStartable startup. Prepare to shutdown
java.lang.IllegalArgumentException: requirement failed: Corrupt index found, 
index file 
(/export/content/kafka/i001_caches/__consumer_offsets-39/.index)
 has non-zero size but the last offset is -2121629628 and the base offset is 0
at scala.Predef$.require(Predef.scala:233)
at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:185)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:184)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.log.Log.loadSegments(Log.scala:184)
at kafka.log.Log.init(Log.scala:82)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:141)
at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
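
A hedged sketch of the manual workaround described above is below: delete the .index files under an affected partition directory so the broker rebuilds them from the log segments on the next startup. The directory path is taken from the error above and is illustrative only.

import java.io.File

object RemoveCorruptIndexes {
  def main(args: Array[String]): Unit = {
    val partitionDir = new File("/export/content/kafka/i001_caches/__consumer_offsets-39")
    Option(partitionDir.listFiles()).getOrElse(Array.empty[File])
      .filter(_.getName.endsWith(".index"))
      .foreach { f =>
        println(s"Removing corrupt index ${f.getAbsolutePath}")
        f.delete()
      }
  }
}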



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-6 - New reassignment partition logic for re-balancing

2015-03-05 Thread Todd Palino
I would not think that partitions moving would cause any orphaned messages
like that. I would be more concerned about what happens when you change the
default on a running cluster from one scheme to another. Would we want to
support some kind of automated reassignment of existing partitions
(personally - no. I want to trigger that manually because it is a very disk
and network intensive process)?

-Todd

On Wed, Mar 4, 2015 at 7:33 PM, Tong Li liton...@us.ibm.com wrote:



 Todd,
 I think plugable design is good with solid default. The only issue I
 feel is when you use one and switch to another, will we end up with some
 unread messages hanging around and no one thinks or knows it is their
 responsibility to take care of them?

 Thanks.

 Tong

 Sent from my iPhone

  On Mar 5, 2015, at 10:46 AM, Todd Palino tpal...@gmail.com wrote:
 
  Apologize for the late comment on this...
 
  So fair assignment by count (taking into account the current partition
  count of each broker) is very good. However, it's worth noting that all
  partitions are not created equal. We have actually been performing more
  rebalance work based on the partition size on disk, as given equal
  retention of all topics, the size on disk is a better indicator of the
  amount of traffic a partition gets, both in terms of storage and network
  traffic. Overall, this seems to be a better balance.
 
  In addition to this, I think there is very much a need to have Kafka be
  rack-aware. That is, to be able to assure that for a given cluster, you
  never assign all replicas for a given partition in the same rack. This
  would allow us to guard against maintenances or power failures that
 affect
  a full rack of systems (or a given switch).
 
  I think it would make sense to implement the reassignment logic as a
  pluggable component. That way it would be easy to select a scheme when
  performing a reassignment (count, size, rack aware). Configuring a
 default
  scheme for a cluster would allow for the brokers to create new topics and
  partitions in compliance with the requested policy.
 
  -Todd
 
 
  On Thu, Jan 22, 2015 at 10:13 PM, Joe Stein joe.st...@stealth.ly
 wrote:
 
   I will go back through the ticket and code and write more up. Should be
   able to-do that sometime next week. The intention was to not replace
   existing functionality by issue a WARN on use. The following version it
 is
   released we could then deprecate it... I will fix the KIP for that too.
  
   On Fri, Jan 23, 2015 at 12:34 AM, Neha Narkhede n...@confluent.io
 wrote:
  
Hey Joe,
   
1. Could you add details to the Public Interface section of the KIP?
 This
should include the proposed changes to the partition reassignment
 tool.
Also, maybe the new option can be named --rebalance instead of
--re-balance?
2. It makes sense to list --decommission-broker as part of this KIP.
Similarly, shouldn't we also have an --add-broker option? The way I
 see
this is that there are several events when a partition reassignment
 is
required. Before this functionality is automated on the broker, the
 tool
will generate an ideal replica placement for each such event. The
 users
should merely have to specify the nature of the event e.g. adding a
   broker
or decommissioning an existing broker or merely rebalancing.
3. If I understand the KIP correctly, the upgrade plan for this
 feature
includes removing the existing --generate option on the partition
reassignment tool in 0.8.3 while adding all the new options in the
 same
release. Is that correct?
   
Thanks,
Neha
   
On Thu, Jan 22, 2015 at 9:23 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
   
 Ditto on this one. Can you give the algorithm we want to implement?

 Also I think in terms of scope this is just proposing to change the
   logic
 in ReassignPartitionsCommand? I think we've had the discussion
 various
 times on the mailing list that what people really want is just for
   Kafka
to
 do it's best to balance data in an online fashion (for some
 definition
   of
 balance). i.e. if you add a new node partitions would slowly
 migrate to
it,
 and if a node dies, partitions slowly migrate off it. This could
 potentially be more work, but I'm not sure how much more. Has
 anyone
 thought about how to do it?

 -Jay

 On Wed, Jan 21, 2015 at 10:11 PM, Joe Stein joe.st...@stealth.ly
wrote:

  Posted a KIP for --re-balance for partition assignment in
   reassignment
  tool.
 
 
 

   
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-6+-+New
 +reassignment+partition+logic+for+re-balancing
 
  JIRA https://issues.apache.org/jira/browse/KAFKA-1792
 
  While going through the KIP I thought of one thing from the JIRA
 that
we
  should change. We should preserve --generate to be existing
functionality
  for the next release

Re: [DISCUSS] KIP-6 - New reassignment partition logic for re-balancing

2015-03-04 Thread Todd Palino
Apologize for the late comment on this...

So fair assignment by count (taking into account the current partition
count of each broker) is very good. However, it's worth noting that all
partitions are not created equal. We have actually been performing more
rebalance work based on the partition size on disk, as given equal
retention of all topics, the size on disk is a better indicator of the
amount of traffic a partition gets, both in terms of storage and network
traffic. Overall, this seems to be a better balance.

In addition to this, I think there is very much a need to have Kafka be
rack-aware. That is, to be able to assure that for a given cluster, you
never assign all replicas for a given partition in the same rack. This
would allow us to guard against maintenances or power failures that affect
a full rack of systems (or a given switch).

I think it would make sense to implement the reassignment logic as a
pluggable component. That way it would be easy to select a scheme when
performing a reassignment (count, size, rack aware). Configuring a default
scheme for a cluster would allow for the brokers to create new topics and
partitions in compliance with the requested policy.

-Todd


On Thu, Jan 22, 2015 at 10:13 PM, Joe Stein joe.st...@stealth.ly wrote:

 I will go back through the ticket and code and write more up. Should be
 able to-do that sometime next week. The intention was to not replace
 existing functionality by issue a WARN on use. The following version it is
 released we could then deprecate it... I will fix the KIP for that too.

 On Fri, Jan 23, 2015 at 12:34 AM, Neha Narkhede n...@confluent.io wrote:

  Hey Joe,
 
  1. Could you add details to the Public Interface section of the KIP? This
  should include the proposed changes to the partition reassignment tool.
  Also, maybe the new option can be named --rebalance instead of
  --re-balance?
  2. It makes sense to list --decommission-broker as part of this KIP.
  Similarly, shouldn't we also have an --add-broker option? The way I see
  this is that there are several events when a partition reassignment is
  required. Before this functionality is automated on the broker, the tool
  will generate an ideal replica placement for each such event. The users
  should merely have to specify the nature of the event e.g. adding a
 broker
  or decommissioning an existing broker or merely rebalancing.
  3. If I understand the KIP correctly, the upgrade plan for this feature
  includes removing the existing --generate option on the partition
  reassignment tool in 0.8.3 while adding all the new options in the same
  release. Is that correct?
 
  Thanks,
  Neha
 
  On Thu, Jan 22, 2015 at 9:23 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
   Ditto on this one. Can you give the algorithm we want to implement?
  
   Also I think in terms of scope this is just proposing to change the
 logic
   in ReassignPartitionsCommand? I think we've had the discussion various
   times on the mailing list that what people really want is just for
 Kafka
  to
   do it's best to balance data in an online fashion (for some definition
 of
   balance). i.e. if you add a new node partitions would slowly migrate to
  it,
   and if a node dies, partitions slowly migrate off it. This could
   potentially be more work, but I'm not sure how much more. Has anyone
   thought about how to do it?
  
   -Jay
  
   On Wed, Jan 21, 2015 at 10:11 PM, Joe Stein joe.st...@stealth.ly
  wrote:
  
Posted a KIP for --re-balance for partition assignment in
 reassignment
tool.
   
   
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-6+-+New+reassignment+partition+logic+for+re-balancing
   
JIRA https://issues.apache.org/jira/browse/KAFKA-1792
   
While going through the KIP I thought of one thing from the JIRA that
  we
should change. We should preserve --generate to be existing
  functionality
for the next release it is in. If folks want to use --re-balance then
great, it just won't break any upgrade paths, yet.
   
/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/
   
  
 
 
 
  --
  Thanks,
  Neha
 



[jira] [Created] (KAFKA-1987) Potential race condition in partition creation

2015-02-26 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-1987:
--

 Summary: Potential race condition in partition creation
 Key: KAFKA-1987
 URL: https://issues.apache.org/jira/browse/KAFKA-1987
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Todd Palino
Assignee: Neha Narkhede


I am finding that there appears to be a race condition when creating 
partitions, with replication factor 2 or higher, between the creation of the 
partition on the leader and the follower. What appears to be happening is that 
the follower is processing the command to create the partition before the 
leader does, and when the follower starts the replica fetcher, it fails with an 
UnknownTopicOrPartitionException.

The situation is that I am creating a large number of partitions on a cluster, 
preparing it for data being mirrored from another cluster. So there are a 
sizeable number of create and alter commands being sent sequentially. 
Eventually, the replica fetchers start up properly. But it seems like the 
controller should issue the command to create the partition to the leader, wait 
for confirmation, and then issue the command to create the partition to the 
followers.

2015/02/26 21:11:50.413 INFO [LogManager] [kafka-request-handler-12] 
[kafka-server] [] Created log for partition [topicA,30] in /path_to/i001_caches 
with properties {segment.index.bytes - 10485760, file.delete.delay.ms - 
6, segment.bytes - 268435456, flush.ms - 1, delete.retention.ms - 
8640, index.interval.bytes - 4096, retention.bytes - -1, 
min.insync.replicas - 1, cleanup.policy - delete, 
unclean.leader.election.enable - true, segment.ms - 4320, 
max.message.bytes - 100, flush.messages - 2, 
min.cleanable.dirty.ratio - 0.5, retention.ms - 8640, segment.jitter.ms 
- 0}.
2015/02/26 21:11:50.418 WARN [Partition] [kafka-request-handler-12] 
[kafka-server] [] Partition [topicA,30] on broker 1551: No checkpointed 
highwatermark is found for partition [topicA,30]
2015/02/26 21:11:50.418 INFO [ReplicaFetcherManager] [kafka-request-handler-12] 
[kafka-server] [] [ReplicaFetcherManager on broker 1551] Removed fetcher for 
partitions [topicA,30]
2015/02/26 21:11:50.418 INFO [Log] [kafka-request-handler-12] [kafka-server] [] 
Truncating log topicA-30 to offset 0.
2015/02/26 21:11:50.450 INFO [ReplicaFetcherManager] [kafka-request-handler-12] 
[kafka-server] [] [ReplicaFetcherManager on broker 1551] Added fetcher for 
partitions List([[topicA,30], initOffset 0 to broker 
id:1555,host:host1555.example.com,port:10251] )
2015/02/26 21:11:50.615 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.616 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.618 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.620 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.621 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints

2015-02-12 Thread Todd Palino
The idea is more about isolating the intra-cluster traffic from the normal
clients as much as possible. There are a couple of situations we've seen where
this would be useful that come to mind immediately:

1) Normal operation - just having the intra-cluster traffic on a separate
network interface would keep it from being overwhelmed by something like a
bootstrapping client that is saturating the network interface. We see this
fairly often, where the replication falls behind because of heavy traffic
from one application. We can always adjust the network threads, but
segregating the traffic is the first step.

2) Isolation in case of an error - We have had situations, more than once,
where we needed to rebuild a cluster after a catastrophic problem and the
clients caused that process to take too long, or caused additional
failures. This has mostly come into play with file descriptor
limits in the past, but it's certainly not the only situation. Constantly
reconnecting clients continue to cause the brokers to fall over while we
are trying to recover a down cluster. The only solution was to firewall off
all the clients temporarily. This is a great deal more complicated if the
brokers and the clients are all operating over the same port.

Now, that said, quotas can be a partial solution to this. I don't want to
jump the gun on that discussion (because it's going to come up separately
and in more detail), but it is possible to structure quotas in a way that
will allow the intra-cluster replication to continue to function in the
case of high load. That would partially address case 1, but it does nothing
for case 2. Additionally, I think it is also desirable to segregate the
traffic even with quotas, so that regardless of the client load, the
cluster itself is able to be healthy.
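
To make the segregation itself concrete, here is a rough sketch of what the
broker config could look like once multiple endpoints land (the listener names,
hostnames, and exact property names are illustrative and depend on how the KIP
ends up):

    # Client traffic on the general-purpose interface, replication on a dedicated one
    listeners=CLIENT://broker1.example.com:9092,REPLICATION://broker1-repl.example.com:9093
    listener.security.protocol.map=CLIENT:PLAINTEXT,REPLICATION:PLAINTEXT
    # Brokers only talk to each other over the REPLICATION endpoint, so a client
    # saturating port 9092 does not starve replication, and port 9093 can be
    # firewalled down to just the brokers during a recovery.
    inter.broker.listener.name=REPLICATION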

-Todd


On Thu, Feb 12, 2015 at 11:38 AM, Jun Rao j...@confluent.io wrote:

 Todd,

 Could you elaborate on the benefit for having a separate endpoint for
 intra-cluster communication? Is it mainly for giving intra-cluster requests
 a high priority? At this moment, having a separate endpoint just means that
 the socket connection for the intra-cluster communication is handled by a
 separate acceptor thread. The processing of the requests from the network
 and the handling of the requests are still shared by a single thread pool.
 So, if any of the thread pool is exhausted, the intra-cluster requests will
 still be delayed. We can potentially change this model, but this requires
 more work.

 An alternative is to just rely on quotas. Intra-cluster requests will be
 exempt from any kind of throttling.

 Gwen,

 I agree that defaulting wire.protocol.version to the current version is
 probably better. It just means that we need to document the migration path
 for previous versions.

 Thanks,

 Jun


 On Wed, Feb 11, 2015 at 6:33 PM, Todd Palino tpal...@gmail.com wrote:

  Thanks, Gwen. This looks good to me as far as the wire protocol versioning
  goes. I agree with you on defaulting to the new wire protocol version for
  new installs. I think it will also need to be very clear (to general
  installer of Kafka, and not just developers) in documentation when the wire
  protocol version changes moving forwards, and what the risk/benefit of
  changing to the new version is.

  Since a rolling upgrade of the intra-cluster protocol is supported, will a
  rolling downgrade work as well? Should a flaw (bug, security, or otherwise)
  be discovered after upgrade, is it possible to change the
  wire.protocol.version back to 0.8.2 and do a rolling bounce?

  On the host/port/protocol specification, specifically the ZK config format,
  is it possible to have an un-advertised endpoint? I would see this as
  potentially useful if you wanted to have an endpoint that you are reserving
  for intra-cluster communication, and you would prefer to not have it
  advertised at all. Perhaps it is blocked by a firewall rule or other
  authentication method. This could also allow you to duplicate a security
  protocol type but segregate it on a different port or interface (if it is
  unadvertised, there is no ambiguity to the clients as to which endpoint
  should be selected). I believe I asked about that previously, and I didn't
  track what the final outcome was or even if it was discussed further.
 
  -Todd
 
 
  On Wed, Feb 11, 2015 at 4:38 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
    Added Jun's notes to the KIP (Thanks for explaining so clearly, Jun. I was
    clearly struggling with this...) and removed the reference to
    use.new.wire.protocol.

    On Wed, Feb 11, 2015 at 4:19 PM, Joel Koshy jjkosh...@gmail.com wrote:

     The description that Jun gave for (2) was the detail I was looking for
     - Gwen can you update the KIP with that for completeness/clarity?

     I'm +1 as well overall. However, I think it would be good if we also
     get an ack from someone who is more experienced on the operations side

Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints

2015-02-11 Thread Todd Palino
Thanks, Gwen. This looks good to me as far as the wire protocol versioning
goes. I agree with you on defaulting to the new wire protocol version for
new installs. I think it will also need to be very clear (to general
installer of Kafka, and not just developers) in documentation when the wire
protocol version changes moving forwards, and what the risk/benefit of
changing to the new version is.

Since a rolling upgrade of the intra-cluster protocol is supported, will a
rolling downgrade work as well? Should a flaw (bug, security, or otherwise)
be discovered after upgrade, is it possible to change the wire.protocol.version
back to 0.8.2 and do a rolling bounce?
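
For reference, the two-step bounce under discussion would look roughly like this
in server.properties (using the property name from this thread; the final name
may well differ):

    # Step 1 (first rolling bounce): run the 0.8.3 code but keep speaking the old protocol
    wire.protocol.version=0.8.2

    # Step 2 (second rolling bounce, once every broker is on 0.8.3): switch protocols
    wire.protocol.version=0.8.3

The downgrade question above is essentially whether flipping this back to 0.8.2
and bouncing a third time is safe once step 2 has completed.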

On the host/port/protocol specification, specifically the ZK config format,
is it possible to have an un-advertised endpoint? I would see this as
potentially useful if you wanted to have an endpoint that you are reserving
for intra-cluster communication, and you would prefer to not have it
advertised at all. Perhaps it is blocked by a firewall rule or other
authentication method. This could also allow you to duplicate a security
protocol type but segregate it on a different port or interface (if it is
unadvertised, there is no ambiguity to the clients as to which endpoint
should be selected). I believe I asked about that previously, and I didn't
track what the final outcome was or even if it was discussed further.


-Todd


On Wed, Feb 11, 2015 at 4:38 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Added Jun's notes to the KIP (Thanks for explaining so clearly, Jun. I was
 clearly struggling with this...) and removed the reference to
 use.new.wire.protocol.

 On Wed, Feb 11, 2015 at 4:19 PM, Joel Koshy jjkosh...@gmail.com wrote:

  The description that Jun gave for (2) was the detail I was looking for
  - Gwen can you update the KIP with that for completeness/clarity?
 
  I'm +1 as well overall. However, I think it would be good if we also
  get an ack from someone who is more experienced on the operations side
  (say, Todd) to review especially the upgrade plan.
 
  On Wed, Feb 11, 2015 at 09:40:50AM -0800, Jun Rao wrote:
   +1 for proposed changes in 1 and 2.
  
   1. The impact is that if someone uses SimpleConsumer and references
  Broker
   explicitly, the application needs code change to compile with 0.8.3.
  Since
   SimpleConsumer is not widely used, breaking the API in SimpleConsumer
 but
   maintaining overall code cleanness seems to be a better tradeoff.
  
   2. For clarification, the issue is the following. In 0.8.3, we will be
    evolving the wire protocol of UpdateMetadataRequest (to send info about
   endpoints for different security protocols). Since this is used in
   intra-cluster communication, we need to do the upgrade in two steps.
 The
   idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2.
  When
   upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After
  step
    1, all brokers will be capable of processing the new protocol in
 0.8.3,
   but without actually using it. In step 2, we
   configure wire.protocol.version to 0.8.3 in each broker and do another
   rolling restart. After step 2, all brokers will start using the new
   protocol in 0.8.3. Let's say that in the next release 0.9, we are
  changing
   the intra-cluster wire protocol again. We will do the similar thing:
   defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can
  upgrade
   from 0.8.3 to 0.9 in two steps. For people who want to upgrade from
 0.8.2
   to 0.9 directly, they will have to configure wire.protocol.version to
  0.8.2
   first and then do the two-step upgrade to 0.9.
  
   Gwen,
  
   In KIP2, there is still a reference to use.new.protocol. This needs to
 be
   removed. Also, would it be better to use
  intra.cluster.wire.protocol.version
   since this only applies to the wire protocol among brokers?
  
   Others,
  
   The patch in KAFKA-1809 is almost ready. It would be good to wrap up
 the
   discussion on KIP2 soon. So, if you haven't looked at this KIP, please
  take
   a look and send your comments.
  
   Thanks,
  
   Jun
  
  
   On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
Hi Kafka Devs,
   
While reviewing the patch for KAFKA-1809, we came across two
 questions
that we are interested in hearing the community out on.
   
1. This patch changes the Broker class and adds a new class
BrokerEndPoint that behaves like the previous broker.
   
While technically kafka.cluster.Broker is not part of the public API,
it is returned by javaapi, used with the SimpleConsumer.
   
Getting replicas from PartitionMetadata will now return
 BrokerEndPoint
instead of Broker. All method calls remain the same, but since we
return a new type, we break the API.
   
Note that this breakage does not prevent upgrades - existing
SimpleConsumers will continue working (because we are
wire-compatible).
The only thing that won't work is 

Re: [SECURITY DISCUSSION] Refactoring Brokers to support multiple ports

2014-12-02 Thread Todd Palino
Leaving aside the rest of this, on #1, while I consider being able to
advertise the ports a good idea, I don't want to lose the ability for
maintaining multiple ports with the same protocol. For example, being able
to have 2 plaintext ports, one that only brokers communicate over, and one
that general clients use. The ability to segregate this traffic is useful
in a number of situations, over and above other controls like quotas, and
is relatively easy to do once we support multiple ports.

-Todd


On Tue, Dec 2, 2014 at 1:58 PM, Jun Rao jun...@gmail.com wrote:

 Hi, Gwen,

 Thanks for writing up the wiki. Some comments below.

 1. To make it more general, should we support a binding and an advertised
 host for each protocol (e.g. plaintext, ssl, etc)? We will also need to
 figure out how to specify the wildcard binding host.

 2. Broker format change in ZK
 The broker registration in ZK needs to store the host/port for all
 protocols. We will need to bump up the version of the broker registration
 data. Since this is an intra-cluster protocol change, we need an extra
 config for rolling upgrades. So, in the first step, each broker is upgraded
 and is ready to parse brokers registered in the new format, but not
 registering using the new format yet. In the second step, when that new
 config is enabled, the broker will register using the new format.

 3. Wire protocol changes. Currently, the broker info is used in the
 following requests/responses: TopicMetadataResponse ,
 ConsumerMetadataResponse, LeaderAndIsrRequest  and UpdateMetadataRequest.
 3.1 TopicMetadataResponse and ConsumerMetadataResponse:
 These two are used between the clients and the broker. I am not sure that
 we need to make a wire protocol change for them. Currently, the protocol
 includes a single host/port pair in those responses. Based on the type of
 the port on which the request is sent, it seems that we can just pick the
 corresponding host and port to include in the response.
 3.2 UpdateMetadataRequest:
 This is used between the controller and the broker. Since each broker needs
 to cache the host/port of all protocols, we need to make a wire protocol
 change. We also need to change the broker format in MetadataCache
 accordingly. This is also an intra-cluster protocol change. So the upgrade
 path will need to follow that in item 2.
 3.3 LeaderAndIsrRequest:
 This is also used between the controller and the broker. The receiving
 broker uses the host/port of the leader replica to send the fetch request.
 I am not sure if we need a wire protocol change in this case. I was
 imagining that we will just add a new broker config, sth like
 replication.socket.protocol. Base on this config, the controller will pick
 the right host/port to include in the request.

 4. Should we plan to support security just on the new java clients?
 Supporting security in both the old and the new clients adds more work and
 gives people less incentive to migrate off the old clients.

 Thanks,

 Jun


 On Tue, Nov 25, 2014 at 11:13 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Hi Everyone,
 
  One of the pre-requisites we have for supporting multiple security
  protocols (SSL, Kerberos) is to support them on separate ports.
 
  This is done in KAFKA-1684 (The SSL Patch), but that patch addresses
  several different issues - Multiple ports, enriching the channels, SSL
  implementation - which makes it more challenging to review and to test.
 
  I'd like to split this into 3 separate patches: multi-port brokers,
  enriching SocketChannel, and  the actual security implementations.
 
  Since even just adding support for multiple listeners per broker is
  somewhat involved and touches multiple components, I wrote a short design
  document that covers the necessary changes and the upgrade process:
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers
 
  Comments are more than welcome :)
 
  If this is acceptable, hope to have a patch ready in few days.
 
  Gwen Shapira
 



Re: [SECURITY DISCUSSION] Refactoring Brokers to support multiple ports

2014-12-02 Thread Todd Palino
Thanks. Just to add more detail as to why I think it's a good idea to be
able to segregate traffic like that...

One reason would just be to separate out the intra-cluster communication to
a separate port. This can allow you to run it over a different interface
(for example, you could have dedicated links for the brokers to do
replication), though you could do that with a wildcard bind and multiple
interfaces with a little care on the broker config. It also allows for
firewalling off clients from a cluster while leaving the broker
communication intact. We've run into this situation a couple times where it
was advantageous to do this during recovery to prevent things like runaway
file descriptor allocation. It also gives the ability to use QOS tools to
work with networking guarantees for broker traffic.

Maybe it's enough to be able to specify a special protocol name to do this.
Meaning you could configure a port with protocol broker (or something
like that) which could be used by the brokers if it exists. Otherwise they
would default back to something else.
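
Purely as a sketch of that idea (the property name and the broker protocol
label are hypothetical, following the naming floated in this thread rather than
anything that exists today), the broker side might look something like:

    # Two plaintext endpoints: one reserved for the brokers themselves, one for clients
    listeners=BROKER://broker1-internal.example.com:9091,PLAINTEXT://broker1.example.com:9092
    # Brokers use the BROKER endpoint for replication if it is present,
    # otherwise they fall back to the regular client endpoint
    replication.socket.protocol=BROKER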

-Todd


On Tue, Dec 2, 2014 at 3:23 PM, Gwen Shapira gshap...@cloudera.com wrote:

 The good news is that I'm not using a map to represent protocol list.

 The bad news is that as mentioned in the wiki: producers, consumers
 and broker configuration specify security protocol, so we'll know
 which host/port pair to use for communication. This implicitly assumes
 there will be only one host/port per protocol.

 I'll think a bit on how this assumption can be relaxed.

 Gwen

 On Tue, Dec 2, 2014 at 3:14 PM, Todd Palino tpal...@gmail.com wrote:
  Gwen - just my reading of what we could expect from what you had
 described
  so far. Without having gone into implementation details, there didn't
 seem
  to be anything that would block the ability to run two ports with the
 same
  protocol configuration, at least from the way you proposed to represent
 it
  in Zookeeper. I'd just like to not go down the path of using something
 like
  a map for representing the protocol list that would eliminate this
  possibility, unless there's a pretty good reason to do so.
 
  -Todd
 
  On Tue, Dec 2, 2014 at 3:00 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Hey Todd,
 
  You say lose the ability - you mean this ability actually exist now?
  Or is this something you hope the new patch will support?
 
  Gwen
 
  On Tue, Dec 2, 2014 at 2:08 PM, Todd Palino tpal...@gmail.com wrote:
   Leaving aside the rest of this, on #1, while I consider being able to
   advertise the ports a good idea, I don't want to lose the ability for
   maintaining multiple ports with the same protocol. For example, being
  able
   to have 2 plaintext ports, one that only brokers communicate over, and
  one
   that general clients use. The ability to segregate this traffic is
 useful
   in a number of situations, over and above other controls like quotas,
 and
   is relatively easy to do once we support multiple ports.
  
   -Todd
  
  
   On Tue, Dec 2, 2014 at 1:58 PM, Jun Rao jun...@gmail.com wrote:
  
   Hi, Gwen,
  
   Thanks for writing up the wiki. Some comments below.
  
   1. To make it more general, should we support a binding and an
  advertised
   host for each protocol (e.g. plaintext, ssl, etc)? We will also need
 to
   figure out how to specify the wildcard binding host.
  
   2. Broker format change in ZK
   The broker registration in ZK needs to store the host/port for all
   protocols. We will need to bump up the version of the broker
  registration
   data. Since this is an intra-cluster protocol change, we need an
 extra
   config for rolling upgrades. So, in the first step, each broker is
  upgraded
   and is ready to parse brokers registered in the new format, but not
   registering using the new format yet. In the second step, when that
 new
   config is enabled, the broker will register using the new format.
  
   3. Wire protocol changes. Currently, the broker info is used in the
   following requests/responses: TopicMetadataResponse ,
   ConsumerMetadataResponse, LeaderAndIsrRequest  and
  UpdateMetadataRequest.
   3.1 TopicMetadataResponse and ConsumerMetadataResponse:
   These two are used between the clients and the broker. I am not sure
  that
   we need to make a wire protocol change for them. Currently, the
 protocol
   includes a single host/port pair in those responses. Based on the
 type
  of
   the port on which the request is sent, it seems that we can just pick
  the
   corresponding host and port to include in the response.
   3.2 UpdateMetadataRequest:
   This is used between the controller and the broker. Since each broker
  needs
   to cache the host/port of all protocols, we need to make a wire
 protocol
   change. We also need to change the broker format in MetadataCache
   accordingly. This is also an intra-cluster protocol change. So the
  upgrade
   path will need to follow that in item 2.
   3.3 LeaderAndIsrRequest:
   This is also used

Re: [SECURITY DISCUSSION] Refactoring Brokers to support multiple ports

2014-12-02 Thread Todd Palino
I don't think it necessarily has to be. I was thinking about that while I
was typing it out, and I realized that as well. With a special broker port,
my biggest concern is making sure that nothing other than a broker uses it
(so that it cannot be used to bypass security controls like authentication and
authorization). Outside of that assurance, I don't see a reason to add
overhead like TLS to the broker communication. Someone else might have one,
though.
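
As one example of providing that assurance outside of Kafka itself (a sketch
only; the port and subnet here are placeholders), a host firewall could
restrict the broker-only port to the broker subnet:

    # Allow the broker-only port just from the broker subnet, drop everyone else
    iptables -A INPUT -p tcp --dport 9091 -s 10.20.0.0/24 -j ACCEPT
    iptables -A INPUT -p tcp --dport 9091 -j DROP
    # Client port stays open as usual
    iptables -A INPUT -p tcp --dport 9092 -j ACCEPT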

-Todd


On Tue, Dec 2, 2014 at 4:12 PM, Jun Rao j...@confluent.io wrote:

 Todd,

 Does that imply the intra-broker protocol is always plaintext?

 Thanks,

 Jun

 On Tue, Dec 2, 2014 at 3:31 PM, Todd Palino tpal...@gmail.com wrote:

  Thanks. Just to add more detail as to why I think it's a good idea to be
  able to segregate traffic like that...
 
  One reason would just be to separate out the intra-cluster communication
 to
  a separate port. This can allow you to run it over a different interface
  (for example, you could have dedicated links for the brokers to do
  replication), though you could do that with a wildcard bind and multiple
  interfaces with a little care on the broker config. It also allows for
  firewalling off clients from a cluster while leaving the broker
  communication intact. We've run into this situation a couple times where
 it
  was advantageous to do this during recovery to prevent things like
 runaway
  file descriptor allocation. It also gives the ability to use QOS tools to
  work with networking guarantees for broker traffic.
 
  Maybe it's enough to be able to specify a special protocol name to do
 this.
  Meaning you could configure a port with protocol broker (or something
  like that) which could be used by the brokers if it exists. Otherwise
 they
  would default back to something else.
 
  -Todd
 
 
  On Tue, Dec 2, 2014 at 3:23 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   The good news is that I'm not using a map to represent protocol list.
  
   The bad news is that as mentioned in the wiki: producers, consumers
   and broker configuration specify security protocol, so we'll know
   which host/port pair to use for communication. This implicitly assumes
   there will be only one host/port per protocol.
  
   I'll think a bit on how this assumption can be relaxed.
  
   Gwen
  
   On Tue, Dec 2, 2014 at 3:14 PM, Todd Palino tpal...@gmail.com wrote:
Gwen - just my reading of what we could expect from what you had
   described
so far. Without having gone into implementation details, there didn't
   seem
to be anything that would block the ability to run two ports with the
   same
protocol configuration, at least from the way you proposed to
 represent
   it
in Zookeeper. I'd just like to not go down the path of using
 something
   like
a map for representing the protocol list that would eliminate this
possibility, unless there's a pretty good reason to do so.
   
-Todd
   
On Tue, Dec 2, 2014 at 3:00 PM, Gwen Shapira gshap...@cloudera.com
   wrote:
   
Hey Todd,
   
You say lose the ability - you mean this ability actually exist
 now?
Or is this something you hope the new patch will support?
   
Gwen
   
On Tue, Dec 2, 2014 at 2:08 PM, Todd Palino tpal...@gmail.com
  wrote:
 Leaving aside the rest of this, on #1, while I consider being able
  to
 advertise the ports a good idea, I don't want to lose the ability
  for
 maintaining multiple ports with the same protocol. For example,
  being
able
 to have 2 plaintext ports, one that only brokers communicate over,
  and
one
 that general clients use. The ability to segregate this traffic is
   useful
 in a number of situations, over and above other controls like
  quotas,
   and
 is relatively easy to do once we support multiple ports.

 -Todd


 On Tue, Dec 2, 2014 at 1:58 PM, Jun Rao jun...@gmail.com wrote:

 Hi, Gwen,

 Thanks for writing up the wiki. Some comments below.

 1. To make it more general, should we support a binding and an
advertised
 host for each protocol (e.g. plaintext, ssl, etc)? We will also
  need
   to
 figure out how to specify the wildcard binding host.

 2. Broker format change in ZK
 The broker registration in ZK needs to store the host/port for
 all
 protocols. We will need to bump up the version of the broker
registration
 data. Since this is an intra-cluster protocol change, we need an
   extra
 config for rolling upgrades. So, in the first step, each broker
 is
upgraded
 and is ready to parse brokers registered in the new format, but
 not
 registering using the new format yet. In the second step, when
 that
   new
 config is enabled, the broker will register using the new format.

 3. Wire protocol changes. Currently, the broker info is used in
 the
 following requests/responses: TopicMetadataResponse ,
 ConsumerMetadataResponse

Re: Security JIRAS

2014-10-17 Thread Todd Palino
For the moment, consumers still need to write under the /consumers tree.
Even if they are committing offsets to Kafka instead of ZK, they will need
to write owner information there when they are balancing. Eventually, you
are correct, this is going away with the new consumer.
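
For anyone setting ACLs there, the old consumer's layout is roughly the
following (exact children vary a bit by version):

    /consumers/<group>/ids/<consumer_id>              # ephemeral consumer registration
    /consumers/<group>/owners/<topic>/<partition>     # ownership claimed during a rebalance
    /consumers/<group>/offsets/<topic>/<partition>    # only written when offsets are stored in ZK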

-Todd

On Fri, Oct 17, 2014 at 10:09 AM, Arvind Mani am...@linkedin.com.invalid
wrote:


 I'm looking at Kafka Brokers authentication with ZooKeeper since this
 looks independent of other tasks.

 [AM]

 1) Is authentication required only between the Kafka broker and ZooKeeper? Can
 we assume world-readable znodes so that consumers don't have to be
 authenticated (I believe Kafka is in any case planning to change such that
 consumers don't have to interact with ZK)? In that case I assume the Kafka
 broker can easily create the znodes with the appropriate ACL list - the broker
 can be the admin.

 2)  Zookeeper supports Kerberos authentication. Zookeeper supports SSL
 connections (version 3.4 or later) but I don't see an x509 authentication
 provider. Do we want to support x509 cert based authentication for zk?

 - Arvind



