[jira] [Created] (KAFKA-6559) Iterate record sets before calling Log.append
Todd Palino created KAFKA-6559: -- Summary: Iterate record sets before calling Log.append Key: KAFKA-6559 URL: https://issues.apache.org/jira/browse/KAFKA-6559 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 1.0.0 Reporter: Todd Palino Assignee: Todd Palino If a Produce request contains multiple record sets for a single topic-partition, it is better to iterate these before calling Log.append. This is because append will process all the sets together, and therefore will need to reassign offsets even if the offsets for an individual record set are properly formed. By iterating the record sets before calling append, each set can be considered on its own and potentially be appended without reassigning offsets. While the core Java producer client does not currently operate this way, it is permitted by the protocol and may be used by other clients that aggregate multiple batches together to produce them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
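The iteration the issue proposes can be sketched as follows. This is an illustrative Python sketch only: the real change would live in the Scala broker around Log.append, and the function name and the append callback here are hypothetical stand-ins, not Kafka's API.

```python
def append_record_sets(record_sets, log_end_offset, append):
    """Illustrative sketch: consider each record set in a produce request
    on its own, so a set whose offsets already continue from the current
    log end can be appended without reassigning offsets.

    record_sets: list of (base_offset, records) pairs for one partition
    append: callable(records, assign_offsets) standing in for Log.append
    """
    next_offset = log_end_offset
    for base_offset, records in record_sets:
        # A set's offsets are well-formed only if it starts exactly at
        # the current log end; otherwise this set needs reassignment.
        append(records, assign_offsets=(base_offset != next_offset))
        next_offset += len(records)
    return next_offset
```

Processing all sets together, by contrast, forces reassignment for every set as soon as any one of them is misaligned, which is the behavior the issue wants to avoid.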
Re: Question Kafka Reassign partitions tool
Yes, the replicas are stored in Zookeeper, so you can iterate over the information there to build a view of the cluster that you can use. If you want an example for this, take a look at the code for kafka-assigner in https://github.com/linkedin/kafka-tools. Or you can just use that tool to adjust replication factors and balance partitions. -Todd On Fri, Dec 22, 2017 at 9:21 AM, Sagar <sagarmeansoc...@gmail.com> wrote: > Hi Todd, > > Thanks for the reply. Problem is I have about 160 topics (5 partitions for > each) for which I need to increase the replication factors. So, I would > have to find the current leader for each of the partitions and hand code > the json which would become tedious. > > The partition leader info is stored in zookeeper? If yes, then can I > probably use that to get the current json and then build the json which can be > fed to the tool. I tried to search using the zk shell but couldn't find... > > Sagar. > > On Fri, Dec 22, 2017 at 7:45 PM, Todd Palino <tpal...@gmail.com> wrote: > > > Preferred replica election is naive. It will always follow the order of > the > > replicas as they are set. So if you want to set the default leader, just > > make it the first replica in the list for the partition. We build the > JSON > > this way all the time. > > > > -Todd > > > > > > On Dec 22, 2017 6:46 AM, "Sagar" <sagarmeansoc...@gmail.com> wrote: > > > > Hi, > > > > Had a question on Kafka reassign partitions tool. > > > > We have a 3 node cluster but our replication factor is set to 1. So we > have > > been looking to increase it to 3 for HA. > > > > I tried the tool on a couple of topics and it increases the replication > > factor alright. Also it doesn't change the leader as the leader still is > in > > the RAR. 
> > > > This is how I run it: > > > > Json which is used: > > > > {"version":1,"partitions":[ > > > > {"topic":"cric-engine.engine.eng_fow","partition":3," > > replicas":[92,51,101]}]} > > > > Earlier config for the topic > > > > kafka-topics --describe --topic cric-engine.engine.eng_fow --zookeeper > > 10.0.4.165:2181,10.0.5.139:2181,10.0.6.106:2181 > > Topic:cric-engine.engine.eng_fow PartitionCount:5 ReplicationFactor:3 > > Configs: > > Topic: cric-engine.engine.eng_fow Partition: 0 Leader: 101 Replicas: > > 92,51,101 Isr: 101,51,92 > > Topic: cric-engine.engine.eng_fow Partition: 1 Leader: 51 Replicas: > > 92,51,101 Isr: 51,101,92 > > Topic: cric-engine.engine.eng_fow Partition: 2 Leader: 92 Replicas: 92 > Isr: > > 92 > > Topic: cric-engine.engine.eng_fow Partition: 3 Leader: 101 Replicas: 101 > > Isr: > > 101 > > Topic: cric-engine.engine.eng_fow Partition: 4 Leader: 51 Replicas: 51 > Isr: > > 51 > > > > After running: > > > > kafka-reassign-partitions --reassignment-json-file > > increase-replication-factor.json --execute --zookeeper 10.0.4.165:2181, > > 10.0.5.139:2181,10.0.6.106:2181 > > > > partitions 3 Replicas increase: > > > > kafka-topics --describe --topic cric-engine.engine.eng_fow --zookeeper > > 10.0.4.165:2181,10.0.5.139:2181,10.0.6.106:2181 > > Topic:cric-engine.engine.eng_fow PartitionCount:5 ReplicationFactor:3 > > Configs: > > Topic: cric-engine.engine.eng_fow Partition: 0 Leader: 101 Replicas: > > 92,51,101 Isr: 101,51,92 > > Topic: cric-engine.engine.eng_fow Partition: 1 Leader: 51 Replicas: > > 92,51,101 Isr: 51,101,92 > > Topic: cric-engine.engine.eng_fow Partition: 2 Leader: 92 Replicas: 92 > Isr: > > 92 > > Topic: cric-engine.engine.eng_fow Partition: 3 Leader: 101 Replicas: > > 92,51,101 Isr: 101,51,92 > > Topic: cric-engine.engine.eng_fow Partition: 4 Leader: 51 Replicas: 51 > Isr: > > 51 > > > > What I wanted to know is that does it affect the preferred replica? 
If > you > > see the Replicas, all of them are now 92,51,101 even though the leader > has > > remained the same from before. So, if any of the brokers goes down or we > > run kafka-preferred-replica-election.sh, wouldn't it move all the > leaders > > to broker 92? Is my assessment correct? > > > > If yes, then is there a way I can still do this operation by getting > leader > > for a partition first, then adding it to the replica list and then > building > > the json dynamically? > > > > Thanks! > > Sagar. > > > -- *Todd Palino* Senior Staff Engineer, Site Reliability Data Infrastructure Streaming linkedin.com/in/toddpalino
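The "leader first" JSON that Todd describes can be generated rather than hand-coded. A rough Python sketch follows; the function and argument names are made up for illustration, and actually fetching the current leader per partition (e.g. from `kafka-topics --describe` output or ZooKeeper) is left out.

```python
import json

def build_reassignment(current_leaders, all_brokers):
    """Build a reassignment JSON for kafka-reassign-partitions that keeps
    each partition's current leader as the preferred (first) replica.

    current_leaders: {(topic, partition): current_leader_broker_id}
    all_brokers: every broker id that should hold a replica
    """
    partitions = []
    for (topic, partition), leader in sorted(current_leaders.items()):
        # Current leader first, then the remaining brokers as followers.
        replicas = [leader] + [b for b in all_brokers if b != leader]
        partitions.append({"topic": topic, "partition": partition,
                           "replicas": replicas})
    return json.dumps({"version": 1, "partitions": partitions})

print(build_reassignment({("cric-engine.engine.eng_fow", 3): 101},
                         [92, 51, 101]))
```

Because the current leader leads each replica list, a later preferred replica election will keep leadership where it is instead of piling it onto one broker.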
Re: Plans to improve SSL performance in Kafka, for 0.10.x?
I will say that we've been turning on TLS consume lately (including using it for IBP, which we've been doing for a while) and we haven't seen any of the performance concerns that we originally did. Very little hit at all. -Todd On Sep 4, 2017 12:45 AM, "Ismael Juma" <ism...@juma.me.uk> wrote: > By the way, in-kernel TLS has now landed in the Linux kernel: > > https://github.com/torvalds/linux/blob/master/ > Documentation/networking/tls.txt > > There is work in progress to take advantage of that in OpenSSL: > > https://github.com/Mellanox/tls-openssl > > Ismael > > > On Tue, Sep 6, 2016 at 1:48 PM, Todd Palino <tpal...@gmail.com> wrote: > > > Yeah, that's why I mentioned it with a caveat :) Someone (I can't recall > > who, but it was someone I consider reasonably knowledgeable as I actually > > gave it some weight) mentioned it, but I haven't looked into it further > > than that. I agree that I don't see how this is going to help us at the > app > > layer. > > > > -Todd > > > > On Tuesday, September 6, 2016, Ismael Juma <ism...@juma.me.uk> wrote: > > > > > Hi Todd, > > > > > > Thanks for sharing your experience enabling TLS in your clusters. Very > > > helpful. One comment below. > > > > > > On Sun, Sep 4, 2016 at 6:28 PM, Todd Palino <tpal...@gmail.com> wrote: > > > > > > > > Right now, we're specifically avoiding moving consume traffic to SSL, > > due > > > > to the zero copy send issue. Now I've been told (but I have not > > > > investigated) that OpenSSL can solve this. It would probably be a good > > > use > > > > of time to look into that further. > > > > > > > > > > As far as I know, OpenSSL can reduce the TLS overhead, but we will still > > > lose the zero-copy optimisation. There are some attempts at making it > > > possible to retain zero-copy with TLS in the kernel[1][2], but it's > > > probably too early for us to consider that for Kafka. 
> > > > > > Ismael > > > > > > [1] https://lwn.net/Articles/666509/ > > > [2] > > > http://techblog.netflix.com/2016/08/protecting-netflix- > > > viewing-privacy-at.html > > > > > > > > > -- > > *Todd Palino* > > Staff Site Reliability Engineer > > Data Infrastructure Streaming > > > > > > > > linkedin.com/in/toddpalino > > >
[jira] [Updated] (KAFKA-5056) Shuffling of partitions in old consumer fetch requests removed
[ https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino updated KAFKA-5056: --- Summary: Shuffling of partitions in old consumer fetch requests removed (was: Shuffling of partitions in old consume fetch requests removed) > Shuffling of partitions in old consumer fetch requests removed > -- > > Key: KAFKA-5056 > URL: https://issues.apache.org/jira/browse/KAFKA-5056 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 > Reporter: Todd Palino > Assignee: Todd Palino > > [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes] > deprecated the constructor to {{FetchRequest}} which shuffles the > {{requestInfo}} parameter, in favor of round robin reordering logic added to > the replica fetcher and the consumer API. However, this was not added to the > old consumer {{ConsumerFetcherThread}}, which has resulted in unfair > partition fetching since 0.10.1. > In order to maintain the old consumer, we need to add the removed shuffle to > {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} > is composed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed
[ https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino updated KAFKA-5056: --- Reviewer: Joel Koshy Status: Patch Available (was: In Progress) > Shuffling of partitions in old consume fetch requests removed > - > > Key: KAFKA-5056 > URL: https://issues.apache.org/jira/browse/KAFKA-5056 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 > Reporter: Todd Palino > Assignee: Todd Palino > > [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes] > deprecated the constructor to {{FetchRequest}} which shuffles the > {{requestInfo}} parameter, in favor of round robin reordering logic added to > the replica fetcher and the consumer API. However, this was not added to the > old consumer {{ConsumerFetcherThread}}, which has resulted in unfair > partition fetching since 0.10.1. > In order to maintain the old consumer, we need to add the removed shuffle to > {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} > is composed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed
[ https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5056 started by Todd Palino. -- > Shuffling of partitions in old consume fetch requests removed > - > > Key: KAFKA-5056 > URL: https://issues.apache.org/jira/browse/KAFKA-5056 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 > Reporter: Todd Palino > Assignee: Todd Palino > > [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes] > deprecated the constructor to {{FetchRequest}} which shuffles the > {{requestInfo}} parameter, in favor of round robin reordering logic added to > the replica fetcher and the consumer API. However, this was not added to the > old consumer {{ConsumerFetcherThread}}, which has resulted in unfair > partition fetching since 0.10.1. > In order to maintain the old consumer, we need to add the removed shuffle to > {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} > is composed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed
Todd Palino created KAFKA-5056: -- Summary: Shuffling of partitions in old consume fetch requests removed Key: KAFKA-5056 URL: https://issues.apache.org/jira/browse/KAFKA-5056 Project: Kafka Issue Type: Bug Affects Versions: 0.10.1.0 Reporter: Todd Palino Assignee: Todd Palino [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes] deprecated the constructor to {{FetchRequest}} which shuffles the {{requestInfo}} parameter, in favor of round robin reordering logic added to the replica fetcher and the consumer API. However, this was not added to the old consumer {{ConsumerFetcherThread}}, which has resulted in unfair partition fetching since 0.10.1. In order to maintain the old consumer, we need to add the removed shuffle to {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} is composed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
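The missing shuffle itself is trivial; the point of the issue is where it happens. A Python illustration of the idea follows (the actual fix belongs in the Scala ConsumerFetcherThread's buildFetchRequest; the helper name here is made up):

```python
import random

def shuffled_fetch_order(topic_partitions, rng=random):
    """Shuffle the topic-partition list before composing a fetch request,
    so no partition sits permanently at the front of the list and gets
    favored when the response hits its size limit."""
    order = list(topic_partitions)
    rng.shuffle(order)
    return order
```

Without this, the size-limited fetch responses introduced by KIP-74 fill up from the same partitions on every request, which is the unfair fetching described above.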
Re: [DISCUSS] KIP-124: Request rate quotas
Rajini - I understand what you’re saying, but the point I’m making is that I don’t believe we need to take it into account directly. The CPU utilization of the network threads is directly proportional to the number of bytes being sent. The more bytes, the more CPU that is required for SSL (or other tasks). This is opposed to the request handler threads, where there are a number of factors that affect CPU utilization. This means that it’s not necessary to separately quota network thread byte usage and CPU - if we quota byte usage (which we already do), we have fixed the CPU usage at a proportional amount. Jun - Thanks for the clarification there. I was thinking of the utilization percentage as being fixed, not what the percentage reflects. I’m not tied to either way of doing it, provided that we do not lock clients to a single thread. For example, if I specify that a given client can use 10% of a single thread, that should also mean they can use 1% on 10 threads. -Todd On Wed, Mar 8, 2017 at 8:57 AM, Jun Rao <j...@confluent.io> wrote: > Hi, Todd, > > Thanks for the feedback. > > I just want to clarify your second point. If the limit percentage is per > thread and the thread counts are changed, the absolute processing limit for > existing users haven't changed and there is no need to adjust them. On the > other hand, if the limit percentage is of total thread pool capacity and > the thread counts are changed, the effective processing limit for a user > will change. So, to preserve the current processing limit, existing user > limits have to be adjusted. If there is a hardware change, the effective > processing limit for a user will change in either approach and the existing > limit may need to be adjusted. However, hardware changes are less common > than thread pool configuration changes. > > Thanks, > > Jun > > On Tue, Mar 7, 2017 at 4:45 PM, Todd Palino <tpal...@gmail.com> wrote: > > > I’ve been following this one on and off, and overall it sounds good to > me. 
> > > > - The SSL question is a good one. However, that type of overhead should > be > > proportional to the bytes rate, so I think that a bytes rate quota would > > still be a suitable way to address it. > > > > - I think it’s better to make the quota percentage of total thread pool > > capacity, and not percentage of an individual thread. That way you don’t > > have to adjust it when you adjust thread counts (tuning, hardware > changes, > > etc.) > > > > > > -Todd > > > > > > > > On Tue, Mar 7, 2017 at 2:38 PM, Becket Qin <becket@gmail.com> wrote: > > > > > I see. Good point about SSL. > > > > > > I just asked Todd to take a look. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > On Tue, Mar 7, 2017 at 2:17 PM, Jun Rao <j...@confluent.io> wrote: > > > > > > > Hi, Jiangjie, > > > > > > > > Yes, I agree that byte rate already protects the network threads > > > > indirectly. I am not sure if byte rate fully captures the CPU > overhead > > in > > > > network due to SSL. So, at the high level, we can use request time > > limit > > > to > > > > protect CPU and use byte rate to protect storage and network. > > > > > > > > Also, do you think you can get Todd to comment on this KIP? > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > On Tue, Mar 7, 2017 at 11:21 AM, Becket Qin <becket@gmail.com> > > > wrote: > > > > > > > > > Hi Rajini/Jun, > > > > > > > > > > The percentage based reasoning sounds good. > > > > > One thing I am wondering is that if we assume the network thread > are > > > just > > > > > doing the network IO, can we say bytes rate quota is already sort > of > > > > > network threads quota? > > > > > If we take network threads into the consideration here, would that > be > > > > > somewhat overlapping with the bytes rate quota? 
> > > > > > > > > > Thanks, > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > On Tue, Mar 7, 2017 at 11:04 AM, Rajini Sivaram < > > > rajinisiva...@gmail.com > > > > > > > > > > wrote: > > > > > > > > > > > Jun, > > > > > > > > > > > > Thank you for the explanation, I hadn't realized you meant > > percentage > > &g
Re: [DISCUSS] KIP-124: Request rate quotas
I’ve been following this one on and off, and overall it sounds good to me. - The SSL question is a good one. However, that type of overhead should be proportional to the bytes rate, so I think that a bytes rate quota would still be a suitable way to address it. - I think it’s better to make the quota percentage of total thread pool capacity, and not percentage of an individual thread. That way you don’t have to adjust it when you adjust thread counts (tuning, hardware changes, etc.) -Todd On Tue, Mar 7, 2017 at 2:38 PM, Becket Qinwrote: > I see. Good point about SSL. > > I just asked Todd to take a look. > > Thanks, > > Jiangjie (Becket) Qin > > On Tue, Mar 7, 2017 at 2:17 PM, Jun Rao wrote: > > > Hi, Jiangjie, > > > > Yes, I agree that byte rate already protects the network threads > > indirectly. I am not sure if byte rate fully captures the CPU overhead in > > network due to SSL. So, at the high level, we can use request time limit > to > > protect CPU and use byte rate to protect storage and network. > > > > Also, do you think you can get Todd to comment on this KIP? > > > > Thanks, > > > > Jun > > > > On Tue, Mar 7, 2017 at 11:21 AM, Becket Qin > wrote: > > > > > Hi Rajini/Jun, > > > > > > The percentage based reasoning sounds good. > > > One thing I am wondering is that if we assume the network thread are > just > > > doing the network IO, can we say bytes rate quota is already sort of > > > network threads quota? > > > If we take network threads into the consideration here, would that be > > > somewhat overlapping with the bytes rate quota? > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > On Tue, Mar 7, 2017 at 11:04 AM, Rajini Sivaram < > rajinisiva...@gmail.com > > > > > > wrote: > > > > > > > Jun, > > > > > > > > Thank you for the explanation, I hadn't realized you meant percentage > > of > > > > the total thread pool. If everyone is OK with Jun's suggestion, I > will > > > > update the KIP. 
> > > > > > > > Thanks, > > > > > > > > Rajini > > > > > > > > On Tue, Mar 7, 2017 at 5:08 PM, Jun Rao wrote: > > > > > > > > > Hi, Rajini, > > > > > > > > > > Let's take your example. Let's say a user sets the limit to 50%. I > am > > > not > > > > > sure if it's better to apply the same percentage separately to > > network > > > > and > > > > > io thread pool. For example, for produce requests, most of the time > > > will > > > > be > > > > > spent in the io threads whereas for fetch requests, most of the > time > > > will > > > > > be in the network threads. So, using the same percentage in both > > thread > > > > > pools means one of the pools' resource will be over allocated. > > > > > > > > > > An alternative way is to simply model network and io thread pool > > > > together. > > > > > If you get 10 io threads and 5 network threads, you get 1500% > request > > > > > processing power. A 50% limit means a total of 750% processing > power. > > > We > > > > > just add up the time a user request spent in either network or io > > > thread. > > > > > If that total exceeds 750% (doesn't matter whether it's spent more > in > > > > > network or io thread), the request will be throttled. This seems > more > > > > > general and is not sensitive to the current implementation detail > of > > > > having > > > > > a separate network and io thread pool. In the future, if the > > threading > > > > > model changes, the same concept of quota can still be applied. For > > now, > > > > > since it's a bit tricky to add the delay logic in the network > thread > > > > pool, > > > > > we could probably just do the delaying only in the io threads as > you > > > > > suggested earlier. > > > > > > > > > > There is still the orthogonal question of whether a quota of 50% is > > out > > > > of > > > > > 100% or 100% * #total processing threads. My feeling is that the > > latter > > > > is > > > > > slightly better based on my explanation earlier. 
The way to > describe > > > this > > > > > quota to the users can be "share of elapsed request processing time > > on > > > a > > > > > single CPU" (similar to top). > > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > > > > > > On Fri, Mar 3, 2017 at 4:22 AM, Rajini Sivaram < > > > rajinisiva...@gmail.com> > > > > > wrote: > > > > > > > > > > > Jun, > > > > > > > > > > > > Agree about the two scenarios. > > > > > > > > > > > > But still not sure about a single quota covering both network > > threads > > > > and > > > > > > I/O threads with per-thread quota. If there are 10 I/O threads > and > > 5 > > > > > > network threads and I want to assign half the quota to userA, the > > > quota > > > > > > would be 750%. I imagine, internally, we would convert this to > 500% > > > for > > > > > I/O > > > > > > and 250% for network threads to allocate 50% of each pool. > > > > > > > > > > > > A couple of scenarios: > > > > > > > > > > > > 1. Admin adds 1 extra network
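Jun's combined-pool arithmetic from this thread can be sketched numerically. This is illustrative Python only, not Kafka code, and the per-user time accounting is assumed to happen elsewhere:

```python
def should_throttle(io_threads, network_threads, quota_pct, used_pct):
    """Combined-pool quota check as described in the thread: each thread
    contributes 100% of one CPU, so 10 I/O + 5 network threads give 1500%
    of capacity, and a 50% user quota allows 750%.

    used_pct: the user's request-handling time over the window, summed
    across network and I/O threads, expressed as a percent of one CPU.
    """
    capacity = (io_threads + network_threads) * 100.0
    allowance = capacity * quota_pct / 100.0
    return used_pct > allowance

print(should_throttle(10, 5, 50, 800))  # 800% used vs. 750% allowance
```

Note that the check does not care which pool the time was spent in, which is what makes the model insensitive to the current split between network and I/O threads.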
[jira] [Commented] (KAFKA-1342) Slow controlled shutdowns can result in stale shutdown requests
[ https://issues.apache.org/jira/browse/KAFKA-1342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893728#comment-15893728 ] Todd Palino commented on KAFKA-1342: [~wushujames], I'm not sure about increasing the number of requests, but a lot of this should have been resolved recently with some updates that I believe [~becket_qin] made. To make it better, we had bumped controller.socket.timeout.ms significantly (to 30). We didn't see any side effects from doing that. > Slow controlled shutdowns can result in stale shutdown requests > --- > > Key: KAFKA-1342 > URL: https://issues.apache.org/jira/browse/KAFKA-1342 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.1 >Reporter: Joel Koshy >Assignee: Joel Koshy >Priority: Critical > Labels: newbie++, newbiee, reliability > Fix For: 0.11.0.0 > > > I don't think this is a bug introduced in 0.8.1., but triggered by the fact > that controlled shutdown seems to have become slower in 0.8.1 (will file a > separate ticket to investigate that). When doing a rolling bounce, it is > possible for a bounced broker to stop all its replica fetchers since the > previous PID's shutdown requests are still being shutdown. > - 515 is the controller > - Controlled shutdown initiated for 503 > - Controller starts controlled shutdown for 503 > - The controlled shutdown takes a long time in moving leaders and moving > follower replicas on 503 to the offline state. > - So 503's read from the shutdown channel times out and a new channel is > created. It issues another shutdown request. This request (since it is a > new channel) is accepted at the controller's socket server but then waits > on the broker shutdown lock held by the previous controlled shutdown which > is still in progress. > - The above step repeats for the remaining retries (six more requests). > - 503 hits SocketTimeout exception on reading the response of the last > shutdown request and proceeds to do an unclean shutdown. 
> - The controller's onBrokerFailure call-back fires and moves 503's replicas > to offline (not too important in this sequence). > - 503 is brought back up. > - The controller's onBrokerStartup call-back fires and moves its replicas > (and partitions) to online state. 503 starts its replica fetchers. > - Unfortunately, the (phantom) shutdown requests are still being handled and > the controller sends StopReplica requests to 503. > - The first shutdown request finally finishes (after 76 minutes in my case!). > - The remaining shutdown requests also execute and do the same thing (sends > StopReplica requests for all partitions to > 503). > - The remaining requests complete quickly because they end up not having to > touch zookeeper paths - no leaders left on the broker and no need to > shrink ISR in zookeeper since it has already been done by the first > shutdown request. > - So in the end-state 503 is up, but effectively idle due to the previous > PID's shutdown requests. > There are some obvious fixes that can be made to controlled shutdown to help > address the above issue. E.g., we don't really need to move follower > partitions to Offline. We did that as an "optimization" so the broker falls > out of ISR sooner - which is helpful when producers set required.acks to -1. > However it adds a lot of latency to controlled shutdown. Also, (more > importantly) we should have a mechanism to abort any stale shutdown process. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
Re: [DISCUSS] KIP-112: Handle disk failure for JBOD
re and it takes time to rebuild disk after one disk > > > > failure. RAID 5 implementations are susceptible to system failures > > > because > > > > of trends regarding array rebuild time and the chance of drive failure > > > > during rebuild. There is no such performance degradation for JBOD and > > > JBOD > > > > can support multiple log directory failures without reducing performance > > > of > > > > good log directories. Would this be a reasonable reason for using JBOD > > > > instead of RAID-5/6? > > > > > > > > Previously we discussed whether the broker should remove offline replica > > from > > > > replica fetcher thread. I still think it should do it instead of > > > printing a > > > > lot of error in the log4j log. We can still let controller send > > > > StopReplicaRequest to the broker. I am not sure I understand why > > allowing > > > > broker to remove offline replica from fetcher thread will increase > > churns > > > > in ISR. Do you think this is a concern with this approach? > > > > > > > > I have updated the KIP to remove created flag from ZK and change the > > > field > > > > name to isNewReplica. Can you check if there is any issue with the > > latest > > > > KIP? Thanks for your time! > > > > > > > > Regards, > > > > Dong > > > > > > > > > > > > On Sat, Feb 25, 2017 at 9:11 AM, Jun Rao <j...@confluent.io> wrote: > > > > > > > > > Hi, Dong, > > > > > > > > > > Thanks for the reply. > > > > > > > > > > Personally, I'd prefer not to write the created flag per replica in > > ZK. > > > > > Your suggestion of disabling replica creation if there is a bad log > > > > > directory on the broker could work. The only thing is that it may > > delay > > > > the > > > > > creation of new replicas. I was thinking that an alternative is to > > > extend > > > > > LeaderAndIsrRequest by adding an isNewReplica field per replica. That > > > > field > > > > > will be set when a replica is transitioning from the NewReplica state > > > to > > > > > Online state. 
Then, when a broker receives a LeaderAndIsrRequest, > if > > a > > > > > replica is marked as the new replica, it will be created on a good > > log > > > > > directory, if not already present. Otherwise, it only creates the > > > replica > > > > > if all log directories are good and the replica is not already > > present. > > > > > This way, we don't delay the processing of new replicas in the > common > > > > case. > > > > > > > > > > I am ok with not persisting the offline replicas in ZK and just > > > > discovering > > > > > them through the LeaderAndIsrRequest. It handles the cases when a > > > broker > > > > > starts up with bad log directories better. So, the additional > > overhead > > > of > > > > > rediscovering the offline replicas is justified. > > > > > > > > > > > > > > > Another high level question. The proposal rejected RAID5/6 since it > > > adds > > > > > additional I/Os. The main issue with RAID5 is that to write a block > > > that > > > > > doesn't match the RAID stripe size, we have to first read the old > > > parity > > > > to > > > > > compute the new one, which increases the number of I/Os ( > > > > > http://rickardnobel.se/raid-5-write-penalty/). I am wondering if > you > > > > have > > > > > tested RAID5's performance by creating a file system whose block > size > > > > > matches the RAID stripe size (https://www.percona.com/blog/ > > > > > 2011/12/16/setting-up-xfs-the-simple-edition/). This way, writing > a > > > > block > > > > > doesn't require a read first. A large block size may increase the > > > amount > > > > of > > > > > data writes, when the same block has to be written to disk multiple > > > > times. > > > > > However, this is probably ok in Kafka's use case since we batch the > > I/O > > > > > flush already. As you can see, we will be adding some complexity to > > > > support > > > > > JBOD in Kafka one way or another. 
If we can tune the performance of > > > RAID5 > > > > > to match that of RAID10, perhaps using RAID5 is a simpler solution. > > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > > > > > > On Fri, Feb 24, 2017 at 10:17 AM, Dong Lin <lindon...@gmail.com> > > > wrote: > > > > > > > > > > > Hey Jun, > > > > > > > > > > > > I don't think we should allow failed replicas to be re-created on > > the > > > > > good > > > > > > disks. Say there are 2 disks and each of them is 51% loaded. If > any > > > > disk > > > > > > fail, and we allow replicas to be re-created on the other disks, > > both > > > > > disks > > > > > > will fail. Alternatively we can disable replica creation if there > > is > > > > bad > > > > > > disk on a broker. I personally think it is worth the additional > > > > > complexity > > > > > > in the broker to store created replicas in ZK so that we allow > new > > > > > replicas > > > > > > to be created on the broker even when there is bad log directory. > > > This > > > > > > approach won't add complexity in the controller. But I am fine > with > > > > > > disabling replica creation when there is bad log directory that > if > > it > > > > is > > > > > > the only blocking issue for this KIP. > > > > > > > > > > > > Whether we store created flags is independent of whether/how we > > store > > > > > > offline replicas. Per our previous discussion, do you think it is > > OK > > > > not > > > > > > store offline replicas in ZK and propagate the offline replicas > > from > > > > > broker > > > > > > to controller via LeaderAndIsrRequest? > > > > > > > > > > > > Thanks, > > > > > > Dong > > > > > > > > > > > > > > > > > > > > > -- *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
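To make the RAID-5 write-penalty point concrete, here is a toy Python calculation. It is a deliberate simplification of the read-modify-write behavior described in the linked article (real arrays and controllers vary, and the data portion of a full-stripe write is counted as a single sequential I/O here):

```python
def raid5_ios_per_write(write_kb, stripe_kb):
    """Rough I/O count for one logical write on RAID-5.

    A write aligned to full stripes only writes new data and new parity.
    A partial-stripe write must first read the old data and old parity to
    recompute parity: read-modify-write, i.e. 4 I/Os instead of 2.
    """
    if write_kb % stripe_kb == 0:
        # New data write + new parity write, per full stripe.
        return (write_kb // stripe_kb) * 2
    return 4  # read old data, read old parity, write data, write parity

print(raid5_ios_per_write(4, 64))   # unaligned small write: 4 I/Os
print(raid5_ios_per_write(64, 64))  # full-stripe write: 2 I/Os
```

This is why Jun suggests matching the filesystem block size to the RAID stripe size: it turns partial-stripe writes, with their extra reads, into full-stripe writes.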
Re: [DISCUSS] KIP-82 - Add Record Headers
Come on, I’ve done at least 2 talks on this one :) Producing counts to a topic is part of it, but that’s only part. So you count that you have 100 messages in topic A. When you mirror topic A to another cluster, you have 99 messages. Where was your problem? Or worse, you have 100 messages, but one producer duplicated messages and another one lost messages. You need details about where the message came from in order to pinpoint problems when they happen. Source producer info, where it was produced into your infrastructure, and when it was produced. This requires you to add the information to the message. And yes, you still need to maintain your clients. So maybe my original example was not the best. My thoughts on not wanting to be responsible for message formats stand, because that’s very much separate from the client. As you know, we have our own internal client library that can insert the right headers, and right now it inserts the right audit information into the message fields, if they exist, and assuming the message is Avro encoded. What if someone wants to use JSON instead for a good reason? What if user X wants to encrypt messages, but user Y does not? Maintaining the client library is still much easier than maintaining the message formats. -Todd On Thu, Dec 1, 2016 at 6:21 PM, Gwen Shapira <g...@confluent.io> wrote: > Based on your last sentence, consider me convinced :) > > I get why headers are critical for Mirroring (you need tags to prevent > loops and sometimes to route messages to the correct destination). > But why do you need headers to audit? We are auditing by producing > counts to a side topic (and I was under the impression you do the > same), so we never need to modify the message. > > Another thing - after we added headers, wouldn't you be in the > business of making sure everyone uses them properly? Making sure > everyone includes the right headers you need, not using the header > names you intend to use, etc.
I don't think the "policing" business > will ever go away. > > On Thu, Dec 1, 2016 at 5:25 PM, Todd Palino <tpal...@gmail.com> wrote: > > Got it. As an ops guy, I'm not very happy with the workaround. Avro means > > that I have to be concerned with the format of the messages in order to > run > > the infrastructure (audit, mirroring, etc.). That means that I have to > > handle the schemas, and I have to enforce rules about good formats. This > is > > not something I want to be in the business of, because I should be able > to > > run a service infrastructure without needing to be in the weeds of > dealing > > with customer data formats. > > > > Trust me, a sizable portion of my support time is spent dealing with > schema > > issues. I really would like to get away from that. Maybe I'd have more > time > > for other hobbies. Like writing. ;) > > > > -Todd > > > > On Thu, Dec 1, 2016 at 4:04 PM Gwen Shapira <g...@confluent.io> wrote: > > > >> I'm pretty satisfied with the current workarounds (Avro container > >> format), so I'm not too excited about the extra work required to do > >> headers in Kafka. I absolutely don't mind it if you do it... > >> I think the Apache convention for "good idea, but not willing to put > >> any work toward it" is +0.5? anyway, that's what I was trying to > >> convey :) > >> > >> On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino <tpal...@gmail.com> wrote: > >> > Well I guess my question for you, then, is what is holding you back > from > >> > full support for headers? What’s the bit that you’re missing that has > you > >> > under a full +1? > >> > > >> > -Todd > >> > > >> > > >> > On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira <g...@confluent.io> > wrote: > >> > > >> >> I know why people who support headers support them, and I've seen > what > >> >> the discussion is like. 
> >> >> > >> >> This is why I'm asking people who are against headers (especially > >> >> committers) what will make them change their mind - so we can get > this > >> >> part over one way or another. > >> >> > >> >> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am > >> >> just looking for something concrete we can do to move the discussion > >> >> along to the yummy design details (which is the argument I really am > >> >> looking forward to). > >> >> > >> >> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino <tpal...@gmail.com> > wrote: > >> >> > So, Gwen, to yo
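Todd's counting example above (100 messages produced, 99 after mirroring) boils down to reconciling per-origin counts between tiers, which is only possible if origin metadata travels with the message. A minimal illustration, with hypothetical (topic, producer) keys standing in for audit-header fields:

```python
from collections import Counter

def audit_discrepancies(source_counts, dest_counts):
    """Compare per-(topic, producer) message counts between two tiers.

    Returns {key: (source, dest)} for every key where the tiers disagree.
    The keys here are hypothetical (topic, source_producer) tuples drawn
    from audit metadata; a real pipeline would also bucket by produce time.
    """
    src, dst = Counter(source_counts), Counter(dest_counts)
    return {k: (src[k], dst[k])
            for k in src.keys() | dst.keys() if src[k] != dst[k]}

source = {("topicA", "producer-1"): 60, ("topicA", "producer-2"): 40}
mirror = {("topicA", "producer-1"): 60, ("topicA", "producer-2"): 39}
# One producer's messages went missing in the mirror tier:
print(audit_discrepancies(source, mirror))
```

Without per-message origin info, the same comparison can only say "one message is missing somewhere", which is Todd's point.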
Re: [DISCUSS] KIP-82 - Add Record Headers
Got it. As an ops guy, I'm not very happy with the workaround. Avro means that I have to be concerned with the format of the messages in order to run the infrastructure (audit, mirroring, etc.). That means that I have to handle the schemas, and I have to enforce rules about good formats. This is not something I want to be in the business of, because I should be able to run a service infrastructure without needing to be in the weeds of dealing with customer data formats. Trust me, a sizable portion of my support time is spent dealing with schema issues. I really would like to get away from that. Maybe I'd have more time for other hobbies. Like writing. ;) -Todd On Thu, Dec 1, 2016 at 4:04 PM Gwen Shapira <g...@confluent.io> wrote: > I'm pretty satisfied with the current workarounds (Avro container > format), so I'm not too excited about the extra work required to do > headers in Kafka. I absolutely don't mind it if you do it... > I think the Apache convention for "good idea, but not willing to put > any work toward it" is +0.5? anyway, that's what I was trying to > convey :) > > On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino <tpal...@gmail.com> wrote: > > Well I guess my question for you, then, is what is holding you back from > > full support for headers? What’s the bit that you’re missing that has you > > under a full +1? > > > > -Todd > > > > > > On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira <g...@confluent.io> wrote: > > > >> I know why people who support headers support them, and I've seen what > >> the discussion is like. > >> > >> This is why I'm asking people who are against headers (especially > >> committers) what will make them change their mind - so we can get this > >> part over one way or another. > >> > >> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am > >> just looking for something concrete we can do to move the discussion > >> along to the yummy design details (which is the argument I really am > >> looking forward to). 
> >> > >> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino <tpal...@gmail.com> wrote: > >> > So, Gwen, to your question (even though I’m not a committer)... > >> > > >> > I have always been a strong supporter of introducing the concept of an > >> > envelope to messages, which headers accomplishes. The message key is > >> > already an example of a piece of envelope information. By providing a > >> means > >> > to do this within Kafka itself, and not relying on use-case specific > >> > implementations, you make it much easier for components to > interoperate. > >> It > >> > simplifies development of all these things (message routing, auditing, > >> > encryption, etc.) because each one does not have to reinvent the > wheel. > >> > > >> > It also makes it much easier from a client point of view if the > headers > >> are > >> > defined as part of the protocol and/or message format in general > because > >> > you can easily produce and consume messages without having to take > into > >> > account specific cases. For example, I want to route messages, but > >> client A > >> > doesn’t support the way audit implemented headers, and client B > doesn’t > >> > support the way encryption or routing implemented headers, so now my > >> > application has to create some really fragile (my autocorrect just > tried > >> to > >> > make that “tragic”, which is probably appropriate too) code to strip > >> > everything off, rather than just consuming the messages, picking out > the > >> 1 > >> > or 2 headers it’s interested in, and performing its function. > >> > > >> > Honestly, this discussion has been going on for a long time, and it’s > >> > always “Oh, you came up with 2 use cases, and yeah, those use cases > are > >> > real things that someone would want to do. 
Here’s an alternate way to > >> > implement them so let’s not do headers.” If we have a few use cases > that > >> we > >> > actually came up with, you can be sure that over the next year > there’s a > >> > dozen others that we didn’t think of that someone would like to do. I > >> > really think it’s time to stop rehashing this discussion and instead > >> focus > >> > on a workable standard that we can adopt. > >> > > >> > -Todd > >> > > >> > > >> > On Thu, Dec 1, 2016 at 1:39 PM,
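The interoperability argument quoted above, where an application picks out the one or two headers it cares about and ignores the rest, is straightforward once headers are a standard list of key/value pairs on the record (the shape Kafka eventually adopted with KIP-82). A sketch with made-up header names:

```python
def pick_headers(headers, wanted):
    """Return the first value for each wanted key, ignoring everything else.

    `headers` is a list of (key, bytes) pairs, as in Kafka's record
    headers; unknown headers pass through untouched, so audit,
    encryption and routing plugins can coexist on one record.
    """
    found = {}
    for key, value in headers:
        if key in wanted and key not in found:
            found[key] = value
    return found

record_headers = [
    ("audit.origin", b"dc1-producer-42"),   # hypothetical header names
    ("enc.keyid", b"key-7"),
    ("route.dest", b"cluster-west"),
]
print(pick_headers(record_headers, {"route.dest"}))
# {'route.dest': b'cluster-west'}
```

No stripping or fragile format-specific parsing is needed; each component reads only its own keys.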
Re: [DISCUSS] KIP-82 - Add Record Headers
Well I guess my question for you, then, is what is holding you back from full support for headers? What’s the bit that you’re missing that has you under a full +1? -Todd On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira <g...@confluent.io> wrote: > I know why people who support headers support them, and I've seen what > the discussion is like. > > This is why I'm asking people who are against headers (especially > committers) what will make them change their mind - so we can get this > part over one way or another. > > If I sound frustrated it is not at Radai, Jun or you (Todd)... I am > just looking for something concrete we can do to move the discussion > along to the yummy design details (which is the argument I really am > looking forward to). > > On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino <tpal...@gmail.com> wrote: > > So, Gwen, to your question (even though I’m not a committer)... > > > > I have always been a strong supporter of introducing the concept of an > > envelope to messages, which headers accomplishes. The message key is > > already an example of a piece of envelope information. By providing a > means > > to do this within Kafka itself, and not relying on use-case specific > > implementations, you make it much easier for components to interoperate. > It > > simplifies development of all these things (message routing, auditing, > > encryption, etc.) because each one does not have to reinvent the wheel. > > > > It also makes it much easier from a client point of view if the headers > are > > defined as part of the protocol and/or message format in general because > > you can easily produce and consume messages without having to take into > > account specific cases. 
For example, I want to route messages, but > client A > > doesn’t support the way audit implemented headers, and client B doesn’t > > support the way encryption or routing implemented headers, so now my > > application has to create some really fragile (my autocorrect just tried > to > > make that “tragic”, which is probably appropriate too) code to strip > > everything off, rather than just consuming the messages, picking out the > 1 > > or 2 headers it’s interested in, and performing its function. > > > > Honestly, this discussion has been going on for a long time, and it’s > > always “Oh, you came up with 2 use cases, and yeah, those use cases are > > real things that someone would want to do. Here’s an alternate way to > > implement them so let’s not do headers.” If we have a few use cases that > we > > actually came up with, you can be sure that over the next year there’s a > > dozen others that we didn’t think of that someone would like to do. I > > really think it’s time to stop rehashing this discussion and instead > focus > > on a workable standard that we can adopt. > > > > -Todd > > > > > > On Thu, Dec 1, 2016 at 1:39 PM, Todd Palino <tpal...@gmail.com> wrote: > > > >> C. per message encryption > >>> One drawback of this approach is that this significantly reduce the > >>> effectiveness of compression, which happens on a set of serialized > >>> messages. An alternative is to enable SSL for wire encryption and rely > on > >>> the storage system (e.g. LUKS) for at rest encryption. > >> > >> > >> Jun, this is not sufficient. While this does cover the case of removing > a > >> drive from the system, it will not satisfy most compliance requirements > for > >> encryption of data as whoever has access to the broker itself still has > >> access to the unencrypted data. For end-to-end encryption you need to > >> encrypt at the producer, before it enters the system, and decrypt at the > >> consumer, after it exits the system. 
> >> > >> -Todd > >> > >> > >> On Thu, Dec 1, 2016 at 1:03 PM, radai <radai.rosenbl...@gmail.com> > wrote: > >> > >>> another big plus of headers in the protocol is that it would enable > rapid > >>> iteration on ideas outside of core kafka and would reduce the number of > >>> future wire format changes required. > >>> > >>> a lot of what is currently a KIP represents use cases that are not 100% > >>> relevant to all users, and some of them require rather invasive wire > >>> protocol changes. i think a good recent example of this is kip-98. > >>> tx-utilizing traffic is expected to be a very small fraction of total > >>> traffic and yet the changes are invasive. > >>> > >>> every such wire
Re: [DISCUSS] KIP-82 - Add Record Headers
So, Gwen, to your question (even though I’m not a committer)... I have always been a strong supporter of introducing the concept of an envelope to messages, which headers accomplishes. The message key is already an example of a piece of envelope information. By providing a means to do this within Kafka itself, and not relying on use-case specific implementations, you make it much easier for components to interoperate. It simplifies development of all these things (message routing, auditing, encryption, etc.) because each one does not have to reinvent the wheel. It also makes it much easier from a client point of view if the headers are defined as part of the protocol and/or message format in general because you can easily produce and consume messages without having to take into account specific cases. For example, I want to route messages, but client A doesn’t support the way audit implemented headers, and client B doesn’t support the way encryption or routing implemented headers, so now my application has to create some really fragile (my autocorrect just tried to make that “tragic”, which is probably appropriate too) code to strip everything off, rather than just consuming the messages, picking out the 1 or 2 headers it’s interested in, and performing its function. Honestly, this discussion has been going on for a long time, and it’s always “Oh, you came up with 2 use cases, and yeah, those use cases are real things that someone would want to do. Here’s an alternate way to implement them so let’s not do headers.” If we have a few use cases that we actually came up with, you can be sure that over the next year there’s a dozen others that we didn’t think of that someone would like to do. I really think it’s time to stop rehashing this discussion and instead focus on a workable standard that we can adopt. -Todd On Thu, Dec 1, 2016 at 1:39 PM, Todd Palino <tpal...@gmail.com> wrote: > C. 
per message encryption >> One drawback of this approach is that this significantly reduce the >> effectiveness of compression, which happens on a set of serialized >> messages. An alternative is to enable SSL for wire encryption and rely on >> the storage system (e.g. LUKS) for at rest encryption. > > > Jun, this is not sufficient. While this does cover the case of removing a > drive from the system, it will not satisfy most compliance requirements for > encryption of data as whoever has access to the broker itself still has > access to the unencrypted data. For end-to-end encryption you need to > encrypt at the producer, before it enters the system, and decrypt at the > consumer, after it exits the system. > > -Todd > > > On Thu, Dec 1, 2016 at 1:03 PM, radai <radai.rosenbl...@gmail.com> wrote: > >> another big plus of headers in the protocol is that it would enable rapid >> iteration on ideas outside of core kafka and would reduce the number of >> future wire format changes required. >> >> a lot of what is currently a KIP represents use cases that are not 100% >> relevant to all users, and some of them require rather invasive wire >> protocol changes. a thing a good recent example of this is kip-98. >> tx-utilizing traffic is expected to be a very small fraction of total >> traffic and yet the changes are invasive. >> >> every such wire format change translates into painful and slow adoption of >> new versions. >> >> i think a lot of functionality currently in KIPs could be "spun out" and >> implemented as opt-in plugins transmitting data over headers. this would >> keep the core wire format stable(r), core codebase smaller, and avoid the >> "burden of proof" thats sometimes required to prove a certain feature is >> useful enough for a wide-enough audience to warrant a wire format change >> and code complexity additions. 
>> >> (to be clear - kip-98 goes beyond "mere" wire format changes and im not >> saying it could have been completely done with headers, but exactly-once >> delivery certainly could) >> >> On Thu, Dec 1, 2016 at 11:20 AM, Gwen Shapira <g...@confluent.io> wrote: >> >> > On Thu, Dec 1, 2016 at 10:24 AM, radai <radai.rosenbl...@gmail.com> >> wrote: >> > > "For use cases within an organization, one could always use other >> > > approaches such as company-wise containers" >> > > this is what linkedin has traditionally done but there are now cases >> > (read >> > > - topics) where this is not acceptable. this makes headers useful even >> > > within single orgs for cases where one-container-fits-all cannot >> apply. >> > > >> > > as for the particular use cases listed, i dont want this to devo
Re: [DISCUSS] KIP-82 - Add Record Headers
> > C. per message encryption > One drawback of this approach is that this significantly reduces the > effectiveness of compression, which happens on a set of serialized > messages. An alternative is to enable SSL for wire encryption and rely on > the storage system (e.g. LUKS) for at rest encryption. Jun, this is not sufficient. While this does cover the case of removing a drive from the system, it will not satisfy most compliance requirements for encryption of data as whoever has access to the broker itself still has access to the unencrypted data. For end-to-end encryption you need to encrypt at the producer, before it enters the system, and decrypt at the consumer, after it exits the system. -Todd On Thu, Dec 1, 2016 at 1:03 PM, radai wrote: > another big plus of headers in the protocol is that it would enable rapid > iteration on ideas outside of core kafka and would reduce the number of > future wire format changes required. > > a lot of what is currently a KIP represents use cases that are not 100% > relevant to all users, and some of them require rather invasive wire > protocol changes. i think a good recent example of this is kip-98. > tx-utilizing traffic is expected to be a very small fraction of total > traffic and yet the changes are invasive. > > every such wire format change translates into painful and slow adoption of > new versions. > > i think a lot of functionality currently in KIPs could be "spun out" and > implemented as opt-in plugins transmitting data over headers. this would > keep the core wire format stable(r), core codebase smaller, and avoid the > "burden of proof" thats sometimes required to prove a certain feature is > useful enough for a wide-enough audience to warrant a wire format change > and code complexity additions.
> > (to be clear - kip-98 goes beyond "mere" wire format changes and im not > saying it could have been completely done with headers, but exactly-once > delivery certainly could) > > On Thu, Dec 1, 2016 at 11:20 AM, Gwen Shapira wrote: > > > On Thu, Dec 1, 2016 at 10:24 AM, radai > wrote: > > > "For use cases within an organization, one could always use other > > > approaches such as company-wise containers" > > > this is what linkedin has traditionally done but there are now cases > > (read > > > - topics) where this is not acceptable. this makes headers useful even > > > within single orgs for cases where one-container-fits-all cannot apply. > > > > > > as for the particular use cases listed, i dont want this to devolve to > a > > > discussion of particular use cases - i think its enough that some of > them > > > > I think a main point of contention is that: We identified few > > use-cases where headers are useful, do we want Kafka to be a system > > that supports those use-cases? > > > > For example, Jun said: > > "Not sure how widely useful record-level lineage is though since the > > overhead could > > be significant." > > > > We know NiFi supports record level lineage. I don't think it was > > developed for lols, I think it is safe to assume that the NSA needed > > that functionality. We also know that certain financial institutes > > need to track tampering with records at a record level and there are > > federal regulations that absolutely require this. They also need to > > prove that routing apps that "touches" the messages and either reads > > or updates headers couldn't have possibly modified the payload itself. > > They use record level encryption to do that - apps can read and > > (sometimes) modify headers but can't touch the payload. 
> > > > We can totally say "those are corner cases and not worth adding > > headers to Kafka for", they should use a different pubsub message for > > that (Nifi or one of the other 1000 that cater specifically to the > > financial industry). > > > > But this gets us into a catch 22: > > If we discuss a specific use-case, someone can always say it isn't > > interesting enough for Kafka. If we discuss more general trends, > > others can say "well, we are not sure any of them really needs headers > > specifically. This is just hand waving and not interesting.". > > > > I think discussing use-cases in specifics is super important to decide > > implementation details for headers (my use-cases lean toward numerical > > keys with namespaces and object values, others differ), but I think we > > need to answer the general "Are we going to have headers" question > > first. > > > > I'd love to hear from the other committers in the discussion: > > What would it take to convince you that headers in Kafka are a good > > idea in general, so we can move ahead and try to agree on the details? > > > > I feel like we keep moving the goal posts and this is truly exhausting. > > > > For the record, I mildly support adding headers to Kafka (+0.5?). > > The community can continue to find workarounds to the issue and there > > are some benefits to
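The record-level tamper-evidence Gwen describes in the quoted text (apps may read or rewrite headers but cannot modify the payload undetected) can be sketched with a MAC computed over the payload alone and carried as a header. A stdlib-only illustration; the key handling and header names are hypothetical:

```python
import hashlib
import hmac

KEY = b"demo-key"  # in practice a managed secret, never a constant

def seal(payload: bytes) -> bytes:
    """MAC covering only the payload, carried as a header value."""
    return hmac.new(KEY, payload, hashlib.sha256).digest()

def verify(payload: bytes, tag: bytes) -> bool:
    return hmac.compare_digest(seal(payload), tag)

payload = b"wire transfer: $100"
headers = [("route.dest", b"cluster-west"), ("payload.mac", seal(payload))]

# A routing app may rewrite routing headers freely...
headers[0] = ("route.dest", b"cluster-east")
tag = dict(headers)["payload.mac"]
print(verify(payload, tag))                  # True: payload untouched
print(verify(b"wire transfer: $999", tag))   # False: tampering detected
```

The routing change leaves the MAC valid, while any change to the payload itself is detected: headers stay usable by intermediaries without weakening the payload guarantee.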
[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup
[ https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15700938#comment-15700938 ] Todd Palino commented on KAFKA-3959: +1 as well. I think keeping the config/server.properties file as a quickstart that gets you going with a standalone broker is a good plan. > __consumer_offsets wrong number of replicas at startup > -- > > Key: KAFKA-3959 > URL: https://issues.apache.org/jira/browse/KAFKA-3959 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager, replication >Affects Versions: 0.9.0.1, 0.10.0.0 > Environment: Brokers of 3 kafka nodes running Red Hat Enterprise > Linux Server release 7.2 (Maipo) >Reporter: Alban Hurtaud > > When creating a stack of 3 kafka brokers, the consumer is starting faster > than kafka nodes and when trying to read a topic, only one kafka node is > available. > So the __consumer_offsets is created with a replication factor set to 1 > (instead of configured 3) : > offsets.topic.replication.factor=3 > default.replication.factor=3 > min.insync.replicas=2 > Then, other kafka nodes go up and we have exceptions because the replicas # > for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown. > What I missed is : Why the __consumer_offsets is created with replication to > 1 (when 1 broker is running) whereas in server.properties it is set to 3 ? > To reproduce : > - Prepare 3 kafka nodes with the 3 lines above added to servers.properties. > - Run one kafka, > - Run one consumer (the __consumer_offsets is created with replicas =1) > - Run 2 more kafka nodes -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup
[ https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668010#comment-15668010 ] Todd Palino commented on KAFKA-3959: As noted, I just want the config enforced. If RF=3 is configured, that's what we should get. If you need RF=1 for testing, or for specific use cases, set it. Even make the default 1 if that's really what we want. But if I explicitly set RF=3, that's what I should get. And if it causes errors, and I've explicitly set it, that's on me as the user. > __consumer_offsets wrong number of replicas at startup > -- > > Key: KAFKA-3959 > URL: https://issues.apache.org/jira/browse/KAFKA-3959 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager, replication >Affects Versions: 0.9.0.1, 0.10.0.0 > Environment: Brokers of 3 kafka nodes running Red Hat Enterprise > Linux Server release 7.2 (Maipo) >Reporter: Alban Hurtaud > > When creating a stack of 3 kafka brokers, the consumer is starting faster > than kafka nodes and when trying to read a topic, only one kafka node is > available. > So the __consumer_offsets is created with a replication factor set to 1 > (instead of configured 3) : > offsets.topic.replication.factor=3 > default.replication.factor=3 > min.insync.replicas=2 > Then, other kafka nodes go up and we have exceptions because the replicas # > for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown. > What I missed is : Why the __consumer_offsets is created with replication to > 1 (when 1 broker is running) whereas in server.properties it is set to 3 ? > To reproduce : > - Prepare 3 kafka nodes with the 3 lines above added to servers.properties. > - Run one kafka, > - Run one consumer (the __consumer_offsets is created with replicas =1) > - Run 2 more kafka nodes -- This message was sent by Atlassian JIRA (v6.3.4#6332)
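The behaviour Todd asks for here, honouring an explicitly configured replication factor or failing rather than silently downsizing to the number of live brokers, amounts to a simple policy check. A sketch of the desired behaviour, not Kafka's actual code:

```python
def assign_replicas(live_brokers, replication_factor):
    """Fail topic creation instead of silently shrinking the RF.

    Kafka at the time created __consumer_offsets with RF = live brokers
    when fewer than the configured RF were up; the policy sketched here
    refuses, so the client retries once enough brokers have joined.
    """
    if len(live_brokers) < replication_factor:
        raise RuntimeError(
            f"need {replication_factor} brokers, only {len(live_brokers)} live")
    return live_brokers[:replication_factor]

print(assign_replicas([1, 2, 3], 3))   # [1, 2, 3]
try:
    assign_replicas([1], 3)            # only 1 broker up: refuse creation
except RuntimeError as e:
    print(e)
```

In the reported scenario the consumer that starts before the full cluster would see an error instead of permanently creating __consumer_offsets with RF=1.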
Re: Plans to improve SSL performance in Kafka, for 0.10.x?
Yeah, that's why I mentioned it with a caveat :) Someone (I can't recall who, but it was someone I consider reasonably knowledgable as I actually gave it some weight) mentioned it, but I haven't looked into it further than that. I agree that I don't see how this is going to help us at the app layer. -Todd On Tuesday, September 6, 2016, Ismael Juma <ism...@juma.me.uk> wrote: > Hi Todd, > > Thanks for sharing your experience enabling TLS in your clusters. Very > helpful. One comment below. > > On Sun, Sep 4, 2016 at 6:28 PM, Todd Palino <tpal...@gmail.com > <javascript:;>> wrote: > > > > Right now, we're specifically avoiding moving consume traffic to SSL, due > > to the zero copy send issue. Now I've been told (but I have not > > investigated) that OpenSSL can solve this. It would probably be a good > use > > of time to look into that further. > > > > As far as I know, OpenSSL can reduce the TLS overhead, but we will still > lose the zero-copy optimisation. There is some attempts at making it > possible to retain zero-copy with TLS in the kernel[1][2], but it's > probably too early for us to consider that for Kafka. > > Ismael > > [1] https://lwn.net/Articles/666509/ > [2] > http://techblog.netflix.com/2016/08/protecting-netflix- > viewing-privacy-at.html > -- *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
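The zero-copy send under discussion is the sendfile path: the broker hands the kernel a file descriptor and a byte range, and the kernel moves the data to the socket without copying it through user space, which is exactly what breaks once the bytes must pass through a TLS engine in the JVM. A stdlib illustration of the call itself (Linux semantics assumed):

```python
import os
import socket
import tempfile

# Create a data file standing in for a log segment.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"log segment bytes" * 4)
    path = f.name

rx, tx = socket.socketpair()
with open(path, "rb") as src:
    # The kernel copies file -> socket directly; user space never
    # sees the bytes, which is why this can't be combined with
    # userland TLS encryption of the stream.
    sent = os.sendfile(tx.fileno(), src.fileno(), 0, os.path.getsize(path))
tx.close()

data = b"".join(iter(lambda: rx.recv(4096), b""))
rx.close()
os.unlink(path)
print(sent == len(data))  # all bytes arrived without a userland copy
```

Kernel-TLS work (as in the LWN and Netflix links Ismael cites) aims to restore this path by doing the encryption below the socket boundary.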
Re: Plans to improve SSL performance in Kafka, for 0.10.x?
We've been using SSL for produce traffic (mirror makers only right now, but that's a very large percentage of traffic for us), and we're in the process of turning it on for inter broker traffic as well. Our experience is that this does not present a significant amount of overhead to the brokers. Specifically with switching over the IBP, we were expecting a lot more of a hit, and it really only ended up being something like a 5% increase in system load, and no reduction in the cluster capacity, in our test cluster. Note that this relies on the fix in KAFKA-4050 and switching the PRNG to SHA1PRNG. Right now, we're specifically avoiding moving consume traffic to SSL, due to the zero copy send issue. Now I've been told (but I have not investigated) that OpenSSL can solve this. It would probably be a good use of time to look into that further. That said, switching the message format to the newer option (KIP-31 I believe?) will result in the brokers not needing to recompress message batches that are produced. This should result in a significant reduction in CPU usage, which may offset the cost of SSL. We haven't had a chance to fully investigate this, however, as changing that config depends on the clients being updated to support the new format. -Todd On Sunday, September 4, 2016, Jaikiran Pai <jai.forums2...@gmail.com> wrote: > We are using 0.10.0.1 of Kafka and (Java) client libraries. We recently > decided to start using SSL for Kafka communication between broker and > clients. Right now, we have a pretty basic setup with just 1 broker with > SSL keystore setup and the Java client(s) communicate using the > Producer/Consumer APIs against this single broker. There's no client auth > (intentionally) right now. We also have plain text enabled for the initial > testing. > > What we have noticed is that the consumer/producer performance when SSL is > enabled is noticeably poor when compared to plain text. 
I understand that > there are expected to be performance impacts when SSL is enabled but the > order of magnitude is too high and in fact it shows up in a very noticeable > fashion in our product. I do have the numbers, but I haven't been able to > narrow it down yet (because there's nothing that stands out, except that > it's slow). Our application code is exactly the same between non-SSL and > SSL usage. > > Furthermore, I'm aware of this specific JIRA in Kafka > https://issues.apache.org/jira/browse/KAFKA-2561 which acknowledges a > similar issue. So what I would like to know is, in context of Kafka 0.10.x > releases and Java 8 support, are there any timelines that the dev team is > looking for in terms of improving this performance issue (which I believe > requires usage of OpenSSL or other non-JDK implementations of SSLEngine)? > We would like to go for GA of our product in the next couple of months and > in order to do that, we do plan to have Kafka over SSL working with > reasonably good performance, but the current performance isn't promising. > Expecting this to be fixed in the next couple of months and have it > available in 0.10.x is probably too much to expect, but if we know the > plans around this, we should be able to come up with a plan of our own for > our product. > > > -Jaikiran > -- *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
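For reference, the mitigations Todd mentions in his reply map to broker settings along these lines. This is a hedged sketch: `ssl.secure.random.implementation` arrived with KAFKA-4050 (0.10.1), and exact property names and values should be checked against the documentation for your Kafka version.

```properties
# Enable SSL alongside plaintext during migration
listeners=PLAINTEXT://:9092,SSL://:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=changeit
# KAFKA-4050: choose a non-blocking PRNG for SSL sessions
ssl.secure.random.implementation=SHA1PRNG
# KIP-31 message format: avoids broker-side recompression of produced
# batches, but requires clients updated to the new format
log.message.format.version=0.10.0
```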
Re: [ANNOUNCE] New Kafka PMC member Gwen Shapira
Congratulations, Gwen! Well deserved :) -Todd On Thu, Aug 18, 2016 at 10:27 AM, Gwen Shapira <g...@confluent.io> wrote: > Thanks team Kafka :) Very excited and happy to contribute and be part > of this fantastic community. > > > > On Thu, Aug 18, 2016 at 9:52 AM, Guozhang Wang <wangg...@gmail.com> wrote: > > Congrats Gwen! > > > > On Thu, Aug 18, 2016 at 9:27 AM, Ashish Singh <asi...@cloudera.com> > wrote: > > > >> Congrats Gwen! > >> > >> On Thursday, August 18, 2016, Grant Henke <ghe...@cloudera.com> wrote: > >> > >> > Congratulations Gwen! > >> > > >> > > >> > > >> > On Thu, Aug 18, 2016 at 2:36 AM, Ismael Juma <ism...@juma.me.uk > >> > <javascript:;>> wrote: > >> > > >> > > Congratulations Gwen! Great news. > >> > > > >> > > Ismael > >> > > > >> > > On 18 Aug 2016 2:44 am, "Jun Rao" <j...@confluent.io <javascript:;>> > >> > wrote: > >> > > > >> > > > Hi, Everyone, > >> > > > > >> > > > Gwen Shapira has been active in the Kafka community since she > became > >> a > >> > > > Kafka committer > >> > > > about a year ago. I am glad to announce that Gwen is now a member > of > >> > > Kafka > >> > > > PMC. > >> > > > > >> > > > Congratulations, Gwen! > >> > > > > >> > > > Jun > >> > > > > >> > > > >> > > >> > > >> > > >> > -- > >> > Grant Henke > >> > Software Engineer | Cloudera > >> > gr...@cloudera.com <javascript:;> | twitter.com/gchenke | > >> > linkedin.com/in/granthenke > >> > > >> > >> > >> -- > >> Ashish h > >> > > > > > > > > -- > > -- Guozhang > > > > -- > Gwen Shapira > Product Manager | Confluent > 650.450.2760 | @gwenshap > Follow us: Twitter | blog > -- *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
Re: [DISCUSS] KIP-73: Replication Quotas
Yeah, I’m good with where we are right now on this KIP. It’s a workable solution that we can add tooling to support. I would prefer to have soft quotas, but given that it is more complex to implement, we can go with hard quotas for the time being and consider it as an improvement later. -Todd On Thu, Aug 18, 2016 at 11:13 AM, Jun Rao <j...@confluent.io> wrote: > Todd, > > Thanks for the detailed reply. So, it sounds like that you are ok with the > current proposal in the KIP for now and we can brainstorm on more automated > stuff separately? Are you comfortable with starting the vote on the current > proposal? > > Jun > > On Thu, Aug 18, 2016 at 11:00 AM, Todd Palino <tpal...@gmail.com> wrote: > > > Joel just reminded me to take another look at this one :) So first off, > > this is great. It’s something that we definitely need to have, especially > > as we get into the realm of moving partitions around more often. > > > > I do prefer to have the cluster handle this automatically. What I > envision > > is a single configuration for “bootstrap replication quota” that is > applied > > when we have a replica that is in this situation. There’s 2 legitimate > > cases that I’m aware of right now: > > 1 - We are moving a partition to a new replica. We know about this (at > > least the controller does), so we should be able to apply the quota > without > > too much trouble here > > 2 - We have a broker that lost its disk and has to recover the partition > > from the cluster. Harder to detect, but in this case, I’m not sure I even > > want to throttle it because this is recovery activity. > > > > The problem with this becomes the question of “why”. Why are you moving a > > partition? Are you doing it because you want to balance traffic? Or are > you > > doing it because you lost a piece of hardware and you need to get the RF > > for the partition back up to the desired level? As an admin, these have > > different priorities. 
I may be perfectly fine with having the replication > > traffic saturate the cluster in the latter case, because reliability and > > availability is more important than performance. > > > > Given the complexity of trying to determine intent, I’m going to agree > with > > implementing a manual procedure for now. We definitely need to have a > > discussion about automating it as much as possible, but I think it’s part > > of a larger conversation about how much automation should be built into > the > > broker itself, and how much should be part of a bolt-on “cluster > manager”. > > I’m not sure putting all that complexity into the broker is the right > > choice. > > > > I do agree with Joel here that while a hard quota is typically better > from > > a client point of view, in the case of replication traffic a soft quota > is > > appropriate, and desirable. Probably a combination of both, as I think we > > still want a hard limit that stops short of saturating the entire cluster > > with replication traffic. > > > > -Todd > > > > > > On Thu, Aug 18, 2016 at 10:21 AM, Joel Koshy <jjkosh...@gmail.com> > wrote: > > > > > > For your first comment. We thought about determining "effect" > replicas > > > > automatically as well. First, there are some tricky stuff that one > has > > to > > > > > > > > > > Auto-detection of effect traffic: i'm fairly certain it's doable but > > > definitely tricky. I'm also not sure it is something worth tackling at > > the > > > outset. If we want to spend more time thinking over it even if it's > just > > an > > > academic exercise I would be happy to brainstorm offline. > > > > > > > > > > For your second comment, we discussed that in the client quotas > > design. A > > > > down side of that for client quotas is that a client may be surprised > > > that > > > > its traffic is not throttled at one time, but throttled as another > with > > > the > > > > same quota (basically, less predicability). 
> > You can imagine setting a quota for all replication traffic and only slow down the "effect" replicas if needed. The thought is more or less the same as the above. It requires more …
>
> For clients, this is true. I think this is much less of an issue for server-side replication since the "users" here are the Kafka SREs who generally know these internal details.
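The hard-vs-soft quota distinction discussed in this thread can be sketched as a token bucket. This is an illustrative sketch only, not Kafka's actual KIP-73 implementation; the class and method names here are hypothetical. A hard quota simply refuses traffic beyond the configured rate (e.g. by excluding the partition from the next fetch), while a soft quota would let bursts through and only throttle sustained overage.

```java
// Illustrative token-bucket "hard" quota for replication traffic.
// Not Kafka's implementation; names and structure are hypothetical.
class ReplicationQuota {
    private final long bytesPerSec;   // configured quota
    private double tokens;            // bytes of credit currently available
    private long lastCheckNanos;

    ReplicationQuota(long bytesPerSec) {
        this.bytesPerSec = bytesPerSec;
        this.tokens = bytesPerSec;    // start with one second of credit
        this.lastCheckNanos = System.nanoTime();
    }

    // Returns true if `bytes` of replication traffic may proceed now.
    // A hard quota refuses the excess outright; a soft quota would
    // instead allow it and only act on sustained violation.
    synchronized boolean tryAcquire(long bytes) {
        long now = System.nanoTime();
        double refill = (now - lastCheckNanos) / 1e9 * bytesPerSec;
        tokens = Math.min(bytesPerSec, tokens + refill);
        lastCheckNanos = now;
        if (tokens >= bytes) {
            tokens -= bytes;
            return true;
        }
        return false; // e.g. exclude this partition from the fetch round
    }
}
```

The "move partitions from throttled fetchers" complexity debated later in the thread is what a check like this avoids: the leader can keep one threading model and just skip over-quota partitions per fetch.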
Re: [DISCUSS] KIP-73: Replication Quotas
> on the leader, but we can leave the replica threading model unchanged. So, overall, this still seems to be a simpler approach.
>
> Thanks,
>
> Jun
>
> On Tue, Aug 9, 2016 at 11:57 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
>
> > Nice write up Ben.
> >
> > I agree with Joel for keeping this simple by excluding the partitions from the fetch request/response when the quota is violated at the follower or leader, instead of having a separate set of threads for handling the quota and non-quota cases. Even though it's different from the current quota implementation it should be OK, since it's internal to brokers and can be handled by tuning the quota configs for it appropriately by the admins.
> >
> > Also can you elaborate with an example how this would be handled: *guaranteeing ordering of updates when replicas shift threads*
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Aug 9, 2016 at 10:49 AM, Joel Koshy <jjkosh...@gmail.com> wrote: …
Re: [DISCUSS] KIP-73: Replication Quotas
> for clarifying. For completeness, can we add this detail to the doc - say, after the quote that I pasted earlier?
>
> From an implementation perspective though: I’m still interested in the simplicity of not having to add separate replica fetchers, delay queue on the leader, and “move” partitions from the throttled replica fetchers to …
[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL
[ https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425756#comment-15425756 ] Todd Palino commented on KAFKA-4050: So first off, yes, the thread dump (which [~jjkoshy] posted) shows that the offending line of code is "NativePRNG.java:481". I checked, and that's very clearly in the non-blocking NativePRNG variant that explicitly uses /dev/urandom. I had considered changing the default, [~ijuma], and I actually thought about adding a note to this ticket about it earlier today. Despite the fact that the default clearly has performance issues, I don't think we should change the default behavior, which is to let the JRE pick the PRNG implementation. The reason is that we can't be sure that on any given system, in any given JRE, the new one we set explicitly will exist, and that would cause the default behavior to break. The SHA1PRNG implementation should exist everywhere, but I'd rather not take the risk. I think it's better to leave the default as is, and call out the issue very clearly in the documentation. > Allow configuration of the PRNG used for SSL > > > Key: KAFKA-4050 > URL: https://issues.apache.org/jira/browse/KAFKA-4050 > Project: Kafka > Issue Type: Improvement > Components: security > Affects Versions: 0.10.0.1 > Reporter: Todd Palino > Assignee: Todd Palino > Labels: security, ssl > > This change will make the pseudo-random number generator (PRNG) > implementation used by the SSLContext configurable. The configuration is not > required, and the default is to use whatever the default PRNG for the JDK/JRE > is. Providing a string, such as "SHA1PRNG", will cause that specific > SecureRandom implementation to get passed to the SSLContext. > When enabling inter-broker SSL in our certification cluster, we observed > severe performance issues.
For reference, this cluster can take up to 600 > MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close > to saturation, and the mirror maker normally produces about 400 MB/sec > (unless it is lagging). When we enabled inter-broker SSL, we saw persistent > replication problems in the cluster at any inbound rate of more than about 6 > or 7 MB/sec per-broker. This was narrowed down to all the network threads > blocking on a single lock in the SecureRandom code. > It turns out that the default PRNG implementation on Linux is NativePRNG. > This uses randomness from /dev/urandom (which, by itself, is a non-blocking > read) and mixes it with randomness from SHA1. The problem is that the entire > application shares a single SecureRandom instance, and NativePRNG has a > global lock within the implNextBytes method. Switching to another > implementation (SHA1PRNG, which has better performance characteristics and is > still considered secure) completely eliminated the bottleneck and allowed the > cluster to work properly at saturation. > The SSLContext initialization has an optional argument to provide a > SecureRandom instance, which the code currently sets to null. This change > creates a new config to specify an implementation, and instantiates that and > passes it to SSLContext if provided. This will also let someone select a > stronger source of randomness (obviously at a performance cost) if desired. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
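The change described in the issue, passing a named SecureRandom implementation to SSLContext instead of the platform default, comes down to a few lines of JDK code. The sketch below is not Kafka's actual code; the factory class and parameter name are assumptions for illustration.

```java
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import javax.net.ssl.SSLContext;

// Sketch of the idea in KAFKA-4050: pass a named SecureRandom
// implementation to SSLContext rather than the (possibly lock-contended)
// platform default. Class and parameter names are illustrative only.
class SslContextFactory {
    static SSLContext build(String prngImplementation)
            throws NoSuchAlgorithmException, KeyManagementException {
        SecureRandom random = null; // null => let the JRE pick the default PRNG
        if (prngImplementation != null) {
            // e.g. "SHA1PRNG"; throws NoSuchAlgorithmException if the named
            // implementation is absent, which is why the default is left alone
            random = SecureRandom.getInstance(prngImplementation);
        }
        SSLContext ctx = SSLContext.getInstance("TLS");
        // Key and trust managers are omitted in this sketch; a real broker
        // would supply both before using the context.
        ctx.init(null, null, random);
        return ctx;
    }
}
```

Calling `build(null)` reproduces the pre-fix behavior (JRE default, NativePRNG on Linux), while `build("SHA1PRNG")` selects the unlocked implementation that removed the bottleneck.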
[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL
[ https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15423400#comment-15423400 ] Todd Palino commented on KAFKA-4050: It appears to be called every time something needs to be encrypted (have to get randomness to run the crypto routines), so yeah, every single packet being sent would need a call to get random bytes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4050) Allow configuration of the PRNG used for SSL
Todd Palino created KAFKA-4050: -- Summary: Allow configuration of the PRNG used for SSL Key: KAFKA-4050 URL: https://issues.apache.org/jira/browse/KAFKA-4050 Project: Kafka Issue Type: Improvement Components: security Affects Versions: 0.10.0.1 Reporter: Todd Palino Assignee: Todd Palino This change will make the pseudo-random number generator (PRNG) implementation used by the SSLContext configurable. The configuration is not required, and the default is to use whatever the default PRNG for the JDK/JRE is. Providing a string, such as "SHA1PRNG", will cause that specific SecureRandom implementation to get passed to the SSLContext. When enabling inter-broker SSL in our certification cluster, we observed severe performance issues. For reference, this cluster can take up to 600 MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close to saturation, and the mirror maker normally produces about 400 MB/sec (unless it is lagging). When we enabled inter-broker SSL, we saw persistent replication problems in the cluster at any inbound rate of more than about 6 or 7 MB/sec per-broker. This was narrowed down to all the network threads blocking on a single lock in the SecureRandom code. It turns out that the default PRNG implementation on Linux is NativePRNG. This uses randomness from /dev/urandom (which, by itself, is a non-blocking read) and mixes it with randomness from SHA1. The problem is that the entire application shares a single SecureRandom instance, and NativePRNG has a global lock within the implNextBytes method. Switching to another implementation (SHA1PRNG, which has better performance characteristics and is still considered secure) completely eliminated the bottleneck and allowed the cluster to work properly at saturation. The SSLContext initialization has an optional argument to provide a SecureRandom instance, which the code currently sets to null. 
This change creates a new config to specify an implementation, and instantiates that and passes it to SSLContext if provided. This will also let someone select a stronger source of randomness (obviously at a performance cost) if desired. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup
[ https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15419365#comment-15419365 ] Todd Palino commented on KAFKA-3959: Agree with Onur 100% here. We've been running into this a lot lately, and we never know about it until we do a rolling restart of the cluster and consumers break when the topic goes offline. > __consumer_offsets wrong number of replicas at startup > -- > > Key: KAFKA-3959 > URL: https://issues.apache.org/jira/browse/KAFKA-3959 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager, replication >Affects Versions: 0.9.0.1, 0.10.0.0 > Environment: Brokers of 3 kafka nodes running Red Hat Enterprise > Linux Server release 7.2 (Maipo) >Reporter: Alban Hurtaud > > When creating a stack of 3 kafka brokers, the consumer is starting faster > than kafka nodes and when trying to read a topic, only one kafka node is > available. > So the __consumer_offsets is created with a replication factor set to 1 > (instead of configured 3) : > offsets.topic.replication.factor=3 > default.replication.factor=3 > min.insync.replicas=2 > Then, other kafka nodes go up and we have exceptions because the replicas # > for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown. > What I missed is : Why the __consumer_offsets is created with replication to > 1 (when 1 broker is running) whereas in server.properties it is set to 3 ? > To reproduce : > - Prepare 3 kafka nodes with the 3 lines above added to servers.properties. > - Run one kafka, > - Run one consumer (the __consumer_offsets is created with replicas =1) > - Run 2 more kafka nodes -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3797) Improve security of __consumer_offsets topic
[ https://issues.apache.org/jira/browse/KAFKA-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323930#comment-15323930 ] Todd Palino commented on KAFKA-3797: Obviously we can't do something like that with a running consumer. But we do it all the time with inactive consumers. I haven't looked at the most recent changes to the offset API yet, so I'm not sure whether or not there are restrictions in place that would allow a client that is not part of the group to commit offsets for the group. > Improve security of __consumer_offsets topic > > > Key: KAFKA-3797 > URL: https://issues.apache.org/jira/browse/KAFKA-3797 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Vahid Hashemian > > By default, we allow clients to override committed offsets and group metadata > using the Produce API as long as they have Write access to the > __consumer_offsets topic. From one perspective, this is fine: administrators > can restrict access to this topic to trusted users. From another, it seems > less than ideal for Write permission on that topic to subsume Group-level > permissions for the full cluster. With this access, a user can cause all > kinds of mischief including making the group "lose" data by setting offsets > ahead of the actual position. This is probably not obvious to administrators > who grant access to topics using a wildcard and it increases the risk from > incorrectly applying topic patterns (if we ever add support for them). It > seems reasonable to consider safer default behavior: > 1. A simple option to fix this would be to prevent wildcard topic rules from > applying to internal topics. To write to an internal topic, you need a > separate rule which explicitly grants authorization to that topic. > 2. A more extreme and perhaps safer option might be to prevent all writes to > this topic (and potentially other internal topics) through the Produce API. 
> Do we have any use cases which actually require writing to > __consumer_offsets? The only potential case that comes to mind is replication. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3797) Improve security of __consumer_offsets topic
[ https://issues.apache.org/jira/browse/KAFKA-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318854#comment-15318854 ] Todd Palino commented on KAFKA-3797: I think the first option is more reasonable, and provides sufficient security. I can see cases where you might want to have an external tool for changing a group's committed offsets. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3725) Update documentation with regards to XFS
[ https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312288#comment-15312288 ] Todd Palino commented on KAFKA-3725: I'll take a look at this and put together a PR with some updates. > Update documentation with regards to XFS > > > Key: KAFKA-3725 > URL: https://issues.apache.org/jira/browse/KAFKA-3725 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma > Assignee: Todd Palino > > Our documentation currently states that only Ext4 has been tried (by > LinkedIn): > "Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS > supposedly handle locking during fsync better. We have only tried Ext4, > though." > http://kafka.apache.org/documentation.html#ext4 > I think this is no longer true, so we should update the documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3725) Update documentation with regards to XFS
[ https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino reassigned KAFKA-3725: -- Assignee: Todd Palino -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: reading the consumer offsets topic
The GroupMetadataManager one should be working for you with 0.9. I don’t have a 0.9 KCC set up at the moment, so I’m using an 0.8 version where it’s different (it’s the other class for me). The only thing I can offer now is did you put quotes around the arg to --formatter so you don’t get weird shell interference? -Todd On Mon, May 9, 2016 at 8:18 AM, Cliff Rhyne <crh...@signal.co> wrote: > Thanks, Todd. It's still not working unfortunately. > > This results in nothing getting printed to the console and requires kill -9 > in another window to stop (ctrl-c doesn't work): > > /kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper > --topic __consumer_offsets --formatter > kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter > > This results in a stack trace because it can't find the class: > > ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper > --topic __consumer_offsets --formatter > kafka.server.OffsetManager\$OffsetsMessageFormatter > > Exception in thread "main" java.lang.ClassNotFoundException: > kafka.server.OffsetManager$OffsetsMessageFormatter > > > I'm on 0.9.0.1. "broker-list" is invalid and zookeeper is required > regardless of the bootstrap-server parameter. > > > Thanks, > > Cliff > > On Sun, May 8, 2016 at 7:35 PM, Todd Palino <tpal...@gmail.com> wrote: > > > It looks like you’re just missing the proper message formatter. Of > course, > > that largely depends on your version of the broker. Try: > > > > ./kafka-console-consumer.sh --broker-list localhost:9092 --topic > > __consumer_offsets > > --formatter > kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter > > > > > > If for some reason that doesn’t work, you can try > > "kafka.server.OffsetManager\$OffsetsMessageFormatter” instead. > > > > -Todd > > > > > > > > > > On Sun, May 8, 2016 at 1:26 PM, Cliff Rhyne <crh...@signal.co> wrote: > > > > > I'm having difficulty reading the consumer offsets topic from the > command > > > line. 
I try the following but it doesn't seem to work (along with a > few > > > related variants including specifying the zookeeper hosts): > > > > > > ./kafka-console-consumer.sh --broker-list localhost:9092 --topic > > > __consumer_offsets > > > > > > Is there a special way to read the consumer offsets topic? > > > > > > Thanks, > > > Cliff > > > > > > -- > > > Cliff Rhyne > > > Software Engineering Manager > > > e: crh...@signal.co > > > signal.co > > > > > > > > > Cut Through the Noise > > > > > > This e-mail and any files transmitted with it are for the sole use of > the > > > intended recipient(s) and may contain confidential and privileged > > > information. Any unauthorized use of this email is strictly prohibited. > > > ©2016 Signal. All rights reserved. > > > > > > > > > > > -- > > *—-* > > *Todd Palino* > > Staff Site Reliability Engineer > > Data Infrastructure Streaming > > > > > > > > linkedin.com/in/toddpalino > > > > > > -- > Cliff Rhyne > Software Engineering Manager > e: crh...@signal.co > signal.co > > > Cut Through the Noise > > This e-mail and any files transmitted with it are for the sole use of the > intended recipient(s) and may contain confidential and privileged > information. Any unauthorized use of this email is strictly prohibited. > ©2016 Signal. All rights reserved. > -- *—-* *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
Re: reading the consumer offsets topic
It looks like you’re just missing the proper message formatter. Of course, that largely depends on your version of the broker. Try: ./kafka-console-consumer.sh --broker-list localhost:9092 --topic __consumer_offsets --formatter kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter If for some reason that doesn’t work, you can try "kafka.server.OffsetManager\$OffsetsMessageFormatter” instead. -Todd On Sun, May 8, 2016 at 1:26 PM, Cliff Rhyne <crh...@signal.co> wrote: > I'm having difficulty reading the consumer offsets topic from the command > line. I try the following but it doesn't seem to work (along with a few > related variants including specifying the zookeeper hosts): > > ./kafka-console-consumer.sh --broker-list localhost:9092 --topic > __consumer_offsets > > Is there a special way to read the consumer offsets topic? > > Thanks, > Cliff > > -- > Cliff Rhyne > Software Engineering Manager > e: crh...@signal.co > signal.co > > > Cut Through the Noise > > This e-mail and any files transmitted with it are for the sole use of the > intended recipient(s) and may contain confidential and privileged > information. Any unauthorized use of this email is strictly prohibited. > ©2016 Signal. All rights reserved. > -- *—-* *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
[jira] [Commented] (KAFKA-3174) Re-evaluate the CRC32 class performance.
[ https://issues.apache.org/jira/browse/KAFKA-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15126292#comment-15126292 ] Todd Palino commented on KAFKA-3174: Yeah, definitely no problems with Java 1.8. We've been running 1.8 u5 for quite some time, and we're in the process of updating to u40. It's worth noting that we have been running into a number of SEGVs with mirror maker (but not the broker) under u5, but the problem is supposedly fixed in u40. > Re-evaluate the CRC32 class performance. > > > Key: KAFKA-3174 > URL: https://issues.apache.org/jira/browse/KAFKA-3174 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.0 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin > Fix For: 0.9.0.1 > > > We used org.apache.kafka.common.utils.CRC32 in clients because it has better > performance than java.util.zip.CRC32 in Java 1.6. > In a recent test I ran it looks in Java 1.8 the CRC32 class is 2x as fast as > the Crc32 class we are using now. We may want to re-evaluate the performance > of Crc32 class and see it makes sense to simply use java CRC32 instead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
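The re-evaluation suggested in the ticket is easy to reproduce with a rough timing sketch against `java.util.zip.CRC32`. This is not a rigorous benchmark (no JMH, no warmup control), and absolute numbers will vary by JVM version and hardware; it only illustrates how one might compare checksum throughput.

```java
import java.util.Random;
import java.util.zip.CRC32;

// Rough throughput sketch for java.util.zip.CRC32, the JDK class whose
// performance the ticket asks to re-evaluate. Indicative only: no JMH,
// no warmup control.
class CrcTiming {
    static long checksum(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] payload = new byte[1 << 20]; // 1 MiB of pseudo-random bytes
        new Random(42).nextBytes(payload);
        long start = System.nanoTime();
        long sink = 0; // accumulate results so the loop cannot be elided
        for (int i = 0; i < 200; i++) {
            sink ^= checksum(payload);
        }
        double mibPerSec = 200.0 / ((System.nanoTime() - start) / 1e9);
        System.out.printf("~%.0f MiB/s (sink=%d)%n", mibPerSec, sink);
    }
}
```

Swapping `checksum` for Kafka's own `Crc32` class (or, on newer JDKs, `java.util.zip.CRC32C`) gives the comparison the ticket describes.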
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
> returned from the interceptor. > > > > > > > > > > > > > > > > > > > > > > > > Please let me know if you have any suggestions or objections to > the > > > > > above. > > > > > > > > > > > > Thanks, > > > > > > Anna > > > > > > > > > > > > > > > > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner <a...@confluent.io > > > > > > wrote: > > > > > > > > > > > > > Hi Mayuresh, > > > > > > > > > > > > > > I see why you would want to check for messages left in the > > > > > > > RecordAccumulator. However, I don't think this will completely > > > solve > > > > > the > > > > > > > problem. Messages could be in-flight somewhere else, like in > the > > > > > socket, > > > > > > or > > > > > > > there maybe in-flight messages on the consumer side of the > > > > MirrorMaker. > > > > > > So, > > > > > > > if we go the route of checking whether there are any in-flight > > > > messages > > > > > > for > > > > > > > topic deletion use-case, maybe it is better count them with > > > onSend() > > > > > and > > > > > > > onAcknowledge() -- whether all messages sent were > acknowledged. I > > > > also > > > > > > > think that it would be better to solve this without > interceptors, > > > > such > > > > > as > > > > > > > fix error handling in this scenario. However, I do not have any > > > good > > > > > > > proposal right now, so these are just general thoughts. > > > > > > > > > > > > > > Thanks, > > > > > > > Anna > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat < > > > > > > > gharatmayures...@gmail.com> wrote: > > > > > > > > > > > > > >> Calling producer.flush(), flushes all the data. So this is OK. > > But > > > > > when > > > > > > >> you > > > > > > >> are running Mirror maker, I am not sure there is a way to > > flush() > > > > from > > > > > > >> outside. 
> > > > > > >> > > > > > > >> > > > > > > >> Thanks, > > > > > > >> > > > > > > >> Mayuresh > > > > > > >> > > > > > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin < > > > becket@gmail.com> > > > > > > >> wrote: > > > > > > >> > > > > > > >> > Mayuresh, > > > > > > >> > > > > > > > >> > Regarding your use case about mirror maker. Is it good > enough > > as > > > > > long > > > > > > >> as we > > > > > > >> > know there is no message for the topic in the producer > > anymore? > > > If > > > > > > that > > > > > > >> is > > > > > > >> > the case, call producer.flush() is sufficient. > > > > > > >> > > > > > > > >> > Thanks, > > > > > > >> > > > > > > > >> > Jiangjie (Becket) Qin > > > > > > >> > > > > > > > >> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat < > > > > > > >> > gharatmayures...@gmail.com > > > > > > >> > > wrote: > > > > > > >> > > > > > > > >> > > Hi Anna, > > > > > > >> > > > > > > > > >> > > Thanks a lot for summarizing the discussion on this kip. > > > > > > >> > > > > > > > > >> > > It LGTM. > > > > > > >> > > This is really nice : > > > > > > >> > > We decided not to add any callbacks to producer and > consumer > > > > > > >> > > interceptors t
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
gt; > > > and we won't need a console auditor. > > > > > > > > One potential issue in this approach and any elaborate on-arrival > > > > processing for that matter is that you may need to deserialize the > > > message > > > > as well which can drive up produce request handling times. However > I'm > > > not > > > > terribly concerned about that especially if the audit header can be > > > > separated out easily or even deserialized partially as this Avro > thread > > > > touches on > > > > > > > > > > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+ > > > > > > > > Thanks, > > > > > > > > Joel > > > > > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat < > > > > gharatmayures...@gmail.com> wrote: > > > > > > > > > Nice KIP. Excellent idea. > > > > > Was just thinking if we can add onDequed() to the > ProducerIterceptor > > > > > interface. Since we have the onEnqueued(), it will help the client > or > > > the > > > > > tools to know how much time the message spent in the > > RecordAccumulator. > > > > > Also an API to check if there are any messages left for a > particular > > > > topic > > > > > in the RecordAccumulator would help. > > > > > > > > > > Thanks, > > > > > > > > > > Mayuresh > > > > > > > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com> > > > wrote: > > > > > > > > > > > Great idea. I’ve been talking about this for 2 years, and I’m > glad > > > > > someone > > > > > > is finally picking it up. Will take a look at the KIP at some > point > > > > > > shortly. > > > > > > > > > > > > -Todd > > > > > > > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> > > > wrote: > > > > > > > > > > > > > Hey Becket, > > > > > > > > > > > > > > Yeah this is really similar to the callback. The difference is > > > really > > > > > in > > > > > > > who sets the behavior. 
The idea of the interceptor is that it > > > doesn't > > > > > > > require any code change in apps so you can globally add > behavior > > to > > > > > your > > > > > > > Kafka usage without changing app code. Whereas the callback is > > > added > > > > by > > > > > > the > > > > > > > app. The idea is to kind of obviate the need for the wrapper > code > > > > that > > > > > > e.g. > > > > > > > LinkedIn maintains to hold this kind of stuff. > > > > > > > > > > > > > > -Jay > > > > > > > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin < > > becket@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > > > This could be a useful feature. And I think there are some > use > > > > cases > > > > > to > > > > > > > > mutate the data like rejected alternative one mentioned. > > > > > > > > > > > > > > > > I am wondering if there is functional overlapping between > > > > > > > > ProducerInterceptor.onAcknowledgement() and the producer > > > callback? > > > > I > > > > > > can > > > > > > > > see that the Callback could be a per record setting while > > > > > > > > onAcknowledgement() is a producer level setting. Other than > > that, > > > > is > > > > > > > there > > > > > > > > any difference between them? > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede < > > > n...@confluent.io> > > > > > > > wrote: > > > > > >
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
, > > > > Mayuresh > > > > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner <a...@confluent.io> > wrote: > > > > > Thanks Ismael and Todd for your feedback! > > > > > > I agree about coming up with lean, but useful interfaces that will be > > easy > > > to extend later. > > > > > > When we discuss the minimal set of producer and consumer interceptor > API > > in > > > today’s KIP meeting (discussion item #2 in my previous email), lets > > compare > > > two options: > > > > > > *1. Minimal set of immutable API for producer and consumer > interceptors* > > > > > > ProducerInterceptor: > > > public void onSend(ProducerRecord<K, V> record); > > > public void onAcknowledgement(RecordMetadata metadata, Exception > > > exception); > > > > > > ConsumerInterceptor: > > > public void onConsume(ConsumerRecords<K, V> records); > > > public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets); > > > > > > Use-cases: > > > — end-to-end monitoring; custom tracing and logging > > > > > > > > > *2. Minimal set of mutable API for producer and consumer interceptors* > > > > > > ProducerInterceptor: > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record); > > > void onAcknowledgement(RecordMetadata metadata, Exception exception); > > > > > > ConsumerInterceptor: > > > void onConsume(ConsumerRecords<K, V> records); > > > void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets); > > > > > > Additional use-cases to #1: > > > — Ability to add metadata to a message or fill in standard fields for > > audit > > > and routing. > > > > > > Implications > > > — Partition assignment will be done based on modified key/value instead > > of > > > original key/value. If key/value transformation is not consistent (same > > key > > > and value does not mutate to the same, but modified, key/value), then > log > > > compaction would not work. However, audit and routing use-cases from > Todd > > > will likely do consistent transformation. 
> > > > > > > > > *Additional callbacks (discussion item #3 in my previous email):* > > > > > > If we want to support encryption, we would want to be able to modify > > > serialized key/value, rather than key and value objects. This will add > > the > > > following API to producer and consumer interceptors: > > > > > > ProducerInterceptor: > > > SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord<K, V> > > > record, SerializedKeyValue serializedKeyValue); > > > > > > ConsumerInterceptor: > > > SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue > > > serializedKeyValue); > > > > > > > > > I am leaning towards implementing the minimal set of immutable or > mutable > > > interfaces, making sure that we have a compatibility plan that allows > us > > to > > > add more callbacks in the future (per Ismael comment), and add more > APIs > > > later. E.g., for encryption use-case, there could be an argument in > doing > > > encryption after message compression vs. per-record encryption that > could > > > be done using the above additional API. There is also more implications > > for > > > every API that modifies records: modifying serialized key/value will > > again > > > impact partition assignment (we will likely do that after partition > > > assignment), which may impact log compaction and mirror maker > > partitioning. > > > > > > > > > Thanks, > > > Anna > > > > > > On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <tpal...@gmail.com> > wrote: > > > > > > > Finally got a chance to take a look at this. I won’t be able to make > > the > > > > KIP meeting due to a conflict. > > > > > > > > I’m somewhat disappointed in this proposal. I think that the explicit > > > > exclusion of modification of the messages is short-sighted, and not > > > > accounting for it now is going to bite us later. Jay, aren’t you the > > one > > > > railing against public interfaces and how difficult they are to work > > with > > > > when you don’t get them right? 
The “simple” change to one of these > > > >
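The mutable option (#2) above implies that partition assignment happens on the interceptor-modified key, which is why Anna notes the transformation must be consistent for log compaction to keep working. A small Python model of that chain follows; `Record`, `apply_interceptors`, and the partitioner are toy stand-ins for the real Java API, not Kafka code.

```python
class Record:
    """Toy stand-in for ProducerRecord: just a key and a value."""
    def __init__(self, key, value):
        self.key, self.value = key, value

def apply_interceptors(record, interceptors):
    # Option #2 semantics: each on_send may return a modified record,
    # and the producer then partitions on the *modified* key/value.
    for ic in interceptors:
        record = ic.on_send(record)
    return record

def partition_for(key, num_partitions=3):
    # deterministic stand-in for the producer's default partitioner
    return sum(key.encode()) % num_partitions

class AuditInterceptor:
    """A consistent transformation: the key is never changed, so records
    with the same key still land on the same partition and compaction
    still sees them as one logical key."""
    def on_send(self, rec):
        return Record(rec.key, rec.value + "|audit=1")

r1 = apply_interceptors(Record("user-42", "a"), [AuditInterceptor()])
r2 = apply_interceptors(Record("user-42", "b"), [AuditInterceptor()])
assert partition_for(r1.key) == partition_for(r2.key)  # compaction-safe
```

An interceptor that mutated the key differently per record would scatter a logical key across partitions, which is exactly the compaction hazard described above.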
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Great idea. I’ve been talking about this for 2 years, and I’m glad someone is finally picking it up. Will take a look at the KIP at some point shortly. -Todd On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote: > Hey Becket, > > Yeah this is really similar to the callback. The difference is really in > who sets the behavior. The idea of the interceptor is that it doesn't > require any code change in apps so you can globally add behavior to your > Kafka usage without changing app code. Whereas the callback is added by the > app. The idea is to kind of obviate the need for the wrapper code that e.g. > LinkedIn maintains to hold this kind of stuff. > > -Jay > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket@gmail.com> wrote: > > > This could be a useful feature. And I think there are some use cases to > > mutate the data, like the rejected alternative mentioned. > > > > I am wondering if there is functional overlapping between > > ProducerInterceptor.onAcknowledgement() and the producer callback? I can > > see that the Callback could be a per record setting while > > onAcknowledgement() is a producer level setting. Other than that, is > there > > any difference between them? > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io> > wrote: > > > > > James, > > > > > > That is one of the many monitoring use cases for the interceptor > > interface. > > > > > > Thanks, > > > Neha > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com> wrote: > > > > > > > Anna, > > > > > > > > I'm trying to understand a concrete use case. It sounds like producer > > > > interceptors could be used to implement part of LinkedIn's Kafka Audit > > > > tool?
https://engineering.linkedin.com/kafka/running-kafka-scale > > > > > > > > Part of that is done by a wrapper library around the kafka producer > > that > > > > keeps a count of the number of messages produced, and then sends that > > > count > > > > to a side-topic. It sounds like the producer interceptors could > > possibly > > > be > > > > used to implement that? > > > > > > > > -James > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io> > wrote: > > > > > > > > > > Hi, > > > > > > > > > > I just created a KIP-42 for adding producer and consumer > interceptors > > > for > > > > > intercepting messages at different points on producer and consumer. > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors > > > > > > > > > > Comments and suggestions are welcome! > > > > > > > > > > Thanks, > > > > > Anna > > > > > > > > > > > > > > > > > > > > This email and any attachments may contain confidential and > privileged > > > > material for the sole use of the intended recipient. Any review, > > copying, > > > > or distribution of this email (or any attachments) by others is > > > prohibited. > > > > If you are not the intended recipient, please contact the sender > > > > immediately and permanently delete this email and any attachments. No > > > > employee or agent of TiVo Inc. is authorized to conclude any binding > > > > agreement on behalf of TiVo Inc. by email. Binding agreements with > TiVo > > > > Inc. may only be made by a signed written agreement. > > > > > > > > > > > > > > > > -- > > > Thanks, > > > Neha > > > > > > -- *—-* *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
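The wrapper James describes (count messages per topic, then send the counts to a side topic) can be modeled in a few lines. This is a Python sketch with hypothetical names (`AuditCountingProducer`, the `audit-counts` topic); the real tool is a Java wrapper library, and as Neha notes an interceptor's onSend() hook could do the same job without a wrapper.

```python
from collections import Counter

class AuditCountingProducer:
    """Counts messages produced per topic and periodically emits the
    counts to a side topic, in the spirit of LinkedIn's audit tooling."""

    def __init__(self, send_fn, audit_topic="audit-counts"):
        self._send = send_fn          # the underlying producer's send
        self._audit_topic = audit_topic
        self.counts = Counter()

    def send(self, topic, value):
        self.counts[topic] += 1       # what an onSend() hook would do
        self._send(topic, value)

    def flush_audit(self):
        # emit one count record per topic, then reset the window
        for topic, n in self.counts.items():
            self._send(self._audit_topic, f"{topic}:{n}")
        self.counts.clear()

sent = []
p = AuditCountingProducer(lambda t, v: sent.append((t, v)))
p.send("clicks", "x")
p.send("clicks", "y")
p.flush_audit()
assert ("audit-counts", "clicks:2") in sent
```

A downstream auditor can then compare these produced counts against consumed counts per time window to detect loss.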
[jira] [Commented] (KAFKA-3015) Improve JBOD data balancing
[ https://issues.apache.org/jira/browse/KAFKA-3015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15065636#comment-15065636 ] Todd Palino commented on KAFKA-3015: [~jkreps] So yes, I'm essentially saying that I would prefer to see optimizations to the current partitioning scheme, and the addition of being able to handle single disk failures without terminating the entire broker. I would argue that this would have a higher payoff for people who do not have the ability to easily swap in new machines (as we do, or those in AWS would), because it will allow for more granular failures. Disks tend to fail more than any other component, so the ability to survive a disk failure should be attractive to anyone. As noted, there are a lot of benefits of using JBOD, and I would not argue against having good support for it. Especially as we've been moving to new hardware, we now can't take advantage of all the network we have with 10gig interfaces primarily due to limitations on disk capacity and throughput. Additionally, we'd like to move to RF=3, but can't stomach it because of the cost. If we drop the RAID 10 we will significantly increase throughput and double our storage capacity. Then we can easily move to RF=3 (using 50% more disk) with little impact. > Improve JBOD data balancing > --- > > Key: KAFKA-3015 > URL: https://issues.apache.org/jira/browse/KAFKA-3015 > Project: Kafka > Issue Type: Improvement >Reporter: Jay Kreps > > When running with multiple data directories (i.e. JBOD) we currently place > partitions entirely within one data directory. This tends to lead to poor > balancing across disks as some topics have more throughput/retention and not > all disks get data from all topics. 
You can't fix this problem with smarter > partition placement strategies because ultimately you don't know when a > partition is created when or how heavily it will be used (this is a subtle > point, and the tendency is to try to think of some more sophisticated way to > place partitions based on current data size but this is actually > exceptionally dangerous and can lead to much worse imbalance when creating > many partitions at once as they would all go to the disk with the least > data). We don't support online rebalancing across directories/disks so this > imbalance is a big problem and limits the usefulness of this configuration. > Implementing online rebalancing of data across disks without downtime is > actually quite hard and requires lots of I/O since you have to actually > rewrite full partitions of data. > An alternative would be to place each partition in *all* directories/drives > and round-robin *segments* within the partition across the directories. So > the layout would be something like: > drive-a/mytopic-0/ > 000.data > 000.index > 0024680.data > 0024680.index > drive-b/mytopic-0/ > 0012345.data > 0012345.index > 0036912.data > 0036912.index > This is a little harder to implement than the current approach but not very > hard, and it is a lot easier than implementing online data balancing across > disks while retaining the current approach. I think this could easily be done > in a backwards compatible way. > I think the balancing you would get from this in most cases would be good > enough to make JBOD the default configuration. Thoughts? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
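The round-robin segment layout proposed in the issue description can be sketched as follows. This is an illustrative Python model, not broker code; `assign_segments` and the 11-digit offset padding are assumptions for the example.

```python
from itertools import cycle

def assign_segments(base_offsets, drives, topic_partition="mytopic-0"):
    """Round-robin a partition's segments across all data directories,
    as the alternative layout in this issue proposes: every drive gets
    some segments of every partition, so load spreads evenly."""
    rr = cycle(drives)
    return {
        off: f"{next(rr)}/{topic_partition}/{off:011d}.data"
        for off in sorted(base_offsets)
    }

layout = assign_segments([0, 12345, 24680, 36912], ["drive-a", "drive-b"])
assert layout[0].startswith("drive-a/")      # segments alternate drives
assert layout[12345].startswith("drive-b/")
assert layout[24680].startswith("drive-a/")
assert layout[36912].startswith("drive-b/")
```

Because the assignment depends only on segment order, not on data size, it avoids the feedback problem described above where size-based placement sends many new partitions to the emptiest disk at once.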
[jira] [Commented] (KAFKA-3015) Improve JBOD data balancing
[ https://issues.apache.org/jira/browse/KAFKA-3015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15065239#comment-15065239 ] Todd Palino commented on KAFKA-3015: While this seems good on the surface, it makes it impossible to continue running the broker on a single disk failure. This is one of our primary complaints about JBOD, and one of the main reasons we cannot use it (as much as we would like to). > Improve JBOD data balancing > --- > > Key: KAFKA-3015 > URL: https://issues.apache.org/jira/browse/KAFKA-3015 > Project: Kafka > Issue Type: Improvement >Reporter: Jay Kreps > > When running with multiple data directories (i.e. JBOD) we currently place > partitions entirely within one data directory. This tends to lead to poor > balancing across disks as some topics have more throughput/retention and not > all disks get data from all topics. You can't fix this problem with smarter > partition placement strategies because ultimately you don't know when a > partition is created when or how heavily it will be used (this is a subtle > point, and the tendency is to try to think of some more sophisticated way to > place partitions based on current data size but this is actually > exceptionally dangerous and can lead to much worse imbalance when creating > many partitions at once as they would all go to the disk with the least > data). We don't support online rebalancing across directories/disks so this > imbalance is a big problem and limits the usefulness of this configuration. > Implementing online rebalancing of data across disks without downtime is > actually quite hard and requires lots of I/O since you have to actually > rewrite full partitions of data. > An alternative would be to place each partition in *all* directories/drives > and round-robin *segments* within the partition across the directories. 
So > the layout would be something like: > drive-a/mytopic-0/ > 000.data > 000.index > 0024680.data > 0024680.index > drive-b/mytopic-0/ > 0012345.data > 0012345.index > 0036912.data > 0036912.index > This is a little harder to implement than the current approach but not very > hard, and it is a lot easier than implementing online data balancing across > disks while retaining the current approach. I think this could easily be done > in a backwards compatible way. > I think the balancing you would get from this in most cases would be good > enough to make JBOD the default configuration. Thoughts? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-30 Allow for brokers to have plug-able consensus and meta data storage sub systems
more pain. One of the things that > > > worked with ZK was to have a community and a group of people > responsible > > > for thinking about it and maintaining it. > > > > > > > > > > > I don't have a proposal for how this would work and it's some effort > to > > > > scope it out. The obvious thing to do would just be to keep the > > existing > > > > ISR/Controller setup and rebuild the controller etc on a RAFT/Paxos > > impl > > > > using the Kafka network/log/etc and have a replicated config database > > > > (maybe rocksdb) that was fed off the log and shared by all nodes. > > > > > > > > > > In principle, you need mechanisms like watches, ephemerals, and just > pure > > > storage. Just implement a replicated state machine with those > operations. > > > Watches and ephemerals make it a bit hard to do using a REST API, but > it > > is > > > fine if you don't care about doing long polls. Perhaps this is too much > > > detail for the status of this discussion. > > > > > > > > > > If done well this could have the advantage of potentially allowing us > > to > > > > scale the number of partitions quite significantly (the k/v store > would > > > not > > > > need to be all in memory), though you would likely still have limits > on > > > the > > > > number of partitions per machine. This would make the minimum Kafka > > > cluster > > > > size be just your replication factor. > > > > > > > > > > The idea of not having it in memory to scale isn't a bad one, especially > > if > > > you use SSDs to cache data and such. > > > > > > > > > > People tend to feel that implementing things like RAFT or Paxos is > too > > > hard > > > > for mere mortals. But I actually think it is within our capabilities, > > and > > > > our testing capabilities as well as experience with this type of > thing > > > have > > > > improved to the point where we should not be scared off if it is the > > > right > > > > path. > > > > > > It is hard, but certainly not impossible.
I have implemented quite a > few > > > Paxos prototypes over the years for experiments, and getting off the > > ground > > > isn't hard, but getting an implementation that really works and that is > > > maintainable is difficult. I've seen more than one instance where > people > > > got it wrong and caused pain. But again, maybe times have changed and > > that > > > won't happen here. > > > > > > -Flavio > > > > > > > > > > > This approach is likely more work than plugins (though maybe not, > once > > > you > > > > factor in all the docs, testing, etc) but if done correctly it would > be > > > an > > > > unambiguous step forward--a simpler, more scalable implementation > with > > no > > > > operational dependencies. > > > > > > > > Thoughts? > > > > > > > > -Jay > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Dec 1, 2015 at 11:12 AM, Joe Stein <joe.st...@stealth.ly> > > wrote: > > > > > > > >> I would like to start a discussion around the work that has started > in > > > >> regards to KIP-30 > > > >> > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-30+-+Allow+for+brokers+to+have+plug-able+consensus+and+meta+data+storage+sub+systems > > > >> > > > >> The impetus for working on this came a lot from the community. For > the > > > last > > > >> year(~+) it has been the most asked question at any talk I have > given > > > >> (personally speaking). It has come up a bit also on the mailing list > > > >> talking about zkclient vs Curator. A lot of folks want to use Kafka > > but > > > >> introducing dependencies is hard for the enterprise so the goals > > behind > > > >> this is making it so that using Kafka can be done as easy as > possible > > > for > > > >> the operations teams to-do when they do. If they are already > > supporting > > > >> ZooKeeper they can keep doing that but if not they want (users) to > use > > > >> something else they are already supporting that can plug-in to-do > the > > > same > > > >> things.
> > > >> > > > >> For the core project I think we should leave in upstream what we > have. > > > This > > > >> gives a great baseline regression for folks and makes the work for > > > "making > > > >> what we have plug-able work" a good defined task (carve out, layer > in > > > API > > > >> impl, push back tests pass). From there then when folks want their > > > >> implementation to be something besides ZooKeeper they can develop, > > test > > > and > > > >> support that if they choose. > > > >> > > > >> We would like to suggest that we have the plugin interface be Java > > based > > > >> for minimizing depends for JVM impl. This could be in another > > directory > > > >> something TBD /. > > > >> > > > >> If you have a server you want to try to get it working but you > aren't > > on > > > >> the JVM don't be afraid just think about a REST impl and if you can > > work > > > >> inside of that you have some light RPC layers (this was the first > pass > > > >> prototype we did to flush-out the public api presented on the KIP). > > > >> > > > >> There are a lot of parts to working on this and the more > > > implementations we > > > >> have the better we can flush out the public interface. I will leave > > the > > > >> technical details and design to JIRA tickets that are linked through > > the > > > >> confluence page as these decisions come about and code starts for > > > reviews > > > >> and we can target the specific modules having the context separate > is > > > >> helpful especially if multiple folks are working on it. > > > >> https://issues.apache.org/jira/browse/KAFKA-2916 > > > >> > > > >> Do other folks want to build implementations? Maybe we should start > a > > > >> confluence page for those or use an existing one and add to it so we > > can > > > >> coordinate some there to. > > > >> > > > >> Thanks! 
> > > >> > > > >> ~ Joe Stein > > > >> - - - - - - - - - - - - - - - - - - - > > > >> [image: Logo-Black.jpg] > > > >> http://www.elodina.net > > > >>http://www.stealth.ly > > > >> - - - - - - - - - - - - - - - - - - - > > > >> > > > > > > > > > -- *—-* *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
Re: [DISCUSS] KIP-30 Allow for brokers to have plug-able consensus and meta data storage sub systems
Come on, Jay. Anyone can get up in the morning and run if they have the willpower :) Granted I do have some bias here, since we have tooling in place that makes deployments and monitoring easier. But even at that, I would not say Zookeeper is difficult to run or monitor. I’m not denying that there are complaints, but my experience has always been that complaints of that type are either related more to the specific way the dependency is implemented (that is, Kafka’s not using it correctly or is otherwise generating errors that say “Zookeeper” in them), or it’s related to a bias against the dependency (I don’t like Zookeeper, I have XYZ installed, etc.). The point that for a small installation Zookeeper can represent a large footprint is well made. I wonder how many of these people are being ill-served by recommendations from people like me that you should not run Kafka and Zookeeper on the same systems. Sure, we’d never do that at LinkedIn just because we are looking for high performance and a few more systems isn’t a big deal. But for a lower performance environment, it’s really not a problem to colocate the applications. As far as the controller goes, I’m perfectly willing to accept that my desire to get rid of it is from my bias against it because of how many problems we’ve run into with that code. We can probably both agree that the controller code needs an overhaul regardless. It’s stood up well, but as the clusters get larger it definitely shows cracks. -Todd On Thu, Dec 3, 2015 at 11:37 AM, Jay Kreps <j...@confluent.io> wrote: > Hey Todd, > > I actually agree on both counts. > > I would summarize the first comment as "Zookeeper is not hard to > operationalize if you are Todd Palino"--also in that category of > things that are not hard: running 13 miles at 5:00 am. Basically I > totally agree that ZK is now a solved problem at LinkedIn. :-) > > Empirically, though, it is really hard for a lot of our users.
It is > one of the largest sources of problems we see in people's clusters. We > could perhaps get part of the way by improving our zk usage and > documentation, and it is certainly the case that we could potentially > make things worse in trying to make them better, but I don't think > that is the same as saying there isn't a problem. > > I totally agree with your second comment. In some sense what I was > sketching out is just replacing ZK. But part of the design of Kafka > was because we already had ZK. So there might be a way to further > rationalize the metadata log and the data logs if you kind of went > back to first principles and thought about it. I don't have any idea > how, but I share that intuition. > > I do think having the controller, though, is quite useful. I think > this pattern of avoiding many rounds of consensus by just doing one > round to pick a leader is a good one. If you think about it Paxos => > Multi-paxos is basically optimizing by lumping together consensus > rounds on a per message basis into a leader which then handles many > messages, and what Kafka does is kind of like Multi-multi-paxos in > that it lumps together many leadership elections into one central > controller election which then picks all the leaders. In some ways > having central decision makers seems inelegant (aren't we supposed to > be distributed?) but it does allow you to be both very very fast in > making lots of decisions (vs doing thousands of independent leadership > elections) and also to do things that require global knowledge (like > balancing leadership). > > Cheers, > > -Jay > > > > On Thu, Dec 3, 2015 at 10:05 AM, Todd Palino <tpal...@gmail.com> wrote: > > This kind of discussion always puts me in mind of stories that start “So > I > > wrote my own encryption. How hard can it be?” :) > > > > Joking aside, I do have a few thoughts on this. First I have to echo > Joel’s > > perspective on Zookeeper. 
Honestly, it is one of the few applications we > > can forget about, so I have a hard time understanding pain around running > > it. You set it up, and unless you have a hardware failure to deal with, > > that’s it. Yes, there are ways to abusively use it, just like with any > > application, but Kafka is definitely not one of those use cases. I also > > disagree that it’s hard to share the ZK cluster safely. We do it all the > > time. We share most often with other Kafka clusters, but we also share > with > > other applications. This goes back to the “abusive use pattern”. Yes, we > > don’t share with them. Nobody should. Yes, it requires monitoring and you > > have to configure it correctly. But configuration is a one-time charge > > (both in terms of capital expenses and knowledge acquisition) and as > > applications
[jira] [Commented] (KAFKA-2759) Mirror maker can leave gaps of missing messages if the process dies after a partition is reset and before the first offset commit
[ https://issues.apache.org/jira/browse/KAFKA-2759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14994590#comment-14994590 ] Todd Palino commented on KAFKA-2759: The thought we had internally on the situation in KAFKA-1006 was that we should differentiate the behavior between what happens if a consumer has no offsets at all for a partition and what happens if the consumer thinks it has offsets, but is out of range. These are two different situations. In the case of mirror maker, we would want "earliest" behavior for the first situation (whether default or configured, doesn't matter), as we never want to lose data from new topics or partitions. For the second situation we want "closest". That is, if your out of range offset is closer to the earliest offset, start there. Otherwise, if your out of range offset is closer to the latest offset, start there. There's a little more detail involved on this, as there are intricacies with the fetch response. [~jjkoshy] has more information on that discussion. > Mirror maker can leave gaps of missing messages if the process dies after a > partition is reset and before the first offset commit > - > > Key: KAFKA-2759 > URL: https://issues.apache.org/jira/browse/KAFKA-2759 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.8.2.2 > Reporter: Ewen Cheslack-Postava > Priority: Minor > > Based on investigation of KAFKA-2747. When mirror maker first starts or if it > picks up new topics/partitions, it will use the reset policy to choose where > to start. By default this uses 'latest'. If it starts reading messages and > then dies before committing offsets for the first time, then the mirror maker > that takes over that partition will also reset. This can result in some > messages making it to the consumer, then a gap of messages that were skipped, and then > messages that get processed by the new MM process.
> One solution to this problem would be to make sure that offsets are committed > after they are reset but before the first message is passed to the producer. > In other words, in the case of a reset, MM should record where it's going to > start reading data from before processing any messages. This guarantees all > messages after the first one delivered by MM will appear at least once. > This is minor since it should be very rare, but it does break an assumption > that people probably make about the output -- once you start receiving data, > you aren't guaranteed all subsequent messages will appear at least once. > This same issue could affect Copycat as well. In fact, it may be generally > useful to allow consumers to know when the offset was reset so they can > handle cases like this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
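Todd's "closest" policy from the comment above can be expressed in a few lines. This is a sketch of the heuristic as he describes it (resume from whichever valid end of the log is nearer to the out-of-range offset), not actual consumer code; the function name is illustrative.

```python
def reset_offset(fetch_offset, earliest, latest, has_committed):
    """Model of the two-case policy Todd describes:
    - no committed offsets at all -> 'earliest' (never lose new data)
    - committed but out of range  -> 'closest' valid end of the log."""
    if not has_committed:
        return earliest
    if earliest <= fetch_offset <= latest:
        return fetch_offset  # in range: no reset needed
    # 'closest': pick whichever end is nearer to the stale offset
    if abs(fetch_offset - earliest) <= abs(fetch_offset - latest):
        return earliest
    return latest

# Offset fell behind retention: earliest (100) is closer, start there.
assert reset_offset(50, 100, 10_000, has_committed=True) == 100
# Offset is ahead of the log (e.g. after an unclean failover): use latest.
assert reset_offset(20_000, 100, 10_000, has_committed=True) == 10_000
# Brand-new partition with no offsets: always earliest for mirror maker.
assert reset_offset(0, 100, 10_000, has_committed=False) == 100
```

This separates the "no offsets" case (where latest would silently drop data from new partitions) from the "stale offsets" case (where clamping to the nearest valid offset minimizes the gap or replay).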
[jira] [Commented] (KAFKA-2235) LogCleaner offset map overflow
[ https://issues.apache.org/jira/browse/KAFKA-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14971806#comment-14971806 ] Todd Palino commented on KAFKA-2235: I don't think we can. I have already increased it from 512MB to 1GB, and we still hit the same problems. That only provides a 2x increase in the size of the map, and I would need almost a 10x increase to solve the problem. > LogCleaner offset map overflow > -- > > Key: KAFKA-2235 > URL: https://issues.apache.org/jira/browse/KAFKA-2235 > Project: Kafka > Issue Type: Bug > Components: core, log > Affects Versions: 0.8.1, 0.8.2.0 > Reporter: Ivan Simoneko > Assignee: Ivan Simoneko > Fix For: 0.9.0.0 > > Attachments: KAFKA-2235_v1.patch, KAFKA-2235_v2.patch > > > We've seen log cleaning generate an error for a topic with lots of small > messages. It seems that offset map overflow is possible if a log segment > contains more unique keys than empty slots in the offsetMap. Checking baseOffset > and map utilization before processing a segment seems to be insufficient because > it doesn't take into account segment size (the number of unique messages in the > segment). > I suggest estimating the upper bound of keys in a segment as the number of > messages in the segment and comparing it with the number of available slots in > the map (keeping in mind the desired load factor). It should work in cases where > an empty map is capable of holding all the keys for a single segment. If even a > single segment is not able to fit into an empty map, the cleanup process will still > fail. Probably there should be a limit on the log segment entry count? > Here is the stack trace for this error: > 2015-05-19 16:52:48,758 ERROR [kafka-log-cleaner-thread-0] > kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to > java.lang.IllegalArgumentException: requirement failed: Attempt to add a new > entry to a full offset map.
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:79)
> at kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:543)
> at kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:538)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at kafka.message.MessageSet.foreach(MessageSet.scala:67)
> at kafka.log.Cleaner.kafka$log$Cleaner$$buildOffsetMapForSegment(LogCleaner.scala:538)
> at kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:515)
> at kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:512)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:512)
> at kafka.log.Cleaner.clean(LogCleaner.scala:307)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[jira] [Commented] (KAFKA-2235) LogCleaner offset map overflow
[ https://issues.apache.org/jira/browse/KAFKA-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14971740#comment-14971740 ] Todd Palino commented on KAFKA-2235: I'm sure [~jjkoshy] will follow along with more detail on this, but we've run into a serious problem with this check. Basically, it's impossible to perform this kind of check accurately before the offset map is built. We now have partitions that should be able to be compacted as the total number of unique keys is far below the size of the offset map (currently at ~39 million for our configuration) but the messages are very frequent and very small. Even at a segment size of 64 MB, we have over 300 million messages in those segments. So this check creates a situation where log compaction should succeed, but fails because of a speculative check. While I can play the game of trying to walk back segment sizes, there's no way to size segments by number of messages, so it's a guessing game. In addition, the check is clearly wrong in that case, so I shouldn't have to config around it. Lastly, the check causes the log cleaner thread to exit, which means log compaction on the broker fails entirely, rather than just skipping that partition. A better way to handle this would be to cleanly catch the original error you are seeing, generate a clear error message in the logs as to what the failure is, and allow the log cleaner to continue and handle other partitions. You could also maintain a blacklist of partitions in memory in the log cleaner to make sure you don't come back around and try and compact the partition again. 
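The capacity figures quoted in this thread can be sanity-checked with back-of-the-envelope arithmetic. The sketch below assumes each offset-map entry stores a 16-byte MD5 hash of the key plus an 8-byte offset (24 bytes total) and a 0.9 load factor; those per-entry details are assumptions about SkimpyOffsetMap, not stated in the thread.

```python
# Rough capacity estimate for the log cleaner's offset map, assuming
# 24 bytes per entry (16-byte MD5 key hash + 8-byte offset) and a 0.9
# load factor -- implementation details assumed, not from this thread.

ENTRY_BYTES = 16 + 8
LOAD_FACTOR = 0.9

def map_capacity(dedupe_buffer_bytes):
    """Approximate number of unique keys the offset map can hold."""
    return int(dedupe_buffer_bytes * LOAD_FACTOR / ENTRY_BYTES)

print(map_capacity(1024 ** 3))  # 1 GB buffer -> roughly 40 million slots
```

That result is in the same ballpark as the ~39 million figure quoted above; it also shows why a segment holding hundreds of millions of tiny messages fails a pre-check based on message count, even when the number of *unique* keys would fit in the map.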
Re: [DISCUSS] KIP-38: ZooKeeper Authentication
There seems to be a bit of detail lacking in the KIP. Specifically, I'd like to understand: 1) What znodes are the brokers going to secure? Is this configurable? How? 2) What ACL is the broker going to apply? Is this configurable? 3) How will the admin tools (such as preferred replica election and partition reassignment) interact with this? -Todd On Wed, Oct 21, 2015 at 9:16 AM, Ismael Juma wrote: > On Wed, Oct 21, 2015 at 5:04 PM, Flavio Junqueira wrote: > > > Bringing the points Grant brought to this thread: > > > > > Is it worth mentioning the follow up steps that were discussed in the > KIP > > > call in this KIP document? Some of them were: > > > > > > - Adding SSL support for Zookeeper > > > - Removing the "world readable" assumption > > > > > > > Grant, how would you do it? I see three options: > > > > 1- Add to the existing KIP, but then the functionality we should be > > checking in soon won't include it, so the KIP will remain incomplete > > > > A "Future work" section would make sense to me, but I don't know how this > is normally handled. > > Ismael >
Re: [VOTE] KIP-38: ZooKeeper authentication
I've added a reply on the discuss thread already. However, the point is that if there were changes as a result of the KIP call (which I often can't make on Tuesdays), it should be updated on the wiki page so everyone is aware of what is being voted on. -Todd On Wed, Oct 21, 2015 at 9:47 AM, Flavio Junqueira <f...@apache.org> wrote: > Todd, > > There is a discuss thread for this KIP and we talked about it during the > KIP call yesterday. I'm more than happy to add detail to the KIP if you > bring it up in the discuss thread. I'd rather not stop the vote thread, > though. > > Thanks, > -Flavio > > > On 21 Oct 2015, at 17:41, Todd Palino <tpal...@gmail.com> wrote: > > > > While this is a great idea, is it really ready for vote? I don't see any > > detail in the wiki about what trees will be secured, and whether or not > > that is configurable. I also don't see anything about how the use of > admin > > tools is going to be addressed. > > > > -Todd > > > > On Wed, Oct 21, 2015 at 8:48 AM, Grant Henke <ghe...@cloudera.com> > wrote: > > > >> +1 > >> > >> Is it worth mentioning the follow up steps that were discussed in the > KIP > >> call in this KIP document? Some of them were: > >> > >> - Adding SSL support for Zookeeper > >> - Removing the "world readable" assumption > >> > >> Thank you, > >> Grant > >> > >> On Wed, Oct 21, 2015 at 10:23 AM, Onur Karaman < > >> okara...@linkedin.com.invalid> wrote: > >> > >>> +1 > >>> > >>> On Wed, Oct 21, 2015 at 8:17 AM, Flavio Junqueira <f...@apache.org> > >> wrote: > >>> > >>>> Thanks everyone for the feedback so far. At this point, I'd like to > >> start > >>>> a vote for KIP-38. 
> >>>> > >>>> Summary: Add support for ZooKeeper authentication > >>>> KIP page: > >>>> > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-38%3A+ZooKeeper+Authentication > >>>> < > >>>> > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-38:+ZooKeeper+Authentication > >>>>> > >>>> > >>>> Thanks, > >>>> -Flavio > >>> > >> > >> > >> > >> -- > >> Grant Henke > >> Software Engineer | Cloudera > >> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke > >> > >
Re: [VOTE] KIP-38: ZooKeeper authentication
While this is a great idea, is it really ready for vote? I don't see any detail in the wiki about what trees will be secured, and whether or not that is configurable. I also don't see anything about how the use of admin tools is going to be addressed. -Todd On Wed, Oct 21, 2015 at 8:48 AM, Grant Henke wrote: > +1 > > Is it worth mentioning the follow up steps that were discussed in the KIP > call in this KIP document? Some of them were: > >- Adding SSL support for Zookeeper >- Removing the "world readable" assumption > > Thank you, > Grant > > On Wed, Oct 21, 2015 at 10:23 AM, Onur Karaman < > okara...@linkedin.com.invalid> wrote: > > > +1 > > > > On Wed, Oct 21, 2015 at 8:17 AM, Flavio Junqueira > wrote: > > > > > Thanks everyone for the feedback so far. At this point, I'd like to > start > > > a vote for KIP-38. > > > > > > Summary: Add support for ZooKeeper authentication > > > KIP page: > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-38%3A+ZooKeeper+Authentication > > > < > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-38:+ZooKeeper+Authentication > > > > > > > > > > Thanks, > > > -Flavio > > > > > > -- > Grant Henke > Software Engineer | Cloudera > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke >
Re: [DISCUSS] KIP-38: ZooKeeper Authentication
Comments inline. In addition, the documentation on the migration path is good, but do we really need a separate utility? Would it be better to have checking and setting the ACLs be a function of the controller, possibly as a separate thread either only at controller startup or periodically, with information available about when the check runs and when it completes (I would like to see this in a metric - we already have problems determining whether or not the log compaction thread is running). This would provide continuous coverage on the ACLs. Also, what is the downgrade plan? On Wed, Oct 21, 2015 at 9:56 AM, Flavio Junqueira <f...@apache.org> wrote: > > > On 21 Oct 2015, at 17:47, Todd Palino <tpal...@gmail.com> wrote: > > > > There seems to be a bit of detail lacking in the KIP. Specifically, I'd > > like to understand: > > > > 1) What znodes are the brokers going to secure? Is this configurable? > How? > > Currently it is securing all paths here except the consumers one: > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L56 > < > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L56 > > > > This isn't configurable at the moment. > That's fine. As long as the consumers tree is exempted, and the admin tools continue to work properly, I don't see any problems with this not being configurable. All of those paths are specific to the brokers. > > 2) What ACL is the broker going to apply? Is this configurable? > > The default is CREATOR_ALL_ACL + READ_ACL_UNSAFE, which means that an > authenticated client can manipulate secured znodes and everyone can read > znodes. The API of ZkUtils accommodates other ACLs, but the current code is > using the default. > I think we should consider making this configurable. 
A specific use case I can see is that in an environment where you have multiple users of the Zookeeper that your Kafka cluster is in, you will want to separately protect those applications and Kafka. We may not want to do this as part of the initial KIP work, as I think it's important to get this in as soon as possible in some form (we have problems that this will address), but what do you think about making this improvement shortly thereafter? > > 3) How will the admin tools (such as preferred replica election and > > partition reassignment) interact with this? > > > > Currently, you need to set a system property passing the login config file > to be able to authenticate the client and perform writes to ZK. > OK. I'll assume that your changes will include modifying the tools and script helpers to make this easy to do :) All of this should be clearly documented in the KIP wiki as well. > -Flavio > > > -Todd > > > > > > On Wed, Oct 21, 2015 at 9:16 AM, Ismael Juma <ism...@juma.me.uk> wrote: > > > >> On Wed, Oct 21, 2015 at 5:04 PM, Flavio Junqueira <f...@apache.org> > wrote: > >> > >>> Bringing the points Grant brought to this thread: > >>> > >>>> Is it worth mentioning the follow up steps that were discussed in the > >> KIP > >>>> call in this KIP document? Some of them were: > >>>> > >>>> - Adding SSL support for Zookeeper > >>>> - Removing the "world readable" assumption > >>>> > >>> > >>> Grant, how would you do it? I see three options: > >>> > >>> 1- Add to the existing KIP, but then the functionality we should be > >>> checking in soon won't include it, so the KIP will remain incomplete > >>> > >> > >> A "Future work" section would make sense to me, but I don't know how > this > >> is normally handled. > >> > >> Ismael > >> > >
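For reference, the "login config file" system property mentioned above conventionally points at a JAAS file. A digest-authentication example is sketched below; the username and password are placeholders, and Kerberos setups would use a different login module:

```
// Hypothetical JAAS file passed to brokers and admin tools via
// -Djava.security.auth.login.config=/path/to/jaas.conf
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="kafka"
    password="kafka-secret";
};
```

The `Client` section is the ZooKeeper client's standard JAAS login context; both the brokers and the admin tools would need the same credentials for the tools to modify broker-owned znodes, which is the concern raised later in this thread.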
Re: [DISCUSS] KIP-38: ZooKeeper Authentication
Thanks for the clarification on that, Jun. Obviously, we haven't been doing much with ZK authentication around here yet. There is still a small concern there, mostly in that you should not share credentials any more than is necessary, which would argue for being able to use a different ACL than the default. I don't really like the idea of having to use the exact same credentials for executing the admin tools as we do for running the brokers. Given that we don't need to share the credentials with all consumers, I think we can work around it. This does bring up another good question, however. What will be the process for having to rotate the credentials? That is, if the credentials are compromised and need to be changed, how can that be accomplished with the cluster online. I'm guessing some combination of using skipAcl on the Zookeeper ensemble and config changes to the brokers will be required, but this is an important enough operation that we should make sure it's reasonable to perform and that it is documented. -Todd On Wed, Oct 21, 2015 at 1:23 PM, Jun Rao <j...@confluent.io> wrote: > Parth, > > For 2), in your approach, the broker/controller will then always have the > overhead of resetting the ACL on startup after zookeeper.set.acl is set to > true. The benefit of using a separate migration tool is that you paid the > cost only once during upgrade. It is an extra step during the upgrade. > However, given the other things that you need to do to upgrade to 0.9.0 > (e.g. two rounds of rolling upgrades on all brokers, etc), I am not sure if > it's worth to optimize away of this step. We probably just need to document > this clearly. > > Todd, > > Just to be clear about the shared ZK usage. Once you set CREATOR_ALL_ACL + > READ_ACL_UNSAFE on a path, only ZK clients with the same user as the > creator can modify the path. Other ZK clients authenticated with a > different user can read, but not modify the path. Are you concerned about > the reads or the writes to ZK? 
> > Thanks, > > Jun > > > > On Wed, Oct 21, 2015 at 10:46 AM, Flavio Junqueira <f...@apache.org> wrote: > > > > > > > On 21 Oct 2015, at 18:07, Parth Brahmbhatt < > pbrahmbh...@hortonworks.com> > > wrote: > > > > > > I have 2 suggestions: > > > > > > 1) We need to document how one moves from a secure to a non-secure > > > environment: > > > 1) change the config on all brokers to zookeeper.set.acl = false > > and do a > > > rolling upgrade. > > > 2) Run the migration script with the jaas config file so it is > sasl > > > authenticated with zookeeper and change the acls on all subtrees back > to > > > World modifiable. > > > 3) remove the jaas config / or only the zookeeper section from > the > > jaas, > > > and restart all brokers. > > > > > > > Thanks for bringing it up, it makes sense to have a downgrade path and > > document it. > > > > > > > 2) I am not sure if we should force users trying to move from unsecure to > > > secure environment to execute the migration script. In the second step > > > once the zookeeper.set.acl is set to true, we can secure all the > subtrees > > > by calling ensureCorrectAcls as part of broker initialization (just > after > > > makesurePersistentPathExists). Not sure why we want to add one more > > > manual/admin step when it can be automated. This also has the added > > > advantage that migration script will not have to take a flag as input > to > > > figure out if it should set the acls to secure or unsecure given it > will > > > always be used to move from secure to unsecure. > > > > > > > The advantage of the third step is to make a single traversal to change > > any remaining znodes with the open ACL. As you suggest, each broker would > > do it, so the overhead is much higher. I do agree that eliminating a step > > is an advantage, though. > > > > > Given we are assuming all the information in zookeeper is world readable > , > > > I don't see SSL support as a must have or a blocker for this KIP. 
> > > > OK, but keep in mind that SSL is only available in the 3.5 branch of ZK. > > > > -Flavio > > > > > > > > Thanks > > > Parth > > > > > > > > > > > > On 10/21/15, 9:56 AM, "Flavio Junqueira" <f...@apache.org> wrote: > > > > > >> > > >>> On 21 Oct 2015, at 17:47, Todd Palino <tpal...@gmail.com> wrote: > > >>> > > >>> There seems to be a bit of detail la
Re: [DISCUSS] KIP-38: ZooKeeper Authentication
On Wed, Oct 21, 2015 at 3:38 PM, Flavio Junqueira <f...@apache.org> wrote: > > > On 21 Oct 2015, at 21:54, Todd Palino <tpal...@gmail.com> wrote: > > > > Thanks for the clarification on that, Jun. Obviously, we haven't been > doing > > much with ZK authentication around here yet. There is still a small > concern > > there, mostly in that you should not share credentials any more than is > > necessary, which would argue for being able to use a different ACL than > the > > default. I don't really like the idea of having to use the exact same > > credentials for executing the admin tools as we do for running the > brokers. > > Given that we don't need to share the credentials with all consumers, I > > think we can work around it. > > > > Let me add that a feature to separate the sub-trees of users sharing an > ensemble is chroot. > > On different credentials for admin tools, this sounds doable by setting > the ACLs of znodes. For example, there could be an admin id and a broker > id, both with the ability of changing znodes, but different credentials. > Would something like that work for you? > It would be a nice option to have, as the credentials can be protected differently. I would consider this a nice to have, and not an "absolutely must have" feature at this point. > This does bring up another good question, however. What will be the > process > > for having to rotate the credentials? That is, if the credentials are > > compromised and need to be changed, how can that be accomplished with the > > cluster online. I'm guessing some combination of using skipAcl on the > > Zookeeper ensemble and config changes to the brokers will be required, > but > > this is an important enough operation that we should make sure it's > > reasonable to perform and that it is documented. > > Right now there is no kafka support in the plan for this. But this is > doable directly through the zk api. 
Would it be sufficient to write down > how to perform such an operation via the zk api or do we need a tool to do > it? > I think as long as there is a documented procedure for how to do it, that will be good enough. It's mostly about making sure that we can, and that we don't put something in place that would require downtime to a cluster in order to change credentials. We can always develop a tool later if it is a requested item. Thanks! -Todd > > -Flavio > > > > > > > On Wed, Oct 21, 2015 at 1:23 PM, Jun Rao <j...@confluent.io> wrote: > > > >> Parth, > >> > >> For 2), in your approach, the broker/controller will then always have > the > >> overhead of resetting the ACL on startup after zookeeper.set.acl is set > to > >> true. The benefit of using a separate migration tool is that you paid > the > >> cost only once during upgrade. It is an extra step during the upgrade. > >> However, given the other things that you need to do to upgrade to 0.9.0 > >> (e.g. two rounds of rolling upgrades on all brokers, etc), I am not > sure if > >> it's worth to optimize away of this step. We probably just need to > document > >> this clearly. > >> > >> Todd, > >> > >> Just to be clear about the shared ZK usage. Once you set > CREATOR_ALL_ACL + > >> READ_ACL_UNSAFE on a path, only ZK clients with the same user as the > >> creator can modify the path. Other ZK clients authenticated with a > >> different user can read, but not modify the path. Are you concerned > about > >> the reads or the writes to ZK? 
> >> > >> Thanks, > >> > >> Jun > >> > >> > >> > >> On Wed, Oct 21, 2015 at 10:46 AM, Flavio Junqueira <f...@apache.org> > wrote: > >> > >>> > >>>> On 21 Oct 2015, at 18:07, Parth Brahmbhatt < > >> pbrahmbh...@hortonworks.com> > >>> wrote: > >>>> > >>>> I have 2 suggestions: > >>>> > >>>> 1) We need to document how does one move from secure to non secure > >>>> environment: > >>>> 1) change the config on all brokers to zookeeper.set.acl = false > >>> and do a > >>>> rolling upgrade. > >>>> 2) Run the migration script with the jass config file so it is > >> sasl > >>>> authenticated with zookeeper and change the acls on all subtrees back > >> to > >>>> World modifiable. > >>>> 3) remove the jaas config / or only the zookeeper section from > >> the > >>> jaas, &g
Re: [VOTE] KIP-38: ZooKeeper authentication
+1 (non-binding) On Wed, Oct 21, 2015 at 6:53 PM, Jiangjie Qin wrote: > +1 (non-binding) > > On Wed, Oct 21, 2015 at 3:40 PM, Joel Koshy wrote: > > +1 binding > > > > On Wed, Oct 21, 2015 at 8:17 AM, Flavio Junqueira > wrote: > > > > > Thanks everyone for the feedback so far. At this point, I'd like to > start > > > a vote for KIP-38. > > > > > > Summary: Add support for ZooKeeper authentication > > > KIP page: > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-38%3A+ZooKeeper+Authentication > > > < > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-38:+ZooKeeper+Authentication > > > > > > > > > > Thanks, > > > -Flavio > > >
[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)
[ https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14964003#comment-14964003 ] Todd Palino commented on KAFKA-2580: It's about as graceful as an OOM, which is to say "not very". Essentially, it hits the limit and falls over and dies with an exception. We've run into it a bit with both leaking FDs from an implementation issue, and with runaway clients that don't do the right thing. In both situations, you are correct that you will generally end up seeing it as a cascading failure through the cluster. > Kafka Broker keeps file handles open for all log files (even if its not > written to/read from) > - > > Key: KAFKA-2580 > URL: https://issues.apache.org/jira/browse/KAFKA-2580 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.2.1 >Reporter: Vinoth Chandar >Assignee: Grant Henke > > We noticed this in one of our clusters where we stage logs for a longer > amount of time. It appears that the Kafka broker keeps file handles open even > for non active (not written to or read from) files. (in fact, there are some > threads going back to 2013 > http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) > Needless to say, this is a problem and forces us to either artificially bump > up ulimit (its already at 100K) or expand the cluster (even if we have > sufficient IO and everything). > Filing this ticket, since I could find anything similar. Very interested to > know if there are plans to address this (given how Samza's changelog topic is > meant to be a persistent large state use case). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)
[ https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14963986#comment-14963986 ] Todd Palino commented on KAFKA-2580: I agree with [~jkreps] here, that having a high FD limit is not a bad thing. As [~jjkoshy] noted, we're already running at 400k internally (recently increased from 200k). Part of that is to handle growth, and part of that is to have a good bit of headroom if something starts to leak FDs so we have some time to address it before it kills the process (we alert at 50% utilization). The LRU cache option is probably the best. You can set it to an arbitrarily high number (the best option here might be to cap it near, but below, your per-process limit) if you want to effectively disable it, and it would generally avoid the process of having to check and act on expiring the FDs in the timer option. I can see arguments for setting the default either high or low (and I consider 10k to be low). Regardless, as long as it's configurable and documented it will be fine. > Kafka Broker keeps file handles open for all log files (even if its not > written to/read from) > - > > Key: KAFKA-2580 > URL: https://issues.apache.org/jira/browse/KAFKA-2580 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.2.1 >Reporter: Vinoth Chandar >Assignee: Grant Henke > > We noticed this in one of our clusters where we stage logs for a longer > amount of time. It appears that the Kafka broker keeps file handles open even > for non active (not written to or read from) files. (in fact, there are some > threads going back to 2013 > http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) > Needless to say, this is a problem and forces us to either artificially bump > up ulimit (its already at 100K) or expand the cluster (even if we have > sufficient IO and everything). > Filing this ticket, since I could find anything similar. 
Very interested to > know if there are plans to address this (given how Samza's changelog topic is > meant to be a persistent large state use case).
[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover
[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14964228#comment-14964228 ] Todd Palino commented on KAFKA-2017: I think we definitely need to maintain the ability to get that type of information, whether it's within the protocol or via an admin endpoint. Being able to tell what consumers exist in a group, as well as what partitions each of them owns, is important information to have. And it should be available not just from the consumers, but from the coordinator itself. That way you can debug issues where they have gone out of sync, and you can provide tools (such as burrow) to provide consumer status information independently. > Persist Coordinator State for Coordinator Failover > -- > > Key: KAFKA-2017 > URL: https://issues.apache.org/jira/browse/KAFKA-2017 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Onur Karaman >Assignee: Guozhang Wang > Fix For: 0.9.0.0 > > Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, > KAFKA-2017_2015-05-21_19:02:47.patch > > > When a coordinator fails, the group membership protocol tries to failover to > a new coordinator without forcing all the consumers rejoin their groups. This > is possible if the coordinator persists its state so that the state can be > transferred during coordinator failover. This state consists of most of the > information in GroupRegistry and ConsumerRegistry. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover
[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960809#comment-14960809 ] Todd Palino commented on KAFKA-2017: Just to throw in my 2 cents here, I don't think that persisting this state in a special topic in Kafka is a bad idea. My only concern is that we have seen issues with the offsets already from time to time, and we'll want to make sure we take those lessons learned and handle them from the start. The ones I am aware of are: 1) Creation of the special topic at cluster initialization. If we specify an RF of N for the special topic, then the brokers must make this happen. The first broker that comes up can't create it with an RF of 1 and own all the partitions. Either it must reject all operations that would use the special topic until N brokers are members of the cluster and then it can be created, or it must create the topic in such a way that as soon as there are N brokers available the RF is corrected to the configured number. 2) Load of the special topic into local cache. Whenever a coordinator loads the special topic, there is a period of time while it is loading state where it cannot service requests. We've seen problems with this related to log compaction, where the partitions were excessively large, but I can see as we move an increasing number of (group, partition) tuples over to Kafka-committed offsets it could become a scale issue very easily. This should not be a big deal for group state information, as that should always be smaller than the offset information for the group, but we may want to create a longer term plan for handling auto-scaling of the special topics (the ability to increase the number of partitions and move group information from the partition it used to hash to to the one it hashes to after scaling). 
> Persist Coordinator State for Coordinator Failover > -- > > Key: KAFKA-2017 > URL: https://issues.apache.org/jira/browse/KAFKA-2017 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Onur Karaman >Assignee: Guozhang Wang > Fix For: 0.9.0.0 > > Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, > KAFKA-2017_2015-05-21_19:02:47.patch > > > When a coordinator fails, the group membership protocol tries to failover to > a new coordinator without forcing all the consumers rejoin their groups. This > is possible if the coordinator persists its state so that the state can be > transferred during coordinator failover. This state consists of most of the > information in GroupRegistry and ConsumerRegistry. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
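The "hash to a partition" concern in the comment above can be made concrete. The sketch below mimics Java's `String.hashCode` (Kafka places a group on a partition of the internal topic using the group id's hash modulo the partition count) to show that changing the partition count remaps groups; the partition counts used here are illustrative.

```python
def java_string_hash(s):
    """32-bit signed hash matching Java's String.hashCode semantics."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def partition_for(group_id, num_partitions):
    """Map a group id onto a partition of the special topic."""
    return (java_string_hash(group_id) & 0x7FFFFFFF) % num_partitions

# The same group can land on a different partition once the topic is
# scaled up, so its state would have to be migrated:
print(partition_for("my-consumer-group", 50))
print(partition_for("my-consumer-group", 60))
```

This is exactly why increasing the partition count of the special topic is not a simple metadata change: every group whose hash lands differently under the new modulus needs its state moved to the new owning partition.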
Re: [DISCUSS] KIP-35 - Retrieve protocol version
We should also consider what else should be negotiated between the broker and the client as this comes together. The version is definitely first, but there are other things, such as the max message size, that should not need to be replicated on both the broker and the client. Granted, max message size has per-topic overrides as well, but that should also be considered (possibly as an addition to the topic metadata response). Ideally you never want a requirement that is enforced by the broker to be a surprise to the client, whether that's a supported version or a configuration parameter. The client should not have to know it in advance (except for the most basic of connection parameters), and even if it does have it as a configuration option, it should be able to know before it even starts running that what it has configured is in conflict with the server. -Todd On Tue, Sep 29, 2015 at 11:08 AM, Mayuresh Gharat < gharatmayures...@gmail.com> wrote: > Right. But there should be a max old version that the broker should support > to avoid these incompatibility issues. > For example, if the broker is at version X, it should be able to support > the versions (clients and interbroker) till X-2. In case we have brokers > and clients older than that it can send a response warning them to upgrade > till X-2 minimum. > The backward compatibility limit can be discussed further. This will help > for rolling upgrades. > > Thanks, > > Mayuresh > > On Tue, Sep 29, 2015 at 8:25 AM, Grant Henkewrote: > > > If we create a protocol version negotiation api for clients, can we use > it > > to replace or improve the ease of upgrades that break inter-broker > > messaging? > > > > Currently upgrades that break the wire protocol take 2 rolling restarts. > > The first restart we set inter.broker.protocol.version telling all > brokers > > to communicate on the old version, and then we restart again removing the > > inter.broker.protocol.version. 
With this api the brokers could agree on a > > version to communicate with, and when bounced re-negotiate to the new > > version. > > > > > > On Mon, Sep 28, 2015 at 10:26 PM, Mayuresh Gharat < > > gharatmayures...@gmail.com> wrote: > > > > > Nice write-up. > > > > > > Just had a question, instead of returning an empty response back to the > > > client, would it be better for the broker to return a response that > gives > > > some more info to the client regarding the min version they need to > > upgrade > > > to in order to communicate with the broker. > > > > > > > > > Thanks, > > > > > > Mayuresh > > > > > > On Mon, Sep 28, 2015 at 6:36 PM, Jiangjie Qin > > > > > > wrote: > > > > > > > Thanks for the writeup. I also think having a specific protocol for > > > > client-broker version negotiation is better. > > > > > > > > I'm wondering is it better to let the broker to decide the version to > > > use? > > > > It might have some value If brokers have preference for a particular > > > > version. > > > > Using a global version is a good approach. For the client-broker > > > > negotiation, I am thinking about something like: > > > > > > > > ProtocolSyncRequest => ClientType [ProtocolVersion] > > > > ClientType => int8 > > > > ProtocolVersion => int16 > > > > > > > > ProtocolSyncResponse => PreferredVersion > > > > PreferredVersion => int16 > > > > > > > > Thanks, > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > On Mon, Sep 28, 2015 at 11:59 AM, Jun Rao wrote: > > > > > > > > > I agree with Ewen that having the keys explicitly specified in the > > > > response > > > > > is better. > > > > > > > > > > In addition to the supported protocol version, there are other > > > > interesting > > > > > metadata at the broker level that could be of interest to things > like > > > > admin > > > > > tools (e.g., used disk space, remaining disk space, etc). I am > > > wondering > > > > if > > > > > we should separate those into different requests. 
For inquiring the > > > > > protocol version, we can have a separate BrokerProtocolRequest. The > > > > > response will just include the broker version and perhaps a list of > > > > > supported requests and versions? > > > > > > > > > > As for sending an empty response for unrecognized requests, do you know > > > > > how that is handled in other similar systems? > > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > On Mon, Sep 28, 2015 at 10:47 AM, Jason Gustafson < > > ja...@confluent.io> > > > > > wrote: > > > > > > > > > > > Having the version API can make clients more robust, so I'm in > > favor. > > > > One > > > > > > note on the addition of the "rack" field. Since this is a > > > > broker-specific > > > > > > setting, the client would have to query BrokerMetadata for every > > new > > > > > broker > > > > > > it connects to (unless we also expose rack in TopicMetadata). This > is > > > > > also > > > > > > kind of unfortunate for admin
Re: [DISCUSS] KIP-36 - Rack aware replica assignment
I tend to like the idea of a pluggable locator. For example, we already have an interface for discovering information about the physical location of servers. I don't relish the idea of having to maintain data in multiple places. -Todd On Mon, Sep 28, 2015 at 4:48 PM, Aditya Auradkar < aaurad...@linkedin.com.invalid> wrote: > Thanks for starting this KIP Allen. > > I agree with Gwen that having a RackLocator class that is pluggable seems > to be too complex. The KIP refers to potentially non-ZK storage for the > rack info which I don't think is necessary. > > Perhaps we can persist this info in zk under /brokers/ids/ > similar to other broker properties and add a config in KafkaConfig called > "rack". > {"jmx_port":-1,"endpoints":[...],"host":"xxx","port":yyy, "rack": "abc"} > > Aditya > > On Mon, Sep 28, 2015 at 2:30 PM, Gwen Shapira wrote: > > Hi, > > > > First, thanks for putting out a KIP for this. This is super important for > > production deployments of Kafka. > > > > Few questions: > > > > 1) Are we sure we want "as many racks as possible"? I'd want to balance > > between safety (more racks) and network utilization (traffic within a > rack > > uses the high-bandwidth TOR switch). One replica on a different rack and > > the rest on same rack (if possible) sounds better to me. > > > > 2) Rack-locator class seems overly complex compared to adding a > rack.number > > property to the broker properties file. Why do we want that? > > > > Gwen > > > > > > > > On Mon, Sep 28, 2015 at 12:15 PM, Allen Wang > wrote: > > > Hello Kafka Developers, > > > > > > I just created KIP-36 for rack aware replica assignment. > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment > > > > > > The goal is to utilize the isolation provided by the racks in data > center > > > and distribute replicas to racks to provide fault tolerance. > > > > > > Comments are welcome. > > > > > > Thanks, > > > Allen > > > > > >
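For illustration only, here is a simplified sketch of the rack-aware idea being discussed: interleave brokers by rack so that consecutive replicas of a partition land on different racks where possible. This is a toy version, not the algorithm KIP-36 ultimately specifies, and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RackAwareAssignment {
    /**
     * Hypothetical sketch: given broker-id -> rack, interleave brokers
     * round-robin across racks, then take replicationFactor consecutive
     * entries starting at startIndex. Consecutive replicas therefore fall
     * on different racks whenever rack sizes allow it.
     */
    static List<Integer> assignReplicas(Map<Integer, String> brokerRacks,
                                        int replicationFactor, int startIndex) {
        // group brokers by rack (TreeMap for deterministic rack order)
        Map<String, List<Integer>> byRack = new TreeMap<>();
        for (Map.Entry<Integer, String> e : brokerRacks.entrySet())
            byRack.computeIfAbsent(e.getValue(), r -> new ArrayList<>()).add(e.getKey());
        // round-robin across racks: rack0[0], rack1[0], ..., rack0[1], rack1[1], ...
        List<List<Integer>> rackLists = new ArrayList<>(byRack.values());
        List<Integer> interleaved = new ArrayList<>();
        for (int i = 0; interleaved.size() < brokerRacks.size(); i++) {
            List<Integer> rack = rackLists.get(i % rackLists.size());
            int pos = i / rackLists.size();
            if (pos < rack.size()) interleaved.add(rack.get(pos));
        }
        // pick replicationFactor brokers from the interleaved ring
        List<Integer> replicas = new ArrayList<>();
        for (int i = 0; i < replicationFactor; i++)
            replicas.add(interleaved.get((startIndex + i) % interleaved.size()));
        return replicas;
    }
}
```

This also shows why Gwen's question matters: a pure interleave maximizes rack spread, whereas a "one replica off-rack, rest on-rack" policy would need a different picking step.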
Re: [VOTE] KIP-31 - Move to relative offsets in compressed message sets.
+1000 ! -Todd On Wednesday, September 23, 2015, Jiangjie Qin wrote: > Hi, > > Thanks a lot for the reviews and feedback on KIP-31. It looks like all the > concerns of the KIP have been addressed. I would like to start the voting > process. > > The short summary for the KIP: > We are going to use the relative offset in the message format to avoid > server side recompression. > > In case you haven't got a chance to check, here is the KIP link. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets > > Thanks, > > Jiangjie (Becket) Qin >
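The KIP summary above is terse, so here is a sketch of the offset arithmetic it implies, assuming inner messages of a compressed set carry relative offsets 0..n-1 and the wrapper message carries the absolute offset of the last inner message (the broker then rewrites only the single wrapper offset instead of decompressing and recompressing the whole set). The helper name is hypothetical:

```java
public class RelativeOffsets {
    /**
     * Sketch: recover an inner message's absolute offset from the wrapper
     * message's offset. With relative offsets 0..messageCount-1 and the
     * wrapper holding the last message's absolute offset, the first inner
     * message sits at wrapperOffset - (messageCount - 1).
     */
    static long absoluteOffset(long wrapperOffset, int relativeOffset, int messageCount) {
        return wrapperOffset - (messageCount - 1) + relativeOffset;
    }
}
```

For example, a 5-message set assigned offsets 100..104 is stored with wrapper offset 104 and inner relative offsets 0..4, so the consumer reconstructs the absolute offsets without the broker ever touching the compressed payload.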
Re: [DISCUSS] KIP-31 - Message format change proposal
So, with regards to why you want to search by timestamp, the biggest problem I've seen is with consumers who want to reset their offsets to a specific point in time, whether it is to replay a certain amount of messages, or to rewind to before some problem state existed. This happens more often than anyone would like. To handle this now we need to constantly export the broker's offsets for every partition to a time-series database and then use external processes to query this. I know we're not the only ones doing this. The way the broker handles requests for offsets by timestamp is a little obtuse (try explaining it to anyone without intimate knowledge of the internal workings of the broker - every time I do, I see this). In addition, as Becket pointed out, it causes problems specifically with retention of messages by time when you move partitions around. I'm deliberately avoiding the discussion of what timestamp to use. I can see the argument either way, though I tend to lean towards the idea that the broker timestamp is the only viable source of truth in this situation. -Todd On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava wrote: > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps wrote: > > > > > 2. Nobody cares what time it is on the server. > > > > This is a good way of summarizing the issue I was trying to get at, from an > app's perspective. Of the 3 stated goals of the KIP, #2 (log retention) is > reasonably handled by a server-side timestamp. I really just care that a > message is there long enough that I have a chance to process it. #3 > (searching by timestamp) only seems useful if we can guarantee the > server-side timestamp is close enough to the original client-side > timestamp, and any mirror maker step seems to break that (even ignoring any > issues with broker availability).
> > I'm also wondering whether optimizing for search-by-timestamp on the broker > is really something we want to do given that messages aren't really > guaranteed to be ordered by application-level timestamps on the broker. Is > part of the need for this just due to the current consumer APIs being > difficult to work with? For example, could you implement this pretty easily > client side just the way you would broker-side? I'd imagine a couple of > random seeks + reads during very rare occasions (i.e. when the app starts > up) wouldn't be a problem performance-wise. Or is it also that you need the > broker to enforce things like monotonically increasing timestamps since you > can't do the query properly and efficiently without that guarantee, and > therefore what applications are actually looking for *is* broker-side > timestamps? > > -Ewen > > > > > Consider cases where data is being copied from a database or from log > > files. In steady-state the server time is very close to the client time > if > > their clocks are sync'd (see 1) but there will be times of large > divergence > > when the copying process is stopped or falls behind. When this occurs it > is > > clear that the time the data arrived on the server is irrelevant, it is > the > > source timestamp that matters. This is the problem you are trying to fix > by > > retaining the mm timestamp but really the client should always set the > time > > with the use of server-side time as a fallback. It would be worth talking > > to the Samza folks and reading through this blog post ( > > > http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html > > ) > > on this subject since we went through similar learnings on the stream > > processing side. > > > > I think the implication of these two is that we need a proposal that > > handles potentially very out-of-order timestamps in some kind of sanish > way > > (buggy clients will set something totally wrong as the time). 
> > > > -Jay > > > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps wrote: > > > > > The magic byte is used to version message format so we'll need to make > > > sure that check is in place--I actually don't see it in the current > > > consumer code which I think is a bug we should fix for the next release > > > (filed KAFKA-2523). The purpose of that field is so there is a clear > > check > > > on the format rather than the scrambled scenarios Becket describes. > > > > > > Also, Becket, I don't think just fixing the java client is sufficient > as > > > that would break other clients--i.e. if anyone writes a v1 messages, > even > > > by accident, any non-v1-capable consumer will break. I think we > probably > > > need a way to have the server ensure a particular message format either > > at > > > read or write time. > > > > > > -Jay > > > > > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin > > > > wrote: > > > > > >> Hi Guozhang, > > >> > > >> I checked the code again. Actually CRC check probably won't fail. The > > >> newly > > >> added timestamp field might be treated as keyLength instead, so we are > > >> likely
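Ewen's suggestion of doing this client side with "a couple of random seeks + reads" can be sketched as a binary search over (offset, timestamp) samples fetched from a partition. Note this assumes timestamps are non-decreasing along the log, which is exactly the guarantee under debate in the thread; all names here are hypothetical:

```java
public class TimestampSearch {
    /**
     * Sketch of client-side search-by-timestamp: given parallel arrays of
     * sampled offsets and their (assumed non-decreasing) timestamps, binary
     * search for the earliest offset whose timestamp is >= target.
     * Returns -1 if every sampled message is older than the target.
     */
    static long earliestOffsetAtOrAfter(long[] offsets, long[] timestamps, long target) {
        int lo = 0, hi = timestamps.length; // half-open search range [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < target) lo = mid + 1;
            else hi = mid;
        }
        return lo < offsets.length ? offsets[lo] : -1;
    }
}
```

If application-level timestamps are out of order (the "buggy clients" case Jay raises), this search silently returns a wrong position, which is the argument for the broker either assigning or enforcing monotonic timestamps.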
[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)
[ https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708282#comment-14708282 ] Todd Palino commented on KAFKA-1566: This makes a lot of sense. We don't use any of the standard start/stop components or scripts, just the admin CLI tools, so I don't see this having an effect on us. Kafka environment configuration (kafka-env.sh) -- Key: KAFKA-1566 URL: https://issues.apache.org/jira/browse/KAFKA-1566 Project: Kafka Issue Type: Improvement Components: tools Reporter: Cosmin Lehene Assignee: Sriharsha Chintalapani Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, KAFKA-1566_2015-03-17_17:01:38.patch, KAFKA-1566_2015-03-17_17:19:23.patch It would be useful (especially for automated deployments) to have an environment configuration file that could be sourced from the launcher files (e.g. kafka-run-server.sh). This is how this could look: kafka-env.sh
{code}
export KAFKA_JVM_PERFORMANCE_OPTS='-XX:+UseCompressedOops -XX:+DisableExplicitGC -Djava.awt.headless=true -XX:+UseG1GC -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35'
export KAFKA_HEAP_OPTS='-Xmx1G -Xms1G'
export KAFKA_LOG4J_OPTS='-Dkafka.logs.dir=/var/log/kafka'
{code}
kafka-server-start.sh
{code}
...
source $base_dir/config/kafka-env.sh
...
{code}
This approach is consistent with Hadoop and HBase. However the idea here is to be able to set these values in a single place without having to edit startup scripts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: kafka - tcp ports
It does not. That last connection, the one in CLOSE_WAIT, is an outbound connection from the broker you are looking at to one of the other brokers. 57821 is the source TCP port, and it is selected (somewhat) randomly from a range of high port numbers. Note that the other end of the connection is 9092, which is the destination port on the remote host. This is likely one of the replica fetchers. -Todd On Sun, Aug 2, 2015 at 6:58 AM, madhavan kumar madhavan020...@gmail.com wrote: hi all, i am monitoring the ports that are used by kafka server on my machine. When i do, sudo netstat -ntp | grep 9092 it shows,
tcp 0 0 192.168.122.1:9092 192.168.122.6:59158 ESTABLISHED 27413/java
tcp 0 0 192.168.122.1:9092 192.168.122.1:58338 ESTABLISHED 27413/java
tcp 1 0 192.168.122.1:57821 192.168.122.1:9092 CLOSE_WAIT 27413/java
The last entry in CLOSE_WAIT state surprises me because sudo lsof -i tcp:57821 shows
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 27413 root 42u IPv4 3266648 0t0 TCP sk-box:57821->sk-box:9092 (CLOSE_WAIT)
indicates that the port 57821 is owned by the kafka process (pid 27413). Does kafka use extra ports other than the listening port, 9092, to talk to itself? thanks, saravana
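Todd's point that 57821 is just an ephemeral source port is easy to demonstrate on loopback: an outbound connection gets an OS-assigned high source port, distinct from the destination port it connects to (just like 57821 vs 9092 above). A small stand-alone sketch:

```java
import java.net.ServerSocket;
import java.net.Socket;

public class EphemeralPortDemo {
    public static void main(String[] args) throws Exception {
        // Listen on an OS-assigned loopback port (stand-in for a broker's 9092).
        try (ServerSocket server = new ServerSocket(0)) {
            try (Socket client = new Socket("127.0.0.1", server.getLocalPort())) {
                // The outbound side is given an ephemeral local (source) port,
                // different from the destination port it dialed.
                System.out.println("destination port: " + server.getLocalPort());
                System.out.println("ephemeral source port: " + client.getLocalPort());
            }
        }
    }
}
```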
Re: Gauging Interest in adding Encryption to Kafka
1 - Yes, authorization combined with encryption does get us most of the way there. However, depending on the auditor it might not be good enough. The problem is that if you are encrypting at the broker, then by definition anyone who has access to the broker (i.e. operations staff) have access to the data. Consider the case where you are passing salary and other information through the system, and those people do not need a view of it. I admit, the 90% solution might be better here than going for a perfect solution, but it is something to think about. 2 - My worry is people wanting to integrate with different key systems. For example, one person may be fine with providing it in a config file, while someone else may want to use the solution from vendor A, someone else wants vendor B, and yet another person wants this obscure hardware-based solution that exists elsewhere. The compaction concern is definitely a good one I hadn't thought of. I'm wondering if it's reasonable to just say that compaction will not work properly with encrypted keys if you do not have consistent encryption (that is, the same string encrypts to the same string every time). Ultimately I don't like the idea of the broker doing any encrypt/decrypt steps OR compression/decompression. This is all CPU overhead that you're concentrating in one place instead of distributing the load out to the clients. Now yes, I know that the broker decompresses to check the CRC and assign offsets and then compresses, and we can potentially avoid the compression step with assigning the batch an offset and a count instead but we still need to consider the CRC. Adding encrypt/decrypt steps adds even more overhead and it's going to get very difficult to handle even 2 Gbits worth of traffic at that rate. There are other situations that concern me, such as revocation of keys, and I'm not sure whether it is better with client-based or server-based encryption. 
For example, if I want to revoke a key with client-based encryption it becomes similar to how we handle Avro schemas (internally) now - you change keys, and depending on what your desire is you either expire out the data for some period of time with the older keys, or you just let it sit there and your consuming clients won't have an issue. With broker-based encryption, the broker has to work with the multiple keys per-topic. -Todd On Fri, Jul 31, 2015 at 2:38 PM, Gwen Shapira gshap...@cloudera.com wrote: Good points :) 1) Kafka already (pending commit) has an authorization layer, so theoretically we are good for SOX, HIPAA, PCI, etc. Transparent broker encryption will support PCI never-let-unencrypted-card-number-hit-disk. 2) Agree on Key Management being a complete PITA. It may be better to centralize this pain in the broker rather than distributing it to clients. Any reason you think it's better to let the clients handle it? The way I see it, we'll need to handle key management the way we did authorization - give an API for interfacing with existing systems. More important, we need the broker to be able to decrypt and encrypt in order to support compaction (unless we can find a cool key-uniqueness-preserving encryption algorithm, but this may not be as secure). I think we also need the broker to be able to re-compress data, and since we always encrypt compressed bits (compressing encrypted bits doesn't compress), we need the broker to decrypt before re-compressing. On Fri, Jul 31, 2015 at 2:27 PM, Todd Palino tpal...@gmail.com wrote: It does limit it to clients that have an implementation for encryption, however encryption on the client side is better from an auditing point of view (whether that is SOX, HIPAA, PCI, or something else). Most of those types of standards are based around allowing visibility of data to just the people who need it. That includes the admins of the system (who are often not the people who use the data).
Additionally, key management is a royal pain, and there are lots of different types of systems that one may want to use. This is a pretty big complication for the brokers. -Todd On Fri, Jul 31, 2015 at 2:21 PM, Gwen Shapira gshap...@cloudera.com wrote: I've seen interest in HDFS-like encryption zones in Kafka. This has the advantage of magically encrypting data at rest regardless of which client is used as a producer. Adding it on the client side limits the feature to the java client. Gwen On Fri, Jul 31, 2015 at 1:20 PM, eugene miretsky eugene.miret...@gmail.com wrote: I think that Hadoop and Cassandra do [1] (Transparent Encryption) We're doing [2] (on a side note, for [2] you still need authentication on the producer side - you don't want an unauthorized user writing garbage). Right now we have the 'user' doing the encryption and submitting raw bytes to the producer. I was suggesting implementing an encryptor in the producer itself - I think it's cleaner and can be reused
Re: Gauging Interest in adding Encryption to Kafka
It does limit it to clients that have an implementation for encryption, however encryption on the client side is better from an auditing point of view (whether that is SOX, HIPAA, PCI, or something else). Most of those types of standards are based around allowing visibility of data to just the people who need it. That includes the admins of the system (who are often not the people who use the data). Additionally, key management is a royal pain, and there are lots of different types of systems that one may want to use. This is a pretty big complication for the brokers. -Todd On Fri, Jul 31, 2015 at 2:21 PM, Gwen Shapira gshap...@cloudera.com wrote: I've seen interest in HDFS-like encryption zones in Kafka. This has the advantage of magically encrypting data at rest regardless of which client is used as a producer. Adding it on the client side limits the feature to the java client. Gwen On Fri, Jul 31, 2015 at 1:20 PM, eugene miretsky eugene.miret...@gmail.com wrote: I think that Hadoop and Cassandra do [1] (Transparent Encryption) We're doing [2] (on a side note, for [2] you still need authentication on the producer side - you don't want an unauthorized user writing garbage). Right now we have the 'user' doing the encryption and submitting raw bytes to the producer. I was suggesting implementing an encryptor in the producer itself - I think it's cleaner and can be reused by other users (instead of having to do their own encryption) Cheers, Eugene On Fri, Jul 31, 2015 at 4:04 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: I think the goal here is to make the actual message stored on broker to be encrypted, because after we have SSL, the transmission would be encrypted. In general there might be two approaches: 1. Broker do the encryption/decryption 2. Client do the encryption/decryption From performance point of view, I would prefer [2]. It is just in that case, maybe user does not necessarily need to use SSL anymore because the data would be encrypted anyway.
If we let client do the encryption, there are also two ways to do so - either we let producer take an encryptor or users can do serialization/encryption outside the producer and send raw bytes. The only difference between the two might be flexibility. For example, if someone wants to know the actual bytes of a message that got sent over the wire, doing it outside the producer would probably be preferable. Jiangjie (Becket) Qin On Thu, Jul 30, 2015 at 12:16 PM, eugene miretsky eugene.miret...@gmail.com wrote: Hi, Based on the security wiki page https://cwiki.apache.org/confluence/display/KAFKA/Security encryption of data at rest is out of scope for the time being. However, we are implementing encryption in Kafka and would like to see if there is interest in submitting a patch for it. I suppose that one way to implement encryption would be to add an 'encrypted key' field to the Message/MessageSet structures in the wire protocol - however, this is a very big and fundamental change. A simpler way to add encryption support would be: 1) Custom Serializer, but it wouldn't be compatible with other custom serializers (Avro, etc. ) 2) Add a step in KafkaProducer after serialization to encrypt the data before it's being submitted to the accumulator (encryption is done in the submitting thread, not in the producer io thread) Is there interest in adding #2 to Kafka? Cheers, Eugene
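Eugene's option #2 (encrypting serialized bytes on the client before they reach the accumulator) can be sketched with the JDK's AES-GCM support. This is a minimal illustration under stated assumptions, with key management (the "royal pain") deliberately out of scope; the class and method names are hypothetical, not a Kafka API:

```java
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.ByteBuffer;
import java.security.SecureRandom;

public class EnvelopeCrypto {
    static final SecureRandom RNG = new SecureRandom();

    // Encrypt already-serialized bytes; output is IV (12 bytes) || ciphertext+tag.
    static byte[] encrypt(SecretKey key, byte[] plaintext) throws Exception {
        byte[] iv = new byte[12];
        RNG.nextBytes(iv); // fresh IV per message -- never reuse with GCM
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        byte[] ct = c.doFinal(plaintext);
        return ByteBuffer.allocate(iv.length + ct.length).put(iv).put(ct).array();
    }

    // Inverse: split off the IV, then decrypt and verify the GCM tag.
    static byte[] decrypt(SecretKey key, byte[] message) throws Exception {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte[] iv = new byte[12];
        buf.get(iv);
        byte[] ct = new byte[buf.remaining()];
        buf.get(ct);
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(128, iv));
        return c.doFinal(ct);
    }
}
```

Note the random IV means the same plaintext encrypts to a different ciphertext every time, which is exactly why log compaction on encrypted keys is problematic unless a deterministic ("key-uniqueness-preserving") scheme is used, as Gwen points out in the thread.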
Re: Review Request 36664: Patch for KAFKA-2353
Since I've been dealing with the fallout of this particular problem all week, I'll add a few thoughts... On Wed, Jul 22, 2015 at 10:51 AM, Gwen Shapira gshap...@cloudera.com wrote: On July 21, 2015, 11:15 p.m., Gwen Shapira wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 465 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465 Turns out that catching Throwable is a really bad idea: https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/ Jiangjie Qin wrote: Ah... Didn't know that before. I explicitly listed the exceptions. Guozhang Wang wrote: Searching : Throwable gives me 180+ cases in code base :P Though many of them are from unit tests (which, arguably maybe OK) there are still a lot in the core package. I agree that we should avoid catching Throwable whenever possible, which will also help enforcing the developers to think about possible checked exceptions in the calling trace. I know :( I'm not sure if going over and converting everything is worth the effort. Although it can be a nice newbie jira. On July 21, 2015, 11:15 p.m., Gwen Shapira wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 400 https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400 So in case of unexpected exception, we log an error and keep running? Isn't it better to kill the processor, since we don't know what's the state of the system? If the acceptor keeps placing messages in the queue for a dead processor, isn't it a separate issue? Jiangjie Qin wrote: This part I'm not quite sure. I am not very experienced in the error handling in such case, so please correct me if I missed something. Here is what I thought. The way it currently works is that the acceptor will 1. accept new connection request and create new socket channel 2. 
choose a processor and put the socket channel into the processor's new connection queue The processor will just take the socket channels from the queue and register them with the selector. If the processor runs and gets an uncaught exception, there are several possibilities. Case 1: The exception was from one socket channel. Case 2: The exception was associated with a bad request. In case 1, ideally we should just disconnect that socket channel without affecting other socket channels. In case 2, I think we should log the error and skip the message - assuming the client will retry sending data if no response was received for a given period of time. I am not sure if letting the processor exit is a good idea because this will lead to the result of a badly behaving client screwing the entire cluster - it might screw processors one by one. Compared with that, I'm kind of leaning towards keeping the processor running and serving other normal TCP connections if possible, but log the error so the monitoring system can detect it and see if human intervention is needed. Also, I don't know what to do here to prevent the thread from exiting without catching all the throwables. According to this blog http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/ I guess I can rethrow all the ControlThrowables, but intercept the rest? Guozhang Wang wrote: I would also prefer not to close the Processor thread upon exceptions, mainly to avoid one bad client killing a shared Kafka cluster. In the past we have seen such issues like DDoS MetadataRequest killing the cluster and all other clients get affected, etc, and the quota work is towards preventing it. Since Processor threads are shared (8 by default on a broker), it should not be closed by a single socket / bad client request. I like your thinking around cases #1 and #2. I think this should go as a code comment somewhere, so when people improve / extend SocketServer they will keep this logic in mind.
Maybe even specify in specific catch clauses if they are handling possible errors in request level or channel level. My concern is with possible case #3: Each processor has an o.a.k.common.network.Selector. I'm concerned about the possibility of something going wrong in the state of the selector, which will possibly be an issue for all channels. For example failure to register could be an issue with the channel.register call, but also perhaps an issue with keys.put (just an example - I'm not sure something can actually break keys table). I'd like to be able to identify cases where the Selector state may have gone wrong and close the processor in that case. Does that make any sense? Or am I being too paranoid? If there are error cases that are not associated with a specific connection or request, then I agree that we should handle that a little differently. What we need to keep in mind is that close the processor is actually terminate the
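The policy the reviewers converge on above (keep the processor alive through request- or channel-level failures, but let truly fatal errors propagate and kill the thread) looks roughly like this. A Java sketch, not the actual SocketServer code; the Java analogue of rethrowing Scala's ControlThrowable is simply not catching Error:

```java
import java.util.List;

public class ProcessorLoop {
    /**
     * Sketch of one pass over a processor's pending work. A failure in one
     * unit of work (one connection / one request) is logged and skipped so
     * other connections keep being served; Errors (OutOfMemoryError etc.)
     * are deliberately NOT caught and will terminate the processor, since
     * its state can no longer be trusted.
     */
    static int runOnce(List<Runnable> pendingWork, List<String> errorLog) {
        int processed = 0;
        for (Runnable work : pendingWork) {
            try {
                work.run();
                processed++;
            } catch (Exception e) {
                // request- or channel-level failure: log, drop, keep serving
                errorLog.add("dropped one unit of work: " + e.getMessage());
            }
        }
        return processed;
    }
}
```

Gwen's "case #3" concern maps onto the uncaught path: if the selector itself is corrupted, the failure should surface as something this loop does not swallow.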
Re: [Discussion] Limitations on topic names
I find dots more common in my customer base, so I will definitely feel the pain of removing them. However, "." is already used in metrics, file names, directories, etc - so if we keep the dots, we need to keep code that translates them and document the translation. Just banning "." seems more natural. Also, as Grant mentioned, we'll probably have our own special usage for "." down the line. On Fri, Jul 10, 2015 at 2:12 PM, Todd Palino tpal...@gmail.com wrote: I absolutely disagree with #2, Neha. That will break a lot of infrastructure within LinkedIn. That said, removing "." might break other people as well, but I think we should have a clearer idea of how much usage there is on either side. -Todd On Fri, Jul 10, 2015 at 2:08 PM, Neha Narkhede n...@confluent.io wrote: "." seems natural for grouping topic names. +1 for 2) going forward only without breaking previously created topics with "_" though that might require us to patch the code somewhat awkwardly till we phase it out a couple (purposely left vague to stay out of Ewen's wrath :-)) versions later. On Fri, Jul 10, 2015 at 2:02 PM, Gwen Shapira gshap...@cloudera.com wrote: I don't think we should break existing topics. Just disallow new topics going forward. Agree that having both is horrible, but we should have a solution that fails when you run kafka-topics.sh --create, not when you configure Ganglia. Gwen On Fri, Jul 10, 2015 at 1:53 PM, Jay Kreps j...@confluent.io wrote: Unfortunately '.' is pretty common too. I agree that it is perverse, but people seem to do it. Breaking all the topics with '.' in the name seems like it could be worse than combining metrics for people who have a 'foo_bar' AND 'foo.bar' (and after all, having both is DEEPLY perverse, no?). Where is our Dean of Compatibility, Ewen, on this?
-Jay On Fri, Jul 10, 2015 at 1:32 PM, Todd Palino tpal...@gmail.com wrote: My selfish point of view is that we do #1, as we use "_" extensively in topic names here :) I also happen to think it's the right choice, specifically because "." has more special meanings, as you noted. -Todd On Fri, Jul 10, 2015 at 1:30 PM, Gwen Shapira gshap...@cloudera.com wrote: Unintentional side effect from allowing IP addresses in consumer client IDs :) So the question is, what do we do now? 1) disallow "." 2) disallow "_" 3) find a reversible way to encode "." and "_" that won't break existing metrics 4) all of the above? btw. it looks like "." and ".." are currently valid. Topic names are used for directories, right? this sounds like fun :) I vote for option #1, although if someone has a good idea for #3 it will be even better. Gwen On Fri, Jul 10, 2015 at 1:22 PM, Grant Henke ghe...@cloudera.com wrote: Found it was added here: https://issues.apache.org/jira/browse/KAFKA-697 On Fri, Jul 10, 2015 at 3:18 PM, Todd Palino tpal...@gmail.com wrote: This was definitely changed at some point after KAFKA-495. The question is when and why. Here's the relevant code from that patch:
===
--- core/src/main/scala/kafka/utils/Topic.scala (revision 1390178)
+++ core/src/main/scala/kafka/utils/Topic.scala (working copy)
@@ -21,24 +21,21 @@
 import util.matching.Regex
 object Topic {
+  val legalChars = "[a-zA-Z0-9_-]"
-Todd On Fri, Jul 10, 2015 at 1:02 PM, Grant Henke ghe...@cloudera.com wrote: kafka.common.Topic shows that currently period is a valid character and I have verified I can use kafka-topics.sh to create a new topic with a period. AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK currently uses Topic.validate before writing to Zookeeper. Should period character support be removed? I was under the same impression as Gwen, that a period was used by many as a way to group topics.
The code is pasted below since it's small:
object Topic {
  val legalChars = "[a-zA-Z0-9\\._\\-]"
  private val maxNameLength = 255
  private val rgx = new Regex(legalChars + "+")

  val InternalTopics = Set(OffsetManager.OffsetsTopicName)

  def validate(topic: String) {
    if (topic.length <= 0)
      throw new InvalidTopicException("topic name is illegal, can't be empty")
    else if (topic.equals(".") || topic.equals(".."))
      throw new InvalidTopicException("topic name cannot be \".\" or \"..\"")
    else if (topic.length > maxNameLength)
      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")

    rgx.findFirstIn(topic) match {
      case Some(t) =>
        if (!t.equals(topic))
          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
      case None =>
        throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
    }
  }
}
Re: [Discussion] Limitations on topic names
I had to go look this one up again to make sure - https://issues.apache.org/jira/browse/KAFKA-495 The only valid characters for topic names are alphanumeric, underscore, and dash. A period is not supposed to be a valid character to use. If you're seeing them, then one of two things has happened: 1) You have topic names that are grandfathered in from before that patch 2) The patch is not working properly and there is somewhere in the broker where the standard is not being enforced. -Todd On Fri, Jul 10, 2015 at 12:13 PM, Brock Noland br...@apache.org wrote: On Fri, Jul 10, 2015 at 11:34 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Fans, If you have one topic named kafka_lab_2 and the other named kafka.lab.2, the topic level metrics will be named kafka_lab_2 for both, effectively making it impossible to monitor them properly. The reason this happens is that using "." in topic names is pretty common, especially as a way to group topics into data centers, relevant apps, etc - basically a work-around to our current lack of name spaces. However, most metric monitoring systems use "." to annotate hierarchy, so to avoid issues around metric names, Kafka replaces the "." in the name with an underscore. This generates good metric names, but creates the problem with name collisions. I'm wondering if it makes sense to simply limit the range of characters permitted in a topic name and disallow "_"? Obviously existing topics will need to remain as is, which is a bit awkward. Interesting problem! Many if not most users I personally am aware of use "_" as a separator in topic names. I am sure that many users would be quite surprised by this limitation. With that said, I am sure they'd transition accordingly. If anyone has better backward-compatible solutions to this, I'm all ears :) Gwen
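The collision Gwen describes is easy to reproduce: metric reporters map '.' to '_' in topic names, so two distinct topics yield the same metric name. A hypothetical one-liner sketch of that mapping:

```java
public class MetricNames {
    // Sketch of the sanitization step described above: replacing '.' with '_'
    // makes "kafka.lab.2" and "kafka_lab_2" indistinguishable in metrics.
    static String metricName(String topic) {
        return topic.replace('.', '_');
    }
}
```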
Re: [Discussion] Limitations on topic names
This was definitely changed at some point after KAFKA-495. The question is when and why. Here's the relevant code from that patch:

===
--- core/src/main/scala/kafka/utils/Topic.scala (revision 1390178)
+++ core/src/main/scala/kafka/utils/Topic.scala (working copy)
@@ -21,24 +21,21 @@
 import util.matching.Regex

 object Topic {
+  val legalChars = "[a-zA-Z0-9_-]"

-Todd

On Fri, Jul 10, 2015 at 1:02 PM, Grant Henke ghe...@cloudera.com wrote:

kafka.common.Topic shows that currently period is a valid character, and I have verified I can use kafka-topics.sh to create a new topic with a period. AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK currently uses Topic.validate before writing to Zookeeper. Should period character support be removed? I was under the same impression as Gwen, that a period was used by many as a way to group topics.

--
Grant Henke
Solutions Consultant | Cloudera
ghe...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
Re: [Discussion] Limitations on topic names
Thanks, Grant. That seems like a bad solution to the problem that John ran into in that ticket. It's entirely reasonable to have separate validators for separate things, but it seems like the choice was made to try to mash it all into a single validator. And it appears that, despite the commentary in the ticket at the time, Gwen has identified a very good reason to be restrictive about topic naming.

-Todd

On Fri, Jul 10, 2015 at 1:22 PM, Grant Henke ghe...@cloudera.com wrote:

Found it was added here: https://issues.apache.org/jira/browse/KAFKA-697
Re: [Discussion] Limitations on topic names
I absolutely disagree with #2, Neha. That will break a lot of infrastructure within LinkedIn. That said, removing . might break other people as well, but I think we should have a clearer idea of how much usage there is on either side.

-Todd

On Fri, Jul 10, 2015 at 2:08 PM, Neha Narkhede n...@confluent.io wrote:

. seems natural for grouping topic names. +1 for 2) going forward only, without breaking previously created topics with _, though that might require us to patch the code somewhat awkwardly till we phase it out a couple (purposely left vague to stay out of Ewen's wrath :-)) versions later.

On Fri, Jul 10, 2015 at 2:02 PM, Gwen Shapira gshap...@cloudera.com wrote:

I don't think we should break existing topics. Just disallow new topics going forward. Agree that having both is horrible, but we should have a solution that fails when you run kafka-topics.sh --create, not when you configure Ganglia.

Gwen

On Fri, Jul 10, 2015 at 1:53 PM, Jay Kreps j...@confluent.io wrote:

Unfortunately '.' is pretty common too. I agree that it is perverse, but people seem to do it. Breaking all the topics with '.' in the name seems like it could be worse than combining metrics for people who have a 'foo_bar' AND a 'foo.bar' (and after all, having both is DEEPLY perverse, no?). Where is our Dean of Compatibility, Ewen, on this?

-Jay

On Fri, Jul 10, 2015 at 1:32 PM, Todd Palino tpal...@gmail.com wrote:

My selfish point of view is that we do #1, as we use _ extensively in topic names here :) I also happen to think it's the right choice, specifically because . has more special meanings, as you noted.

-Todd

On Fri, Jul 10, 2015 at 1:30 PM, Gwen Shapira gshap...@cloudera.com wrote:

Unintentional side effect from allowing IP addresses in consumer client IDs :) So the question is, what do we do now? 1) disallow . 2) disallow _ 3) find a reversible way to encode . and _ that won't break existing metrics 4) all of the above? BTW, it looks like . and .. are currently valid.
Topic names are used for directories, right? This sounds like fun :) I vote for option #1, although if someone has a good idea for #3 it will be even better.

Gwen
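Gwen's option #3 - a reversible encoding of '.' and '_' - could look something like the sketch below. This is purely hypothetical; Kafka has no such encoding, and the escape sequences chosen here (`__` for '_', `_d` for '.') are just one illustrative scheme that keeps '.' out of metric names while remaining decodable.

```python
def encode(topic):
    # Escape '_' first so decoding is unambiguous, then map '.' to an
    # escape sequence that contains no '.' itself.
    return topic.replace("_", "__").replace(".", "_d")

def decode(name):
    # Walk the string, undoing the two escape sequences in order.
    out, i = [], 0
    while i < len(name):
        if name[i] == "_" and i + 1 < len(name):
            if name[i + 1] == "_":
                out.append("_"); i += 2; continue
            if name[i + 1] == "d":
                out.append("."); i += 2; continue
        out.append(name[i]); i += 1
    return "".join(out)
```

With this scheme `kafka_lab_2` and `kafka.lab.2` encode to distinct metric names, so the collision from earlier in the thread disappears.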
Re: [Discussion] Limitations on topic names
My selfish point of view is that we do #1, as we use _ extensively in topic names here :) I also happen to think it's the right choice, specifically because . has more special meanings, as you noted.

-Todd

On Fri, Jul 10, 2015 at 1:30 PM, Gwen Shapira gshap...@cloudera.com wrote:

Unintentional side effect from allowing IP addresses in consumer client IDs :) So the question is, what do we do now? 1) disallow . 2) disallow _ 3) find a reversible way to encode . and _ that won't break existing metrics 4) all of the above? BTW, it looks like . and .. are currently valid. Topic names are used for directories, right? This sounds like fun :) I vote for option #1, although if someone has a good idea for #3 it will be even better.

Gwen
Re: [Discussion] Limitations on topic names
Yes, agree here. While it can be a little confusing, I think it's better to just disallow the character for all creation steps so you can't create more bad topic names, but not try to enforce it for topics that already exist. Anyone who is in that situation is already there with regard to metrics, and so they are probably making sure they don't collide names that differ only in the use of _ and .. However, we don't want a new user to accidentally do it.

-Todd

On Fri, Jul 10, 2015 at 2:02 PM, Gwen Shapira gshap...@cloudera.com wrote:

I don't think we should break existing topics. Just disallow new topics going forward. Agree that having both is horrible, but we should have a solution that fails when you run kafka-topics.sh --create, not when you configure Ganglia.

Gwen
Re: [ANNOUNCE] New Committer
Congrats, Gwen! It's definitely deserved.

-Todd

On Jul 6, 2015, at 6:08 PM, Joe Stein joe.st...@stealth.ly wrote:

I am pleased to announce that the Apache Kafka PMC has voted to invite Gwen Shapira as a committer and Gwen has accepted. Please join me in welcoming and congratulating Gwen. Thanks for the contributions both in the project (code, email, etc.) and throughout the community (other projects, conferences, etc.). I look forward to your continued contributions and much more to come!

~ Joe Stein
http://www.elodina.net
http://www.stealth.ly
[jira] [Commented] (KAFKA-2252) Socket connection closing is logged, but not corresponding opening of socket
[ https://issues.apache.org/jira/browse/KAFKA-2252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14586437#comment-14586437 ] Todd Palino commented on KAFKA-2252: I moved the normal connection closed message to DEBUG level in KAFKA-2175. In general, we should not log either normal connection close or open at INFO level. There is far too much noise due to these messages. Socket connection closing is logged, but not corresponding opening of socket Key: KAFKA-2252 URL: https://issues.apache.org/jira/browse/KAFKA-2252 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg (using 0.8.2.1) We see a large number of Closing socket connection logging to the broker logs, e.g.: {code} 2015-06-04 16:49:30,262 INFO [kafka-network-thread-27330-2] network.Processor - Closing socket connection to /1.2.3.4. 2015-06-04 16:49:30,262 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /5.6.7.8. 2015-06-04 16:49:30,695 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /9.10.11.12. 2015-06-04 16:49:31,465 INFO [kafka-network-thread-27330-1] network.Processor - Closing socket connection to /13.14.15.16. 2015-06-04 16:49:31,806 INFO [kafka-network-thread-27330-0] network.Processor - Closing socket connection to /17.18.19.20. 2015-06-04 16:49:31,842 INFO [kafka-network-thread-27330-2] network.Processor - Closing socket connection to /21.22.23.24. {code} However, we have no corresponding logging for when these connections are established. Consequently, it's not very useful to see a flood of closed connections, etc. I'd think we'd want to see the corresponding 'connection established' messages, also logged as INFO. Occasionally, we see a flood of the above messages, and have no idea as to whether it indicates a problem, etc. (Sometimes it might be due to an ongoing rolling restart, or a change in the Zookeeper cluster). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
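Until the default levels change, the noise can be tuned per-logger in the broker's log4j.properties. A sketch of such an override is below; it assumes the stock `kafka.network.Processor` logger name, so verify against your broker version before relying on it.

```properties
# Drop kafka.network.Processor chatter below WARN so routine
# connection open/close events stop flooding server.log
log4j.logger.kafka.network.Processor=WARN
```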
Re: [VOTE] KIP-21 Dynamic Configuration
TopicConfigManager to be ConfigOverrideManager and have it handle all the override types we will have? I think I may just be unclear on what you are proposing...

-Jay

On Mon, May 18, 2015 at 1:34 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Yeah, that was just a typo. I've fixed it. Thanks for calling it out.

In KIP-4, I believe we have 3 types of requests: CreateTopic, AlterTopic and DeleteTopic. The topic configs are a sub-type of the Create and Alter commands. I think it would be nice to simply have an AlterConfig command that can alter any type of config rather than having a specific ClientConfig.

AlterConfig => [ConfigType [AddedConfigEntry] [DeletedConfig]]
  ConfigType => string
  AddedConfigEntry => ConfigKey ConfigValue
  ConfigKey => string
  ConfigValue => string
  DeletedConfig => string

The downside of this approach is that we will have 2 separate ways of changing topic configs (AlterTopic and AlterConfig). While a general AlterConfig only makes sense if we plan to have more than two types of entity configs... it's definitely more future-proof. Thoughts?

Aditya

From: Todd Palino [tpal...@gmail.com]
Sent: Monday, May 18, 2015 12:39 PM
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-21 Dynamic Configuration

Agree with Jun here on the JSON format. I think your intention was likely to have actual JSON here and it was just a typo in the wiki?

-Todd

On Mon, May 18, 2015 at 12:07 PM, Jun Rao j...@confluent.io wrote:

Aditya,

Another thing to consider. In KIP-4, we are adding a new RPC request to change and retrieve topic configs. Do we want to add a similar RPC request to change configs per client id? If so, do we want to introduce a separate new request or have a combined new request for both topic and client id level config changes?

A minor point in the wiki: for the json format in ZK, we should change {X1=Y1, X2=Y2..} to a json map, right?
Thanks,
Jun

On Mon, May 18, 2015 at 9:48 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration

Aditya
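Jun's minor point is that `{X1=Y1, X2=Y2..}` is not valid JSON. A proper JSON map would serialize as shown below; the keys here are the placeholders from the wiki, not real Kafka config names.

```python
import json

# Hypothetical override map using the wiki's placeholder keys.
overrides = {"X1": "Y1", "X2": "Y2"}
print(json.dumps(overrides))  # {"X1": "Y1", "X2": "Y2"}
```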
Re: Can we throw away ResponsesBeingSent Gauge?
I don't believe we're using the ResponsesBeingSent information at all. I know we use NetworkProcessorAvgIdlePercent to keep track of the utilization of the pool. -Todd On Thu, May 14, 2015 at 11:36 AM, Joel Koshy jjkosh...@gmail.com wrote: I'm also not sure how useful it is, but there is some discussion on it here: https://issues.apache.org/jira/browse/KAFKA-1597 On Thu, May 14, 2015 at 09:26:02PM +0300, Gwen Shapira wrote: Hi, As part of KAFKA-1928, we need to consolidate existing SocketServer metrics into Selector metrics. The two metrics are: 1. NetworkProcessorAvgIdlePercent 2. ResponsesBeingSent NetworkProcessorAvgIdlePercent is currently implemented as Meter, which seems to translate nicely into KafkaMetrics Rate. ResponsesBeingSent is a gauge, which doesn't have a direct KafkaMetrics equivalent, but should be easy to add. However, I'm considering dropping ResponsesBeingSent gauge completely - I find it useless. Selector currently has Rates for requests-sent, responses-sent, bytes-sent, bytes-received. Anyone finds ResponsesBeingSent useful and wants to make a passionate plea to keep it around? (I CCed the people I think will care, in case they are not watching the list) Gwen
Re: [DISCUSS] KIP-21 Configuration Management
I've been watching this discussion for a while, and I have to jump in and side with Gwen here. I see no benefit to putting the configs into Zookeeper entirely, and a lot of downside. The two biggest problems I have with this are:

1) Configuration management. OK, so you can write glue for Chef to put configs into Zookeeper. You also need to write glue for Puppet. And Cfengine. And everything else out there. Files are an industry-standard practice; they're how just about everyone handles it, and there are reasons for that beyond it being the way it's always been done.

2) Auditing. Configuration files can easily be managed in a source repository, which tracks what changes were made and who made them. It also easily allows for rolling back to a previous version. Zookeeper does not.

I see absolutely nothing wrong with putting the quota (client) configs and the topic config overrides in Zookeeper, and keeping everything else exactly where it is, in the configuration file. To handle broker configurations that can be changed at runtime without a restart, you can use the industry-standard practice of catching SIGHUP and rereading the configuration file at that point.

-Todd

On Sun, May 10, 2015 at 4:00 AM, Gwen Shapira gshap...@cloudera.com wrote:

I am still not clear about the benefits of managing configuration in ZooKeeper vs. keeping the local file and adding a refresh mechanism (signal, protocol, zookeeper, or other). Benefits of staying with the configuration file:

1. In line with pretty much any Linux service that exists, so admins have a lot of related experience.
2. Much smaller change to our code-base, so easier to patch, review and test. Lower risk overall.

Can you walk me over the benefits of using Zookeeper? Especially since it looks like we can't get rid of the file entirely?

Gwen

On Thu, May 7, 2015 at 3:33 AM, Jun Rao j...@confluent.io wrote:

One of the Chef users confirmed that Chef integration could still work if all configs are moved to ZK.
My rough understanding of how Chef works is that a user first registers a service host with a Chef server. After that, a Chef client will be run on the service host. The user can then push config changes intended for a service/host to the Chef server. The server is then responsible for pushing the changes to Chef clients. Chef clients support pluggable logic. For example, it can generate a config file that Kafka broker will take. If we move all configs to ZK, we can customize the Chef client to use our config CLI to make the config changes in Kafka. In this model, one probably doesn't need to register every broker in Chef for the config push. Not sure if Puppet works in a similar way. Also for storing the configs, we probably can't store the broker/global level configs in Kafka itself (e.g. in a special topic). The reason is that in order to start a broker, we likely need to make some broker level config changes (e.g., the default log.dir may not be present, the default port may not be available, etc). If we need a broker to be up to make those changes, we get into this chicken and egg problem. Thanks, Jun On Tue, May 5, 2015 at 4:14 PM, Gwen Shapira gshap...@cloudera.com wrote: Sorry I missed the call today :) I think an additional requirement would be: Make sure that traditional deployment tools (Puppet, Chef, etc) are still capable of managing Kafka configuration. For this reason, I'd like the configuration refresh to be pretty close to what most Linux services are doing to force a reload of configuration. AFAIK, this involves handling HUP signal in the main thread to reload configuration. Then packaging scripts can add something nice like service kafka reload. (See Apache web server: https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101) Gwen On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com wrote: Good discussion. 
Since we will be talking about this at 11am, I wanted to organize these comments into requirements to see if we are all on the same page.

REQUIREMENT 1: Needs to accept dynamic config changes. This needs to be general enough to work for all configs that we envision may need to accept changes at runtime, e.g., log (topic), broker, client (quotas), etc. Possible options include:
- ZooKeeper watcher
- Kafka topic
- Direct RPC to controller (or config coordinator)

The current KIP is really focused on REQUIREMENT 1, and I think that is reasonable as long as we don't come up with something that requires significant re-engineering to support the other requirements.

REQUIREMENT 2: Provide consistency of configs across brokers (modulo per-broker overrides) or at least be able to verify consistency. What this effectively means is that config changes must be seen by all brokers eventually and
[jira] [Updated] (KAFKA-2175) Reduce server log verbosity at info level
[ https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino updated KAFKA-2175: --- Status: Patch Available (was: Open) Reduce server log verbosity at info level - Key: KAFKA-2175 URL: https://issues.apache.org/jira/browse/KAFKA-2175 Project: Kafka Issue Type: Improvement Components: controller, zkclient Affects Versions: 0.8.3 Reporter: Todd Palino Assignee: Todd Palino Priority: Minor Labels: newbie Attachments: KAFKA-2175.patch Currently, the broker logs two messages at INFO level that should be at a lower level. This serves only to fill up log files on disk, and can cause performance issues due to synchronous logging as well. The first is the Closing socket connection message when there is no error. This should be reduced to debug level. The second is the message that ZkUtil writes when updating the partition reassignment JSON. This message contains the entire JSON blob and should never be written at info level. In addition, there is already a message in the controller log stating that the ZK node has been updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-2175) Reduce server log verbosity at info level
[ https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino reassigned KAFKA-2175: -- Assignee: Todd Palino (was: Neha Narkhede) Reduce server log verbosity at info level - Key: KAFKA-2175 URL: https://issues.apache.org/jira/browse/KAFKA-2175 Project: Kafka Issue Type: Improvement Components: controller, zkclient Affects Versions: 0.8.3 Reporter: Todd Palino Assignee: Todd Palino Priority: Minor Labels: newbie Attachments: KAFKA-2175.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2175) Reduce server log verbosity at info level
Todd Palino created KAFKA-2175: -- Summary: Reduce server log verbosity at info level Key: KAFKA-2175 URL: https://issues.apache.org/jira/browse/KAFKA-2175 Project: Kafka Issue Type: Improvement Components: controller, zkclient Affects Versions: 0.8.3 Reporter: Todd Palino Assignee: Neha Narkhede Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-11- Authorization design for kafka security
I tend to agree with Parth's point here. Most ACL systems I run into have deny and allow. In general, you have a default policy, and then you follow your rules, stopping at the first line that matches. If you would like a default deny policy, you have a bunch of allow rules and your last rule is deny all. This says everyone listed is allowed; everyone else is denied. If you instead want a default allow, you have a list of deny rules and the last rule is allow all. This says everyone listed is denied; everyone else is allowed. I think leaving out a full rule set would be a mistake, as it makes the assumption that you know what all the use cases are. I think all it will really mean is that we will see a KIP before long to fix it. -Todd On Apr 20, 2015, at 7:13 PM, Gwen Shapira gshap...@cloudera.com wrote: Thanks for clarifying the logic. I'm +0 on the deny thing. IMO, it's not really needed, but if you think it's important, I don't object to having it in. Gwen On Mon, Apr 20, 2015 at 7:07 PM, Parth Brahmbhatt pbrahmbh...@hortonworks.com wrote: iptables on Unix supports the DENY operator, not that it should matter. The deny operator can also be used to specify "allow user1 to READ from topic1 from all hosts but host1, host2". Again, we could add a host group semantic and extra complexity around that; not sure if it's worth it. In addition, with the DENY operator you are not forced to create a special group just to support the authorization use case. I am not convinced that the operator itself is really all that confusing. There are 3 practical use cases: - Resource with no acl whatsoever - allow access to everyone (just for backward compatibility; I would much rather fail closed and force users to explicitly grant acls that allow access to all users.) - Resource with some acl attached - only users that have a matching allow acl are allowed (i.e.
"allow READ access to topic1 to user1 from all hosts", only user1 has READ access and no other user has access of any kind) - Resource with some allow and some deny acl attached - users are allowed to perform an operation only when they satisfy an allow acl and do not have a conflicting deny acl. Users that have no acl (allow or deny) will still not have any access. (i.e. "allow READ access to topic1 to user1 from all hosts except host1 and host2", only user1 has access, but not from host1 and host2) I think we need to make a decision on deny primarily because, with the introduction of the acl management API, Acl is now a public class that will be used by Ranger/Sentry and other authorization providers. In the current design the acl has a permissionType enum field with possible values of Allow and Deny. If we choose to remove deny, we can assume all acls to be of allow type and remove the permissionType field completely. Thanks Parth On 4/20/15, 6:12 PM, Gwen Shapira gshap...@cloudera.com wrote: I think that's how it's done in pretty much any system I can think of.
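Parth's three cases reduce to a small evaluation function: no ACLs means allow everyone (backward compatibility), a matching deny always wins, and otherwise a matching allow is required. The model below, including the Acl field names, is illustrative and not Kafka's actual Acl class:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Acl:
    permission: str   # "allow" or "deny"
    principal: str    # e.g. "user1"
    host: str         # "*" matches any host
    operation: str    # e.g. "READ"

def matches(acl, principal, host, operation):
    return (acl.principal == principal
            and acl.operation == operation
            and acl.host in ("*", host))

def authorize(acls, principal, host, operation):
    # Case 1: resource with no ACLs at all -> allow everyone.
    if not acls:
        return True
    # Case 3: a matching deny overrides any allow.
    if any(a.permission == "deny" and matches(a, principal, host, operation) for a in acls):
        return False
    # Case 2: otherwise a matching allow is required; no ACL means no access.
    return any(a.permission == "allow" and matches(a, principal, host, operation) for a in acls)
```

For example, "allow user1 to READ topic1 from all hosts except host1" is expressed as one allow rule on "*" plus one deny rule on "host1".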
Re: [DISCUSS] KIP-18 - JBOD Support
I'm with you on that, Jay, although I don't consider your case common. More common is that there are things on all the disks at their normal retention, and you add something new. That said, it doesn't really matter because what you're illustrating is a valid concern. Automatic balancing would probably alleviate any issues coming from a bad initial placement. Jumping back an email, yes, it is a really big deal that the entire broker fails when one mount point fails. It is much better to run with degraded performance than it is to run with degraded replication, and disks fail constantly. If I have 10% of my machines offline, Kafka's not going to last very long at LinkedIn ;) -Todd On Sat, Apr 11, 2015 at 11:58 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Todd, The problem you pointed out is real. Unfortunately, placing by available size at creation time actually makes things worse. The original plan was to place new partitions on the disk with the most space, but consider a common case: disk 1: 500M disk 2: 0M Now say you are creating 10 partitions for what will be a massively large topic. You will place them all on disk 2 as it has the most space, but then immediately you will discover that that was a bad idea as those partitions get huge. I think the current balancing by number of partitions is better than this because at least you get basically random assignment. I think to solve the problem you describe we need to do active rebalancing--predicting the size of partitions ahead of time is basically impossible. I think having the controller be unaware of disks is probably a good thing. So the controller would balance partitions over servers and the server would be responsible for balancing over disks. I think this kind of balancing is possible though not totally trivial.
Basically a background thread in LogManager would decide that there was too much skew in data assignment (actually debatable whether it is data size or I/O throughput that you should optimize) and would then try to rebalance. To do the rebalance it would do a background copy of the log from the current disk to the new disk, then it would take the partition offline and delete the old log, then bring the partition back using the new log and catch it back up off the leader. -Jay On Thu, Apr 9, 2015 at 8:19 AM, Todd Palino tpal...@gmail.com wrote: I think this is a good start. We've been discussing JBOD internally, so it's good to see a discussion going externally about it as well. The other big blocker to using JBOD is the lack of intelligent partition assignment logic, and the lack of tools to adjust it. The controller is not smart enough to take into account disk usage when deciding to place a partition, which may not be a big concern (at the controller level, you worry about broker usage, not individual mounts). However, the broker is not smart enough to do it either, when looking at the local directories. It just round robins. In addition, there is no tool available to move a partition from one mount point to another. So in the case that you do have a hot disk, you cannot do anything about it without shutting down the broker and doing a manual move of the log segments. -Todd On Thu, Apr 9, 2015 at 5:36 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Hi, Let me start discussion thread for KIP-18 - JBOD Support. Link to wiki: https://cwiki.apache.org/confluence/display/KAFKA/KIP-18+-+JBOD+Support Thanks, Andrii Biletskyi
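As a rough illustration of the skew detection Jay describes (a LogManager background thread deciding one mount point is overloaded and picking a migration), here is a sketch; the threshold and the pick-the-largest-partition policy are invented for the example, not anything Kafka implements:

```python
def plan_disk_move(disks, skew_threshold=2.0):
    """disks maps mount point -> {partition: size_bytes}.
    Returns (partition, source_mount, dest_mount) to migrate,
    or None if the disks are balanced within the threshold."""
    usage = {d: sum(parts.values()) for d, parts in disks.items()}
    src = max(usage, key=usage.get)   # most loaded mount point
    dst = min(usage, key=usage.get)   # least loaded mount point
    # Only move when the imbalance exceeds the (made-up) threshold.
    if usage[src] < skew_threshold * usage[dst] or not disks[src]:
        return None
    # Naive policy: move the largest partition off the hot disk.
    partition = max(disks[src], key=disks[src].get)
    return (partition, src, dst)
```

The actual move would then follow Jay's recipe: background-copy the log to the new disk, take the partition offline, delete the old log, and catch the partition back up from the leader.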
[jira] [Commented] (KAFKA-1342) Slow controlled shutdowns can result in stale shutdown requests
[ https://issues.apache.org/jira/browse/KAFKA-1342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14484278#comment-14484278 ] Todd Palino commented on KAFKA-1342: Bump I think we need to revive this. We have a bit of safe-shutdown wrapper code which relies on an external resource that we should eliminate. It would be better to provide a safe shutdown option within Kafka itself without that wrapper (i.e. do not shut down unless your under-replicated count is 0). However, this is not possible without serialized shutdown at the controller level. We can't allow a second broker to shut down until the first broker has completed its shutdown process. Then the second broker can check the URP count and be allowed to proceed. Slow controlled shutdowns can result in stale shutdown requests --- Key: KAFKA-1342 URL: https://issues.apache.org/jira/browse/KAFKA-1342 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Reporter: Joel Koshy Priority: Blocker Fix For: 0.9.0 I don't think this is a bug introduced in 0.8.1, but triggered by the fact that controlled shutdown seems to have become slower in 0.8.1 (will file a separate ticket to investigate that). When doing a rolling bounce, it is possible for a bounced broker to stop all its replica fetchers because the previous PID's shutdown requests are still being processed. - 515 is the controller - Controlled shutdown initiated for 503 - Controller starts controlled shutdown for 503 - The controlled shutdown takes a long time in moving leaders and moving follower replicas on 503 to the offline state. - So 503's read from the shutdown channel times out and a new channel is created. It issues another shutdown request. This request (since it is a new channel) is accepted at the controller's socket server but then waits on the broker shutdown lock held by the previous controlled shutdown which is still in progress. - The above step repeats for the remaining retries (six more requests).
- 503 hits SocketTimeout exception on reading the response of the last shutdown request and proceeds to do an unclean shutdown. - The controller's onBrokerFailure call-back fires and moves 503's replicas to offline (not too important in this sequence). - 503 is brought back up. - The controller's onBrokerStartup call-back fires and moves its replicas (and partitions) to online state. 503 starts its replica fetchers. - Unfortunately, the (phantom) shutdown requests are still being handled and the controller sends StopReplica requests to 503. - The first shutdown request finally finishes (after 76 minutes in my case!). - The remaining shutdown requests also execute and do the same thing (sends StopReplica requests for all partitions to 503). - The remaining requests complete quickly because they end up not having to touch zookeeper paths - no leaders left on the broker and no need to shrink ISR in zookeeper since it has already been done by the first shutdown request. - So in the end-state 503 is up, but effectively idle due to the previous PID's shutdown requests. There are some obvious fixes that can be made to controlled shutdown to help address the above issue. E.g., we don't really need to move follower partitions to Offline. We did that as an optimization so the broker falls out of ISR sooner - which is helpful when producers set required.acks to -1. However it adds a lot of latency to controlled shutdown. Also, (more importantly) we should have a mechanism to abort any stale shutdown process. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
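Todd's proposal in the comment above (serialize controlled shutdowns at the controller and admit the next one only when the under-replicated partition count is back to zero) might be sketched like this; every name here is illustrative:

```python
import threading

class ShutdownCoordinator:
    """Toy controller-side gate: one controlled shutdown at a time,
    and a broker may only proceed when the cluster's under-replicated
    partition (URP) count is zero."""
    def __init__(self, urp_count_fn):
        self._lock = threading.Lock()   # serializes shutdowns
        self._urp = urp_count_fn        # callback returning current URP count

    def request_shutdown(self, broker_id, do_shutdown):
        with self._lock:
            if self._urp() != 0:
                return False            # refuse: replication not yet caught up
            do_shutdown(broker_id)      # safe to proceed with this broker
            return True
```

A rolling bounce then becomes: each broker asks the coordinator, is refused while the previous broker's partitions are still under-replicated, and retries until admitted.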
Re: [DISCUSS] KIP-17 - Add HighwaterMarkOffset to OffsetFetchResponse
I agree with Jun here, that it would make it easier to do lag checking. However, for individual checks it's really not that much trouble to do the second request. If you're doing a lot of lag checking (like every consumer and every topic), where the scale would start to make a difference, I would argue that you should not be using individual OffsetFetchRequests to do it. You should instead consume the __consumer_offsets topic to get the committed offsets, in which case you're still back to getting the high watermark another way (either through an OffsetRequest or through JMX). -Todd On Thu, Mar 26, 2015 at 7:54 AM, Jun Rao j...@confluent.io wrote: Grant, In addition to FetchRequest, currently we have another way to get the high watermark through OffsetRequest ( https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest ). OffsetRequest is a read-only request and is much lighter than FetchRequest. This is what monitoring tools like ConsumerOffsetChecker use now. By returning the high watermark in the OffsetFetchResponse, we can implement tools like ConsumerOffsetChecker a bit more simply: instead of making two requests, the tool just needs to make one request. However, I am not sure if it makes a big difference. Thanks, Jun On Tue, Mar 24, 2015 at 8:13 PM, Grant Henke ghe...@cloudera.com wrote: Here is an initial proposal to add HighwaterMarkOffset to the OffsetFetchResponse: https://cwiki.apache.org/confluence/display/KAFKA/KIP-17+-+Add+HighwaterMarkOffset+to+OffsetFetchResponse I can add a jira and more implementation details if the initial proposal has interest. Thanks, Grant -- Grant Henke Solutions Consultant | Cloudera ghe...@cloudera.com | 920-980-8979 twitter.com/ghenke http://twitter.com/gchenke | linkedin.com/in/granthenke
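Todd's approach, reading committed offsets from __consumer_offsets and fetching high watermarks separately, reduces lag checking to a simple subtraction. The data shapes below are assumptions for illustration:

```python
def consumer_lag(committed, high_watermarks):
    """committed: {(group, topic, partition): committed_offset}, as you would
    accumulate by consuming __consumer_offsets.
    high_watermarks: {(topic, partition): hwm}, fetched another way
    (e.g. via an OffsetRequest or JMX).
    Returns lag per (group, topic, partition); clamped at zero."""
    return {key: max(high_watermarks[(key[1], key[2])] - offset, 0)
            for key, offset in committed.items()}
```

At scale this is one pass over the offsets topic plus one batch of high-watermark lookups, instead of an OffsetFetchRequest per group.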
Re: [DISCUSS] KIP-6 - New reassignment partition logic for re-balancing
I understand the desire to not bloat this one change with too much more work, and it's a good change to start with. That said, I have one note on your comments: I don't agree with this because right now you get back the current state of the partitions so you can (today) write whatever logic you want (with the information that is there). (with regards to pluggable schemes) I think this is a really bad place to be. While we're in agreement that reshuffling the cluster from one scheme to another should not be automated, the creation and placement of new topics and partitions must be, and you can't rely on an external process to handle that for you. That leaves gaps in what is getting done, and a large failure point. Where to put a partition has to be a decision that the controller makes correctly (where correctly is defined as the way I want the cluster balanced) upon creation, and not something that we come in and fix after the fact. We're in agreement that this should be a new KIP, and that the sourcing and handling of metadata for something like rack-awareness is non-trivial and is going to require a lot of discussion. Plus, we're going to not only want to be rack-aware but also balanced by partition size and/or count. That's going to be somewhat tricky to get right. -Todd On Wed, Mar 11, 2015 at 12:12 PM, Joe Stein joe.st...@stealth.ly wrote: Sorry for not catching up on this thread earlier, I wanted to-do this before the KIP got its updates so we could discuss if need be and not waste more time re-writing/working things that folks have issues with or such. I captured all the comments so far here with responses. So fair assignment by count (taking into account the current partition count of each broker) is very good. However, it's worth noting that all partitions are not created equal. 
We have actually been performing more rebalance work based on the partition size on disk, as given equal retention of all topics, the size on disk is a better indicator of the amount of traffic a partition gets, both in terms of storage and network traffic. Overall, this seems to be a better balance. Agreed, though this is out of scope (imho) for what the motivations for the KIP were. The motivations section is blank (that is on me), but honestly it is because we did all the development, went back and forth with Neha on the testing, and then had to back it all into the KIP process... It's a time/resource/scheduling issue, and I hope to update this soon on the KIP... all of this is in the JIRA and code patch, so it's not like it is not there, just maybe not in the place where folks are looking, since we changed where folks should look. Initial cut at Motivations: the --generate is not used by a lot of folks because they don't trust it. Issues such as giving different results sometimes when you run it. Also other feedback from the community that it does not account for specific use cases like adding new brokers and removing brokers (which is where that patch started https://issues.apache.org/jira/browse/KAFKA-1678 but then we changed it after review into just --rebalance https://issues.apache.org/jira/browse/KAFKA-1792). The use case for adding and removing brokers is one that happens in AWS and auto scaling. There are other reasons for this too, of course. The goal originally was to make what folks are already coding today (with the output of available in the project for the community. Based on the discussion in the JIRA with Neha, we all agreed that making it be a fair rebalance would fulfill both use cases. In addition to this, I think there is very much a need to have Kafka be rack-aware. That is, to be able to assure that for a given cluster, you never assign all replicas for a given partition in the same rack.
This would allow us to guard against maintenances or power failures that affect a full rack of systems (or a given switch). Agreed, though I think this is out of scope for this change and something we can also do in the future. There is more that we have to figure out for rack awareness, specifically answering how we know what rack the broker is on. I really really (really) worry that when we keep trying to put too much into a single change, the discussions go into rabbit holes, and good, important features (that are community driven) that could get out there will get bogged down with different use cases and scope creep. So, I think rack awareness is its own KIP that has two parts... setting broker rack and rebalancing for that. That feature doesn't invalidate the need for --rebalance but can be built on top of it. I think it would make sense to implement the reassignment logic as a pluggable component. That way it would be easy to select a scheme when performing a reassignment (count, size, rack aware). Configuring a default scheme for a cluster would allow for the brokers to create new topics and partitions in compliance with the requested policy. I don't agree with
Re: [KIP-DISCUSSION] KIP-13 Quotas
We're getting off the KIP a little bit here, but while I understand the idea of not honoring the request timeout, I think that the broker ignoring it without telling the client is not a good implementation. It gets back to a larger discussion, which I was mentioning, of just negotiating with (or at least notifying) the client of important configuration details for the cluster. Any configuration, like a minimum timeout value or the maximum message size, which have to be configured the same on both the broker and the client, or where the client has a required minimum value for a setting (like fetch size), should be clearly stated by the broker in a handshake. Don't you get frustrated when you sign up for an account on a website, select a nice secure password, and then get told after submission Your password is invalid - it must be between 6 and 14 characters. Then on the next submission you get told Your password is invalid - it can only contain certain symbols, and they don't tell you what symbols are allowed? Why didn't they just tell you all that up front so you could get it right the first time? -Todd On Wed, Mar 11, 2015 at 10:22 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Todd, Yeah it is kind of weird to do the quota check after taking a request, but since the penalty is applied during that request and it just delays you to the right rate, I think it isn't exactly wrong. I admit it is weird, though. What you say about closing the connection makes sense. The issue is that our current model for connections is totally transient. The clients are supposed to handle any kind of transient connection loss and just re-establish. So basically all existing clients would likely just retry all the same whether you closed the connection or not, so at the moment there would be no way to know a retried request is actually a retry. Your point about the REST proxy is a good one, I don't think we had considered that. 
Currently the java producer just has a single client.id for all requests so the rest proxy would be a single client. But actually what you want is the original sender to be the client. This is technically very hard to do because the client will actually be batching records from all senders together into one request so the only way to get the client id right would be to make a new producer for each rest proxy client and this would mean a lot of memory and connections. This needs thought, not sure what solution there is. I am not 100% convinced we need to obey the request timeout. The configuration issue actually isn't a problem because the request timeout is sent with the request so the broker actually knows it now even without a handshake. However the question is, if someone sets a pathologically low request timeout do we need to obey it? and if so won't that mean we can't quota them? I claim the answer is no! I think we should redefine request timeout to mean replication timeout, which is actually what it is today. Even today if you interact with a slow server it may take longer than that timeout (say because the fs write queues up for a long-ass time). I think we need a separate client timeout which should be fairly long and unlikely to be hit (default to 30 secs or something). -Jay On Tue, Mar 10, 2015 at 10:12 AM, Todd Palino tpal...@gmail.com wrote: Thanks, Jay. On the interface, I agree with Aditya (and you, I believe) that we don't need to expose the public API contract at this time, but structuring the internal logic to allow for it later with low cost is a good idea. Glad you explained the thoughts on where to hold requests. While my gut reaction is to not like processing a produce request that is over quota, it makes sense to do it that way if you are going to have your quota action be a delay. On the delay, I see your point on the bootstrap cases. 
However, one of the places I differ, and part of the reason that I prefer the error, is that I would never allow a producer who is over quota to resend a produce request. A producer should identify itself at the start of its connection, and at that point if it is over quota, the broker would return an error and close the connection. The same goes for a consumer. I'm a fan, in general, of pushing all error cases and handling down to the client and doing as little special work to accommodate those cases on the broker side as possible. A case to consider here is what does this mean for REST endpoints to Kafka? Are you going to hold the HTTP connection open as well? Is the endpoint going to queue and hold requests? I think the point that we can only delay as long as the producer's timeout is a valid one, especially given that we do not have any means for the broker and client to negotiate settings, whether that is timeouts or message sizes or anything else. There are a lot of things that you have to know when setting up a Kafka client about what
Re: [KIP-DISCUSSION] KIP-13 Quotas
First, a couple notes on this... 3 - I generally agree with the direction of not pre-optimizing. However, in this case I'm concerned about the calculation of the cost of doing plugins now vs. trying to refactor the code to do it later. It would seem to me that doing it up front will have less friction. If we wait to do plugins later, it will probably mean changing a lot of related code which will be significantly more work. We've spent a lot of time talking about various implementations, and I think it not unreasonable to believe that what one group wants initially is not going to solve even most cases, as it will vary by use case. 4 - I really disagree with this. Slowing down a request means that you're going to hold onto it in the broker. This takes up resources and time, and is generally not the way other services handle quota violations. In addition you are causing potential problems with the clients by taking a call that's supposed to return as quickly as possible and making it take a long time. This increases latency and deprives the client of the ability to make good decisions about what to do. By sending an error back to the client you inform them of what the problem is, and you allow the client to make an intelligent decision, such as queuing to send later, sending to another resource, or handling anything from their upstreams differently. You're absolutely right that throwing back an immediate error has the potential to turn a quota violation into a different problem for a badly behaved client. But OS and upstream networking tools can see a problem based on a layer 4 issue (rapidly reconnecting client) rather than layers above. Out of the options provided, I think A is the correct choice. B seems to be the most work (you have the delay, and the client still has to handle errors and backoff), and C is what I disagree with doing. I would also like to see a provision for allowing the client to query its quota status within the protocol. 
I think we should allow for a request (or information within an existing response) where the client can ask what its current quota status is. This will allow for the clients to manage their quotas, and it will allow for emitting metrics on the client side for quota status (rather than relying on the server-side metrics, which tends to put the responsibility in the wrong place). -Todd On Sun, Mar 8, 2015 at 10:52 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Adi, Great write-up. Here are some comments: 1. I don't think you need a way to disable quotas on a per-client basis; that is just the equivalent of setting the quota to be infinite, right? 2. I agree that the configuration problem is a general part of doing dynamic configuration, and it is right to separate that into the config KIP. But Joe's proposal doesn't provide nearly what you need in its current form--it doesn't even handle client-id based configuration, let alone the notification mechanism you would need to update your quota--so we really need to spell out explicitly how that KIP is going to solve this problem. 3. Custom quota implementations: let's do this later. Pluggability comes with a high cost and we want to try really hard to avoid it. So in the future if we have a really solid case for an alternative quota approach let's see if we can't improve the current approach and stick with one good implementation. If we really can't then let's add a plugin system. I think doing it now is premature. 4. I think the ideal quota action from the user's point of view is just to slow down the writer or reader transparently to match their capacity allocation. Let's try to see if we can make that work. I think immediate error can be ruled out entirely because it depends on the client properly backing off. In cases where they don't we may actually make things worse. Given the diversity of clients I think this is probably not going to happen.
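Jay's preferred action (point 4) is to transparently delay the client back down to its allocated rate. With a deliberately simplified windowing model (real broker quota windows work differently), the delay computation might look like:

```python
def throttle_delay_ms(bytes_in_window, window_ms, quota_bytes_per_sec):
    """How long to hold a response so that the client's observed rate over
    the window falls back to its quota. A sketch of 'delay to the right
    rate', not the broker's actual quota accounting."""
    # Time this much traffic *should* have taken at the quota rate.
    allowed_ms = bytes_in_window / quota_bytes_per_sec * 1000.0
    # Delay by the difference; no delay if the client is under quota.
    return max(allowed_ms - window_ms, 0.0)
```

For instance, a client that pushed 2 MB in a 1-second window against a 1 MB/s quota would have its response held about one extra second, bringing its effective rate back to the quota.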
The only downside to just delaying the request that was pointed out was that if the delay exceeded the request timeout the user might retry. This is true, but it applies to any approach that delays requests (both B and C). I think with any sane request timeout and quota, the per-request delay we induce will be way lower (otherwise you would be hitting the timeout all the time just due to linux I/O variance, in which case you can't really complain). 5. We need to explain the relationship between the quota stuff in the metrics package and this. We need to either remove that stuff or use it. We can't have two quota things. Since quotas fundamentally apply to windowed metrics, I would suggest doing whatever improvements to that to make it usable for quotas. 6. I don't think the quota manager interface is really what we need if I'm understanding it correctly. You give a method <T extends RequestOrResponse> boolean check(T request); but how would you implement this method? It seems like it would basically internally
[jira] [Created] (KAFKA-2012) Broker should automatically handle corrupt index files
Todd Palino created KAFKA-2012:
--

Summary: Broker should automatically handle corrupt index files
Key: KAFKA-2012
URL: https://issues.apache.org/jira/browse/KAFKA-2012
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Todd Palino

We had a bunch of unclean system shutdowns (power failure), which caused corruption on our disks holding log segments in many cases. While the broker is handling the log segment corruption properly (truncation), it is having problems with corruption in the index files. Additionally, this only seems to be happening on some startups (while we are upgrading). The broker should just do what I do when I hit a corrupt index file - remove it and rebuild it.

2015/03/09 17:58:53.873 FATAL [KafkaServerStartable] [main] [kafka-server] [] Fatal error during KafkaServerStartable startup. Prepare to shutdown
java.lang.IllegalArgumentException: requirement failed: Corrupt index found, index file (/export/content/kafka/i001_caches/__consumer_offsets-39/.index) has non-zero size but the last offset is -2121629628 and the base offset is 0
        at scala.Predef$.require(Predef.scala:233)
        at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352)
        at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:185)
        at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:184)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.log.Log.loadSegments(Log.scala:184)
        at kafka.log.Log.<init>(Log.scala:82)
        at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:141)
        at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
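The manual fix described in this report — remove the corrupt index and let the broker rebuild it while loading segments — can be sketched as below. This is an illustration only, not broker code: `index_is_corrupt` mirrors the failing requirement from the stack trace, and `repair` is a hypothetical helper name.

```python
import os

def index_is_corrupt(size_bytes, base_offset, last_offset):
    # Mirrors the failed requirement above: a non-empty index whose last
    # offset is not greater than its base offset is considered corrupt.
    return size_bytes > 0 and last_offset <= base_offset

def repair(index_path, base_offset, last_offset):
    # The manual fix: delete the corrupt index file. On startup, Kafka
    # rebuilds a missing index from the corresponding log segment.
    size_bytes = os.path.getsize(index_path)
    if index_is_corrupt(size_bytes, base_offset, last_offset):
        os.remove(index_path)
        return True
    return False
```

With the values from the log above (non-zero size, last offset -2121629628, base offset 0), the check fires and the index file would simply be deleted and rebuilt.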
Re: [DISCUSS] KIP-6 - New reassignment partition logic for re-balancing
I would not think that partitions moving would cause any orphaned messages like that. I would be more concerned about what happens when you change the default on a running cluster from one scheme to another. Would we want to support some kind of automated reassignment of existing partitions (personally - no. I want to trigger that manually because it is a very disk and network intensive process)? -Todd On Wed, Mar 4, 2015 at 7:33 PM, Tong Li liton...@us.ibm.com wrote: Todd, I think a pluggable design is good with a solid default. The only issue I feel is when you use one and switch to another, will we end up with some unread messages hanging around and no one thinks or knows it is their responsibility to take care of them? Thanks. Tong Sent from my iPhone On Mar 5, 2015, at 10:46 AM, Todd Palino tpal...@gmail.com wrote: Apologize for the late comment on this... So fair assignment by count (taking into account the current partition count of each broker) is very good. However, it's worth noting that all partitions are not created equal. We have actually been performing more rebalance work based on the partition size on disk, as given equal retention of all topics, the size on disk is a better indicator of the amount of traffic a partition gets, both in terms of storage and network traffic. Overall, this seems to be a better balance. In addition to this, I think there is very much a need to have Kafka be rack-aware. That is, to be able to assure that for a given cluster, you never assign all replicas for a given partition in the same rack. This would allow us to guard against maintenances or power failures that affect a full rack of systems (or a given switch). I think it would make sense to implement the reassignment logic as a pluggable component. That way it would be easy to select a scheme when performing a reassignment (count, size, rack aware).
Configuring a default scheme for a cluster would allow for the brokers to create new topics and partitions in compliance with the requested policy. -Todd On Thu, Jan 22, 2015 at 10:13 PM, Joe Stein joe.st...@stealth.ly wrote: I will go back through the ticket and code and write more up. Should be able to do that sometime next week. The intention was to not replace existing functionality by issuing a WARN on use. In the version following the one it is released in, we could then deprecate it... I will fix the KIP for that too. On Fri, Jan 23, 2015 at 12:34 AM, Neha Narkhede n...@confluent.io wrote: Hey Joe, 1. Could you add details to the Public Interface section of the KIP? This should include the proposed changes to the partition reassignment tool. Also, maybe the new option can be named --rebalance instead of --re-balance? 2. It makes sense to list --decommission-broker as part of this KIP. Similarly, shouldn't we also have an --add-broker option? The way I see this is that there are several events when a partition reassignment is required. Before this functionality is automated on the broker, the tool will generate an ideal replica placement for each such event. The users should merely have to specify the nature of the event, e.g. adding a broker or decommissioning an existing broker or merely rebalancing. 3. If I understand the KIP correctly, the upgrade plan for this feature includes removing the existing --generate option on the partition reassignment tool in 0.8.3 while adding all the new options in the same release. Is that correct? Thanks, Neha On Thu, Jan 22, 2015 at 9:23 PM, Jay Kreps jay.kr...@gmail.com wrote: Ditto on this one. Can you give the algorithm we want to implement? Also I think in terms of scope this is just proposing to change the logic in ReassignPartitionsCommand?
I think we've had the discussion various times on the mailing list that what people really want is just for Kafka to do its best to balance data in an online fashion (for some definition of balance). i.e., if you add a new node, partitions would slowly migrate to it, and if a node dies, partitions slowly migrate off it. This could potentially be more work, but I'm not sure how much more. Has anyone thought about how to do it? -Jay On Wed, Jan 21, 2015 at 10:11 PM, Joe Stein joe.st...@stealth.ly wrote: Posted a KIP for --re-balance for partition assignment in reassignment tool. https://cwiki.apache.org/confluence/display/KAFKA/KIP-6+-+New+reassignment+partition+logic+for+re-balancing JIRA https://issues.apache.org/jira/browse/KAFKA-1792 While going through the KIP I thought of one thing from the JIRA that we should change. We should preserve --generate to be existing functionality for the next release it is in.
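The size-based rebalancing Todd describes can be sketched as a greedy assignment: hand each partition, largest first, to the broker currently holding the fewest bytes. This is a minimal illustration with hypothetical names (`balance_by_size` is not a Kafka API), not the logic any tool actually ships.

```python
def balance_by_size(partition_sizes, brokers):
    # Greedy sketch: walk partitions largest-first and hand each one to
    # the broker currently holding the fewest bytes on disk.
    load = {b: 0 for b in brokers}
    assignment = {b: [] for b in brokers}
    for partition, size in sorted(partition_sizes.items(), key=lambda kv: -kv[1]):
        target = min(brokers, key=lambda b: load[b])
        assignment[target].append(partition)
        load[target] += size
    return assignment
```

Balancing by bytes rather than partition count is the point Todd makes above: with equal retention, size on disk tracks both storage and network load better than a raw partition count does.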
Re: [DISCUSS] KIP-6 - New reassignment partition logic for re-balancing
Apologize for the late comment on this... So fair assignment by count (taking into account the current partition count of each broker) is very good. However, it's worth noting that all partitions are not created equal. We have actually been performing more rebalance work based on the partition size on disk, as given equal retention of all topics, the size on disk is a better indicator of the amount of traffic a partition gets, both in terms of storage and network traffic. Overall, this seems to be a better balance. In addition to this, I think there is very much a need to have Kafka be rack-aware. That is, to be able to assure that for a given cluster, you never assign all replicas for a given partition in the same rack. This would allow us to guard against maintenances or power failures that affect a full rack of systems (or a given switch). I think it would make sense to implement the reassignment logic as a pluggable component. That way it would be easy to select a scheme when performing a reassignment (count, size, rack aware). Configuring a default scheme for a cluster would allow for the brokers to create new topics and partitions in compliance with the requested policy. -Todd On Thu, Jan 22, 2015 at 10:13 PM, Joe Stein joe.st...@stealth.ly wrote: I will go back through the ticket and code and write more up. Should be able to do that sometime next week. The intention was to not replace existing functionality by issuing a WARN on use. In the version following the one it is released in, we could then deprecate it... I will fix the KIP for that too. On Fri, Jan 23, 2015 at 12:34 AM, Neha Narkhede n...@confluent.io wrote: Hey Joe, 1. Could you add details to the Public Interface section of the KIP? This should include the proposed changes to the partition reassignment tool. Also, maybe the new option can be named --rebalance instead of --re-balance? 2. It makes sense to list --decommission-broker as part of this KIP. Similarly, shouldn't we also have an --add-broker option?
The way I see this is that there are several events when a partition reassignment is required. Before this functionality is automated on the broker, the tool will generate an ideal replica placement for each such event. The users should merely have to specify the nature of the event, e.g. adding a broker or decommissioning an existing broker or merely rebalancing. 3. If I understand the KIP correctly, the upgrade plan for this feature includes removing the existing --generate option on the partition reassignment tool in 0.8.3 while adding all the new options in the same release. Is that correct? Thanks, Neha On Thu, Jan 22, 2015 at 9:23 PM, Jay Kreps jay.kr...@gmail.com wrote: Ditto on this one. Can you give the algorithm we want to implement? Also I think in terms of scope this is just proposing to change the logic in ReassignPartitionsCommand? I think we've had the discussion various times on the mailing list that what people really want is just for Kafka to do its best to balance data in an online fashion (for some definition of balance). i.e., if you add a new node, partitions would slowly migrate to it, and if a node dies, partitions slowly migrate off it. This could potentially be more work, but I'm not sure how much more. Has anyone thought about how to do it? -Jay On Wed, Jan 21, 2015 at 10:11 PM, Joe Stein joe.st...@stealth.ly wrote: Posted a KIP for --re-balance for partition assignment in reassignment tool. https://cwiki.apache.org/confluence/display/KAFKA/KIP-6+-+New+reassignment+partition+logic+for+re-balancing JIRA https://issues.apache.org/jira/browse/KAFKA-1792 While going through the KIP I thought of one thing from the JIRA that we should change. We should preserve --generate to be existing functionality for the next release it is in. If folks want to use --re-balance then great, it just won't break any upgrade paths, yet.
/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
***/
--
Thanks,
Neha
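The rack-aware placement discussed in this thread — never putting all replicas of one partition in the same rack — might look roughly like the sketch below. The function name, the rack layout, and the modulo rotation are all illustrative assumptions, not the scheme Kafka ultimately adopted.

```python
def rack_aware_replicas(partition, brokers_by_rack, replication_factor):
    # Each successive replica lands on a different rack; the partition
    # number rotates the starting rack so leaders spread across racks.
    racks = sorted(brokers_by_rack)
    if replication_factor > len(racks):
        raise ValueError("fewer racks than replicas; one rack failure could lose all copies")
    replicas = []
    for i in range(replication_factor):
        rack = racks[(partition + i) % len(racks)]
        brokers = brokers_by_rack[rack]
        replicas.append(brokers[partition % len(brokers)])
    return replicas
```

Raising when there are fewer racks than replicas enforces the guarantee Todd asks for: a full-rack maintenance or power failure can never take out every copy of a partition.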
[jira] [Created] (KAFKA-1987) Potential race condition in partition creation
Todd Palino created KAFKA-1987:
--

Summary: Potential race condition in partition creation
Key: KAFKA-1987
URL: https://issues.apache.org/jira/browse/KAFKA-1987
Project: Kafka
Issue Type: Bug
Components: controller
Affects Versions: 0.8.1.1
Reporter: Todd Palino
Assignee: Neha Narkhede

I am finding that there appears to be a race condition when creating partitions, with replication factor 2 or higher, between the creation of the partition on the leader and the follower. What appears to be happening is that the follower is processing the command to create the partition before the leader does, and when the follower starts the replica fetcher, it fails with an UnknownTopicOrPartitionException. The situation is that I am creating a large number of partitions on a cluster, preparing it for data being mirrored from another cluster. So there are a sizeable number of create and alter commands being sent sequentially. Eventually, the replica fetchers start up properly. But it seems like the controller should issue the command to create the partition to the leader, wait for confirmation, and then issue the command to create the partition to the followers.

2015/02/26 21:11:50.413 INFO [LogManager] [kafka-request-handler-12] [kafka-server] [] Created log for partition [topicA,30] in /path_to/i001_caches with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 6, segment.bytes -> 268435456, flush.ms -> 1, delete.retention.ms -> 8640, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> 4320, max.message.bytes -> 100, flush.messages -> 2, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 8640, segment.jitter.ms -> 0}.
2015/02/26 21:11:50.418 WARN [Partition] [kafka-request-handler-12] [kafka-server] [] Partition [topicA,30] on broker 1551: No checkpointed highwatermark is found for partition [topicA,30]
2015/02/26 21:11:50.418 INFO [ReplicaFetcherManager] [kafka-request-handler-12] [kafka-server] [] [ReplicaFetcherManager on broker 1551] Removed fetcher for partitions [topicA,30]
2015/02/26 21:11:50.418 INFO [Log] [kafka-request-handler-12] [kafka-server] [] Truncating log topicA-30 to offset 0.
2015/02/26 21:11:50.450 INFO [ReplicaFetcherManager] [kafka-request-handler-12] [kafka-server] [] [ReplicaFetcherManager on broker 1551] Added fetcher for partitions List([[topicA,30], initOffset 0 to broker id:1555,host:host1555.example.com,port:10251] )
2015/02/26 21:11:50.615 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.616 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.618 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.620 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.621 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException
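The ordering Todd proposes — the controller creates the partition on the leader, waits for confirmation, and only then contacts the followers — can be sketched abstractly. `create_partition_ordered` and `send_create` are hypothetical stand-ins for the controller's request channel, not actual controller APIs.

```python
def create_partition_ordered(partition, leader, followers, send_create):
    # Suggested ordering: create on the leader and wait for its
    # confirmation first, so followers never start replica fetchers
    # against a broker that has not created the partition yet.
    if not send_create(leader, partition):
        raise RuntimeError("leader did not confirm partition creation")
    for follower in followers:
        send_create(follower, partition)
    return True
```

Under this ordering, the UnknownTopicOrPartitionException burst in the log above could not occur, because the follower's fetcher only starts after the leader has acknowledged the partition.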
Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints
The idea is more about isolating the intra-cluster traffic from the normal clients as much as possible. There are a couple of situations we've seen where this would be useful that I can think of immediately: 1) Normal operation - just having the intra-cluster traffic on a separate network interface would allow it to not get overwhelmed by something like a bootstrapping client who is saturating the network interface. We see this fairly often, where the replication falls behind because of heavy traffic from one application. We can always adjust the network threads, but segregating the traffic is the first step. 2) Isolation in case of an error - We have had situations, more than once, where we needed to rebuild a cluster after a catastrophic problem and the clients were causing that process to take too long, or were causing additional failures. This has mostly come into play with file descriptor limits in the past, but it's certainly not the only situation. Constantly reconnecting clients continue to cause the brokers to fall over while we are trying to recover a down cluster. The only solution was to firewall off all the clients temporarily. This is a great deal more complicated if the brokers and the clients are all operating over the same port. Now, that said, quotas can be a partial solution to this. I don't want to jump the gun on that discussion (because it's going to come up separately and in more detail), but it is possible to structure quotas in a way that will allow the intra-cluster replication to continue to function in the case of high load. That would partially address case 1, but it does nothing for case 2. Additionally, I think it is also desirable to segregate the traffic even with quotas, so that regardless of the client load, the cluster itself is able to be healthy. -Todd On Thu, Feb 12, 2015 at 11:38 AM, Jun Rao j...@confluent.io wrote: Todd, Could you elaborate on the benefit of having a separate endpoint for intra-cluster communication?
Is it mainly for giving intra-cluster requests a high priority? At this moment, having a separate endpoint just means that the socket connection for the intra-cluster communication is handled by a separate acceptor thread. The processing of the requests from the network and the handling of the requests are still shared by a single thread pool. So, if the thread pool is exhausted, the intra-cluster requests will still be delayed. We can potentially change this model, but this requires more work. An alternative is to just rely on quotas. Intra-cluster requests will be exempt from any kind of throttling. Gwen, I agree that defaulting wire.protocol.version to the current version is probably better. It just means that we need to document the migration path for previous versions. Thanks, Jun On Wed, Feb 11, 2015 at 6:33 PM, Todd Palino tpal...@gmail.com wrote: Thanks, Gwen. This looks good to me as far as the wire protocol versioning goes. I agree with you on defaulting to the new wire protocol version for new installs. I think it will also need to be very clear (to the general installer of Kafka, and not just developers) in documentation when the wire protocol version changes moving forwards, and what the risk/benefit of changing to the new version is. Since a rolling upgrade of the intra-cluster protocol is supported, will a rolling downgrade work as well? Should a flaw (bug, security, or otherwise) be discovered after upgrade, is it possible to change the wire.protocol.version back to 0.8.2 and do a rolling bounce? On the host/port/protocol specification, specifically the ZK config format, is it possible to have an un-advertised endpoint? I would see this as potentially useful if you wanted to have an endpoint that you are reserving for intra-cluster communication, and you would prefer to not have it advertised at all. Perhaps it is blocked by a firewall rule or other authentication method.
This could also allow you to duplicate a security protocol type but segregate it on a different port or interface (if it is unadvertised, there is no ambiguity to the clients as to which endpoint should be selected). I believe I asked about that previously, and I didn't track what the final outcome was or even if it was discussed further. -Todd On Wed, Feb 11, 2015 at 4:38 PM, Gwen Shapira gshap...@cloudera.com wrote: Added Jun's notes to the KIP (Thanks for explaining so clearly, Jun. I was clearly struggling with this...) and removed the reference to use.new.wire.protocol. On Wed, Feb 11, 2015 at 4:19 PM, Joel Koshy jjkosh...@gmail.com wrote: The description that Jun gave for (2) was the detail I was looking for - Gwen can you update the KIP with that for completeness/clarity? I'm +1 as well overall. However, I think it would be good if we also get an ack from someone who is more experienced on the operations side
Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints
Thanks, Gwen. This looks good to me as far as the wire protocol versioning goes. I agree with you on defaulting to the new wire protocol version for new installs. I think it will also need to be very clear (to the general installer of Kafka, and not just developers) in documentation when the wire protocol version changes moving forwards, and what the risk/benefit of changing to the new version is. Since a rolling upgrade of the intra-cluster protocol is supported, will a rolling downgrade work as well? Should a flaw (bug, security, or otherwise) be discovered after upgrade, is it possible to change the wire.protocol.version back to 0.8.2 and do a rolling bounce? On the host/port/protocol specification, specifically the ZK config format, is it possible to have an un-advertised endpoint? I would see this as potentially useful if you wanted to have an endpoint that you are reserving for intra-cluster communication, and you would prefer to not have it advertised at all. Perhaps it is blocked by a firewall rule or other authentication method. This could also allow you to duplicate a security protocol type but segregate it on a different port or interface (if it is unadvertised, there is no ambiguity to the clients as to which endpoint should be selected). I believe I asked about that previously, and I didn't track what the final outcome was or even if it was discussed further. -Todd On Wed, Feb 11, 2015 at 4:38 PM, Gwen Shapira gshap...@cloudera.com wrote: Added Jun's notes to the KIP (Thanks for explaining so clearly, Jun. I was clearly struggling with this...) and removed the reference to use.new.wire.protocol. On Wed, Feb 11, 2015 at 4:19 PM, Joel Koshy jjkosh...@gmail.com wrote: The description that Jun gave for (2) was the detail I was looking for - Gwen can you update the KIP with that for completeness/clarity? I'm +1 as well overall.
However, I think it would be good if we also get an ack from someone who is more experienced on the operations side (say, Todd) to review especially the upgrade plan. On Wed, Feb 11, 2015 at 09:40:50AM -0800, Jun Rao wrote: +1 for proposed changes in 1 and 2. 1. The impact is that if someone uses SimpleConsumer and references Broker explicitly, the application needs a code change to compile with 0.8.3. Since SimpleConsumer is not widely used, breaking the API in SimpleConsumer but maintaining overall code cleanness seems to be a better tradeoff. 2. For clarification, the issue is the following. In 0.8.3, we will be evolving the wire protocol of UpdateMetadataRequest (to send info about endpoints for different security protocols). Since this is used in intra-cluster communication, we need to do the upgrade in two steps. The idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2. When upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After step 1, all brokers will be capable of processing the new protocol in 0.8.3, but without actually using it. In step 2, we configure wire.protocol.version to 0.8.3 in each broker and do another rolling restart. After step 2, all brokers will start using the new protocol in 0.8.3. Let's say that in the next release 0.9, we are changing the intra-cluster wire protocol again. We will do a similar thing: defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can upgrade from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2 to 0.9 directly, they will have to configure wire.protocol.version to 0.8.2 first and then do the two-step upgrade to 0.9. Gwen, In KIP2, there is still a reference to use.new.protocol. This needs to be removed. Also, would it be better to use intra.cluster.wire.protocol.version since this only applies to the wire protocol among brokers? Others, The patch in KAFKA-1809 is almost ready. It would be good to wrap up the discussion on KIP2 soon.
So, if you haven't looked at this KIP, please take a look and send your comments. Thanks, Jun On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Kafka Devs, While reviewing the patch for KAFKA-1809, we came across two questions that we are interested in hearing the community out on. 1. This patch changes the Broker class and adds a new class BrokerEndPoint that behaves like the previous broker. While technically kafka.cluster.Broker is not part of the public API, it is returned by javaapi, used with the SimpleConsumer. Getting replicas from PartitionMetadata will now return BrokerEndPoint instead of Broker. All method calls remain the same, but since we return a new type, we break the API. Note that this breakage does not prevent upgrades - existing SimpleConsumers will continue working (because we are wire-compatible). The only thing that won't work is
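Jun's two-step upgrade can be illustrated with a toy version gate: after the first rolling bounce every broker can parse both formats, but brokers only send the new format once wire.protocol.version is flipped and they are bounced again. The function and format names here are illustrative assumptions, not broker internals.

```python
SUPPORTED_VERSIONS = ("0.8.2", "0.8.3")

def updatemetadata_format(wire_protocol_version):
    # After step 1 (rolling code upgrade) every broker can *parse* both
    # formats; only after step 2 (flip the config, bounce again) do
    # brokers start *sending* the new one. Until then, configured
    # version 0.8.2 keeps the old format on the wire.
    if wire_protocol_version not in SUPPORTED_VERSIONS:
        raise ValueError("unsupported wire.protocol.version: %s" % wire_protocol_version)
    return "new-format" if wire_protocol_version == "0.8.3" else "old-format"
```

The key property is that a mixed-version cluster mid-upgrade never receives a format it cannot parse, which is exactly why the config flip must be a separate second step.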
Re: [SECURITY DISCUSSION] Refactoring Brokers to support multiple ports
Leaving aside the rest of this, on #1, while I consider being able to advertise the ports a good idea, I don't want to lose the ability for maintaining multiple ports with the same protocol. For example, being able to have 2 plaintext ports, one that only brokers communicate over, and one that general clients use. The ability to segregate this traffic is useful in a number of situations, over and above other controls like quotas, and is relatively easy to do once we support multiple ports. -Todd On Tue, Dec 2, 2014 at 1:58 PM, Jun Rao jun...@gmail.com wrote: Hi, Gwen, Thanks for writing up the wiki. Some comments below. 1. To make it more general, should we support a binding and an advertised host for each protocol (e.g. plaintext, ssl, etc)? We will also need to figure out how to specify the wildcard binding host. 2. Broker format change in ZK The broker registration in ZK needs to store the host/port for all protocols. We will need to bump up the version of the broker registration data. Since this is an intra-cluster protocol change, we need an extra config for rolling upgrades. So, in the first step, each broker is upgraded and is ready to parse brokers registered in the new format, but not registering using the new format yet. In the second step, when that new config is enabled, the broker will register using the new format. 3. Wire protocol changes. Currently, the broker info is used in the following requests/responses: TopicMetadataResponse , ConsumerMetadataResponse, LeaderAndIsrRequest and UpdateMetadataRequest. 3.1 TopicMetadataResponse and ConsumerMetadataResponse: These two are used between the clients and the broker. I am not sure that we need to make a wire protocol change for them. Currently, the protocol includes a single host/port pair in those responses. Based on the type of the port on which the request is sent, it seems that we can just pick the corresponding host and port to include in the response. 
3.2 UpdateMetadataRequest: This is used between the controller and the broker. Since each broker needs to cache the host/port of all protocols, we need to make a wire protocol change. We also need to change the broker format in MetadataCache accordingly. This is also an intra-cluster protocol change. So the upgrade path will need to follow that in item 2. 3.3 LeaderAndIsrRequest: This is also used between the controller and the broker. The receiving broker uses the host/port of the leader replica to send the fetch request. I am not sure if we need a wire protocol change in this case. I was imagining that we will just add a new broker config, something like replication.socket.protocol. Based on this config, the controller will pick the right host/port to include in the request. 4. Should we plan to support security just on the new Java clients? Supporting security in both the old and the new clients adds more work and gives people less incentive to migrate off the old clients. Thanks, Jun On Tue, Nov 25, 2014 at 11:13 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Everyone, One of the pre-requisites we have for supporting multiple security protocols (SSL, Kerberos) is to support them on separate ports. This is done in KAFKA-1684 (The SSL Patch), but that patch addresses several different issues - Multiple ports, enriching the channels, SSL implementation - which makes it more challenging to review and to test. I'd like to split this into 3 separate patches: multi-port brokers, enriching SocketChannel, and the actual security implementations. Since even just adding support for multiple listeners per broker is somewhat involved and touches multiple components, I wrote a short design document that covers the necessary changes and the upgrade process: https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers Comments are more than welcome :) If this is acceptable, hope to have a patch ready in a few days. Gwen Shapira
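The multi-listener design stores one host/port per protocol in the broker's versioned ZooKeeper registration. A sketch of what such a payload could look like is below; the exact JSON layout is an assumption for illustration, not the format the design document specifies.

```python
import json

def broker_registration(broker_id, endpoints, version=2):
    # endpoints: {"PLAINTEXT": ("host", 9092), "SSL": ("host", 9093), ...}
    # Bumping `version` is what lets old brokers detect a format they
    # cannot parse, which is why the rollout needs the two-step upgrade.
    return json.dumps({
        "version": version,
        "broker_id": broker_id,
        "endpoints": [
            {"protocol": proto, "host": host, "port": port}
            for proto, (host, port) in sorted(endpoints.items())
        ],
    })
```

Keeping endpoints as a list rather than a protocol-keyed map also leaves room for Todd's request: two listeners with the same protocol (e.g. two PLAINTEXT ports, one reserved for brokers).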
Re: [SECURITY DISCUSSION] Refactoring Brokers to support multiple ports
Thanks. Just to add more detail as to why I think it's a good idea to be able to segregate traffic like that... One reason would just be to separate out the intra-cluster communication to a separate port. This can allow you to run it over a different interface (for example, you could have dedicated links for the brokers to do replication), though you could do that with a wildcard bind and multiple interfaces with a little care on the broker config. It also allows for firewalling off clients from a cluster while leaving the broker communication intact. We've run into this situation a couple of times where it was advantageous to do this during recovery to prevent things like runaway file descriptor allocation. It also gives the ability to use QoS tools to work with networking guarantees for broker traffic. Maybe it's enough to be able to specify a special protocol name to do this. Meaning you could configure a port with protocol broker (or something like that) which could be used by the brokers if it exists. Otherwise they would default back to something else. -Todd On Tue, Dec 2, 2014 at 3:23 PM, Gwen Shapira gshap...@cloudera.com wrote: The good news is that I'm not using a map to represent the protocol list. The bad news is that as mentioned in the wiki: producers, consumers and broker configuration specify security protocol, so we'll know which host/port pair to use for communication. This implicitly assumes there will be only one host/port per protocol. I'll think a bit on how this assumption can be relaxed. Gwen On Tue, Dec 2, 2014 at 3:14 PM, Todd Palino tpal...@gmail.com wrote: Gwen - just my reading of what we could expect from what you had described so far. Without having gone into implementation details, there didn't seem to be anything that would block the ability to run two ports with the same protocol configuration, at least from the way you proposed to represent it in Zookeeper.
I'd just like to not go down the path of using something like a map for representing the protocol list that would eliminate this possibility, unless there's a pretty good reason to do so. -Todd On Tue, Dec 2, 2014 at 3:00 PM, Gwen Shapira gshap...@cloudera.com wrote: Hey Todd, You say lose the ability - you mean this ability actually exists now? Or is this something you hope the new patch will support? Gwen On Tue, Dec 2, 2014 at 2:08 PM, Todd Palino tpal...@gmail.com wrote: Leaving aside the rest of this, on #1, while I consider being able to advertise the ports a good idea, I don't want to lose the ability for maintaining multiple ports with the same protocol. For example, being able to have 2 plaintext ports, one that only brokers communicate over, and one that general clients use. The ability to segregate this traffic is useful in a number of situations, over and above other controls like quotas, and is relatively easy to do once we support multiple ports. -Todd On Tue, Dec 2, 2014 at 1:58 PM, Jun Rao jun...@gmail.com wrote: Hi, Gwen, Thanks for writing up the wiki. Some comments below. 1. To make it more general, should we support a binding and an advertised host for each protocol (e.g. plaintext, ssl, etc)? We will also need to figure out how to specify the wildcard binding host. 2. Broker format change in ZK The broker registration in ZK needs to store the host/port for all protocols. We will need to bump up the version of the broker registration data. Since this is an intra-cluster protocol change, we need an extra config for rolling upgrades. So, in the first step, each broker is upgraded and is ready to parse brokers registered in the new format, but not registering using the new format yet. In the second step, when that new config is enabled, the broker will register using the new format. 3. Wire protocol changes.
Currently, the broker info is used in the following requests/responses: TopicMetadataResponse , ConsumerMetadataResponse, LeaderAndIsrRequest and UpdateMetadataRequest. 3.1 TopicMetadataResponse and ConsumerMetadataResponse: These two are used between the clients and the broker. I am not sure that we need to make a wire protocol change for them. Currently, the protocol includes a single host/port pair in those responses. Based on the type of the port on which the request is sent, it seems that we can just pick the corresponding host and port to include in the response. 3.2 UpdateMetadataRequest: This is used between the controller and the broker. Since each broker needs to cache the host/port of all protocols, we need to make a wire protocol change. We also need to change the broker format in MetadataCache accordingly. This is also an intra-cluster protocol change. So the upgrade path will need to follow that in item 2. 3.3 LeaderAndIsrRequest: This is also used
Re: [SECURITY DISCUSSION] Refactoring Brokers to support multiple ports
I don't think it necessarily has to be. I was thinking about that while I was typing it out, and I realized that as well. With a special broker port, my biggest concern is making sure that nothing other than a broker uses it (and so cannot bypass security controls like authentication and authorization). Outside of that assurance, I don't see a reason to add overhead like TLS to the broker communication. Someone else might have one, though. -Todd On Tue, Dec 2, 2014 at 4:12 PM, Jun Rao j...@confluent.io wrote: Todd, Does that imply the intra-broker protocol is always plaintext? Thanks, Jun On Tue, Dec 2, 2014 at 3:31 PM, Todd Palino tpal...@gmail.com wrote: Thanks. Just to add more detail as to why I think it's a good idea to be able to segregate traffic like that... One reason would just be to separate out the intra-cluster communication to a separate port. This can allow you to run it over a different interface (for example, you could have dedicated links for the brokers to do replication), though you could do that with a wildcard bind and multiple interfaces with a little care on the broker config. It also allows for firewalling off clients from a cluster while leaving the broker communication intact. We've run into this situation a couple of times where it was advantageous to do this during recovery to prevent things like runaway file descriptor allocation. It also gives the ability to use QoS tools to work with networking guarantees for broker traffic. Maybe it's enough to be able to specify a special protocol name to do this. Meaning you could configure a port with protocol broker (or something like that) which could be used by the brokers if it exists. Otherwise they would default back to something else. -Todd On Tue, Dec 2, 2014 at 3:23 PM, Gwen Shapira gshap...@cloudera.com wrote: The good news is that I'm not using a map to represent the protocol list.
The bad news is that, as mentioned in the wiki, producers, consumers, and broker configuration specify a security protocol, so we'll know which host/port pair to use for communication. This implicitly assumes there will be only one host/port per protocol. I'll think a bit on how this assumption can be relaxed.

Gwen

On Tue, Dec 2, 2014 at 3:14 PM, Todd Palino tpal...@gmail.com wrote:

Gwen - just my reading of what we could expect from what you had described so far. Without having gone into implementation details, there didn't seem to be anything that would block the ability to run two ports with the same protocol configuration, at least from the way you proposed to represent it in Zookeeper. I'd just like to not go down the path of using something like a map for representing the protocol list that would eliminate this possibility, unless there's a pretty good reason to do so.

-Todd

On Tue, Dec 2, 2014 at 3:00 PM, Gwen Shapira gshap...@cloudera.com wrote:

Hey Todd,

You say "lose the ability" - you mean this ability actually exists now? Or is this something you hope the new patch will support?

Gwen

On Tue, Dec 2, 2014 at 2:08 PM, Todd Palino tpal...@gmail.com wrote:

Leaving aside the rest of this, on #1, while I consider being able to advertise the ports a good idea, I don't want to lose the ability to maintain multiple ports with the same protocol. For example, being able to have two plaintext ports, one that only brokers communicate over, and one that general clients use. The ability to segregate this traffic is useful in a number of situations, over and above other controls like quotas, and is relatively easy to do once we support multiple ports.

-Todd

On Tue, Dec 2, 2014 at 1:58 PM, Jun Rao jun...@gmail.com wrote:

Hi, Gwen,

Thanks for writing up the wiki. Some comments below.

1. To make it more general, should we support a binding and an advertised host for each protocol (e.g. plaintext, ssl, etc)?
We will also need to figure out how to specify the wildcard binding host.

2. Broker format change in ZK: The broker registration in ZK needs to store the host/port for all protocols. We will need to bump up the version of the broker registration data. Since this is an intra-cluster protocol change, we need an extra config for rolling upgrades. So, in the first step, each broker is upgraded and is ready to parse brokers registered in the new format, but does not register using the new format yet. In the second step, when that new config is enabled, the broker will register using the new format.

3. Wire protocol changes: Currently, the broker info is used in the following requests/responses: TopicMetadataResponse , ConsumerMetadataResponse
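The two-step rolling upgrade in item 2 hinges on every broker being able to parse both registration formats before any broker starts writing the new one. A minimal sketch of that version-aware parsing, where the JSON layouts are illustrative approximations loosely modeled on a broker znode, not Kafka's exact formats:

```python
import json

# Hedged sketch of version-aware parsing of a broker registration znode.
# The JSON layouts below are illustrative, not Kafka's exact formats.

def parse_broker_registration(raw):
    """Return a {protocol: (host, port)} map from either the old
    single-endpoint registration format or a newer multi-endpoint one."""
    reg = json.loads(raw)
    if reg.get("version", 1) == 1:
        # Old format: one implicit plaintext endpoint.
        return {"PLAINTEXT": (reg["host"], reg["port"])}
    # New format: one endpoint per security protocol.
    return {ep["protocol"]: (ep["host"], ep["port"]) for ep in reg["endpoints"]}

old = '{"version": 1, "host": "b1", "port": 9092}'
new = ('{"version": 2, "endpoints": ['
       '{"protocol": "PLAINTEXT", "host": "b1", "port": 9092},'
       '{"protocol": "SSL", "host": "b1", "port": 9093}]}')

print(parse_broker_registration(old))  # -> {'PLAINTEXT': ('b1', 9092)}
print(parse_broker_registration(new)["SSL"])  # -> ('b1', 9093)
```

Because the parser above accepts both versions, step one (deploy the parser everywhere) can complete before step two (flip the config so brokers write the new format), and a mixed-version cluster never sees an unreadable registration.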
Re: Security JIRAS
For the moment, consumers still need to write under the /consumers tree. Even if they are committing offsets to Kafka instead of ZK, they will need to write owner information there when they are rebalancing. Eventually, you are correct, this is going away with the new consumer.

-Todd

On Fri, Oct 17, 2014 at 10:09 AM, Arvind Mani am...@linkedin.com.invalid wrote:

I'm looking at Kafka broker authentication with ZooKeeper, since this looks independent of other tasks.

1) Is authentication required only between the Kafka broker and ZooKeeper? Can we assume world read, so that consumers don't have to be authenticated (I believe Kafka is in any case planning to change such that consumers don't have to interact with ZK)? In this case I assume the Kafka broker can, I think, easily create the znode with the appropriate ACL list - the broker can be admin.

2) ZooKeeper supports Kerberos authentication. ZooKeeper supports SSL connections (version 3.4 or later), but I don't see an x509 authentication provider. Do we want to support x509 cert-based authentication for ZK?

- Arvind
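The ACL arrangement Arvind proposes in (1) - world-readable znodes with modification reserved for the broker's authenticated identity - can be sketched as follows. ACL entries are modeled here as plain dicts for illustration; a real ZooKeeper client (e.g. the Java API or kazoo) has its own ACL types, and the broker principal shown is made up.

```python
# Hedged sketch of the znode ACL scheme from point (1): anyone may read,
# only the broker's authenticated SASL identity may modify. ACLs are plain
# dicts here for illustration; real ZooKeeper clients use their own ACL types.

READ = "r"
ALL = "rwcda"  # read, write, create, delete, admin

def broker_owned_acl(broker_principal):
    """ACL list: world-readable, fully controlled by the broker principal."""
    return [
        {"scheme": "world", "id": "anyone", "perms": READ},
        {"scheme": "sasl", "id": broker_principal, "perms": ALL},
    ]

acl = broker_owned_acl("kafka/broker1@EXAMPLE.COM")  # hypothetical principal
print(acl[0])  # -> {'scheme': 'world', 'id': 'anyone', 'perms': 'r'}
```

Under this scheme, unauthenticated consumers can still read cluster metadata, while only the Kerberos-authenticated broker can create or modify the znodes - which matches Todd's caveat that, for the moment, consumers must still be able to write under /consumers during rebalancing, so that subtree would need a more permissive ACL.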