[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-05-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5650


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174479455
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+   for (int i = 0; i < 10000; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // validate result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+   // write batch with disableWAL=true VS write batch disableWAL = true
+   System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--");
+

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174479415
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
[...]

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174479344
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
[...]
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
--- End diff --

👍 


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174479322
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
[...]
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
--- End diff --

👍 


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231064
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
[...]
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
--- End diff --

Replace the console output with logging; you can refer to `RocksDBListStatePerformanceTest.java`.
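
A minimal sketch of that change, assuming the test adopts the SLF4J logger used elsewhere in Flink's tests (the `LOG` field and helper method names are illustrative):

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBWriteBatchWrapperTest {

    private static final Logger LOG = LoggerFactory.getLogger(RocksDBWriteBatchWrapperTest.class);

    private void logComparisonHeader() {
        // replaces System.out.println(...) in benchMark()
        LOG.info("--> put with disableWAL=true VS write batch with disableWAL=false <--");
    }
}
```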


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174230739
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
[...]
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
--- End diff --

Need to move the benchmark test to the `org.apache.flink.contrib.streaming.state.benchmark` package.


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231142
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
[...]

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231173
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
[...]

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173049537
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   @Nonnull WriteOptions options,
+   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
--- End diff --

About the capacity range, I didn't find a specific value recommended by RocksDB, but from the [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ):
```
Q: What's the fastest way to load data into RocksDB?
...
2. batch hundreds of keys into one write batch
...
```
I found that they use the word `hundreds`.
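
For illustration, that advice against the raw RocksJava API looks like the sketch below (the open `db` handle and the batch size of 200 are assumptions); the `RocksDBWriteBatchWrapper` in this PR automates exactly this pattern:

```
// load data in batches of a few hundred keys instead of one put() per key
try (WriteBatch batch = new WriteBatch();
        WriteOptions options = new WriteOptions().setDisableWAL(true)) {
    for (int i = 0; i < 200; i++) {
        batch.put(("key:" + i).getBytes(), ("value:" + i).getBytes());
    }
    db.write(options, batch); // one native write for all 200 entries
}
```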


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173048763
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
[...]
+   @Override
+   public void close() throws RocksDBException {
+   if (batch != null) {
--- End diff --

You are right, this `if` can be removed.
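
A sketch of the simplified `close()`; note that the body below the `if` is cut off in the quote above, so the flush-and-close behavior here is an assumption:

```
@Override
public void close() throws RocksDBException {
    // batch is assigned unconditionally in the constructor, so no null check is needed
    if (currentSize > 0) {
        flush(); // assumed: write out anything still buffered before closing
    }
    batch.close();
}
```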


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173048697
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
[...]
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
--- End diff --

Hmm... currently, it is only used in a single thread. For the best performance, I wouldn't like to add synchronization for it; I'd prefer to annotate this class as not thread safe. We could introduce a new class that is thread safe if we really need it. What do you think?
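
A sketch of that annotation-based approach, assuming the `javax.annotation.concurrent` annotations available on Flink's classpath:

```
import javax.annotation.concurrent.NotThreadSafe;

/**
 * A wrapper class to wrap WriteBatch. Not thread safe: an instance must be
 * confined to a single thread, which is how the restore path uses it.
 */
@NotThreadSafe
public class RocksDBWriteBatchWrapper implements AutoCloseable {
    // put() and flush() stay unsynchronized for performance
}
```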


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935414
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
[...]
+   @Override
+   public void close() throws RocksDBException {
+   if (batch != null) {
--- End diff --

can batch be null?


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935214
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
[...]
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
--- End diff --

need synchronization on put() and flush()
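
For illustration, the requested synchronization would look like the sketch below; the reply elsewhere in this thread argues for documenting single-threaded use instead:

```
// sketch only: serializes all callers on the wrapper's monitor
public synchronized void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
    batch.put(handle, key, value);
    if (++currentSize == capacity) {
        flush(); // safe: the monitor is reentrant
    }
}

public synchronized void flush() throws RocksDBException {
    db.write(options, batch);
    batch.clear();
    currentSize = 0;
}
```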


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172934683
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
[...]
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
--- End diff --

How is the capacity range determined? Is it recommended by RocksDB?

The msg should be: "capacity should be between " + MIN + " and " + MAX
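
Spelled out, the suggested check would read:

```
Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
    "capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY);
```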


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5650

[FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to improve performance for recovery in RocksDB backend

## What is the purpose of the change

This PR addresses [FLINK-8845](https://issues.apache.org/jira/browse/FLINK-8845), which attempts to use `WriteBatch` to improve the performance of loading data into RocksDB. It is inspired by the [RocksDB FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ).

## Brief change log

  - *Introduce `RocksDBWriteBatchWrapper` to load data into RocksDB in bulk*

## Verifying this change

- Introduce `RocksDBWriteBatchWrapperTest.java` to guard `RocksDBWriteBatchWrapper`. A usage sketch follows.
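
For reference, a minimal usage sketch of the new wrapper, mirroring the `basicTest()` quoted in the review comments above (the database path and column family name are placeholders):

```
try (RocksDB db = RocksDB.open("/tmp/rocksdb-wrapper-demo");
        WriteOptions options = new WriteOptions().setDisableWAL(true);
        ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
        RocksDBWriteBatchWrapper wrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {

    wrapper.put(handle, "key".getBytes(), "value".getBytes()); // buffered; flushes automatically every 200 puts
    wrapper.flush(); // write whatever is still buffered
}
```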

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation
none


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink rocksdb_write_batch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5650


commit e710287495d2a1a12a99b812c9691e12c6c57459
Author: sihuazhou 
Date:   2018-03-07T05:58:45Z

Introduce RocksDBWriteBatchWrapper to speed up write performance.




---