Hi Walker,
Feel free to ask away, either on the mailing list or the Confluent
Community Slack, where I hang out :-)
The implementation is *mostly* complete, although it needs some polishing.
It's worth noting that KIP-1035 is a hard prerequisite for this.
Regards,
Nick
Guozhang—I agree, I am in favor of moving forward with the KIP now that the
Transactional State Stores will be behind a feature flag.
Nick—I just did a bit more light testing of your branch `KIP-892-3.5.0`
with your most recent changes. I couldn't detect a performance difference
versus trunk (in
I'd agree with you guys that, as long as we are in agreement about the
configuration semantics, it would be a big win to move forward with
this KIP. As for the TaskCorruptedException handling, like wiping state
stores, we can discuss that in the PR rather than in the KIP.
Just to clarify, I'm
Hi Nick,
What you and Lucas wrote about the different configurations of ALOS/EOS
and READ_COMMITTED/READ_UNCOMMITTED makes sense to me. My earlier
concerns about changelogs diverging from the content of the local state
stores turned out not to apply. So I think we can move on with those
Hi Nick,
what I meant was, why don't you leave the behavior of Kafka Streams in
this case as is (wipe the state, abort the transaction), since the
contribution of the KIP is to allow transactional state stores, not to
eliminate all cases of state wiping in Kafka Streams. But either way,
that's
Hi Lucas,
TaskCorruptedException is how Streams signals that the Task state needs to
be wiped, so we can't retain that exception without also wiping state on
timeouts.
Regards,
Nick
On Wed, 18 Oct 2023 at 14:48, Lucas Brutschy
wrote:
> Hi Nick,
>
> I think indeed the better behavior would be
Hi Nick,
I think indeed the better behavior would be to retry commitTransaction
until we risk running out of time to meet `max.poll.interval.ms`.
However, if it's handled as a `TaskCorruptedException` at the moment,
I would do the same in this KIP, and leave exception handling
improvements to
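Lucas's suggested behaviour, retrying commitTransaction until we risk running out of time to meet `max.poll.interval.ms`, can be sketched roughly as follows. This is a toy stand-in, not the real Streams task code: `retryCommit`, the injected commit callback, and the clock supplier are all hypothetical names introduced here for illustration.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

public class CommitRetry {

    // Retry the commit until it succeeds or we would risk blowing past the
    // deadline (in the real consumer, derived from max.poll.interval.ms).
    public static boolean retryCommit(BooleanSupplier commitTransaction,
                                      long deadlineMs,
                                      LongSupplier clock) {
        while (clock.getAsLong() < deadlineMs) {
            if (commitTransaction.getAsBoolean()) {
                return true; // committed successfully
            }
            // transient failure: loop and try again until the deadline
        }
        return false; // out of time; the caller must surface the failure
    }

    public static void main(String[] args) {
        long[] now = {0};
        int[] attempts = {0};
        // Simulated commit that fails twice then succeeds; each attempt "takes" 100ms.
        boolean ok = retryCommit(
                () -> { now[0] += 100; return ++attempts[0] >= 3; },
                1000L, // deadline, e.g. a safe margin under max.poll.interval.ms
                () -> now[0]);
        System.out.println(ok + " after " + attempts[0] + " attempts");
    }
}
```

The key design point is that the retry loop is bounded by a deadline rather than an attempt count, since the consumer is evicted from the group once `max.poll.interval.ms` elapses without a poll.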
Hi Lucas,
Yeah, this is pretty much the direction I'm thinking of going in now. You
make an interesting point about committing on-error under
ALOS/READ_COMMITTED, although I haven't had a chance to think through the
implications yet.
Something that I ran into earlier this week is an issue with
Hi all,
I think I liked your suggestion of allowing EOS with READ_UNCOMMITTED,
but keep wiping the state on error, and I'd vote for this solution
when introducing `default.state.isolation.level`. This way, we'd have
the most low-risk roll-out of this feature (no behavior change without
Hi Guozhang,
The KIP as it stands introduces a new configuration,
default.state.isolation.level, which is independent of processing.mode.
It's intended that this new configuration be used to configure a global IQ
isolation level in the short term, with a future KIP introducing the
capability to
Hello Nick,
First of all, thanks a lot for the great effort you've put in driving
this KIP! I really like it coming through finally, as many people in
the community have raised this. At the same time I honestly feel a bit
ashamed for not putting enough of my time supporting it and pushing it
Hi Bruno,
4.
I'll hold off on making that change until we have a consensus as to what
configuration to use to control all of this, as it'll be affected by the
decision on EOS isolation levels.
5.
Done. I've chosen "committedOffsets".
Regards,
Nick
On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna
Hi Nick,
1.
Yeah, you are probably right that it does not make too much sense.
Thanks for the clarification!
4.
Yes, sorry for the back and forth, but I think for the sake of the KIP
it is better to leave the ALOS behavior as it is for now due to the
possible issues you would run into. Maybe
Hi Bruno,
Thanks for getting back to me.
1.
I think this should be possible. Are you thinking of the situation where a
user may downgrade to a previous version of Kafka Streams? In that case,
sadly, the RocksDBStore would get wiped by the older version of Kafka
Streams anyway, because that
Hi Nick,
I think the KIP is converging!
1.
I am wondering whether it makes sense to write the position file during
close as we do for the checkpoint file, so that in case the state store
is replaced with a non-transactional state store the non-transactional
state store finds the position
Hi Sophie,
Thanks for taking the time to review the KIP and catch up.
> my singular goal in responding is to help this KIP past a perceived
impasse so we can finally move on to voting and implementing it
Just so we're clear, is the impasse you're referring to this limitation in
the current
Hey Nick! First of all thanks for taking up this awesome feature, I'm sure
every single
Kafka Streams user and dev would agree that it is sorely needed.
I've just been catching up on the KIP and surrounding discussion, so please
forgive me
for any misunderstandings or misinterpretations of the
Hi Bruno,
Agreed, I can live with that for now.
In an effort to keep the scope of this KIP from expanding, I'm leaning
towards just providing a configurable default.state.isolation.level and
removing IsolationLevel from the StateStoreContext. This would be
compatible with adding support for
Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think
it is perfectly valid to say InMemoryKeyValueStore do not support
READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto default
at the moment.
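To make that concrete, a toy stand-in (hypothetical class, not the real InMemoryKeyValueStore) could simply reject the unsupported level:

```java
import java.util.HashMap;
import java.util.Map;

public class InMemoryStoreSketch {
    public enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    private final Map<String, String> data = new HashMap<>();

    public void put(String key, String value) {
        data.put(key, value);
    }

    // Only READ_UNCOMMITTED is supported for now; anything else is rejected
    // loudly instead of silently returning possibly-wrong results.
    public String get(String key, IsolationLevel level) {
        if (level != IsolationLevel.READ_UNCOMMITTED) {
            throw new UnsupportedOperationException(
                    level + " is not supported by this in-memory store");
        }
        return data.get(key);
    }

    public static void main(String[] args) {
        InMemoryStoreSketch store = new InMemoryStoreSketch();
        store.put("k", "v");
        System.out.println(store.get("k", IsolationLevel.READ_UNCOMMITTED));
        try {
            store.get("k", IsolationLevel.READ_COMMITTED);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```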
Best,
Bruno
On 9/18/23 7:12 PM, Nick Telford wrote:
Oh! One other
Oh! One other concern I haven't mentioned: if we make IsolationLevel a
query-time constraint, then we need to add support for READ_COMMITTED to
InMemoryKeyValueStore too, which will require some changes to the
implementation.
On Mon, 18 Sept 2023 at 17:24, Nick Telford wrote:
> Hi everyone,
>
>
Hi everyone,
I agree that having IsolationLevel be determined at query-time is the ideal
design, but there are a few sticking points:
1.
There needs to be some way to communicate the IsolationLevel down to the
RocksDBStore itself, so that the query can respect it. Since stores are
"layered" in
Hi Nick,
since I last read it in April, the KIP has become much cleaner and
easier to read. Great work!
It feels to me the last big open point is whether we can implement
isolation level as a query parameter. I understand that there are
implementation concerns, but as Colt says, it would be a
> Making IsolationLevel a query-time constraint, rather than linking it to
the processing.guarantee.
As I understand it, would this allow even a user of EOS to control whether
to read committed or uncommitted records? If so, I am highly in favor of
this.
I know that I was one of the early people
Addendum:
I think we would also face the same problem with the approach John outlined
earlier (using the record cache as a transaction buffer and flushing it
straight to SST files). This is because the record cache (the ThreadCache
class) is not thread-safe, so every commit would invalidate open
Hi Bruno,
I've updated the KIP based on our conversation. The only things I've not
yet done are:
1. Using transactions under ALOS and EOS.
2. Making IsolationLevel a query-time constraint, rather than linking it to
the processing.guarantee.
There's a wrinkle that makes this a challenge:
Hi Nick,
6.
Of course, you are right! My bad!
Wiping out the state in the downgrading case is fine.
3a.
Focus on the public facing changes for the KIP. We will manage to get
the internals right. Regarding state stores that do not support
READ_COMMITTED, they should throw an error stating
Bruno,
Thinking about 3a. in addition to adding the IsolationLevel to
QueryStoreParameters and Query, what about also adding a method like
"ReadOnlyKeyValueStore view(IsolationLevel isolationLevel)" to ReadOnlyKeyValueStore?
This would enable us to easily select/switch between IsolationLevels,
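A toy sketch of that `view(IsolationLevel)` idea might look like the following. These are hypothetical types, not the actual Kafka Streams interfaces: the store keeps committed data plus an uncommitted overlay, and each view resolves reads against one or both.

```java
import java.util.HashMap;
import java.util.Map;

public class ViewSketch {
    public enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    public interface ReadOnlyKeyValueView {
        String get(String key);
    }

    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();

    public void put(String key, String value) {
        uncommitted.put(key, value); // writes buffer until commit
    }

    public void commit() {
        committed.putAll(uncommitted);
        uncommitted.clear();
    }

    // The proposed method: select an isolation level per view.
    public ReadOnlyKeyValueView view(IsolationLevel level) {
        if (level == IsolationLevel.READ_COMMITTED) {
            return committed::get;
        }
        // READ_UNCOMMITTED: overlay the write buffer on top of committed data
        return key -> uncommitted.containsKey(key)
                ? uncommitted.get(key)
                : committed.get(key);
    }

    public static void main(String[] args) {
        ViewSketch store = new ViewSketch();
        store.put("k", "v1");
        System.out.println(store.view(IsolationLevel.READ_UNCOMMITTED).get("k"));
        System.out.println(store.view(IsolationLevel.READ_COMMITTED).get("k"));
        store.commit();
        System.out.println(store.view(IsolationLevel.READ_COMMITTED).get("k"));
    }
}
```

One appeal of this shape is that a view captures the isolation level once, so downstream query code doesn't need to thread the level through every call.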
Hi Bruno,
Thanks for getting back to me!
2.
The fact that implementations can always track estimated memory usage in
the wrapper is a good point. I can remove -1 as an option, and I'll clarify
the JavaDoc that 0 is not just for non-transactional stores, which is
currently misleading.
6.
The
Hi Nick,
Thanks for the updates and sorry for the delay on my side!
1.
Making the default implementation for flush() a no-op sounds good to me.
2.
I think what was bugging me here is that a third-party state store needs
to implement the state store interface. That means they need to
Nick,
Thanks for the response.
>Can you clarify how much state was restored in those 11 seconds?
That was a full restoration of ~650MB of state after I wiped the state
directory. The restoration after a crash with your branch is nearly
instantaneous, whereas with plain Kafka 3.5.0 a crash
Hi Colt,
Thanks for taking the time to run your benchmarks on this, that's
incredibly helpful.
> With KIP 892, I verified that unclean shutdown does not cause a fresh
> restore (). I got the following benchmark results:
> - Benchmark took 216 seconds
> - 1,401 tasks per second on one
Howdy folks,
First I wanted to say fantastic work and thank you to Nick. I built your
branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did
some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0`
branch, and your `kip-892-3.5.0` branch built with Speedb OSS
Hi Bruno,
Thanks for taking the time to review the KIP. I'm back from leave now and
intend to move this forwards as quickly as I can.
Addressing your points:
1.
Because flush() is part of the StateStore API, it's exposed to custom
Processors, which might be making calls to flush(). This was
Hi Nick!
Thanks for the updates!
1.
Why does StateStore#flush() default to
StateStore#commit(Collections.emptyMap())?
Since calls to flush() will not exist anymore after this KIP is
released, I would rather throw an unsupported operation exception by
default.
2.
When would a state store
One more thing: I noted John's suggestion in the KIP, under "Rejected
Alternatives". I still think it's an idea worth pursuing, but I believe
that it's out of the scope of this KIP, because it solves a different set
of problems to this KIP, and the scope of this one has already grown quite
large!
Hi everyone,
I've updated the KIP (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores)
with the latest changes; mostly bringing back "Atomic Checkpointing" (for
what feels like the 10th time!). I think the one thing missing is some
changes to
Hi Bruno
Yes, that's correct, although the impact on IQ is not something I had
considered.
What about atomically updating the state store from the transaction
> buffer every commit interval and writing the checkpoint (thus, flushing
> the memtable) every configured amount of data and/or number
Hi Nick,
Thanks for the insights! Very interesting!
As far as I understand, you want to atomically update the state store
from the transaction buffer, flush the memtable of a state store and
write the checkpoint not after the commit time elapsed but after the
transaction buffer reached a
Hi everyone,
I've begun performance testing my branch on our staging environment,
putting it through its paces in our non-trivial application. I'm already
observing the same increased flush rate that we saw the last time we
attempted to use a version of this KIP, but this time, I think I know
Hi Bruno!
3.
By "less predictable for users", I meant in terms of understanding the
performance profile under various circumstances. The more complex the
solution, the more difficult it would be for users to understand the
performance they see. For example, spilling records to disk when the
Hi Nick,
1.
Yeah, I agree with you. That was actually also my point. I understood
that John was proposing the ingestion path as a way to avoid the early
commits. Probably, I misinterpreted the intent.
2.
I agree with John here, that actually it is public API. My question is
how this usage
No worries, I should have included a ";)" to let you know it was mostly
tongue-in-cheek.
Thanks,
-John
On 6/21/23 12:34, Nick Telford wrote:
Sorry John, I didn't mean to mis-characterize it like that. I was mostly
referring to disabling memtables. AFAIK the SstFileWriter API is primarily
Sorry John, I didn't mean to mis-characterize it like that. I was mostly
referring to disabling memtables. AFAIK the SstFileWriter API is primarily
designed for bulk ingest, e.g. for bootstrapping a database from a backup,
rather than during normal operation of an online database. That said, I was
Thanks Nick,
That sounds good to me.
I can't let (2) slide, though... Writing and ingesting SST files is not a
RocksDB internal, but rather a supported usage pattern on public APIs.
Regardless, I think your overall preference is fine with me, especially
if we can internalize this change
Hi Bruno,
1.
Isn't this exactly the same issue that WriteBatchWithIndex transactions
have, whereby exceeding (or likely to exceed) configured memory needs to
trigger an early commit?
2.
This is one of my big concerns. Ultimately, any approach based on cracking
open RocksDB internals and using it
Hi John,
Hi Nick,
Thanks for the interesting ideas!
Here my comments.
1.
It is not clear to me what happens if the cache exceeds its configured
size between two commits. Currently, the cache evicts its entries and
writes dirty entries to the state store. Should the cache write those
dirty
Here's what I'm thinking: based on Bruno's earlier feedback, I'm going to
try to simplify my original design down such that it needs no/minimal
changes to the public interface.
If that succeeds, then it should also be possible to transparently
implement the "no memtables" solution as a
Oh, that's a good point.
On the topic of a behavioral switch for disabled caches, the typical use
case for disabling the cache is to cause each individual update to
propagate down the topology, so another thought might be to just go
ahead and add the memory we would have used for the
Potentially we could just go the route of memtables with RocksDB
WriteBatches if the cache is disabled?
On Tue, 20 Jun 2023, 22:00 John Roesler, wrote:
> Touché!
>
> Ok, I agree that figuring out the case of a disabled cache would be
> non-trivial. Ingesting single-record SST files will probably not
Touché!
Ok, I agree that figuring out the case of a disabled cache would be
non-trivial. Ingesting single-record SST files will probably not be
performant, but benchmarking may prove different. Or maybe we can have
some reserved cache space on top of the user-configured cache, which we
would
> Note that users can disable the cache, which would still be
ok, I think. We wouldn't ingest the SST files on every record, but just
append to them and only ingest them on commit, when we're already
waiting for acks and a RocksDB commit.
In this case, how would uncommitted records be read by
Ah, sorry Nick,
I just meant the regular heap based cache that we maintain in Streams. I
see that it's not called "RecordCache" (my mistake).
The actual cache is ThreadCache:
Hi John,
I think you're referring to the "record cache" that's provided by the
ThreadCache class?
1-3.
I was hoping to (eventually) remove the "flush-on-commit" behaviour from
RocksDbStore, so that RocksDB can choose when to flush memtables, enabling
users to tailor RocksDB performance to their
Hi John,
By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any
class called "RecordCache"...
Cheers,
Nick
On Tue, 20 Jun 2023 at 19:42, John Roesler wrote:
> Hi Nick,
>
> Thanks for picking this up again!
>
> I did have one new thought over the intervening months, which
Hi Nick,
Thanks for picking this up again!
I did have one new thought over the intervening months, which I'd like
your take on.
What if, instead of using the RocksDB atomic write primitive at all, we
instead just:
1. disable memtables entirely
2. directly write the RecordCache into SST
Hi Bruno,
Thanks for reviewing the KIP. It's been a long road, I started working on
this more than a year ago, and most of the time in the last 6 months has
been spent on the "Atomic Checkpointing" stuff that's been benched, so some
of the reasoning behind my decisions has been lost, but
Hi Nick,
Thanks for the updates!
I really appreciate that you simplified the KIP by removing some
aspects. As I have already told you, I think the removed aspects are
also good ideas and we can discuss them on follow-up KIPs.
Regarding the current KIP, I have the following feedback.
1.
Is
Hi everyone,
Quick update: I've added a new section to the KIP: "Offsets for Consumer
Rebalances", that outlines my solution to the problem that
StreamsPartitionAssignor needs to read StateStore offsets even if they're
not currently open.
Regards,
Nick
On Wed, 3 May 2023 at 11:34, Nick Telford
Hi Bruno,
Thanks for reviewing my proposal.
1.
The main reason I added it was because it was easy to do. If we see no
value in it, I can remove it.
2.
Global StateStores can have multiple partitions in their input topics
(which function as their changelogs), so they would have more than one
Hi Nick,
Thanks for the updates!
I have a couple of questions/comments.
1.
Why do you propose a configuration that involves max. bytes and max.
records? I think we are mainly concerned about memory consumption because
we want to limit the off-heap memory used. I cannot think of a case
where
Nick,
That is a good point, the Checkpoint Files have been a part of the State
Store implementation for a long time and there might be some risk involved
in removing it. And yes, if you go with the idea of moving offset
management out of the state store itself, then you'll need to flush RocksDB
Hi everyone,
I find myself (again) considering removing the offset management from
StateStores, and keeping the old checkpoint file system. The reason is that
the StreamPartitionAssignor directly reads checkpoint files in order to
determine which instance has the most up-to-date copy of the local
Hi Colt,
The issue is that if there's a crash between 2 and 3, then you still end up
with inconsistent data in RocksDB. The only way to guarantee that your
checkpoint offsets and locally stored data are consistent with each other
are to atomically commit them, which can be achieved by having the
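The crash scenario Nick describes can be illustrated with a toy sketch (an in-memory stand-in; in the real design the offset would go into the same RocksDB write batch as the records, which is what makes the pair atomic). Everything here, including the `Store` class and both commit methods, is hypothetical and only demonstrates the failure mode:

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointCrashDemo {
    static class Store {
        final Map<String, String> data = new HashMap<>();
        long checkpoint = -1L; // offset the data is consistent with
    }

    // Non-atomic: write the data, then (separately) the checkpoint.
    // A crash between the two steps leaves data ahead of the checkpoint.
    static void commitNonAtomic(Store s, Map<String, String> batch,
                                long offset, boolean crash) {
        s.data.putAll(batch);
        if (crash) return; // simulated crash before the checkpoint is written
        s.checkpoint = offset;
    }

    // Atomic: both are published in one all-or-nothing step, so a crash
    // can lose the whole commit but never just half of it.
    static void commitAtomic(Store s, Map<String, String> batch,
                             long offset, boolean crash) {
        if (crash) return; // crash before the single atomic step: nothing applied
        s.data.putAll(batch);
        s.checkpoint = offset;
    }

    public static void main(String[] args) {
        Map<String, String> batch = Map.of("k", "v");

        Store a = new Store();
        commitNonAtomic(a, batch, 42L, true);
        System.out.println("non-atomic crash: data=" + a.data
                + " checkpoint=" + a.checkpoint); // inconsistent

        Store b = new Store();
        commitAtomic(b, batch, 42L, true);
        System.out.println("atomic crash: data=" + b.data
                + " checkpoint=" + b.checkpoint); // consistent (just behind)
    }
}
```

In the non-atomic case the store holds records the checkpoint doesn't account for, which is exactly the inconsistency that currently forces a state wipe; in the atomic case the store is merely behind and can be caught up from the changelog.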
Nick,
Thanks for your reply. Ack to A) and B).
For item C), I see what you're referring to. Your proposed solution will
work, so no need to change it. What I was suggesting was that it might be
possible to achieve this with only one column family. So long as:
- No uncommitted records (i.e.
Hi Colt,
A. I've done my best to de-couple the StateStore stuff from the rest of the
Streams engine. The fact that there will be only one ongoing (write)
transaction at a time is not guaranteed by any API, and is just a
consequence of the way Streams operates. To that end, I tried to ensure the
Nick,
Thank you for continuing this work. I have a few minor clarifying questions.
A) "Records written to any transaction are visible to all other
transactions immediately." I am confused here—I thought there could only be
one transaction going on at a time for a given state store given the
Hi everyone,
I've updated the KIP to reflect the latest version of the design:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
There are several changes in there that reflect feedback from this thread,
and there's a new section and a bunch of
Hi Lucas,
Thanks for looking over my KIP.
A) The bound is per-instance, not per-Task. This was a typo in the KIP that
I've now corrected. It was originally per-Task, but I changed it to
per-instance for exactly the reason you highlighted.
B) It's worth noting that transactionality is only
Hi Nick,
I'm just starting to read up on the whole discussion about KIP-892 and
KIP-844. Thanks a lot for your work on this, I do think
`WriteBatchWithIndex` may be the way to go here. I do have some
questions about the latest draft.
A) If I understand correctly, you propose to put a bound on
Hi everyone,
I've updated the KIP with a more detailed design, which reflects the
implementation I've been working on:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
This new design should address the outstanding points already made in the
Nick,
Thank you for the reply; that makes sense. I was hoping that, since reading
uncommitted records from IQ in EOS isn't part of the documented API, maybe
you *wouldn't* have to wait for the next major release to make that change;
but given that it would be considered a major change, I like
Hi Colt,
10: Yes, I agree it's not ideal. I originally intended to try to keep the
behaviour unchanged as much as possible, otherwise we'd have to wait for a
major version release to land these changes.
20: Good point, ALOS doesn't need the same level of guarantee, and the
typically longer commit
Nick,
Thank you for the explanation, and also for the updated KIP. I am quite
eager for this improvement to be released as it would greatly reduce the
operational difficulties of EOS streams apps.
Two questions:
10)
>When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB
Hi everyone,
I've drastically reduced the scope of this KIP to no longer include the
StateStore management of checkpointing. This can be added as a KIP later on
to further optimize the consistency and performance of state stores.
I've also added a section discussing some of the concerns around
Hi Colt,
I didn't do any profiling, but the 844 implementation:
- Writes uncommitted records to a temporary RocksDB instance
- Since tombstones need to be flagged, all record values are prefixed
with a value/tombstone marker. This necessitates a memory copy.
- On-commit,
Hi all,
Out of curiosity, why does the performance of the store degrade so
significantly with the 844 implementation? I wouldn't be too surprised by a
50-60% drop (caused by each record being written twice), but 96% is extreme.
The only thing I can think of which could create such a bottleneck
For now, I've settled on choosing an arbitrary default memory limit of
64MiB per-Task for buffering uncommitted records. I noticed that Kafka
Streams already provides some arbitrary default configuration of RocksDB
memory settings (i.e. memtable size etc.), and that many users will already
be
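The 64 MiB per-Task limit described above could behave roughly like this sketch. The accounting here is hypothetical (the real buffer would be RocksDB's WriteBatchWithIndex, whose memory usage is only approximately known), but it shows the intended behaviour: buffering writes until the bound would be exceeded, then forcing an early commit.

```java
public class UncommittedBufferSketch {
    public static final long MAX_UNCOMMITTED_BYTES = 64L * 1024 * 1024; // 64 MiB per Task

    private long uncommittedBytes = 0;
    private int earlyCommits = 0;

    // Buffer a write; force an early commit if the buffer would exceed the bound.
    public void write(byte[] key, byte[] value) {
        long size = key.length + value.length; // crude size estimate
        if (uncommittedBytes + size > MAX_UNCOMMITTED_BYTES) {
            commit(); // early commit to bound off-heap memory usage
        }
        uncommittedBytes += size;
    }

    public void commit() {
        uncommittedBytes = 0; // buffer flushed to the committed store
        earlyCommits++;
    }

    public int earlyCommits() { return earlyCommits; }

    public long uncommittedBytes() { return uncommittedBytes; }

    public static void main(String[] args) {
        UncommittedBufferSketch buf = new UncommittedBufferSketch();
        byte[] key = new byte[16];
        byte[] value = new byte[1024 * 1024]; // ~1 MiB values
        for (int i = 0; i < 100; i++) {
            buf.write(key, value);
        }
        System.out.println("early commits: " + buf.earlyCommits());
    }
}
```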
Hi everyone,
I've updated the KIP to resolve all the points that have been raised so
far, with one exception: the ALOS default commit interval of 5 minutes is
likely to cause WriteBatchWithIndex memory to grow too large.
There's a couple of different things I can think of to solve this:
- We
Hi Alex,
Thanks for the feedback.
I've updated the discussion of OOM issues by describing how we'll handle
it. Here's the new text:
To mitigate this, we will automatically force a Task commit if the total
> uncommitted records returned by
> StateStore#approximateNumUncommittedEntries() exceeds
Hey Nick,
Thank you for the KIP! With such a significant performance degradation in
the secondary store approach, we should definitely consider
WriteBatchWithIndex. I also like encapsulating checkpointing inside the
default state store implementation to improve performance.
+1 to John's comment
Hi John,
Thanks for the review and feedback!
1. Custom Stores: I've been mulling over this problem myself. As it stands,
custom stores would essentially lose checkpointing with no indication that
they're expected to make changes, besides a line in the release notes. I
agree that the best
Thanks for publishing this alternative, Nick!
The benchmark you mentioned in the KIP-844 discussion seems like a compelling
reason to revisit the built-in transactionality mechanism. I also appreciate
your analysis, showing that for most use cases, the write batch approach should
be just fine.
Hi everyone,
As I mentioned in the discussion thread for KIP-844, I've been working on
an alternative approach to achieving better transactional semantics for
Kafka Streams StateStores.
I've published this separately as KIP-892: Transactional Semantics for
StateStores