Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-11-30 Thread Colt McNealy
Hi Matthias and everyone—

Some clarification is necessary just for posterity. It turns out that, on a
fresh standby task before we start polling for records, we wouldn't be able
to get the current end offset without a network call. This leaves us three
options:

A) Make it an Optional or use a sentinel value to mark that it's not
present.
B) Perform a network call to get the endOffset when it's not there.
C) Remove it.

Option A) seems like it could be a confusing API, especially because in the
strong majority of cases, the Optional would be empty. Option B) is
undesirable because of the performance considerations—if we're going to
make a network round trip, we might as well get some records back! That
leaves us with option C), which is the least-bad of all of them.

At LittleHorse we actually do care about the endOffset in the
onUpdateStart() method, and having it would be useful to us. However, the
work-around isn't horrible, because the endOffset will be passed into the
first call to onBatchLoaded(), which normally follows onUpdateStart()
within <100ms.
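
For anyone following along, here is a minimal sketch of that workaround. It assumes
the callback shape discussed in this thread (onUpdateStart() without an end offset,
followed by onBatchLoaded() carrying currentEndOffset); the parameter lists are
illustrative and may differ in the final API.

```
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: track the end offset per changelog partition even though
// onUpdateStart() no longer receives it.
public class EndOffsetTrackingListener {

    private final Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>();

    public void onUpdateStart(final TopicPartition topicPartition,
                              final String storeName,
                              final long startingOffset) {
        // The end offset is unknown here without a network call; it arrives with
        // the first onBatchLoaded() callback, normally within ~100ms.
        endOffsets.remove(topicPartition);
    }

    public void onBatchLoaded(final TopicPartition topicPartition,
                              final String storeName,
                              final long batchEndOffset,
                              final long numRestored,
                              final long currentEndOffset) {
        // Every batch carries the current end offset, so keep the latest value.
        endOffsets.put(topicPartition, currentEndOffset);
    }

    public Optional<Long> endOffsetFor(final TopicPartition topicPartition) {
        return Optional.ofNullable(endOffsets.get(topicPartition));
    }
}
```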

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Nov 30, 2023 at 4:43 PM Matthias J. Sax  wrote:

> > parameter is somewhat irrelevant to our use case
>
> Sounds like a weird justification to change the KIP. Providing more
> information is usually better than less, so it seems it won't hurt to
> just keep it (seems useful in general to get the current end offset in
> this callback) -- you can always ignore it, if it's not relevant for
> your use case.
>
>
> -Matthias
>
>
> On 11/30/23 6:56 AM, Eduwer Camacaro wrote:
> > Hello everyone,
> >
> > We have come to the conclusion, during our work on this KIP's
> > implementation, that the #onUpdateStart callback's "currentEndOffset"
> > parameter is somewhat irrelevant to our use case. When this callback is
> > invoked, I think this value is usually unknown. Our choice to delete this
> > parameter from the #onUpdateStart callback requires an update to the KIP.
> >
> > Please feel free to review the PR and provide any comments you may have.
> :)
> > Thanks in advance
> >
> > Edu-
> >
> > On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax 
> wrote:
> >
> >> Thanks. SGTM.
> >>
> >> On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:
> >>> That all sounds good to me! Thanks for the KIP
> >>>
> >>> On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy 
> >> wrote:
> >>>
> >>>> Hi Sophie, Matthias, Bruno, and Eduwer—
> >>>>
> >>>> Thanks for your patience as I have been scrambling to catch up after a
> >> week
> >>>> of business travel (and a few days with no time to code). I'd like to
> >> tie
> >>>> up some loose ends here, but in short, I don't think the KIP document
> >>>> itself needs any changes (our internal implementation does, however).
> >>>>
> >>>> 1. In the interest of a) not changing the KIP after it's already out
> >> for a
> >>>> vote, and b) making sure our English grammar is "correct", let's stick
> >> with
> >>>> `onBatchLoaded()`. It is the Store that gets updated, not the Batch.
> >>>>
> >>>> 2. For me (and, thankfully, the community as well) adding a remote
> >> network
> >>>> call at any point in this KIP is a non-starter. We'll ensure that
> >>>> our implementation does not introduce one.
> >>>>
> >>>> 3. I really don't like changing API behavior, even if it's not
> >> documented
> >>>> in the javadoc. As such, I am strongly against modifying the behavior
> of
> >>>> endOffsets() on the consumer as some people may implicitly depend on
> the
> >>>> contract.
> >>>> 3a. The Consumer#currentLag() method gives us exactly what we want
> >> without
> >>>> a network call (current lag from a cache, from which we can compute
> the
> >>>> offset).
> >>>>
> >>>> 4. I have no opinion about whether we should pass endOffset or
> >> currentLag
> >>>> to the callback. Either one has the same exact information inside it.
> In
> >>>> the interest of not changing the KIP after the vote has started, I'll
> >> leave
> >>>> it as endOffset.
> >>>>
> >>>> As such, I believe the KIP doesn't need any updates, nor has it been
> >>>> updated since the vote started.
> >>>>
> >>>> Would anyone else like to discuss something before the Otter Council
> >>>> adjourns regarding this matter?

Re: [VOTE] KIP-892: Transactional StateStores

2023-11-13 Thread Colt McNealy
+1 (non-binding).

Thank you, Nick, for making all of the changes (especially around the
`default.state.isolation.level` config).

Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Nov 13, 2023 at 7:15 AM Nick Telford  wrote:

> Hi everyone,
>
> I'd like to call a vote for KIP-892: Transactional StateStores[1], which
> makes Kafka Streams StateStores transactional under EOS.
>
> Regards,
>
> Nick
>
> 1:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>


Re: [VOTE] KIP-988 Streams StandbyUpdateListener

2023-10-31 Thread Colt McNealy
Thanks everyone. Now that everyone who commented on the discussion has
voted, I think we can close this vote as one touchdown with a missed field
goal to zero (5 binding, 1 non-binding).

Cheers,
Colt

On Tue, Oct 31, 2023 at 5:35 AM Bruno Cadonna  wrote:

> Hi Colt and Eduwer,
>
> Thanks for the KIP!
>
> +1 (binding)
>
> Best,
> Bruno
>
> On 10/26/23 7:17 PM, Matthias J. Sax wrote:
> > +1 (binding)
> >
> > On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:
> >> Happy to see this -- that's a +1 (binding) from me
> >>
> >> On Mon, Oct 23, 2023 at 6:33 AM Bill Bejeck  wrote:
> >>
> >>> This is a great addition
> >>>
> >>> +1(binding)
> >>>
> >>> -Bill
> >>>
> >>> On Fri, Oct 20, 2023 at 2:29 PM Almog Gavra 
> >>> wrote:
> >>>
> >>>> +1 (non-binding) - great improvement, thanks Colt & Eduwer!
> >>>>
> >>>> On Tue, Oct 17, 2023 at 11:25 AM Guozhang Wang <
> >>> guozhang.wang...@gmail.com
> >>>>>
> >>>> wrote:
> >>>>
> >>>>> +1 from me.
> >>>>>
> >>>>> On Mon, Oct 16, 2023 at 1:56 AM Lucas Brutschy
> >>>>>  wrote:
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> thanks again for the KIP!
> >>>>>>
> >>>>>> +1 (binding)
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Lucas
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Sun, Oct 15, 2023 at 9:13 AM Colt McNealy 
> >>>>> wrote:
> >>>>>>>
> >>>>>>> Hello there,
> >>>>>>>
> >>>>>>> I'd like to call a vote on KIP-988 (co-authored by my friend and
> >>>>> colleague
> >>>>>>> Eduwer Camacaro). We are hoping to get it in before the 3.7.0
> >>>> release.
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Colt McNealy
> >>>>>>>
> >>>>>>> *Founder, LittleHorse.dev*
> >>>>>
> >>>>
> >>>
> >>
>


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-29 Thread Colt McNealy
Guozhang—I agree, I am in favor of moving forward with the KIP now that the
Transactional State Stores will be behind a feature flag.

Nick—I just did a bit more light testing of your branch `KIP-892-3.5.0`
with your most recent changes. I couldn't detect a performance difference
versus trunk (in the past there was a slight degradation of performance on
the restoration path, but that has been fixed). I don't believe that your
branch has the state updater thread enabled, so I didn't test that path too
heavily.

As expected, however, our internal correctness tests failed due to the IQ
read-your-own-writes issue we discussed previously. The community as a
whole would vastly benefit from this KIP getting over the finish line in
3.7.0, and so long as it is behind a feature flag so that we at LittleHorse
can still guarantee RYOW for our users, I think it's purely a win for the
community. Until we can figure out how to get read_committed, we will just
be smart with standbys + rebalances etc (:

Thanks Nick! This improvement is long overdue for the streams community.

Colt McNealy

*Founder, LittleHorse.dev*


On Sun, Oct 29, 2023 at 11:30 AM Guozhang Wang 
wrote:

> I'd agree with you guys that as long as we are in agreement about the
> configuration semantics, that would be a big win to move forward for
> this KIP. As for the TaskCorruptedException handling like wiping state
> stores, we can discuss that in the PR rather than in the KIP.
>
> Just to clarify, I'm onboard with the latest proposal, and probably we
> can move on for voting on this KIP now?
>
> Guozhang
>
> On Thu, Oct 19, 2023 at 5:33 AM Bruno Cadonna  wrote:
> >
> > Hi Nick,
> >
> > What you and Lucas wrote about the different configurations of ALOS/EOS
> > and READ_COMMITTED/READ_UNCOMMITTED make sense to me. My earlier
> > concerns about changelogs diverging from the content of the local state
> > stores turned out to not apply. So I think, we can move on with those
> > configurations.
> >
> > Regarding the TaskCorruptedException and wiping out the state stores
> > under EOS, couldn't we abort the transaction on the state store and
> > close the task dirty? If the Kafka transaction was indeed committed, the
> > store would restore the missing part from the changelog topic. If the
> > Kafka transaction was not committed, changelog topic and state store are
> > in-sync.
> >
> > In any case, IMO those are implementation details that we do not need to
> > discuss and solve in the KIP discussion. We can solve them on the PR.
> > The important thing is that the processing guarantees hold.
> >
> > Best,
> > Bruno
> >
> > On 10/18/23 3:56 PM, Nick Telford wrote:
> > > Hi Lucas,
> > >
> > > TaskCorruptedException is how Streams signals that the Task state
> needs to
> > > be wiped, so we can't retain that exception without also wiping state
> on
> > > timeouts.
> > >
> > > Regards,
> > > Nick
> > >
> > > On Wed, 18 Oct 2023 at 14:48, Lucas Brutschy  .invalid>
> > > wrote:
> > >
> > >> Hi Nick,
> > >>
> > >> I think indeed the better behavior would be to retry commitTransaction
> > >> until we risk running out of time to meet `max.poll.interval.ms`.
> > >>
> > >> However, if it's handled as a `TaskCorruptedException` at the moment,
> > >> I would do the same in this KIP, and leave exception handling
> > >> improvements to future work. This KIP is already improving the
> > >> situation a lot by not wiping the state store.
> > >>
> > >> Cheers,
> > >> Lucas
> > >>
> > >> On Tue, Oct 17, 2023 at 3:51 PM Nick Telford 
> > >> wrote:
> > >>>
> > >>> Hi Lucas,
> > >>>
> > >>> Yeah, this is pretty much the direction I'm thinking of going in
> now. You
> > >>> make an interesting point about committing on-error under
> > >>> ALOS/READ_COMMITTED, although I haven't had a chance to think
> through the
> > >>> implications yet.
> > >>>
> > >>> Something that I ran into earlier this week is an issue with the new
> > >>> handling of TimeoutException. Without TX stores, TimeoutException
> under
> > >> EOS
> > >>> throws a TaskCorruptedException, which wipes the stores. However,
> with TX
> > >>> stores, TimeoutException is now just bubbled up and dealt with as it
> is
> > >>> under ALOS. The problem arises when the Producer#commitTransaction
> call
> > >>>

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-25 Thread Colt McNealy
Hi Sophie, Matthias, Bruno, and Eduwer—

Thanks for your patience as I have been scrambling to catch up after a week
of business travel (and a few days with no time to code). I'd like to tie
up some loose ends here, but in short, I don't think the KIP document
itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a
vote, and b) making sure our English grammar is "correct", let's stick with
`onBatchLoaded()`. It is the Store that gets updated, not the Batch.

2. For me (and, thankfully, the community as well) adding a remote network
call at any point in this KIP is a non-starter. We'll ensure that
our implementation does not introduce one.

3. I really don't like changing API behavior, even if it's not documented
in the javadoc. As such, I am strongly against modifying the behavior of
endOffsets() on the consumer as some people may implicitly depend on the
contract.
3a. The Consumer#currentLag() method gives us exactly what we want without
a network call (current lag from a cache, from which we can compute the
offset).

4. I have no opinion about whether we should pass endOffset or currentLag
to the callback. Either one has the same exact information inside it. In
the interest of not changing the KIP after the vote has started, I'll leave
it as endOffset.
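
To make point 3a concrete, here is a small sketch of the computation (illustrative
only, not the actual Streams code): the end offset can be derived from the
consumer's cached lag estimate plus its current position, with no extra network
round trip.

```
import java.util.OptionalLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class CachedEndOffset {

    // Derive the changelog end offset from the consumer's cached lag estimate.
    // Returns empty if no lag estimate has been cached for the partition yet.
    public static OptionalLong endOffsetFromCachedLag(final Consumer<?, ?> consumer,
                                                      final TopicPartition partition) {
        final OptionalLong lag = consumer.currentLag(partition);
        if (lag.isPresent()) {
            // end offset = next offset to be fetched + records remaining to fetch
            return OptionalLong.of(consumer.position(partition) + lag.getAsLong());
        }
        return OptionalLong.empty();
    }
}
```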

As such, I believe the KIP doesn't need any updates, nor has it been
updated since the vote started.

Would anyone else like to discuss something before the Otter Council
adjourns regarding this matter?

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman 
wrote:

> Just want to checkpoint the current state of this KIP and make sure we're
> on track to get it in to 3.7 (we still have a few weeks)  -- looks like
> there are two remaining open questions, both relating to the
> middle/intermediate callback:
>
> 1. What to name it: seems like the primary candidates are onBatchLoaded and
> onBatchUpdated (and maybe also onStandbyUpdated?)
> 2. What additional information can we pass in that would strike a good
> balance between being helpful and impacting performance.
>
> Regarding #1, I think all of the current options are reasonable enough that
> we should just let Colt decide which he prefers. I personally think
> #onBatchUpdated is fine -- Bruno does make a fair point but the truth is
> that English grammar can be sticky and while it could be argued that it is
> the store which is updated, not the batch, I feel that it is perfectly
> clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
> at all. That's just my two cents in case it helps, but again, whatever
> makes sense to you Colt is fine
>
> When it comes to #2 -- as much as I would love to dig into the Consumer
> client lore and see if we can modify existing APIs or add new ones in order
> to get the desired offset metadata in an efficient way, I think we're
> starting to go down a rabbit hole that is going to expand the scope way
> beyond what Colt thought he was signing up for. I would advocate to focus
> on just the basic feature for now and drop the end-offset from the
> callback. Once we have a standby listener it will be easy to expand on with
> a followup KIP if/when we find an efficient way to add additional useful
> information. I think it will also become more clear what is and isn't
> useful after more people get to using it in the real world
>
> Colt/Eduwer: how necessary is receiving the end offset during a batch
> update to your own application use case?
>
> Also, for those who really do need to check the current end offset, I
> believe in theory you should be able to use the KafkaStreams#metrics API to
> get the current lag and/or end offset for the changelog -- it's possible
> this does not represent the most up-to-date end offset (I'm not sure it
> does or does not), but it should be close enough to be reliable and useful
> for the purpose of monitoring -- I mean it is a metric, after all.
>
> Hope this helps -- in the end, it's up to you (Colt) to decide what you
> want to bring in scope or not. We still have more than 3 weeks until the
> KIP freeze as currently proposed, so in theory you could even implement
> this KIP without the end offset and then do a followup KIP to add the end
> offset within the same release, ie without any deprecations. There are
> plenty of paths forward here, so don't let us drag this out forever if you
> know what you want
>
> Cheers,
> Sophie
>
> On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax  wrote:
>
> > Forgot one thing:
> >
> > We could also pass `currentLag()` into `onBachLoaded()` instead of
> > end-offset.
> >
> >
> > -Matthias
> >
> > On 10/20/23 10:56 

[VOTE] KIP-988 Streams StandbyUpdateListener

2023-10-15 Thread Colt McNealy
Hello there,

I'd like to call a vote on KIP-988 (co-authored by my friend and colleague
Eduwer Camacaro). We are hoping to get it in before the 3.7.0 release.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-15 Thread Colt McNealy
Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy

*Founder, LittleHorse.dev*


On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang 
wrote:

> Thanks for the summary, that looks good to me.
>
> Guozhang
>
> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy  wrote:
> >
> > Hello there!
> >
> > Thanks everyone for the comments. There's a lot of back-and-forth going
> on,
> > so I'll do my best to summarize what everyone's said in TLDR format:
> >
> > 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do
> similarly
> > for the other methods.
> > 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> > 3. Remove the `earliestOffset` parameter for performance reasons.
> >
> > If that's all fine with everyone, I'll update the KIP and we—well, mostly
> > Edu (:  —will open a PR.
> >
> > Cheers,
> > Colt McNealy
> >
> > *Founder, LittleHorse.dev*
> >
> >
> > On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro 
> > wrote:
> >
> > > Hello everyone,
> > >
> > > Thanks for all your feedback for this KIP!
> > >
> > > I think that the key to choosing proper names for this API is
> understanding
> > > the terms used inside the StoreChangelogReader. Currently, this class
> has
> > > two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my
> opinion,
> > > using StandbyUpdateListener for the interface fits better on these
> terms.
> > > Same applies for onUpdateStart/Suspended.
> > >
> > > StoreChangelogReader uses "the same mechanism" for active task
> restoration
> > > and standby task updates, but this is an implementation detail. Under
> > > normal circumstances (no rebalances or task migrations), the changelog
> > > reader will be in STANDBY_UPDATING, which means it will be updating
> standby
> > > tasks as long as there are new records in the changelog topic. That's
> why I
> > > prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't
> 100%
> > > align with StateRestoreListener, but either one is fine.
> > >
> > > Edu
> > >
> > > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <
> guozhang.wang...@gmail.com>
> > > wrote:
> > >
> > > > Hello Colt,
> > > >
> > > > Thanks for writing the KIP! I have read through the updated KIP and
> > > > overall it looks great. I only have minor naming comments (well,
> > > > aren't naming the least boring stuff to discuss and that takes the
> > > > most of the time for KIPs :P):
> > > >
> > > > 1. I tend to agree with Sophie regarding whether or not to include
> > > > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > > > it is also more consistent with the functions of
> > > > "StateRestoreListener" where we do not name it as
> > > > "onStateRestoreState" etc.
> > > >
> > > > 2. I know in community discussions we sometimes say "a standby is
> > > > promoted to active", but in the official code / java docs we did not
> > > > have a term of "promotion", since what the code does is really
> recycle
> > > > the task (while keeping its state stores open), and create a new
> > > > active task that takes in the recycled state stores and just changing
> > > > the other fields like task type etc. After thinking about this for a
> > > > bit, I tend to feel that "promoted" is indeed a better name for user
> > > > facing purposes while "recycle" is more of a technical detail inside
> > > > the code and could be abstracted away from users. So I feel keeping
> > > > the name "PROMOTED" is fine.
> > > >
> > > > 3. Regarding "earliestOffset", it does feel like we cannot always
> > > > avoid another call to the Kafka API. And on the other hand, I also
> > > > tend to think that such bookkeeping may be better done at the app
> > > > level than from the Streams' public API level. I.e. the app could
> keep
> > > > a "first ever starting offset" per "topic-partition-store" key, and a
> > > > when we have rolling restart and hence some standby task keeps
> > > > "jumping" from one client to another via task assignment, the app
> > > > would update this value just one when it finds the
> > > > &

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-13 Thread Colt McNealy
Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on,
so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do similarly
for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.

If that's all fine with everyone, I'll update the KIP and we—well, mostly
Edu (:  —will open a PR.
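
For reference, applying the renames above to the interface sketched in KAFKA-15448
gives roughly the following shape. Parameter lists are illustrative; the name of the
batch callback was still under discussion at this point and later settled on
`onBatchLoaded()`, which is what the sketch uses.

```
import org.apache.kafka.common.TopicPartition;

// Rough sketch only; the exact signatures are subject to the KIP discussion.
public interface StandbyUpdateListener {

    enum SuspendReason {
        MIGRATED,
        PROMOTED
    }

    /** Called when the standby task starts updating from its changelog partition. */
    void onUpdateStart(TopicPartition topicPartition,
                       String storeName,
                       long startingOffset);

    /** Called after each batch of records has been loaded into the standby store. */
    void onBatchLoaded(TopicPartition topicPartition,
                       String storeName,
                       long batchEndOffset,
                       long numRestored,
                       long currentEndOffset);

    /** Called when the standby task is suspended, either migrated away or promoted to active. */
    void onUpdateSuspended(TopicPartition topicPartition,
                           String storeName,
                           long storeOffset,
                           long currentEndOffset,
                           SuspendReason reason);
}
```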

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro 
wrote:

> Hello everyone,
>
> Thanks for all your feedback for this KIP!
>
> I think that the key to choosing proper names for this API is understanding
> the terms used inside the StoreChangelogReader. Currently, this class has
> two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
> using StandbyUpdateListener for the interface fits better on these terms.
> Same applies for onUpdateStart/Suspended.
>
> StoreChangelogReader uses "the same mechanism" for active task restoration
> and standby task updates, but this is an implementation detail. Under
> normal circumstances (no rebalances or task migrations), the changelog
> reader will be in STANDBY_UPDATING, which means it will be updating standby
> tasks as long as there are new records in the changelog topic. That's why I
> prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
> align with StateRestoreListener, but either one is fine.
>
> Edu
>
> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang 
> wrote:
>
> > Hello Colt,
> >
> > Thanks for writing the KIP! I have read through the updated KIP and
> > overall it looks great. I only have minor naming comments (well,
> > aren't naming the least boring stuff to discuss and that takes the
> > most of the time for KIPs :P):
> >
> > 1. I tend to agree with Sophie regarding whether or not to include
> > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > it is also more consistent with the functions of
> > "StateRestoreListener" where we do not name it as
> > "onStateRestoreState" etc.
> >
> > 2. I know in community discussions we sometimes say "a standby is
> > promoted to active", but in the official code / java docs we did not
> > have a term of "promotion", since what the code does is really recycle
> > the task (while keeping its state stores open), and create a new
> > active task that takes in the recycled state stores and just changing
> > the other fields like task type etc. After thinking about this for a
> > bit, I tend to feel that "promoted" is indeed a better name for user
> > facing purposes while "recycle" is more of a technical detail inside
> > the code and could be abstracted away from users. So I feel keeping
> > the name "PROMOTED" is fine.
> >
> > 3. Regarding "earliestOffset", it does feel like we cannot always
> > avoid another call to the Kafka API. And on the other hand, I also
> > tend to think that such bookkeeping may be better done at the app
> > level than from the Streams' public API level. I.e. the app could keep
> > a "first ever starting offset" per "topic-partition-store" key, and a
> > when we have rolling restart and hence some standby task keeps
> > "jumping" from one client to another via task assignment, the app
> > would update this value just one when it finds the
> > ""topic-partition-store" was never triggered before. What do you
> > think?
> >
> > 4. I do not have a strong opinion either, but what about
> "onBatchUpdated" ?
> >
> >
> > Guozhang
> >
> > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy 
> wrote:
> > >
> > > Sohpie—
> > >
> > > Thank you very much for such a detailed review of the KIP. It might
> > > actually be longer than the original KIP in the first place!
> > >
> > > 1. Ack'ed and fixed.
> > >
> > > 2. Correct, this is a confusing passage and requires context:
> > >
> > > One thing on our list of TODO's regarding reliability is to determine
> how
> > > to configure `session.timeout.ms`. In our Kubernetes Environment, an
> > > instance of our Streams App can be terminated, restarted, and get back
> > into
> > > the "RUNNING" Streams state in about 20 seconds. We have two options
> > here:
> > > a) set session.timeout.ms to 30 seconds or so, and deal with 20
> seconds
>

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-11 Thread Colt McNealy
Will update to `onStandbyUpdateSuspended()`
T6. Thoughts on renaming SuspendReason to StandbySuspendReason, rather than
renaming PROMOTED to RECYCLED? @Eduwer?

Long Live the Otter,
Colt McNealy

*Founder, LittleHorse.dev*


On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman 
wrote:

> Hey Colt! Thanks for the KIP -- this will be a great addition to Streams, I
> can't believe we've gone so long without this.
>
> Overall the proposal makes sense, but I had a handful of fairly minor
> questions and suggestions/requests
>
> 1. Seems like the last sentence in the 2nd paragraph of the Motivation
> section is cut off and incomplete -- "want to be able to know " what
> exactly?
>
> 2. This isn't that important since the motivation as a whole is clear to me
> and convincing enough, but I'm not quite sure I understand the example at
> the end of the Motivation section. How are standby tasks (and the ability
> to hook into and monitor their status) related to the session.timeout.ms
> config?
>
> 3. To help both old and new users of Kafka Streams understand this new
> restore listener and its purpose/semantics, can we try to name the class
> and
>  callbacks in a way that's more consistent with the active task restore
> listener?
>
> 3a. StandbyTaskUpdateListener:
> The existing restore listener is called StateRestoreListener, so the new
> one could be called something like StandbyStateRestoreListener. Although
> we typically refer to standby tasks as "processing" rather than "restoring"
> records -- ie restoration is a term for active task state specifically. I
> actually
> like the original suggestion if we just drop the "Task" part of the name,
> ie StandbyUpdateListener. I think either that or StandbyRestoreListener
> would be fine and probably the two best options.
> Also, this probably goes without saying but any change to the name of this
> class should of course be reflected in the KafkaStreams#setXXX API as well
>
> 3b. #onTaskCreated
>  I know the "start" callback feels a bit different for the standby task
> updater vs an active task beginning restoration, but I think we should try
> to
> keep the various callbacks aligned to their active restore listener
> counterpart. We can/should just replace the term "restore" with "update"
> for the
> callback method names the same way we do for the class name, which in this
> case would give us #onUpdateStart. Personally I like this better,
> but it's ultimately up to you. However, I would push back against anything
> that includes the word "Task" (eg #onTaskCreated) as the listener
>  is actually not scoped to the task itself but instead to the individual
> state store(s). This is the main reason I would prefer calling it something
> like #onUpdateStart, which keeps the focus on the store being updated
> rather than the task that just happens to own this store
> One last thing on this callback -- do we really need both the
> `earliestOffset` and `startingOffset`? I feel like this might be more
> confusing than it
> is helpful (tbh even I'm not completely sure I know what the earliestOffset
> is supposed to represent) More importantly, is this all information
> that is already available and able to be passed in to the callback by
> Streams? I haven't checked on this but it feels like the earliestOffset is
> likely to require a remote call, either by the embedded consumer or via the
> admin client. If so, the ROI on including this parameter seems
> quite low (if not outright negative)
>
> 3c. #onBatchRestored
> If we opt to use the term "update" in place of "restore" elsewhere, then we
> should consider doing so here as well. What do you think about
> #onBatchUpdated, or even #onBatchProcessed?
> I'm actually not super concerned about this particular API, and honestly I
> think we can use restore or update interchangeably here, so if you
>  don't like any of the suggested names (and no one can think of anything
> better), I would just stick with #onBatchRestored. In this case,
> it kind of makes the most sense.
>
> 3d. #onTaskSuspended
> Along the same lines as 3b above, #onUpdateSuspended or just
> #onRestoreSuspended probably makes more sense for this callback. Also,
>  I notice the StateRestoreListener passes in the total number of records
> restored to its #onRestoreSuspended. Assuming we already track
> that information in Streams and have it readily available to pass in at
> whatever point we would be invoking this callback, that might be a
> useful  parameter for the standby listener to have as well
>
> 4. I totally love the SuspendReason thing, just two notes/requests:
>
> 4a. Feel free to push back against adding onto the scope

Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-09 Thread Colt McNealy
Hanyu,

I like the attention to detail!

It is correct that the JavaDoc does not "guarantee" order. However, the
public API contract specified in the javadoc does mention lexicographical
ordering of the bytes, which is a useful API contract. In our Streams app
we make use of that contract during interactive queries (specifically, to
guarantee correctness when doing a paginated range scan: if the order
changes, then the "bookmark" we use for pagination would be meaningless).
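
For context, that pagination pattern looks roughly like the sketch below (store
name, key type, and page size are illustrative). It only works if the store returns
keys in a stable, lexicographically sorted order.

```
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class PaginatedScan {

    // Returns the next page of a range scan, resuming from the "bookmark" key
    // returned to the client with the previous page.
    public static List<KeyValue<String, String>> nextPage(
            final ReadOnlyKeyValueStore<String, String> store,
            final String bookmark,     // last key of the previous page, or null for the first page
            final String rangeEnd,     // inclusive end of the overall range
            final int pageSize) {

        final List<KeyValue<String, String>> page = new ArrayList<>(pageSize);
        try (final KeyValueIterator<String, String> iter = store.range(bookmark, rangeEnd)) {
            while (iter.hasNext() && page.size() < pageSize) {
                final KeyValue<String, String> next = iter.next();
                // range() includes the lower bound, so skip the bookmark itself.
                if (bookmark == null || !next.key.equals(bookmark)) {
                    page.add(next);
                }
            }
        }
        return page;
    }
}
```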

As such, I still think the KIP as you proposed is a highly useful feature.
I would just make a note of the semantics in the JavaDoc and also in the
KIP.

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Oct 9, 2023 at 2:22 PM Hanyu (Peter) Zheng
 wrote:

> After our discussion, we discovered something intriguing. The definitions
> for the range and reverseRange methods in the ReadOnlyKeyValueStore are as
> follows:
> /**
>  * Get an iterator over a given range of keys. This iterator must be
> closed after use.
>  * The returned iterator must be safe from {@link
> java.util.ConcurrentModificationException}s
>  * and must not return null values.
>  ** Order is not guaranteed as bytes lexicographical ordering might not
> represent key order.*
>  *
>  * @param from The first key that could be in the range, where
> iteration starts from.
>  * A null value indicates that the range starts with the
> first element in the store.
>  * @param to   The last key that could be in the range, where iteration
> ends.
>  * A null value indicates that the range ends with the last
> element in the store.
>  * @return The iterator for this range, from smallest to largest bytes.
>  * @throws InvalidStateStoreException if the store is not initialized
>  */
> KeyValueIterator<K, V> range(K from, K to);
>
> /**
>  * Get a reverse iterator over a given range of keys. This iterator
> must be closed after use.
>  * The returned iterator must be safe from {@link
> java.util.ConcurrentModificationException}s
>  * and must not return null values.
>  * *Order is not guaranteed as bytes lexicographical ordering might not
> represent key order.*
>  *
>  * @param from The first key that could be in the range, where
> iteration ends.
>  * A null value indicates that the range starts with the
> first element in the store.
>  * @param to   The last key that could be in the range, where iteration
> starts from.
>  * A null value indicates that the range ends with the last
> element in the store.
>  * @return The reverse iterator for this range, from largest to
> smallest key bytes.
>  * @throws InvalidStateStoreException if the store is not initialized
>  */
> default KeyValueIterator<K, V> reverseRange(K from, K to) {
> throw new UnsupportedOperationException();
> }
>
> The query methods of RangeQuery ultimately invoke either the range method
> or the reverseRange method. However, as per the JavaDoc: the order is not
> guaranteed, since byte lexicographical ordering may not correspond to the
> actual key order.
>
> Sincerely,
> Hanyu
>
> On Fri, Oct 6, 2023 at 10:00 AM Hanyu (Peter) Zheng 
> wrote:
>
> > Thank you, Matthias, for the detailed implementation and explanation. As
> > of now, our capability is limited to executing interactive queries on
> > individual partitions. To illustrate:
> >
> > Consider the IQv2StoreIntegrationTest:
> >
> > We have two partitions:
> > Partition0 contains key-value pairs: <0,0> and <2,2>.
> > Partition1 contains key-value pairs: <1,1> and <3,3>.
> > When executing RangeQuery.withRange(1,3), the results are:
> >
> > Partition0: [2]
> > Partition1: [1, 3]
> > To support functionalities like reverseRange and reverseAll, we can
> > introduce the withDescendingKeys() method. For instance, using
> > RangeQuery.withRange(1,3).withDescendingKeys(), the anticipated results
> are:
> >
> > Partition0: [2]
> > Partition1: [3, 1]
> >
> > In response to Hao's inquiry about the boundary issue, please refer to
> the
> > StoreQueryUtils class. The code snippet:
> >
> > iterator = kvStore.range(lowerRange.orElse(null),
> upperRange.orElse(null));
> > indicates that when implementing range in each store, it's structured
> like:
> >
> > @Override
> > public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
> > if (from != null && to != null && from.compareTo(to) > 0) {
> > This section performs the necessary checks.

Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics

2023-10-05 Thread Colt McNealy
Thank you for the KIP, Nick!

This would be highly useful for many reasons. Much more sane than checking
for leaked iterators by profiling memory usage while running 100's of
thousands of range scans via interactive queries (:

One small question:

>The iterator-duration metrics will be updated whenever an Iterator's
close() method is called

Does the Iterator have its own "createdAt()" or equivalent field, or do we
need to keep track of the Iterator's start time upon creation?
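
If there is no such field, one option (purely a sketch, with hypothetical names) is a
thin wrapper that records the creation time itself and reports the open duration when
close() is called, for example to a metrics sensor callback:

```
import java.time.Duration;
import java.time.Instant;
import java.util.function.Consumer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

// Hypothetical sketch: record the creation time on construction and report the
// open duration on close().
public class TimedKeyValueIterator<K, V> implements KeyValueIterator<K, V> {

    private final KeyValueIterator<K, V> delegate;
    private final Instant openedAt = Instant.now();
    private final Consumer<Duration> onClose;   // e.g. a metrics sensor callback

    public TimedKeyValueIterator(final KeyValueIterator<K, V> delegate,
                                 final Consumer<Duration> onClose) {
        this.delegate = delegate;
        this.onClose = onClose;
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public KeyValue<K, V> next() { return delegate.next(); }
    @Override public K peekNextKey() { return delegate.peekNextKey(); }

    @Override
    public void close() {
        delegate.close();
        onClose.accept(Duration.between(openedAt, Instant.now()));
    }
}
```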

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Wed, Oct 4, 2023 at 9:07 AM Nick Telford  wrote:

> Hi everyone,
>
> KIP-989 is a small Kafka Streams KIP to add a few new metrics around the
> creation and use of RocksDB Iterators, to aid users in identifying
> "Iterator leaks" that could cause applications to leak native memory.
>
> Let me know what you think!
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics
>
> P.S. I'm not too sure about the formatting of the "New Metrics" table, any
> advice there would be appreciated.
>
> Regards,
> Nick
>


[DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-03 Thread Colt McNealy
Hi all,

We would like to propose a small KIP to improve the ability of Streams apps
to monitor the progress of their standby tasks through a callback interface.

We have a nearly-working implementation on our fork and are curious for
feedback.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener

Thank you,
Colt McNealy

*Founder, LittleHorse.dev*


Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-03 Thread Colt McNealy
Hello Hanyu,

Thank you for the KIP. I agree with Matthias' proposal to keep the naming
convention consistent with KIP-969. I favor the `.withDescendingKeys()`
name.

I am curious about one thing. RocksDB guarantees that records returned
during a range scan are lexicographically ordered by the bytes of the keys
(either ascending or descending order, as specified in the query). This
means that results within a single partition are indeed ordered.** My
reading of KIP-805 suggests to me that you don't need to specify the
partition number you are querying in IQv2, which means that you can have a
valid reversed RangeQuery over a store with "multiple partitions" in it.

Currently, IQv1 does not guarantee order of keys in this scenario. Does
IQv2 support ordering across partitions? Such an implementation would
require opening a rocksdb range scan** on multiple rocksdb instances (one
per partition), and polling the first key of each. Whether or not this is
ordered, could we please add that to the documentation?

**(How is this implemented/guaranteed in an `inMemoryKeyValueStore`? I
don't know about that implementation).
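
For the curious, the "poll the first key of each partition" idea could look roughly
like the sketch below (illustrative only, not how IQv2 is implemented today): a k-way
merge over one iterator per partition, using a priority queue keyed on each
iterator's next key.

```
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

public class MergedRangeScan {

    // Yields keys in ascending byte order across all partitions. The caller owns
    // (and must close) the iterators passed in.
    public static void forEachOrdered(final List<KeyValueIterator<Bytes, byte[]>> perPartition,
                                      final BiConsumer<Bytes, byte[]> action) {
        final PriorityQueue<KeyValueIterator<Bytes, byte[]>> heads =
            new PriorityQueue<>((a, b) -> a.peekNextKey().compareTo(b.peekNextKey()));
        for (final KeyValueIterator<Bytes, byte[]> it : perPartition) {
            if (it.hasNext()) {
                heads.add(it);
            }
        }
        while (!heads.isEmpty()) {
            final KeyValueIterator<Bytes, byte[]> smallest = heads.poll();
            final KeyValue<Bytes, byte[]> kv = smallest.next();
            action.accept(kv.key, kv.value);
            if (smallest.hasNext()) {
                heads.add(smallest);   // re-insert with its new head key
            }
        }
    }
}
```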

Colt McNealy

*Founder, LittleHorse.dev*


On Tue, Oct 3, 2023 at 1:35 PM Hanyu (Peter) Zheng
 wrote:

> ok, I will update it. Thank you  Matthias
>
> Sincerely,
> Hanyu
>
> On Tue, Oct 3, 2023 at 11:23 AM Matthias J. Sax  wrote:
>
> > Thanks for the KIP Hanyu!
> >
> >
> > I took a quick look and it think the proposal makes sense overall.
> >
> > A few comments about how to structure the KIP.
> >
> > As you propose to not add `ReverseRangQuery` class, the code example
> > should go into "Rejected Alternatives" section, not in the "Proposed
> > Changes" section.
> >
> > For the `RangeQuery` code example, please omit all existing methods etc,
> > and only include what will be added/changed. This make it simpler to
> > read the KIP.
> >
> >
> > nit: typo
> >
> > >  the fault value is false
> >
> > Should be "the default value is false".
> >
> >
> > Not sure if `setReverse()` is the best name. Maybe `withDescendingOrder`
> > (or similar, I guess `withReverseOrder` would also work) might be
> > better? Would be good to align to KIP-969 proposal that suggest do use
> > `withDescendingKeys` methods for "reverse key-range"; if we go with
> > `withReverseOrder` we should change KIP-969 accordingly.
> >
> > Curious to hear what others think about naming this consistently across
> > both KIPs.
> >
> >
> > -Matthias
> >
> >
> > On 10/3/23 9:17 AM, Hanyu (Peter) Zheng wrote:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2
> > >
> >
>
>


Permission to Create KIP

2023-10-03 Thread Colt McNealy
Hello there,

Could I please have access to create a Wiki page? A team member and I would
like to jointly propose a small KIP.

JIRA id: coltmcnealy-lh

Thank you,
Colt McNealy

*Founder, LittleHorse.dev*


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-17 Thread Colt McNealy
> Making IsolationLevel a query-time constraint, rather than linking it to
the processing.guarantee.

As I understand it, would this allow even a user of EOS to control whether
reading committed or uncommitted records? If so, I am highly in favor of
this.

I know that I was one of the early people to point out the current
shortcoming that IQ reads uncommitted records, but just this morning I
realized a pattern we use which means that (for certain queries) our system
needs to be able to read uncommitted records, which is the current behavior
of Kafka Streams in EOS.***

If IsolationLevel being a query-time decision allows for this, then that
would be amazing. I would also vote that the default behavior should be for
reading uncommitted records, because it is totally possible for a valid
application to depend on that behavior, and breaking it in a minor release
might be a bit strong.

*** (Note, for the curious reader) Our use-case/query pattern is a bit
complex, but reading "uncommitted" records is actually safe in our case
because processing is deterministic. Additionally, IQ being able to read
uncommitted records is crucial to enable "read your own writes" on our API:
Due to the deterministic processing, we send an "ack" to the client who
makes the request as soon as the processor processes the result. If they
can't read uncommitted records, they may receive a "201 - Created"
response, immediately followed by a "404 - Not Found" when doing a lookup
for the object they just created.
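
To make the idea concrete, if IsolationLevel did become a query-time option, usage
might look something like the sketch below. Note that withIsolationLevel() is
hypothetical and does not exist in any released API; everything else is the standard
IQv2 request shape, and "my-store" is an illustrative store name.

```
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

public class IsolationLevelQueryExample {

    // Hypothetical sketch only: withIsolationLevel() is not a real method today.
    public static StateQueryResult<String> lookup(final KafkaStreams streams, final String key) {
        final StateQueryRequest<String> request =
            StateQueryRequest.inStore("my-store")
                .withQuery(KeyQuery.<String, String>withKey(key));
                // .withIsolationLevel(IsolationLevel.READ_UNCOMMITTED)   // hypothetical
        return streams.query(request);
    }
}
```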

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Wed, Sep 13, 2023 at 9:19 AM Nick Telford  wrote:

> Addendum:
>
> I think we would also face the same problem with the approach John outlined
> earlier (using the record cache as a transaction buffer and flushing it
> straight to SST files). This is because the record cache (the ThreadCache
> class) is not thread-safe, so every commit would invalidate open IQ
> Iterators in the same way that RocksDB WriteBatches do.
> --
> Nick
>
> On Wed, 13 Sept 2023 at 16:58, Nick Telford 
> wrote:
>
> > Hi Bruno,
> >
> > I've updated the KIP based on our conversation. The only things I've not
> > yet done are:
> >
> > 1. Using transactions under ALOS and EOS.
> > 2. Making IsolationLevel a query-time constraint, rather than linking it
> > to the processing.guarantee.
> >
> > There's a wrinkle that makes this a challenge: Interactive Queries that
> > open an Iterator, when using transactions and READ_UNCOMMITTED.
> > The problem is that under READ_UNCOMMITTED, queries need to be able to
> > read records from the currently uncommitted transaction buffer
> > (WriteBatch). This includes for Iterators, which should iterate both the
> > transaction buffer and underlying database (using
> > WriteBatch#iteratorWithBase()).
> >
> > The issue is that when the StreamThread commits, it writes the current
> > WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the
> > WriteBatch while an Interactive Query holds an open Iterator on it will
> > invalidate the Iterator. Worse, it turns out that Iterators over a
> > WriteBatch become invalidated not just when the WriteBatch is cleared,
> but
> > also when the Iterators' current key receives a new write.
> >
> > Now that I'm writing this, I remember that this is the major reason that
> I
> > switched the original design from having a query-time IsolationLevel to
> > having the IsolationLevel linked to the transactionality of the stores
> > themselves.
> >
> > It *might* be possible to resolve this, by having a "chain" of
> > WriteBatches, with the StreamThread switching to a new WriteBatch
> whenever
> > a new Interactive Query attempts to read from the database, but that
> could
> > cause some performance problems/memory pressure when subjected to a high
> > Interactive Query load. It would also reduce the efficiency of
> WriteBatches
> > on-commit, as we'd have to write N WriteBatches, where N is the number of
> > Interactive Queries since the last commit.
> >
> > I realise this is getting into the weeds of the implementation, and you'd
> > rather we focus on the API for now, but I think it's important to
> consider
> > how to implement the desired API, in case we come up with an API that
> > cannot be implemented efficiently, or even at all!
> >
> > Thoughts?
> > --
> > Nick
> >
> > On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna  wrote:
> >
> >> Hi Nick,
> >>
> >> 6.
> >> Of course, you are right! My bad!
> >> Wiping out the state in the downgrading case is fine.
> >>

[jira] [Created] (KAFKA-15448) Streams StandbyTaskUpdateListener

2023-09-11 Thread Colt McNealy (Jira)
Colt McNealy created KAFKA-15448:


 Summary: Streams StandbyTaskUpdateListener
 Key: KAFKA-15448
 URL: https://issues.apache.org/jira/browse/KAFKA-15448
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Colt McNealy


In addition to the new metrics in KIP-869, it would be great to have a callback 
that allows for monitoring of Standby Task status. The StateRestoreListener is 
currently not called for Standby Tasks for good reasons (the API wouldn't make 
sense for Standby). I've attached an interface which would be nice to have:

 

```
public interface StandbyTaskUpdateListener {

    public enum SuspendReason {
        MIGRATED,
        PROMOTED;
    }

    /**
     * Method called upon the creation of the Standby Task.
     *
     * @param topicPartition   the TopicPartition of the Standby Task.
     * @param storeName        the name of the store being watched by this Standby Task.
     * @param earliestOffset   the earliest offset available on the Changelog topic.
     * @param startingOffset   the offset from which the Standby Task starts watching.
     * @param currentEndOffset the current latest offset on the associated changelog partition.
     */
    void onTaskCreated(final TopicPartition topicPartition,
                       final String storeName,
                       final long earliestOffset,
                       final long startingOffset,
                       final long currentEndOffset);

    /**
     * Method called after restoring a batch of records. In this case the maximum size of the
     * batch is whatever the value of MAX_POLL_RECORDS is set to.
     *
     * This method is called after restoring each batch and it is advised to keep processing
     * to a minimum. Any heavy processing will hold up recovering the next batch, hence
     * slowing down the restore process as a whole.
     *
     * If you need to do any extended processing or connect to an external service, consider
     * doing so asynchronously.
     *
     * @param topicPartition   the TopicPartition containing the values to restore
     * @param storeName        the name of the store undergoing restoration
     * @param batchEndOffset   the inclusive ending offset for the current restored batch for
     *                         this TopicPartition
     * @param numRestored      the total number of records restored in this batch for this
     *                         TopicPartition
     * @param currentEndOffset the current end offset of the changelog topic partition.
     */
    void onBatchRestored(final TopicPartition topicPartition,
                         final String storeName,
                         final long batchEndOffset,
                         final long numRestored,
                         final long currentEndOffset);

    /**
     * Method called after a Standby Task is closed, either because the task migrated to a
     * new instance or because the task was promoted to an Active task.
     */
    void onTaskSuspended(final TopicPartition topicPartition,
                         final String storeName,
                         final long storeOffset,
                         final long currentEndOffset,
                         final SuspendReason reason);
}
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-11 Thread Colt McNealy
Nick,

Thanks for the response.

>Can you clarify how much state was restored in those 11 seconds?
That was a full restoration of ~650MB of state after I wiped the state
directory. The restoration after a crash with your branch is nearly
instantaneous, whereas with plain Kafka 3.5.0 a crash triggers a full
restoration (8 seconds).

Additionally, I pulled, rebuilt, and re-tested your changes and now the
restoration time with your branch is the same as with vanilla Streams!
Fantastic work!

I plan to do some more testing with larger state stores over the next
couple weeks, both with RocksDB and Speedb OSS. And perhaps I might even
try enabling some of the experimental Speedb OSS features, such as the
[Improved Write Flow](https://docs.speedb.io/speedb-features/write-flow).
As far as I understand, this isn't possible to do through the standard
RocksDBConfigSetter since some of the config options are Speedb-specific.
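
For reference, the standard tuning hook looks like the sketch below; it only exposes
the RocksDB Options object, which is why Speedb-specific settings (such as the
improved write flow) can't be applied through it. The option values shown are
illustrative.

```
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Example RocksDB-level tuning; values are illustrative only.
        options.setMaxWriteBufferNumber(3);
        options.setWriteBufferSize(16 * 1024 * 1024L);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing allocated above, so nothing to release here.
    }
}
```

The class is plugged in via the `rocksdb.config.setter` Streams config.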

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Sep 11, 2023 at 4:29 AM Nick Telford  wrote:

> Hi Colt,
>
> Thanks for taking the time to run your benchmarks on this, that's
> incredibly helpful.
>
> > With KIP 892, I verified that unclean shutdown does not cause a fresh
> > restore (). I got the following benchmark results:
> > - Benchmark took 216 seconds
> > - 1,401 tasks per second on one partition
> > - 11 seconds to restore the state
>
> Can you clarify how much state was restored in those 11 seconds? Was this
> the time to do the full restore regardless, or was it the time to only
> restore a small fraction of the state (e.g. the last aborted transaction)?
>
> > -- QUESTION: Because we observed a significant (30% or so) and
> reproducible
> > slowdown during restoration, it seems like KIP-892 uses the checkpointing
> > behavior during restoration as well? If so, I would argue that this might
> > not be necessary, because everything we write is already committed, so we
> > don't need to change the behavior during restoration or standby tasks.
> > Perhaps we could write the offsets to RocksDB on every batch (or even
> every
> > 5 seconds or so).
>
> Restore has always used a completely separate code-path to regular writes,
> and continues to do so. I had a quick pass over the code and I suspect I
> know what's causing the performance degradation: for every restored record,
> I was adding the changelog offset of that record to the batch along with
> the record. This is different to the regular write-path, which only adds
> the current offsets once, on-commit. This writeOffset method is fairly
> expensive, since it has to serialize the TopicPartition and offset that's
> being written to the database.
>
> Assuming this is the cause, I've already pushed a fix to my branch that
> will only call writeOffset once per-batch, and also adds some caching to
> the serialization in writeOffset, that should also enhance performance of
> state commit in the normal write-path.
>
> Please let me know if this addresses the issue!
>
> Regards,
> Nick
>
>
> On Mon, 11 Sept 2023 at 05:38, Colt McNealy  wrote:
>
> > Howdy folks,
> >
> > First I wanted to say fantastic work and thank you to Nick. I built your
> > branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did
> > some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0`
> > branch, and your `kip-892-3.5.0` branch built with Speedb OSS 2.3.0.1.
> And
> > it worked! Including the global store (we don't have any segmented
> stores,
> > unfortunately).
> >
> > The test I ran involved running 3,000 workflows with 100 tasks each, and
> > roughly 650MB state total.
> >
> > With Streams 3.5.0, I indeed verified that unclean shutdown caused a
> fresh
> > restore from scratch. I also benchmarked my application at:
> > - Running the benchmark took 211 seconds
> > - 1,421 tasks per second on one partition
> > - 8 seconds to restore the state (650MB or so)
> >
> > With KIP 892, I verified that unclean shutdown does not cause a fresh
> > restore (). I got the following benchmark results:
> > - Benchmark took 216 seconds
> > - 1,401 tasks per second on one partition
> > - 11 seconds to restore the state
> >
> > I ran the restorations many times to ensure that there was no rounding
> > error or noise; the results were remarkably consistent. Additionally, I
> ran
> > the restorations with KIP-892 built with Speedb OSS. The restoration time
> > consistently came out as 10 seconds, which was an improvement from the 11
> > seconds observed with RocksDB + KIP-892.
> >
> > My application is bottlenecked mostly by serialization and
> deserialization

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-10 Thread Colt McNealy
Howdy folks,

First I wanted to say fantastic work and thank you to Nick. I built your
branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did
some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0`
branch, and your `kip-892-3.5.0` branch built with Speedb OSS 2.3.0.1. And
it worked! Including the global store (we don't have any segmented stores,
unfortunately).

The test I ran involved running 3,000 workflows with 100 tasks each, and
roughly 650MB state total.

With Streams 3.5.0, I indeed verified that unclean shutdown caused a fresh
restore from scratch. I also benchmarked my application at:
- Running the benchmark took 211 seconds
- 1,421 tasks per second on one partition
- 8 seconds to restore the state (650MB or so)

With KIP 892, I verified that unclean shutdown does not cause a fresh
restore (). I got the following benchmark results:
- Benchmark took 216 seconds
- 1,401 tasks per second on one partition
- 11 seconds to restore the state

I ran the restorations many times to ensure that there was no rounding
error or noise; the results were remarkably consistent. Additionally, I ran
the restorations with KIP-892 built with Speedb OSS. The restoration time
consistently came out as 10 seconds, which was an improvement from the 11
seconds observed with RocksDB + KIP-892.

My application is bottlenecked mostly by serialization and deserialization,
so improving the performance of the state store doesn't really impact our
throughput that much. And the processing performance (benchmark time,
tasks/second) are pretty close in KIP-892 vs Streams 3.5.0. However, at
larger state store sizes, RocksDB performance begins to degrade, so that
might not be true once we pass 20GB per partition.

-- QUESTION: Because we observed a significant (30% or so) and reproducible
slowdown during restoration, it seems like KIP-892 uses the checkpointing
behavior during restoration as well? If so, I would argue that this might
not be necessary, because everything we write is already committed, so we
don't need to change the behavior during restoration or standby tasks.
Perhaps we could write the offsets to RocksDB on every batch (or even every
5 seconds or so).
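
To illustrate the suggestion (hypothetical code only, not the actual KIP-892
implementation; the store interface below is made up for this sketch), writing the
offset once per restored batch rather than once per record would look roughly like:

```
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class BatchOffsetRestoreExample {

    /** Stand-in for a transactional store; method names are made up for this sketch. */
    interface OffsetTrackingStore {
        void put(byte[] key, byte[] value);
        void writeOffset(TopicPartition partition, long offset);
    }

    static void restoreBatch(final OffsetTrackingStore store,
                             final TopicPartition partition,
                             final List<ConsumerRecord<byte[], byte[]>> batch) {
        for (final ConsumerRecord<byte[], byte[]> record : batch) {
            store.put(record.key(), record.value());
        }
        if (!batch.isEmpty()) {
            // One offset write per batch, not one per record.
            store.writeOffset(partition, batch.get(batch.size() - 1).offset());
        }
    }
}
```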

-- Note: This was a very small-scale test, with <1GB of state (as I didn't
have time to spend hours building up state). In the past I have noted that
RocksDB performance degrades significantly after 25GB of state in one
store. Future work involves determining the performance impact of KIP-892
relative to trunk at larger scale, since it's possible that the relative
behaviors are far different (i.e. relative to trunk, 892's processing and
restoration throughput might be much better or much worse).

-- Note: For those who want to replicate the tests, you can find the branch
of our streams app here:
https://github.com/littlehorse-enterprises/littlehorse/tree/minor/testing-streams-forks
. The example I ran was `examples/hundred-tasks`, and I ran the server with
`./local-dev/do-server.sh one-partition`. The `STREAMS_TESTS.md` file has a
detailed breakdown of the testing.

Anyways, I'm super excited about this KIP and if a bit more future testing
goes well, we plan to ship our product with a build of KIP-892, Speedb OSS,
and potentially a few other minor tweaks that we are thinking about.

Thanks Nick!

Ride well,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Aug 24, 2023 at 3:19 AM Nick Telford  wrote:

> Hi Bruno,
>
> Thanks for taking the time to review the KIP. I'm back from leave now and
> intend to move this forwards as quickly as I can.
>
> Addressing your points:
>
> 1.
> Because flush() is part of the StateStore API, it's exposed to custom
> Processors, which might be making calls to flush(). This was actually the
> case in a few integration tests.
> To maintain as much compatibility as possible, I'd prefer not to make this
> an UnsupportedOperationException, as it will cause previously working
> Processors to start throwing exceptions at runtime.
> I agree that it doesn't make sense for it to proxy commit(), though, as
> that would cause it to violate the "StateStores commit only when the Task
> commits" rule.
> Instead, I think we should make this a no-op. That way, existing user
> Processors will continue to work as-before, without violation of store
> consistency that would be caused by premature flush/commit of StateStore
> data to disk.
> What do you think?
>
> 2.
> As stated in the JavaDoc, when a StateStore implementation is
> transactional, but is unable to estimate the uncommitted memory usage, the
> method will return -1.
> The intention here is to permit third-party implementations that may not be
> able to estimate memory usage.
>
> Yes, it will be 0 when nothing has been written to the store yet. I thought
> that was implied by "This method will return an approximation of the memory
> would be freed

[jira] [Created] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS

2023-08-04 Thread Colt McNealy (Jira)
Colt McNealy created KAFKA-15308:


 Summary: Wipe Stores upon OffsetOutOfRangeException in ALOS
 Key: KAFKA-15308
 URL: https://issues.apache.org/jira/browse/KAFKA-15308
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.5.0, 3.4.0, 3.3.0
Reporter: Colt McNealy


As per this [Confluent Community Slack 
Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559=C48AHTCUQ],
 Streams currently does not wipe away RocksDB state upon encountering an 
`OffsetOutOfRangeException` in ALOS.

 

`OffsetOutOfRangeException` is a rare case that occurs when a standby task 
requests offsets that no longer exist in the topic. We should wipe the store 
for three reasons:
1. Not wiping the store can be a violation of ALOS since some of the
now-missing offsets could have contained tombstone records.
2. Wiping the store has no performance cost since we need to replay the
entirety of what's in the changelog topic anyways.
3. I have heard (not yet confirmed myself) that we wipe the store in EOS
anyways, so fixing this bug could remove a bit of complexity from supporting
EOS and ALOS.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Streams: Global Store Processors

2023-07-26 Thread Colt McNealy
Hi all,

In this JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-7663 it's
documented that the Processor supplied to a Global Store is bypassed during
restoration.

Consequently, the input topic to a Global Store needs to essentially be a
changelog topic as the keys and values are copied directly into the store.

I heard (perhaps in the Slack) a while ago that there was some conversation
about removing the ability to supply a Processor to the Global Store to
prevent users from tripping over that behavior. However, we currently rely
on the Processor to notify other parts of our application that things have
changed in the store (e.g. for cache invalidation, metrics, etc.). Obviously,
we make sure to respect the semantics of how the processor+global store
works for restoration, etc.

It seems to me that the ability to pass in a Processor is a public
API contract, so it should be safe to rely on it...? Would it require a
KIP to remove that ability? How much
deprecation notice would we have before we need to find a new solution?

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-21 Thread Colt McNealy
Sophie—

Thanks for chiming in here. +1 to the idea of specifying the ordering
guarantees that we make in the StorageTypeSpec javadocs.

Quick question then. Is the javadoc that says:

> Order is not guaranteed as bytes lexicographical ordering might not
represent key order.

no longer correct, and should say:

> Order guarantees depend on the underlying implementation of the
ReadOnlyKeyValueStore. For more information, please consult the
[StorageTypeSpec javadocs]()

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Jul 20, 2023 at 9:28 PM Sophie Blee-Goldman 
wrote:

> Hey Almog, first off, thanks for the KIP! I (and others) raised concerns
> over how restrictive the default.dsl.store config would be if not
> extendable to custom store types, especially given that this seems to be
> the primary userbase of such a feature. At the time we didn't really have
> any better ideas for a clean way to achieve that, but what you proposed
> makes a lot of sense to me. Happy to see a good solution to this, and
> hopefully others will share my satisfaction :P
>
> I did have one quick piece of feedback which arose from an unrelated
> question posed to the dev mailing list w/ subject line
> "ReadOnlyKeyValueStore#range()
> Semantics"
> <https://lists.apache.org/thread/jbckmth8d3mcgg0rd670cpvsgwzqlwrm>. I
> recommend checking out the full thread for context, but it made me think
> about how we can leverage the new StoreTypeSpec concept as an answer to the
> long-standing question in Streams: where can we put guarantees of the
> public contract for RocksDB (or other store implementations) when all the
> RocksDB stuff is technically internal.
>
> Basically, I'm suggesting two things: first, call out in some way (perhaps
> the StoreTypeSpec javadocs) that each StoreTypeSpec is considered a public
> contract in itself and should outline any semantic guarantees it does, or
> does not, make. Second, we should add a note on ordering guarantees in the
> two OOTB specs: for RocksDB we assert that range queries will honor
> serialized byte ordering, whereas the InMemory flavor gives no ordering
> guarantee whatsoever at this time.
>
> Thoughts?
>
> -Sophie
>
> On Thu, Jul 20, 2023 at 4:28 PM Almog Gavra  wrote:
>
> > Hi All,
> >
> > I would like to propose a KIP to expand support for default store types
> > (KIP-591) to encompass custom store implementations:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types
> >
> > Looking forward to your feedback!
> >
> > Cheers,
> > Almog
> >
>


ReadOnlyKeyValueStore#range() Semantics

2023-07-20 Thread Colt McNealy
Hi all,

The [current documentation](
https://kafka.apache.org/35/javadoc/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.html#range(K,K)
)  for ReadOnlyKeyValueStore#range() states that:

> Order is not guaranteed as bytes lexicographical ordering might not
represent key order.

That makes sense: the ordering of keys inserted via `store.put()`, as
determined by the key type's `compareTo()` method, is not what determines the
ordering in the store; rather, it's the lexicographical comparison of the
serialized byte[] arrays that matters.

Some observations after playing with it for over a year:

A) The behavior when you open a store for IQ and don't specify a particular
partition is that (behind the scenes) a store is opened for one partition,
and when that store is exhausted, the next partition is opened. There are no
guarantees about which partition is opened in what order. As such, if you
just System.out.println() all the keys from the iterator, they are not
globally ordered.

B) WITHIN a partition, such as if you do a .withPartition() when requesting
the ReadOnlyKeyValueStore, keys are indeed ordered properly according to
the bytes produced by the key serializer.
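
For reference, here's a minimal sketch of how we pin a range scan to a single
partition; the store name ("my-store") and the String serdes are just
placeholders:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// A minimal sketch, assuming a String-keyed store named "my-store" and a running
// KafkaStreams instance. Pinning the query to a single partition is what yields
// the byte-lexicographical ordering described in (B); without withPartition(),
// results come back partition-by-partition with no global order.
public class PartitionedRangeQuery {

    public static void printRange(final KafkaStreams streams, final int partition,
                                  final String from, final String to) {
        final ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters
                        .fromNameAndType("my-store", QueryableStoreTypes.<String, String>keyValueStore())
                        .withPartition(partition));

        try (final KeyValueIterator<String, String> iter = store.range(from, to)) {
            while (iter.hasNext()) {
                System.out.println(iter.next());
            }
        }
    }
}
```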

We at LittleHorse rely upon that behavior for API pagination, and if that
behavior were to change it would break some things.

After some digging, it turns out that the reason we *do* indeed get
lexicographical ordering of results according to the byte[] representation of
the keys is that this is a public contract exposed by RocksDB.

I had asked Matthias offline if it would be possible to open a PR to
clarify on the documentation that all results *within a partition of the
Store* are ordered by the byte[] representation of the key, since I would
feel more comfortable relying upon a publicly documented API.

However, there are a few counterpoints to this:

- ReadOnlyKeyValueStore is an *interface*, not an implementation. The
lexicographical ordering is something we observe from the RocksDB
implementation. If the store were implemented with, for example, a HashMap,
this would not work.

- The semantics of ordering thus seem to be more associated with the
*implementation* rather than with the *interface*.

- Is it possible at all to add a clarification on the RocksDB store that
this behavior is a guarantee? Would that require a KIP?

I'd be super-happy to open a PR adding a public documentation note, on some
implementation of a State Store, stating that this ordering by byte[]
representation is guaranteed for range scans. I do recognize, though, that
such a public documentation note becomes a contract, and as such it may
require a KIP and/or not be accepted.

Any thoughts?

Thanks for reading,
Colt McNealy

*Founder, LittleHorse.dev*


Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

2023-06-01 Thread Colt McNealy
Hi all,

I've got a rather naive question here. The spiritual goal of this KIP is to
reduce cross-rack traffic (normally, this manifests itself in terms of a
higher AWS/Azure bill as cloud providers charge for cross-AZ traffic).

To generalize, most deployments have three racks, RF=3, and one replica for
each partition in each rack. Therefore, in the steady state (absent any
cluster anomalies such as broker failure, etc.) we are pretty confident that
there should be a replica for every partition (input, changelog,
repartition, output topic) on the same rack as a given Streams instance.

Why not just let Sophie's High-Availability Task Assignor do its thing, and
then *set the preferred leader* for each replica to a broker in the same
rack/AZ as the Active Task? This would solve two problems:

1. The current KIP can't make any improvements in the case where a Task has
three involved partitions (e.g. input, changelog, output) and the leader for
each partition is in a different rack. With this approach, we could get
pretty close to having zero cross-AZ traffic in a healthy cluster.
2. There needs to be a lot of work done to balance availability, data
movement, and cross-AZ traffic in the current proposal. My proposal doesn't
actually involve any additional data movement; simply reassignment of
partition leadership.

The biggest argument against this proposal is that there could be two
Streams apps using the same topic, which would cause some bickering.
Secondly, some have observed that changing partition leadership can trigger
ProducerFencedExceptions in EOS, which causes a state restoration.
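
To make the "preferred leader" idea above concrete, here is a rough sketch
using the AdminClient; the topic, partition, and broker IDs are placeholders,
and figuring out which broker shares an AZ with the active task (e.g. from
broker.rack metadata) is the part a real tool would have to do:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

// A rough sketch of the "preferred leader" idea above, not an existing Streams
// feature: reorder a partition's replica list so that a broker in the same AZ
// as the active task is first (the preferred leader), then trigger a preferred
// leader election.
public class PreferLocalLeaderSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            final TopicPartition tp = new TopicPartition("my-input-topic", 0);

            // Same replica set as before, but broker 2 (assumed to be in the
            // active task's AZ) listed first, making it the preferred leader.
            admin.alterPartitionReassignments(Map.of(
                    tp, Optional.of(new NewPartitionReassignment(List.of(2, 0, 1)))
            )).all().get();

            // Ask the controller to move leadership to the preferred replica.
            admin.electLeaders(ElectionType.PREFERRED, Set.of(tp)).partitions().get();
        }
    }
}
```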

Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Jun 1, 2023 at 10:02 AM Hao Li  wrote:

> Hi Bruno,
>
> dropping config rack.aware.assignment.enabled
> and add value NONE to the enum for the possible values of config
> rack.aware.assignment.strategy sounds good to me.
>
> On Thu, Jun 1, 2023 at 12:39 AM Bruno Cadonna  wrote:
>
> > Hi Hao,
> >
> > Thanks for the updates!
> >
> > What do you think about dropping config rack.aware.assignment.enabled
> > and add value NONE to the enum for the possible values of config
> > rack.aware.assignment.strategy?
> >
> > Best,
> > Bruno
> >
> > On 31.05.23 23:31, Hao Li wrote:
> > > Hi all,
> > >
> > > I've updated the KIP based on the feedback. Major changes I made:
> > > 1. Add rack aware assignment to `StickyTaskAssignor`
> > > 2. Reject `Prefer reliability and then find optimal cost` option in
> > standby
> > > task assignment.
> > >
> > >
> > > On Wed, May 31, 2023 at 12:09 PM Hao Li  wrote:
> > >
> > >> Hi all,
> > >>
> > >> Thanks for the feedback! I will update the KIP accordingly.
> > >>
> > >> *For Sophie's comments:*
> > >>
> > >> 1 and 2. Good catch. Fixed these.
> > >>
> > >> 3 and 4 Yes. We can make this public config and call out the
> > >> clientConsumer config users need to set.
> > >>
> > >> 5. It's ideal to take the previous assignment in HAAssignor into
> > >> consideration when we compute our target assignment, the complications
> > come
> > >> with making sure the assignment can eventually converge and we don't
> do
> > >> probing rebalance infinitely. It's not only about storing the previous
> > >> assignment or get it somehow. We can actually get the previous
> > assignment
> > >> now like we do in StickyAssignor. But the previous assignment will
> > change
> > >> in each round of probing rebalance. The proposal which added some
> > weight to
> > >> make the rack aware assignment lean towards the original HAA's target
> > >> assignment will add benefits of stability in some corner cases in case
> > of
> > >> tie in cross rack traffic cost. But it's not sticky. But the bottom
> > line is
> > >> it won't be worse than current HAA's stickiness.
> > >>
> > >> 6. I'm fine with changing the assignor config to public. Actually, I
> > think
> > >> we can min-cost algorithm with StickyAssignor as well to mitigate the
> > >> problem of 5. So we can have one public config to choose an assignor
> and
> > >> one public config to enable the rack aware assignment.
> > >>
> > >> *For Bruno's comments:*
> > >>
> > >> The proposal was to implement all the options and use configs to
> choose
> > >> them during runtime. We can make those configs public as suggested.
> > >> 1, 2, 3, 4, 5: agree and will fix those.
> > >> 6: s

Re: [VOTE] KIP-925: rack aware task assignment in Kafka Streams

2023-05-30 Thread Colt McNealy
+1 (non-binding)

Thank you Hao!

Colt McNealy

*Founder, LittleHorse.dev*


On Tue, May 30, 2023 at 9:50 AM Hao Li  wrote:

> Hi all,
>
> I'd like to open the vote for KIP-925: rack aware task assignment in Kafka
> Streams. The link for the KIP is
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> .
>
> --
> Thanks,
> Hao
>


Re: [DISCUSS] Cluster Linking / Cross-Cluster Replication - Call for Interest & Co-Authors

2023-05-19 Thread Colt McNealy
I'm highly interested in this feature. We are a startup right now so we
can't commit resources to help *yet*, but once we are off the ground in 1-2
years it would be cool to contribute to this.

Colt McNealy
*Founder, LittleHorse.dev*


On Fri, May 19, 2023 at 10:14 AM hzh0425  wrote:

> Hi, Greg Harris.
> When I saw this discussion, I was very excited.
> My team is a Kafka R&D team, currently planning research and development
> on cluster linking. We believe that it is of great value in scenarios such
> as multi-region disaster recovery and cross-cloud synchronization.
> Therefore, I hope to join the subsequent design and development of this
> KIP as a co-author and participate deeply.
>
> Thanks!
> hzh
>
>
>
>
>
>  Original message 
> | From | Greg Harris |
> | Date | 2023-05-20 00:56 |
> | To | dev@kafka.apache.org |
> | Cc | |
> | Subject | [DISCUSS] Cluster Linking / Cross-Cluster Replication - Call for
> Interest & Co-Authors |
> Hey all,
>
> I have heard some offline discussion around Cross-Cluster Replication,
> a feature which would enable data to be shared between Kafka clusters
> in a manner which preserves offsets.
> Other names for this feature may include "Cluster Linking", "Remote
> Topics", "Byte-for-Byte Mirroring", and similar.
> I believe that there may be an interest in this feature being added to
> Apache Kafka, but have not seen any mailing list activity to that
> effect yet.
>
> Due to the complexity of the feature, I think it is appropriate to
> gauge the interest in the feature before opening a KIP, and also
> gather a set of contributors which are interested in collaborating on
> and sponsoring its development.
>
> If you are a Kafka user, please:
> 1. Let us know in this thread if you think Cross-Cluster Replication
> is a valuable feature and would like to see it in Apache Kafka.
>
> If you are or want to be an individual contributor, please:
> 1. Discuss with your users and/or product organization if
> Cross-Cluster Replication is appealing, and summarize their level of
> interest in the below thread.
> 2. Consider volunteering in this thread as a reviewer or co-author of
> the KIP and subsequent implementation. Co-authors would share the
> design and implementation workload as their time and interest permits.
> 3. If you have an employer, request that they sponsor this feature
> through your full- or part-time contributions as a reviewer or
> co-author. In exchange for their sponsorship, they can help ensure
> that the feature is delivered with a more predictable timeline.
>
> Please save your technical expectations and details of this feature
> for a subsequent KIP and DISCUSS thread, where that discussion is more
> appropriate.
>
> Thanks Everyone!
>
> Greg Harris
> Aiven, Inc
>


Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

2023-05-09 Thread Colt McNealy
Hello Hao,

First of all, THANK YOU for putting this together. I had been hoping
someone might bring something like this forward. A few comments:

**1: Runtime Complexity
> Klein’s cycle canceling algorithm can solve the min-cost flow problem in
O(E^2CU) time where C is max cost and U is max capacity. In our particular
case, C is 1 and U is at most 3 (A task can have at most 3 topics including
changelog topic?). So the algorithm runs in O(E^2) time for our case.

A Task can have multiple input topics, and also multiple state stores, and
multiple output topics. The most common case is three topics as you
described, but this is not necessarily guaranteed. Also, math is one of my
weak points, but reading O(E^2) as effectively constant doesn't seem right to
me, and I struggle to see how the algorithm isn't at least O(N), where N is
the number of Tasks, since E grows with the number of Tasks...?

**2: Broker-Side Partition Assignments
Consider the case with just three topics in a Task (one input, one output,
one changelog). If all three partition leaders are in the same Rack (or
better yet, the same broker), then we could get massive savings by
assigning the Task to that Rack/availability zone. But if the leaders for
each partition are spread across multiple zones, how will you handle that?
Is that outside the scope of this KIP, or is it worth introducing a
kafka-streams-generate-rebalance-proposal.sh tool?

Colt McNealy
*Founder, LittleHorse.io*


On Tue, May 9, 2023 at 4:03 PM Hao Li  wrote:

> Hi all,
>
> I have submitted KIP-925 to add rack awareness logic in task assignment in
> Kafka Streams and would like to start a discussion:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
>
> --
> Thanks,
> Hao
>


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-30 Thread Colt McNealy
Nick,

That is a good point; the Checkpoint Files have been a part of the State
Store implementation for a long time, and there might be some risk involved
in removing them. And yes, if you go with the idea of moving offset
management out of the state store itself, then you'll need to flush RocksDB
before updating the Checkpoint file.

As far as I understand it, the commit procedure would look like this (rough
sketch below):

1.  Commit Kafka Txn
2.  Commit RocksDB
2a. Flush RocksDB
3.  Update Checkpoint File
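
To illustrate that ordering (this is only a conceptual sketch, not the actual
Streams internals; a plain WriteBatch stands in for whatever transactional
mechanism the store uses, and the "checkpoint file" is just a single offset
written to a file):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.apache.kafka.clients.producer.Producer;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// A conceptual sketch of the commit ordering only; names and types are chosen
// for illustration and do not mirror the Streams code base.
public class CommitOrderSketch {

    public static void commit(final Producer<byte[], byte[]> producer,
                              final RocksDB db,
                              final WriteBatch uncommittedWrites,
                              final Path checkpointFile,
                              final long changelogOffset) throws RocksDBException, IOException {
        // 1. Commit the Kafka transaction (changelog + output topics).
        producer.commitTransaction();

        try (final WriteOptions writeOptions = new WriteOptions();
             final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
            // 2. Commit the buffered writes to RocksDB.
            db.write(writeOptions, uncommittedWrites);

            // 2a. Flush memtables so the data is durable on disk.
            db.flush(flushOptions);
        }

        // 3. Only now record the changelog offset in the checkpoint file.
        Files.writeString(checkpointFile, Long.toString(changelogOffset), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
    }
}
```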

Lastly, the online migration process just might be a bit simpler and easier
to test/verify if we don't need to add a column family and copy over the
value from the checkpoint files.

If you're able to resolve the issue with the StreamPartitionAssignor,
however, then I would be totally fine with moving offset management into
the state store, which would allow you to skip steps 2a and 3. But I'm not
a Committer, so you don't need to worry about my vote anyways (:

Thank you for your continued work on this!
Colt McNealy
*Founder, LittleHorse.io*


On Thu, Apr 27, 2023 at 3:35 AM Nick Telford  wrote:

> Hi everyone,
>
> I find myself (again) considering removing the offset management from
> StateStores, and keeping the old checkpoint file system. The reason is that
> the StreamPartitionAssignor directly reads checkpoint files in order to
> determine which instance has the most up-to-date copy of the local state.
> If we move offsets into the StateStore itself, then we will need to open,
> initialize, read offsets and then close each StateStore (that is not
> already assigned and open) for which we have *any* local state, on every
> rebalance.
>
> Generally, I don't think there are many "orphan" stores like this sitting
> around on most instances, but even a few would introduce additional latency
> to an already somewhat lengthy rebalance procedure.
>
> I'm leaning towards Colt's (Slack) suggestion of just keeping things in the
> checkpoint file(s) for now, and not worrying about the race. The downside
> is that we wouldn't be able to remove the explicit RocksDB flush on-commit,
> which likely hurts performance.
>
> If anyone has any thoughts or ideas on this subject, I would appreciate it!
>
> Regards,
> Nick
>
> On Wed, 19 Apr 2023 at 15:05, Nick Telford  wrote:
>
> > Hi Colt,
> >
> > The issue is that if there's a crash between 2 and 3, then you still end
> > up with inconsistent data in RocksDB. The only way to guarantee that your
> > checkpoint offsets and locally stored data are consistent with each other
> > are to atomically commit them, which can be achieved by having the
> offsets
> > stored in RocksDB.
> >
> > The offsets column family is likely to be extremely small (one
> > per-changelog partition + one per Topology input partition for regular
> > stores, one per input partition for global stores). So the overhead will
> be
> > minimal.
> >
> > A major benefit of doing this is that we can remove the explicit calls to
> > db.flush(), which forcibly flushes memtables to disk on-commit. It turns
> > out, RocksDB memtable flushes are largely dictated by Kafka Streams
> > commits, *not* RocksDB configuration, which could be a major source of
> > confusion. Atomic checkpointing makes it safe to remove these explicit
> > flushes, because it no longer matters exactly when RocksDB flushes data
> to
> > disk; since the data and corresponding checkpoint offsets will always be
> > flushed together, the local store is always in a consistent state, and
> > on-restart, it can always safely resume restoration from the on-disk
> > offsets, restoring the small amount of data that hadn't been flushed when
> > the app exited/crashed.
> >
> > Regards,
> > Nick
> >
> > On Wed, 19 Apr 2023 at 14:35, Colt McNealy  wrote:
> >
> >> Nick,
> >>
> >> Thanks for your reply. Ack to A) and B).
> >>
> >> For item C), I see what you're referring to. Your proposed solution will
> >> work, so no need to change it. What I was suggesting was that it might
> be
> >> possible to achieve this with only one column family. So long as:
> >>
> >>- No uncommitted records (i.e. not committed to the changelog) are
> >>*committed* to the state store, AND
> >>- The Checkpoint offset (which refers to the changelog topic) is less
> >>than or equal to the last written changelog offset in rocksdb
> >>
> >> I don't see the need to do the full restoration from scratch. My
> >> understanding was that prior to 844/892, full restorations were required
> >> because there could be uncommitted records written to RocksDB; however,
> >> gi

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-19 Thread Colt McNealy
Nick,

Thanks for your reply. Ack to A) and B).

For item C), I see what you're referring to. Your proposed solution will
work, so no need to change it. What I was suggesting was that it might be
possible to achieve this with only one column family. So long as:

   - No uncommitted records (i.e. not committed to the changelog) are
   *committed* to the state store, AND
   - The Checkpoint offset (which refers to the changelog topic) is less
   than or equal to the last written changelog offset in rocksdb

I don't see the need to do the full restoration from scratch. My
understanding was that prior to 844/892, full restorations were required
because there could be uncommitted records written to RocksDB; however,
given your use of RocksDB transactions, that can be avoided with the
pattern of 1) commit Kafka transaction, 2) commit RocksDB transaction, 3)
update offset in checkpoint file.

Anyways, your proposed solution works equivalently and I don't believe
there is much overhead to an additional column family in RocksDB. Perhaps
it may even perform better than making separate writes to the checkpoint
file.

Colt McNealy
*Founder, LittleHorse.io*


On Wed, Apr 19, 2023 at 5:53 AM Nick Telford  wrote:

> Hi Colt,
>
> A. I've done my best to de-couple the StateStore stuff from the rest of the
> Streams engine. The fact that there will be only one ongoing (write)
> transaction at a time is not guaranteed by any API, and is just a
> consequence of the way Streams operates. To that end, I tried to ensure the
> documentation and guarantees provided by the new APIs are independent of
> this incidental behaviour. In practice, you're right, this essentially
> refers to "interactive queries", which are technically "read transactions",
> even if they don't actually use the transaction API to isolate themselves.
>
> B. Yes, although not ideal. This is for backwards compatibility, because:
> 1) Existing custom StateStore implementations will implement flush(),
> and not commit(), but the Streams engine now calls commit(), so those calls
> need to be forwarded to flush() for these legacy stores.
> 2) Existing StateStore *users*, i.e. outside of the Streams engine
> itself, may depend on explicitly calling flush(), so for these cases,
> flush() needs to be redirected to call commit().
> If anyone has a better way to guarantee compatibility without introducing
> this potential recursion loop, I'm open to changes!
>
> C. This is described in the "Atomic Checkpointing" section. Offsets are
> stored in a separate RocksDB column family, which is guaranteed to be
> atomically flushed to disk with all other column families. The issue of
> checkpoints being written to disk after commit causing inconsistency if it
> crashes in between is the reason why, under EOS, checkpoint files are only
> written on clean shutdown. This is one of the major causes of "full
> restorations", so moving the offsets into a place where they can be
> guaranteed to be atomically written with the data they checkpoint allows us
> to write the checkpoint offsets *on every commit*, not just on clean
> shutdown.
>
> Regards,
> Nick
>
> On Tue, 18 Apr 2023 at 15:39, Colt McNealy  wrote:
>
> > Nick,
> >
> > Thank you for continuing this work. I have a few minor clarifying
> > questions.
> >
> > A) "Records written to any transaction are visible to all other
> > transactions immediately." I am confused here—I thought there could only
> be
> > one transaction going on at a time for a given state store given the
> > threading model for processing records on a Task. Do you mean Interactive
> > Queries by "other transactions"? (If so, then everything makes sense—I
> > thought that since IQ were read-only then they didn't count as
> > transactions).
> >
> > B) Is it intentional that the default implementations of the flush() and
> > commit() methods in the StateStore class refer to each other in some sort
> > of unbounded recursion?
> >
> > C) How will the getCommittedOffset() method work? At first I thought the
> > way to do it would be using a special key in the RocksDB store to store
> the
> > offset, and committing that with the transaction. But upon second
> thought,
> > since restoration from the changelog is an idempotent procedure, I think
> it
> > would be fine to 1) commit the RocksDB transaction and then 2) write the
> > offset to disk in a checkpoint file. If there is a crash between 1) and
> 2),
> > I think the only downside is now we replay a few more records (at a cost
> of
> > <100ms). Am I missing something there?
> >
> > Other than that, everything makes sense to m

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-18 Thread Colt McNealy
Nick,

Thank you for continuing this work. I have a few minor clarifying questions.

A) "Records written to any transaction are visible to all other
transactions immediately." I am confused here—I thought there could only be
one transaction going on at a time for a given state store given the
threading model for processing records on a Task. Do you mean Interactive
Queries by "other transactions"? (If so, then everything makes sense—I
thought that since IQ were read-only then they didn't count as
transactions).

B) Is it intentional that the default implementations of the flush() and
commit() methods in the StateStore class refer to each other in some sort
of unbounded recursion?

C) How will the getCommittedOffset() method work? At first I thought the
way to do it would be using a special key in the RocksDB store to store the
offset, and committing that with the transaction. But upon second thought,
since restoration from the changelog is an idempotent procedure, I think it
would be fine to 1) commit the RocksDB transaction and then 2) write the
offset to disk in a checkpoint file. If there is a crash between 1) and 2),
I think the only downside is now we replay a few more records (at a cost of
<100ms). Am I missing something there?

Other than that, everything makes sense to me.

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Tue, Apr 18, 2023 at 3:59 AM Nick Telford  wrote:

> Hi everyone,
>
> I've updated the KIP to reflect the latest version of the design:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>
> There are several changes in there that reflect feedback from this thread,
> and there's a new section and a bunch of interface changes relating to
> Atomic Checkpointing, which is the final piece of the puzzle to making
> everything robust.
>
> Let me know what you think!
>
> Regards,
> Nick
>
> On Tue, 3 Jan 2023 at 11:33, Nick Telford  wrote:
>
> > Hi Lucas,
> >
> > Thanks for looking over my KIP.
> >
> > A) The bound is per-instance, not per-Task. This was a typo in the KIP
> > that I've now corrected. It was originally per-Task, but I changed it to
> > per-instance for exactly the reason you highlighted.
> > B) It's worth noting that transactionality is only enabled under EOS, and
> > in the default mode of operation (ALOS), there should be no change in
> > behavior at all. I think, under EOS, we can mitigate the impact on users
> by
> > sufficiently low default values for the memory bound configuration. I
> > understand your hesitation to include a significant change of behaviour,
> > especially in a minor release, but I suspect that most users will prefer
> > the memory impact (under EOS) to the existing behaviour of frequent state
> > restorations! If this is a problem, the changes can wait until the next
> > major release. I'll be running a patched version of streams in production
> > with these changes as soon as they're ready, so it won't disrupt me :-D
> > C) The main purpose of this sentence was just to note that some changes
> > will need to be made to the way Segments are handled in order to ensure
> > they also benefit from transactions. At the time I wrote it, I hadn't
> > figured out the specific changes necessary, so it was deliberately vague.
> > This is the one outstanding problem I'm currently working on, and I'll
> > update this section with more detail once I have figured out the exact
> > changes required.
> > D) newTransaction() provides the necessary isolation guarantees. While
> > the RocksDB implementation of transactions doesn't technically *need*
> > read-only users to call newTransaction(), other implementations (e.g. a
> > hypothetical PostgresStore) may require it. Calling newTransaction() when
> > no transaction is necessary is essentially free, as it will just return
> > this.
> >
> > I didn't do any profiling of the KIP-844 PoC, but I think it should be
> > fairly obvious where the performance problems stem from: writes under
> > KIP-844 require 3 extra memory-copies: 1 to encode it with the
> > tombstone/record flag, 1 to decode it from the tombstone/record flag,
> and 1
> > to copy the record from the "temporary" store to the "main" store, when
> the
> > transaction commits. The different approach taken by KIP-869 should
> perform
> > much better, as it avoids all these copies, and may actually perform
> > slightly better than trunk, due to batched writes in RocksDB performing
> > better than non-batched writes.[1]
> >
> > Regards,
> > Nick
> >
> > 1:
> https://github.com/adamretter/rocksjava-write-methods-benchmark#results

Re: [VOTE] KIP-890: Transactions Server Side Defense

2023-01-10 Thread Colt McNealy
(non-binding) +1. Thank you for the KIP, Justine! I've read it; it makes
sense to me and I am excited for the implementation.

Colt McNealy
*Founder, LittleHorse.io*


On Tue, Jan 10, 2023 at 10:46 AM Justine Olshan
 wrote:

> Hi everyone,
>
> I would like to start a vote on KIP-890 which aims to prevent some of the
> common causes of hanging transactions and make other general improvements
> to transactions in Kafka.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
>
> Please take a look if you haven't already and vote!
>
> Justine
>


Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-12 Thread Colt McNealy
Patrick—Glad you got it cleared up, and good find re: empty byte[] vs null.

KAFKA-7663 is very interesting. Just yesterday I thought of a use-case
where supplying the processor for the Global State Store would be
useful: what if you have an in-memory object that computes and caches
aggregations of what's in the state store? You can derive that view by
querying the state store, but such queries are expensive; it's far better
to have them cached in an in-memory POJO. But to keep that POJO up-to-date,
you need to be alerted every time an event comes into the state store.

I think the best way to implement that (if I were to submit a KIP) would be
to:

   - Deprecate the ability to add a processor
   - Add an optional "onChange" callback that is called every time a new
   record is processed (rough sketch below).
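
Something like this, purely hypothetical (no such interface exists in Kafka
Streams today; it's just a sketch of the "onChange" idea):

```java
import org.apache.kafka.streams.processor.api.Record;

// Purely hypothetical -- this interface does not exist in Kafka Streams. The
// store itself would keep doing the put/delete; the callback would only observe
// the change, so restoration and normal processing stay consistent.
@FunctionalInterface
public interface GlobalStoreChangeListener<K, V> {

    /**
     * Invoked after a record from the global store's input topic has been applied
     * to the store, both during normal processing and during restoration.
     * A null value indicates a delete (tombstone).
     */
    void onChange(Record<K, V> record);
}
```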

There are lots of details to be ironed out, and furthermore this is a big API
change, so it would be slow to implement.

Colt McNealy
*Founder, LittleHorse.io*


On Mon, Dec 12, 2022 at 3:30 AM Patrick D’Addona
 wrote:

> No it does not encrypt the keys. And it works fine for key like "bar"
> where the latest record on the topic is not a tombstone.
>
> But that got me thinking about how the values are actually written and
> read from kafka and I found the issue in my case, it was related to the
> serializer not writing actual "null" values onto the topic, but empty
> byte[] arrays instead.
> The serializer I used looks like this
> ```java
> public byte[] serialize(final String s, final T data) {
> if (data == null) {
> return new byte[0];
> }
> try {
> return objectMapper.writeValueAsBytes(data);
> } catch (final IOException e) {
> throw new SerializationException(e);
> }
> }
> ```
>
> And then during restore in
> `org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState`
> ```java
> for (final ConsumerRecord record :
> records.records(topicPartition)) {
> if (record.key() != null) {
> restoreRecords.add(recordConverter.convert(record));
> }
> }
> ```
> using
> `org.apache.kafka.streams.state.internals.RecordConverters#RAW_TO_TIMESTAMED_INSTANCE`
> that empty array is turned into an 8 byte timestamp
>
> (it actually checks the value for `null` but not an empty array)
> ```java
> final byte[] recordValue = rawValue == null ? null :
> ByteBuffer.allocate(8 + rawValue.length)
> .putLong(timestamp)
> .put(rawValue)
> .array();
> ```
>
> that is then passed to
> `org.apache.kafka.streams.state.internals.RocksDBStore.SingleColumnFamilyAccessor#addToBatch`
> and gets "put" instead of "delete" because it's not null
> ```java
> public void addToBatch(final byte[] key,
>final byte[] value,
>final WriteBatch batch) throws RocksDBException {
> if (value == null) {
> batch.delete(columnFamily, key);
> } else {
>     batch.put(columnFamily, key, value);
> }
> }
> ```
>
> That's why `store.get("foo")` gives "null" because it actually finds the
> empty "byte[]" record and my Deserializer turns it into null.
> So Colt McNealy is right, this is exactly the issue from KAFKA-7663, I
> just did not see it until I found that the values on the topic which akhq
> shows as "null" in the visualization and are treated like null everywhere
> in my applicaton are not actually real tombstones to RocksDB and the
> restore process.
>
> So this thread can be closed, since there is nothing in the way kafka
> streams behaves here, just another implication of KAFKA-7663 that might
> confuse users.
> But also has the chance to lead them onto very interesting deep dives into
> kafka streams ;-)
>
> Thanks for all the responses!
> Kind regards,
> Patrick
>
> 
> From: Colt McNealy 
> Sent: Saturday, December 10, 2022 00:33
> To: dev@kafka.apache.org 
> Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> restores previously deleted records
>
> Does the KeySerde that you provided to your store encrypt the keys? I've
> never done so myself, but I've seen others report similar behavior (the
> store iterator shows the correct values but store.get('foo') returns null)
> in the Confluent Community slack. Here's a relevant message:
>
> > "From the behaviour in your code snippet above, I would say that the key
> is stored in encrypted form. The deserializer decrypts it correctly (thus
> you see the key and value while iterating). But when you requests the key
> individually you are passing it in plain text (the serializer might not be
> encrypting) and it’s not found

Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-09 Thread Colt McNealy
Does the KeySerde that you provided to your store encrypt the keys? I've
never done so myself, but I've seen others report similar behavior (the
store iterator shows the correct values but store.get('foo') returns null)
in the Confluent Community slack. Here's a relevant message:

> "From the behaviour in your code snippet above, I would say that the key
is stored in encrypted form. The deserializer decrypts it correctly (thus
you see the key and value while iterating). But when you requests the key
individually you are passing it in plain text (the serializer might not be
encrypting) and it’s not found in the keystore."

I can't help too much beyond that; but you may want to look into that issue.

Colt McNealy
*Founder, LittleHorse.io*


On Thu, Dec 8, 2022 at 11:51 PM Patrick D’Addona
 wrote:

> > In your case you also delete if the value is not null and if the value
> not-equals "deleteme", right? Ie, you use non-tombstone records as deletes
> what is just not allowed/supported.
>
> The "deleteme" String was only for testing, the issue also happens without
> it, i.e. if there is a "real" tombstone with `value == null` on the input
> topic.
> I do use the input topic as a changelog for my global table. tombstones
> are sent directly to that topic from a kafka streams operation before the
> actual store.
>
> > I cannot explain why all() and get(key) actually give you different
> result with respect to `key`. If a key is resurrected during a restore,
> both method should return it. Not sure why `get(key)` returns `null`
> even if `all()` contains the key... I would rather expect that both
> return the resurrected key.
>
> That's why I think this is different from KAFKA-7663.
> The **foo.bar.globaltopic** topic currently looks like this
> |timestamp|key|value|
> |2022-08-10T14:23:51.768|foo|foo|
> |2022-08-10T14:23:51.836|foo|foo|
> |2022-08-10T14:23:52.126|bar|bar|
> |2022-08-10T14:23:52.398|foo|foo|
> |2022-08-10T14:23:53.353|bar|bar|
> |2022-08-10T14:23:53.098|foo||
> |2022-08-10T14:23:54.367|bar|bar|
>
> After I delete the kafka-streams.state.dir and restart the application, I
> get
> store.get("foo") -> null
> store.get("bar") -> "bar"
> store.all() -> "foo" and "bar"
>
> Hope that explains it better.
>
> - Patrick
>
>
>
>
> Patrick D’Addona
> Senior Lead IT Architect
>
>
> Mobile: +49 151 544 22 161
> patrick.dadd...@maibornwolff.de
> Theresienhöhe 13, 80339 München
>
> MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
> www.maibornwolff.de, Phone +49 89 544 253 000
> USt-ID DE 129 299 525, Munich District Court HRB 98058
> Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
> Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
> 
>
> 
> From: Matthias J. Sax 
> Sent: Friday, December 9, 2022 01:11
> To: dev@kafka.apache.org 
> Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> restores previously deleted records
>
> > The way I see it, KAFKA-7663 says, "a global store will be exactly the
> input topic after restore, regardless of the processor"
>
> Not sure what you mean by this? The issue the tickets describe is, that
> if you don't do a plain `put(key,value)` in your processor, stuff breaks
> right now. (Note that `delete(key)` and `put(key,null)` is the same).
>
>
> It's a known issue, bad API, and also bad documentation on our side, and
> I guess you can call it a bug if you wish. However, you can only use
> tombstones as deletes right now. Thus, what you do "wrong" is
>
> > if (record.value() == null == record.value().equals("deleteme")) {
> > store.delete(record.key());
> > }
>
> In your case you also delete if the value is not null and if the value
> not-equals "deleteme", right? Ie, you use non-tombstone records as
> deletes what is just not allowed/supported.
>
> The issue is that during restore only `null` values, ie, actual
> tombstones are handled as deletes and thus, if you delete a key using a
> non-tombstone record in your processor, this key can be resurrected
> during restore.
>
>
> I cannot explain why all() and get(key) actually give you different
> result with respect to `key`. If a key is resurrected during a restore,
> both method should return it. Not sure why `get(key)` returns `null`
> even if `all()` contains the key... I would rather expect that both
> return the resurrected key.
>
> Hope this helps.
>
>
> -Matthias
>
>
> On 12/8/22 12:00 PM,

Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Colt McNealy
Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As per that
issue/bug/discussion, if your processor does anything other than simply
pass-through records, the results of initial processing vs restoration are
different.

Global State Stores don't have a changelog topic (for example, in the
processor API, Global State Stores are only valid if the builder has
.withLoggingDisabled()). That's because the processor for the global store
runs on each of your N streams instances, and if the processor on each
instance published to the changelog, then each put/delete would be written
N times, which is wasteful.

The implications to this are that your input topic should be "like" a
changelog:
- Your input topic should NOT have limited retention otherwise you'll lose
old data.
- Your input topic should ideally be compacted if possible (see the sketch below)
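
For example, such a topic could be created like this (a sketch using the
AdminClient; the topic name matches your example, but the partition count,
replication factor, and bootstrap server are placeholders):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

// A minimal sketch: the only point is that the global store's input topic
// should be compacted rather than relying on time/size-based retention.
public class CreateGlobalStoreInputTopic {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            final NewTopic topic = new NewTopic("foo.bar.globaltopic", 12, (short) 3)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```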

I agree that the API as it stands is highly confusing—why allow users to
provide a processor if it offers a way to "shoot oneself in one's foot?"

Changing that API would probably require a KIP. I don't quite have the
bandwidth to propose + implement such a KIP right now, but if you would
like to, feel free! (perhaps in the spring I may have time)

Your workaround (the init() method) is a good one. Another way to do it
might be to simply have a regular processing step which converts the input
topic into the true "changelog" format before you push it to a global store.
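
A minimal sketch of that second approach (topic names are placeholders, and
I'm using your "deleteme" sentinel as the example conversion):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// A minimal sketch, assuming String keys/values and hypothetical topic names.
// Any "convert to changelog format" logic lives in a normal stream; the global
// table is then built from the already-converted topic, so restoration (which
// bypasses the processor) produces exactly the same store contents.
public class GlobalStoreChangelogTopology {

    public static StreamsBuilder build() {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("raw-input", Consumed.with(Serdes.String(), Serdes.String()))
               // e.g. turn a sentinel value like "deleteme" into a real tombstone
               .mapValues(value -> "deleteme".equals(value) ? null : value)
               .to("global-store-changelog", Produced.with(Serdes.String(), Serdes.String()));

        builder.globalTable("global-store-changelog",
                Consumed.with(Serdes.String(), Serdes.String()));

        return builder;
    }
}
```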

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona
 wrote:

> Hello,
>
> I have a quarkus application using
> **org.apache.kafka:kafka-streams:3.1.0** and found that
> * when creating a global table using a compacted topic as input
> * entries that have been deleted at some point
> * are then no longer returned when iterating over the store with
> **store.all()** - as expected
> * but after the pod restarts and its kafka streams state directory is
> deleted, after restoring from the topic using
> **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
> * those formerly deleted records are once again returned by that store
> when using **store.all()** - not expected
> * however they return null, using **store.get("foo")** - as expected
>
> This is somewhat similar to
> https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to
> be able to modify this restore behaviour.
> However it is also different, because I think it is not documented
> anywhere and it is unintuitive (to me) - since it changes how the
> application behaves after restarting it even if the kafka cluster itself
> was not changed - so I think it's more of a bug than missing documentation.
>
> Some more information, the topic is configured like this
> ```java
> cleanup.policy: compact
> compression.type: producer
> delete.retention.ms: 8640
> max.compaction.lag.ms: 9223372036854776000
> min.compaction.lag.ms: 0
> retention.bytes: -1
> retention.ms: 8640
> ```
>
> I am adding the global store like so
> ```java
> streamsBuilder.addGlobalStore(
> Stores.timestampedKeyValueStoreBuilder(
> Stores.persistentTimestampedKeyValueStore("foobar"),
> Serdes.String(),
> Serdes.String()),
> "foo.bar.globaltopic",
> Consumed.with(Serdes.String(), Serdes.String()),
> () -> new FooBarUpdateHandler(timeService)
> );
> ```
>
> and here is the definition of 'FooBarUpdateHandler'
> ```java
> import java.time.Instant;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.kafka.streams.processor.api.Processor;
> import org.apache.kafka.streams.processor.api.Record;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.TimestampedKeyValueStore;
> import org.apache.kafka.streams.state.ValueAndTimestamp;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Internal class handling partFamily updates.
>  */
> public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {
>
> private static final Logger logger =
> LoggerFactory.getLogger(FooBarUpdateHandler.class);
> private TimestampedKeyValueStore<String, String> store;
>
> @Override
> public void init(final
> org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void>
> context) {
> store = context.getStateStore("foobar");
> }
>
> @Override
> public void process(final Record<String, String> record) {
>
> // handle tombstones from input topic
> if (record.value() == null == record.value().equals("deleteme")) {
> store.delete(record.key());
> } else {
> store.put(
>  

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-12-06 Thread Colt McNealy
Nick,

Thank you for the reply; that makes sense. I was hoping that, since reading
uncommitted records from IQ in EOS isn't part of the documented API, maybe
you *wouldn't* have to wait for the next major release to make that change;
but given that it would be considered a major change, I like your approach
the best.

Wishing you a speedy recovery and happy coding!

Thanks,
Colt McNealy
*Founder, LittleHorse.io*


On Tue, Dec 6, 2022 at 10:30 AM Nick Telford  wrote:

> Hi Colt,
>
> 10: Yes, I agree it's not ideal. I originally intended to try to keep the
> behaviour unchanged as much as possible, otherwise we'd have to wait for a
> major version release to land these changes.
> 20: Good point, ALOS doesn't need the same level of guarantee, and the
> typically longer commit intervals would be problematic when reading only
> "committed" records.
>
> I've been away for 5 days recovering from minor surgery, but I spent a
> considerable amount of that time working through ideas for possible
> solutions in my head. I think your suggestion of keeping ALOS as-is, but
> buffering writes for EOS is the right path forwards, although I have a
> solution that both expands on this, and provides for some more formal
> guarantees.
>
> Essentially, adding support to KeyValueStores for "Transactions", with
> clearly defined IsolationLevels. Using "Read Committed" when under EOS, and
> "Read Uncommitted" under ALOS.
>
> The nice thing about this approach is that it gives us much more clearly
> defined isolation behaviour that can be properly documented to ensure users
> know what to expect.
>
> I'm still working out the kinks in the design, and will update the KIP when
> I have something. The main struggle is trying to implement this without
> making any major changes to the existing interfaces or breaking existing
> implementations, because currently everything expects to operate directly
> on a StateStore, and not a Transaction of that store. I think I'm getting
> close, although sadly I won't be able to progress much until next week due
> to some work commitments.
>
> Regards,
> Nick
>
> On Thu, 1 Dec 2022 at 00:01, Colt McNealy  wrote:
>
> > Nick,
> >
> > Thank you for the explanation, and also for the updated KIP. I am quite
> > eager for this improvement to be released as it would greatly reduce the
> > operational difficulties of EOS streams apps.
> >
> > Two questions:
> >
> > 10)
> > >When reading records, we will use the
> > WriteBatchWithIndex#getFromBatchAndDB
> >  and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure
> > that uncommitted writes are available to query.
> > Why do extra work to enable the reading of uncommitted writes during IQ?
> > Code complexity aside, reading uncommitted writes is, in my opinion, a
> > minor flaw in EOS IQ; it would be very nice to have the guarantee that,
> > with EOS, IQ only reads committed records. In order to avoid dirty reads,
> > one currently must query a standby replica (but this still doesn't fully
> > guarantee monotonic reads).
> >
> > 20) Is it also necessary to enable this optimization on ALOS stores? The
> > motivation of KIP-844 was mainly to reduce the need to restore state from
> > scratch on unclean EOS shutdowns; with ALOS it was acceptable to accept
> > that there may have been uncommitted writes on disk. On a side note, if
> you
> > enable this type of store on ALOS processors, the community would
> > definitely want to enable queries on dirty reads; otherwise users would
> > have to wait 30 seconds (default) to see an update.
> >
> > Thank you for doing this fantastic work!
> > Colt McNealy
> > *Founder, LittleHorse.io*
> >
> >
> > On Wed, Nov 30, 2022 at 10:44 AM Nick Telford 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I've drastically reduced the scope of this KIP to no longer include the
> > > StateStore management of checkpointing. This can be added as a KIP
> later
> > on
> > > to further optimize the consistency and performance of state stores.
> > >
> > > I've also added a section discussing some of the concerns around
> > > concurrency, especially in the presence of Iterators. I'm thinking of
> > > wrapping WriteBatchWithIndex with a reference-counting copy-on-write
> > > implementation (that only makes a copy if there's an active iterator),
> > but
> > > I'm open to suggestions.
> > >
> > > Regards,
> > > Nick
> > >
> > > On Mon, 28 Nov 2022 at 16:36, Nick Telford 
> > wrote:
> >

Re: BUG: eos KeyValueStore::delete() in Punctuator

2022-12-05 Thread Colt McNealy
I re-compiled with the current `trunk` branch and the bug was fixed. Thank
you for pointing that out, Matthias, and sorry for the false alarm!

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Mon, Dec 5, 2022 at 7:42 PM Matthias J. Sax  wrote:

> Thanks for reporting this issue.
>
> It might have been fixed via
> https://issues.apache.org/jira/browse/KAFKA-14294 already.
>
>
> -Matthias
>
>
>
> On 12/3/22 7:05 PM, Colt McNealy wrote:
> > Hi all,
> >
> > I believe I've found a bug in Kafka Streams when:
> >
> > - Running an app in EOS
> > - Calling KeyValueStore::delete(...) on a nonexistent key
> > - Can cause a ProducerFencedException
> >
> > The expected behavior is that the call to delete() returns null (as per
> the
> > javadoc) and doesn't cause a ProducerFencedException.
> >
> > I've created a minimally reproducible example which reliably produces the
> > bug on my own environment at this repository:
> >
> > https://github.com/littlehorse-eng/kafka-punctuator-fencing-issue
> >
> > Could someone please take a look and let me know if you can reliably
> > reproduce it on your end as well, and if so, how to file a bug?
> >
> > Thank you,
> > Colt McNealy
> > *Founder, LittleHorse.io*
> >
>


BUG: eos KeyValueStore::delete() in Punctuator

2022-12-03 Thread Colt McNealy
Hi all,

I believe I've found a bug in Kafka Streams where:

- Running an app in EOS, and
- Calling KeyValueStore::delete(...) on a nonexistent key

can cause a ProducerFencedException.

The expected behavior is that the call to delete() returns null (as per the
javadoc) and doesn't cause a ProducerFencedException.

I've created a minimally reproducible example which reliably produces the
bug on my own environment at this repository:

https://github.com/littlehorse-eng/kafka-punctuator-fencing-issue

Could someone please take a look and let me know if you can reliably
reproduce it on your end as well, and if so, how to file a bug?

Thank you,
Colt McNealy
*Founder, LittleHorse.io*


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-30 Thread Colt McNealy
Nick,

Thank you for the explanation, and also for the updated KIP. I am quite
eager for this improvement to be released as it would greatly reduce the
operational difficulties of EOS streams apps.

Two questions:

10)
>When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB
 and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure
that uncommitted writes are available to query.
Why do extra work to enable the reading of uncommitted writes during IQ?
Code complexity aside, reading uncommitted writes is, in my opinion, a
minor flaw in EOS IQ; it would be very nice to have the guarantee that,
with EOS, IQ only reads committed records. In order to avoid dirty reads,
one currently must query a standby replica (but this still doesn't fully
guarantee monotonic reads).

20) Is it also necessary to enable this optimization on ALOS stores? The
motivation of KIP-844 was mainly to reduce the need to restore state from
scratch on unclean EOS shutdowns; with ALOS it was considered acceptable
that there may have been uncommitted writes on disk. On a side note, if you
enable this type of store on ALOS processors, the community would
definitely want to enable queries on dirty reads; otherwise users would
have to wait 30 seconds (default) to see an update.

Thank you for doing this fantastic work!
Colt McNealy
*Founder, LittleHorse.io*


On Wed, Nov 30, 2022 at 10:44 AM Nick Telford 
wrote:

> Hi everyone,
>
> I've drastically reduced the scope of this KIP to no longer include the
> StateStore management of checkpointing. This can be added as a KIP later on
> to further optimize the consistency and performance of state stores.
>
> I've also added a section discussing some of the concerns around
> concurrency, especially in the presence of Iterators. I'm thinking of
> wrapping WriteBatchWithIndex with a reference-counting copy-on-write
> implementation (that only makes a copy if there's an active iterator), but
> I'm open to suggestions.
>
> Regards,
> Nick
>
> On Mon, 28 Nov 2022 at 16:36, Nick Telford  wrote:
>
> > Hi Colt,
> >
> > I didn't do any profiling, but the 844 implementation:
> >
> >- Writes uncommitted records to a temporary RocksDB instance
> >   - Since tombstones need to be flagged, all record values are
> >   prefixed with a value/tombstone marker. This necessitates a memory
> copy.
> >- On-commit, iterates all records in this temporary instance and
> >writes them to the main RocksDB store.
> >- While iterating, the value/tombstone marker needs to be parsed and
> >the real value extracted. This necessitates another memory copy.
> >
> > My guess is that the cost of iterating the temporary RocksDB store is the
> > major factor, with the 2 extra memory copies per-Record contributing a
> > significant amount too.
> >
> > Regards,
> > Nick
> >
> > On Mon, 28 Nov 2022 at 16:12, Colt McNealy  wrote:
> >
> >> Hi all,
> >>
> >> Out of curiosity, why does the performance of the store degrade so
> >> significantly with the 844 implementation? I wouldn't be too surprised
> by
> >> a
> >> 50-60% drop (caused by each record being written twice), but 96% is
> >> extreme.
> >>
> >> The only thing I can think of which could create such a bottleneck would
> >> be
> >> that perhaps the 844 implementation deserializes and then re-serializes
> >> the
> >> store values when copying from the uncommitted to committed store, but I
> >> wasn't able to figure that out when I scanned the PR.
> >>
> >> Colt McNealy
> >> *Founder, LittleHorse.io*
> >>
> >>
> >> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford 
> >> wrote:
> >>
> >> > Hi everyone,
> >> >
> >> > I've updated the KIP to resolve all the points that have been raised
> so
> >> > far, with one exception: the ALOS default commit interval of 5 minutes
> >> is
> >> > likely to cause WriteBatchWithIndex memory to grow too large.
> >> >
> >> > There's a couple of different things I can think of to solve this:
> >> >
> >> >- We already have a memory/record limit in the KIP to prevent OOM
> >> >errors. Should we choose a default value for these? My concern here
> >> is
> >> > that
> >> >anything we choose might seem rather arbitrary. We could change
> >> >its behaviour such that under ALOS, it only triggers the commit of
> >> the
> >> >StateStore, but under EOS, it triggers a commit of the Kafka
> >> > transaction.

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-28 Thread Colt McNealy
Hi all,

Out of curiosity, why does the performance of the store degrade so
significantly with the 844 implementation? I wouldn't be too surprised by a
50-60% drop (caused by each record being written twice), but 96% is extreme.

The only thing I can think of which could create such a bottleneck would be
that perhaps the 844 implementation deserializes and then re-serializes the
store values when copying from the uncommitted to committed store, but I
wasn't able to figure that out when I scanned the PR.

Colt McNealy
*Founder, LittleHorse.io*


On Mon, Nov 28, 2022 at 7:56 AM Nick Telford  wrote:

> Hi everyone,
>
> I've updated the KIP to resolve all the points that have been raised so
> far, with one exception: the ALOS default commit interval of 5 minutes is
> likely to cause WriteBatchWithIndex memory to grow too large.
>
> There's a couple of different things I can think of to solve this:
>
>- We already have a memory/record limit in the KIP to prevent OOM
>errors. Should we choose a default value for these? My concern here is
> that
>anything we choose might seem rather arbitrary. We could change
>its behaviour such that under ALOS, it only triggers the commit of the
>StateStore, but under EOS, it triggers a commit of the Kafka
> transaction.
>- We could introduce a separate `checkpoint.interval.ms` to allow ALOS
>to commit the StateStores more frequently than the general
>commit.interval.ms? My concern here is that the semantics of this
> config
>would depend on the processing.mode; under ALOS it would allow more
>frequently committing stores, whereas under EOS it couldn't.
>
> Any better ideas?
>
> On Wed, 23 Nov 2022 at 16:25, Nick Telford  wrote:
>
> > Hi Alex,
> >
> > Thanks for the feedback.
> >
> > I've updated the discussion of OOM issues by describing how we'll handle
> > it. Here's the new text:
> >
> > To mitigate this, we will automatically force a Task commit if the total
> >> uncommitted records returned by
> >> StateStore#approximateNumUncommittedEntries()  exceeds a threshold,
> >> configured by max.uncommitted.state.entries.per.task; or the total
> >> memory used for buffering uncommitted records returned by
> >> StateStore#approximateNumUncommittedBytes() exceeds the threshold
> >> configured by max.uncommitted.state.bytes.per.task. This will roughly
> >> bound the memory required per-Task for buffering uncommitted records,
> >> irrespective of the commit.interval.ms, and will effectively bound the
> >> number of records that will need to be restored in the event of a
> failure.
> >>
> >
> >
> > These limits will be checked in StreamTask#process and a premature commit
> >> will be requested via Task#requestCommit().
> >>
> >
> >
> > Note that these new methods provide default implementations that ensure
> >> existing custom stores and non-transactional stores (e.g.
> >> InMemoryKeyValueStore) do not force any early commits.
> >
> >
> > I've chosen to have the StateStore expose approximations of its buffer
> > size/count instead of opaquely requesting a commit in order to delegate
> the
> > decision making to the Task itself. This enables Tasks to look at *all*
> of
> > their StateStores, and determine whether an early commit is necessary.
> > Notably, it enables per-Task thresholds, instead of per-Store, which
> > prevents Tasks with many StateStores from using much more memory than
> Tasks
> > with one StateStore. This makes sense, since commits are done by-Task,
> not
> > by-Store.
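
A simplified sketch of that per-Task check, using the method and config
names quoted above; the surrounding types are stand-ins rather than actual
Streams internals:

```
import java.util.Collection;

// Simplified stand-in for the StateStore additions proposed in the KIP text.
interface UncommittedAwareStore {
    long approximateNumUncommittedEntries();
    long approximateNumUncommittedBytes();
}

final class EarlyCommitCheck {
    // Checked from StreamTask#process per the KIP text; the thresholds correspond
    // to max.uncommitted.state.entries.per.task and max.uncommitted.state.bytes.per.task.
    static boolean shouldRequestCommit(final Collection<UncommittedAwareStore> stores,
                                       final long maxEntriesPerTask,
                                       final long maxBytesPerTask) {
        long entries = 0;
        long bytes = 0;
        for (final UncommittedAwareStore store : stores) {
            entries += store.approximateNumUncommittedEntries();
            bytes += store.approximateNumUncommittedBytes();
        }
        // Thresholds are summed over all of the Task's stores (per-Task, not
        // per-Store), because commits happen per Task.
        return entries > maxEntriesPerTask || bytes > maxBytesPerTask;
    }
}
```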
> >
> > Prizes* for anyone who can come up with a better name for the new config
> > properties!
> >
> > Thanks for pointing out the potential performance issues of WBWI. From
> the
> > benchmarks that user posted[1], it looks like WBWI still performs
> > considerably better than individual puts, which is the existing design,
> so
> > I'd actually expect a performance boost from WBWI, just not as great as
> > we'd get from a plain WriteBatch. This does suggest that a good
> > optimization would be to use a regular WriteBatch for restoration (in
> > RocksDBStore#restoreBatch), since we know that those records will never
> be
> > queried before they're committed.
> >
> > 1:
> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
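
A rough sketch of that restoration idea, staging restored records in a
plain WriteBatch; this is illustrative only and not the actual
RocksDBStore#restoreBatch code:

```
import java.util.Collection;
import org.apache.kafka.streams.KeyValue;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Restored records are never read back before the store is committed, so they
// can be staged in a plain WriteBatch (no lookup index) rather than a
// WriteBatchWithIndex.
final class RestoreWithPlainWriteBatch {
    static void restoreBatch(final RocksDB db,
                             final Collection<KeyValue<byte[], byte[]>> records)
            throws RocksDBException {
        try (WriteBatch batch = new WriteBatch();
             WriteOptions options = new WriteOptions()) {
            for (final KeyValue<byte[], byte[]> record : records) {
                if (record.value == null) {
                    batch.delete(record.key);           // tombstone
                } else {
                    batch.put(record.key, record.value);
                }
            }
            db.write(options, batch);                   // one atomic write per restored batch
        }
    }
}
```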
> >
> > * Just kidding, no prizes, sadly.
> >
> > On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov
> >  wrote:
> >
> >> Hey Nick,
> >>
> >> Thank you for the KIP! With such a significant performance degradati

Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-10-21 Thread Colt McNealy
Sophie,

Regarding item "3" (my last paragraph from the previous email), perhaps I
should give a more general example now that I've had more time to clarify
my thoughts:

In some stateful applications, certain keys have to be findable without any
information about when the relevant data was created. For example, if I'm
running a word-count app and I want to use Interactive Queries to find the
count for "foo", I would need to know whether "foo" first arrived before or
after time T before I could find the correct partition to look up the data.
In this case, I don't think static partitioning is possible. Is this
use-case a non-goal of the KIP, or am I missing something?
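
To illustrate the gap, the lookup would need a piece of information the app
does not keep (all names here are made up):

```
// With a time-based cut-over, the query side cannot compute the partition for
// "foo" without knowing whether "foo" first arrived before the cut-over.
final class WordCountLookup {
    static int partitionForKey(final String word,
                               final boolean firstSeenBeforeCutover, // unknowable in general
                               final int oldPartitions,
                               final int newPartitions) {
        final int count = firstSeenBeforeCutover ? oldPartitions : newPartitions;
        return Math.floorMod(word.hashCode(), count);
    }
}
```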

Colt McNealy
*Founder, LittleHorse.io*


On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman
 wrote:

> Thanks for the responses guys! I'll get the easy stuff out of the way
> first:
>
> 1) Fixed the KIP so that StaticStreamPartitioner extends StreamPartitioner
> 2) I totally agree with you Colt, the record value might have valuable (no
> pun) information
> in it that is needed to compute the partition without breaking the static
> constraint. As in my
> own example earlier, maybe the userId is a field in the value and not the
> key itself. Actually
> it was that exact thought that made me do a U-turn on this but I forgot to
> update the thread
> 3) Colt, I'm not sure I follow what you're trying to say in that last
> paragraph; can you expand?
> 4) Lucas, it's a good question as to what kind of guard-rails we could put
> up to enforce or even
> detect a violation of static partitioning. Most likely Streams would need
> to track every key-to-partition mapping in an internal state store, but we
> have no guarantee the key space is bounded, so the store could grow out of
> control. Mostly however I imagine users
> would be frustrated
> to find out there's a secret, extra state store taking up space when you
> enable autoscaling, and
> it's not even to provide functionality but just to make sure users aren't
> doing something wrong.
>
> I wish I had a better idea, but sadly I think the only practical solution
> here is to try and make this
> condition as clear and obvious and easy to understand as possible, perhaps
> by providing an
> example of what does and does not satisfy the constraint in the javadocs.
> I'll work on that
> 5) I covered a bit above the impracticality of storing a potentially
> unbounded keyspace, which
> as you mention would need to be shared by all partitioners as well, so I
> would agree that this
> feels insurmountable. I'm leaning towards only enabling this feature for
> the static partitioning
> case at least in the first iteration, and we can see how things go from
> there -- for example, are
> people generally able to implement it correctly? If we find that the
> feature is working well and
> users are hungry for more, then it would be relatively straightforward to
> open things up to
> stateless applications, or even stateful applications which can withstand
> some "blips" in the
> logic/correctness.
>
> That said, *technically* the feature would be able to be turned on for any
> such case as it is, since
> as discussed above it's difficult to place true guardrails around the
> feature that can enforce
> static partitioning. Perhaps we could put a short note in the
> StaticStreamPartitioner docs that
> explain how and when it's safe to break the static requirement, but that we
> recommend against
> doing so..
>
> Thoughts?
>
> -Sophie
>
> On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy  wrote:
>
> > Sophie,
> >
> > Thank you for your detailed response. That makes sense (one partition per
> > user seems like a lot of extra metadata if you've got millions of users,
> > but I'm guessing that was just for illustrative purposes).
> >
> > In this case I'd like to question one small detail in your kip. The
> > StaticPartitioner takes in just the key and not the value...in an
> > application I've been working on, the "value" is a long-lived entity
> > (spanning hundreds of records over several days) that has timestamp
> > information about the creation of the entity inside of it. The ID itself
> is
> > provided by the end-user of the system and as such isn't guaranteed to
> have
> > timestamp info.
> >
> > This is quite a corner case, but if the StaticStreamPartitioner interface
> > were allowed to peek at the record value, it would be trivial to
> implement
> > logic as follows:
> > ```
> > entity = deserialize(record.value())
> >
> > if entity.created_before(T):
> >   return hash(key) % old_partitions
> > else:
> >   return hash(key) % new_partitions
> > ```

Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-10-20 Thread Colt McNealy
Sophie,

Thank you for your detailed response. That makes sense (one partition per
user seems like a lot of extra metadata if you've got millions of users,
but I'm guessing that was just for illustrative purposes).

In this case I'd like to question one small detail in your kip. The
StaticPartitioner takes in just the key and not the value...in an
application I've been working on, the "value" is a long-lived entity
(spanning hundreds of records over several days) that has timestamp
information about the creation of the entity inside of it. The ID itself is
provided by the end-user of the system and as such isn't guaranteed to have
timestamp info.

This is quite a corner case, but if the StaticStreamPartitioner interface
were allowed to peek at the record value, it would be trivial to implement
logic as follows:
```
entity = deserialize(record.value())

if entity.created_before(T):
  return hash(key) % old_partitions
else:
  return hash(key) % new_partitions
```
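
For reference, a hedged sketch of how that might look against the existing
StreamPartitioner interface (the KIP's StaticStreamPartitioner would extend
it); Entity, OLD_PARTITION_COUNT and CUTOVER are illustrative stand-ins:

```
import java.time.Instant;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Minimal stand-in for the long-lived entity described above.
record Entity(Instant createdAt) {}

// The value (not the key) carries the creation timestamp, which is what makes
// the "peek at the value" variant of the interface useful here.
class CutoverPartitioner implements StreamPartitioner<String, Entity> {
    private static final int OLD_PARTITION_COUNT = 4;
    private static final Instant CUTOVER = Instant.parse("2022-10-01T00:00:00Z");

    @Override
    public Integer partition(final String topic, final String key,
                             final Entity entity, final int numPartitions) {
        final int count = entity.createdAt().isBefore(CUTOVER)
                ? OLD_PARTITION_COUNT   // keys that existed before the scale-out
                : numPartitions;        // new keys may use the expanded range
        return Math.floorMod(key.hashCode(), count);
    }
}
```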

That said, you're a rockstar architect and have seen a lot more system
design than I have (I'm 23 and only 3 years out of school...you implemented
cooperative rebalancing ). So don't make that decision unless you can see
other use-cases where it is appropriate.

Additionally, for my own use-case I'm not sure if static partitioning alone
(as opposed to re-partitioning and re-playing the changelogs into new
stores) would enable auto-scaleout because my system uses Kafka Streams as
the data store *and* a secondary index...for example, when a user wants to
look up all entities where the variable `user_email==f...@bar.com`, we have
an index store that has keys partitioned by and prefixed with `user_email==
f...@bar.com`. Entities with that email (for example) could come before or
after time T.

Anyways, that's just my twopence, if I were a voting committer I'd vote for
this KIP as-is.

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Wed, Oct 19, 2022 at 4:07 PM Sophie Blee-Goldman
 wrote:

> Thanks for your questions, I would say that your understanding sounds
> correct based
> on what you described but I'll try to add some clarity. The basic idea is
> that, as you said,
> any keys that are processed before time T will go to partition 1. All of
> those keys should
> then continue to be routed to partition 1 for the remainder of the app's
> lifetime, if you care
> about maintaining correct history/"state" for that key (I'll come back to
> this in the next
> paragraph). After the time T, new keys that weren't processed prior to T
> may be routed to
> either partition, provided they are similarly mapped to the same partition
> forever after. It's
> up to the user to enforce this, perhaps by trying to keep track of all keys
> but that is likely to
> be impractical. This feature is generally more targeted at cases where the
> partition mapping
> is "obvious" enough to compute without needing to maintain a history of all
> keys and their
> original partition: for example, imagine an application that processes user
> account information.
> You can scale out to a partition per user, and add a new partition each
> time someone opens
> a new account. When they open that account they get a userID number,
> starting with #0 and
> counting up from there. In that case, the partition for any records
> pertaining to a given account
> would just be its userID.
>
> I hope that clears up the kind of intended use case we're targeting with
> this feature. That said,
> another important and equally viable use case that I neglected to mention
> in the KIP is fully
> stateless applications. Technically this feature can produce correct
> results for applications that
> are at least one of (a) statically partitioned, or (b) completely
> stateless. However, the stateless
> case is a bit stickier since even if the Streams application itself doesn't
> care about maintaining
> the same mapping of key to partition, it could for example be feeding into
> a downstream
> application which *does* need to maintain state, and which would wind up
> "losing" the history for
> any keys that changed partition.
>
> I kind of felt like opening this feature up to stateless applications would
> be asking for trouble and
> make it too easy for people to shoot themselves in the foot. That said, I'm
> open to discussion on
> this point if you feel like the benefits here outweigh the risks. I'm also
> happy to consider modifying
> the API so that it could naturally be expanded to include stateless
> applications  in the future, even
> if we decide against allowing that use case in the first iteration of the
> feature.
>
> Thoughts?
>
> Sophie
>
> On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy  wrote:
>
> > Sophie,
> >
> > Thank you for the KIP! Choosing 

Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-10-19 Thread Colt McNealy
Sophie,

Thank you for the KIP! Choosing the number of partitions in a Streams app
is a tricky task because of how difficult it is to re-partition; I'm glad
you're working on an improvement. I've got two questions:

First, `StaticStreamsPartitioner` is an interface that we (Streams users)
must implement, I'm trying to understand how it would work. For example,
let's say there's some point in time 'T' before which we have 1 partition.
Then we decide to increase the partition count to 2 at time T. From my
understanding, all keys that had passed through the Streams app before time
T must end up on partition 1 if they appear again in the input topics; but
any new keys are allowed to be sent to partition 2. Is that correct? And
(pardon the naive question) how is this achieved without keeping track of
all keys that have been seen at any point?

Secondly, will this feature work with applications that use interactive
queries?

Thank you very much,
Colt McNealy
*Founder, LittleHorse.io*


On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
 wrote:

> Hey all,
>
> I'd like to propose a new autoscaling feature for Kafka Streams
> applications which can follow the constraint of static partitioning. For
> further details please refer to the KIP document:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams
>
> This feature will be targeted for 3.4 but may not be fully implemented
> until the following release, 3.5.
>
> Please give this a read and let me know what you think!
>
> Cheers,
> Sophie
>


Re: Transactions, delivery timeout and changing transactional producer behavior

2022-09-11 Thread Colt McNealy
Hi all—

I'm not a committer so I can't review this PR (or is that not true?).
However, I'd like to bump this as well. I believe that I've encountered
this bug during chaos testing with the transactional producer. I can
sometimes produce this error when killing a broker during a long-running
transaction, which causes a batch to encounter delivery timeout as
described in the Jira. I have observed some inconsistencies with
the consumer offset being advanced prematurely (i.e. perhaps after the
delivery of the EndTxnRequest).

Daniel, thank you for the PR.

Cheers,
Colt McNealy
*Founder, LittleHorse.io*

On Fri, Sep 9, 2022 at 9:54 AM Dániel Urbán  wrote:

> Hi all,
>
> I would like to bump this and bring some attention to the issue.
> This is a nasty bug in the transactional producer; it would be nice if I could
> get some feedback on the PR: https://github.com/apache/kafka/pull/12392
>
> Thanks in advance,
> Daniel
>
> Viktor Somogyi-Vass  ezt írta
> (időpont: 2022. júl. 25., H, 15:28):
>
> > Hi Luke & Artem,
> >
> > We prepared the fix, would you please help in getting a
> committer-reviewer
> > to get this issue resolved?
> >
> > Thanks,
> > Viktor
> >
> > On Fri, Jul 8, 2022 at 12:57 PM Dániel Urbán 
> > wrote:
> >
> > > Submitted a PR with the fix:
> https://github.com/apache/kafka/pull/12392
> > > In the PR I tried keeping the producer in a usable state after the
> forced
> > > bump. I understand that it might be the cleanest solution, but the only
> > > other option I know of is to transition into a fatal state, meaning
> that
> > > the producer has to be recreated after a delivery timeout. I think that
> > is
> > > still fine compared to the out-of-order messages.
> > >
> > > Looking forward to your reviews,
> > > Daniel
> > >
> > > Dániel Urbán  ezt írta (időpont: 2022. júl. 7.,
> > Cs,
> > > 12:04):
> > >
> > > > Thanks for the feedback, I created
> > > > https://issues.apache.org/jira/browse/KAFKA-14053 and started
> working
> > on
> > > > a PR.
> > > >
> > > > Luke, for the workaround, we used the transaction admin tool released
> > in
> > > > 3.0 to "abort" these hanging batches manually.
> > > > Naturally, the cluster health should be stabilized. This issue popped
> > up
> > > > most frequently around times when some partitions went into a few
> > minute
> > > > window of unavailability. The infinite retries on the producer side
> > > caused
> > > > a situation where the last retry was still in-flight, but the
> delivery
> > > > timeout was triggered on the client side. We reduced the retries and
> > > > increased the delivery timeout to avoid such situations.
> > > > Still, the issue can occur in other scenarios, for example a client
> > > > queueing up many batches in the producer buffer, and causing those
> > > batches
> > > > to spend most of the delivery timeout window in the client memory.
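
As a rough illustration of that tuning (values are examples only, not a
recommendation):

```
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Fewer retries and a longer delivery timeout make it less likely that the
// final retry is still in flight when the client-side delivery timeout fires.
final class TransactionalProducerTuning {
    static Properties tunedProducerConfig() {
        final Properties props = new Properties();
        props.put(ProducerConfig.RETRIES_CONFIG, 5);                    // instead of effectively infinite
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 600_000);  // 10 min, up from the 2 min default
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);    // per-request timeout left at default
        return props;
    }
}
```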
> > > >
> > > > Thanks,
> > > > Daniel
> > > >
> > > > Luke Chen  ezt írta (időpont: 2022. júl. 7., Cs,
> > > 5:13):
> > > >
> > > >> Hi Daniel,
> > > >>
> > > >> Thanks for reporting the issue, and the investigation.
> > > >> I'm curious, so, what's your workaround for this issue?
> > > >>
> > > >> I agree with Artem, it makes sense. Please file a bug in JIRA.
> > > >> And looking forward to your PR! :)
> > > >>
> > > >> Thank you.
> > > >> Luke
> > > >>
> > > >> On Thu, Jul 7, 2022 at 3:07 AM Artem Livshits
> > > >>  wrote:
> > > >>
> > > >> > Hi Daniel,
> > > >> >
> > > >> > What you say makes sense.  Could you file a bug and put this info
> > > there
> > > >> so
> > > >> > that it's easier to track?
> > > >> >
> > > >> > -Artem
> > > >> >
> > > >> > On Wed, Jul 6, 2022 at 8:34 AM Dániel Urbán <
> urb.dani...@gmail.com>
> > > >> wrote:
> > > >> >
> > > >> > > Hello everyone,
> > > >> > >
> > > >> > > I've been investigating some transaction related issues in a
> very
> > > >> > > problematic cluster. Besides finding some interesting issues, I
>

Re: [VOTE] KIP-844: Transactional State Stores

2022-09-01 Thread Colt McNealy
+1

Hi Alex,

Thank you for your work on the KIP. I'm not a committer so my vote is
non-binding but I strongly support this improvement.

Thank you,
Colt McNealy
*Founder, LittleHorse.io*


On Thu, Sep 1, 2022 at 8:20 AM Alexander Sorokoumov
 wrote:

> Hi All,
>
> I would like to start a voting thread on KIP-844, which introduces
> transactional state stores to avoid wiping local state on crash failure
> under EOS.
>
> KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> Discussion thread:
> https://lists.apache.org/thread/4vc18t0o2wsk0n235dd4pd1hlr1p6gm2
> Jira: https://issues.apache.org/jira/browse/KAFKA-12549
>
> Best,
> Alex
>