Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2024-04-17 Thread Nick Telford
Hi Walker, Feel free to ask away, either on the mailing list or the Confluent Community Slack, where I hang out :-) The implementation is *mostly* complete, although it needs some polishing. It's worth noting that KIP-1035 is a hard prerequisite for this. Regards, Nick

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-29 Thread Colt McNealy
Guozhang—I agree, I am in favor of moving forward with the KIP now that the Transactional State Stores will be behind a feature flag. Nick—I just did a bit more light testing of your branch `KIP-892-3.5.0` with your most recent changes. I couldn't detect a performance difference versus trunk (in

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-29 Thread Guozhang Wang
I'd agree with you guys that as long as we are in agreement about the configuration semantics, that would be a big win to move forward for this KIP. As for the TaskCorruptedException handling like wiping state stores, we can discuss that in the PR rather than in the KIP. Just to clarify, I'm

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-19 Thread Bruno Cadonna
Hi Nick, What you and Lucas wrote about the different configurations of ALOS/EOS and READ_COMMITTED/READ_UNCOMMITTED makes sense to me. My earlier concerns about changelogs diverging from the content of the local state stores turned out to not apply. So I think we can move on with those

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-19 Thread Lucas Brutschy
Hi Nick, what I meant was, why don't you leave the behavior of Kafka Streams in this case as is (wipe the state, abort the transaction), since the contribution of the KIP is to allow transactional state stores, not to eliminate all cases of state wiping in Kafka Streams. But either way, that's

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-18 Thread Nick Telford
Hi Lucas, TaskCorruptedException is how Streams signals that the Task state needs to be wiped, so we can't retain that exception without also wiping state on timeouts. Regards, Nick On Wed, 18 Oct 2023 at 14:48, Lucas Brutschy wrote: > Hi Nick, > > I think indeed the better behavior would be

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-18 Thread Lucas Brutschy
Hi Nick, I think indeed the better behavior would be to retry commitTransaction until we risk running out of time to meet `max.poll.interval.ms`. However, if it's handled as a `TaskCorruptedException` at the moment, I would do the same in this KIP, and leave exception handling improvements to
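A minimal Java sketch of the retry-until-deadline idea above, assuming a single-threaded caller and an injected clock so the logic can be tested. `CommitAttempt`, `CommitRetrier` and the safety margin are hypothetical names, not Kafka Streams API; `java.util.concurrent.TimeoutException` stands in for the unchecked `org.apache.kafka.common.errors.TimeoutException` that a real producer commit would throw.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a single commit call (e.g. a producer's commitTransaction()).
interface CommitAttempt {
    void run() throws TimeoutException;
}

// Hypothetical helper: retries a timed-out commit while there is still time left before
// max.poll.interval.ms would be exceeded, instead of failing on the first timeout.
final class CommitRetrier {
    private final LongSupplier clockMs;    // current time in milliseconds
    private final long maxPollIntervalMs;  // the consumer's max.poll.interval.ms
    private final long safetyMarginMs;     // time kept in reserve for the next poll()

    CommitRetrier(LongSupplier clockMs, long maxPollIntervalMs, long safetyMarginMs) {
        this.clockMs = clockMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.safetyMarginMs = safetyMarginMs;
    }

    // Returns the number of attempts used. Once the deadline has passed, the last
    // timeout is rethrown so the caller can fall back to its existing handling.
    int commitWithRetries(long lastPollMs, CommitAttempt attempt) throws TimeoutException {
        final long deadlineMs = lastPollMs + maxPollIntervalMs - safetyMarginMs;
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.run();
                return attempts;
            } catch (TimeoutException e) {
                if (clockMs.getAsLong() >= deadlineMs) {
                    throw e;
                }
                // A real implementation would probably back off here before retrying.
            }
        }
    }
}
```

Because giving up rethrows the last timeout, a caller could keep today's `TaskCorruptedException` handling as the fallback, which is what this message suggests doing within the scope of the KIP.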

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-17 Thread Nick Telford
Hi Lucas, Yeah, this is pretty much the direction I'm thinking of going in now. You make an interesting point about committing on-error under ALOS/READ_COMMITTED, although I haven't had a chance to think through the implications yet. Something that I ran into earlier this week is an issue with

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-16 Thread Lucas Brutschy
Hi all, I think I liked your suggestion of allowing EOS with READ_UNCOMMITTED while still wiping the state on error, and I'd vote for this solution when introducing `default.state.isolation.level`. This way, we'd have the lowest-risk roll-out of this feature (no behavior change without

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-16 Thread Nick Telford
Hi Guozhang, The KIP as it stands introduces a new configuration, default.state.isolation.level, which is independent of processing.mode. It's intended that this new configuration be used to configure a global IQ isolation level in the short term, with a future KIP introducing the capability to

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-13 Thread Guozhang Wang
Hello Nick, First of all, thanks a lot for the great effort you've put in driving this KIP! I really like it coming through finally, as many people in the community have raised this. At the same time I honestly feel a bit ashamed for not putting enough of my time supporting it and pushing it

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-13 Thread Nick Telford
Hi Bruno, 4. I'll hold off on making that change until we have a consensus as to what configuration to use to control all of this, as it'll be affected by the decision on EOS isolation levels. 5. Done. I've chosen "committedOffsets". Regards, Nick On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-13 Thread Bruno Cadonna
Hi Nick, 1. Yeah, you are probably right that it does not make too much sense. Thanks for the clarification! 4. Yes, sorry for the back and forth, but I think for the sake of the KIP it is better to leave the ALOS behavior as it is for now due to the possible issues you would run into. Maybe

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-13 Thread Nick Telford
Hi Bruno, Thanks for getting back to me. 1. I think this should be possible. Are you thinking of the situation where a user may downgrade to a previous version of Kafka Streams? In that case, sadly, the RocksDBStore would get wiped by the older version of Kafka Streams anyway, because that

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-13 Thread Bruno Cadonna
Hi Nick, I think the KIP is converging! 1. I am wondering whether it makes sense to write the position file during close as we do for the checkpoint file, so that in case the state store is replaced with a non-transactional state store the non-transactional state store finds the position

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-13 Thread Nick Telford
Hi Sophie, Thanks for taking the time to review the KIP and catch up. > my singular goal in responding is to help this KIP past a perceived impasse so we can finally move on to voting and implementing it Just so we're clear, is the impasse you're referring to this limitation in the current

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-12 Thread Sophie Blee-Goldman
Hey Nick! First of all thanks for taking up this awesome feature, I'm sure every single Kafka Streams user and dev would agree that it is sorely needed. I've just been catching up on the KIP and surrounding discussion, so please forgive me for any misunderstandings or misinterpretations of the

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-19 Thread Nick Telford
Hi Bruno, Agreed, I can live with that for now. In an effort to keep the scope of this KIP from expanding, I'm leaning towards just providing a configurable default.state.isolation.level and removing IsolationLevel from the StateStoreContext. This would be compatible with adding support for

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-19 Thread Bruno Cadonna
Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think it is perfectly valid to say InMemoryKeyValueStore does not support READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto default at the moment. Best, Bruno On 9/18/23 7:12 PM, Nick Telford wrote: Oh! One other

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-18 Thread Nick Telford
Oh! One other concern I haven't mentioned: if we make IsolationLevel a query-time constraint, then we need to add support for READ_COMMITTED to InMemoryKeyValueStore too, which will require some changes to the implementation. On Mon, 18 Sept 2023 at 17:24, Nick Telford wrote: > Hi everyone, > >

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-18 Thread Nick Telford
Hi everyone, I agree that having IsolationLevel be determined at query-time is the ideal design, but there are a few sticking points: 1. There needs to be some way to communicate the IsolationLevel down to the RocksDBStore itself, so that the query can respect it. Since stores are "layered" in

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-18 Thread Lucas Brutschy
Hi Nick, since I last read it in April, the KIP has become much cleaner and easier to read. Great work! It feels to me the last big open point is whether we can implement isolation level as a query parameter. I understand that there are implementation concerns, but as Colt says, it would be a

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-17 Thread Colt McNealy
> Making IsolationLevel a query-time constraint, rather than linking it to the processing.guarantee. As I understand it, would this allow even a user of EOS to control whether to read committed or uncommitted records? If so, I am highly in favor of this. I know that I was one of the early people

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-13 Thread Nick Telford
Addendum: I think we would also face the same problem with the approach John outlined earlier (using the record cache as a transaction buffer and flushing it straight to SST files). This is because the record cache (the ThreadCache class) is not thread-safe, so every commit would invalidate open

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-13 Thread Nick Telford
Hi Bruno, I've updated the KIP based on our conversation. The only things I've not yet done are: 1. Using transactions under ALOS and EOS. 2. Making IsolationLevel a query-time constraint, rather than linking it to the processing.guarantee. There's a wrinkle that makes this a challenge:

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-13 Thread Bruno Cadonna
Hi Nick, 6. Of course, you are right! My bad! Wiping out the state in the downgrading case is fine. 3a. Focus on the public facing changes for the KIP. We will manage to get the internals right. Regarding state stores that do not support READ_COMMITTED, they should throw an error stating

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-13 Thread Nick Telford
Bruno, Thinking about 3a. in addition to adding the IsolationLevel to QueryStoreParameters and Query, what about also adding a method like "ReadOnlyKeyValueStore view(IsolationLevel isolationLevel)" to ReadOnlyKeyValueStore? This would enable us to easily select/switch between IsolationLevels,
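A minimal Java sketch of the proposed `view(IsolationLevel)` idea, assuming a store with a committed map plus an in-memory transaction buffer. Every type here (`IsolationLevel`, `IsolatedReadOnlyStore`, `TransactionalMapStore`) is hypothetical and only mirrors the method shape quoted in the message; READ_UNCOMMITTED reads consult the buffer first, in the spirit of the `WriteBatchWithIndex#getFromBatchAndDB` read path discussed elsewhere in this thread, while READ_COMMITTED reads ignore it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical types for illustration; not the Kafka Streams API.
enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

interface IsolatedReadOnlyStore<K, V> {
    V get(K key);

    // Returns a view over the same data that reads at the given isolation level.
    IsolatedReadOnlyStore<K, V> view(IsolationLevel isolationLevel);
}

final class TransactionalMapStore<K, V> {
    private final Map<K, V> committed = new HashMap<>();
    private final Map<K, V> uncommitted = new HashMap<>(); // transaction buffer

    void put(K key, V value) {
        uncommitted.put(key, value);
    }

    // Moves buffered writes into the committed data.
    void commit() {
        committed.putAll(uncommitted);
        uncommitted.clear();
    }

    IsolatedReadOnlyStore<K, V> readOnly(IsolationLevel level) {
        return new View(level);
    }

    private final class View implements IsolatedReadOnlyStore<K, V> {
        private final IsolationLevel level;

        View(IsolationLevel level) {
            this.level = level;
        }

        @Override
        public V get(K key) {
            if (level == IsolationLevel.READ_UNCOMMITTED && uncommitted.containsKey(key)) {
                return uncommitted.get(key); // uncommitted write wins over committed data
            }
            return committed.get(key);
        }

        @Override
        public IsolatedReadOnlyStore<K, V> view(IsolationLevel isolationLevel) {
            return new View(isolationLevel); // same backing maps, different isolation
        }
    }
}
```

Switching levels only creates a lightweight wrapper over the same backing data, which is the convenience the message is after.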

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-13 Thread Nick Telford
Hi Bruno, Thanks for getting back to me! 2. The fact that implementations can always track estimated memory usage in the wrapper is a good point. I can remove -1 as an option, and I'll clarify the JavaDoc that 0 is not just for non-transactional stores, which is currently misleading. 6. The

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-13 Thread Bruno Cadonna
Hi Nick, Thanks for the updates and sorry for the delay on my side! 1. Making the default implementation for flush() a no-op sounds good to me. 2. I think what was bugging me here is that a third-party state store needs to implement the state store interface. That means they need to
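A minimal Java sketch of the no-op `flush()` default discussed in point 1, using a hypothetical, simplified `CommittableStore` interface rather than the real `org.apache.kafka.streams.processor.StateStore`. The comments contrast it with the earlier draft (forwarding to `commit(Collections.emptyMap())`) and with throwing `UnsupportedOperationException`; offsets are keyed by a hypothetical "topic-partition" string so the sketch has no Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store interface for illustration only.
interface CommittableStore {
    // Commits buffered writes together with the changelog offsets they correspond to.
    void commit(Map<String, Long> changelogOffsets);

    // Kept only for existing callers such as custom Processors. The default does nothing:
    // it neither commits (the earlier draft forwarded to commit(Collections.emptyMap()))
    // nor throws UnsupportedOperationException (the alternative raised in review).
    @Deprecated
    default void flush() {
    }
}

// Example implementation that records how often commit() was actually invoked.
final class CountingStore implements CommittableStore {
    final Map<String, Long> lastOffsets = new HashMap<>();
    int commits = 0;

    @Override
    public void commit(Map<String, Long> changelogOffsets) {
        commits++;
        lastOffsets.putAll(changelogOffsets);
    }
}
```

With a no-op default, legacy `flush()` calls keep compiling and running, but they no longer trigger a commit behind the caller's back.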

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-11 Thread Colt McNealy
Nick, Thanks for the response. >Can you clarify how much state was restored in those 11 seconds? That was a full restoration of ~650MB of state after I wiped the state directory. The restoration after a crash with your branch is nearly instantaneous, whereas with plain Kafka 3.5.0 a crash

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-11 Thread Nick Telford
Hi Colt, Thanks for taking the time to run your benchmarks on this, that's incredibly helpful. > With KIP 892, I verified that unclean shutdown does not cause a fresh > restore. I got the following benchmark results: > - Benchmark took 216 seconds > - 1,401 tasks per second on one

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-09-10 Thread Colt McNealy
Howdy folks, First I wanted to say fantastic work and thank you to Nick. I built your branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0` branch, and your `kip-892-3.5.0` branch built with Speedb OSS

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-08-24 Thread Nick Telford
Hi Bruno, Thanks for taking the time to review the KIP. I'm back from leave now and intend to move this forwards as quickly as I can. Addressing your points: 1. Because flush() is part of the StateStore API, it's exposed to custom Processors, which might be making calls to flush(). This was

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-08-14 Thread Bruno Cadonna
Hi Nick! Thanks for the updates! 1. Why does StateStore#flush() default to StateStore#commit(Collections.emptyMap())? Since calls to flush() will not exist anymore after this KIP is released, I would rather throw an unsupported operation exception by default. 2. When would a state store

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-07-21 Thread Nick Telford
One more thing: I noted John's suggestion in the KIP, under "Rejected Alternatives". I still think it's an idea worth pursuing, but I believe that it's out of the scope of this KIP, because it solves a different set of problems to this KIP, and the scope of this one has already grown quite large!

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-07-21 Thread Nick Telford
Hi everyone, I've updated the KIP ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores) with the latest changes; mostly bringing back "Atomic Checkpointing" (for what feels like the 10th time!). I think the one thing missing is some changes to

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-07-03 Thread Nick Telford
Hi Bruno, Yes, that's correct, although the impact on IQ is not something I had considered. > What about atomically updating the state store from the transaction > buffer every commit interval and writing the checkpoint (thus, flushing > the memtable) every configured amount of data and/or number
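A minimal Java sketch of the quoted suggestion: commit the transaction buffer to the store on every commit interval, but write a checkpoint (and therefore flush the memtable) only once enough data has accumulated. `CheckpointPolicy` and its byte threshold are hypothetical; the quoted text also mentions a record-count threshold, which could be added as a second counter in the same way.

```java
// Hypothetical policy for illustration; no real config name is implied.
final class CheckpointPolicy {
    private final long checkpointThresholdBytes;
    private long bytesSinceCheckpoint = 0L;

    CheckpointPolicy(long checkpointThresholdBytes) {
        this.checkpointThresholdBytes = checkpointThresholdBytes;
    }

    // Call once per commit with the number of bytes that commit applied to the store.
    // Returns true when the caller should also write a checkpoint now.
    boolean onCommit(long bytesCommitted) {
        bytesSinceCheckpoint += bytesCommitted;
        if (bytesSinceCheckpoint >= checkpointThresholdBytes) {
            bytesSinceCheckpoint = 0L;
            return true;
        }
        return false;
    }
}
```

Frequent commits stay cheap this way, while memtable flushes, the expensive part, are driven by data volume rather than by the commit interval.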

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-07-03 Thread Bruno Cadonna
Hi Nick, Thanks for the insights! Very interesting! As far as I understand, you want to atomically update the state store from the transaction buffer, flush the memtable of a state store and write the checkpoint not after the commit time elapsed but after the transaction buffer reached a

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-07-01 Thread Nick Telford
Hi everyone, I've begun performance testing my branch on our staging environment, putting it through its paces in our non-trivial application. I'm already observing the same increased flush rate that we saw the last time we attempted to use a version of this KIP, but this time, I think I know

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-22 Thread Nick Telford
Hi Bruno! 3. By "less predictable for users", I meant in terms of understanding the performance profile under various circumstances. The more complex the solution, the more difficult it would be for users to understand the performance they see. For example, spilling records to disk when the

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-22 Thread Bruno Cadonna
Hi Nick, 1. Yeah, I agree with you. That was actually also my point. I understood that John was proposing the ingestion path as a way to avoid the early commits. Probably, I misinterpreted the intent. 2. I agree with John here, that actually it is public API. My question is how this usage

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-21 Thread John Roesler
No worries, I should have included a ";)" to let you know it was mostly tongue-in-cheek. Thanks, -John On 6/21/23 12:34, Nick Telford wrote: Sorry John, I didn't mean to mis-characterize it like that. I was mostly referring to disabling memtables. AFAIK the SstFileWriter API is primarily

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-21 Thread Nick Telford
Sorry John, I didn't mean to mis-characterize it like that. I was mostly referring to disabling memtables. AFAIK the SstFileWriter API is primarily designed for bulk ingest, e.g. for bootstrapping a database from a backup, rather than during normal operation of an online database. That said, I was

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-21 Thread John Roesler
Thanks Nick, That sounds good to me. I can't let (2) slide, though. Writing and ingesting SST files is not a RocksDB internal, but rather a supported usage pattern on public APIs. Regardless, I think your overall preference is fine with me, especially if we can internalize this change

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-21 Thread Nick Telford
Hi Bruno, 1. Isn't this exactly the same issue that WriteBatchWithIndex transactions have, whereby exceeding (or likely to exceed) configured memory needs to trigger an early commit? 2. This is one of my big concerns. Ultimately, any approach based on cracking open RocksDB internals and using it

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-21 Thread Bruno Cadonna
Hi John, Hi Nick, Thanks for the interesting ideas! Here are my comments. 1. What happens if the cache exceeds its configured size between two commits? Currently, the cache evicts its entries and writes dirty entries to the state store. Should the cache write those dirty

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Here's what I'm thinking: based on Bruno's earlier feedback, I'm going to try to simplify my original design down such that it needs no/minimal changes to the public interface. If that succeeds, then it should also be possible to transparently implement the "no memtables" solution as a

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread John Roesler
Oh, that's a good point. On the topic of a behavioral switch for disabled caches, the typical use case for disabling the cache is to cause each individual update to propagate down the topology, so another thought might be to just go ahead and add the memory we would have used for the

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Potentially we could just go the memtable with RocksDB WriteBatches route if the cache is disabled? On Tue, 20 Jun 2023, 22:00 John Roesler, wrote: > Touché! > > Ok, I agree that figuring out the case of a disabled cache would be > non-trivial. Ingesting single-record SST files will probably not

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread John Roesler
Touché! Ok, I agree that figuring out the case of a disabled cache would be non-trivial. Ingesting single-record SST files will probably not be performant, but benchmarking may prove different. Or maybe we can have some reserved cache space on top of the user-configured cache, which we would

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
> Note that users can disable the cache, which would still be ok, I think. We wouldn't ingest the SST files on every record, but just append to them and only ingest them on commit, when we're already waiting for acks and a RocksDB commit. In this case, how would uncommitted records be read by

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread John Roesler
Ah, sorry Nick, I just meant the regular heap based cache that we maintain in Streams. I see that it's not called "RecordCache" (my mistake). The actual cache is ThreadCache:

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Hi John, I think you're referring to the "record cache" that's provided by the ThreadCache class? 1-3. I was hoping to (eventually) remove the "flush-on-commit" behaviour from RocksDBStore, so that RocksDB can choose when to flush memtables, enabling users to tailor RocksDB performance to their

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Hi John, By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any class called "RecordCache"... Cheers, Nick On Tue, 20 Jun 2023 at 19:42, John Roesler wrote: > Hi Nick, > > Thanks for picking this up again! > > I did have one new thought over the intervening months, which

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread John Roesler
Hi Nick, Thanks for picking this up again! I did have one new thought over the intervening months, which I'd like your take on. What if, instead of using the RocksDB atomic write primitive at all, we instead just: 1. disable memtables entirely 2. directly write the RecordCache into SST

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Nick Telford
Hi Bruno, Thanks for reviewing the KIP. It's been a long road: I started working on this more than a year ago, and most of the time in the last 6 months has been spent on the "Atomic Checkpointing" stuff that's been benched, so some of the reasoning behind some of my decisions has been lost, but

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-20 Thread Bruno Cadonna
Hi Nick, Thanks for the updates! I really appreciate that you simplified the KIP by removing some aspects. As I have already told you, I think the removed aspects are also good ideas and we can discuss them on follow-up KIPs. Regarding the current KIP, I have the following feedback. 1. Is

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-05-15 Thread Nick Telford
Hi everyone, Quick update: I've added a new section to the KIP: "Offsets for Consumer Rebalances", that outlines my solution to the problem that StreamsPartitionAssignor needs to read StateStore offsets even if they're not currently open. Regards, Nick On Wed, 3 May 2023 at 11:34, Nick Telford

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-05-03 Thread Nick Telford
Hi Bruno, Thanks for reviewing my proposal. 1. The main reason I added it was because it was easy to do. If we see no value in it, I can remove it. 2. Global StateStores can have multiple partitions in their input topics (which function as their changelogs), so they would have more than one

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-05-02 Thread Bruno Cadonna
Hi Nick, Thanks for the updates! I have a couple of questions/comments. 1. Why do you propose a configuration that involves max. bytes and max. records? I think we are mainly concerned about memory consumption because we want to limit the off-heap memory used. I cannot think of a case where

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-30 Thread Colt McNealy
Nick, That is a good point, the Checkpoint Files have been a part of the State Store implementation for a long time and there might be some risk involved in removing it. And yes, if you go with the idea of moving offset management out of the state store itself, then you'll need to flush RocksDB

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-27 Thread Nick Telford
Hi everyone, I find myself (again) considering removing the offset management from StateStores, and keeping the old checkpoint file system. The reason is that the StreamsPartitionAssignor directly reads checkpoint files in order to determine which instance has the most up-to-date copy of the local

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-19 Thread Nick Telford
Hi Colt, The issue is that if there's a crash between 2 and 3, then you still end up with inconsistent data in RocksDB. The only way to guarantee that your checkpoint offsets and locally stored data are consistent with each other is to atomically commit them, which can be achieved by having the
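A minimal Java sketch of why committing data and offsets atomically closes the crash window described above, assuming a single writer per store. `AtomicCheckpointStore` is hypothetical: instead of RocksDB it keeps data and changelog offsets in one immutable snapshot and publishes both with a single reference swap, so a reader never sees new data with an old offset (or vice versa). In RocksDB the analogous move is writing the data and the offsets in the same atomic write.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: data and changelog offsets live in one snapshot that is
// replaced as a whole, so they can never be observed out of sync.
final class AtomicCheckpointStore {
    private static final class Snapshot {
        final Map<String, String> data;
        final Map<String, Long> offsets;

        Snapshot(Map<String, String> data, Map<String, Long> offsets) {
            this.data = Collections.unmodifiableMap(data);
            this.offsets = Collections.unmodifiableMap(offsets);
        }
    }

    private final AtomicReference<Snapshot> current =
        new AtomicReference<>(new Snapshot(new HashMap<>(), new HashMap<>()));
    private final Map<String, String> pending = new HashMap<>(); // uncommitted writes

    void put(String key, String value) {
        pending.put(key, value);
    }

    // Publishes the buffered writes and the changelog offset they correspond to in one step.
    void commit(String changelogPartition, long offset) {
        Snapshot old = current.get();
        Map<String, String> data = new HashMap<>(old.data);
        data.putAll(pending);
        Map<String, Long> offsets = new HashMap<>(old.offsets);
        offsets.put(changelogPartition, offset);
        current.set(new Snapshot(data, offsets)); // the single atomic publish
        pending.clear();
    }

    String get(String key) {
        return current.get().data.get(key);
    }

    Long committedOffset(String changelogPartition) {
        return current.get().offsets.get(changelogPartition);
    }
}
```

Copying the maps on every commit is only for clarity; the point is that there is no moment at which the data has been written but the offset has not.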

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-19 Thread Colt McNealy
Nick, Thanks for your reply. Ack to A) and B). For item C), I see what you're referring to. Your proposed solution will work, so no need to change it. What I was suggesting was that it might be possible to achieve this with only one column family. So long as: - No uncommitted records (i.e.

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-19 Thread Nick Telford
Hi Colt, A. I've done my best to de-couple the StateStore stuff from the rest of the Streams engine. The fact that there will be only one ongoing (write) transaction at a time is not guaranteed by any API, and is just a consequence of the way Streams operates. To that end, I tried to ensure the

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-18 Thread Colt McNealy
Nick, Thank you for continuing this work. I have a few minor clarifying questions. A) "Records written to any transaction are visible to all other transactions immediately." I am confused here—I thought there could only be one transaction going on at a time for a given state store given the

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-04-18 Thread Nick Telford
Hi everyone, I've updated the KIP to reflect the latest version of the design: https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores There are several changes in there that reflect feedback from this thread, and there's a new section and a bunch of

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-01-03 Thread Nick Telford
Hi Lucas, Thanks for looking over my KIP. A) The bound is per-instance, not per-Task. This was a typo in the KIP that I've now corrected. It was originally per-Task, but I changed it to per-instance for exactly the reason you highlighted. B) It's worth noting that transactionality is only

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-01-02 Thread Lucas Brutschy
Hi Nick, I'm just starting to read up on the whole discussion about KIP-892 and KIP-844. Thanks a lot for your work on this, I do think `WriteBatchWithIndex` may be the way to go here. I do have some questions about the latest draft. A) If I understand correctly, you propose to put a bound on

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-12-22 Thread Nick Telford
Hi everyone, I've updated the KIP with a more detailed design, which reflects the implementation I've been working on: https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores This new design should address the outstanding points already made in the

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-12-06 Thread Colt McNealy
Nick, Thank you for the reply; that makes sense. I was hoping that, since reading uncommitted records from IQ in EOS isn't part of the documented API, maybe you *wouldn't* have to wait for the next major release to make that change; but given that it would be considered a major change, I like

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-12-06 Thread Nick Telford
Hi Colt, 10: Yes, I agree it's not ideal. I originally intended to try to keep the behaviour unchanged as much as possible, otherwise we'd have to wait for a major version release to land these changes. 20: Good point, ALOS doesn't need the same level of guarantee, and the typically longer commit

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-30 Thread Colt McNealy
Nick, Thank you for the explanation, and also for the updated KIP. I am quite eager for this improvement to be released as it would greatly reduce the operational difficulties of EOS streams apps. Two questions: 10) >When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-30 Thread Nick Telford
Hi everyone, I've drastically reduced the scope of this KIP to no longer include the StateStore management of checkpointing. This can be added as a KIP later on to further optimize the consistency and performance of state stores. I've also added a section discussing some of the concerns around

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-28 Thread Nick Telford
Hi Colt, I didn't do any profiling, but the 844 implementation: - Writes uncommitted records to a temporary RocksDB instance - Since tombstones need to be flagged, all record values are prefixed with a value/tombstone marker. This necessitates a memory copy. - On-commit,
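A minimal Java sketch of the value/tombstone marker described in the second bullet, showing where the extra memory copy comes from. `TombstoneCodec` and the marker byte values are hypothetical; the actual KIP-844 encoding may differ.

```java
import java.util.Arrays;

// Hypothetical encoding for illustration; the marker byte values are assumptions.
final class TombstoneCodec {
    static final byte VALUE = 0x00;
    static final byte TOMBSTONE = 0x01;

    // Every write allocates a new array one byte longer and copies the value into it.
    static byte[] encodeValue(byte[] value) {
        byte[] out = new byte[value.length + 1];
        out[0] = VALUE;
        System.arraycopy(value, 0, out, 1, value.length);
        return out;
    }

    static byte[] encodeTombstone() {
        return new byte[] {TOMBSTONE};
    }

    static boolean isTombstone(byte[] encoded) {
        return encoded[0] == TOMBSTONE;
    }

    // Reading strips the marker, which costs another copy; tombstones decode to null.
    static byte[] decodeValue(byte[] encoded) {
        if (isTombstone(encoded)) {
            return null;
        }
        return Arrays.copyOfRange(encoded, 1, encoded.length);
    }
}
```

Each record therefore pays an allocation and a copy on the way in and again on the way out, on top of being written twice.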

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-28 Thread Colt McNealy
Hi all, Out of curiosity, why does the performance of the store degrade so significantly with the 844 implementation? I wouldn't be too surprised by a 50-60% drop (caused by each record being written twice), but 96% is extreme. The only thing I can think of which could create such a bottleneck

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-28 Thread Nick Telford
For now, I've settled on choosing an arbitrary default memory limit of 64MiB per-Task for buffering uncommitted records. I noticed that Kafka Streams already provides some arbitrary default configuration of RocksDB memory settings (i.e. memtable size etc.), and that many users will already be

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-28 Thread Nick Telford
Hi everyone, I've updated the KIP to resolve all the points that have been raised so far, with one exception: the ALOS default commit interval of 5 minutes is likely to cause WriteBatchWithIndex memory to grow too large. There are a couple of different things I can think of to solve this: - We

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-23 Thread Nick Telford
Hi Alex, Thanks for the feedback. I've updated the discussion of OOM issues by describing how we'll handle it. Here's the new text: To mitigate this, we will automatically force a Task commit if the total > uncommitted records returned by > StateStore#approximateNumUncommittedEntries() exceeds
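A minimal Java sketch of the forced-commit check in the quoted KIP text: sum `approximateNumUncommittedEntries()` across the relevant stores and commit early once the total exceeds a limit. Only that method name comes from the thread; `UncommittedAware`, `ForcedCommitCheck` and `maxUncommittedEntries` are hypothetical, and the snippet is truncated before the real threshold is named. A later message in this thread notes the bound became per-instance rather than per-Task, which would only change which stores are passed in.

```java
import java.util.List;

// Hypothetical minimal interface; only the method name is taken from the KIP text.
interface UncommittedAware {
    long approximateNumUncommittedEntries();
}

// Hypothetical helper: decides whether to commit early because the stores are
// buffering too many uncommitted records.
final class ForcedCommitCheck {
    private final long maxUncommittedEntries; // hypothetical limit

    ForcedCommitCheck(long maxUncommittedEntries) {
        this.maxUncommittedEntries = maxUncommittedEntries;
    }

    boolean shouldForceCommit(List<? extends UncommittedAware> stores) {
        long total = 0L;
        for (UncommittedAware store : stores) {
            total += store.approximateNumUncommittedEntries();
        }
        return total > maxUncommittedEntries; // "exceeds": strictly greater than the limit
    }
}
```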

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-23 Thread Alexander Sorokoumov
Hey Nick, Thank you for the KIP! With such a significant performance degradation in the secondary store approach, we should definitely consider WriteBatchWithIndex. I also like encapsulating checkpointing inside the default state store implementation to improve performance. +1 to John's comment

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-22 Thread Nick Telford
Hi John, Thanks for the review and feedback! 1. Custom Stores: I've been mulling over this problem myself. As it stands, custom stores would essentially lose checkpointing with no indication that they're expected to make changes, besides a line in the release notes. I agree that the best

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-22 Thread John Roesler
Thanks for publishing this alternative, Nick! The benchmark you mentioned in the KIP-844 discussion seems like a compelling reason to revisit the built-in transactionality mechanism. I also appreciate your analysis, showing that for most use cases, the write batch approach should be just fine.

[DISCUSS] KIP-892: Transactional Semantics for StateStores

2022-11-21 Thread Nick Telford
Hi everyone, As I mentioned in the discussion thread for KIP-844, I've been working on an alternative approach to achieving better transactional semantics for Kafka Streams StateStores. I've published this separately as KIP-892: Transactional Semantics for StateStores