mjsax commented on code in PR #22526:
URL: https://github.com/apache/kafka/pull/22526#discussion_r3384589547


##########
docs/streams/developer-guide/memory-mgmt.md:
##########
@@ -130,6 +130,28 @@ To avoid reading stale data, you can `flush()` the store 
before creating the ite
 
 Each instance of RocksDB allocates off-heap memory for a block cache, index 
and filter blocks, and memtable (write buffer). Critical configs (for RocksDB 
version 4.1.0) include `block_cache_size`, `write_buffer_size` and 
`max_write_buffer_number`. These can be specified through the 
`rocksdb.config.setter` configuration.
 
+### Changelog offset durability and flush frequency 
{#rocksdb-offset-durability}
+
+Since 4.3 
([KIP-1035](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets)),
 Kafka Streams stores each persistent store's changelog offset inside RocksDB 
rather than in a per-task `.checkpoint` file. Kafka Streams runs RocksDB with 
the write-ahead log (WAL) disabled — the changelog topic is the durable log of 
record — so data and the changelog offset become durable on disk only when a 
memtable is **flushed** to an SST file. A flush happens when the memtable fills 
`write_buffer_size` (16 MB by default) or when the store is **closed cleanly** 
(`KafkaStreams#close`). Unlike earlier releases, Kafka Streams no longer 
force-flushes RocksDB on every commit.
+
+The practical implication is that the on-disk changelog offset is only as 
fresh as the last organic flush or clean close. For a high-throughput store the 
memtable fills frequently, so this is not a concern. For a **low-traffic 
store** whose memtable may take a long time to fill, the persisted offset can 
lag the store's actual position for an extended period. If the application then 
exits uncleanly (for example `SIGKILL`, an OOM-kill, or a `KafkaStreams#close` 
that does not complete within the process/pod shutdown grace period), only the 
last-flushed offset survives. Should the changelog topic's log-start offset 
have advanced past that stale offset (via retention or compaction) in the 
meantime, the restore consumer seeks out of range on the next restart, and the 
task is re-initialized from the changelog (logged as 
`OffsetOutOfRangeException` / `TaskCorruptedException`). This recovers 
automatically with no data loss, but causes a full re-restore of the task.
+
+To reduce the likelihood of this for an affected low-traffic store, you can 
make it flush more often by lowering its `write_buffer_size` (and adjusting 
`max_write_buffer_number`) through a custom `RocksDBConfigSetter`:
+
+    public static class CustomRocksDBConfig implements RocksDBConfigSetter {
+        @Override
+        public void setConfig(final String storeName, final Options options, 
final Map<String, Object> configs) {
+            // smaller write buffer => more frequent flushes => fresher 
on-disk changelog offset,
+            // at the cost of more (smaller) SST files and more compaction work
+            options.setWriteBufferSize(4 * 1024 * 1024L);
+        }
+
+        @Override
+        public void close(final String storeName, final Options options) {}
+    }
+
+This is a trade-off: a smaller write buffer bounds how stale the persisted 
offset can get, but increases the number of SST files and the compaction load.  
So tune only the specific low-traffic stores that need it, and size the buffer 
to the store's write rate. Note that flushing is **volume-based, not 
time-based**.  A store that receives only a trickle of writes may still not 
flush until it is closed, so the most reliable protection is a **clean 
shutdown**. Register a shutdown hook that calls `KafkaStreams#close`, and 
ensure it is allowed to complete — on Kubernetes, set the close timeout 
comfortably below the pod termination grace period (default 30s) so the process 
is not `SIGKILL`ed mid-close.

Review Comment:
   Should we add a note, that the buffer fills up with the distinct keys, but 
writes to the same key don't necessarily increase the used bytes, as it would 
overwrite the existing entry in-place?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to