mjsax commented on code in PR #22526:
URL: https://github.com/apache/kafka/pull/22526#discussion_r3390764327
##########
docs/streams/developer-guide/memory-mgmt.md:
##########
@@ -130,6 +130,28 @@ To avoid reading stale data, you can `flush()` the store
before creating the ite
Each instance of RocksDB allocates off-heap memory for a block cache, index
and filter blocks, and memtable (write buffer). Critical configs (for RocksDB
version 4.1.0) include `block_cache_size`, `write_buffer_size` and
`max_write_buffer_number`. These can be specified through the
`rocksdb.config.setter` configuration.
+### Changelog offset durability and flush frequency {#rocksdb-offset-durability}
+
+Since 4.3
([KIP-1035](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets)),
Kafka Streams stores each persistent store's changelog offset inside RocksDB
rather than in a per-task `.checkpoint` file. Kafka Streams runs RocksDB with
the write-ahead log (WAL) disabled — the changelog topic is the durable log of
record — so data and the changelog offset become durable on disk only when a
memtable is **flushed** to an SST file. A flush happens when the memtable fills
`write_buffer_size` (16 MB by default) or when the store is **closed cleanly**
(`KafkaStreams#close`). Unlike earlier releases, Kafka Streams no longer
force-flushes RocksDB on every commit.
+
+The practical implication is that the on-disk changelog offset is only as
fresh as the last organic flush or clean close. For a high-throughput store the
memtable fills frequently, so this is not a concern. For a **low-traffic
store** whose memtable may take a long time to fill, the persisted offset can
lag the store's actual position for an extended period. If the application then
exits uncleanly (for example `SIGKILL`, an OOM-kill, or a `KafkaStreams#close`
that does not complete within the process/pod shutdown grace period), only the
last-flushed offset survives. Should the changelog topic's log-start offset
have advanced past that stale offset (via retention or compaction) in the
meantime, the restore consumer seeks out of range on the next restart, and the
task is re-initialized from the changelog (logged as
`OffsetOutOfRangeException` / `TaskCorruptedException`). This recovers
automatically with no data loss, but causes a full re-restore of the task.
+
+To reduce the likelihood of this for an affected low-traffic store, you can
make it flush more often by lowering its `write_buffer_size` (and adjusting
`max_write_buffer_number`) through a custom `RocksDBConfigSetter`:
+
+    public static class CustomRocksDBConfig implements RocksDBConfigSetter {
+        @Override
+        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
+            // smaller write buffer => more frequent flushes => fresher on-disk changelog offset,
+            // at the cost of more (smaller) SST files and more compaction work
+            options.setWriteBufferSize(4 * 1024 * 1024L);
+        }
+
+        @Override
+        public void close(final String storeName, final Options options) {}
+    }
+
+This is a trade-off: a smaller write buffer bounds how stale the persisted
offset can get, but increases the number of SST files and the compaction load.
So tune only the specific low-traffic stores that need it, and size the buffer
to the store's write rate. Note that flushing is **volume-based, not
time-based**. A store that receives only a trickle of writes may still not
flush until it is closed, so the most reliable protection is a **clean
shutdown**. Register a shutdown hook that calls `KafkaStreams#close`, and
ensure it is allowed to complete — on Kubernetes, set the close timeout
comfortably below the pod termination grace period (default 30s) so the process
is not `SIGKILL`ed mid-close. Note that the record cache
(`statestore.cache.max.bytes`) in front of the store coalesces repeated updates
to the same key in place before they are written to RocksDB. So for an
update-heavy, small-keyspace workload even fewer bytes reach the memtable per
commit, making it fill more slowly.
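
As a rough illustration of the sizing advice in the quoted section, the sketch below estimates how long a memtable takes to fill at a steady write rate, and derives a close timeout that stays below the shutdown grace period. The class `FlushSizing`, its method names, the 2 KiB/s write rate, and the 5-second safety margin are hypothetical and chosen only for illustration; none of them come from Kafka Streams or RocksDB. The estimate also ignores the record cache, which would reduce the bytes that actually reach the memtable.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kafka Streams or RocksDB.
class FlushSizing {

    // Estimated time for a memtable of bufferBytes to fill at a steady
    // write rate, rounded down to whole seconds.
    static Duration estimatedFillTime(final long bufferBytes, final long bytesPerSecond) {
        if (bufferBytes <= 0 || bytesPerSecond <= 0) {
            throw new IllegalArgumentException("buffer size and write rate must be positive");
        }
        return Duration.ofSeconds(bufferBytes / bytesPerSecond);
    }

    // Close timeout that leaves a safety margin before the process is killed.
    static Duration closeTimeout(final Duration gracePeriod, final Duration margin) {
        final Duration timeout = gracePeriod.minus(margin);
        if (timeout.isNegative() || timeout.isZero()) {
            throw new IllegalArgumentException("margin must be smaller than the grace period");
        }
        return timeout;
    }

    public static void main(final String[] args) {
        final long mib = 1024L * 1024L;
        final long assumedRate = 2048L; // assumed write rate: 2 KiB per second

        // Compare the default 16 MB buffer with the 4 MB buffer from the docs example.
        System.out.println("16 MiB fills in about " + estimatedFillTime(16 * mib, assumedRate));
        System.out.println(" 4 MiB fills in about " + estimatedFillTime(4 * mib, assumedRate));

        // 30s is the default pod termination grace period mentioned in the docs text.
        System.out.println("close timeout: "
            + closeTimeout(Duration.ofSeconds(30), Duration.ofSeconds(5)));
    }
}
```

The resulting timeout would then be passed to `KafkaStreams#close(Duration)` from the shutdown hook. If the estimated fill time is far longer than the changelog topic's retention, a smaller buffer alone is unlikely to help, and a clean shutdown remains the main safeguard.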
Review Comment:
Seems I was wrong...