[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-02-10 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17033915#comment-17033915
 ] 

Sophie Blee-Goldman commented on KAFKA-9450:


[~NaviBrar] I assume that was only merged into the most recent RocksDB version? 
We aren't able to bump the RocksDB dependency further until the next major 
version bump due to some breaking changes in the options-related API. I'm not 
sure whether the RocksDB folks would be willing to cherry-pick this back to a 
5.x version and release that. If not, maybe you should create a ticket to track 
this and mark it as blocked by KAFKA-8897 so it doesn't get forgotten.

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-28 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17025550#comment-17025550
 ] 

Navinder Brar commented on KAFKA-9450:
--

I checked the RocksDB code: event listeners are not available in the JNI 
bindings. They are probably planned, but are not available in any version yet.



[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-27 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17024902#comment-17024902
 ] 

Navinder Brar commented on KAFKA-9450:
--

Or do you suggest to never (ie, for EOS and non-EOS case) call 
`innerByteStore#flush()`? This might be possible but would have a negative 
impact on non-EOS as it would make current fault-tolerance mechanism for 
non-EOS less efficient (we would not have a guarantee on commit that data is 
flushed to disk and might need to recover more data from the changelog topic in 
case of failure). 

>>> [~mjsax] do you mean we would still write to the checkpoint file (for 
>>> non-EOS) on every commit but remove the flush? That would be dangerous, 
>>> right? If the data in RocksDB has not been flushed for the offsets already 
>>> written to the checkpoint file, we would have lost that data while still 
>>> moving ahead. 

 

Can we add event listeners on RocksDB (EventListener::OnFlushCompleted()) and, 
whenever a particular store is flushed, commit the checkpoint for that store's 
changelog in the checkpoint file? Currently, we are overriding most 
performance-related RocksDB configs (memtable size, max write buffers) by tying 
flushes to the time-based commit. If this seems reasonable, I can work on this.
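
To make the proposal concrete, here is a minimal sketch of flush-driven checkpointing. Everything in it is hypothetical: `FlushListener`, `CheckpointOnFlush` and `ToyStore` are illustrative names, not Kafka Streams or RocksDB JNI classes, and the toy store simply calls the listener when its in-memory buffer fills up. The only point it illustrates is that the checkpoint moves when the store flushes on its own schedule, not on every commit.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical callback, loosely modelled on the idea of RocksDB's OnFlushCompleted. */
interface FlushListener {
    void onFlushCompleted(String storeName, long lastFlushedOffset);
}

/** Records a checkpoint only for offsets whose data the store reports as flushed. */
class CheckpointOnFlush implements FlushListener {
    private final Map<String, Long> checkpoints = new HashMap<>();

    @Override
    public void onFlushCompleted(String storeName, long lastFlushedOffset) {
        // Never move a checkpoint backwards.
        checkpoints.merge(storeName, lastFlushedOffset, Math::max);
    }

    /** Returns null if the store has never been flushed. */
    Long checkpointFor(String storeName) {
        return checkpoints.get(storeName);
    }
}

/** Toy store: buffers writes and "flushes" only when the buffer is full. */
class ToyStore {
    private final String name;
    private final int memtableCapacity;
    private final FlushListener listener;
    private int buffered = 0;

    ToyStore(String name, int memtableCapacity, FlushListener listener) {
        this.name = name;
        this.memtableCapacity = memtableCapacity;
        this.listener = listener;
    }

    /** Applies one changelog record identified by its offset. */
    void put(long changelogOffset) {
        buffered++;
        if (buffered >= memtableCapacity) {
            buffered = 0; // pretend the full buffer was written to disk
            listener.onFlushCompleted(name, changelogOffset);
        }
    }
}
```

With a capacity of 3, the first two writes leave the checkpoint unset and the third write sets it to that record's offset.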



[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17024742#comment-17024742
 ] 

Matthias J. Sax commented on KAFKA-9450:


Yes, the ticket was created because the default EOS commit interval of 100ms 
makes the situation much worse. But I agree that fixing this can also benefit 
the non-EOS case. Hence, I don't think we need a separate ticket.

However, for non-EOS, increasing the commit interval to work around this issue 
is actually more viable than for the EOS case (for EOS, it would increase the 
end-to-end processing latency significantly, which is the reason for the 100ms 
default commit interval to begin with).



[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-25 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17023643#comment-17023643
 ] 

Navinder Brar commented on KAFKA-9450:
--

Great that I found this Jira. I had been meaning to start a discussion around 
decoupling flushing and committing. This ticket only deals with EOS; is that 
because the commit interval for non-EOS is 30 seconds? RocksDB flushing every 
30 seconds is also a pretty big issue for us. I think the default 
"Level0FileNumCompactionTrigger" in RocksDB is 4. Since the default max write 
buffers in RocksDBStore.java is 3, 2 memtables get flushed every 30 seconds. 
When someone has multiple stores in a topology, this means that at least one 
store is undergoing compaction every 30 seconds (at most every 1 minute). A few 
days ago I was tracking CPU usage while facing increased 99th-percentile 
latencies every 30 seconds, which overlapped exactly with the flush (which in 
turn leads to compaction), and there is a huge bump in CPU usage every 30 
seconds. To work around this for now, I have increased the commit interval in 
our system to 30 minutes (even in 30 minutes our memtables do not fill up), but 
that is also inefficient. Let me know whether we can discuss non-EOS here as 
well or whether I should create a separate ticket for it. 
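
For reference, a small sketch of the workaround described above. It uses only `java.util.Properties` and the Kafka Streams config key `commit.interval.ms`; the class and method names are hypothetical. The second helper assumes, as the ticket description states, that every commit forces one flush per store, and simply counts how many such flushes fit into an hour.

```java
import java.time.Duration;
import java.util.Properties;

class CommitIntervalSketch {

    /** Builds a config fragment that sets only the commit interval. */
    static Properties withCommitInterval(Duration interval) {
        Properties props = new Properties();
        props.put("commit.interval.ms", Long.toString(interval.toMillis()));
        return props;
    }

    /** Forced flushes per store per hour, assuming one flush per commit. */
    static long forcedFlushesPerHour(Duration commitInterval) {
        return Duration.ofHours(1).toMillis() / commitInterval.toMillis();
    }
}
```

Under that assumption, a 100ms interval gives 36000 forced flushes per hour, 30 seconds gives 120, and the 30-minute workaround gives 2.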



[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020489#comment-17020489
 ] 

Matthias J. Sax commented on KAFKA-9450:


For EOS, we don't write a checkpoint file, and thus we would also not add the 
metadata as a preserved key in the store. Hence, it's unclear to me how 
changing where we store the offset metadata would help for this ticket. This 
ticket says we don't want to call `innerByteStore#flush()` when we call 
`cachingStore#flush()` and `changeloggingStore#flush()` if EOS is enabled. 
However, the stores themselves are agnostic to whether EOS is enabled or not 
(which is a good thing IMHO). Hence, we can only avoid calling 
`innerByteStore#flush()` if we decouple the caching, changelogging, and inner 
bytes stores from each other, so that the KS runtime no longer calls a single 
`#flush()` on the outer metered store (which wraps all other stores and 
implicitly flushes every wrapped store) but can instead access each store layer 
individually and flush each one as needed.

Or do you suggest to never (ie, for EOS and non-EOS case) call 
`innerByteStore#flush()`? This might be possible, but would have a negative 
impact on non-EOS as it would make current fault-tolerance mechanism for 
non-EOS less efficient (we would not have a guarantee on commit that data is 
flushed to disk and might need to recover more data from the changelog topic in 
case of failure). 
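
A rough sketch of what flushing the layers individually could look like. All names here (`CountingLayer`, `DecoupledStoreFlusher`, `onCommit`, `onCleanClose`) are hypothetical and do not correspond to existing Kafka Streams classes. The stores stay unaware of EOS; only the runtime-side flusher decides which layers to flush.

```java
/** Stand-in for one layer of a wrapped state store; it only counts flushes. */
class CountingLayer {
    final String name;
    int flushCount = 0;

    CountingLayer(String name) {
        this.name = name;
    }

    void flush() {
        flushCount++;
    }
}

/** Runtime-side helper that flushes each layer individually. */
class DecoupledStoreFlusher {
    final CountingLayer cache = new CountingLayer("caching");
    final CountingLayer changelog = new CountingLayer("changelogging");
    final CountingLayer inner = new CountingLayer("innerBytes");
    private final boolean eosEnabled;

    DecoupledStoreFlusher(boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    /** On commit: always forward cached records; flush to disk only without EOS. */
    void onCommit() {
        cache.flush();
        changelog.flush();
        if (!eosEnabled) {
            inner.flush();
        }
    }

    /** On graceful shutdown or before a rebalance: flush every layer. */
    void onCleanClose() {
        cache.flush();
        changelog.flush();
        inner.flush();
    }
}
```

With EOS enabled, three commits flush the caching and changelogging layers three times each and the inner store not at all; the inner store is flushed once on a clean close.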



[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018781#comment-17018781
 ] 

Guozhang Wang commented on KAFKA-9450:
--

I think John's idea is to, e.g., use a preserved key to store the current 
offset of the changelog, so that each time we update a value, we also update 
this key with the new offset. By doing this we would not need a RocksDB#flush 
when checkpointing, nor would we need a checkpoint file. One tricky thing, 
however, is that the offset of the written changelog record is only known after 
the batched produce response returns, which may not be immediately after the 
RocksDB value update.



[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018750#comment-17018750
 ] 

Matthias J. Sax commented on KAFKA-9450:


[~vvcephei] Your comment seems to be an orthogonal concern (one that I actually 
don't share: when we call RocksDB#flush(), it seems safe to assume that RocksDB 
has persisted the data. Why do you doubt that RocksDB guarantees this? If it 
did not persist the data, it would be a RocksDB bug IMHO that should just get 
fixed).

Nevertheless, this ticket is about decoupling changelog flushing from local 
disk flushing. In contrast, your comment is about two aspects of local disk 
flushing, i.e., the data itself and the metadata (i.e., the checkpoint).



[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-18 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018602#comment-17018602
 ] 

Ted Yu commented on KAFKA-9450:
---

W.r.t. a separate column family: since the data in this family tends to be 
small compared to the data family, wouldn't we end up with small files, similar 
to those produced by the RocksDB memtable flush?



[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-17 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018501#comment-17018501
 ] 

John Roesler commented on KAFKA-9450:
-

This reminds me of an idea I proposed a while back, and haven't been able to 
let go of. I think it's buried in a Jira ticket somewhere.

One practical complexity with flushing is to guarantee that persistent stores 
actually do a fs sync call before we write the checkpoint file, which in turn 
is to guarantee that in a crash-recovery scenario, the offset in the recovered 
checkpoint file is always equal to or before the state of the recovered store.

The same goal could be accomplished without any filesystem intricacies if we 
store the offset in the same store as the data. Think: either a reserved key, 
or a separate column family. This would allow the underlying store to ensure 
the order of data updates with respect to changelog offset updates on its own 
(using its internal translog or whatever).

Anyway, I bring this up right now because offhand, I can't think of any reason 
we'd actually need to flush bytes stores at all if we did things that way.
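
A minimal in-memory sketch of the reserved-key variant. The class name, the key `__changelog_offset__` and the method names are all made up for illustration. A `TreeMap` stands in for the real store, and a single synchronized update stands in for whatever atomic batch mechanism the real store provides; whether the store actually applies such a batch atomically and durably is an assumption of the sketch.

```java
import java.util.Map;
import java.util.TreeMap;

class OffsetInStoreSketch {
    /** Hypothetical reserved key; a real store would need to hide it from users. */
    static final String OFFSET_KEY = "__changelog_offset__";

    private final Map<String, String> data = new TreeMap<>();

    /** Applies the value and its changelog offset together as one batch. */
    synchronized void putWithOffset(String key, String value, long changelogOffset) {
        Map<String, String> batch = new TreeMap<>();
        batch.put(key, value);
        batch.put(OFFSET_KEY, Long.toString(changelogOffset));
        data.putAll(batch);
    }

    synchronized String get(String key) {
        return data.get(key);
    }

    /** Offset to resume restoration from; -1 if nothing was ever written. */
    synchronized long recoveredOffset() {
        String raw = data.get(OFFSET_KEY);
        return raw == null ? -1L : Long.parseLong(raw);
    }
}
```

Because the offset travels with the data, a recovered store always reports an offset that matches the data it contains, and no separate checkpoint file has to be kept in sync.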
