Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1315

2022-10-21 Thread Apache Jenkins Server

Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-10-21 Thread Lucas Brutschy
Hi all,

Thanks, Sophie, this makes sense. I suppose then the way to help users
avoid applying this in the wrong setting is to have good documentation and
one or two examples of good use cases.

I think Colt's time-based partitioning is a good example of how to use
this. It doesn't actually have to be time; the same will work with any
monotonically increasing identifier. That is, the new partitions will only
get records for users whose user ID is greater than some threshold
hardcoded in the static partitioner. At least in this restricted use case,
lookups by user ID would still be possible.
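
To make the idea concrete, here is a minimal sketch of such a partitioner,
written against the existing StreamPartitioner interface (KIP-878's proposed
StaticStreamPartitioner would look similar); the threshold and original
partition count are illustrative values, not part of the KIP:

```
import org.apache.kafka.streams.processor.StreamPartitioner;

// Sketch only: routes "old" user IDs to their original partitions forever,
// and lets "new" user IDs spill into the expanded partition range.
public class UserIdThresholdPartitioner implements StreamPartitioner<Long, Object> {

    // Assumption: all user IDs below this threshold existed before the expansion.
    private static final long THRESHOLD = 1_000_000L;
    // Partition count before the topic was expanded.
    private static final int ORIGINAL_PARTITIONS = 8;

    @Override
    public Integer partition(String topic, Long userId, Object value, int numPartitions) {
        if (userId < THRESHOLD) {
            // Old keys: always computed over the original count, so their
            // partition never changes (the "static" constraint).
            return (int) (userId % ORIGINAL_PARTITIONS);
        }
        // New keys: may use the expanded count; note a further expansion
        // would need a new threshold to keep these keys static as well.
        return (int) (userId % numPartitions);
    }
}
```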

Cheers,
Lucas

On Fri, Oct 21, 2022 at 5:37 PM Colt McNealy  wrote:

> Sophie,
>
> Regarding item "3" (my last paragraph from the previous email), perhaps I
> should give a more general example now that I've had more time to clarify
> my thoughts:
>
> In some stateful applications, certain keys have to be findable without any
> information about when the relevant data was created. For example, if I'm
> running a word-count app and I want to use Interactive Queries to find the
> count for "foo", I would need to know whether "foo" first arrived before or
> after time T before I could find the correct partition to look up the data.
> In this case, I don't think static partitioning is possible. Is this
> use-case a non-goal of the KIP, or am I missing something?
>
> Colt McNealy
> *Founder, LittleHorse.io*
>
>
> On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman  wrote:
>
> > Thanks for the responses guys! I'll get the easy stuff out of the way
> > first:
> >
> > 1) Fixed the KIP so that StaticStreamPartitioner extends
> > StreamPartitioner
> > 2) I totally agree with you Colt, the record value might have valuable
> > (no pun) information in it that is needed to compute the partition
> > without breaking the static constraint. As in my own example earlier,
> > maybe the userId is a field in the value and not the key itself.
> > Actually it was that exact thought that made me do a U-turn on this,
> > but I forgot to update the thread.
> > 3) Colt, I'm not sure I follow what you're trying to say in that last
> > paragraph, can you expand?
> > 4) Lucas, it's a good question as to what kind of guard-rails we could
> > put up to enforce or even detect a violation of static partitioning.
> > Most likely Streams would need to track every key-to-partition mapping
> > in an internal state store, but we have no guarantee the key space is
> > bounded and the store wouldn't grow out of control. Mostly, however, I
> > imagine users would be frustrated to find out there's a secret, extra
> > state store taking up space when you enable autoscaling, and it's not
> > even to provide functionality but just to make sure users aren't doing
> > something wrong.
> >
> > I wish I had a better idea, but sadly I think the only practical
> > solution here is to try and make this condition as clear, obvious, and
> > easy to understand as possible, perhaps by providing an example of what
> > does and does not satisfy the constraint in the javadocs. I'll work on
> > that.
> > 5) I covered a bit above the impracticality of storing a potentially
> > unbounded keyspace, which as you mention would need to be shared by all
> > partitioners as well, so I would agree that this feels insurmountable.
> > I'm leaning towards only enabling this feature for the static
> > partitioning case, at least in the first iteration, and we can see how
> > things go from there -- for example, are people generally able to
> > implement it correctly? If we find that the feature is working well and
> > users are hungry for more, then it would be relatively straightforward
> > to open things up to stateless applications, or even stateful
> > applications which can withstand some "blips" in the logic/correctness.
> >
> > That said, *technically* the feature would be able to be turned on for
> > any such case as it is, since as discussed above it's difficult to
> > place true guardrails around the feature that can enforce static
> > partitioning. Perhaps we could put a short note in the
> > StaticStreamPartitioner docs that explains how and when it's safe to
> > break the static requirement, but that we recommend against doing so.
> >
> > Thoughts?
> >
> > -Sophie
> >
> > On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy  wrote:
> >
> > > Sophie,
> > >
> > > Thank you for your detailed response. That makes sense (one partition
> > > per user seems like a lot of extra metadata if you've got millions of
> > > users, but I'm guessing that was just for illustrative purposes).
> > >
> > > In this case I'd like to question one small detail in your KIP. The
> > > StaticPartitioner takes in just the key and not the value... in an
> > > application I've been working on, the "value" is a long-lived entity
> > > (spanning hundreds of records over several days) that has timestamp
> > > information about the

Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-10-21 Thread Colt McNealy
Sophie,

Regarding item "3" (my last paragraph from the previous email), perhaps I
should give a more general example now that I've had more time to clarify
my thoughts:

In some stateful applications, certain keys have to be findable without any
information about when the relevant data was created. For example, if I'm
running a word-count app and I want to use Interactive Queries to find the
count for "foo", I would need to know whether "foo" first arrived before or
after time T before I could find the correct partition to look up the data.
In this case, I don't think static partitioning is possible. Is this
use-case a non-goal of the KIP, or am I missing something?
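
For concreteness, the routing step that Interactive Queries has to perform
looks roughly like the sketch below; the store name, key, and the
KafkaStreams instance are assumptions for illustration:

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;

// Sketch of the IQ routing step for the word-count example.
public class WhereIsFoo {

    static void locate(KafkaStreams streams) {
        // Works with the default partitioner, because the partition is a
        // pure function of the key...
        KeyQueryMetadata metadata = streams.queryMetadataForKey(
            "counts-store", "foo", Serdes.String().serializer());

        // ...but under a "before/after time T" scheme the partition depends
        // on when "foo" first arrived, which neither this overload nor the
        // partitioner-accepting overload can recover from the key alone.
        System.out.println("'foo' lives in partition " + metadata.partition()
            + " on host " + metadata.activeHost());
    }
}
```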

Colt McNealy
*Founder, LittleHorse.io*


On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman  wrote:

> Thanks for the responses guys! I'll get the easy stuff out of the way
> first:
>
> 1) Fixed the KIP so that StaticStreamPartitioner extends StreamPartitioner
> 2) I totally agree with you Colt, the record value might have valuable (no
> pun) information
> in it that is needed to compute the partition without breaking the static
> constraint. As in my
> own example earlier, maybe the userId is a field in the value and not the
> key itself. Actually
> it was that exact thought that made me do a U-turn on this but I forgot to
> update the thread
> 3) Colt, I'm not sure I follow what you're trying to say in that last
> paragraph, can you expand?
> 4) Lucas, it's a good question as to what kind of guard-rails we could put
> up to enforce or even
> detect a violation of static partitioning. Most likely Streams would need
> to track every key to
> partition mapping in an internal state store, but we have no guarantee the
> key space is bounded
> and the store wouldn't grow out of control. Mostly however I imagine users
> would be frustrated
> to find out there's a secret, extra state store taking up space when you
> enable autoscaling, and
> it's not even to provide functionality but just to make sure users aren't
> doing something wrong.
>
> I wish I had a better idea, but sadly I think the only practical solution
> here is to try and make this
> condition as clear and obvious and easy to understand as possible, perhaps
> by providing an
> example of what does and does not satisfy the constraint in the javadocs.
> I'll work on that
> 5) I covered a bit above the impracticality of storing a potentially
> unbounded keyspace, which
> as you mention would need to be shared by all partitioners as well, so I
> would agree that this
> feels insurmountable. I'm leaning towards only enabling this feature for
> the static partitioning
> case at least in the first iteration, and we can see how things go from
> there -- for example, are
> people generally able to implement it correctly? If we find that the
> feature is working well and
> users are hungry for more, then it would be relatively straightforward to
> open things up to
> stateless applications, or even stateful applications which can withstand
> some "blips" in the
> logic/correctness.
>
> That said, *technically* the feature would be able to be turned on for any
> such case as it is, since
> as discussed above it's difficult to place true guardrails around the
> feature that can enforce
> static partitioning. Perhaps we could put a short note in the
> StaticStreamPartitioner docs that
> explains how and when it's safe to break the static requirement, but that
> we recommend against doing so.
>
> Thoughts?
>
> -Sophie
>
> On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy  wrote:
>
> > Sophie,
> >
> > Thank you for your detailed response. That makes sense (one partition per
> > user seems like a lot of extra metadata if you've got millions of users,
> > but I'm guessing that was just for illustrative purposes).
> >
> > In this case I'd like to question one small detail in your KIP. The
> > StaticPartitioner takes in just the key and not the value... in an
> > application I've been working on, the "value" is a long-lived entity
> > (spanning hundreds of records over several days) that has timestamp
> > information about the creation of the entity inside of it. The ID itself
> > is provided by the end-user of the system and as such isn't guaranteed
> > to have timestamp info.
> >
> > This is quite a corner case, but if the StaticStreamPartitioner interface
> > were allowed to peek at the record value, it would be trivial to
> > implement logic as follows:
> > ```
> > entity = deserialize(record.value())
> >
> > if entity.created_before(T):
> >   return hash(key) % old_partitions
> > else:
> >   return hash(key) % new_partitions
> > ```
> >
> > That said, you're a rockstar architect and have seen a lot more system
> > design than I have (I'm 23 and only 3 years out of school... you
> > implemented cooperative rebalancing). So don't make that decision unless
> > you can see other use-cases where it is appropriate.
> >
> > Additionally, for my own use-case I'm not sure if static partitioning
> 

Re: [VOTE] KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-10-21 Thread ShunKang Lin
Hi everyone,

Thank you for the vote. I've got three +1 votes (Guozhang, Luke, Chris),
can this vote be concluded?

Best,
ShunKang

Chris Egerton  wrote on Wednesday, October 12, 2022 at 23:17:

> +1 (binding)
> Thanks ShunKang!
>
> On Tue, Oct 11, 2022 at 9:26 PM Luke Chen  wrote:
>
> > +1 from me.
> > Thanks for the KIP.
> >
> > Luke
> >
> > On Fri, Sep 23, 2022 at 1:50 AM Guozhang Wang  wrote:
> >
> > > +1, thanks ShunKang.
> > >
> > > Though its proposed motivation is on consumer fetcher's
> > > deserialization, I think adding an overloaded method with ByteBuffer
> > > would help with other serde places on the client side as well.
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Sep 22, 2022 at 9:41 AM ShunKang Lin <linshunkang@gmail.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I'd like to open the vote for KIP-863, which proposes to reduce
> > > > memory allocation and memory copying in
> > > > Fetcher#parseRecord(TopicPartition, RecordBatch, Record).
> > > >
> > > > The proposal is here:
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
> > > >
> > > > Thanks to all who reviewed the proposal, and thanks in advance for
> > > > taking the time to vote!
> > > >
> > > > Best,
> > > > ShunKang
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
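
As context for Guozhang's point above about an overloaded ByteBuffer method,
the sketch below illustrates the kind of overload the KIP motivates and the
copy it avoids. The class and method shapes here are assumptions for
illustration, not the KIP's exact API:

```
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch only: why a ByteBuffer-based deserialize path saves a copy.
public class ZeroCopyStringDeserializer {

    // byte[]-based path: the caller must first copy the fetched buffer
    // into a fresh array before this method can decode it.
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    // ByteBuffer-based path: decode straight out of the fetch buffer,
    // with no intermediate byte[] allocation or copy.
    public String deserialize(String topic, ByteBuffer data) {
        return data == null ? null : StandardCharsets.UTF_8.decode(data).toString();
    }
}
```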


Re: [DISCUSS] KIP-874: TopicRoundRobinAssignor

2022-10-21 Thread Mathieu Amblard
Hello everybody,

Just to let you know that I have added a chapter about having multiple
containers (multiple pods for Kubernetes) running the same application:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-874%3A+TopicRoundRobinAssignor#KIP874:TopicRoundRobinAssignor-Howdoesitworkifwehavemultiplecontainersrunningthesameapplication

Regards,
Mathieu

On Tue, Oct 11, 2022 at 8:00 PM, Mathieu Amblard  wrote:

> Hi Hector,
>
>
>
> First, thank you for your questions !
>
>
>
> *If the goal is to do the partition assignments at a topic level, wouldn't
> having single-partition topics solve this problem?*
>
>
>
> We are in a microservices environment; therefore, we can have multiple
> containers running the same application.
>
> Using the TopicRoundRobinAssignor, the partitions are uniformly balanced
> across the containers, and we can get better performance.
>
> Suppose there are 2 instances of the same application, A0 and A1, 2
> consumers C0 and C1, and two topics t0 and t1, where t0 has 3 partitions
> and t1 has 2, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1.
>
> If we use the TopicRoundRobinAssignor, the assignment will be:
>
> A0 : [ C0: [t0p0, t0p2], C1: [t1p0] ]
>
> A1 : [ C0: [t0p1], C1: [t1p1] ]
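
A toy sketch of this assignment logic, with semantics reconstructed from the
example above rather than taken from the KIP's implementation: consumers are
dealt to topics round-robin so each consumer reads from exactly one topic,
and each topic's partitions are then balanced across the consumers dealt to
it. All names and the consumer ordering are assumptions:

```
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Toy model: reproduces the A0/A1 example above when called with consumers
// [A0.C0, A0.C1, A1.C0, A1.C1] and topics {t0=3, t1=2}. Assumes at least
// as many consumers as topics.
public class TopicRoundRobinSketch {

    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        List<String> topics = new ArrayList<>(new TreeSet<>(partitionsPerTopic.keySet()));
        // Deal consumers to topics round-robin: each consumer owns one topic.
        Map<String, List<String>> consumersPerTopic = new HashMap<>();
        for (int i = 0; i < consumers.size(); i++)
            consumersPerTopic
                .computeIfAbsent(topics.get(i % topics.size()), t -> new ArrayList<>())
                .add(consumers.get(i));
        // Balance each topic's partitions across the consumers that own it.
        Map<String, List<String>> assignment = new HashMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
        for (String topic : topics) {
            List<String> owners = consumersPerTopic.get(topic);
            for (int p = 0; p < partitionsPerTopic.get(topic); p++)
                assignment.get(owners.get(p % owners.size())).add(topic + "p" + p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Yields the assignment from the example: A0.C0=[t0p0, t0p2],
        // A1.C0=[t0p1], A0.C1=[t1p0], A1.C1=[t1p1].
        System.out.println(assign(List.of("A0.C0", "A0.C1", "A1.C0", "A1.C1"),
                                  Map.of("t0", 3, "t1", 2)));
    }
}
```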
>
>
>
> *How will the group leader know that T2 should not be re-assigned on the
> next rebalance? Can you elaborate a bit more on the mechanisms used to
> communicate this state to the other group members?*
>
>
>
> Currently, the group leader will not know that T2 should not be
> re-assigned on the next rebalance.
>
> For this first iteration, we simply keep in memory that T2 has a poison
> pill and therefore we ignore all incoming messages from T2. We basically
> consume them without acknowledging them.
>
> As you can imagine, when multiple instances of the same application are
> running, in case of error the partition will be rebalanced to another
> instance.
>
> Nevertheless, this is not really a problem (at least for our use cases):
> as soon as the poison pill is consumed there, the consumer of that other
> instance will be stopped as well, and so on. It will take a few tens of
> seconds before the last consumer of the poison pill is stopped, and with
> it the consumption of the entire topic.
>
> For a second iteration, we plan to find a solution that avoids this time
> lapse between the first instance's consumer being stopped and the last
> one. Currently, I do not have a solution, but we are thinking about
> different options; avoiding rebalancing partitions that contain a poison
> pill is one of them.
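
One way the "stop consuming the topic on this instance" step could look in
client code; pausing the topic's assigned partitions is a technique swapped
in here as a sketch, since the message above only says that consumption
stops without committing:

```
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Sketch: on a poison pill, stop consuming the offending topic on this
// instance by pausing all of its assigned partitions. Other topics keep
// flowing; nothing is committed for the poisoned topic.
public class PoisonPillGuard {

    static <K, V> void onPoisonPill(Consumer<K, V> consumer, ConsumerRecord<K, V> record) {
        Set<TopicPartition> toPause = consumer.assignment().stream()
            .filter(tp -> tp.topic().equals(record.topic()))
            .collect(Collectors.toSet());
        // pause() keeps the assignment but stops fetching until resume().
        consumer.pause(toPause);
    }
}
```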
>
>
>
> Cheers,
>
> Mathieu
>
> On Fri, Oct 7, 2022 at 4:54 PM, Hector Geraldino (BLOOMBERG/ 919 3RD A)
> <hgerald...@bloomberg.net> wrote:
>
>> Hi Mathieu. I took a look at your KIP and have a couple questions.
>>
>> If the goal is to do the partition assignments at a topic level, wouldn't
>> having single-partition topics solve this problem?
>>
>> You also mentioned that your goal is to minimize the potential of a
>> poison pill message breaking all members of a group (by keeping track of
>> which topics have 'failed'), but it is not clear how this can be achieved
>> with this assignor. If we imagine a scenario where:
>>
>> * A group has 3 members (A, B, C)
>> * Members are subscribed to 3 topics (T1, T2, T3)
>> * Each member is assigned one topic (A[T1], B[T2], C[T3])
>> * One member fails to consume from a topic/partition (B[T2]), and goes
>> into a failed state
>>
>> How will the group leader know that T2 should not be re-assigned on the
>> next rebalance? Can you elaborate a bit more on the mechanisms used to
>> communicate this state to the other group members?
>>
>> Thanks
>>
>> From: dev@kafka.apache.org At: 10/05/22 03:47:33 UTC-4:00 To:
>> dev@kafka.apache.org
>> Subject: [DISCUSS] KIP-874: TopicRoundRobinAssignor
>>
>> Hi Kafka Developers,
>>
>> My proposal is to add a new partition assignment strategy at the topic
>> level to:
>>  - achieve better data consistency per consumed topic in case of an exception
>>  - provide a more thread-safe solution for the consumer
>> in cases where there are multiple consumers and multiple topics.
>>
>> Here is the link to the KIP with all the explanations :
>> https://cwiki.apache.org/confluence/x/XozGDQ
>>
>> Thank you in advance for your feedback,
>> Mathieu
>>
>>
>>