[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-06-05 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502809#comment-16502809
 ] 

Sihua Zhou commented on FLINK-8845:
---

Hi [~noliran] Yes, this is on purpose. In fact, we tried 
([FLINK-8859|https://issues.apache.org/jira/browse/FLINK-8859]) to disable the 
WAL in RocksDBKeyedStateBackend when restoring the backend, but it caused 
segfaults on Travis 
([FLINK-8882|https://issues.apache.org/jira/browse/FLINK-8882]). The reason for 
the segfaults is still unclear, so we reverted 
([FLINK-8922|https://issues.apache.org/jira/browse/FLINK-8922]) it in the 
end. 

> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> Based on {{WriteBatch}} we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.
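[Editor's sketch] The wrapper merged for this ticket buffers individual puts and applies them in one batch once a capacity threshold is reached (the test later in this thread constructs it with a capacity of 200). A minimal sketch of that pattern, using a plain in-memory map as a stand-in for RocksDB (`BatchingWriter` and its method names are invented for illustration, not Flink's actual `RocksDBWriteBatchWrapper`):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Capacity-triggered write batching: puts are buffered and applied to the
// backing store in one flush once the buffer reaches `capacity`, amortizing
// per-write overhead.
public class BatchingWriter {

    private final Map<String, String> store;
    private final List<String[]> buffer = new ArrayList<>();
    private final int capacity;
    private int flushCount = 0;

    public BatchingWriter(Map<String, String> store, int capacity) {
        this.store = store;
        this.capacity = capacity;
    }

    public void put(String key, String value) {
        buffer.add(new String[] {key, value});
        if (buffer.size() >= capacity) {
            flush(); // auto-flush when the batch is full
        }
    }

    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        for (String[] kv : buffer) {
            store.put(kv[0], kv[1]);
        }
        buffer.clear();
        flushCount++;
    }

    public int getFlushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        Map<String, String> backing = new HashMap<>();
        BatchingWriter writer = new BatchingWriter(backing, 200);
        for (int i = 0; i < 1000; i++) {
            writer.put("key:" + i, "value:" + i);
        }
        writer.flush(); // drain any partial batch, as the real wrapper's callers do
        // prints "1000 entries, 5 flushes"
        System.out.println(backing.size() + " entries, " + writer.getFlushCount() + " flushes");
    }
}
```

Callers put all entries and call flush() once at the end to drain a partial final batch.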



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-06-05 Thread Noam Liran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502447#comment-16502447
 ] 

Noam Liran commented on FLINK-8845:
---

Hi [~srichter], [~sihuazhou],

I noticed that the code in RocksDBKeyedStateBackend does not disable WAL, even 
though it is disabled in other places (and in the benchmarks).

Is this on purpose?

 

Thanks,

Noam

 



[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487055#comment-16487055
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5650


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}} we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.





[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486877#comment-16486877
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5650
  
@sihuazhou thanks for this nice contribution. LGTM. Will merge.




[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16482298#comment-16482298
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5650
  
A micro-benchmark for this: @StefanRRichter @StephanEwen 
```
---> With disableWAL = false <---
Number of values added | time for Put  | time for WriteBatch | performance improvement of WriteBatch over Put
1000                   | 10146397 ns   | 3546287 ns          | 2.86x
10000                  | 118227077 ns  | 26040222 ns         | 4.54x
100000                 | 1838593196 ns | 375053755 ns        | 4.9x
1000000                | 8844612079 ns | 2014077396 ns       | 4.39x

---> With disableWAL = true <---
1000                   | 3955204 ns    | 2429725 ns          | 1.62x
10000                  | 25618237 ns   | 16440113 ns         | 1.55x
100000                 | 289153346 ns  | 183712685 ns        | 1.57x
1000000                | 2886298967 ns | 1768688571 ns       | 1.63x
```
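[Editor's note] The speedup column is just the ratio of the two raw timings; for example, the disableWAL=false rows can be recomputed from the quoted nanosecond figures:

```java
// Recompute the Put-vs-WriteBatch speedups from the quoted timings
// (nanoseconds) for the disableWAL=false runs above.
public class SpeedupCheck {

    public static double speedup(long putNanos, long writeBatchNanos) {
        return (double) putNanos / writeBatchNanos;
    }

    public static void main(String[] args) {
        long[][] runs = {
            {10146397L, 3546287L},   // 1000 entries
            {118227077L, 26040222L}, // 10000 entries
        };
        for (long[] r : runs) {
            // prints "2.86x" then "4.54x"
            System.out.printf("%.2fx%n", speedup(r[0], r[1]));
        }
    }
}
```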




[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398676#comment-16398676
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174479415
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    @Test
+    public void basicTest() throws Exception {
+
+        List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+        for (int i = 0; i < 10000; ++i) {
+            data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+        }
+
+        try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+            WriteOptions options = new WriteOptions().setDisableWAL(true);
+            ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+            RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+            // insert data
+            for (Tuple2<byte[], byte[]> item : data) {
+                writeBatchWrapper.put(handle, item.f0, item.f1);
+            }
+            writeBatchWrapper.flush();
+
+            // validate result
+            for (Tuple2<byte[], byte[]> item : data) {
+                Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+            }
+        }
+    }
+
+    @Test
+    @Ignore
+    public void benchMark() throws Exception {
+
+        // put with disableWAL=true VS put with disableWAL=false
+        System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
+        benchMarkHelper(1_000, true, WRITETYPE.PUT);
+        benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+        benchMarkHelper(10_000, true, WRITETYPE.PUT);
+        benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+        benchMarkHelper(100_000, true, WRITETYPE.PUT);
+        benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+        benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+        benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+        // put with disableWAL=true VS write batch with disableWAL=false
+        System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
+        benchMarkHelper(1_000, true, WRITETYPE.PUT);
+        benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+        benchMarkHelper(10_000, true, WRITETYPE.PUT);
+        benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+        benchMarkHelper(100_000, true, WRITETYPE.PUT);
+        benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+        benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+        benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);

[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398677#comment-16398677
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174479455
  

[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398675#comment-16398675
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174479344
  

[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398673#comment-16398673
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174479322
  


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398713#comment-16398713
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5650
  
@bowenli86 Thanks a lot for your reviews, I've addressed all your comments. 
I'm waiting for 1.5 to be released; after that maybe @StefanRRichter could also 
have a look at this.




[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397394#comment-16397394
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231064
  

Replace the console output with logging; you can refer to 
`RocksDBListStatePerformanceTest.java`.



[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397393#comment-16397393
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231142
  

[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397392#comment-16397392
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231173
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // validate results
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);

[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397395#comment-16397395
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174230739
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // validate results
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
--- End diff --

Need to move the benchmark test to 
`org.apache.flink.contrib.streaming.state.benchmark` package.


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}} we could get a 30% ~ 50% performance lift when
> loading data into RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390632#comment-16390632
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173049537
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   @Nonnull WriteOptions options,
+   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
--- End diff --

About the capacity range, I didn't find a specific value recommended by RocksDB, but from the [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ):
```
Q: What's the fastest way to load data into RocksDB?
...
2. batch hundreds of keys into one write batch
...
```
I found that they use the word `hundreds`.
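The "batch hundreds of keys" guidance above is the whole trick behind `RocksDBWriteBatchWrapper`. A minimal, self-contained sketch of that pattern follows; note that a plain `HashMap` stands in for RocksDB here, and `BatchingWriter` is an illustrative name, not Flink's class:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Main {

    /** Buffers puts and writes them to the store in chunks, per the FAQ's advice. */
    static class BatchingWriter {
        private final Map<String, String> store;            // stand-in for RocksDB
        private final List<String[]> batch = new ArrayList<>();
        private final int capacity;                         // "hundreds" of keys per batch

        BatchingWriter(Map<String, String> store, int capacity) {
            this.store = store;
            this.capacity = capacity;
        }

        void put(String key, String value) {
            batch.add(new String[] {key, value});
            if (batch.size() == capacity) {
                flush();                                    // one bulk write per N puts
            }
        }

        void flush() {
            // the real wrapper issues a single db.write(options, batch) here
            for (String[] kv : batch) {
                store.put(kv[0], kv[1]);
            }
            batch.clear();
        }
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        BatchingWriter writer = new BatchingWriter(store, 200);
        for (int i = 0; i < 1000; i++) {
            writer.put("key:" + i, "value:" + i);
        }
        writer.flush();                                     // don't forget the final partial batch
        System.out.println(store.size());
    }
}
```

With a capacity of 200, the loop above triggers five bulk flushes instead of 1000 individual writes; against RocksDB itself, that amortization is what the issue's 30% ~ 50% numbers measure.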




[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390623#comment-16390623
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173048763
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   @Nonnull WriteOptions options,
+   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
+
+   this.batch.put(handle, key, value);
+
+   if (++currentSize == capacity) {
+   flush();
+   }
+   }
+
+   public void flush() throws RocksDBException {
+   this.db.write(options, batch);
+   batch.clear();
+   currentSize = 0;
+   }
+
+   @Override
+   public void close() throws RocksDBException {
+   if (batch != null) {
--- End diff --

You are right, this `if` can be removed.




[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390622#comment-16390622
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173048697
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   @Nonnull WriteOptions options,
+   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
--- End diff --

Hmm... currently it is only used in a single thread. For the best performance, I'd rather not add synchronization to it; instead, I'd add an annotation noting that this class is not thread safe. We could introduce a new thread-safe class if we really need one. What do you think?
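For reference, a hypothetical thread-safe variant along the lines discussed here could simply synchronize `put()` and `flush()` on the shared batch. The sketch below is self-contained: an in-memory map stands in for RocksDB, and none of these names are Flink's:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class Main {

    /** Hypothetical thread-safe wrapper: put() and flush() synchronize on the same monitor. */
    static class SynchronizedBatchingWriter {
        private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>(); // RocksDB stand-in
        private final List<String[]> batch = new ArrayList<>();
        private final int capacity;

        SynchronizedBatchingWriter(int capacity) {
            this.capacity = capacity;
        }

        synchronized void put(String key, String value) {
            batch.add(new String[] {key, value});
            if (batch.size() == capacity) {
                flush();                                    // intrinsic locks are reentrant: no deadlock
            }
        }

        synchronized void flush() {
            for (String[] kv : batch) {
                store.put(kv[0], kv[1]);
            }
            batch.clear();
        }

        int storedCount() {
            return store.size();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedBatchingWriter writer = new SynchronizedBatchingWriter(100);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    writer.put("key:" + id + ":" + i, "value:" + i);
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        writer.flush();
        System.out.println(writer.storedCount());           // all 4 x 1000 unique keys survive
    }
}
```

The cost is one monitor acquisition per record on the hot path, which is exactly the overhead the single-threaded design avoids.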




[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390025#comment-16390025
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935414
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   @Nonnull WriteOptions options,
+   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
+
+   this.batch.put(handle, key, value);
+
+   if (++currentSize == capacity) {
+   flush();
+   }
+   }
+
+   public void flush() throws RocksDBException {
+   this.db.write(options, batch);
+   batch.clear();
+   currentSize = 0;
+   }
+
+   @Override
+   public void close() throws RocksDBException {
+   if (batch != null) {
--- End diff --

can batch be null?




[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390026#comment-16390026
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935214
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   @Nonnull WriteOptions options,
+   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
--- End diff --

need synchronization on put() and flush()




[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390024#comment-16390024
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172934683
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   @Nonnull WriteOptions options,
+   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
--- End diff --

how is the capacity range determined - is it recommended by RocksDB?

the msg should be: "capacity should be between " + MIN + " and " + MAX
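A sketch of the suggested message format, with a local stand-in for Flink's `Preconditions.checkArgument`; the `MAX_CAPACITY = 1000` bound is an assumption, since the value is garbled in the quoted diff:

```java
public class Main {

    private static final int MIN_CAPACITY = 100;
    private static final int MAX_CAPACITY = 1000;           // assumed upper bound

    // local stand-in for org.apache.flink.util.Preconditions.checkArgument
    static void checkArgument(boolean condition, Object message) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(message));
        }
    }

    static void checkCapacity(int capacity) {
        checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
            "capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY);
    }

    public static void main(String[] args) {
        checkCapacity(200);                                 // in range: no exception
        try {
            checkCapacity(1);                               // out of range
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Building the bounds into the message means a failing check immediately tells the caller the accepted range, rather than only the lower bound.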




[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389297#comment-16389297
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5650

[FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to improve performance for recovery in RocksDB backend

## What is the purpose of the change

This PR addresses [FLINK-8845](https://issues.apache.org/jira/browse/FLINK-8845), which attempts to use `WriteBatch` to improve the performance of loading data into RocksDB. It is inspired by the [RocksDB FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ).

## Brief change log

  - *Introduce `RocksDBWriteBatchWrapper` to load data into RocksDB in bulk*

## Verifying this change

- Introduce `RocksDBWriteBatchWrapperTest.java` to guard `RocksDBWriteBatchWrapper`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation
none


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink rocksdb_write_batch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5650


commit e710287495d2a1a12a99b812c9691e12c6c57459
Author: sihuazhou 
Date:   2018-03-07T05:58:45Z

Introduce RocksDBWriteBatchWrapper to speed up write performance.



