Hi all,
Not sure if there are further concerns with this KIP. The vote for
KIP-232 is also blocked on the discussion of this KIP. I am happy to
continue the discussion and move this KIP forward.
Thanks,
Dong
On Tue, Apr 17, 2018 at 8:58 PM, Jeff Chao wrote:
Hi Dong, I took a look at KIP-287. Producers writing using the new scheme
prior to processor catch up and cut-over makes sense. Thanks.
On Sat, Apr 14, 2018 at 7:09 PM, Dong Lin wrote:
Hey Jeff,
Thanks for the review. The scheme for expanding processors of the stateful
processing job is described in the "Support processor expansion" section of
KIP-287 (link
Hi, I had a question from Monday's meeting. The current mechanism, as Ray's
notes point out, is to create a copy consumer, with a switch-over
happening once it catches up. Meanwhile, it looks like a producer would still
be writing using the old partitioning scheme. Wouldn't there be a case
where
Thanks for the notes by Jun and Ray. I have read through the notes. It
seems that there are a few questions for the alternative solution by Jan
and maybe Jan will answer these questions later?
I have summarized the solution, which I previously provided in this thread,
in KIP-287, to hopefully
My notes from today's meeting. Sorry if I got anyone's name wrong. Plus
I missed a few moments with noise at home and/or dropped video.
-Ray
=
KIP-253 Discussion
- Currently, adding partitions can cause keys to be read out-of-order.
This KIP is trying to preserve the key ordering when
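To make the out-of-order risk concrete, here is a minimal sketch of the default-partitioner behavior under discussion (crc32 is a stand-in for Kafka's actual murmur2 hash; all names are mine, not from the KIP):

```python
import zlib

def default_partition(key: bytes, num_partitions: int) -> int:
    # Stand-in for the producer's default partitioner: hash(key) % n.
    return zlib.crc32(key) % num_partitions

# After expanding from 2 to 3 partitions, many keys are rerouted, so a
# consumer can see a key's newer messages (on the new partition) before
# its older ones (still unread on the old partition).
rerouted = [k for k in (b"key-%d" % i for i in range(100))
            if default_partition(k, 2) != default_partition(k, 3)]
assert rerouted  # some keys changed partitions
```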
Hi Jun, please add me to the invitation as well. If this is happening
near Palo Alto, let me know if I can join in person. Thanks.
-Ray
On 4/4/18 1:34 PM, Jun Rao wrote:
Hey John,
Thanks much for your super-detailed explanation. This is very helpful.
Now that I have finished reading through your email, I think the proposed
solution in my previous email probably meets the requirement #6 without
requiring additional coordination (w.r.t. partition function) among
Yes I can be there.
On 04.04.2018 22:34, Jun Rao wrote:
Thanks Jun, That time also works for me.
-john
On Wed, Apr 4, 2018 at 6:28 PM, Guozhang Wang wrote:
@Jun, yeah that works for me too.
@Jan, just to clarify on my previous email: assuming we do the reshuffling
out of the user input topics, Streams has the advantage that the
repartition topic is purely owned by itself: the only producer writing to
this repartition topic is Streams itself; so then
Thanks Jun! The time works for me.
On Thu, 5 Apr 2018 at 4:34 AM Jun Rao wrote:
Hi, Jan, Dong, John, Guozhang,
Perhaps it will be useful to have a KIP meeting to discuss this together as
a group. Would Apr. 9 (Monday) at 9:00am PDT work? If so, I will send out
an invite to the mailing list.
Thanks,
Jun
On Wed, Apr 4, 2018 at 1:25 AM, Jan Filipiak
Want to quickly step in here again because it is going places again.
The last part of the discussion is just a pain to read and has completely
diverged from what I suggested, without making the reasons clear to me.
I don't know why this happens; here are my comments anyway.
@Guozhang: That
Hi Dong,
Thanks for the response. I think I'm broadly in agreement with your
statement that with KIP-253 implemented as written, KStreams would be able
to use partition expansion on its various internal topics and processor
threads.
As you say, the reason we can do this is that for internal
Hey John,
Thanks much for your comments!!
I have yet to go through the emails of John/Jun/Guozhang in detail. But let
me present my idea for how to minimize the delay for state loading for
stream use-case.
For ease of understanding, let's assume that the initial partition number
of input topics
Hey guys, just sharing my two cents here (I promise it will be shorter than
John's article :).
0. Just to quickly recap, the main discussion point now is how to support
"key partitioning preservation" (John's #4 in topic characteristics above)
beyond the "single-key ordering preservation" that
Hi, John,
Thanks for the reply. It summarizes the tradeoffs well. At the high level,
there are two general approaches for KStreams. (a) KStreams completely
decouples from the input partitions by over-partitioning. Inside KStreams,
partition expansion is never needed. (b) Over-partitioning is not
Hey Dong,
Congrats on becoming a committer!!!
Since I just sent a novel-length email, I'll try and keep this one brief ;)
Regarding producer coordination, I'll grant that in that case, producers
may coordinate among themselves to produce into the same topic or to
produce co-partitioned topics.
Hi Jun,
Thanks for the response. I'm very new to this project, but I will share my
perspective. I'm going to say a bunch of stuff that I know you know
already, but just so we're on the same page...
This may also be a good time to get feedback from the other KStreams folks.
Using KStreams as a
Hi Dong.
I do disagree here. I think we are fooling ourselves!
This KIP lays the foundation for the wrong path and
I am heavily against it. We should have stateful producers in mind at this
very moment.
One can clearly see that you are on a wrong path when you talk about taking
away custom
Hi, John,
I actually think it's important to think through how KStreams handles
partition expansion in this KIP. If we do decide that we truly need
backfilling, it's much better to think through how to add it now, instead
of retrofitting it later. It would be useful to outline how both existing
Hey John,
Great! Thanks for all the comments. It seems that we agree that the current
KIP is in good shape for core Kafka. IMO, what we have been discussing in
the recent email exchanges is mostly about the second step, i.e. how to
address problem for the stream use-case (or stateful processing in
Thank you so much John Roesler! <3
Thank you for also seeing the core strengths of apache kafka!
We just cannot make sacrifices like this to the architecture for the
benefits of streams.
Streams somehow got lost in the whole "Interactive Query" idea that is
spreading
like cancer across the
Hi Jun,
That's a good point.
Yeah, I don't think it would work too well for existing consumers in the
middle of gen 0 to try and switch to a newly backfilled prefix of gen 1.
They probably just need to finish up until they get to the end of gen 0 and
transition just as if there were no backfill
Thanks for the response, Dong.
Here are my answers to your questions:
- "Asking producers and consumers, or even two different producers, to
> share code like the partition function is a pretty huge ask. What if they
> are using different languages?". It seems that today we already require
>
Hi, John,
Thanks for the reply. I agree that the backfill approach works cleaner for
newly started consumers. I am just not sure if it's a good primitive to
support for existing consumers. One of the challenges that I see is the
remapping of the offsets. In your approach, we need to copy the
BTW, here is my understanding of the scope of this KIP. We want to allow
consumers to always consume messages with the same key from the same
producer in the order they are produced. And we need to provide a way for
stream use-case to be able to flush/load state when messages with the same
key are
Hey John,
Thanks much for the detailed comments. Here are my thoughts:
- The need to delete messages from log compacted topics is mainly for
performance (e.g. storage space) optimization than for correctness for this
KIP. I agree that we probably don't need to focus on this in our discussion
Hey Dong and Jun,
Thanks for the thoughtful responses. If you don't mind, I'll mix my replies
together to try for a coherent response. I'm not too familiar with
mailing-list etiquette, though.
I'm going to keep numbering my points because it makes it easy for you all
to respond.
Point 1:
As I
On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin wrote:
Hey Jan,
Thanks for the enthusiasm in improving Kafka's design. Now that I have read
through your discussion with Jun, here are my thoughts:
- The latest proposal should work with log compacted topics by properly deleting
old messages after a new message with the same key is produced. So it is
Hey John,
Thanks much for the detailed comment. Here is my thought:
Regarding thought 1: I agree. The current proposal seems to have addressed
the core problem. It has also provided the extra interface to support the
stream use-case. I have yet to come up with the solution to do the 2-phase
Hi, John,
Thanks for the comments.
For your thought 1, I agree that it's better to decouple the design of core
and Streams with respect to re-partitioning. From the state management
perspective, a streaming application can be (1) stateless, (2) stateful and
maintaining the states in a global
Hi, Jan,
Thanks for the reply. A few more comments inlined below.
On Fri, Mar 23, 2018 at 7:15 AM, Jan Filipiak
wrote:
>
>> I agree that decoupling the number of tasks in a consumer group from the
>> number of partitions in the input topic is a good idea. This allows
Hi all,
I hope you don't mind if I throw some thoughts in...
For some reason, Jan's proposal wasn't really clear to me until this last
message, and now that (I think) I understand it, I actually like it quite a
lot.
Thought 1:
As much as I like Streams, I don't think it should become a
Hi Jun,
thanks for your great reply, even though I only managed to throw a few
sentences in last time because of time constraints. I think you are starting
to understand me pretty well. There are still some minor things that need to
be ironed out. I already had these points previously but I try to
Hi, Jan,
Thanks for the reply.
Ok. So your idea is to have a special consumer reshuffle the data to a new
set of partitions. Each consumer group will not increase its tasks
immediately. We just need to make sure that each consumer instance is
assigned the set of new partitions covering the same
Hi Jun,
I was really seeing progress in our conversation but your latest reply
is just devastating.
I thought we were getting close to being on the same page; now it feels like
we are in different libraries.
I just quickly slam my answers in here. If they are too brief, I am sorry;
give me a ping
Hi, Jan,
Thanks for the discussion. Great points.
Let me try to summarize the approach that you are proposing. On the broker
side, we reshuffle the existing data in a topic from current partitions to
the new partitions. Once the reshuffle fully catches up, switch the
consumers to start consuming
Hi Jun,
thank you for following me on these thoughts. It was important to me to
feel that kind of understanding for my arguments.
What I was hoping for (I mentioned this earlier) is that we can model
the case where we do not want to copy the data the exact same way as the
case when we do
Hi, Jan,
Thanks for sharing your view.
I agree with you that recopying the data potentially makes the state
management easier since the consumer can just rebuild its state from
scratch (i.e., no need for state reshuffling).
On the flip side, I saw a few disadvantages of the approach that you
Hi Jun,
I will focus on point 61 as I think it's _the_ fundamental part that I
can't get across at the moment.
Kafka is the platform to have state materialized multiple times from one
input. I emphasize this: It is the building block in architectures that
allow you to
have your state
Hi, Jan,
Thanks for the reply. A few more comments below.
50. Ok, we can think a bit harder for supporting compacted topics.
51. This is a fundamental design question. In the more common case, the
reason why someone wants to increase the number of partitions is that the
consumer application is
…dangerous.
So in your case we go from
part1: A B C D
part2: E F G H
to
part1: A C
part2: B D
part3: E F
part4: G H
From: Matthias J. Sax <matth...@confluent.io>
Sent: 09 March 2018 07:53
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion
still get 67% of that load which is still way too dangerous.
Hi Jun, thanks for your mail.
Thank you for your questions!
I think they are really good and tackle the core of the problem I see.
I will answer inline, mostly but still want to set the tone here.
The core strength of kafka is what Martin once called the
kappa-Architecture. How does this
@Jan: You suggest to copy the data from one topic to a new topic, and
provide an "offset mapping" from the old to the new topic for the
consumers. I don't quite understand how this would work.
Let's say there are 2 partitions in the original topic and 3 partitions
in the new topic. If we assume
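For concreteness, here is one hedged reading of the copy-plus-offset-mapping idea (all names invented for this sketch, not from Jan's proposal): the copier would record, per copied record, a mapping from old coordinates to new ones, and the difficulty Matthias points at is that one old partition fans out into several new ones.

```python
import zlib

def copy_with_mapping(old_partitions, new_count):
    """old_partitions: list of partitions, each a list of (key, value) records.
    Copies every record to its new partition and records an offset mapping."""
    new_partitions = [[] for _ in range(new_count)]
    mapping = {}  # (old_partition, old_offset) -> (new_partition, new_offset)
    for op, records in enumerate(old_partitions):
        for oo, (key, value) in enumerate(records):
            np = zlib.crc32(key) % new_count  # crc32 as a stand-in hash
            mapping[(op, oo)] = (np, len(new_partitions[np]))
            new_partitions[np].append((key, value))
    return new_partitions, mapping
```

A consumer committed at some old offset would translate its position through `mapping`; the open question in the thread is what a single "position" means once records from one old partition are interleaved across several new ones.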
As I just mentioned joins:
For Kafka Streams it might also be required to change the partition
count for multiple topics in a coordinated way that allows maintaining
the co-partitioning property that Kafka Streams uses to compute joins.
Any thoughts how this could be handled?
-Matthias
On
Jun,
There is one more case: non-windowed aggregations. For windowed
aggregation, the changelog topic will be compact+delete. However, for
non-windowed aggregation the policy is compact only.
Even if we assume that windowed aggregations are dominant and
non-windowed aggregations are used rarely,
Hi, Matthias,
My understanding is that in KStream, the only case when a changelog topic
needs to be compacted is when the corresponding input is a KTable. In all
other cases, the changelog topics use compaction + deletion. So, if most
KTables are not high volume, there may not be a need to
Hi, Jan,
Thanks for the feedback. Just some comments on the earlier points that you
mentioned.
50. You brought up the question of whether existing data needs to be copied
during partition expansion. My understanding of your view is that avoiding
copying existing data will be more efficient, but it
Hey Jan,
Sorry for the frustration. I haven't finished replying to all comments. For
example in my last email it is mentioned that "I will reply after I finish
reading the documentation and code". It takes time to think through
comments thoroughly. I have been busy with my daily work and haven't
Hi Dong,
are you actually reading my emails, or are you just using the thread I
started for general announcements regarding the KIP?
I tried to argue really hard against linear hashing. Growing the topic
by an integer factor does not require any state redistribution at all. I
fail to see
Jun,
thanks for your comment. This should actually work for Streams, because
we don't rely on producer "hashing" but specify the partition number
explicitly on send().
About not allowing to change the number of partition for changelog
topics: for Streams, this seems to imply that we need to
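A hedged sketch of Matthias's point about explicit partitioning (the `FakeProducer` and all names here are invented for illustration; real Streams uses the Java producer's `send()` with an explicit partition argument):

```python
import zlib

class FakeProducer:
    """Stand-in for a real producer; records sends instead of networking."""
    def __init__(self):
        self.sent = []

    def send(self, topic, key, value, partition):
        self.sent.append((topic, partition, key, value))

def streams_style_send(producer, topic, key, value, num_partitions):
    # Streams computes the target partition itself and passes it explicitly,
    # so the producer's pluggable partitioner never enters the picture.
    partition = zlib.crc32(key) % num_partitions  # crc32 as a stand-in hash
    producer.send(topic, key, value, partition=partition)
```

Because the partition choice lives entirely in the sending application, Streams can change this mapping without any coordination with the producer's default "hashing".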
Hey Jason,
I agree. Even apart from this proposal the partitioning strategy is really
an essential part of the metadata for a topic and had we been less lazy we
probably would have included it with the topic metadata.
I think in terms of grandfathering this in you could have existing topics
just
If we want to maintain ordering, then a pluggable producer-side
partitioner needs to be coherent with the linear hashing scheme, i.e. we
effectively restrict what kind of customization users can do. However such
restrictions are hard to enforce programmatically unless we change the API,
so
Hi Dong,
What is not clear to me is how the use of linear hashing affects the
partitioning logic in the producer, which the user is currently allowed to
customize through the Partitioner interface. It sounds like we are
effectively deprecating that interface since we will only provide ordering
Hi, Mattias,
Regarding your comment "If it would be time-delay based, it might be
problematic
for Kafka Streams: if we get the information that the new input partitions
are available for producing, we need to enable the new changelog partitions
for producing, too. If those would not be available
Hi everyone,
Thanks for all the comments! It appears that everyone prefers linear
hashing because it reduces the amount of state that needs to be moved
between consumers (for stream processing). The KIP has been updated to use
linear hashing.
Regarding the migration endeavor: it seems that
Great discussion. I think I'm wondering whether we can continue to leave
Kafka agnostic to the partitioning strategy. The challenge is communicating
the partitioning logic from producers to consumers so that the dependencies
between each epoch can be determined. For the sake of discussion, imagine
Hi Dong
thank you very much for your questions.
regarding the time spend copying data across:
It is correct that copying data from a topic with one partition mapping
to a topic with a different partition mapping takes way longer than we
can stop producers. Tens of minutes is a very optimistic
Hello Dong, thanks a lot for proposing the KIP!
I'm in favor of partition splitting, or more generally a consistent-hashing
mechanism, over partition rehashing, for the reasons that Matthias has
summarized well. Another point is that for the downstream stateful
consumers that need to reshard their
About partition splitting and load balancing:
I think there are two cases:
1) one partition is a hot-spot: for this case, splitting a partition is
actually more desirable than adding new partitions as it ensures
the load of the hot partition is reduced. Adding partitions does not
help
Hey Jan,
In the current proposal, the consumer will be blocked on waiting for other
consumers of the group to consume up to a given offset. In most cases, all
consumers should be close to the LEO of the partitions when the partition
expansion happens. Thus the time waiting should not be long e.g.
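The blocking condition Dong describes could be sketched as a simple group-wide barrier (function and parameter names are hypothetical, not from the KIP):

```python
def may_consume_new_partitions(member_positions, boundary_offset):
    """All consumers in the group must have consumed the old partition up to
    the boundary offset before any of them starts on the expanded partitions."""
    return all(pos >= boundary_offset for pos in member_positions)

# All members past the boundary: the group may proceed.
assert may_consume_new_partitions([10, 12, 11], boundary_offset=10)
# One straggler below the boundary: everyone keeps waiting.
assert not may_consume_new_partitions([10, 9, 11], boundary_offset=10)
```

Since all members are usually near the log end offset when the expansion happens, this barrier should normally clear quickly.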
Hey Jay, Allen, Jason, Jun,
Thanks much for the valuable feedback. It is very good point that we can
reduce the state that needs to be stored/loaded by using linear hashing. I
have updated the KIP to use linear hashing as suggested. The benefits of
using linear hashing are summarized in "Changes
linear hashing (or partition splitting) vs rehashing every key: The benefit
of the former is that it reduces the # of partitions to which keys from an
existing partition are re-distributed, which potentially reduces the
overhead of rebuilding the state in a consumer. The downside is that the
load
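The reduced redistribution claimed for the former can be checked directly: when doubling the partition count, h % 2N is always either h % N or h % N + N, so keys from an old partition p land only in p or p + N. A sketch with crc32 standing in for the real hash:

```python
import zlib

def old_partition(key: bytes, n: int) -> int:
    return zlib.crc32(key) % n

def new_partition(key: bytes, n: int) -> int:
    # After a linear-hashing doubling split, there are 2n partitions.
    return zlib.crc32(key) % (2 * n)

# Every key either stays on its old partition p or moves to exactly p + n,
# so a consumer of old partition p only needs state from p when splitting.
for i in range(1000):
    key = b"key-%d" % i
    p, q = old_partition(key, 8), new_partition(key, 8)
    assert q == p or q == p + 8
```

With full rehashing (hash % new_count for an arbitrary new count), a key from partition p could land anywhere, which is exactly the state-movement cost the thread is weighing.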
Hey Dong,
Cool, obviously we'd need to have a solution here work with connect and
streams to be viable.
On the linear hashing thing, what I am talking about is something
different. I am talking about splitting existing partitions incrementally.
E.g. if you have 100 partitions and want to move to
Hey Jason,
Thanks much for all the valuable feedback!
On Wed, Feb 28, 2018 at 11:09 AM, Jason Gustafson
wrote:
Hi Dong,
On Tue, Feb 27, 2018 at 10:07 PM, Dong Lin wrote:
Hi Dong,
I tried to focus on the steps one can currently perform to
expand or shrink a keyed topic while maintaining top-notch semantics.
I can understand that there might be confusion about "stopping the
consumer". It is exactly the same as proposed in the KIP. There needs to be
a
Hi Dong,
Great work on this proposal! Just a couple initial comments:
My understanding is that the consumer will block on a topic until all
partitions have reached a certain partition epoch. What are the
implications if a partition is offline? If we observe an epoch change while
a partition
Sounds awesome !
Are you planning to have auto scaling of partitions in a following KIP ?
That would be the holy grail
On 28 Feb. 2018 5:13 pm, "Dong Lin" wrote:
Hey Jan,
I am not sure if it is acceptable for the producer to be stopped for a while,
particularly for online applications which require low latency. I am also
not sure how consumers can switch to a new topic. Does the user application
need to explicitly specify a different topic for producer/consumer
Hey Allen,
Thanks for the comments.
On Mon, Feb 26, 2018 at 9:27 PM, Allen Wang
wrote:
Hi Dong,
Please see my comments inline.
Thanks,
Allen
On Sun, Feb 25, 2018 at 3:33 PM, Dong Lin wrote:
Hi,
just want to throw my thoughts in. In general the functionality is very
useful; we should though not try too hard to nail down the architecture
while implementing.
The manual steps would be to
create a new topic
then mirror-make from the old topic to the new topic
wait for mirror making to
Hey Matthias,
Thanks for the comments.
I think when we compact topics, we only delete those messages (when there
is a later message with the same key) and we do not change the offset of a
given message. As long as offsets of existing messages are not changed, I
think the KIP should still work in the
Hey Jay,
Thanks for the comment!
I have not specifically thought about how this works with Streams and
Connect. The current KIP is about the interface that our producer and
consumer expose to the user. It ensures that if there are two messages
with the same key produced by the same producer, say
Hey Allen,
Thanks for your comment. I will comment inline.
On Thu, Feb 22, 2018 at 3:05 PM, Allen Wang
wrote:
Hey Jun,
Yeah I think this definitely makes sense. I have updated the KIP to support
partition deletion and also added callback as you previously suggested. Can
you take another look?
Thanks!
Dong
On Thu, Feb 22, 2018 at 11:52 AM, Jun Rao wrote:
Hey Gwen,
Thanks for the use-case. This is certainly useful. I have updated the KIP
to allow partition deletion as well.
Dong
On Wed, Feb 21, 2018 at 9:49 PM, Gwen Shapira wrote:
Hey all,
Thanks much for all the comments. I have a rough idea of how to shrink
partitions of a topic. I need some time to think through and write down the
details of the procedure. Then I will reply to your comments.
Thanks,
Dong
On Thu, Feb 22, 2018 at 7:18 PM, Matthias J. Sax
One more thought:
What about older Producer/Consumers? They don't understand the new
protocol. How can we guarantee backward compatibility?
Or would this "only" imply that there is no ordering guarantee for
older clients?
-Matthias
On 2/22/18 6:24 PM, Matthias J. Sax wrote:
Dong,
thanks a lot for the KIP!
Can you elaborate how this would work for compacted topics? If it does
not work for compacted topics, I think Streams API cannot allow to scale
input topics.
This question seems to be particularly interesting for deleting
partitions: assume that a key is never
Hey Dong,
Two questions:
1. How will this work with Streams and Connect?
2. How does this compare to a solution where we physically split partitions
using a linear hashing approach (the partition number is equivalent to the
hash bucket in a hash table)?
Overall this is a very useful feature. With this we can finally scale keyed
messages.
+1 on the ability to remove partitions. This will greatly increase Kafka's
scalability in the cloud.
For example, when there is traffic increase, we can add brokers and assign
new partitions to the new brokers.
Hi, Dong,
Regarding deleting partitions, Gwen's point is right on. In some of the
usage of Kafka, the traffic can be bursty. When the traffic goes up, adding
partitions is a quick way of shifting some traffic to the newly added
brokers. Once the traffic goes down, the newly added brokers will be
Re: Why would anyone want to delete partitions:
Imagine a system that is built to handle 200K events per sec most of the
time, but they have 2-4 days per year where traffic spikes to 11M events
per sec. We'll almost certainly need more partitions (and more consumers)
during the spike to process
Hi, Dong,
Thanks for the KIP. At the high level, this makes sense. A few comments
below.
1. It would be useful to support removing partitions as well. The general
idea could be bumping the leader epoch for the remaining partitions. For
the partitions to be removed, we can make them read-only and
Hey Jun,
Thanks much for your comments.
On Wed, Feb 21, 2018 at 10:17 AM, Jun Rao wrote:
Hi all,
I have created KIP-253: Support in-order message delivery with partition
expansion. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
.
This KIP provides a way to allow messages of the same key from the same