[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502809#comment-16502809 ] Sihua Zhou commented on FLINK-8845: --- Hi [~noliran], yes, this is on purpose. In fact, we tried ([FLINK-8859|https://issues.apache.org/jira/browse/FLINK-8859]) to disable the WAL in RocksDBKeyedStateBackend when restoring the backend, but it caused segfaults on Travis ([FLINK-8882|https://issues.apache.org/jira/browse/FLINK-8882]). The reason for those segfaults is still unclear, so in the end we reverted it ([FLINK-8922|https://issues.apache.org/jira/browse/FLINK-8922]).

> Use WriteBatch to improve performance for recovery in RocksDB backend
> ---------------------------------------------------------------------
>
>                 Key: FLINK-8845
>                 URL: https://issues.apache.org/jira/browse/FLINK-8845
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.5.0
>
> Based on {{WriteBatch}} we could get a 30% ~ 50% performance lift when loading data into RocksDB.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502447#comment-16502447 ] Noam Liran commented on FLINK-8845: --- Hi [~srichter], [~sihuazhou], I noticed that the code in RocksDBKeyedStateBackend does not disable WAL, even though it is disabled in other places (and in the benchmarks). Is this on purpose? Thanks, Noam
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487055#comment-16487055 ] ASF GitHub Bot commented on FLINK-8845: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5650
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486877#comment-16486877 ] ASF GitHub Bot commented on FLINK-8845: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5650 @sihuazhou thanks for this nice contribution. LGTM 👍 Will merge.
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16482298#comment-16482298 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5650 A micro-benchmark for this: @StefanRRichter @StephanEwen
```
---> With disableWAL = false <---
Number of values added | time for Put  | time for WriteBatch | speedup of WriteBatch over Put
1,000                  | 10146397 ns   | 3546287 ns          | 2.86x
10,000                 | 118227077 ns  | 26040222 ns         | 4.54x
100,000                | 1838593196 ns | 375053755 ns        | 4.9x
1,000,000              | 8844612079 ns | 2014077396 ns       | 4.39x

---> With disableWAL = true <---
1,000                  | 3955204 ns    | 2429725 ns          | 1.62x
10,000                 | 25618237 ns   | 16440113 ns         | 1.55x
100,000                | 289153346 ns  | 183712685 ns        | 1.57x
1,000,000              | 2886298967 ns | 1768688571 ns       | 1.63x
```
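The wrapper being benchmarked follows a simple pattern: buffer key/value puts and hand them to the store as one batch whenever a capacity threshold is reached, flushing any remainder on close. Below is a minimal stdlib-only sketch of that pattern; the `KvSink` interface and all names are hypothetical stand-ins for the RocksDB write path, not Flink's actual API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sink; in the real wrapper this role is played by RocksDB itself. */
interface KvSink {
    void writeBatch(List<SimpleEntry<byte[], byte[]>> batch);
}

/** Minimal sketch of the capacity-triggered batching pattern. */
class WriteBatchWrapperSketch implements AutoCloseable {
    private final KvSink sink;
    private final int capacity;
    private final List<SimpleEntry<byte[], byte[]>> buffer;

    WriteBatchWrapperSketch(KvSink sink, int capacity) {
        this.sink = sink;
        this.capacity = capacity;
        this.buffer = new ArrayList<>(capacity);
    }

    void put(byte[] key, byte[] value) {
        buffer.add(new SimpleEntry<>(key, value));
        if (buffer.size() >= capacity) {
            // One batched write per `capacity` puts amortizes the per-write overhead.
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            sink.writeBatch(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    @Override
    public void close() {
        flush(); // ensure nothing buffered is lost
    }
}
```

In the actual change under review, the batch is a RocksDB {{WriteBatch}} written through the database handle, which is what produces the speedups measured above.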
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398676#comment-16398676 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174479415

--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	@Test
+	public void basicTest() throws Exception {
+
+		List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+		for (int i = 0; i < 1; ++i) {
+			data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+		}
+
+		try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+			WriteOptions options = new WriteOptions().setDisableWAL(true);
+			ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+			RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+			// insert data
+			for (Tuple2<byte[], byte[]> item : data) {
+				writeBatchWrapper.put(handle, item.f0, item.f1);
+			}
+			writeBatchWrapper.flush();
+
+			// valid result
+			for (Tuple2<byte[], byte[]> item : data) {
+				Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+			}
+		}
+	}
+
+	@Test
+	@Ignore
+	public void benchMark() throws Exception {
+
+		// put with disableWAL=true VS put with disableWAL=false
+		System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
+		benchMarkHelper(1_000, true, WRITETYPE.PUT);
+		benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+		benchMarkHelper(10_000, true, WRITETYPE.PUT);
+		benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+		benchMarkHelper(100_000, true, WRITETYPE.PUT);
+		benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+		benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+		benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+		// put with disableWAL=true VS write batch with disableWAL=false
+		System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
+		benchMarkHelper(1_000, true, WRITETYPE.PUT);
+		benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+		benchMarkHelper(10_000, true, WRITETYPE.PUT);
+		benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+		benchMarkHelper(100_000, true, WRITETYPE.PUT);
+		benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+		benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+		benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+		// write batch with d
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398677#comment-16398677 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174479455 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398675#comment-16398675 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174479344 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
+		// put with disableWAL=true VS write batch with disableWAL=false
+		System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
--- End diff --
👍
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398673#comment-16398673 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174479322 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
+	@Test
+	@Ignore
+	public void benchMark() throws Exception {
--- End diff --
👍
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398713#comment-16398713 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5650 @bowenli86 Thanks a lot for your reviews; I've addressed all your comments. I'm waiting for 1.5 to be released, after which maybe @StefanRRichter could also take a look at this.
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397394#comment-16397394 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174231064 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
+		// put with disableWAL=true VS write batch with disableWAL=false
+		System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
--- End diff --
replace console output with logging, you can refer to `RocksDBListStatePerformanceTest.java`
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397393#comment-16397393 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174231142 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397392#comment-16397392 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174231173 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397395#comment-16397395 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r174230739 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to guard {@link RocksDBWriteBatchWrapper}. 
+ */ +public class RocksDBWriteBatchWrapperTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void basicTest() throws Exception { + + List> data = new ArrayList<>(1); + for (int i = 0; i < 1; ++i) { + data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes())); + } + + try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath()); + WriteOptions options = new WriteOptions().setDisableWAL(true); + ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes())); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) { + + // insert data + for (Tuple2 item : data) { + writeBatchWrapper.put(handle, item.f0, item.f1); + } + writeBatchWrapper.flush(); + + // valid result + for (Tuple2 item : data) { + Assert.assertArrayEquals(item.f1, db.get(handle, item.f0)); + } + } + } + + @Test + @Ignore + public void benchMark() throws Exception { --- End diff -- Need to move the benchmark test to `org.apache.flink.contrib.streaming.state.benchmark` package. > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Base on {{WriteBatch}} we could get 30% ~ 50% performance lift when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390632#comment-16390632 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173049537 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); --- End diff -- About the capacity range, I didn't find a specific value recommend by RocksDB, but from [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ) ``` Q: What's the fastest way to load data into RocksDB? ... 2. batch hundreds of keys into one write batch ... ``` I found that they use the word `hundreds`. > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Base on {{WriteBatch}} we could get 30% ~ 50% performance lift when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
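The RocksDB FAQ advice quoted above ("batch hundreds of keys into one write batch") is the core idea behind the wrapper under review: buffer puts and issue one write per few hundred records. A minimal, RocksDB-free sketch of that capacity-triggered batching follows; the class name `SimpleBatchWriter` and the `Consumer` sink (standing in for `db.write()`) are illustrative, not from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: capacity-triggered batching as discussed in the review.
 *  The sink stands in for RocksDB's db.write(options, batch). */
public class SimpleBatchWriter {
    private final int capacity;                 // flush threshold, e.g. a few hundred
    private final List<String> batch = new ArrayList<>();
    private final Consumer<List<String>> sink;  // receives each flushed batch
    public int flushes = 0;                     // counts how often the sink was hit

    public SimpleBatchWriter(int capacity, Consumer<List<String>> sink) {
        this.capacity = capacity;
        this.sink = sink;
    }

    public void put(String record) {
        batch.add(record);
        if (batch.size() == capacity) {
            flush();                            // amortize one write over `capacity` puts
        }
    }

    public void flush() {
        if (!batch.isEmpty()) {
            sink.accept(new ArrayList<>(batch));
            batch.clear();
            flushes++;
        }
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        SimpleBatchWriter w = new SimpleBatchWriter(100, written::addAll);
        for (int i = 0; i < 250; i++) {
            w.put("key:" + i);
        }
        w.flush();                              // drain the trailing partial batch
        System.out.println(written.size() + " records in " + w.flushes + " flushes");
    }
}
```

With 250 puts and a capacity of 100, the sink is hit three times (100 + 100 + 50), which is the amortization that produces the 30%-50% bulk-load speedup claimed in the ticket.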
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390623#comment-16390623 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173048763 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { + + this.batch.put(handle, key, value); + + if (++currentSize == capacity) { + flush(); + } + } + + public void flush() throws RocksDBException { + this.db.write(options, batch); + batch.clear(); + currentSize = 0; + } + + @Override + public void close() throws RocksDBException { + if (batch != null) { --- End diff -- You are right, this `if` can be removed. > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Base on {{WriteBatch}} we could get 30% ~ 50% performance lift when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
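The `close()` point agreed on above (a `final` field assigned in the constructor can never be null, so the `if (batch != null)` guard is dead code) can be illustrated without RocksDB. The `FlushingResource` name and list-backed sink here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative AutoCloseable that drains pending items on close.
 *  `pending` is final and assigned in the constructor, so a null
 *  check inside close() would be dead code. */
public class FlushingResource implements AutoCloseable {
    private final List<String> pending = new ArrayList<>();
    private final List<String> sink;

    public FlushingResource(List<String> sink) {
        this.sink = sink;
    }

    public void put(String s) {
        pending.add(s);
    }

    @Override
    public void close() {
        // No `if (pending != null)` needed here.
        sink.addAll(pending);
        pending.clear();
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        try (FlushingResource r = new FlushingResource(sink)) {
            r.put("a");
            r.put("b");
        } // close() runs automatically, draining both items
        System.out.println(sink);
    }
}
```

Try-with-resources guarantees `close()` runs, which is also why the test in this PR can rely on the wrapper being flushed and released even if an assertion throws.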
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390622#comment-16390622 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173048697 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { --- End diff -- Hmm... currently, it is only used in single thread. For the best performance, I wouldn't like to add synchronization for it, I'd like to add annotation for this class that it's not thread safe. We could introduce a new class that is thread safe if we really need it. What do you think? > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Base on {{WriteBatch}} we could get 30% ~ 50% performance lift when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
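The trade-off sihuazhou describes above (keep the restore-path wrapper unsynchronized for speed, and introduce a separate thread-safe class only if needed) could be sketched as below. The class name and the monitor-based locking scheme are hypothetical, not part of the PR:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical thread-safe counterpart: same capacity-triggered flushing,
 *  but put() and flush() are guarded by the instance monitor. */
public class SynchronizedBatchWriter {
    private final int capacity;
    private final List<String> batch = new ArrayList<>();
    private final Consumer<List<String>> sink;

    public SynchronizedBatchWriter(int capacity, Consumer<List<String>> sink) {
        this.capacity = capacity;
        this.sink = sink;
    }

    public synchronized void put(String record) {
        batch.add(record);
        if (batch.size() == capacity) {
            flushLocked();
        }
    }

    public synchronized void flush() {
        flushLocked();
    }

    // Caller must hold the monitor; flushing and clearing happen atomically.
    private void flushLocked() {
        if (!batch.isEmpty()) {
            sink.accept(new ArrayList<>(batch));
            batch.clear();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        SynchronizedBatchWriter w = new SynchronizedBatchWriter(10, out::addAll);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < 4; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) w.put(id + ":" + i);
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();
        w.flush();
        System.out.println(out.size()); // all 400 records survive concurrent puts
    }
}
```

The cost is one monitor acquisition per `put()`, which is exactly the overhead the single-threaded restore path avoids by documenting the class as not thread safe instead.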
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390025#comment-16390025 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172935414 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { + + this.batch.put(handle, key, value); + + if (++currentSize == capacity) { + flush(); + } + } + + public void flush() throws RocksDBException { + this.db.write(options, batch); + batch.clear(); + currentSize = 0; + } + + @Override + public void close() throws RocksDBException { + if (batch != null) { --- End diff -- can batch be null? > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Base on {{WriteBatch}} we could get 30% ~ 50% performance lift when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390026#comment-16390026 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172935214 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { --- End diff -- need synchronization on put() and flush() > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Base on {{WriteBatch}} we could get 30% ~ 50% performance lift when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390024#comment-16390024 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172934683 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); --- End diff -- how is the capacity range determined - is it recommended by RocksDB? the msg should be: "capacity should be between " + MIN + " and " + MAX > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Base on {{WriteBatch}} we could get 30% ~ 50% performance lift when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
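The message fix bowenli86 suggests above ("capacity should be between " + MIN + " and " + MAX) could look like the sketch below. The `MAX_CAPACITY` value of 1000 is an assumption for illustration (the quoted diff's `MAX_CAPACITY = 1` appears mangled), and the plain `IllegalArgumentException` stands in for Flink's `Preconditions.checkArgument`:

```java
/** Sketch of the suggested precondition message; the MIN/MAX bounds
 *  are illustrative assumptions, not taken from the PR. */
public class CapacityCheck {
    static final int MIN_CAPACITY = 100;
    static final int MAX_CAPACITY = 1000;

    static int checkCapacity(int capacity) {
        if (capacity < MIN_CAPACITY || capacity > MAX_CAPACITY) {
            // Report the actual bounds and the offending value, as suggested
            // in the review, instead of the vague "at least greater than 100".
            throw new IllegalArgumentException(
                "capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY
                    + ", but was: " + capacity);
        }
        return capacity;
    }

    public static void main(String[] args) {
        System.out.println(checkCapacity(200));
        try {
            checkCapacity(5);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Echoing the actual bounds and the rejected value in the message makes a misconfiguration diagnosable from the stack trace alone.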
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389297#comment-16389297 ] ASF GitHub Bot commented on FLINK-8845: --- GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5650 [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to improve performance for recovery in RocksDB backend ## What is the purpose of the change This PR addresses [FLINK-8845](https://issues.apache.org/jira/browse/FLINK-8845), which attempts to use `WriteBatch` to improve the performance for loading data into RocksDB. It's inspired by [RocksDB FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ). ## Brief change log - *Introduce `RocksDBWriteBatchWrapper` to load data into RocksDB in bulk* ## Verifying this change - Introduce `RocksDBWriteBatchWrapperTest.java` to guard `RocksDBWriteBatchWrapper`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation none You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink rocksdb_write_batch Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5650.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5650 commit e710287495d2a1a12a99b812c9691e12c6c57459 Author: sihuazhou Date: 2018-03-07T05:58:45Z Introduce RocksDBWriteBatchWrapper to speed up write performance. 