[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5650 ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174479455 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to guard {@link RocksDBWriteBatchWrapper}. 
+ */ +public class RocksDBWriteBatchWrapperTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void basicTest() throws Exception { + + List> data = new ArrayList<>(1); + for (int i = 0; i < 1; ++i) { + data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes())); + } + + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) { + + // insert data + for (Tuple2 item : data) { + writeBatchWrapper.put(handle, item.f0, item.f1); + } + writeBatchWrapper.flush(); + + // valid result + for (Tuple2 item : data) { + Assert.assertArrayEquals(item.f1, db.get(handle, item.f0)); + } + } + } + + @Test + @Ignore + public void benchMark() throws Exception { + + // put with disableWAL=true VS put with disableWAL=false + System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.PUT); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.PUT); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, WRITETYPE.PUT); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.PUT); + + // put with disableWAL=true VS write batch with disableWAL=false + System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, 
WRITETYPE.WRITE_BATCH); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH); + + // write batch with disableWAL=true VS write batch disableWAL = true + System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--"); +
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174479415 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to guard {@link RocksDBWriteBatchWrapper}. 
+ */ +public class RocksDBWriteBatchWrapperTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void basicTest() throws Exception { + + List> data = new ArrayList<>(1); + for (int i = 0; i < 1; ++i) { + data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes())); + } + + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) { + + // insert data + for (Tuple2 item : data) { + writeBatchWrapper.put(handle, item.f0, item.f1); + } + writeBatchWrapper.flush(); + + // valid result + for (Tuple2 item : data) { + Assert.assertArrayEquals(item.f1, db.get(handle, item.f0)); + } + } + } + + @Test + @Ignore + public void benchMark() throws Exception { + + // put with disableWAL=true VS put with disableWAL=false + System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.PUT); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.PUT); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, WRITETYPE.PUT); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.PUT); + + // put with disableWAL=true VS write batch with disableWAL=false + System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, 
WRITETYPE.WRITE_BATCH); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH); + + // write batch with disableWAL=true VS write batch disableWAL = true + System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--"); +
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174479344 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to guard {@link RocksDBWriteBatchWrapper}. 
+ */ +public class RocksDBWriteBatchWrapperTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void basicTest() throws Exception { + + List> data = new ArrayList<>(1); + for (int i = 0; i < 1; ++i) { + data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes())); + } + + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) { + + // insert data + for (Tuple2 item : data) { + writeBatchWrapper.put(handle, item.f0, item.f1); + } + writeBatchWrapper.flush(); + + // valid result + for (Tuple2 item : data) { + Assert.assertArrayEquals(item.f1, db.get(handle, item.f0)); + } + } + } + + @Test + @Ignore + public void benchMark() throws Exception { + + // put with disableWAL=true VS put with disableWAL=false + System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.PUT); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.PUT); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, WRITETYPE.PUT); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.PUT); + + // put with disableWAL=true VS write batch with disableWAL=false + System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--"); --- End diff -- 👍 ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174479322 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to guard {@link RocksDBWriteBatchWrapper}. 
+ */ +public class RocksDBWriteBatchWrapperTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void basicTest() throws Exception { + + List> data = new ArrayList<>(1); + for (int i = 0; i < 1; ++i) { + data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes())); + } + + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) { + + // insert data + for (Tuple2 item : data) { + writeBatchWrapper.put(handle, item.f0, item.f1); + } + writeBatchWrapper.flush(); + + // valid result + for (Tuple2 item : data) { + Assert.assertArrayEquals(item.f1, db.get(handle, item.f0)); + } + } + } + + @Test + @Ignore + public void benchMark() throws Exception { --- End diff -- 👍 ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174231064 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to guard {@link RocksDBWriteBatchWrapper}. 
+ */ +public class RocksDBWriteBatchWrapperTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void basicTest() throws Exception { + + List> data = new ArrayList<>(1); + for (int i = 0; i < 1; ++i) { + data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes())); + } + + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) { + + // insert data + for (Tuple2 item : data) { + writeBatchWrapper.put(handle, item.f0, item.f1); + } + writeBatchWrapper.flush(); + + // valid result + for (Tuple2 item : data) { + Assert.assertArrayEquals(item.f1, db.get(handle, item.f0)); + } + } + } + + @Test + @Ignore + public void benchMark() throws Exception { + + // put with disableWAL=true VS put with disableWAL=false + System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.PUT); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.PUT); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, WRITETYPE.PUT); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.PUT); + + // put with disableWAL=true VS write batch with disableWAL=false + System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--"); --- End diff -- Please replace the console output with logging; you can refer to `RocksDBListStatePerformanceTest.java` for an example. ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174230739 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to guard {@link RocksDBWriteBatchWrapper}. 
+ */ +public class RocksDBWriteBatchWrapperTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void basicTest() throws Exception { + + List> data = new ArrayList<>(1); + for (int i = 0; i < 1; ++i) { + data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes())); + } + + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) { + + // insert data + for (Tuple2 item : data) { + writeBatchWrapper.put(handle, item.f0, item.f1); + } + writeBatchWrapper.flush(); + + // valid result + for (Tuple2 item : data) { + Assert.assertArrayEquals(item.f1, db.get(handle, item.f0)); + } + } + } + + @Test + @Ignore + public void benchMark() throws Exception { --- End diff -- Need to move the benchmark test to `org.apache.flink.contrib.streaming.state.benchmark` package. ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174231142 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to guard {@link RocksDBWriteBatchWrapper}. 
+ */ +public class RocksDBWriteBatchWrapperTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void basicTest() throws Exception { + + List> data = new ArrayList<>(1); + for (int i = 0; i < 1; ++i) { + data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes())); + } + + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) { + + // insert data + for (Tuple2 item : data) { + writeBatchWrapper.put(handle, item.f0, item.f1); + } + writeBatchWrapper.flush(); + + // valid result + for (Tuple2 item : data) { + Assert.assertArrayEquals(item.f1, db.get(handle, item.f0)); + } + } + } + + @Test + @Ignore + public void benchMark() throws Exception { + + // put with disableWAL=true VS put with disableWAL=false + System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.PUT); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.PUT); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, WRITETYPE.PUT); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.PUT); + + // put with disableWAL=true VS write batch with disableWAL=false + System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, 
WRITETYPE.WRITE_BATCH); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH); + + // write batch with disableWAL=true VS write batch disableWAL = true + System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--"); +
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174231173 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to guard {@link RocksDBWriteBatchWrapper}. 
+ */ +public class RocksDBWriteBatchWrapperTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void basicTest() throws Exception { + + List> data = new ArrayList<>(1); + for (int i = 0; i < 1; ++i) { + data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes())); + } + + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) { + + // insert data + for (Tuple2 item : data) { + writeBatchWrapper.put(handle, item.f0, item.f1); + } + writeBatchWrapper.flush(); + + // valid result + for (Tuple2 item : data) { + Assert.assertArrayEquals(item.f1, db.get(handle, item.f0)); + } + } + } + + @Test + @Ignore + public void benchMark() throws Exception { + + // put with disableWAL=true VS put with disableWAL=false + System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.PUT); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.PUT); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, WRITETYPE.PUT); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.PUT); + + // put with disableWAL=true VS write batch with disableWAL=false + System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--"); + benchMarkHelper(1_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH); + + benchMarkHelper(10_000, true, WRITETYPE.PUT); + benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH); + + benchMarkHelper(100_000, true, WRITETYPE.PUT); + benchMarkHelper(100_000, false, 
WRITETYPE.WRITE_BATCH); + + benchMarkHelper(1_000_000, true, WRITETYPE.PUT); + benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH); + + // write batch with disableWAL=true VS write batch disableWAL = true + System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--"); +
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173049537 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); --- End diff -- About the capacity range: I didn't find a specific value recommended by RocksDB, but the [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ) says: ``` Q: What's the fastest way to load data into RocksDB? ... 2. batch hundreds of keys into one write batch ... ``` I found that they use the word `hundreds`. ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173048763 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { + + this.batch.put(handle, key, value); + + if (++currentSize == capacity) { + flush(); + } + } + + public void flush() throws RocksDBException { + this.db.write(options, batch); + batch.clear(); + currentSize = 0; + } + + @Override + public void close() throws RocksDBException { + if (batch != null) { --- End diff -- You are right, this `if` can be removed. ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173048697 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { --- End diff -- Hmm... currently it is only used from a single thread. For the best performance I would rather not add synchronization to it; instead, I'd like to annotate this class to document that it is not thread-safe. We could introduce a separate thread-safe class if we really need one. What do you think? ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172935414 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { + + this.batch.put(handle, key, value); + + if (++currentSize == capacity) { + flush(); + } + } + + public void flush() throws RocksDBException { + this.db.write(options, batch); + batch.clear(); + currentSize = 0; + } + + @Override + public void close() throws RocksDBException { + if (batch != null) { --- End diff -- can batch be null? ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172935214 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { --- End diff -- need synchronization on put() and flush() ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172934683 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); --- End diff -- How is the capacity range determined - is it recommended by RocksDB? Also, the message should be: "capacity should be between " + MIN + " and " + MAX ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5650 [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to improve performance for recovery in RocksDB backend ## What is the purpose of the change This PR addresses [FLINK-8845](https://issues.apache.org/jira/browse/FLINK-8845), which attempts to use `WriteBatch` to improve the performance for loading data into RocksDB. It's inspired by [RocksDB FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ). ## Brief change log - *Introduce `RocksDBWriteBatchWrapper` to load data into RocksDB in bulk* ## Verifying this change - Introduce `RocksDBWriteBatchWrapperTest.java` to guard `RocksDBWriteBatchWrapper`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation none You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink rocksdb_write_batch Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5650.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5650 commit e710287495d2a1a12a99b812c9691e12c6c57459 Author: sihuazhou Date: 2018-03-07T05:58:45Z Introduce RocksDBWriteBatchWrapper to speed up write performance. ---