[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r354209108

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java

@@ -113,4 +126,16 @@ public void close() throws RocksDBException {
 		}
 		IOUtils.closeQuietly(batch);
 	}
+
+	private void flushIfNeeded() throws RocksDBException {
+		boolean needFlush = batch.count() == capacity || (batchSize > 0 && batch.getDataSize() >= batchSize);
+		if (needFlush) {
+			flush();
+		}
+	}
+
+	@VisibleForTesting
+	long getDataSize() {
+		return batch.getDataSize();

Review comment:
IMO the `AverageTime` mode reflects the real-world usage of `RocksDBWriteBatchWrapper` better than `SingleShot`, and we normally regard a 2% difference as noise. Given these perf data, I'm biased toward using the JNI call directly. Wdyt? @StephanEwen @klion26 Thanks.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r354204039

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

@@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend(
 		this.kvStateInformation = kvStateInformation;
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
+		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");

Review comment:
Ok, makes sense, thanks for the clarification.
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r354204499

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java

@@ -113,4 +126,16 @@ public void close() throws RocksDBException {
 		}
 		IOUtils.closeQuietly(batch);
 	}
+
+	private void flushIfNeeded() throws RocksDBException {
+		boolean needFlush = batch.count() == capacity || (batchSize > 0 && batch.getDataSize() >= batchSize);

Review comment:
Ok, makes sense.
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353878204

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java

@@ -113,4 +126,16 @@ public void close() throws RocksDBException {
 		}
 		IOUtils.closeQuietly(batch);
 	}
+
+	private void flushIfNeeded() throws RocksDBException {
+		boolean needFlush = batch.count() == capacity || (batchSize > 0 && batch.getDataSize() >= batchSize);
+		if (needFlush) {
+			flush();
+		}
+	}
+
+	@VisibleForTesting
+	long getDataSize() {
+		return batch.getDataSize();

Review comment:
A little bit curious about how much the new implementation (the additional calculation on data size) could improve performance. Mind adding a comparison between the old implementation (JNI call) and the new one here through a micro benchmark? @klion26 Thanks.
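For context, the "additional calculation on data size" alternative under discussion could look roughly like the following self-contained sketch: tracking the batch's size on the Java side, using the record layout described in this PR's tests, so that `flushIfNeeded()` would not need a JNI call per `put()`. The class name, the 12-byte header, and the one-byte varint lengths are all assumptions for illustration, not the PR's actual implementation.

```java
// Hypothetical sketch: track the write batch's data size in Java,
// mirroring RocksDB's write_batch record layout, instead of calling the
// JNI-backed batch.getDataSize() on every put().
public class TrackedBatchSizeSketch {

    // WriteBatch header: sequence (8 bytes) + count (4 bytes) = 12 bytes.
    static final long HEADER_BYTES = 12;

    private long trackedSize = HEADER_BYTES;

    // Per record: handleType (1) + kvType (1) + keyLen varint (assumed 1)
    // + key bytes + valueLen varint (assumed 1) + value bytes.
    void put(byte[] key, byte[] value) {
        trackedSize += 1 + 1 + 1 + key.length + 1 + value.length;
    }

    long getDataSize() {
        return trackedSize;
    }

    public static void main(String[] args) {
        TrackedBatchSizeSketch batch = new TrackedBatchSizeSketch();
        byte[] dummy = new byte[6];
        batch.put(dummy, dummy); // 12 + 16 = 28 bytes, matching the PR's test
        batch.put(dummy, dummy); // 44 bytes
        System.out.println(batch.getDataSize());
    }
}
```

A benchmark would then compare this bookkeeping against calling the JNI method directly; as the thread notes, differences within roughly 2% are likely noise.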
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353849218

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java

@@ -110,6 +112,7 @@
 	private static boolean rocksDbInitialized = false;
 
 	private static final int UNDEFINED_NUMBER_OF_TRANSFERING_THREADS = -1;
+	private static final long UNDEFINED_BATCH_SIZE = -1;

Review comment:
UNDEFINED_BATCH_SIZE -> UNDEFINED_WRITE_BATCH_SIZE
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353830704

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

@@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend(
 		this.kvStateInformation = kvStateInformation;
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
+		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");

Review comment:
Let me confirm: does the `@Nonnull` annotation work outside of IntelliJ? In a previous review I was also advised to replace null checks with the `@Nonnull` annotation; maybe this is a divergence in code-style taste, and we should discuss it on the ML and record the outcome in our code conventions. Another fact is that there are many `@Nonnegative`-annotated methods without additional checking, like [HeapPriorityQueueStateSnapshot](https://github.com/apache/flink/blob/e505bef16c2c11250a2420610c1605a4e64b6d10/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java#L71), [KeyGroupPartitioner](https://github.com/apache/flink/blob/e505bef16c2c11250a2420610c1605a4e64b6d10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java#L108), etc. It might be good if we aligned our code style. Wdyt? Thanks.
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353662612

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java

@@ -131,7 +135,8 @@ public RocksDBKeyedStateBackendBuilder(
 		MetricGroup metricGroup,
 		@Nonnull Collection stateHandles,
 		StreamCompressionDecorator keyGroupCompressionDecorator,
-		CloseableRegistry cancelStreamRegistry) {
+		CloseableRegistry cancelStreamRegistry,
+		@Nonnegative long writeBatchSize) {

Review comment:
Ditto.
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r354209108

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java

@@ -397,7 +406,8 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
 			nativeMetricOptions,
 			metricGroup,
 			restoreStateHandles,
-			ttlCompactFiltersManager);
+			ttlCompactFiltersManager,
+			writeBatchSize);

Review comment:
Can we use `this.writeBatchSize` directly?
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353670935

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java

@@ -119,6 +127,8 @@ public RocksDBFullRestoreOperation(
 			metricGroup,
 			restoreStateHandles,
 			ttlCompactFiltersManager);
+		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");

Review comment:
Ditto.
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353669969

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java

@@ -49,22 +53,35 @@
 	private final int capacity;
 
-	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB) {
-		this(rocksDB, null, 500);
+	@Nonnegative
+	private final long batchSize;
+
+	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, long writeBatchSize) {
+		this(rocksDB, null, 500, writeBatchSize);
 	}
 
 	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options) {
-		this(rocksDB, options, 500);
+		this(rocksDB, options, 500, DEFAULT_BATCH_SIZE);
+	}
+
+	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, long batchSize) {
+		this(rocksDB, options, 500, batchSize);
 	}
 
-	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, int capacity) {
+	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, int capacity, long batchSize) {
 		Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
 			"capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY);
+		Preconditions.checkArgument(batchSize >= 0, "Max batch size have to be no negative.");

Review comment:
We can use `@Nonnegative` for all function parameters instead of `checkArgument`.
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353673315

File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java

@@ -65,4 +69,52 @@ public void basicTest() throws Exception {
 		}
 	}
+
+	/**
+	 * Tests that {@link RocksDBWriteBatchWrapper} flushes after the memory consumed exceeds the preconfigured value.
+	 */
+	@Test
+	public void testWriteBatchWrapperFlushAfterMemorySizeExceed() throws Exception {
+		try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+			WriteOptions options = new WriteOptions().setDisableWAL(true);
+			ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+			RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200, 50)) {
+			// sequence (8 bytes) + count (4 bytes)
+			// more information please ref to write_batch.cc in RocksDB
+			assertEquals(12, writeBatchWrapper.getDataSize());
+			byte[] dummy = new byte[6];
+			ThreadLocalRandom.current().nextBytes(dummy);
+			// will add 1 + 1 + 1 + 6 + 1 + 6 = 16 bytes for each KV
+			// format is [handleType|kvType|keyLen|key|valueLen|value]
+			// more information please ref write_batch.cc in RocksDB
+			writeBatchWrapper.put(handle, dummy, dummy);
+			assertEquals(28, writeBatchWrapper.getDataSize());
+			writeBatchWrapper.put(handle, dummy, dummy);
+			assertEquals(44, writeBatchWrapper.getDataSize());
+			writeBatchWrapper.put(handle, dummy, dummy);
+			// will flush all, then an empty write batch
+			assertEquals(12, writeBatchWrapper.getDataSize());
+		}
+	}
+
+	/**
+	 * Tests that {@link RocksDBWriteBatchWrapper} flushes after the kv count exceeds the preconfigured value.
+	 */
+	@Test
+	public void testWriteBatchWrapperFlushAfterCountExceed() throws Exception {
+		try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+			WriteOptions options = new WriteOptions().setDisableWAL(true);
+			ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+			RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 100, 5)) {
+			byte[] dummy = new byte[2];
+			ThreadLocalRandom.current().nextBytes(dummy);
+			for (int i = 1; i < 100; ++i) {
+				writeBatchWrapper.put(handle, dummy, dummy);
+				// init 12 bytes, each kv consumes 8 bytes
+				assertEquals(12 + 8 * i, writeBatchWrapper.getDataSize());

Review comment:
Ditto. Record the init size of the empty `writeBatchWrapper` instead of using the hard-coded value `12`.
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353671065

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java

@@ -125,6 +129,8 @@ public RocksDBIncrementalRestoreOperation(
 		this.restoredSstFiles = new TreeMap<>();
 		this.lastCompletedCheckpointId = -1L;
 		this.backendUID = UUID.randomUUID();
+		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");

Review comment:
Ditto.
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353669265

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java

@@ -166,6 +169,11 @@
 	/** Whether we already lazily initialized our local storage directories. */
 	private transient boolean isInitialized;
 
+	/**
+	 * Max consumed memory size for one batch in {@link RocksDBWriteBatchWrapper}, default value 2mb.
+	 */
+	private long batchSize;

Review comment:
I suggest changing all "batchSize/BATCH_SIZE" occurrences to "writeBatchSize/WRITE_BATCH_SIZE" in this class.
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353667120

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java

@@ -414,7 +424,8 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
 			nativeMetricOptions,
 			metricGroup,
 			restoreStateHandles,
-			ttlCompactFiltersManager);
+			ttlCompactFiltersManager,
+			writeBatchSize);

Review comment:
Can we use `this.writeBatchSize` directly?
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353670772

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java

@@ -113,4 +126,16 @@ public void close() throws RocksDBException {
 		}
 		IOUtils.closeQuietly(batch);
 	}
+
+	private void flushIfNeeded() throws RocksDBException {
+		boolean needFlush = batch.count() == capacity || (batchSize > 0 && batch.getDataSize() >= batchSize);

Review comment:
No need to check whether `batchSize` is larger than 0 since we already have the `@Nonnegative` annotation.
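For illustration, dropping the `batchSize > 0` guard as this comment suggests would leave a flush condition like the one in the following self-contained sketch. The class is hypothetical, not the PR's code: `count` and `dataSize` are plain fields standing in for the JNI-backed `batch.count()` and `batch.getDataSize()`, and the sketch assumes the constructor contract rules out non-positive batch sizes.

```java
// Minimal sketch of the simplified flush condition under discussion.
public class FlushConditionSketch {
    private final int capacity;
    private final long batchSize; // assumed positive by contract
    private int count;            // stands in for batch.count()
    private long dataSize;        // stands in for batch.getDataSize()
    int flushes;                  // how many times flush() would have run

    FlushConditionSketch(int capacity, long batchSize) {
        this.capacity = capacity;
        this.batchSize = batchSize;
    }

    void put(long recordBytes) {
        count++;
        dataSize += recordBytes;
        flushIfNeeded();
    }

    // The batchSize > 0 guard is gone, on the assumption that the
    // @Nonnegative contract plus a positive default make it redundant.
    private void flushIfNeeded() {
        if (count == capacity || dataSize >= batchSize) {
            count = 0;
            dataSize = 0;
            flushes++;
        }
    }

    public static void main(String[] args) {
        FlushConditionSketch w = new FlushConditionSketch(10, 100);
        w.put(40);
        w.put(40);
        w.put(40); // 120 bytes >= 100 triggers the size-based flush
        System.out.println(w.flushes);
    }
}
```

Note that a batch size of exactly 0 would then flush on every put, which is why the guard only becomes removable once the constructor contract forbids that value.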
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353663839

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

@@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend(
 		this.kvStateInformation = kvStateInformation;
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
+		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");

Review comment:
I think we don't need `checkArgument` when the passed-in argument is annotated as `@Nonnegative`, since that is already a contract.
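To make the trade-off being debated concrete, the sketch below contrasts a runtime precondition with an annotation-only contract. Since JSR-305's `@Nonnegative` and Flink's `Preconditions` are not on a bare classpath, the check is inlined and the contract is expressed in Javadoc; everything here is a hypothetical illustration, not the PR's code.

```java
// Two styles of guarding a non-negative parameter.
public class PreconditionStyles {

    // Inlined equivalent of Preconditions.checkArgument.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Style 1: fail fast at runtime, regardless of caller discipline.
    static long withRuntimeCheck(long writeBatchSize) {
        checkArgument(writeBatchSize >= 0, "Write batch size must be non-negative.");
        return writeBatchSize;
    }

    // Style 2: document the contract only (Javadoc standing in for
    // @Nonnegative); a negative value is silently accepted at runtime
    // unless a static-analysis tool flags the call site.
    /** @param writeBatchSize must be non-negative (contract, not enforced) */
    static long withAnnotationContract(long writeBatchSize) {
        return writeBatchSize;
    }

    public static void main(String[] args) {
        System.out.println(withRuntimeCheck(1024));
        try {
            withRuntimeCheck(-1);
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
        System.out.println(withAnnotationContract(-1)); // accepted silently
    }
}
```

This is the crux of the thread: the annotation documents intent and keeps constructors terse, while the runtime check catches violations that annotation-unaware tooling would miss.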
carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353672881

File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java

@@ -65,4 +69,52 @@ public void basicTest() throws Exception {
 		}
 	}
+
+	/**
+	 * Tests that {@link RocksDBWriteBatchWrapper} flushes after the memory consumed exceeds the preconfigured value.
+	 */
+	@Test
+	public void testWriteBatchWrapperFlushAfterMemorySizeExceed() throws Exception {
+		try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+			WriteOptions options = new WriteOptions().setDisableWAL(true);
+			ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+			RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200, 50)) {
+			// sequence (8 bytes) + count (4 bytes)
+			// more information please ref to write_batch.cc in RocksDB
+			assertEquals(12, writeBatchWrapper.getDataSize());

Review comment:
This assertion might fail if the RocksDB implementation changes, and it is unnecessary. We could simply record an `initialSize` from `writeBatchWrapper.getDataSize()` and check against it after a flush is triggered.
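The suggested pattern, recording a baseline instead of hard-coding RocksDB's current 12-byte header, might look like the following self-contained sketch. The in-memory "batch" is a hypothetical stand-in for `RocksDBWriteBatchWrapper`; the point is that the test logic only ever compares against the recorded `initialSize`, so it survives changes to write_batch.cc.

```java
// Sketch of the baseline-recording test pattern from the review.
public class BaselineSizeTestSketch {

    // Internal header size; the test logic below never hard-codes it.
    private static final long HEADER = 12;

    private long dataSize = HEADER;

    void put(long recordBytes) { dataSize += recordBytes; }

    void flush() { dataSize = HEADER; } // emptied batch returns to baseline

    long getDataSize() { return dataSize; }

    public static void main(String[] args) {
        BaselineSizeTestSketch batch = new BaselineSizeTestSketch();
        long initialSize = batch.getDataSize(); // recorded, not hard-coded
        batch.put(16);
        if (batch.getDataSize() != initialSize + 16) throw new AssertionError();
        batch.flush();
        // After a flush the batch is empty again: compare against the baseline.
        if (batch.getDataSize() != initialSize) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The same baseline can replace the `12 + 8 * i` arithmetic in the count-based test: assert `initialSize + recordBytes * i` instead.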