[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-05 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r354209108
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ##
 @@ -113,4 +126,16 @@ public void close() throws RocksDBException {
}
IOUtils.closeQuietly(batch);
}
+
+   private void flushIfNeeded() throws RocksDBException {
+   boolean needFlush = batch.count() == capacity || (batchSize > 0 && batch.getDataSize() >= batchSize);
+   if (needFlush) {
+   flush();
+   }
+   }
+
+   @VisibleForTesting
+   long getDataSize() {
+   return batch.getDataSize();
 
 Review comment:
   IMO the `AverageTime` mode reflects real-world usage of `RocksDBWriteBatchWrapper` better than `SingleShot`, and we normally treat a 2% difference as noise. Given these perf numbers, I lean toward calling the JNI method directly. WDYT? @StephanEwen @klion26 Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-05 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r354204039
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend(
this.kvStateInformation = kvStateInformation;
 
this.writeOptions = new WriteOptions().setDisableWAL(true);
+   checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 
 Review comment:
   Ok, makes sense, thanks for the clarification.



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-05 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r354204499
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ##
 @@ -113,4 +126,16 @@ public void close() throws RocksDBException {
}
IOUtils.closeQuietly(batch);
}
+
+   private void flushIfNeeded() throws RocksDBException {
+   boolean needFlush = batch.count() == capacity || (batchSize > 0 && batch.getDataSize() >= batchSize);
 
 Review comment:
   Ok, makes sense.
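   A note on why the `batchSize > 0` guard is needed. Judging from the guard itself, a `batchSize` of 0 means size-based flushing is disabled. Without the guard, `getDataSize() >= 0` would always hold, so every put would flush. Below is a minimal stdlib-only sketch of the two triggers in `flushIfNeeded()`. `FlushPolicy` and `shouldFlush` are hypothetical names, and the entry count and data size are passed in rather than read from a real RocksDB `WriteBatch`.

```java
/**
 * Hypothetical stand-in for the flush decision in RocksDBWriteBatchWrapper#flushIfNeeded().
 * It only models the predicate; it does not talk to RocksDB.
 */
final class FlushPolicy {
    private final int capacity;   // max number of entries per batch
    private final long batchSize; // max data size in bytes; 0 disables the size check

    FlushPolicy(int capacity, long batchSize) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        if (batchSize < 0) {
            throw new IllegalArgumentException("batchSize must be non-negative");
        }
        this.capacity = capacity;
        this.batchSize = batchSize;
    }

    /** Mirrors: count == capacity || (batchSize > 0 && dataSize >= batchSize). */
    boolean shouldFlush(int count, long dataSize) {
        boolean countReached = count == capacity;
        // Without the batchSize > 0 guard, a threshold of 0 would make
        // dataSize >= batchSize always true, i.e. flush after every put.
        boolean sizeReached = batchSize > 0 && dataSize >= batchSize;
        return countReached || sizeReached;
    }
}

class FlushPolicyDemo {
    public static void main(String[] args) {
        FlushPolicy sizeLimited = new FlushPolicy(500, 50);
        System.out.println(sizeLimited.shouldFlush(3, 60));    // true: 60 >= 50
        FlushPolicy sizeDisabled = new FlushPolicy(500, 0);
        System.out.println(sizeDisabled.shouldFlush(3, 60));   // false: size check disabled
        System.out.println(sizeDisabled.shouldFlush(500, 60)); // true: capacity reached
    }
}
```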



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353878204
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ##
 @@ -113,4 +126,16 @@ public void close() throws RocksDBException {
}
IOUtils.closeQuietly(batch);
}
+
+   private void flushIfNeeded() throws RocksDBException {
+   boolean needFlush = batch.count() == capacity || (batchSize > 0 && batch.getDataSize() >= batchSize);
+   if (needFlush) {
+   flush();
+   }
+   }
+
+   @VisibleForTesting
+   long getDataSize() {
+   return batch.getDataSize();
 
 Review comment:
   I'm a bit curious how much the new implementation (the additional calculation of the data size) actually improves performance. Would you mind adding a micro-benchmark that compares the old implementation (the JNI call) with the new one? @klion26 Thanks.
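   For context on what the "additional calculation" could look like, here is a hypothetical sketch that tracks the batch's data size as entries are added. It would presumably run on the Java side, avoiding the JNI call. It assumes the `WriteBatch` layout described in this PR's test comments and, to my understanding, in RocksDB's `write_batch.cc`:

   - a 12-byte header (8-byte sequence plus 4-byte count);
   - for a put into a non-default column family, a 1-byte record tag, a varint32 column family ID, and a varint32 length prefix before both key and value.

   `WriteBatchSizeTracker` is not Flink or RocksDB API, and the layout may differ between RocksDB versions.

```java
/**
 * Hypothetical estimate of a RocksDB WriteBatch's data size, as an alternative to
 * calling WriteBatch#getDataSize() through JNI. Assumes the write_batch.cc layout.
 */
final class WriteBatchSizeTracker {
    /** 8-byte sequence number + 4-byte entry count. */
    static final long HEADER_SIZE = 12;

    private long dataSize = HEADER_SIZE;
    private int count;

    /** Bytes needed to encode a non-negative int as a varint32 (7 payload bits per byte). */
    static int varint32Length(int value) {
        int length = 1;
        while ((value & ~0x7F) != 0) { // more than 7 significant bits remain
            value >>>= 7;
            length++;
        }
        return length;
    }

    /** Accounts for a put into a non-default column family with the given ID. */
    void recordPut(int columnFamilyId, byte[] key, byte[] value) {
        dataSize += 1 // record tag
                + varint32Length(columnFamilyId)
                + varint32Length(key.length) + key.length
                + varint32Length(value.length) + value.length;
        count++;
    }

    /** Called after the batch has been written and cleared. */
    void reset() {
        dataSize = HEADER_SIZE;
        count = 0;
    }

    long dataSize() {
        return dataSize;
    }

    int count() {
        return count;
    }
}

class WriteBatchSizeTrackerDemo {
    public static void main(String[] args) {
        WriteBatchSizeTracker tracker = new WriteBatchSizeTracker();
        byte[] sixBytes = new byte[6];
        tracker.recordPut(1, sixBytes, sixBytes);
        System.out.println(tracker.dataSize()); // 28 = 12 + (1 + 1 + 1 + 6 + 1 + 6)
    }
}
```

   Whether this beats the JNI call is exactly what the requested micro-benchmark would show. The trade-off is that the Java-side estimate silently drifts if RocksDB changes its encoding.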



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353849218
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -110,6 +112,7 @@
private static boolean rocksDbInitialized = false;
 
private static final int UNDEFINED_NUMBER_OF_TRANSFERING_THREADS = -1;
+   private static final long UNDEFINED_BATCH_SIZE = -1;
 
 Review comment:
   UNDEFINED_BATCH_SIZE -> UNDEFINED_WRITE_BATCH_SIZE



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353830704
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend(
this.kvStateInformation = kvStateInformation;
 
this.writeOptions = new WriteOptions().setDisableWAL(true);
+   checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 
 Review comment:
   Just to confirm: does the `@Nonnull` annotation have any effect outside IntelliJ? In a previous review I was also advised to replace null checks with the `@Nonnull` annotation. Maybe this is a divergence in code-style taste, and we should discuss it on the mailing list and record the outcome in our code conventions?
   
   Also, there are many `@Nonnegative`-annotated methods without additional checks, such as [HeapPriorityQueueStateSnapshot](https://github.com/apache/flink/blob/e505bef16c2c11250a2420610c1605a4e64b6d10/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java#L71), [KeyGroupPartitioner](https://github.com/apache/flink/blob/e505bef16c2c11250a2420610c1605a4e64b6d10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java#L108), etc. It would be good to align our code style. WDYT? Thanks.
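   On whether the annotations have any runtime effect: JSR-305 annotations such as `@Nonnull` and `@Nonnegative` are metadata for IDEs and static analyzers. Plain `javac` and the JVM do not enforce them, although some tools can add checks at build time (IntelliJ's optional runtime-assertion instrumentation, for example). The minimal sketch below makes two substitutions. It uses a locally declared, hypothetical `@NonNegativeHint` as a stand-in, so it compiles without JSR-305 on the classpath. It also uses a plain `if` in place of `checkArgument`.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical stand-in for javax.annotation.Nonnegative: pure metadata, no behavior. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER, ElementType.FIELD})
@interface NonNegativeHint {}

final class AnnotationVsCheck {
    /** Annotated only: nothing stops a negative argument at runtime. */
    static long annotatedOnly(@NonNegativeHint long writeBatchSize) {
        return writeBatchSize;
    }

    /** Annotated and explicitly checked, in the spirit of Preconditions.checkArgument. */
    static long annotatedAndChecked(@NonNegativeHint long writeBatchSize) {
        if (writeBatchSize < 0) {
            throw new IllegalArgumentException("Write batch size must not be negative.");
        }
        return writeBatchSize;
    }

    public static void main(String[] args) {
        System.out.println(annotatedOnly(-1)); // prints -1: the annotation is not enforced
        try {
            annotatedAndChecked(-1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   So the choice is really about convention. The annotation documents the contract for callers and tools, while an explicit check fails fast on violations at runtime.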



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353662612
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 ##
 @@ -131,7 +135,8 @@ public RocksDBKeyedStateBackendBuilder(
MetricGroup metricGroup,
@Nonnull Collection stateHandles,
StreamCompressionDecorator keyGroupCompressionDecorator,
-   CloseableRegistry cancelStreamRegistry) {
+   CloseableRegistry cancelStreamRegistry,
+   @Nonnegative long writeBatchSize) {
 
 Review comment:
   Ditto.



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353667305
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 ##
 @@ -397,7 +406,8 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
nativeMetricOptions,
metricGroup,
restoreStateHandles,
-   ttlCompactFiltersManager);
+   ttlCompactFiltersManager,
+   writeBatchSize);
 
 Review comment:
   Can we use `this.writeBatchSize` directly?



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353670935
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
 ##
 @@ -119,6 +127,8 @@ public RocksDBFullRestoreOperation(
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager);
+   checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
 
 Review comment:
   Ditto.



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353669969
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ##
 @@ -49,22 +53,35 @@
 
private final int capacity;
 
-   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB) {
-   this(rocksDB, null, 500);
+   @Nonnegative
+   private final long batchSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, long writeBatchSize) {
+   this(rocksDB, null, 500, writeBatchSize);
}
 
public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options) {
-   this(rocksDB, options, 500);
+   this(rocksDB, options, 500, DEFAULT_BATCH_SIZE);
+   }
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, long batchSize) {
+   this(rocksDB, options, 500, batchSize);
}
 
-   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, int capacity) {
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, int capacity, long batchSize) {
Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
"capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY);
+   Preconditions.checkArgument(batchSize >= 0, "Max batch size have to be no negative.");
 
 Review comment:
   We can annotate all these parameters with `@Nonnegative` instead of using `checkArgument`.



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353673315
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ##
 @@ -65,4 +69,52 @@ public void basicTest() throws Exception {
}
}
}
+
+   /**
+* Tests that {@link RocksDBWriteBatchWrapper} flushes after the memory consumed exceeds the preconfigured value.
+*/
+   @Test
+   public void testWriteBatchWrapperFlushAfterMemorySizeExceed() throws Exception {
+   try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200, 50)) {
+   // sequence (8 bytes) + count (4 bytes)
+   // more information please ref to write_batch.cc in RocksDB
+   assertEquals(12, writeBatchWrapper.getDataSize());
+   byte[] dummy = new byte[6];
+   ThreadLocalRandom.current().nextBytes(dummy);
+   // will add 1 + 1 + 1 + 6 + 1 + 6 = 16 bytes for each KV
+   // format is [handleType|kvType|keyLen|key|valueLen|value]
+   // more information please ref write_batch.cc in RocksDB
+   writeBatchWrapper.put(handle, dummy, dummy);
+   assertEquals(28, writeBatchWrapper.getDataSize());
+   writeBatchWrapper.put(handle, dummy, dummy);
+   assertEquals(44, writeBatchWrapper.getDataSize());
+   writeBatchWrapper.put(handle, dummy, dummy);
+   // will flush all, then an empty write batch
+   assertEquals(12, writeBatchWrapper.getDataSize());
+   }
+   }
+
+   /**
+* Tests that {@link RocksDBWriteBatchWrapper} flushes after the kv count exceeds the preconfigured value.
+*/
+   @Test
+   public void testWriteBatchWrapperFlushAfterCountExceed() throws Exception {
+   try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 100, 5)) {
+   byte[] dummy = new byte[2];
+   ThreadLocalRandom.current().nextBytes(dummy);
+   for (int i = 1; i < 100; ++i) {
+   writeBatchWrapper.put(handle, dummy, dummy);
+   // init 12 bytes, each kv consumes 8 bytes
+   assertEquals(12 + 8 * i, writeBatchWrapper.getDataSize());
 
 Review comment:
   Ditto. Record the initial size of the empty `writeBatchWrapper` instead of using the hard-coded value `12`.



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353671065
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 ##
 @@ -125,6 +129,8 @@ public RocksDBIncrementalRestoreOperation(
this.restoredSstFiles = new TreeMap<>();
this.lastCompletedCheckpointId = -1L;
this.backendUID = UUID.randomUUID();
+   checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
 
 Review comment:
   Ditto.



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353669265
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -166,6 +169,11 @@
/** Whether we already lazily initialized our local storage directories. */
private transient boolean isInitialized;
 
+   /**
+* Max consumed memory size for one batch in {@link RocksDBWriteBatchWrapper}, default value 2mb.
+*/
+   private long batchSize;
 
 Review comment:
   Suggest renaming all "batchSize"/"BATCH_SIZE" to "writeBatchSize"/"WRITE_BATCH_SIZE" in this class.

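   To see how the suggested names read together, here is a minimal sketch of resolving the `-1` "undefined" sentinel (added in this PR as `UNDEFINED_BATCH_SIZE`). It resolves against a configured value and the 2 MB default mentioned in the field's Javadoc. `WriteBatchSizeResolver`, `resolve`, and `DEFAULT_WRITE_BATCH_SIZE` are hypothetical. The precedence shown (explicit setting, then configuration, then default) is an assumption, not the backend's actual logic.

```java
import java.util.OptionalLong;

/** Hypothetical sketch of resolving an "undefined" write batch size sentinel. */
final class WriteBatchSizeResolver {
    static final long UNDEFINED_WRITE_BATCH_SIZE = -1;
    /** 2 MB, the default stated in the field's Javadoc. */
    static final long DEFAULT_WRITE_BATCH_SIZE = 2L * 1024 * 1024;

    /**
     * @param explicitWriteBatchSize value set directly on the backend, or UNDEFINED_WRITE_BATCH_SIZE
     * @param configuredWriteBatchSize value from the configuration, if present
     */
    static long resolve(long explicitWriteBatchSize, OptionalLong configuredWriteBatchSize) {
        if (explicitWriteBatchSize != UNDEFINED_WRITE_BATCH_SIZE) {
            return explicitWriteBatchSize; // an explicit setting wins, including 0
        }
        return configuredWriteBatchSize.orElse(DEFAULT_WRITE_BATCH_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(resolve(UNDEFINED_WRITE_BATCH_SIZE, OptionalLong.empty()));  // 2097152
        System.out.println(resolve(UNDEFINED_WRITE_BATCH_SIZE, OptionalLong.of(4096))); // 4096
        System.out.println(resolve(0, OptionalLong.of(4096)));                          // 0
    }
}
```

   A distinct sentinel, rather than 0, keeps "not set" separate from "explicitly set to 0". Given the `batchSize > 0` guard in `flushIfNeeded()`, 0 appears to mean size-based flushing is disabled.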


[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353667120
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 ##
 @@ -414,7 +424,8 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
nativeMetricOptions,
metricGroup,
restoreStateHandles,
-   ttlCompactFiltersManager);
+   ttlCompactFiltersManager,
+   writeBatchSize);
 
 Review comment:
   Can we use `this.writeBatchSize` directly?



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353670772
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ##
 @@ -113,4 +126,16 @@ public void close() throws RocksDBException {
}
IOUtils.closeQuietly(batch);
}
+
+   private void flushIfNeeded() throws RocksDBException {
+   boolean needFlush = batch.count() == capacity || (batchSize > 0 && batch.getDataSize() >= batchSize);
 
 Review comment:
   No need to check whether `batchSize` is larger than 0 since we already have 
the `@Nonnegative` annotation.



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353663839
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -244,6 +252,8 @@ public RocksDBKeyedStateBackend(
this.kvStateInformation = kvStateInformation;
 
this.writeOptions = new WriteOptions().setDisableWAL(true);
+   checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 
 Review comment:
   I don't think we need `checkArgument` when the argument is annotated with `@Nonnegative`, since the annotation already states the contract.



[GitHub] [flink] carp84 commented on a change in pull request #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-04 Thread GitBox
carp84 commented on a change in pull request #10329: 
[FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of 
unmanaged memory
URL: https://github.com/apache/flink/pull/10329#discussion_r353672881
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ##
 @@ -65,4 +69,52 @@ public void basicTest() throws Exception {
}
}
}
+
+   /**
+* Tests that {@link RocksDBWriteBatchWrapper} flushes after the memory consumed exceeds the preconfigured value.
+*/
+   @Test
+   public void testWriteBatchWrapperFlushAfterMemorySizeExceed() throws Exception {
+   try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200, 50)) {
+   // sequence (8 bytes) + count (4 bytes)
+   // more information please ref to write_batch.cc in RocksDB
+   assertEquals(12, writeBatchWrapper.getDataSize());
 
 Review comment:
   This assertion is unnecessary and might fail if the RocksDB implementation changes. We could simply record an `initialSize` from `writeBatchWrapper.getDataSize()` and check against it after the flush is triggered.
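   To illustrate the suggested pattern without a RocksDB dependency, here is a sketch built on a hypothetical in-memory `FakeBatchWrapper`. It is not Flink code, and its header size and per-entry overhead are constructor parameters. The check records the empty size once and asserts relative to it. It therefore passes for the 12-byte header used in the test above and also for a different, made-up header size.

```java
/**
 * Hypothetical in-memory fake of a size-bounded write batch wrapper (no RocksDB involved).
 * Header size and per-entry overhead are parameters so the same test logic can run
 * against different encodings.
 */
final class FakeBatchWrapper {
    private final long headerSize;
    private final long perEntryOverhead;
    private final long writeBatchSize; // flush threshold in bytes; 0 disables size-based flushing
    private long dataSize;

    FakeBatchWrapper(long headerSize, long perEntryOverhead, long writeBatchSize) {
        this.headerSize = headerSize;
        this.perEntryOverhead = perEntryOverhead;
        this.writeBatchSize = writeBatchSize;
        this.dataSize = headerSize;
    }

    void put(byte[] key, byte[] value) {
        dataSize += perEntryOverhead + key.length + value.length;
        if (writeBatchSize > 0 && dataSize >= writeBatchSize) {
            dataSize = headerSize; // simulated flush: batch written and cleared
        }
    }

    long getDataSize() {
        return dataSize;
    }
}

final class InitialSizePattern {
    /** Asserts flush behavior relative to the recorded empty size instead of a hard-coded 12. */
    static void verifyFlushAfterThirdPut(FakeBatchWrapper wrapper, byte[] dummy) {
        long initialSize = wrapper.getDataSize();
        wrapper.put(dummy, dummy);
        long entrySize = wrapper.getDataSize() - initialSize;
        wrapper.put(dummy, dummy);
        check(wrapper.getDataSize() == initialSize + 2 * entrySize, "two entries buffered");
        wrapper.put(dummy, dummy); // expected to cross the threshold
        check(wrapper.getDataSize() == initialSize, "batch flushed back to its empty size");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        byte[] dummy = new byte[6];
        // Header 12 and per-entry overhead 4 reproduce the 12/28/44 sizes from the test above.
        verifyFlushAfterThirdPut(new FakeBatchWrapper(12, 4, 50), dummy);
        // A different (hypothetical) header size passes unchanged: nothing hard-codes 12.
        verifyFlushAfterThirdPut(new FakeBatchWrapper(14, 4, 50), dummy);
        System.out.println("ok");
    }
}
```

   Note that the flush threshold (50 here) still implicitly depends on the entry size. Only the assertions on the empty size become independent of the encoding.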

