[FLINK-3389] [rocksdb] Add pre-defined option profiles.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be72758d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be72758d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be72758d

Branch: refs/heads/master
Commit: be72758d1104400c8a48554d717c5b8cea5b3617
Parents: 82c7383
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 11 15:30:56 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 11 21:34:03 2016 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/OptionsFactory.java | 32 ++++++-
 .../streaming/state/PredefinedOptions.java      | 91 ++++++++++++++++++++
 .../streaming/state/RocksDBStateBackend.java    | 76 ++++++++++++++--
 3 files changed, 190 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 73b1e5d..3e52f1f 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -24,8 +24,36 @@ import org.rocksdb.Options;
  * A factory for {@link Options} to be passed to the {@link 
RocksDBStateBackend}.
  * Options have to be created lazily by this factory, because the {@code 
Options}
  * class is not serializable and holds pointers to native code.
+ * 
+ * <p>A typical pattern to use this OptionsFactory is as follows:
+ * 
+ * <h3>Java 8:</h3>
+ * <pre>{@code
+ * rocksDbBackend.setOptions( (currentOptions) -> 
currentOptions.setMaxOpenFiles(1024) );
+ * }</pre>
+ * 
+ * <h3>Java 7:</h3>
+ * <pre>{@code
+ * rocksDbBackend.setOptions(new OptionsFactory() {
+ *     
+ *     public Options createOptions(Options currentOptions) {
+ *         return currentOptions.setMaxOpenFiles(1024);
+ *     }
+ * })
+ * }</pre>
  */
 public interface OptionsFactory extends java.io.Serializable {
-       
-       Options createOptions();
+
+       /**
+        * This method should set the additional options on top of the current 
options object.
+        * The current options object may contain pre-defined options based on 
flags that have
+        * been configured on the state backend.
+        * 
+        * <p>It is important to set the options on the current object and 
return the result from
+        * the setter methods, otherwise the pre-defined options may get lost.
+        * 
+        * @param currentOptions The options object with the pre-defined 
options. 
+        * @return The options object on which the additional options are set.
+        */
+       Options createOptions(Options currentOptions);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
new file mode 100644
index 0000000..383f043
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+
+/**
+ * The {@code PredefinedOptions} are configuration settings for the {@link 
RocksDBStateBackend}. 
+ * The various pre-defined choices are configurations that have been 
empirically
+ * determined to be beneficial for performance under different settings.
+ * 
+ * <p>Some of these settings are based on experiments by the Flink community, 
some follow
+ * guides from the RocksDB project.
+ */
+public enum PredefinedOptions {
+
+       /**
+        * Default options for all settings.
+        */
+       DEFAULT {
+               
+               @Override
+               public Options createOptions() {
+                       return new Options();
+               }
+       },
+
+       /**
+        * Pre-defined options for regular spinning hard disks.
+        * 
+        * <p>This constant configures RocksDB with some options that lead 
empirically
+        * to better performance when the machines executing the system use
+        * regular spinning hard disks. The following options are set:
+        * <ul>
+        *     <li>Optimized level-style compactions</li>
+        * </ul>
+        */
+       SPINNING_DISK_OPTIMIZED {
+
+               @Override
+               public Options createOptions() {
+                       return new Options()
+                                       
.setCompactionStyle(CompactionStyle.LEVEL)
+                                       .optimizeLevelStyleCompaction();
+               }
+       },
+
+       /**
+        * Pre-defined options for Flash SSDs.
+        *
+        * <p>This constant configures RocksDB with some options that lead 
empirically
+        * to better performance when the machines executing the system use 
SSDs.
+        * The following options are set:
+        * <ul>
+        *     <li>none</li>
+        * </ul>
+        */
+       FLASH_SSD_OPTIMIZED {
+
+               @Override
+               public Options createOptions() {
+                       return new Options();
+               }
+       };
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates the options for this pre-defined setting.
+        * 
+        * @return The pre-defined options object. 
+        */
+       public abstract Options createOptions();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 8c0171a..eddd8c0 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.api.common.state.StateBackend;
+
 import org.rocksdb.Options;
 import org.rocksdb.StringAppendOperator;
 
@@ -45,6 +46,10 @@ import static java.util.Objects.requireNonNull;
  * For persistence against loss of machines, checkpoints take a snapshot of the
  * RocksDB database, and persist that snapshot in a file system (by default) or
  * another configurable state backend.
+ * 
+ * <p>The behavior of the RocksDB instances can be parametrized by setting 
RocksDB Options
+ * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
+ * {@link #setOptions(OptionsFactory)}.
  */
 public class RocksDBStateBackend extends AbstractStateBackend {
        private static final long serialVersionUID = 1L;
@@ -62,6 +67,9 @@ public class RocksDBStateBackend extends AbstractStateBackend 
{
        private JobID jobId;
 
        private AbstractStateBackend backingStateBackend;
+
+       /** The pre-configured option settings */
+       private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
        
        /** The options factory to create the RocksDB options in the cluster */
        private OptionsFactory optionsFactory;
@@ -69,6 +77,7 @@ public class RocksDBStateBackend extends AbstractStateBackend 
{
        /** The options from the options factory, cached */
        private transient Options rocksDbOptions;
        
+       // 
------------------------------------------------------------------------
 
        public RocksDBStateBackend(String dbBasePath, String 
checkpointDirectory, AbstractStateBackend backingStateBackend) {
                this.dbBasePath = requireNonNull(dbBasePath);
@@ -76,10 +85,14 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                this.backingStateBackend = requireNonNull(backingStateBackend);
        }
 
+       // 
------------------------------------------------------------------------
+       
        @Override
-       public void initializeForJob(Environment env,
-               String operatorIdentifier,
-               TypeSerializer<?> keySerializer) throws Exception {
+       public void initializeForJob(
+                       Environment env, 
+                       String operatorIdentifier,
+                       TypeSerializer<?> keySerializer) throws Exception
+       {
                super.initializeForJob(env, operatorIdentifier, keySerializer);
                this.operatorIdentifier = operatorIdentifier.replace(" ", "");
                backingStateBackend.initializeForJob(env, operatorIdentifier, 
keySerializer);
@@ -106,6 +119,10 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                return checkpointDirectory + "/" + jobId.toString() + "/" + 
operatorIdentifier + "/" + stateName;
        }
 
+       // 
------------------------------------------------------------------------
+       //  State factories
+       // 
------------------------------------------------------------------------
+       
        @Override
        protected <N, T> ValueState<T> createValueState(TypeSerializer<N> 
namespaceSerializer,
                ValueStateDescriptor<T> stateDesc) throws Exception {
@@ -154,9 +171,42 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        // 
------------------------------------------------------------------------
 
        /**
-        * Defines the {@link org.rocksdb.Options} for the RocksDB instances.
+        * Sets the predefined options for RocksDB.
+        * 
+        * <p>If a user-defined options factory is set (via {@link 
#setOptions(OptionsFactory)}),
+        * then the options from the factory are applied on top of the
+        * predefined options specified here.
+        * 
+        * @param options The options to set (must not be null).
+        */
+       public void setPredefinedOptions(PredefinedOptions options) {
+               predefinedOptions = requireNonNull(options);
+       }
+
+       /**
+        * Gets the currently set predefined options for RocksDB.
+        * The default options (if nothing was set via {@link 
#setPredefinedOptions(PredefinedOptions)})
+        * are {@link PredefinedOptions#DEFAULT}.
+        * 
+        * <p>If a user-defined options factory is set (via {@link 
#setOptions(OptionsFactory)}),
+        * then the options from the factory are applied on top of the 
predefined options.
+        * 
+        * @return The currently set predefined options for RocksDB.
+        */
+       public PredefinedOptions getPredefinedOptions() {
+               return predefinedOptions;
+       }
+
+       /**
+        * Sets {@link org.rocksdb.Options} for the RocksDB instances.
         * Because the options are not serializable and hold native code 
references,
-        * they must be specified through a factory. 
+        * they must be specified through a factory.
+        * 
+        * <p>The options created by the factory here are applied on top of the 
pre-defined 
+        * options profile selected via {@link 
#setPredefinedOptions(PredefinedOptions)}.
+        * If the pre-defined options profile is the default ({@link PredefinedOptions#DEFAULT}),
+        * then the factory controls all RocksDB options except create-if-missing and the
+        * merge operator, which the state backend always sets afterwards.
         * 
         * @param optionsFactory The options factory that lazily creates the 
RocksDB options.
         */
@@ -172,12 +222,24 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        public OptionsFactory getOptions() {
                return optionsFactory;
        }
-       
+
+       /**
+        * Gets the RocksDB Options to be used for all RocksDB instances.
+        */
        Options getRocksDBOptions() {
                if (rocksDbOptions == null) {
-                       Options opt = optionsFactory == null ? new Options() : 
optionsFactory.createOptions();
+                       // initial options from pre-defined profile
+                       Options opt = predefinedOptions.createOptions();
+
+                       // add user-defined options, if specified
+                       if (optionsFactory != null) {
+                               opt = optionsFactory.createOptions(opt);
+                       }
+                       
+                       // add necessary default options
                        opt = opt.setCreateIfMissing(true);
                        opt = opt.setMergeOperator(new StringAppendOperator());
+                       
                        rocksDbOptions = opt;
                }
                return rocksDbOptions;

Reply via email to