RE: Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-08-24 Thread Victoria Xia
Hi Alieh,

Thanks for the KIP!

1. As mentioned in the discussion thread for KIP-960, I think it'd be good to 
add a new method to the VersionedKeyValueStore interface for supporting 
single-key, multi-timestamp lookups, as part of implementing these new 
interactive queries so that the IQ implementation can simply call the relevant 
underlying method.

2. Also, I'd like to discuss whether it'd be better to update the 
VersionedRecord class to allow null values, or to add a validTo timestamp field 
to VersionedRecord instead. Here's an example scenario to consider: suppose a 
user puts two values and two tombstones into a versioned store, all for the 
same key:

put(k, v1, time=0)
put(k, null, time=5)
put(k, null, time=10)
put(k, v2, time=20)

And then issues an interactive query for all records for this key, in order to
get back a `ValueIterator<VersionedRecord<V>>`. How many records do you expect
to be in the ValueIterator?

By proposing to allow null values in VersionedRecord, I think you are proposing 
either four records ((v1, t=0), (null, t=5), (null, t=10), (v2, t=20)) or three 
records ((v1, t=0), (null, t=5), (v2, t=20)) but there's a case to be made that 
actually only two records should be returned ((v1, t=0, validTo=5), (v2, t=20, 
validTo=null/infinity)). The last interpretation is most consistent with the 
fact that `get(key)` and `get(key, asOfTimestamp)` do not distinguish between 
tombstones and no records having been inserted. As in, get(key=k, 
asOfTimestamp=7) returns null instead of a VersionedRecord with null value and 
timestamp 5. Curious to hear what others think about this.
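To make the two-record interpretation concrete, here is a minimal sketch of one key's history. It assumes each returned record carries a validTo timestamp and that tombstones are never returned as records but only end the previous value's validity. `HistorySketch` and `VersionSketch` are hypothetical names, and a `TreeMap` stands in for the real store; this is not how the versioned store is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical result type: a non-null value plus its validity interval.
// validTo == null means the value is still valid (no later write for the key).
final class VersionSketch {
    final String value;
    final long validFrom;
    final Long validTo;

    VersionSketch(String value, long validFrom, Long validTo) {
        this.value = value;
        this.validFrom = validFrom;
        this.validTo = validTo;
    }

    @Override
    public String toString() {
        return "(" + value + ", t=" + validFrom + ", validTo=" + (validTo == null ? "inf" : validTo) + ")";
    }
}

// Hypothetical single-key history; a null value represents a tombstone.
final class HistorySketch {
    private final TreeMap<Long, String> writes = new TreeMap<>();

    void put(String value, long timestamp) {
        writes.put(timestamp, value);
    }

    // Point lookup: a tombstone and "nothing written yet" both return null.
    String get(long asOfTimestamp) {
        Map.Entry<Long, String> entry = writes.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    // All versions: tombstones are skipped; they only close the validity
    // interval of the value written before them.
    List<VersionSketch> allVersions() {
        List<VersionSketch> result = new ArrayList<>();
        for (Map.Entry<Long, String> entry : writes.entrySet()) {
            if (entry.getValue() == null) {
                continue;
            }
            Long nextWrite = writes.higherKey(entry.getKey());
            result.add(new VersionSketch(entry.getValue(), entry.getKey(), nextWrite));
        }
        return result;
    }
}
```

Replaying the four puts from the scenario, `allVersions()` yields `(v1, t=0, validTo=5)` and `(v2, t=20, validTo=inf)`, while `get(7)` returns null, so the multi-timestamp result treats tombstones the same way the point lookup does.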

Best,
Victoria

On 2023/08/17 02:13:24 "Matthias J. Sax" wrote:
> Thanks for splitting this part into a separate KIP!
> 
> For `withKey()` we should be explicit that `null` is not allowed.
> 
> (Looking into existing `KeyQuery` it seems the JavaDocs don't cover this 
> either -- would you like to do a tiny cleanup PR for this, or fix 
> on-the-side in one of your PRs?)
> 
> 
> 
> > The key query returns all the records that are valid in the time range 
> > starting from the timestamp {@code fromTimestamp}.
> 
> In the JavaDocs you use the phrase `are valid` -- I think we need to
> explain what "valid" means? It might even be worth adding some examples.
> It's annoying, but being precise is kinda important.
> 
> With regard to KIP-962, should we allow `null` for time bounds? The
> JavaDocs should also be explicit if `null` is allowed or not and what 
> the semantics are if allowed.
> 
> 
> 
> You are using `asOf()`; however, because we are doing time-range queries,
> to me using `until()` to describe the upper bound would sound better (I
> am not a native speaker though, so maybe I am off?)
> 
> 
> > The key query returns all the records that have timestamp <= {@code 
> > asOfTimestamp}.
> 
> This is only correct if no lower bound is set, right?
> 
> 
> In your reply to KIP-960 you mentioned:
> 
> > the meaningless combinations are prevented by throwing exceptions.
> 
> We should add corresponding JavaDocs like:
> 
> @throws IllegalArgumentException if {@code fromTimestamp} is equal to or
>  larger than {@code untilTimestamp}
> 
> Or something similar.
> 
> 
> With regard to KIP-960: if we need to introduce a `VersionedKeyQuery` 
> class for single-key-single-ts lookup, would we need to find a new name 
> for the query class of this KIP, given that the return type is different?
> 
> 
> -Matthias
> 
> 
> 
> On 8/16/23 10:57 AM, Alieh Saeedi wrote:
> > Hi all,
> > 
> > I split KIP-960 into three separate KIPs. Therefore, please continue
> > discussions about single-key, multi-timestamp interactive queries here.
> > You can see all the addressed reviews on the following page. Thanks in
> > advance.
> > 
> > KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for
> > versioned state stores
> > 
> > 
> > I look forward to your feedback!
> > 
> > Cheers,
> > Alieh
> > 
> 
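The JavaDoc points above could translate into a query builder along these lines. This is a hypothetical sketch (`MultiVersionQuerySketch` and its method names are not the KIP's API). It assumes `withKey` rejects `null`, that an unset time bound means "unbounded", that bounds are `Instant`s (the IQ-facing type discussed in the KIP-960 thread), and that a lower bound equal to or larger than the upper bound throws `IllegalArgumentException`, as Matthias suggests.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Hypothetical immutable query object; each setter returns a new instance.
final class MultiVersionQuerySketch<K> {
    private final K key;
    private final Instant fromTime;   // null = no lower bound
    private final Instant untilTime;  // null = no upper bound

    private MultiVersionQuerySketch(K key, Instant fromTime, Instant untilTime) {
        this.key = key;
        this.fromTime = fromTime;
        this.untilTime = untilTime;
    }

    /** @throws NullPointerException if {@code key} is null */
    static <T> MultiVersionQuerySketch<T> withKey(T key) {
        return new MultiVersionQuerySketch<>(Objects.requireNonNull(key, "key cannot be null"), null, null);
    }

    /** @throws IllegalArgumentException if {@code from} is equal to or larger than the upper bound */
    MultiVersionQuerySketch<K> fromTime(Instant from) {
        validate(from, untilTime);
        return new MultiVersionQuerySketch<>(key, from, untilTime);
    }

    /** @throws IllegalArgumentException if the lower bound is equal to or larger than {@code until} */
    MultiVersionQuerySketch<K> untilTime(Instant until) {
        validate(fromTime, until);
        return new MultiVersionQuerySketch<>(key, fromTime, until);
    }

    // Only checks when both bounds are set; an open bound is always valid.
    private static void validate(Instant from, Instant until) {
        if (from != null && until != null && !from.isBefore(until)) {
            throw new IllegalArgumentException("fromTime must be earlier than untilTime");
        }
    }

    K key() { return key; }
    Optional<Instant> lowerBound() { return Optional.ofNullable(fromTime); }
    Optional<Instant> upperBound() { return Optional.ofNullable(untilTime); }
}
```

Because each `fromTime`/`untilTime` call validates against the bound already set, an invalid combination fails at construction time regardless of call order.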

Re: [DISCUSS] KIP-960: Support interactive queries (IQv2) for versioned state stores

2023-08-24 Thread Victoria Xia
ee to make the queries more composable. I was considering
> >> raising this originally, but held off because `RangeQuery` is also not
> >> designed to be very composable. But for versioned stores, we have many more
> >> combinations, so making it composable does make sense to me.
> >>
> >> About iterator order: I would also propose to be pragmatic, and only add
> >> what is simple to implement for now. We can always extend it later. We
> >> just need to clearly document the order (or say: order is not defined --
> >> also a valid option). Of course, if we limit what we add now, we should
> >> keep in mind how to extend the API in the future without the need to
> >> deprecate a lot of stuff (ideally, we would not need to deprecate
> >> anything but only extend what we have).
> >>
> >> Btw: I am also happy to de-scope this KIP to only implement the two
> >> queries Victoria mentioned being easy to implement, and do follow up
> >> KIPs for range queries. There is no need to do everything with a single
> >> KIP.
> >>
> >> About the original v-store KIP and `long` vs `Instant` -- I don't think
> >> we forgot it. If the store is used inside a `Processor`, using `long` is
> >> preferred because performance is important and we are on the hot code
> >> path. For IQ on the other hand, it's not the hot code path, and
> >> semantics exposed to the user are more important. -- At least, this is
> >> how we did it in the past.
> >>
> >>
> >> One more thought.
> >>
> >> The new `VersionedKeyQuery` seems to have two different query types
> >> merged into a single class. Queries which return a single result, and
> >> queries that return multiple results. This does not seem ideal. For
> >> `withKeyLatestValue` and `withKeyWithTimestampBound` (should we rename
> >> this to `withKeyAsOfTimestamp`?) I would expect to get a single
> >> `VersionedRecord` back, not an iterator. Hence, we might need to
> >> split `VersionedKeyQuery` into two query types?
> >>
> >>
> >> -Matthias
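One way to act on the suggestion to split the class is to let each query type fix its own result type: a single-timestamp lookup returns at most one record, while a multi-timestamp lookup returns an iterator. The sketch below uses hypothetical names (`QuerySketch`, `AsOfQuerySketch`, `AllVersionsQuerySketch`) and a `TreeMap` of one key's versions instead of a real store. It also shows the IQ-facing `Instant` being converted to the `long` milliseconds the store works with, in line with the `long` vs `Instant` point above.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical query abstraction: the type parameter R fixes the result type.
interface QuerySketch<R> {
    R execute(TreeMap<Long, String> versions); // one key's versions: validFrom -> value
}

// Single-result query: the version valid as of a point in time.
final class AsOfQuerySketch implements QuerySketch<Optional<String>> {
    private final long asOfMs;

    AsOfQuerySketch(Instant asOf) {
        this.asOfMs = asOf.toEpochMilli(); // IQ takes Instant; the store works on long
    }

    @Override
    public Optional<String> execute(TreeMap<Long, String> versions) {
        Map.Entry<Long, String> entry = versions.floorEntry(asOfMs);
        return entry == null ? Optional.empty() : Optional.ofNullable(entry.getValue());
    }
}

// Multi-result query: every version, oldest first.
final class AllVersionsQuerySketch implements QuerySketch<Iterator<String>> {
    @Override
    public Iterator<String> execute(TreeMap<Long, String> versions) {
        List<String> values = new ArrayList<>(versions.values());
        return values.iterator();
    }
}
```

With separate types, callers of the point lookup never have to unwrap an iterator, and the multi-timestamp type is free to grow its own options (bounds, order) later.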

RE: Re: [DISCUSS] KIP-960: Support interactive queries (IQv2) for versioned state stores

2023-08-09 Thread Victoria Xia
Hey Alieh,

Thanks for the KIP! 

It looks like the KIP proposes three different types of interactive queries for 
versioned stores, though they are grouped together into two classes: 
VersionedKeyQuery adds support for single-key, single-timestamp lookups, and
also for single-key, multi-timestamp lookups, while VersionedRangeQuery 
additionally adds support for key-range queries.

The first type of query (single-key, single-timestamp lookups) is already
supported by versioned stores (per the VersionedKeyValueStore interface) today,
so exposing it via interactive queries requires low additional implementation
effort and is a quick win for users. The other two types of queries will
require more effort to add, and also come with more design decisions. I've 
sorted my thoughts accordingly.

Regarding single-key, multi-timestamp lookups:

1. If we add these, we should add a new method to the VersionedKeyValueStore 
interface to support this type of lookup. Otherwise, there is no easy/efficient 
way to compose methods from the existing interface in order to implement this 
type of lookup, and therefore the new interactive query type cannot be used on 
generic VersionedKeyValueStores.

2. I agree with Matthias's and Lucas's comments about being very explicit about 
what the timestamp range means. For consistency with single-key, 
single-timestamp lookups, I think the "upper timestamp bound" should really be 
an "as of timestamp bound" instead, so that it is inclusive. For the "lower 
timestamp bound"/start timestamp, we have a choice regarding whether to 
interpret it as the user saying "I want valid records for all timestamps in the 
range" in which case the query should return a record with timestamp earlier 
than the start timestamp, or to interpret it as the user saying "I want all 
records with timestamps in the range" in which case the query should not return 
any records with timestamp earlier than the start timestamp. My current 
preference is for the former, but it'd be good to hear other opinions.
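The two readings of the start timestamp differ only in whether the version already valid at the start is included. A minimal sketch over one key's versions (validFrom -> value), with an inclusive as-of upper bound as suggested above. `RangeInterpretationSketch` is a hypothetical name; the sketch ignores tombstones and assumes `fromTimestamp <= asOfTimestamp`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class RangeInterpretationSketch {
    // Reading 1: "valid records for all timestamps in [from, asOf]".
    // Also returns the version that was already valid at `from`, even though
    // its own timestamp is earlier than `from`.
    static List<String> validDuring(TreeMap<Long, String> versions, long from, long asOf) {
        Long start = versions.floorKey(from); // version in effect at `from`, if any
        long lower = (start == null) ? from : start;
        return new ArrayList<>(versions.subMap(lower, true, asOf, true).values());
    }

    // Reading 2: "all records with timestamps in [from, asOf]".
    static List<String> writtenDuring(TreeMap<Long, String> versions, long from, long asOf) {
        NavigableMap<Long, String> window = versions.subMap(from, true, asOf, true);
        return new ArrayList<>(window.values());
    }
}
```

For versions at 0, 20, and 30 and the range [10, 25], the first reading returns v1 and v2 (v1 was still valid at 10), while the second returns only v2.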

3. The existing VersionedRecord interface contains only a value and validFrom 
timestamp, and does not allow null values. This presents a problem for 
introducing single-key, multi-timestamp lookups because if there is a tombstone 
contained within the timestamp range of the query, then there is no way to 
represent this as part of a ValueIterator return type. You'll 
either have to allow null values or add a validTo timestamp to the returned 
records.

4. Also +1 to Matthias's question about standardizing the order in which 
records are returned. Will they always be returned in forwards-timestamp order? 
Reverse-timestamp order? Will users get a choice? It'd be good to make this 
explicit in the KIP.

Regarding key-range queries (either single-timestamp or multi-timestamp):

5. Same comment about adding new methods for this type of lookup to the 
VersionedKeyValueStore interface.

6. Again +1 to Matthias's question about the order in which records are 
returned, for multi-key, multi-timestamp queries. Organizing first by key and 
then by timestamp makes the most sense to me, based on the layout of the 
existing store implementation. (Trying to sort by timestamp would require 
reading potentially all keys into memory first, which is not feasible.)
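Key-then-timestamp ordering can be produced lazily, one key at a time, which is why it matches the store layout better than a global timestamp order. Below is a hypothetical sketch (`KeyThenTimeIteratorSketch`, over an in-memory `key -> (validFrom -> value)` map instead of the real store segments) that only opens the next key's versions once the current key is exhausted. For simplicity it uses the "records with timestamps in the range" reading of the time bounds.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Yields "key@timestamp=value" strings, ordered by key, then by timestamp.
final class KeyThenTimeIteratorSketch implements Iterator<String> {
    private final Iterator<Map.Entry<String, TreeMap<Long, String>>> keys;
    private final long fromTime;
    private final long asOfTime;
    private String currentKey;
    private Iterator<Map.Entry<Long, String>> versions = Collections.emptyIterator();

    KeyThenTimeIteratorSketch(TreeMap<String, TreeMap<Long, String>> store,
                              String fromKey, String toKey, long fromTime, long asOfTime) {
        this.keys = store.subMap(fromKey, true, toKey, true).entrySet().iterator();
        this.fromTime = fromTime;
        this.asOfTime = asOfTime;
    }

    @Override
    public boolean hasNext() {
        // Move to the next key only once the current key's versions are used up.
        while (!versions.hasNext() && keys.hasNext()) {
            Map.Entry<String, TreeMap<Long, String>> next = keys.next();
            currentKey = next.getKey();
            versions = next.getValue().subMap(fromTime, true, asOfTime, true).entrySet().iterator();
        }
        return versions.hasNext();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Map.Entry<Long, String> version = versions.next();
        return currentKey + "@" + version.getKey() + "=" + version.getValue();
    }
}
```

With key a at timestamps 1 and 5 and key b at 2, iterating keys a..b yields `a@1`, `a@5`, `b@2`. A timestamp-first order would interleave the keys as `a@1`, `b@2`, `a@5` and could only be produced after reading every key in the range.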

I think the complexity of introducing single-key, multi-timestamp lookups and 
especially multi-key, multi-timestamp lookups is significantly higher than for 
single-key, single-timestamp lookups, so it'd be good to think about/gauge what
the use cases for these types of queries are before committing to the 
implementation, and also to stage the implementation to get single-key, 
single-timestamp lookups as a quick win first without blocking on the others. 
(Guessing you were already planning to do that, though :))

Also a separate question: 

7. What's the motivation for introducing new VersionedKeyQuery and 
VersionedRangeQuery types rather than re-using the existing KeyQuery and 
RangeQuery types, to add optional asOfTimestamp bounds? I can see pros and cons 
of each, just curious to hear your thoughts.

If you do choose to keep VersionedKeyQuery and VersionedRangeQuery separate 
from KeyQuery and RangeQuery, then you can remove the KeyQuery and RangeQuery 
placeholders in the versioned store implementation as part of implementing your 
KIP: 
https://github.com/apache/kafka/blob/f23394336a7741bf4eb23fcde951af0a23a69bd0/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java#L142-L158

Best,
Victoria

On 2023/08/09 10:16:44 Bruno Cadonna wrote:
> Hi,
> 
> I will use the initials of the authors to distinguish the points.
> 
> LB 2.
> I like the idea of composable construction of queries. It would make the 
> API more readable. I think this is better than the VersionsQualifier I
> proposed in BC 3.
> 
> LB 4. and LB 5.
> Being explicit on the time bounds and key ranges is really important.
> 
> LB 6.
> I 

Re: [DISCUSS] KIP-923: Add A Grace Period to Stream Table Join

2023-06-06 Thread Victoria Xia
Handling the "changelog" topic for the buffer in the same way as
repartition topics makes sense. Thanks for clearing that up!

- Victoria

On Tue, Jun 6, 2023 at 10:22 AM Walker Carlson wrote:

> Good point, Victoria. I just removed the compacted topic mention from the
> KIP. I agree with Bruno about using a normal topic and deleting records
> that have been processed.
>
> On Tue, Jun 6, 2023 at 2:28 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > another idea that came to my mind. Instead of using a compacted topic,
> > the buffer could use a non-compacted topic and regularly delete records
> > before a given offset as Streams does for repartition topics.
> >
> > Best,
> > Bruno
> >
> > On 05.06.23 21:48, Bruno Cadonna wrote:
> > > Hi Victoria,
> > >
> > > that is a good point!
> > >
> > > I think the topic needs to be a compacted topic to be able to get rid
> > > of records that are evicted from the buffer. So the key might be
> > > something with the key, the timestamp, and a sequence number to
> > > distinguish between records with the same key and same timestamp.
> > >
> > > Just an idea! Maybe Walker comes up with something better.
> > >
> > > Best,
> > > Bruno
> > >
> > > On 05.06.23 20:38, Victoria Xia wrote:
> > >> Hi Walker,
> > >>
> > >> Thanks for the latest updates! The KIP looks great. Just one question
> > >> about the changelog topic for the join buffer: The KIP says "When a
> > >> failure occurs the buffer will try to recover from an OffsetCheckpoint
> > >> if possible. If not it will reload the buffer from a compacted
> > >> change-log topic." This is a new changelog topic that will be
> > >> introduced specifically for the join buffer, right? Why is the
> > >> changelog topic compacted? What are the keys? I am confused because
> > >> the buffer contains records from the stream-side of the join, for
> > >> which multiple records with the same key should be treated as
> > >> separate updates that must all be tracked in the buffer, rather than
> > >> updates which replace each other.
> > >>
> > >> Thanks,
> > >> Victoria
> > >>
> > >> On Mon, Jun 5, 2023 at 1:47 AM Bruno Cadonna wrote:
> > >>
> > >>> Hi Walker,
> > >>>
> > >>> Thanks once more for the updates to the KIP!
> > >>>
> > >>> Do you also plan to expose metrics for the buffer?
> > >>>
> > >>> Best,
> > >>> Bruno
> > >>>
> > >>> On 02.06.23 17:16, Walker Carlson wrote:
> > >>>> Hello Bruno,
> > >>>>
> > >>>> I think this covers your questions. Let me know what you think
> > >>>>
> > >>>> 2.
> > >>>> We can use a changelog topic. I think we can treat it like any other
> > >>>> store and recover in the usual manner. Also, the implementation is on
> > >>>> disk.
> > >>>>
> > >>>> 3.
> > >>>> The description is in the Public Interfaces section. I will copy it
> > >>>> into the proposed changes as well.
> > >>>>
> > >>>> This is a bit of an implementation detail that I didn't want to add
> > >>>> into the KIP, but the record will be added to the buffer to keep the
> > >>>> stream time consistent; it will just be ejected immediately. Of
> > >>>> course, if this causes performance issues, we will skip this step and
> > >>>> track stream time separately. I will update the KIP to say that stream
> > >>>> time advances when a stream record enters the node.
> > >>>>
> > >>>> Also, yes, updated.
> > >>>>
> > >>>> 5.
> > >>>> No, there is no difference right now; everything gets processed as it
> > >>>> comes in and tries to find a record for its timestamp.
> > >>>>
> > >>>> Walker
> > >>>>
> > >>>> On Fri, Jun 2, 2023 at 6:41 AM Bruno Cadonna wrote:
> > >>>>
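Bruno's idea of keying buffered records by key, timestamp, and a sequence number can be sketched as follows. The names are hypothetical and an in-memory `TreeMap` stands in for the buffer's store. The sequence number keeps two stream-side records with the same key and timestamp from overwriting each other (Victoria's concern), and ordering by timestamp first keeps the buffer in timestamp order.

```java
import java.util.Comparator;
import java.util.TreeMap;

// Hypothetical composite buffer key: record key, timestamp, and an insertion
// sequence number, so equal (key, timestamp) pairs stay distinct entries.
final class BufferKeySketch {
    final String key;
    final long timestamp;
    final long sequence;

    BufferKeySketch(String key, long timestamp, long sequence) {
        this.key = key;
        this.timestamp = timestamp;
        this.sequence = sequence;
    }

    // Timestamp order first; the (unique) sequence number breaks ties.
    static final Comparator<BufferKeySketch> ORDER =
        Comparator.comparingLong((BufferKeySketch k) -> k.timestamp)
                  .thenComparingLong(k -> k.sequence);
}

final class JoinBufferSketch {
    private final TreeMap<BufferKeySketch, String> buffer = new TreeMap<>(BufferKeySketch.ORDER);
    private long nextSequence = 0L;

    // Every stream-side record gets its own entry, even for a repeated key and timestamp.
    BufferKeySketch add(String key, long timestamp, String value) {
        BufferKeySketch bufferKey = new BufferKeySketch(key, timestamp, nextSequence++);
        buffer.put(bufferKey, value);
        return bufferKey;
    }

    // Evicts one entry; a compacted changelog would record this as a tombstone for bufferKey.
    String remove(BufferKeySketch bufferKey) {
        return buffer.remove(bufferKey);
    }

    String oldestValue() {
        return buffer.isEmpty() ? null : buffer.firstEntry().getValue();
    }

    int size() {
        return buffer.size();
    }
}
```

In a compacted changelog, evicting an entry would then mean writing a tombstone for its composite key. The sketch does not address how the sequence counter would be restored after a failure.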

Re: [VOTE] KIP-923: Add A Grace Period to Stream Table Join

2023-06-05 Thread Victoria Xia
Hi Walker,

Thanks for the KIP! Left a clarification question on the discussion thread
just now but it's about an implementation detail, so I don't think it
changes anything in this vote thread.

+1 (non-binding)

Cheers,
Victoria

On Mon, Jun 5, 2023 at 10:23 AM Bill Bejeck  wrote:

> Hi Walker,
>
> Thanks for the KIP.
>
> I've caught up on the discussion thread and I'm satisfied with all
> responses.
>
> +1 (binding)
>
> -Bill
>
> On Mon, Jun 5, 2023 at 10:20 AM Bruno Cadonna  wrote:
>
> > Hi Walker,
> >
> > Thank you for the KIP!
> >
> > +1 (binding)
> >
> > Best,
> > Bruno
> >
> > On 24.05.23 23:00, Walker Carlson wrote:
> > > Hello everybody,
> > >
> > > I'm opening the vote on KIP-923 here.
> > >
> > > If we have more to discuss, please continue the discussion on the
> > > existing thread
> > > https://www.mail-archive.com/dev@kafka.apache.org/msg130657.html
> > >
> > > best,
> > > Walker
> > >
> >
>


Re: [DISCUSS] KIP-923: Add A Grace Period to Stream Table Join

2023-06-05 Thread Victoria Xia
nse to allow a grace period if the table is
> >>>>>> non-versioned? You also say: "If table is not materialized it will
> >>>>>> materialize it as versioned." -- What history retention time would we
> >>>>>> pick for this case (also asked by Victoria)? Or should we rather not
> >>>>>> support this and force the user to materialize the table explicitly,
> >>>>>> and thus explicitly pick a history retention time? It's a tradeoff
> >>>>>> between usability and making users aware that there will be a
> >>>>>> significant impact on disk usage. There are also compatibility
> >>>>>> concerns: If the table is not explicitly materialized in the old
> >>>>>> program, we already need to materialize it in the old program as well
> >>>>>> (of course, using a non-versioned store so far). Thus, if somebody
> >>>>>> adds a grace period, we cannot just switch the store type, as it would
> >>>>>> be a breaking change, potentially requiring an application reset, or
> >>>>>> following the upgrade path for versioned state stores, and also
> >>>>>> changing the program to explicitly materialize using a versioned
> >>>>>> store. Also note that we might not materialize the actual join table,
> >>>>>> but only an upstream table, and use `ValueGetter` to access the
> >>>>>> upstream data.
> >>>>>>
> >>>>>> To this end, as you already mentioned, history retention of the table
> >>>>>> should be at least the grace period. You proposed to include this in a
> >>>>>> follow-up KIP, but I am wondering if it's a fundamental requirement
> >>>>>> and thus we should put a check in place right away and reject an
> >>>>>> invalid configuration? (It's always easier to lift a restriction than
> >>>>>> to introduce one later.) This would also imply that a non-versioned
> >>>>>> table cannot be supported, because it does not have a history
> >>>>>> retention that is larger than the grace period, and maybe also answer
> >>>>>> the question about materialization: as we already always materialize
> >>>>>> something on the table side as a non-versioned store right now, it
> >>>>>> seems difficult to migrate the store to a versioned store. I.e., it
> >>>>>> might be ok to push the burden onto the user and say: if you start
> >>>>>> using a grace period, you also need to manually switch from
> >>>>>> non-versioned to versioned KTables. Doing stuff automatically under
> >>>>>> the hood is very complex for this case, but if we push the burden onto
> >>>>>> the user, it might be ok to not complicate this KIP significantly.
> >>>>>>
> >>>>>> To summarize the last two paragraphs, I would propose the following:
> >>>>>>  - don't support non-versioned KTables
> >>>>>>  - if a grace period is added, users need to explicitly materialize
> >>>>>>    the table as versioned (either directly, or upstream. Upstream only
> >>>>>>    works if downstream tables "inherit" versioned semantics -- cf.
> >>>>>>    KIP-914)
> >>>>>>  - the table's history retention time must be larger than the grace
> >>>>>>    period (should be easy to check at runtime, when we build the
> >>>>>>    topology)
> >>>>>>  - because switching from non-versioned to versioned stores is not
> >>>>>>    backward compatible (cf. KIP-914), users need to take care of this
> >>>>>>    themselves, and this also implies that adding a grace period is
> >>>>>>    not a backward compatible change (even if only via indirect means)
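The check from the third bullet above could be done once when the topology is built. Below is a hypothetical helper (`GracePeriodCheckSketch.validate`, not an existing Kafka Streams method) in which a null history retention models a non-versioned table. The thread says both "at least" and "larger than" the grace period; this sketch assumes "at least".

```java
import java.time.Duration;

// Hypothetical topology-build-time check; not an existing Kafka Streams API.
final class GracePeriodCheckSketch {
    // historyRetention == null models a non-versioned table.
    static void validate(Duration gracePeriod, Duration historyRetention) {
        if (gracePeriod == null || gracePeriod.isZero()) {
            return; // no grace period requested, nothing to check
        }
        if (gracePeriod.isNegative()) {
            throw new IllegalArgumentException("grace period must not be negative: " + gracePeriod);
        }
        if (historyRetention == null) {
            throw new IllegalArgumentException("a join grace period requires a versioned table");
        }
        if (historyRetention.compareTo(gracePeriod) < 0) {
            throw new IllegalArgumentException("history retention " + historyRetention
                + " must be at least the grace period " + gracePeriod);
        }
    }
}
```

Rejecting the configuration up front matches the point that it is easier to lift a restriction later than to introduce one.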
> >>>>>>
> >>>>>> About dropping late records: I'm wondering if we should never drop a
> >>>>>> stream-side record for a left-join, even if it's late? In general, one
> >>>>>> thing I observed over the years is that it's easier to keep stuff and
> >>>>>> let users filter explicitly downstream (or make it configurable),
> >>>>>> instead of dropping proactively, because user

Re: [DISCUSS] KIP-923: Add A Grace Period to Stream Table Join

2023-05-24 Thread Victoria Xia
right away when read (instead of blocking them behind in-order records
> > that are not ready yet). -- It might even be possible to let users
> > pick an emit strategy, e.g., "EmitStrategy.preserveOffsets" (name just a
> > placeholder).
> >
> > The KIP should explain this in more detail and also discuss different
> > options and mention them in "Rejected alternatives" in case we don't
> > want to include them.
> >
> >
> > 50) What happens when users change the grace period? Especially, when
> > they turn it on/off (but also increasing/decreasing is an interesting
> > point)? I think we should try to support this if possible; the
> > "Compatibility" section needs to cover switching on/off in more detail.
> >
> >
> > -Matthias
>


Re: [DISCUSS] KIP-923: Add A Grace Period to Stream Table Join

2023-05-02 Thread Victoria Xia
Cool KIP, Walker! Thanks for sharing this proposal.

A few clarifications:

1. Is the order in which records exit the buffer necessarily the same as the
order in which records enter the buffer? Based on the description in
the KIP, it sounds like the answer is no, i.e., records will exit the
buffer in increasing timestamp order, which means that they may be reordered
(even for the same key) compared to the input order.

2. What happens if the join grace period is nonzero, and a stream-side
record arrives with a timestamp that is older than the current stream time
minus the grace period? Will this record trigger a join result, or will it
be dropped? Based on the description for what happens when the join grace
period is set to zero, it sounds like the late record will be dropped, even
if the join grace period is nonzero. Is that true?

3. What could cause stream time to advance, for purposes of removing
records from the join buffer? For example, will new records arriving on the
table side of the join cause stream time to advance? From the KIP it sounds
like only stream-side records will advance stream time -- does that mean
that the join processor itself will have to track this stream time?

Also +1 to Lucas's question about what options will be available for
configuring the join buffer. Will users have the option to choose whether
they want the buffer to be in-memory vs persistent?
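To pin down what questions 1 to 3 are asking about, here is one possible reading of the stream-side buffer, sketched under assumptions the KIP would need to confirm: only stream-side records advance stream time, and the join processor tracks it itself (there is no table-side method here at all); a record older than stream time minus the grace period is dropped; and buffered records are released in timestamp order once they fall at or before stream time minus the grace period. `StreamSideBufferSketch` is a hypothetical name, and the join lookup itself is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class StreamSideBufferSketch {
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // advanced by stream-side records only
    // timestamp -> values in arrival order (ties keep their input order)
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    private final List<String> dropped = new ArrayList<>();

    StreamSideBufferSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    /** Returns the records released, in timestamp order, after this record arrives. */
    List<String> onStreamRecord(String value, long timestamp) {
        List<String> released = new ArrayList<>();
        if (streamTime != Long.MIN_VALUE && timestamp < streamTime - graceMs) {
            dropped.add(value); // late: older than stream time minus grace
            return released;
        }
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        streamTime = Math.max(streamTime, timestamp);
        // Anything at or before streamTime - grace can no longer be overtaken.
        while (!buffer.isEmpty() && buffer.firstKey() <= streamTime - graceMs) {
            released.addAll(buffer.pollFirstEntry().getValue());
        }
        return released;
    }

    List<String> dropped() {
        return dropped;
    }
}
```

With a grace period of 10, inputs a@100, b@95, c@112 release b then a when c arrives, so the output order differs from the input order, and a subsequent d@101 is dropped as late. With a grace period of 0, each record that is not late is released as soon as it is added.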

- Victoria

On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy wrote:

> Hi Walker,
>
> thanks for the KIP! We definitely need this. I have two questions:
>
>  - Have you considered allowing the customization of the underlying
> buffer implementation? As I can see, `StreamJoined` lets you customize
> the underlying store via a `WindowStoreSupplier`. Would it make sense
> for `Joined` to have this as well? I can imagine one may want to limit
> the number of records in the buffer, for example. If we hit the
> maximum, the only option would be to drop semantic guarantees, but
> users may still want to do this.
>  - With "second option on the table side" you are referring to
> versioned tables, right? Will the buffer on the stream side behave any
> differently depending on whether the table side is versioned or not?
>
> Finally, I think a simple example in the motivation section could help
> non-experts understand the KIP.
>
> Best,
> Lucas
>
> On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson wrote:
> >
> > Hello everybody,
> >
> > I have a stream proposal to improve the stream table join by adding a
> > grace period and buffer to the stream side of the join to allow processing
> > in timestamp order matching the recent improvements of the versioned tables.
> >
> > Please take a look here <https://cwiki.apache.org/confluence/x/lAs0Dw>
> > and share your thoughts.
> >
> > best,
> > Walker
>


[jira] [Created] (KAFKA-14949) Add Streams upgrade tests from AK 3.4

2023-04-27 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14949:


 Summary: Add Streams upgrade tests from AK 3.4
 Key: KAFKA-14949
 URL: https://issues.apache.org/jira/browse/KAFKA-14949
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Victoria Xia


Streams upgrade tests currently only test upgrading from 3.3 and earlier 
versions 
([link|https://github.com/apache/kafka/blob/056657d84d84e116ffc9460872945b4d2b479ff3/tests/kafkatest/tests/streams/streams_application_upgrade_test.py#L30]).
 We should add 3.4 as an "upgrade_from" version into these tests, in light of 
the upcoming 3.5 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-13 Thread Victoria Xia
Hey everyone,

Two more minor updates to the KIP came out of wrapping up the
implementation and discussions with Guozhang and Matthias offline:

   - Versioned stores will be disabled for global tables and also the
   suppress operator, in order to limit the scope of these changes and to
   prevent unexpected semantic consequences.
   - The return type of `VersionedKeyValueStore#put(...)` has been updated
   from `void` to `long`, where the long represents the validTo timestamp of
   the newly put record, and may also be used to determine whether the call to
   put() was accepted or not (due to the grace period having expired). This
   is necessary in order to allow processors to determine whether records are
   out-of-order or not.
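The new `put` contract can be illustrated with a toy store. It is not the real `VersionedKeyValueStore`: `VersionedPutSketch` is hypothetical, the sentinel values `VALID_TO_UNDEFINED` and `NOT_PUT` are placeholders for however the store encodes "the new record is the latest version" and "the record was rejected", and the history retention is assumed to double as the grace period.

```java
import java.util.TreeMap;

// Hypothetical toy store for a single key; not the Kafka Streams implementation.
final class VersionedPutSketch {
    static final long VALID_TO_UNDEFINED = -1L;   // hypothetical: record is the latest version
    static final long NOT_PUT = Long.MIN_VALUE;   // hypothetical: rejected, grace period expired

    private final long historyRetentionMs;        // assumed to act as the grace period
    private final TreeMap<Long, String> versions = new TreeMap<>(); // validFrom -> value
    private long observedStreamTime = Long.MIN_VALUE;

    VersionedPutSketch(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    /** Returns the validTo of the newly put record, or one of the sentinels. */
    long put(String value, long timestamp) {
        if (observedStreamTime != Long.MIN_VALUE && timestamp < observedStreamTime - historyRetentionMs) {
            return NOT_PUT;
        }
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        versions.put(timestamp, value);
        Long newerVersion = versions.higherKey(timestamp);
        return newerVersion == null ? VALID_TO_UNDEFINED : newerVersion;
    }

    // A defined validTo means a newer version already exists, i.e., the put was out-of-order.
    static boolean isOutOfOrder(long putResult) {
        return putResult != NOT_PUT && putResult != VALID_TO_UNDEFINED;
    }
}
```

A processor can then tell from the return value alone whether a record was out-of-order, without a separate `get(key)` call.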

The KIP has been updated with more details about each of the two changes.
Barring objections, the current state of the KIP is what will be released
with 3.5 as the release deadline is fast approaching.

Thanks,
Victoria

On Tue, Apr 11, 2023 at 2:56 PM Victoria Xia wrote:

> Thanks for your comments and suggestions, Matthias, Lucas, and Guozhang!
>
> I was just in the process of responding when I saw Guozhang's message. I
> came up with a different approach to simplify my proposal with regards to
> the table aggregate processor, as part of mulling over comments from
> Matthias and Lucas: when aggregating a versioned table, instead of dropping
> out-of-order records at the aggregate processor (after the repartition), we
> can equivalently drop them before the repartition at the repartition map
> processor. With this new approach, no changes to the repartition topic
> format are necessary as part of this KIP.
>
> As an example, consider a table aggregation which counts the number of
> appearances of each value of the table. The repartition prior to the
> aggregation will map values (pre-repartition) to keys (post-repartition) as
> part of the groupBy(), in order for subsequent aggregation over the
> (original) values to occur.
>
> source record --> repartition record(s)
> (key=k, value=v1, ts=2) --> (key=v1, newValue=v1, oldValue=null, newTs=2, oldTs=-1)
> (key=k, value=v2, ts=1) --> (key=v2, newValue=v2, oldValue=null, newTs=1, oldTs=2),
>                             (key=v1, newValue=null, oldValue=v1, newTs=1, oldTs=2)
>
> Under the old proposal, the aggregate processor would see the last two
> repartition records and drop them as out-of-order (because newTs is older
> than oldTs). Under the new proposal, the repartition map processor would
> not send any repartition records upon seeing the second source record (with
> value=v2) because its timestamp is older than the latest timestamp seen for
> the key (k).
>
> This new approach is simpler than what's currently written in the KIP
> because no repartition topic format change is required, and also saves on
> unnecessary repartition topic records.
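A minimal sketch of the new approach, assuming a "count appearances of each value" aggregation and non-null table values: a hypothetical groupBy map step that remembers the latest timestamp per source key and emits nothing for an out-of-order record. The record shape (`RepartitionRecord`) and class names are illustrative, not Kafka Streams internals; emitting one combined record when the grouping key is unchanged follows the KIP-904 behavior as described in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record shape for a groupBy() repartition topic; field names are illustrative.
class RepartitionRecord {
    final String key;
    final String newValue;
    final String oldValue;

    RepartitionRecord(String key, String newValue, String oldValue) {
        this.key = key;
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// Sketch of the repartition map step for "count the appearances of each table value",
// dropping out-of-order source records (by key) before anything is repartitioned.
class VersionedGroupByMapper {
    private final Map<String, Long> latestTimestamp = new HashMap<>();
    private final Map<String, String> latestValue = new HashMap<>();

    List<RepartitionRecord> process(String key, String value, long timestamp) {
        List<RepartitionRecord> out = new ArrayList<>();
        Long latestTs = latestTimestamp.get(key);
        if (latestTs != null && timestamp < latestTs) {
            return out; // out-of-order for this key: send no repartition records
        }
        String oldValue = latestValue.get(key);
        latestTimestamp.put(key, timestamp);
        latestValue.put(key, value);
        if (oldValue == null) {
            out.add(new RepartitionRecord(value, value, null));        // adder only
        } else if (oldValue.equals(value)) {
            out.add(new RepartitionRecord(value, value, oldValue));    // same grouping key: one record
        } else {
            out.add(new RepartitionRecord(value, value, null));        // adder for the new grouping key
            out.add(new RepartitionRecord(oldValue, null, oldValue));  // subtractor for the old grouping key
        }
        return out;
    }
}
```

Replaying the example above, `(k, v1, ts=2)` yields one record and `(k, v2, ts=1)` yields none, so the aggregate processor never sees the out-of-order update.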
>
> In comparing this suggestion to Guozhang's suggestion:
>
>  - A main difference between the two proposals is that under this
>    proposal, the "old value" when an out-of-order record is forwarded
>    downstream from a versioned table would be the current latest (by
>    timestamp) value, rather than the same as the new value (as in Guozhang's
>    proposal).
>  - In both proposals, the various processors (table aggregate, joins,
>    suppress) actually do not need to call get(key) on the materialization to
>    determine whether the record is out-of-order or not. If the "old value" is
>    the latest record, then a timestamp comparison would suffice (assuming that
>    the old record timestamp is added into the `Change` object). If the "old
>    value" is the same as the "new value", then an equality check on the two
>    values is sufficient.
>  - In both proposals, the repartition topic format does not need to be
>    updated.
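The two detection checks can be sketched side by side. This assumes a hypothetical `TimestampedChange` that carries the old record's timestamp (the real internal `Change` class may not), with `-1` for "no old record" mirroring `oldTs=-1` in the repartition example.

```java
// Hypothetical view of a table update as seen downstream, extended with the old
// record's timestamp as the first proposal assumes; not Kafka's internal Change class.
class TimestampedChange<V> {
    final V newValue;
    final long newTimestamp;
    final V oldValue;
    final long oldTimestamp; // -1 when there is no old record

    TimestampedChange(V newValue, long newTimestamp, V oldValue, long oldTimestamp) {
        this.newValue = newValue;
        this.newTimestamp = newTimestamp;
        this.oldValue = oldValue;
        this.oldTimestamp = oldTimestamp;
    }
}

class OutOfOrderDetection {
    // Proposal 1: "old value" is the current latest-by-timestamp record,
    // so comparing timestamps is enough.
    static <V> boolean byTimestamp(TimestampedChange<V> change) {
        return change.newTimestamp < change.oldTimestamp;
    }

    // Proposal 2: an out-of-order record is forwarded with old value == new value,
    // so comparing values is enough.
    static <V> boolean byEquality(TimestampedChange<V> change) {
        return change.newValue != null && change.newValue.equals(change.oldValue);
    }
}
```

One caveat of the equality check: it cannot distinguish an out-of-order record from an in-order update that repeats the current value. For an aggregation whose subtractor undoes its adder, skipping such an update is harmless, since subtracting and re-adding the same value cancels out.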
>
> I think regardless of which of the two implementations we go with, the net
> effect will be hidden from users, in which case it may be better to discuss
> which to pick as part of implementation rather than on this KIP itself. (I
> can tell I will need more time to process the tradeoffs :-) ) Regardless, I
> will update the KIP to reflect that no repartition topic format change is
> required, which is indeed a great simplification.
>
> > for the repartition topic format change, do we want to re-use flag=2, or
> should we introduce flag=3, and determine when compiling the DSL into the
> Topology if we want/need to include the timestamp, and if not, use format
> version=2 to avoid unnecessary overhead?
>
> I believe the new proposal above is even more efficient in terms of
> avoiding unnecessary overhead. LMK what you think.
>
> > for detecting out-of-order records, I need both new and old timestamp, I
> suppose 

Re: Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-11 Thread Victoria Xia
wing operations
> today:
>
> a. Read topic as a table source
> b. KStream#toTable
> c. Aggregated from a stream
> d. Aggregated from a table
> e. From a table-table join.
> f. From a stateless table operator like a `table.filter`.
>
> Per this KIP, cases d) and e) should not emit out-of-order records to
> the downstream any more, so we only need to consider the others: today
> when we send out the old/new pairs downstream, the old value is just
> the value read right before being overwritten from the materialized
> store. If the store is versioned, however, then this old value has
> already been sent before as part of the new/old pair, so it's actually
> correct to just indicate the old/new pair as just the out-of-order
> record itself? More specifically: say given a table source operator,
> with the topic's incoming records for the same key:
>
> (A, t10), (B, t20), (C, t15)
>
> If the store is not versioned, we would emit, in the form of old/new:
>
> A10/null, B20/A10, C15/B20
>
> While if there's no such out-of-ordering, the ideal emit ordering should
> be:
>
> A10/null, C15/A10, B20/C15
>
> So I'm thinking, if the store is versioned, we should try to emit in a
> way that is as coherent with the ideal ordering as possible, for the
> downstream operators to handle:
>
> A10/null, B20/A10, C15/null, null/C15 (or to be succinct, just
> A10/null, B20/A10, C15/C15)
>
> This is because, the A10 is already sent as part of the B20/A10 before
> C comes, in order for downstream operators to negate its effect; so
> when C comes, we only need to let the downstream know that "there was
> a C coming at t15 between A at 10 and B at 20, which is already
> obsoleted because of the later B20 that I sent you before".
>
> This gives the underlying operator the correct information, which can
> handle it accordingly:
>
> * For aggregate operators, it can simply ignore the C15/C15 from upstream.
> * For stateless operators, it just applies the filter still on C15/C15
> and forwards downwards.
> * For join operators, as this KIP indicated, it would apply the join
> if necessary and not emit the older join results.
>
> If we can do that, then maybe we do not even need to change the
> repartition topic format again?
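The proposed emit sequence can be reproduced with a small simulation for a single key. This is a hypothetical helper, not Kafka Streams code; it renders pairs in the "new/old" notation used above and assumes the versioned case keeps the latest-by-timestamp record (B20) as the current value after C15 arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "new/old" pairs a table source forwards for ONE key, using the
// notation from the message above (e.g. "B20/A10" = new B@20, old A@10).
// Hypothetical helper; not Kafka Streams code.
class TableSourceEmitSketch {
    static List<String> emit(String[] values, long[] timestamps, boolean versioned) {
        List<String> out = new ArrayList<>();
        String latest = null;                 // current record, rendered as e.g. "A10"
        long latestTimestamp = Long.MIN_VALUE;
        for (int i = 0; i < values.length; i++) {
            String current = values[i] + timestamps[i];
            if (versioned && latest != null && timestamps[i] < latestTimestamp) {
                // Out-of-order: already obsolete, so signal it as new == old and keep the latest.
                out.add(current + "/" + current);
                continue;
            }
            out.add(current + "/" + (latest == null ? "null" : latest));
            latest = current;
            latestTimestamp = timestamps[i];
        }
        return out;
    }
}
```

For input `(A, t10), (B, t20), (C, t15)`, the unversioned mode yields `A10/null, B20/A10, C15/B20` and the versioned mode yields `A10/null, B20/A10, C15/C15`, matching the two sequences above.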
>
>
> Guozhang
>
> On Tue, Apr 11, 2023 at 11:17 AM Matthias J. Sax  wrote:
> >
> > If we send old and new value as two messages, this should work I guess?
> > Victoria could confirm. -- But not if we send old/new as a single message
> > in case the new-key does not change?
> >
> > -Matthias
> >
> > On 4/11/23 5:25 AM, Lucas Brutschy wrote:
> > > Hi,
> > >
> > > No concerns at all, just a clarifying question from my side: for
> > > detecting out-of-order records, I need both new and old timestamp, I
> > > suppose I get it for the new record via timestamp extractor, can I not
> > > get it the same way from the old record that is passed down to the
> > > aggregation after KIP-904?
> > >
> > > Thanks,
> > > Lucas
> > >
> > > On Tue, Apr 11, 2023 at 5:35 AM Matthias J. Sax wrote:
> > >>
> > >> Thanks.
> > >>
> > >> One question: for the repartition topic format change, do we want to
> > >> re-use flag=2, or should we introduce flag=3, and determine when
> > >> compiling the DSL into the Topology if we want/need to include the
> > >> timestamp, and if not, use format version=2 to avoid unnecessary
> > >> overhead?
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 4/10/23 5:47 PM, Victoria Xia wrote:
> > >>> Hi everyone,
> > >>>
> > >>> While wrapping up the implementation for KIP-914, I have discovered that
> > >>> two more DSL processors require semantic updates in the presence of
> > >>> versioned tables:
> > >>>
> > >>>  - The table filter processor has an optimization to drop nulls if the
> > >>>  previous filtered value is also null. When the upstream table is versioned,
> > >>>  this optimization should be disabled in order to preserve proper version
> > >>>  history in the presence of out-of-order data.
> > >>>  - When performing an aggregation over a versioned table, only the latest
> > >>>  value by timestamp (per key) should be included in the final aggregate
> > >>>  value. This is not happening today in the presence of out-of-order data,
> > >>>  due to the way that Table

Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-10 Thread Victoria Xia
Hi everyone,

While wrapping up the implementation for KIP-914, I have discovered that
two more DSL processors require semantic updates in the presence of
versioned tables:

   - The table filter processor has an optimization to drop nulls if the
   previous filtered value is also null. When the upstream table is versioned,
   this optimization should be disabled in order to preserve proper version
   history in the presence of out-of-order data.
   - When performing an aggregation over a versioned table, only the latest
   value by timestamp (per key) should be included in the final aggregate
   value. This is not happening today in the presence of out-of-order data,
   due to the way that TableSourceNodes call `get(key)` in order to determine
   the "old value" which is to be removed from the aggregate as part of
   applying an update. To fix this, aggregations should ignore out-of-order
   records when aggregating versioned tables.
      - In order to implement this change, table aggregate processors need
      a way to determine whether a record is out-of-order or not. This cannot be
      done by querying the source table value getter as that store belongs to a
      different subtopology (because a repartition occurs before aggregation). As
      such, an additional timestamp must be included in the repartition topic.
      The 3.5 release already includes an update to the repartition topic format
      (with upgrade implications properly handled) via KIP-904
      <https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed>,
      so making an additional change to the repartition topic format to add a
      timestamp comes at no additional cost to users.
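The filter change can be illustrated with a hypothetical sketch of a table filter (not the actual Kafka Streams processor): when both the filtered new value and the filtered old value are null, the unversioned path suppresses the update as an optimization, while the versioned path still forwards the tombstone so downstream version history stays intact.

```java
import java.util.function.Predicate;

// Hypothetical sketch of a KTable filter processor; not the actual Kafka Streams class.
class TableFilterSketch {
    private final Predicate<String> predicate;
    private final boolean upstreamVersioned;

    TableFilterSketch(Predicate<String> predicate, boolean upstreamVersioned) {
        this.predicate = predicate;
        this.upstreamVersioned = upstreamVersioned;
    }

    /** Returns {newValue, oldValue} to forward, or null if the update is suppressed. */
    String[] process(String newValue, String oldValue) {
        String filteredNew = newValue != null && predicate.test(newValue) ? newValue : null;
        String filteredOld = oldValue != null && predicate.test(oldValue) ? oldValue : null;
        // Optimization: a null following a null is dropped for unversioned tables.
        // With a versioned upstream table, the tombstone is forwarded instead,
        // to preserve version history in the presence of out-of-order data.
        if (filteredNew == null && filteredOld == null && !upstreamVersioned) {
            return null;
        }
        return new String[] {filteredNew, filteredOld};
    }
}
```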


I have updated the KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores>
itself with more detail about each of these changes. Please let me know if
there are any concerns. In the absence of dissent, I'd like to include
these changes along with the rest of KIP-914 in the 3.5 release.

Apologies for not noticing these additional semantics implications earlier,
Victoria

-- Forwarded message ---------
From: Victoria Xia 
Date: Wed, Mar 22, 2023 at 10:08 AM
Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
To: 


Thanks for voting, everyone! We have three binding yes votes with no
objections during four full days of voting. I will close the vote and mark
the KIP as accepted, right in time for the 3.5 release.

Thanks,
Victoria

On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna  wrote:

> +1 (binding)
>
> Thanks Victoria!
>
> Best,
> Bruno
>
> On 20.03.23 17:13, Matthias J. Sax wrote:
> > +1 (binding)
> >
> > On 3/20/23 9:05 AM, Guozhang Wang wrote:
> >> +1, thank you Victoria!
> >>
> >> On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
> >>  wrote:
> >>>
> >>> Hi all,
> >>>
> >>> I'd like to start a vote on KIP-914 for updating the Kafka Streams join
> >>> processors to use proper timestamp-based semantics in applications with
> >>> versioned stores:
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >>>
> >>> To avoid compatibility concerns, I'd like to include the changes from
> >>> this KIP together with KIP-889
> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>
> >>> (for introducing versioned stores) in the upcoming 3.5 release. I will
> >>> close the vote on the 3.5 KIP deadline, March 22, if there are no
> >>> objections before then.
> >>>
> >>> Thanks,
> >>> Victoria
>


[jira] [Created] (KAFKA-14864) Memory leak in KStreamWindowAggregate with ON_WINDOW_CLOSE emit strategy

2023-03-28 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14864:


 Summary: Memory leak in KStreamWindowAggregate with 
ON_WINDOW_CLOSE emit strategy
 Key: KAFKA-14864
 URL: https://issues.apache.org/jira/browse/KAFKA-14864
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Victoria Xia
Assignee: Victoria Xia
 Fix For: 3.5.0


The Streams DSL processor implementation for the ON_WINDOW_CLOSE emit strategy 
during KStream windowed aggregations opens a key-value iterator but does not 
call `close()` on it 
([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java#L203]),
 despite the Javadocs for the iterator making clear that users must do so in 
order to release resources 
([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java#L27]).
  

I discovered this bug while running load testing benchmarks and noticed that 
some runs were sporadically hitting OOMs, so it is definitely possible to hit 
this in practice.
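The general remedy for this class of leak is to scope the iterator with try-with-resources so `close()` runs even if processing throws; whether the actual fix uses this exact form is not stated here. The sketch below redefines a stand-in for `KeyValueIterator` (which extends `Iterator` and `Closeable` and declares `close()` without checked exceptions) so it compiles without Kafka; `TrackingIterator` and `EmitOnWindowCloseSketch` are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;

// Stand-in for org.apache.kafka.streams.state.KeyValueIterator, redefined here
// only so the sketch compiles without Kafka on the classpath.
interface SketchKeyValueIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // no checked exception, like the real interface
}

// Iterator that counts how many instances are still open, to make leaks visible.
class TrackingIterator implements SketchKeyValueIterator<String> {
    static int openIterators = 0;
    private final Iterator<String> delegate;

    TrackingIterator(List<String> data) {
        this.delegate = data.iterator();
        openIterators++;
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public String next() { return delegate.next(); }
    @Override public void close() { openIterators--; } // a real store iterator frees its resources here
}

class EmitOnWindowCloseSketch {
    /** Emits every expired window; try-with-resources closes the iterator even on exceptions. */
    static int emitExpiredWindows(List<String> expiredWindows) {
        int emitted = 0;
        try (SketchKeyValueIterator<String> windows = new TrackingIterator(expiredWindows)) {
            while (windows.hasNext()) {
                windows.next(); // forwarding the final result for this window is omitted
                emitted++;
            }
        }
        return emitted;
    }
}
```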



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14834) Improved stream-table and table-table join semantics for versioned stores

2023-03-22 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14834:


 Summary: Improved stream-table and table-table join semantics for 
versioned stores
 Key: KAFKA-14834
 URL: https://issues.apache.org/jira/browse/KAFKA-14834
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Victoria Xia
Assignee: Victoria Xia


With the introduction of versioned state stores in 
[KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores],
 we should leverage them to provide improved join semantics. 

As described in 
[KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores],
 we will make the following two improvements:
 * stream-table joins will perform a timestamped lookup (using the stream-side 
record timestamp) if the table is materialized with a versioned store
 * table-table joins, including foreign key joins, will not produce new join 
results on out-of-order records (by key) from tables materialized with 
versioned stores
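The first improvement, a timestamped lookup on the stream side, can be sketched with a toy versioned table (hypothetical `ToyVersionedTable`, not a Kafka API): `get(key, asOf)` returns the newest version with a timestamp at or before `asOf`, so a stream record joins with the table value that was valid at the stream record's timestamp rather than the current one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy versioned table (per-key history ordered by timestamp). Hypothetical; not a Kafka API.
class ToyVersionedTable {
    private final Map<String, TreeMap<Long, String>> history = new HashMap<>();

    void put(String key, String value, long timestamp) {
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Latest version by timestamp; stands in for a plain lookup of the current value. */
    String get(String key) {
        TreeMap<Long, String> versions = history.get(key);
        return versions == null || versions.isEmpty() ? null : versions.lastEntry().getValue();
    }

    /** Version valid at asOfTimestamp: the newest one with timestamp <= asOfTimestamp. */
    String get(String key, long asOfTimestamp) {
        TreeMap<Long, String> versions = history.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}

class StreamTableJoinSketch {
    /** Inner join of one stream record; returns null when there is no match. */
    static String join(ToyVersionedTable table, String key, String streamValue,
                       long streamTimestamp, boolean timestampedLookup) {
        String tableValue = timestampedLookup ? table.get(key, streamTimestamp) : table.get(key);
        return tableValue == null ? null : streamValue + "+" + tableValue;
    }
}
```

With table versions `t1@5` and `t2@20`, a stream record at timestamp 10 joins with `t1` under the timestamped lookup but with `t2` under a latest-value lookup.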



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-22 Thread Victoria Xia
Thanks for voting, everyone! We have three binding yes votes with no
objections during four full days of voting. I will close the vote and mark
the KIP as accepted, right in time for the 3.5 release.

Thanks,
Victoria

On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna  wrote:

> +1 (binding)
>
> Thanks Victoria!
>
> Best,
> Bruno
>
> On 20.03.23 17:13, Matthias J. Sax wrote:
> > +1 (binding)
> >
> > On 3/20/23 9:05 AM, Guozhang Wang wrote:
> >> +1, thank you Victoria!
> >>
> >> On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
> >>  wrote:
> >>>
> >>> Hi all,
> >>>
> >>> I'd like to start a vote on KIP-914 for updating the Kafka Streams join
> >>> processors to use proper timestamp-based semantics in applications with
> >>> versioned stores:
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >>>
> >>> To avoid compatibility concerns, I'd like to include the changes from
> >>> this KIP together with KIP-889
> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>
> >>> (for introducing versioned stores) in the upcoming 3.5 release. I will
> >>> close the vote on the 3.5 KIP deadline, March 22, if there are no
> >>> objections before then.
> >>>
> >>> Thanks,
> >>> Victoria
>


Re: [DISCUSS] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-20 Thread Victoria Xia
The voting thread for this KIP is open. Thanks, Guozhang and Matthias, for
already having voted!

On Fri, Mar 17, 2023 at 1:18 PM Guozhang Wang 
wrote:

> Thanks Matthias / Victoria, both bullet points make sense to me.
>
> On Thu, Mar 16, 2023 at 10:39 AM Victoria Xia
>  wrote:
> >
> > Thanks for your comments, Matthias!
> >
> > > For stream-table joins, I think we need to elaborate that a `get(k, ts)`
> > > call now might return `null` if the history retention of the store is too
> > > short.
> >
> > Great callout -- I agree we should definitely clarify this in the KIP and
> > mention it in the eventual docs as well.
> >
> > When a call to `get(k, ts)` returns null, there's not really a good way to
> > distinguish whether it's because the timestamp is outside of the store's
> > history retention or if it's because there's actually no record version for
> > the key at the specified timestamp. Determining this from the processor
> > would require (1) exposing the store's history retention to the processor,
> > and (2) reconciling the fact that state stores today (including the new
> > versioned store implementation) track their own observed stream time
> > separate from processor time.
> >
> > In light of this, I think your proposal to treat a null from `get(k, ts)`
> > due to history retention having been exceeded the same as we'd treat any
> > other null makes sense, and is also our only viable option right now. I'll
> > call this out in the docs so users are aware that their choice of history
> > retention has this implication.
> >
> > > For left-table-table joins, there seems to be no special impact, but it
> > > should be called out, too. The lookup itself does not go into the history
> > > of the table so no change here (as we don't have the "query older than
> > > history case")
> >
> > Yup, we're on the same page. Using a versioned store for table-table joins
> > results in the semantic change that the join result will include the
> > latest-by-timestamp record rather than the latest-by-offset record, but no
> > timestamped lookups (i.e., `get(key, ts)` calls) are used in the process so
> > there is no concern about history retention having elapsed and affecting
> > join results. (The only implication of history retention for this use case
> > is indirect, since history retention doubles as grace period for the store.
> > Because grace period is per store instance, which has task-level
> > granularity, that means if grace period is set too low then the latest
> > record for one key could be dropped from the store if another key has
> > already advanced the store's observed stream time past the grace period by
> > the time that this record is seen.)
> >
> > I will update the KIP with these additional notes.
> >
> > Thanks,
> > Victoria
> >
> > On Wed, Mar 15, 2023 at 7:16 PM Matthias J. Sax wrote:
> >
> > > Thanks for the KIP! Great to see a first step towards using the new
> > > versioned stores!
> > >
> > > I think the described tradeoffs make sense and I like making a pragmatic
> > > step in the right direction and avoiding boiling the ocean. Thus, I
> > > agree with the proposed solution.
> > >
> > > One minor thing that I believe just needs clarification in the KIP (it
> > > does not seem to be a change to the KIP itself):
> > >
> > > For stream-table joins, I think we need to elaborate that a `get(k, ts)`
> > > call now might return `null` if the history retention of the store is
> > > too short. For inner-joins it would result in no output record (ie,
> > > stream input record is dropped). Would be good to have it mentioned in
> > > the KIP explicitly.
> > >
> > > We should also discuss how left-joins should work for this case. I think
> > > it's ok (better) to include the stream record in the result if the
> > > lookup returns `null` -- either because no key exists in the existing
> > > history for the provided timestamp, or (the actual case in question)
> > > because we query older than available history. If you agree, can we add
> > > this to the KIP?
> > >
> > > For left-table-table joins, there seems to be no special impact, but it
> > > should be called out, too. The lookup itself does not go into the
> > > history of the table so no change here (as we don't have the "query
> > > older than history case") -- and for out-of-order records, we just

[VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-18 Thread Victoria Xia
Hi all,

I'd like to start a vote on KIP-914 for updating the Kafka Streams join
processors to use proper timestamp-based semantics in applications with
versioned stores:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores

To avoid compatibility concerns, I'd like to include the changes from this
KIP together with KIP-889 (for introducing versioned stores) in the upcoming
3.5 release. I will
close the vote on the 3.5 KIP deadline, March 22, if there are no
objections before then.

Thanks,
Victoria


Re: [DISCUSS] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-17 Thread Victoria Xia
Hi everyone,

As mentioned in the KIP, I would like to include these changes together
with KIP-889 in the 3.5 release to avoid any potential compatibility
concerns. Given that the current discussion is nearing convergence and the
KIP deadline for 3.5 is five days away (March 22), I will initiate a vote
at the end of the day.

Happy to continue discussing even after the vote has started. Thanks for
all the great suggestions so far!

Best,
Victoria

On Thu, Mar 16, 2023 at 10:38 AM Victoria Xia 
wrote:

> Thanks for your comments, Matthias!
>
> > For stream-table joins, I think we need to elaborate that a `get(k, ts)`
> > call now might return `null` if the history retention of the store is too
> > short.
>
> Great callout -- I agree we should definitely clarify this in the KIP and
> mention it in the eventual docs as well.
>
> When a call to `get(k, ts)` returns null, there's not really a good way to
> distinguish whether it's because the timestamp is outside of the store's
> history retention or if it's because there's actually no record version for
> the key at the specified timestamp. Determining this from the processor
> would require (1) exposing the store's history retention to the processor,
> and (2) reconciling the fact that state stores today (including the new
> versioned store implementation) track their own observed stream time
> separate from processor time.
>
> In light of this, I think your proposal to treat a null from `get(k, ts)`
> due to history retention having been exceeded the same as we'd treat any
> other null makes sense, and is also our only viable option right now. I'll
> call this out in the docs so users are aware that their choice of history
> retention has this implication.
>
> > For left-table-table joins, there seems to be no special impact, but it
> > should be called out, too. The lookup itself does not go into the history
> > of the table so no change here (as we don't have the "query older than
> > history case")
>
> Yup, we're on the same page. Using a versioned store for table-table joins
> results in the semantic change that the join result will include the
> latest-by-timestamp record rather than the latest-by-offset record, but no
> timestamped lookups (i.e., `get(key, ts)` calls) are used in the process so
> there is no concern about history retention having elapsed and affecting
> join results. (The only implication of history retention for this use case
> is indirect, since history retention doubles as grace period for the store.
> Because grace period is per store instance, which has task-level
> granularity, that means if grace period is set too low then the latest
> record for one key could be dropped from the store if another key has
> already advanced the store's observed stream time past the grace period by
> the time that this record is seen.)
>
> I will update the KIP with these additional notes.
>
> Thanks,
> Victoria
>
> On Wed, Mar 15, 2023 at 7:16 PM Matthias J. Sax  wrote:
>
>> Thanks for the KIP! Great to see a first step towards using the new
>> versioned stores!
>>
>> I think the described tradeoffs make sense and I like making a pragmatic
>> step in the right direction and avoiding boiling the ocean. Thus, I
>> agree with the proposed solution.
>>
>> One minor thing that I believe just needs clarification in the KIP (it does
>> not seem to be a change to the KIP itself):
>>
>> For stream-table joins, I think we need to elaborate that a `get(k, ts)`
>> call now might return `null` if the history retention of the store is
>> too short. For inner-joins it would result in no output record (ie,
>> stream input record is dropped). Would be good to have it mentioned in
>> the KIP explicitly.
>>
>> We should also discuss how left-joins should work for this case. I think
>> it's ok (better) to include the stream record in the result if the
>> lookup returns `null` -- either because no key exists in the existing
>> history for the provided timestamp, or (the actual case in question)
>> because we query older than available history. If you agree, can we add
>> this to the KIP?
>>
>> For left-table-table joins, there seems to be no special impact, but it
>> should be called out, too. The lookup itself does not go into the
>> history of the table so no change here (as we don't have the "query
>> older than history case") -- and for out-of-order records, we just
>> "drop" them anyway, so no change for left-joins either I believe.
>>
>>
>> -Matthias
>>
>>
>>
>> On 3/15/23 2:00 PM, Guozhang Wang wrote:
>> > Sounds good to me. Thanks!
>> >
>> > On Wed, Mar 15, 20

Re: [DISCUSS] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-16 Thread Victoria Xia
Thanks for your comments, Matthias!

> For stream-table joins, I think we need to elaborate that a `get(k, ts)`
> call now might return `null` if the history retention of the store is too
> short.

Great callout -- I agree we should definitely clarify this in the KIP and
mention it in the eventual docs as well.

When a call to `get(k, ts)` returns null, there's not really a good way to
distinguish whether it's because the timestamp is outside of the store's
history retention or if it's because there's actually no record version for
the key at the specified timestamp. Determining this from the processor
would require (1) exposing the store's history retention to the processor,
and (2) reconciling the fact that state stores today (including the new
versioned store implementation) track their own observed stream time
separate from processor time.

In light of this, I think your proposal to treat a null from `get(k, ts)`
due to history retention having been exceeded the same as we'd treat any
other null makes sense, and is also our only viable option right now. I'll
call this out in the docs so users are aware that their choice of history
retention has this implication.
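A minimal sketch of the agreed behavior, under stated assumptions: a toy single-key history (hypothetical `RetentionBoundedHistory`, not a Kafka API) returns null both when no version exists and when the query is older than history retention, and the join treats both nulls identically: an inner join emits nothing, while a left join still emits the stream record with a null table side.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy single-key history with a history-retention bound. Hypothetical; not a Kafka API.
class RetentionBoundedHistory {
    private final TreeMap<Long, String> versions = new TreeMap<>();
    private final long historyRetentionMs;
    private long observedStreamTime = Long.MIN_VALUE;

    RetentionBoundedHistory(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    void put(String value, long timestamp) {
        versions.put(timestamp, value);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
    }

    /** Returns null both when no version exists and when asOfTimestamp is beyond retention. */
    String get(long asOfTimestamp) {
        if (versions.isEmpty() || asOfTimestamp < observedStreamTime - historyRetentionMs) {
            return null; // the caller cannot tell these two cases apart
        }
        Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}

class NullLookupJoins {
    /** Inner join: a null lookup produces no output (represented as null here). */
    static String inner(String streamValue, String tableValue) {
        return tableValue == null ? null : streamValue + "," + tableValue;
    }

    /** Left join: the stream record is emitted even if the lookup returned null. */
    static String left(String streamValue, String tableValue) {
        return streamValue + "," + tableValue;
    }
}
```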

> For left-table-table joins, there seems to be no special impact, but it
> should be called out, too. The lookup itself does not go into the history
> of the table so no change here (as we don't have the "query older than
> history case")

Yup, we're on the same page. Using a versioned store for table-table joins
results in the semantic change that the join result will include the
latest-by-timestamp record rather than the latest-by-offset record, but no
timestamped lookups (i.e., `get(key, ts)` calls) are used in the process so
there is no concern about history retention having elapsed and affecting
join results. (The only implication of history retention for this use case
is indirect, since history retention doubles as grace period for the store.
Because grace period is per store instance, which has task-level
granularity, that means if grace period is set too low then the latest
record for one key could be dropped from the store if another key has
already advanced the store's observed stream time past the grace period by
the time that this record is seen.)
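The per-store-instance grace period caveat can be made concrete with a toy store (hypothetical `SharedStreamTimeStore`, not a Kafka API) in which a single observed stream time, shared by all keys of the task, drives the grace check: once key `b` advances stream time, a late record for key `a` is dropped even though it would be the only, and therefore latest, record for `a`.

```java
import java.util.HashMap;
import java.util.Map;

// Toy store instance shared by all keys of one task: a single observed stream time
// drives the grace-period check for every key. Hypothetical; not a Kafka API.
class SharedStreamTimeStore {
    private final long gracePeriodMs;
    private long observedStreamTime = Long.MIN_VALUE;
    private final Map<String, String> latestByKey = new HashMap<>();
    private final Map<String, Long> latestTimestampByKey = new HashMap<>();

    SharedStreamTimeStore(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    boolean put(String key, String value, long timestamp) {
        if (observedStreamTime != Long.MIN_VALUE && timestamp < observedStreamTime - gracePeriodMs) {
            return false; // dropped, even if it would be the newest record for this key
        }
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        Long previous = latestTimestampByKey.get(key);
        if (previous == null || timestamp >= previous) {
            latestByKey.put(key, value);
            latestTimestampByKey.put(key, timestamp);
        }
        return true;
    }

    String latest(String key) {
        return latestByKey.get(key);
    }
}
```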

I will update the KIP with these additional notes.

Thanks,
Victoria

On Wed, Mar 15, 2023 at 7:16 PM Matthias J. Sax  wrote:

> Thanks for the KIP! Great to see a first step towards using the new
> versioned stores!
>
> I think the described tradeoffs make sense and I like making a pragmatic
> step in the right direction and avoiding boiling the ocean. Thus, I
> agree with the proposed solution.
>
> One minor thing that I believe just needs clarification in the KIP (it does
> not seem to be a change to the KIP itself):
>
> For stream-table joins, I think we need to elaborate that a `get(k, ts)`
> call now might return `null` if the history retention of the store is
> too short. For inner-joins it would result in no output record (ie,
> stream input record is dropped). Would be good to have it mentioned in
> the KIP explicitly.
>
> We should also discuss how left-joins should work for this case. I think
> it's ok (better) to include the stream record in the result if the
> lookup returns `null` -- either because no key exists in the existing
> history for the provided timestamp, or (the actual case in question)
> because we query older than available history. If you agree, can we add
> this to the KIP?
>
> For left-table-table joins, there seems to be no special impact, but it
> should be called out, too. The lookup itself does not go into the
> history of the table so no change here (as we don't have the "query
> older than history case") -- and for out-of-order records, we just
> "drop" them anyway, so no change for left-joins either I believe.
>
>
> -Matthias
>
>
>
> On 3/15/23 2:00 PM, Guozhang Wang wrote:
> > Sounds good to me. Thanks!
> >
> > On Wed, Mar 15, 2023 at 12:07 PM Victoria Xia
> >  wrote:
> >>
> >> Thanks for kicking off the discussion, John and Guozhang!
> >>
> >>> Just one thing that might be out of scope: if users want to enable the
> >>> versioned table feature across the topology, should we allow them to do it
> >>> via a single config rather than changing the materialized object at each
> >>> place?
> >>
> >> Yes, I think this would be a great usability improvement and am in favor of
> >> introducing such a config. As long as the config defaults to using
> >> unversioned stores (which makes sense anyway), there will be no
> >> compatibility concerns with introducing the config in a future release.
> >> It's out of scope for this particular KIP as a result, but can hopefully be
> >>

Re: [DISCUSS] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-15 Thread Victoria Xia
Thanks for kicking off the discussion, John and Guozhang!

> Just one thing that might be out of scope: if users want to enable the
> versioned table feature across the topology, should we allow them to do it
> via a single config rather than changing the materialized object at each
> place?

Yes, I think this would be a great usability improvement and am in favor of
introducing such a config. As long as the config defaults to using
unversioned stores (which makes sense anyway), there will be no
compatibility concerns with introducing the config in a future release.
It's out of scope for this particular KIP as a result, but can hopefully be
introduced as part of the next release after 3.5.

Best,
Victoria

On Wed, Mar 15, 2023 at 10:49 AM Guozhang Wang 
wrote:

> Thanks Victoria for the great writeup, with a thorough analysis and
> trade-offs. I do not have any major questions about the proposal.
>
> Just one thing that might be out of scope: if users want to enable the
> versioned table feature across the topology, should we allow them to
> do it via a single config rather than changing the materialized object
> at each place? Maybe we can defer that for future discussions, but
> just want to hear your thoughts.
>
> Anyways, I think this proposal is great just as-is even if we agree to
> do the configuration improvement later.
>
>
> Guozhang
>
> On Thu, Mar 9, 2023 at 7:52 PM John Roesler  wrote:
> >
> > Thanks for the KIP, Victoria!
> >
> > I had some questions/concerns, but you addressed them in the Rejected
> > Alternatives section. Thanks for the thorough proposal!
> >
> > -John
> >
> > On Thu, Mar 9, 2023, at 18:59, Victoria Xia wrote:
> > > Hi everyone,
> > >
> > > I have a proposal for updating Kafka Streams's stream-table join and
> > > table-table join semantics for the new versioned key-value state stores
> > > introduced in KIP-889
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>.
> > > Would love to hear your thoughts and suggestions.
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> > >
> > > Thanks,
> > > Victoria
>


[DISCUSS] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-09 Thread Victoria Xia
Hi everyone,

I have a proposal for updating Kafka Streams's stream-table join and
table-table join semantics for the new versioned key-value state stores
introduced in KIP-889. Would love to hear your thoughts and suggestions.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores

Thanks,
Victoria


[jira] [Created] (KAFKA-14723) Do not write expired store records to changelog

2023-02-15 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14723:


 Summary: Do not write expired store records to changelog
 Key: KAFKA-14723
 URL: https://issues.apache.org/jira/browse/KAFKA-14723
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Victoria Xia


Window stores and versioned stores both have concepts of "retention" and 
"expiration." Records which are expired are not written to the store, e.g., 
[this 
example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
 for segmented stores. However, these expired records are still written to the 
changelog topic, in the case of persistent stores. This does not cause any 
problems because the records are once again omitted from the store during 
restore, but it is inefficient. It'd be good to avoid writing expired records 
to the changelog topic in the first place. Another benefit is that doing so 
would allow us to simplify the restoration code for versioned stores (see 
[relevant 
discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). 

The reason expired records are still written to the changelog topic is that 
whether a record is expired is tracked only at the innermost store layer, and 
not in any of the outer store layers such as the changelogging layer. 
The innermost store layer keeps its own `observedStreamTime` which is advanced 
on calls to put() and during restoration, and uses this variable to determine 
when a record is expired. Because the return type from put() is void, the 
changelogging layer has no way to tell whether the inner store's put() actually 
put the record or dropped it as expired, and always writes to the changelog 
topic regardless.

In order to avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record 
was actually put or not, or
 * move the logic for determining when a record is expired into an outer store 
layer, or
 * reorder/restructure the wrapped store layers.
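A minimal Java sketch of the first option. Everything here is hypothetical and simplified for illustration: the class names InnerStore and ChangeLoggingStore, String keys and values, and an in-memory map and list standing in for the RocksDB store and the changelog topic. It is not the actual Kafka Streams store layering; it only shows how a boolean return from put() would let the outer layer skip logging records the inner layer dropped as expired.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical innermost store: tracks its own observed stream time and drops
// records that are older than the retention period relative to that time.
class InnerStore {
    private final long retentionMs;
    private long observedStreamTime = Long.MIN_VALUE;
    private final Map<String, String> data = new HashMap<>();

    InnerStore(final long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true if the record was written, false if it was dropped as expired.
    boolean put(final String key, final String value, final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - retentionMs) {
            return false;
        }
        data.put(key, value);
        return true;
    }
}

// Hypothetical changelogging layer: forwards each write to the inner store and
// appends to the (simulated) changelog only if the inner store accepted it.
class ChangeLoggingStore {
    private final InnerStore inner;
    final List<String> changelog = new ArrayList<>();

    ChangeLoggingStore(final InnerStore inner) {
        this.inner = inner;
    }

    void put(final String key, final String value, final long timestamp) {
        if (inner.put(key, value, timestamp)) {
            changelog.add(key + "=" + value + "@" + timestamp);
        }
    }
}
```

With a 10 ms retention, a write at timestamp 85 that arrives after stream time has reached 100 is dropped by the inner store and therefore never reaches the changelog, while a write at 95 is stored and logged.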



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-889 Versioned State Stores

2023-02-13 Thread Victoria Xia
Hi everyone,

I have just pushed two minor amendments to KIP-889:

   - Updated the versioned store specification to clarify that the *"history
   retention" parameter is also used as "grace period,"* which means that
   writes (including inserts, updates, and deletes) to the store will not be
   accepted if the associated timestamp is older than the store's grace period
   (i.e., history retention) relative to the current observed stream time.
  - Additional context: previously, the KIP was not explicit about
  if/when old writes would no longer be accepted. The reason for enforcing a
  strict grace period after which writes will no longer be accepted is
  that otherwise tombstones must be retained indefinitely: if the latest
  value for a key is a very old tombstone, we would not be able to expire it
  from the store, because if a non-null put with an even older timestamp
  arrived later, then without the tombstone the store would accept this
  write as the latest value for the key, even though it isn't. In the spirit
  of not adding more to this KIP, which has already been accepted, I do not
  propose to add additional interfaces to allow users to configure grace
  period separately from history retention at this time. Such options can be
  introduced in a future KIP in a backwards-compatible way.
   - Added a *new method to TopologyTestDriver* for getting a versioned
   store: getVersionedKeyValueStore().
  - This new method is analogous to existing methods for other types of
  stores, and its previous omission from the KIP was an oversight.
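To illustrate the first amendment, here is a hypothetical in-memory model in which history retention doubles as the grace period. GracePeriodVersionedStore is an illustrative name, not a Kafka Streams class, and for simplicity this model never physically expires old versions. Writes older than observedStreamTime minus historyRetentionMs are rejected; because the rejection depends only on stream time, the outcome stays correct even after an old tombstone has been dropped from a real store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory versioned store whose history retention is also its
// grace period: writes older than (observedStreamTime - historyRetentionMs)
// are rejected. Old versions are never physically expired in this model.
class GracePeriodVersionedStore {
    private final long historyRetentionMs;
    private long observedStreamTime = Long.MIN_VALUE;
    // key -> (timestamp -> value); a null value represents a tombstone
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    GracePeriodVersionedStore(final long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    // Returns false (write rejected) if the timestamp falls outside the grace period.
    boolean put(final String key, final String value, final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - historyRetentionMs) {
            return false;
        }
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        return true;
    }

    // Latest value for the key, or null if absent or the latest version is a tombstone.
    String getLatest(final String key) {
        final TreeMap<Long, String> history = versions.get(key);
        return history == null || history.isEmpty() ? null : history.lastEntry().getValue();
    }
}
```

With historyRetentionMs = 100, a put of (k, v1) at 0 followed by a tombstone at 50 and an unrelated write at 200 moves stream time to 200; a late put of (k, v0) at timestamp 10 is then rejected, and the latest value for k remains the tombstone (null).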

If there are no concerns / objections, then perhaps these updates are minor
enough that we can proceed without re-voting.

Happy to discuss,
Victoria

On Wed, Dec 21, 2022 at 8:22 AM Victoria Xia 
wrote:

> Hi everyone,
>
> We have 3 binding and 1 non-binding vote in favor of this KIP (and no
> objections) so KIP-889 is now accepted.
>
> Thanks for voting, and for your excellent comments in the KIP discussion
> thread!
>
> Happy holidays,
> Victoria
>
> On Tue, Dec 20, 2022 at 12:24 PM Sagar  wrote:
>
>> Hi Victoria,
>>
>> +1 (non-binding).
>>
>> Thanks!
>> Sagar.
>>
>> On Tue, Dec 20, 2022 at 1:39 PM Bruno Cadonna  wrote:
>>
>> > Hi Victoria,
>> >
>> > Thanks for the KIP!
>> >
>> > +1 (binding)
>> >
>> > Best,
>> > Bruno
>> >
>> > On 19.12.22 20:03, Matthias J. Sax wrote:
>> > > +1 (binding)
>> > >
>> > > On 12/15/22 1:27 PM, John Roesler wrote:
>> > >> Thanks for the thorough KIP, Victoria!
>> > >>
>> > >> I'm +1 (binding)
>> > >>
>> > >> -John
>> > >>
>> > >> On 2022/12/15 19:56:21 Victoria Xia wrote:
>> > >>> Hi all,
>> > >>>
>> > >>> I'd like to start a vote on KIP-889 for introducing versioned
>> > >>> key-value state stores to Kafka Streams:
>> > >>>
>> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
>> > >>>
>> > >>> The discussion thread has been open for a few weeks now and has
>> > >>> converged among the current participants.
>> > >>>
>> > >>> Thanks,
>> > >>> Victoria
>> > >>>
>> >
>>
>


Re: [VOTE] KIP-889 Versioned State Stores

2022-12-21 Thread Victoria Xia
Hi everyone,

We have 3 binding and 1 non-binding vote in favor of this KIP (and no
objections) so KIP-889 is now accepted.

Thanks for voting, and for your excellent comments in the KIP discussion
thread!

Happy holidays,
Victoria

On Tue, Dec 20, 2022 at 12:24 PM Sagar  wrote:

> Hi Victoria,
>
> +1 (non-binding).
>
> Thanks!
> Sagar.
>
> On Tue, Dec 20, 2022 at 1:39 PM Bruno Cadonna  wrote:
>
> > Hi Victoria,
> >
> > Thanks for the KIP!
> >
> > +1 (binding)
> >
> > Best,
> > Bruno
> >
> > On 19.12.22 20:03, Matthias J. Sax wrote:
> > > +1 (binding)
> > >
> > > On 12/15/22 1:27 PM, John Roesler wrote:
> > >> Thanks for the thorough KIP, Victoria!
> > >>
> > >> I'm +1 (binding)
> > >>
> > >> -John
> > >>
> > >> On 2022/12/15 19:56:21 Victoria Xia wrote:
> > >>> Hi all,
> > >>>
> > >>> I'd like to start a vote on KIP-889 for introducing versioned
> > >>> key-value state stores to Kafka Streams:
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> > >>>
> > >>> The discussion thread has been open for a few weeks now and has
> > >>> converged among the current participants.
> > >>>
> > >>> Thanks,
> > >>> Victoria
> > >>>
> >
>


Re: [DISCUSS] KIP-889 Versioned State Stores

2022-12-20 Thread Victoria Xia
Hi Bruno,

Thanks for reviewing the KIP, and for voting!

> I would make the constructor public and remove the static method make().

You are right. The static factory method is not providing much benefit for
the VersionedRecord class so I will remove it in order to simplify the
class.

Best,
Victoria

On Mon, Dec 19, 2022 at 9:34 AM Bruno Cadonna  wrote:

> Hi Victoria,
>
> I am +1 on the KIP. I just have one minor comment:
> Why do we need method
>
> public static <V> VersionedRecord<V> make(final V value, final long timestamp)
>
> in the VersionedRecord?
>
> The public constructor would do exactly the same, wouldn't it?
>
> I would make the constructor public and remove the static method make().
>
> Best,
> Bruno
>
> On 15.12.22 20:58, Victoria Xia wrote:
> > Thanks again for the great discussion, Sagar, Bruno, and Matthias. I've
> > just sent a message to start the vote on this KIP. Please have a look
> > when you get the chance.
> >
> > Thanks,
> > Victoria
> >
> > On Wed, Dec 14, 2022 at 12:28 PM Matthias J. Sax
> > wrote:
> >
> >> Thanks for clarifying about the null-question. SGTM.
> >>
> >> On 12/13/22 3:06 PM, Victoria Xia wrote:
> >>> Hi Matthias,
> >>>
> >>> Thanks for chiming in! Barring objections from anyone on this thread, I
> >>> will start the vote for this KIP on Thursday. That should be enough
> >>> time to incorporate any lingering minor changes.
> >>>
> >>>> I slightly prefer to add `VersionedRecord` interface (also
> >>> like the name). I agree that it's low overhead and providing a clean
> >>> path forward for future changes seems worth it to me.
> >>>
> >>> OK, that makes two of us. I updated the KIP just now to formally
> >>> include VersionedRecord as the new return type from the various
> >>> VersionedKeyValueStore methods.
> >>>
> >>>> if we introduce `VersionedRecord`, I think we can keep the not-null
> >>> requirement for `ValueAndTimestamp`
> >>>
> >>> Not quite. VersionedRecord is only used as a return type from read
> >>> methods, which is why VersionedRecord is able to enforce that its value
> >>> is never null. If the value being returned would have been null, then we
> >>> return a null VersionedRecord instead, rather than non-null
> >>> VersionedRecord with null value. So, there's no use case for a
> >>> VersionedRecord with null value.
> >>>
> >>> In contrast, even though ValueAndTimestamp is not anywhere in the
> >>> public VersionedKeyValueStore interface, ValueAndTimestamp still needs
> >>> to be used internally when representing a versioned key-value store as a
> >>> TimestampedKeyValueStore, since TimestampedKeyValueStore is used
> >>> everywhere throughout the internals of the codebase. In order to
> >>> represent a versioned key-value store as a TimestampedKeyValueStore, we
> >>> have to support `put(K key, ValueAndTimestamp<V> value)`, which means
> >>> ValueAndTimestamp needs to support null value (with timestamp).
> >>> Otherwise we cannot put a tombstone into a versioned key-value store
> >>> when using the internal TimestampedKeyValueStore representation.
> >>>
> >>> It's very much an implementation detail that ValueAndTimestamp needs
> >>> to be relaxed to allow null values. I think this is a minor enough
> >>> change that is still preferable to the alternatives (refactoring the
> >>> processors to not require TimestampedKeyValueStore, or introducing a
> >>> separate workaround `put()` method on the TimestampedKeyValueStore
> >>> representation of versioned key-value stores), so I have left it in as
> >>> part of the KIP.
> >>>
> >>> Best,
> >>> Victoria
> >>>
> >>> On Mon, Dec 12, 2022 at 8:42 PM Matthias J. Sax
> >>> wrote:
> >>>
> >>>> Thanks Victoria.
> >>>>
> >>>> I did not re-read the KIP in full on the wiki but only your email.
> >>>>
> >>>> Points (1)-(8) SGTM.
> >>>>
> >>>> About (9): I slightly prefer to add `VersionedRecord` interface (also
> >>>> like the name). I agree that it's low overhead and providing a clean
> >>>> path forward for future changes seems worth i

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-12-15 Thread Victoria Xia
Thanks again for the great discussion, Sagar, Bruno, and Matthias. I've
just sent a message to start the vote on this KIP. Please have a look when
you get the chance.

Thanks,
Victoria

On Wed, Dec 14, 2022 at 12:28 PM Matthias J. Sax  wrote:

> Thanks for clarifying about the null-question. SGTM.
>
> On 12/13/22 3:06 PM, Victoria Xia wrote:
> > Hi Matthias,
> >
> > Thanks for chiming in! Barring objections from anyone on this thread, I
> > will start the vote for this KIP on Thursday. That should be enough time
> > to incorporate any lingering minor changes.
> >
> >> I slightly prefer to add `VersionedRecord` interface (also
> > like the name). I agree that it's low overhead and providing a clean
> > path forward for future changes seems worth it to me.
> >
> > OK, that makes two of us. I updated the KIP just now to formally include
> > VersionedRecord as the new return type from the various
> > VersionedKeyValueStore methods.
> >
> >> if we introduce `VersionedRecord`, I think we can keep the not-null
> > requirement for `ValueAndTimestamp`
> >
> > Not quite. VersionedRecord is only used as a return type from read
> > methods, which is why VersionedRecord is able to enforce that its value is
> > never null. If the value being returned would have been null, then we
> > return a null VersionedRecord instead, rather than non-null VersionedRecord
> > with null value. So, there's no use case for a VersionedRecord with null
> > value.
> >
> > In contrast, even though ValueAndTimestamp is not anywhere in the public
> > VersionedKeyValueStore interface, ValueAndTimestamp still needs to be
> > used internally when representing a versioned key-value store as a
> > TimestampedKeyValueStore, since TimestampedKeyValueStore is used
> > everywhere throughout the internals of the codebase. In order to
> > represent a versioned key-value store as a TimestampedKeyValueStore, we
> > have to support `put(K key, ValueAndTimestamp<V> value)`, which means
> > ValueAndTimestamp needs to support null value (with timestamp). Otherwise
> > we cannot put a tombstone into a versioned key-value store when using the
> > internal TimestampedKeyValueStore representation.
> >
> > It's very much an implementation detail that ValueAndTimestamp needs to
> > be relaxed to allow null values. I think this is a minor enough change
> > that is still preferable to the alternatives (refactoring the processors
> > to not require TimestampedKeyValueStore, or introducing a separate
> > workaround `put()` method on the TimestampedKeyValueStore representation
> > of versioned key-value stores), so I have left it in as part of the KIP.
> >
> > Best,
> > Victoria
> >
> > On Mon, Dec 12, 2022 at 8:42 PM Matthias J. Sax
> > wrote:
> >
> >> Thanks Victoria.
> >>
> >> I did not re-read the KIP in full on the wiki but only your email.
> >>
> >> Points (1)-(8) SGTM.
> >>
> >> About (9): I slightly prefer to add `VersionedRecord` interface (also
> >> like the name). I agree that it's low overhead and providing a clean
> >> path forward for future changes seems worth it to me. Btw: if we
> >> introduce `VersionedRecord`, I think we can keep the not-null
> >> requirement for `ValueAndTimestamp`, which seems a small side benefit.
> >> (Btw: your code snippet in the KIP shows that `VersionedRecord` would
> >> have a non-null requirement for the value, but I think it would need to
> >> allow null as value?)
> >>
> >>
> >> -Matthias
> >>
> >> On 12/7/22 5:23 PM, Victoria Xia wrote:
> >>> Thanks for the discussion, Bruno, Sagar, and Matthias!
> >>>
> >>> It seems we've reached consensus on almost all of the discussion
> >>> points. I've updated the KIP with the following:
> >>> 1) renamed "timestampTo" in `get(key, timestampTo)` to "asOfTimestamp"
> >>> to clarify that this timestamp bound is inclusive, per the SQL
> >>> guideline that "AS OF <timestamp>" queries are inclusive. In the future,
> >>> if we want to introduce a timestamp range query, we can use `get(key,
> >>> timestampFrom, timestampTo)` and specify that timestampTo is exclusive
> >>> in this method, while avoiding confusion with the inclusive
> >>> asOfTimestamp parameter in the other method, given that the names are
> >>> different.
> >>> 2) added a description of "history retention" semantics into the

[VOTE] KIP-889 Versioned State Stores

2022-12-15 Thread Victoria Xia
Hi all,

I'd like to start a vote on KIP-889 for introducing versioned key-value
state stores to Kafka Streams:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores

The discussion thread has been open for a few weeks now and has converged
among the current participants.

Thanks,
Victoria


[jira] [Created] (KAFKA-14491) Introduce Versioned Key-Value Stores to Kafka Streams

2022-12-14 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14491:


 Summary: Introduce Versioned Key-Value Stores to Kafka Streams
 Key: KAFKA-14491
 URL: https://issues.apache.org/jira/browse/KAFKA-14491
 Project: Kafka
  Issue Type: Improvement
Reporter: Victoria Xia
Assignee: Victoria Xia


The key-value state stores used by Kafka Streams today maintain only the latest 
value associated with each key. In order to support applications which require 
access to older record versions, Kafka Streams should have versioned state 
stores. Versioned state stores are similar to key-value stores except they can 
store multiple record versions for a single key. An example use case for 
versioned key-value stores is in providing proper temporal join semantics for 
stream-table joins with regard to out-of-order data.

See KIP for more: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
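As a hypothetical illustration of the temporal join semantics mentioned above: a table key has versions at timestamps 10 and 20, and a stream record with timestamp 15 arrives after the update at 20. A latest-value-only lookup joins it with the version from 20, while an as-of lookup over the version history joins it with the version from 10. TemporalJoinDemo and its methods are illustrative names; the sketch covers a single key and ignores tombstones.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key comparison of a latest-value-only table lookup and a
// versioned (as-of) table lookup for an out-of-order stream-side record.
class TemporalJoinDemo {
    // Table side: timestamp -> value. Tombstones are not modeled here.
    private final TreeMap<Long, String> tableHistory = new TreeMap<>();

    void tableUpdate(final String value, final long timestamp) {
        tableHistory.put(timestamp, value);
    }

    // Latest-only semantics: always join with the newest table value.
    String joinLatest(final String streamValue) {
        return tableHistory.isEmpty() ? null : streamValue + "+" + tableHistory.lastEntry().getValue();
    }

    // Versioned semantics: join with the table value valid at the stream record's timestamp.
    String joinAsOf(final String streamValue, final long streamTimestamp) {
        final Map.Entry<Long, String> entry = tableHistory.floorEntry(streamTimestamp);
        return entry == null ? null : streamValue + "+" + entry.getValue();
    }
}
```

A stream record with a timestamp earlier than every table version (for example 5) finds no version to join with in the as-of lookup.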



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-889 Versioned State Stores

2022-12-13 Thread Victoria Xia
Hi Matthias,

Thanks for chiming in! Barring objections from anyone on this thread, I
will start the vote for this KIP on Thursday. That should be enough time to
incorporate any lingering minor changes.

> I slightly prefer to add `VersionedRecord` interface (also
like the name). I agree that it's low overhead and providing a clean
path forward for future changes seems worth it to me.

OK, that makes two of us. I updated the KIP just now to formally include
VersionedRecord as the new return type from the various
VersionedKeyValueStore methods.

> if we introduce `VersionedRecord`, I think we can keep the not-null
requirement for `ValueAndTimestamp`

Not quite. VersionedRecord is only used as a return type from read methods,
which is why VersionedRecord is able to enforce that its value is never
null. If the value being returned would have been null, then we return a
null VersionedRecord instead, rather than non-null VersionedRecord with
null value. So, there's no use case for a VersionedRecord with null value.
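A hypothetical sketch of this read-side contract: SimpleVersionedRecord stands in for VersionedRecord (its value is never null, enforced in a public constructor), and KeyHistory.get() returns null, rather than a record wrapping a null value, when no version exists at the requested time or the version in effect is a tombstone. The names and structure are illustrative only, not the KIP's actual classes.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical stand-in for a VersionedRecord-style return type: the value is
// never null, and construction goes through a public constructor.
final class SimpleVersionedRecord<V> {
    private final V value;
    private final long timestamp;

    public SimpleVersionedRecord(final V value, final long timestamp) {
        this.value = Objects.requireNonNull(value, "value cannot be null");
        this.timestamp = timestamp;
    }

    V value() { return value; }

    long timestamp() { return timestamp; }
}

// Hypothetical single-key history; a null value in the map marks a tombstone.
class KeyHistory {
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void put(final String value, final long timestamp) {
        versions.put(timestamp, value);
    }

    // Returns null (rather than a record with a null value) when there is no
    // version at or before asOfTimestamp, or when that version is a tombstone.
    SimpleVersionedRecord<String> get(final long asOfTimestamp) {
        final Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        if (entry == null || entry.getValue() == null) {
            return null;
        }
        return new SimpleVersionedRecord<>(entry.getValue(), entry.getKey());
    }
}
```

For a history of v1 at 0, a tombstone at 5, and v2 at 20, a lookup as of 7 returns null, a lookup as of 3 returns (v1, 0), and a lookup as of 25 returns (v2, 20).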

In contrast, even though ValueAndTimestamp is not anywhere in the public
VersionedKeyValueStore interface, ValueAndTimestamp still needs to be used
internally when representing a versioned key-value store as a
TimestampedKeyValueStore, since TimestampedKeyValueStore is used everywhere
throughout the internals of the codebase. In order to represent a versioned
key-value store as a TimestampedKeyValueStore, we have to support
`put(K key, ValueAndTimestamp<V> value)`, which means ValueAndTimestamp needs to
support null value (with timestamp). Otherwise we cannot put a tombstone
into a versioned key-value store when using the internal
TimestampedKeyValueStore representation.
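The following hypothetical sketch shows why the wrapper has to allow a null value: a TimestampedKeyValueStore-style put() receives a tombstone as a non-null wrapper holding a null value, so the tombstone's timestamp can still be forwarded to the versioned store. ValueAndTs, MiniVersionedStore, and TimestampedAdapter are illustrative names, not Kafka Streams classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical value/timestamp pair that, like the relaxed internal
// ValueAndTimestamp described above, permits a null value (a tombstone).
final class ValueAndTs<V> {
    final V value;       // may be null
    final long timestamp;

    ValueAndTs(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical versioned store exposing only put(key, value, timestamp).
class MiniVersionedStore {
    final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    void put(final String key, final String value, final long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }
}

// Hypothetical adapter presenting the versioned store through a
// timestamped-key-value-style put(K, ValueAndTs<V>) method.
class TimestampedAdapter {
    private final MiniVersionedStore inner;

    TimestampedAdapter(final MiniVersionedStore inner) {
        this.inner = inner;
    }

    void put(final String key, final ValueAndTs<String> valueAndTs) {
        // A tombstone arrives as a non-null wrapper with a null value, so the
        // adapter still knows the tombstone's timestamp.
        inner.put(key, valueAndTs.value, valueAndTs.timestamp);
    }
}
```

If the wrapper itself had to be null to express a delete, the adapter would have no timestamp to pass on to the versioned store.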

It's very much an implementation detail that ValueAndTimestamp needs to be
relaxed to allow null values. I think this change is minor enough that it is
still preferable to the alternatives (refactoring the processors to not
require TimestampedKeyValueStore, or introducing a separate workaround
`put()` method on the TimestampedKeyValueStore representation of versioned
key-value stores), so I have left it in as part of the KIP.

Best,
Victoria

On Mon, Dec 12, 2022 at 8:42 PM Matthias J. Sax  wrote:

> Thanks Victoria.
>
> I did not re-read the KIP in full on the wiki but only your email.
>
> Points (1)-(8) SGTM.
>
> About (9): I slightly prefer to add `VersionedRecord` interface (also
> like the name). I agree that it's low overhead and providing a clean
> path forward for future changes seems worth it to me. Btw: if we
> introduce `VersionedRecord`, I think we can keep the not-null
> requirement for `ValueAndTimestamp`, which seems a small side benefit.
> (Btw: your code snippet in the KIP shows that `VersionedRecord` would
> have a non-null requirement for the value, but I think it would need to
> allow null as value?)
>
>
> -Matthias
>
> On 12/7/22 5:23 PM, Victoria Xia wrote:
> > Thanks for the discussion, Bruno, Sagar, and Matthias!
> >
> > It seems we've reached consensus on almost all of the discussion points.
> > I've updated the KIP with the following:
> > 1) renamed "timestampTo" in `get(key, timestampTo)` to "asOfTimestamp" to
> > clarify that this timestamp bound is inclusive, per the SQL guideline
> > that "AS OF <timestamp>" queries are inclusive. In the future, if we want
> > to introduce a timestamp range query, we can use `get(key, timestampFrom,
> > timestampTo)` and specify that timestampTo is exclusive in this method,
> > while avoiding confusion with the inclusive asOfTimestamp parameter in
> > the other method, given that the names are different.
> > 2) added a description of "history retention" semantics into the
> > VersionedKeyValueStore interface Javadoc, and updated the Javadoc for
> > `get(key, asOfTimestamp)` to mention explicitly that a null result is
> > returned if the provided timestamp bound is not within history retention.
> > 3) added a `delete(key, timestamp)` method (with return type
> > `ValueAndTimestamp`) to the VersionedKeyValueStore interface.
> > 4) updated the Javadoc for `segmentInterval` to clarify that the only
> > reason a user might be interested in this parameter is performance.
> >
> > Other points we discussed which did not result in updates include:
> > 5) whether to automatically update the `min.compaction.lag.ms` config on
> > changelog topics when history retention is changed -- there's support for
> > this but let's not bundle it with this KIP. We can have a separate KIP to
> > change this behavior for the existing windowed changelog topics, in
> > addition to versioned changelog topics.
> > 6) should we expose segmentInterval in this KIP -- let's go ahead and
> > expose it now since we'll almost certainly expo

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-12-07 Thread Victoria Xia
ug-in custom implementations
> into the DSL later? -- I guess, having a stricter contract initially,
> and relaxing it later if necessary, is the easier way forward than the
> other way around.
>
> For PAPI users, they are not bound to implement the interface anyway and
> can just add any store they like by extending the top level `StateStore`
> interface.
>
>
>
> (4) About `segmentInterval`: I am personally fine both ways. Seems it's
> your call to expose it or not. It seems there is a slight preference to
> expose it.
>
>
>
> (5) About `validTo`: based on my experience, it's usually simpler to
> have it exclusive. It's also how it's defined in "system versioned
> temporal tables" in the SQL standard, and how `AS OF <timestamp>` queries work.
>
> For a join, it of course implies that if a table record has [100,200) as
> inclusive `validFrom=100` and exclusive `validTo=200` it would only join
> with a stream-side record with 100 <= ts <= 199 (or 100 <= ts < 200 :)).
>
> I would strongly advocate making the upper bound exclusive (it did
> serve us well in the past to align to SQL semantics). It must be clearly
> documented of course and we can also name variable accordingly if
> necessary.
>
>
>
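The [100,200) example above can be checked with a small sketch. ValidityIntervals is a hypothetical helper, not part of the KIP's interfaces: it derives each version's exclusive validTo from the next version's timestamp (or Long.MAX_VALUE for the latest version), so a stream-side record with timestamp ts joins with the version for which validFrom <= ts < validTo.

```java
import java.util.TreeMap;

// Hypothetical helper that derives each version's validity interval
// [validFrom, validTo) from a single key's history: validTo is the next
// version's timestamp (exclusive), or Long.MAX_VALUE for the latest version.
class ValidityIntervals {
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void put(final String value, final long validFrom) {
        versions.put(validFrom, value);
    }

    // Exclusive upper bound of the version that starts at validFrom.
    long validTo(final long validFrom) {
        final Long next = versions.higherKey(validFrom);
        return next == null ? Long.MAX_VALUE : next;
    }

    // True if a stream-side record with timestamp ts would join with the
    // version starting at validFrom, i.e. validFrom <= ts < validTo.
    boolean joins(final long validFrom, final long ts) {
        return versions.containsKey(validFrom) && validFrom <= ts && ts < validTo(validFrom);
    }
}
```

With versions starting at 100 and 200, a stream record at 199 joins with the first version, while a record at 200 joins with the second one, matching the exclusive upper bound.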
> (6) About including `validTo` in return types -- it's not easy to change
> the return type, because the signature of a method is only determined by
> its name and input parameter types, i.e., we cannot overload an existing
> method to just change the return type, but would need to change its name
> or parameter list... Not sure if we can or cannot add `validTo` to
> `ValueAndTimestamp` though, but it's a tricky question. Would be good to
> get some more input from others on whether we think it would be important
> enough to worry about it now or not.
>
>
>
> (7) About `get(k)` vs `get(k, ts)` vs `getAsOf(k, ts)`: I would prefer
> to just keep `get()` with two overloads and not add `getAsOf()`; the
> fact that we pass in a timestamp implies we have a point in time query.
> (It's cleaner API design to leverage method overloads IMHO, and it's
> what we did in the past). Of course, we can name the parameter `get(key,
> asOfTimestamp)` if we think it's helpful. And in alignment to have
> `validTo` exclusive, `validTo` would be `asOfTimestamp+1` (or larger),
> in case we return it.
>
>
>
> (8) About updating topic config (ie, history retention and compaction
> lag): I think it was actually some oversight to not update topic
> configs if the code changes. There is actually a Jira ticket about it. I
> would prefer to keep the behavior consistent though and not change it
> just for the new versioned-store, but change it globally in one shot
> independent of this KIP.
>
>
> -Matthias
>
>
>
> On 12/1/22 10:15 AM, Sagar wrote:
> > Thanks Victoria,
> >
> > I guess an advantage of exposing a method like delete(key, timestamp)
> > could be that from a user's standpoint, it is a single operation and not
> > two. The equivalent of this method, i.e., put followed by get, is not
> > atomic, so exposing it certainly sounds like a good idea.
> >
> > Thanks!
> > Sagar.
> >
> > On Tue, Nov 29, 2022 at 1:15 AM Victoria Xia
> >  wrote:
> >
> >> Thanks, Sagar and Bruno, for your insights and comments!
> >>
> >>> Sagar: Can we name according to the semantics that you want to
> >> support like `getAsOf` or something like that? I am not sure if we do
> >> that in our codebase though. Maybe the experts can chime in.
> >>
> >> Because it is a new method that will be added, we should be able to
> >> name it whatever we like. I agree `getAsOf` is more clear, albeit
> >> wordier. Introducing `getAsOf(key, timestamp)` means we could leave open
> >> `get(key, timeFrom, timeTo)` to have an exclusive `timeTo` without
> >> introducing a collision. (We could introduce `getBetween(key, timeFrom,
> >> timeTo)` instead to delineate even more clearly, though this is better
> >> left for a future KIP.)
> >>
> >> I don't think there's any existing precedent in the codebase to follow
> >> here, but I'll leave that to the experts. Curious to hear what others
> >> prefer as well.
> >>
> >>> Sagar: With delete, we would still keep the older versions of the key
> >> right?
> >>
> >> We could certainly choose this for the semantics of delete(...) -- and
> >> it sounds like we should too, based on Bruno's confirmation below that
> >> this feels more natural to him as well -- but as Bruno noted in his
> >> message below I think we'll want the me

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-11-28 Thread Victoria Xia
Hopefully Matthias (or others) has context, otherwise I will have a
closer look.

- Victoria

On Wed, Nov 23, 2022 at 8:52 AM Bruno Cadonna  wrote:

> Hi all,
>
> Thanks for the KIP, Victoria!
>
> I have a couple of comments.
>
> 1. delete(key)
> I think delete(key) should not remove all versions of a key. We should
> use it to close the validity interval of the last version.
> Assuming we have records of different versions for key A:
> (A, e, 0, 2),
> (A, f, 2, 3),
> (A, g, 3, MAX)
>
> delete(A) would update them to
>
> (A, e, 0, 2),
> (A, f, 2, 3),
> (A, g, 3, 5),
> (A, null, 5, MAX)
>
> But then the question arises where does timestamp 5 that closes the
> interval in (A, g, 3, 5) and opens the interval in (A, null, 5, MAX)
> come from. We could use the timestamp at which delete(A) is called, but
> actually I do not like that because it seems to me it opens the doors to
> non-determinism. If we use event time for put() we should also use it
> for delete(). Actually, put(A, null, 5) would have the same effect as
> delete(A) in the example above. As a syntactical sugar, we could add
> delete(key, validFrom). (I just realized now that I just repeated what
> Victoria said in her previous e-mail.)
> I agree with Victoria that delete(A) as defined for other state stores
> is hard to re-use in the versioned key-value store.
> I would also not change the semantics so that it deletes all versions of
> a key. I would rather add a new method purge(key) or
> deleteAllVersions(key) or similar if we want to have such a method in
> this first KIP.
>
>
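Bruno's example can be replayed against a hypothetical single-key model. VersionHistory is an illustrative name, and having delete() return the version that was valid at the delete timestamp mirrors the delete(key, timestamp) semantics discussed in this thread but is an assumption here. delete(5) is modeled as inserting a tombstone at timestamp 5, which closes g's interval at 5 while leaving e, f, and g in the history.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key versioned history illustrating delete(timestamp) as
// "insert a tombstone at timestamp and return the version it closes".
class VersionHistory {
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void put(final String value, final long timestamp) {
        versions.put(timestamp, value); // a null value represents a tombstone
    }

    // Returns the entry (validFrom -> value) that was valid at `timestamp`
    // before the delete, or null if none, then records a tombstone there.
    // TreeMap.floorEntry returns an immutable snapshot, so the returned entry
    // is unaffected by the subsequent put.
    Map.Entry<Long, String> delete(final long timestamp) {
        final Map.Entry<Long, String> previous = versions.floorEntry(timestamp);
        versions.put(timestamp, null);
        return previous == null || previous.getValue() == null ? null : previous;
    }

    // Value valid at asOfTimestamp (inclusive), or null if none or deleted.
    String get(final long asOfTimestamp) {
        final Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

After put(e, 0), put(f, 2), put(g, 3) and delete(5), the returned entry is (3, g); get(4) still returns g, get(5) returns null, and get(2) returns f, so the older versions are kept.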
> 2. history retention
> I would remove "(up to store implementation discretion when this is the
> case)". I would treat the history retention as a strict limit. If users
> want to implement a less strict behavior, they can still do it. Maybe
> mention in the javadocs the implications of not adhering strictly to the
> history retention. That is, the DSL might become non-deterministic. You
> could also add historyRetentionMs() to the VersionedKeyValueStore
> interface to make the concept of the history retention part of the
> interface.
>
> 3. null vs. exception for out-of-bound queries
> I am in favor of null. The record version is not there anymore because
> it expired. This seems to me normal and nothing exceptional. That would
> also be consistent with the behavior of other APIs as already mentioned.
>
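Treating history retention as a strict bound, with null rather than an exception for queries that fall outside it, could look like this hypothetical single-key model. RetentionBoundedHistory is an illustrative name; it keeps expired versions in memory and simply refuses to serve them once the query timestamp is older than observedStreamTime minus historyRetentionMs.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key store treating history retention as a strict bound:
// queries for timestamps older than (observedStreamTime - historyRetentionMs)
// return null instead of throwing.
class RetentionBoundedHistory {
    private final long historyRetentionMs;
    private long observedStreamTime = Long.MIN_VALUE;
    private final TreeMap<Long, String> versions = new TreeMap<>();

    RetentionBoundedHistory(final long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    void put(final String value, final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        versions.put(timestamp, value);
    }

    String get(final long asOfTimestamp) {
        // The isEmpty() check short-circuits before the subtraction, so the
        // initial Long.MIN_VALUE stream time never overflows.
        if (versions.isEmpty() || asOfTimestamp < observedStreamTime - historyRetentionMs) {
            return null; // outside history retention: expired, not exceptional
        }
        final Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

With a retention of 100, a query as of 60 returns a while stream time is 150, but returns null once stream time has advanced to 300.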
>
> 4. Exposing segmentInterval
> Since we have evidence that the segment interval affects performance, I
> would expose it. But I find it also OK to expose it once we have a
> corresponding metric.
>
> 5. exclusive vs inclusive regarding validTo timestamp in get()
> Doesn't this decision depend on the semantics of the join for which this
> state store should be used? Should a record on the table side that has
> the same timestamp as the record on the stream side join? Or should only
> records in the table that are strictly before the record on the stream
> side join?
>
>
> 6. Not setting min.compaction.lag.ms during rebalances
> If Streams does not update min.compaction.lag.ms during rebalances,
> users have to do it each time they change history retention in the code,
> right? That seems odd to me. What is the actual reason for not updating
> the config? How does Streams handle updates to windowed stores? That
> should be a similar situation for the retention time config of the
> changelog topic.
>
>
> Best,
> Bruno
>
>
>
> On 23.11.22 09:11, Sagar wrote:
> > Hi Vicky,
> >
> > Thanks for your response!
> >
> > I would just use numbers to refer to your comments.
> >
> > 1) Thanks for your response. Even I am not totally sure whether these
> > should be supported via IQv2 or via the store interface. That said, I
> > definitely wouldn't qualify this as blocking the KIP, so we can live
> > without it :)
> >
> > 2) Yeah if the 2 APIs for get have different semantics for timestampTo,
> > then it could be confusing. I went through the link for temporal tables
> > (TFS!) and I now get why the AS OF semantics would have it inclusive. I
> > think part of the problem is that the name `get` on its own is not as
> > expressive as SQL. Can we name it according to the semantics that you
> > want to support, like `getAsOf` or something like that? I am not sure if
> > we do that in our codebase though. Maybe the experts can chime in.
> >
> > 3) hmm I would have named it `validUpto`. But again, not very picky about
> > it.
> > After going through the link and your KIP, it's a lot clearer to me.
> >
> > 4) I think delete(key) should be sufficient. With delete, we would
> > still keep the

Re: [DISCUSS] KIP-889 Versioned State Stores

2022-11-22 Thread Victoria Xia
ment is to align to existing behavior? Or do we have
> > case for which the current behavior is problematic?
> >
> >   (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
> > discretion when this is the case)" -> Should we make it a stricter
> > contract such that the user can reason about it better (there is WIP to
> > make retention time a strict bound for windowed stores atm)
> > -> JavaDocs on `persistentVersionedKeyValueStore` seem to suggest a
> > strict bound, too.
> >
> >   (5a) Do we need to expose `segmentInterval`? For windowed-stores, we
> > also use segments but hard-code it to two (it was exposed in earlier
> > versions but it seems not useful, even if we would be open to expose it
> > again if there is user demand).
> >
> >   (5b) JavaDocs says: "Performance degrades as more record versions for
> > the same key are collected in a single segment. On the other hand,
> > out-of-order writes and reads which access older segments may slow down
> > if there are too many segments." -- Wondering if JavaDocs should make
> > any statements about expected performance? Seems to be an implementation
> > detail?
> >
> >   (6) validTo timestamp is "exclusive", right? Ie, if I query
> > `get(key,ts[=validToV1])` I would get `null` or the "next" record v2
> > with validFromV2=ts?
> >
> >   (7) The KIP says that segments are stored in the same RocksDB -- for
> > this case, how are efficient deletes handled? For windowed-store, we can
> > just delete a full RocksDB.
> >
> >   (8) Rejected alternatives: you propose to not return the validTo
> > timestamp -- if we find it useful in the future to return it, would
> > there be a clean path to change it accordingly?
> >
> >
> > -Matthias
> >
> >
> > On 11/16/22 9:57 PM, Victoria Xia wrote:
> > > Hi everyone,
> > >
> > > I have a proposal for introducing versioned state stores in Kafka Streams.
> > > Versioned state stores are similar to key-value stores except they can
> > > store multiple record versions for a single key. This KIP focuses on
> > > interfaces only in order to limit the scope of the KIP.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> > >
> > > Thanks,
> > > Victoria
> > >
> >
>
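Questions (4c), (5a) and (7) all turn on how record timestamps map onto segments and when a segment can be dropped. A minimal sketch, assuming the arithmetic Kafka's segmented window stores use (`segmentId = timestamp / segmentInterval`), a strict history-retention bound as floated in (4c), and that each old record version is filed under the segment for the (exclusive) timestamp at which it stops being valid. `SegmentMath` and its methods are hypothetical, not the KIP-889 implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of Kafka Streams or KIP-889.
final class SegmentMath {
    private SegmentMath() {}

    // Segment a timestamp falls into (same arithmetic as Kafka's segmented window stores).
    static long segmentId(long timestamp, long segmentInterval) {
        return timestamp / segmentInterval;
    }

    // Assumption: strict retention, so as-of queries older than
    // (observed stream time - history retention) are not served.
    static long minQueryableTimestamp(long observedStreamTime, long historyRetention) {
        return observedStreamTime - historyRetention;
    }

    // Assumption: old versions are filed by their exclusive validTo timestamp.
    // A segment can then be dropped as a whole once even its largest timestamp is
    // below the retention bound: no version in it is visible to any served query.
    static List<Long> expiredSegments(List<Long> segmentIds, long segmentInterval,
                                      long observedStreamTime, long historyRetention) {
        long minTs = minQueryableTimestamp(observedStreamTime, historyRetention);
        List<Long> expired = new ArrayList<>();
        for (long id : segmentIds) {
            long lastTimestampInSegment = (id + 1) * segmentInterval - 1;
            if (lastTimestampInSegment < minTs) {
                expired.add(id);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        long interval = 100L;
        System.out.println(segmentId(250L, interval)); // 2
        System.out.println(expiredSegments(List.of(0L, 1L, 2L, 3L), interval, 400L, 150L)); // [0, 1]
    }
}
```

With an interval of 100, a retention of 150 and an observed stream time of 400, only segments 0 and 1 lie entirely below the bound of 250, so they can be removed in bulk rather than record by record.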


[DISCUSS] KIP-889 Versioned State Stores

2022-11-16 Thread Victoria Xia
Hi everyone,

I have a proposal for introducing versioned state stores in Kafka Streams.
Versioned state stores are similar to key-value stores except they can
store multiple record versions for a single key. This KIP focuses on
interfaces only in order to limit the scope of the KIP.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores

Thanks,
Victoria
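The core idea (several timestamped versions per key, looked up "as of" a point in time) can be illustrated with a small in-memory model. A minimal sketch, assuming `get(key, asOfTimestamp)` returns the latest version whose timestamp is at or before the query time, that a `null` value is a tombstone, and that `validTo` is exclusive (the reading behind question (6) in the review above). `InMemoryVersionedStore` and `Version` are hypothetical names, not the KIP-889 interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model for illustration only; not the KIP-889
// interface or its RocksDB-backed implementation.
final class InMemoryVersionedStore<K, V> {

    // A version and its validity interval [validFrom, validTo).
    record Version<T>(T value, long validFrom, long validTo) {}

    // Per key: timestamp -> value; a null value marks a tombstone.
    private final Map<K, NavigableMap<Long, V>> history = new HashMap<>();

    void put(K key, V value, long timestamp) {
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Latest version with timestamp <= asOfTimestamp (inclusive), or null if there
    // is none or that version is a tombstone.
    Version<V> get(K key, long asOfTimestamp) {
        NavigableMap<Long, V> versions = history.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, V> current = versions.floorEntry(asOfTimestamp);
        if (current == null || current.getValue() == null) {
            return null;
        }
        // The next entry (value or tombstone) ends this version; validTo is exclusive.
        Long next = versions.higherKey(current.getKey());
        long validTo = (next == null) ? Long.MAX_VALUE : next;
        return new Version<>(current.getValue(), current.getKey(), validTo);
    }

    public static void main(String[] args) {
        InMemoryVersionedStore<String, String> store = new InMemoryVersionedStore<>();
        store.put("k", "v1", 0L);
        store.put("k", "v2", 10L);
        store.put("k", null, 20L);               // tombstone
        System.out.println(store.get("k", 9L));  // Version[value=v1, validFrom=0, validTo=10]
        System.out.println(store.get("k", 10L)); // Version[value=v2, validFrom=10, validTo=20]
        System.out.println(store.get("k", 25L)); // null
    }
}
```

Querying exactly at v1's `validTo` (10) returns v2, which is the exclusive interpretation; a query after the tombstone returns `null` rather than a version carrying a `null` value.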


Re: [VOTE] KIP-775: Custom partitioners in foreign key joins

2021-10-01 Thread Victoria Xia
Thanks, everyone!

We've got four binding +1's (Matthias, Bill, John, Guozhang), one
non-binding +1 (Adam), and no -1's, so I will mark this KIP as accepted.

A PR is ready for review: https://github.com/apache/kafka/pull/11368

Thanks again!

-Victoria

On Wed, Sep 29, 2021 at 10:28 AM Guozhang Wang  wrote:

> +1. Thanks Victoria!
>
> On Tue, Sep 28, 2021 at 2:40 PM John Roesler  wrote:
>
> > +1 (binding)
> >
> > Thanks, Victoria!
> >
> > -John
> >
> > On Tue, Sep 28, 2021, at 16:29, Adam Bellemare wrote:
> > > +1 (non-binding)
> > >
> > > Glad to see this in here :)
> > >
> > > On Tue, Sep 28, 2021 at 5:11 PM Bill Bejeck  wrote:
> > >
> > >> +1 (binding)
> > >>
> > >> On Tue, Sep 28, 2021 at 12:59 PM Matthias J. Sax wrote:
> > >>
> > >> > +1 (binding)
> > >> >
> > >> > On 9/28/21 8:29 AM, Victoria Xia wrote:
> > >> > > Hi all,
> > >> > >
> > >> > > I'd like to start a vote for KIP-775 for adding Kafka Streams support for
> > >> > > foreign key joins on tables with custom partitioners:
> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> > >> > >
> > >> > > Thanks,
> > >> > > Victoria
> > >> > >
> > >> >
> > >>
> >
>
>
> --
> -- Guozhang
>


[VOTE] KIP-775: Custom partitioners in foreign key joins

2021-09-28 Thread Victoria Xia
Hi all,

I'd like to start a vote for KIP-775 for adding Kafka Streams support for
foreign key joins on tables with custom partitioners:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins

Thanks,
Victoria


Re: [DISCUSS] KIP-775: Custom partitioners in foreign key joins

2021-09-27 Thread Victoria Xia
> >>>> default partitioner mechanism" (e.g. in
> >>>> https://issues.apache.org/jira/browse/KAFKA-13261, the issue arises when a
> >>>> different partitioner which is not based on hashing of the bytes is used,
> >>>> which still guarantees "the input topic is partitioned by key").
> >>>>
> >>>> So I feel that if we feel the assumption 1) above in Streams should still
> >>>> hold in the long run, it's not very meaningful to require the source tables
> >>>> to indicate their partitioners, but only to require the FK join operator
> >>>> itself to make sure the co-partition conditions a) and b) above hold. Of
> >>>> course the easiest way is to require users to pass in the partitioners for
> >>>> those two internal topics, which they have to make sure are the same as the
> >>>> input partitioners. We can also have a bit more complicated approach to
> >>>> have some "inheritance" rule for partitioners when they are given at the
> >>>> sink (we have similar rules for serde inheritance throughout the topology),
> >>>> but that only fixes the "#through / #repartition" cases, not the
> >>>> source "builder#table" cases -- i.e. we still need to require users to
> >>>> indicate partitioners which we can hopefully inherit within the topology.
> >>>>
> >>>> I agree that for IQ, it would be beneficial if we have the partitioner for
> >>>> each table/stream entity so that IQ itself does not need the partitioners
> >>>> to be specified, but to make this fix work, we'd probably need both 1)
> >>>> source table/stream partitioner specification and 2) inheritance rule
> >>>> (otherwise we'd have to force users to specify the same for #repartition
> >>>> etc. as well?). Given that rationale, I'm slightly leaning towards the
> >>>> current proposal in the KIP doc, i.e. just fixing this FK operator
> >>>> only, with the easy approach that requires users themselves to set
> >>>> partitioners accordingly.
> >>>>
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Sep 22, 2021 at 10:29 AM John Roesler wrote:
> >>>>
> >>>>> Thanks for the KIP, Victoria!
> >>>>>
> >>>>> This is a great catch. As far as my memory serves, we
> >>>>> totally missed this case in the design of FK joins.
> >>>>>
> >>>>> tl;dr: I'm wondering if it might be better to instead
> >>>>> introduce a way to specify the partitioning scheme when we
> >>>>> create a KTable and then just use it when we join, rather
> >>>>> than to specify it in the join itself.
> >>>>>
> >>>>> I was initially surprised to see the proposal to add these
> >>>>> partitioners in the join operation itself rather than
> >>>>> inheriting them from the tables on both sides, but I
> >>>>> reviewed the Streams DSL and see that we _cannot_ inherit it
> >>>>> because almost all the time, the input KTables' partitioning
> >>>>> is not known!
> >>>>>
> >>>>> It seems like the only times we specify a partitioner on a
> >>>>> DSL object are:
> >>>>> 1. when we produce to an output topic via KStream#to or
> >>>>> KStream#through.
> >>>>> 2. when we repartition via KStream#repartition
> >>>>>
> >>>>> These are both specifying the partitioner to use in output
> >>>>> operations (ie, we are telling Streams the partition *to
> >>>>> use*); there's currently only one situation in which we have
> >>>>> to _inform_ streams about the partitioning of a KTable or
> >>>>> KStream:
> >>>>> 3. when we issue a key-based query via IQ, we need to know
> >>>>> the partitioner, so the IQ interface allows us to pass in a
> >>>>> custom partitioner with the query.
> >>>>>
> >>>>> This is a bit weird. Taking a step back, the parti

Re: [DISCUSS] KIP-775: Custom partitioners in foreign key joins

2021-09-22 Thread Victoria Xia
Thanks, Matthias.

> > The existing methods which accept Named will be marked for deprecation
> > in 4.0.

> We can skip `in 4.0`. (1) The next release will be 3.1 (not 4.0) and (2)
> a KIP could always slip into a future release.

This was actually a typo -- I meant that the methods will be marked for
deprecation with removal scheduled for the next major release (likely 4.0).
I've updated the KIP.


> To me, it seems sufficient to only have two static methods:

> > as(final String name);

> and

> > with(final StreamPartitioner partitioner,
> >  final StreamPartitioner otherPartitioner);

Originally I was mimicking the methods in the existing Joined class, but I
took another look at StreamJoined just now (the more recently added class)
and saw that it indeed has far fewer static methods. I will remove the
`partitioner()` and `otherPartitioner()` static methods as I agree they are
of limited value. I could see `with(partitioner, otherPartitioner, name)`
being useful shorthand but I agree it's not necessary. I will remove it as
well to keep the number of static methods small. Thanks for the suggestions.
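The two-factory shape discussed here (`as(name)` and `with(partitioner, otherPartitioner)`, where `null` leaves one side unset) can be sketched without any Kafka dependency. `JoinConfig` and `KeyPartitioner` are hypothetical stand-ins for the proposed `TableJoined` and for a `StreamPartitioner` with a `Void` value type; they are not the actual Kafka Streams classes.

```java
import java.util.Optional;

// Hypothetical immutable join config: two static factories plus instance
// "with" methods, so the static API surface stays small.
final class JoinConfig<K, KO> {
    private final KeyPartitioner<K> partitioner;       // null: default partitioning
    private final KeyPartitioner<KO> otherPartitioner; // null: default partitioning
    private final String name;                         // null: generated name

    private JoinConfig(KeyPartitioner<K> partitioner,
                       KeyPartitioner<KO> otherPartitioner,
                       String name) {
        this.partitioner = partitioner;
        this.otherPartitioner = otherPartitioner;
        this.name = name;
    }

    static <K, KO> JoinConfig<K, KO> as(String name) {
        return new JoinConfig<>(null, null, name);
    }

    // Either argument may be null to set only one of the two partitioners.
    static <K, KO> JoinConfig<K, KO> with(KeyPartitioner<K> partitioner,
                                          KeyPartitioner<KO> otherPartitioner) {
        return new JoinConfig<>(partitioner, otherPartitioner, null);
    }

    JoinConfig<K, KO> withName(String newName) {
        return new JoinConfig<>(partitioner, otherPartitioner, newName);
    }

    Optional<KeyPartitioner<K>> partitioner() { return Optional.ofNullable(partitioner); }
    Optional<KeyPartitioner<KO>> otherPartitioner() { return Optional.ofNullable(otherPartitioner); }
    Optional<String> name() { return Optional.ofNullable(name); }

    public static void main(String[] args) {
        // Only the right-hand (foreign-key) table uses a custom partitioner here.
        JoinConfig<String, Integer> config =
            JoinConfig.<String, Integer>with(null, (topic, key, n) -> Math.floorMod(key, n))
                      .withName("orders-customers-fk-join");
        System.out.println(config.partitioner().isPresent());      // false
        System.out.println(config.otherPartitioner().isPresent()); // true
        System.out.println(config.name().orElse("<generated>"));   // orders-customers-fk-join
    }
}

// Hypothetical stand-in for a key-only partitioner (value type Void in the KIP).
@FunctionalInterface
interface KeyPartitioner<K> {
    int partition(String topic, K key, int numPartitions);
}
```

Less common combinations are reached by chaining instance methods on an immutable object instead of adding more static overloads, which keeps the "builder like" config object small.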


Best,
Victoria

On Mon, Sep 20, 2021 at 11:52 PM Matthias J. Sax  wrote:

> Thanks for updating the KIP.
>
> One nit:
>
> > The existing methods which accept Named will be marked for deprecation
> in 4.0.
>
> We can skip `in 4.0`. (1) The next release will be 3.1 (not 4.0) and (2)
> a KIP could always slip into a future release.
>
>
> About `TableJoined`: It seems you propose to add static methods for all
> possible parameter combinations. We usually try to avoid this to keep the
> number of methods low; if we add too many methods, it defeats the
> purpose to use a "builder like" config object.
>
> To me, it seems sufficient to only have two static methods:
>
> > as(final String name);
>
> and
>
> > with(final StreamPartitioner partitioner,
> >  final StreamPartitioner otherPartitioner);
>
> The second one should allow passing in `null` to set only one of the two
> partitioners.
>
> Curious to hear what others think.
>
>
> -Matthias
>
> On 9/20/21 8:27 PM, Victoria Xia wrote:
> > Hi Matthias,
> >
> > Thanks for having a look at the KIP! I've updated it with your suggestion
> > to introduce a new `TableJoined` object with partitioners of type
> > `StreamPartitioner` and `StreamPartitioner`, and to
> > deprecate the existing FK join methods which accept a `Named` object
> > accordingly. I agree it makes sense to keep the number of join interfaces
> > smaller.
> >
> > Thanks,
> > Victoria
> >
> > On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax wrote:
> >
> >> Thanks for the KIP Victoria.
> >>
> >> As pointed out on the Jira ticket by you, using `` and `` as
> >> partitioner types does not really work, because we don't have access to
> >> the right value on the left side nor have we access to the left value on
> >> the right hand side. -- I like your idea to use `Void` as value types to
> >> make it clear to the users that partitioning must be done on the key
> >> only.
> >>
> >> For the proposed public API change, I would propose not to pass the
> >> partitioners directly, but to introduce a config object (similar to
> >> `Joined` for stream-table joins, and `StreamJoined` for stream-stream
> >> joins). This new object could also implement `NamedOperation` and thus
> >> replace `Named`. To this end, we would deprecate the existing methods
> >> using `Named` and replace them with the new methods. Net benefit is,
> >> that we don't get more overloads (after we removed the deprecated ones).
> >>
> >> Not sure how we want to call the new object. Maybe `TableJoined` in
> >> alignment to `StreamJoined`?
> >>
> >>
> >> -Matthias
> >>
> >> On 9/15/21 3:36 PM, Victoria Xia wrote:
> >>> Hi,
> >>>
> >>> I've opened a small KIP for adding Kafka Streams support for foreign key
> >>> joins on tables with custom partitioners:
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> >>>
> >>> Feedback appreciated. Thanks!
> >>>
> >>> - Victoria
> >>>
> >>
> >
>


Re: [DISCUSS] KIP-775: Custom partitioners in foreign key joins

2021-09-20 Thread Victoria Xia
Hi Matthias,

Thanks for having a look at the KIP! I've updated it with your suggestion
to introduce a new `TableJoined` object with partitioners of type
`StreamPartitioner` and `StreamPartitioner`, and to
deprecate the existing FK join methods which accept a `Named` object
accordingly. I agree it makes sense to keep the number of join interfaces
smaller.

Thanks,
Victoria

On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax  wrote:

> Thanks for the KIP Victoria.
>
> As pointed out on the Jira ticket by you, using `` and `` as
> partitioner types does not really work, because we don't have access to
> the right value on the left side nor have we access to the left value on
> the right hand side. -- I like your idea to use `Void` as value types to
> make it clear to the users that partitioning must be done on the key only.
>
> For the proposed public API change, I would propose not to pass the
> partitioners directly, but to introduce a config object (similar to
> `Joined` for stream-table joins, and `StreamJoined` for stream-stream
> joins). This new object could also implement `NamedOperation` and thus
> replace `Named`. To this end, we would deprecate the existing methods
> using `Named` and replace them with the new methods. Net benefit is,
> that we don't get more overloads (after we removed the deprecated ones).
>
> Not sure how we want to call the new object. Maybe `TableJoined` in
> alignment to `StreamJoined`?
>
>
> -Matthias
>
> On 9/15/21 3:36 PM, Victoria Xia wrote:
> > Hi,
> >
> > I've opened a small KIP for adding Kafka Streams support for foreign key
> > joins on tables with custom partitioners:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> >
> > Feedback appreciated. Thanks!
> >
> > - Victoria
> >
>
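The point behind `Void` as the value type is that an FK-join partitioner can only rely on the key, since neither side has the other side's value when records are routed. A minimal sketch, assuming a hypothetical `tenant|id` key format, places a key-only custom scheme next to a stand-in for hash-the-whole-key default partitioning. `KeyOnlyPartitioning` and both methods are hypothetical, and `String.hashCode` replaces Kafka's actual murmur2 hash of the serialized key bytes only to keep the sketch dependency-free.

```java
// Hypothetical illustration of key-only partitioning; not Kafka's partitioner code.
final class KeyOnlyPartitioning {
    private KeyOnlyPartitioning() {}

    // Hypothetical custom scheme: keys look like "tenant|id" and every key of a
    // tenant goes to the same partition. Only the key is consulted.
    static int tenantPartition(String key, int numPartitions) {
        int sep = key.indexOf('|');
        String tenant = (sep < 0) ? key : key.substring(0, sep);
        return Math.floorMod(tenant.hashCode(), numPartitions);
    }

    // Stand-in for "hash the whole key" default partitioning (String.hashCode
    // instead of murmur2 over serialized bytes).
    static int wholeKeyHashPartition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;
        for (String key : new String[] {"a|1", "a|2"}) {
            System.out.println(key + " -> custom " + tenantPartition(key, partitions)
                + ", whole-key hash " + wholeKeyHashPartition(key, partitions));
        }
        // a|1 -> custom 1, whole-key hash 2
        // a|2 -> custom 1, whole-key hash 3
    }
}
```

Keys that the custom scheme keeps on one partition land on different partitions under the hash, so if the join's internal topics were partitioned by hashing while the input tables used the custom scheme, records for the same key could be routed to a partition that does not hold the matching table data. Supplying the same key-only partitioners to the join avoids that mismatch.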


[DISCUSS] KIP-775: Custom partitioners in foreign key joins

2021-09-15 Thread Victoria Xia
Hi,

I've opened a small KIP for adding Kafka Streams support for foreign key
joins on tables with custom partitioners:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins

Feedback appreciated. Thanks!

- Victoria


[jira] [Created] (KAFKA-12366) Performance regression in stream-table joins on trunk

2021-02-23 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-12366:


 Summary: Performance regression in stream-table joins on trunk
 Key: KAFKA-12366
 URL: https://issues.apache.org/jira/browse/KAFKA-12366
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Victoria Xia


Stream-table join benchmarks have revealed a significant performance regression
on trunk compared to the latest release version. We should investigate this as a
blocker for the 2.8 release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10008) Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 10.13.6

2020-05-15 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-10008:


 Summary: Symbol not found when running Kafka Streams with RocksDB 
dependency on MacOS 10.13.6
 Key: KAFKA-10008
 URL: https://issues.apache.org/jira/browse/KAFKA-10008
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
 Environment: MacOS 10.13.6
Reporter: Victoria Xia


After the RocksDB dependency was bumped from 5.18.3 to 5.18.4 on trunk, Kafka
Streams apps that initialize RocksDB state stores fail on MacOS 10.13.6 with
{code:java}
dyld: lazy symbol binding failed: Symbol not found: chkstk_darwin
  Referenced from: 
/private/var/folders/y4/v3q4tgb559sb0x6kwpll19bmgn/T/librocksdbjni4028367213086899694.jnilib
 (which was built for Mac OS X 10.15)
  Expected in: /usr/lib/libSystem.B.dylib
{code}
as a result of [https://github.com/facebook/rocksdb/issues/6852]

2.5.0 is unaffected.


