[FLINK-6518] Port blobserver config parameters to ConfigOptions

This closes #3865.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d719118
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d719118
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d719118

Branch: refs/heads/master
Commit: 4d719118cee3a256722e2bacf276568d601f2b65
Parents: 707f25f
Author: zentol <ches...@apache.org>
Authored: Wed May 10 10:26:19 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Sat May 13 17:53:20 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/BlobServerOptions.java  | 76 ++++++++++++++++++++
 .../flink/configuration/ConfigConstants.java    | 40 ++++++-----
 .../apache/flink/runtime/blob/BlobCache.java    |  9 ++-
 .../apache/flink/runtime/blob/BlobClient.java   |  5 +-
 .../apache/flink/runtime/blob/BlobServer.java   | 22 +++---
 .../flink/runtime/blob/BlobClientSslTest.java   |  5 +-
 .../flink/runtime/blob/BlobServerRangeTest.java |  8 +--
 .../jobmanager/JobManagerStartupTest.java       |  4 +-
 8 files changed, 125 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
new file mode 100644
index 0000000..e27c29f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for the BlobServer.
+ */
+@PublicEvolving
+public class BlobServerOptions {
+
+       /**
+        * The config parameter defining the storage directory to be used by 
the blob server.
+        */
+       public static final ConfigOption<String> STORAGE_DIRECTORY =
+               key("blob.storage.directory")
+                       .noDefaultValue();
+
+       /**
+        * The config parameter defining number of retires for failed BLOB 
fetches.
+        */
+       public static final ConfigOption<Integer> FETCH_RETRIES =
+               key("blob.fetch.retries")
+                       .defaultValue(5);
+
+       /**
+        * The config parameter defining the maximum number of concurrent BLOB 
fetches that the JobManager serves.
+        */
+       public static final ConfigOption<Integer> FETCH_CONCURRENT =
+               key("blob.fetch.num-concurrent")
+                       .defaultValue(50);
+
+       /**
+        * The config parameter defining the backlog of BLOB fetches on the 
JobManager.
+        */
+       public static final ConfigOption<Integer> FETCH_BACKLOG =
+               key("blob.fetch.backlog")
+                       .defaultValue(1000);
+
+       /**
+        * The config parameter defining the server port of the blob service.
+        * The port can either be a port, such as "9123",
+        * a range of ports: "50100-50200"
+        * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+        *
+        * Setting the port to 0 will let the OS choose an available port.
+        */
+       public static final ConfigOption<String> PORT =
+               key("blob.server.port")
+                       .defaultValue("0");
+
+       /**
+        * Flag to override ssl support for the blob service transport.
+        */
+       public static final ConfigOption<Boolean> SSL_ENABLED =
+               key("blob.service.ssl.enabled")
+                       .defaultValue(true);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c3704be..b5b5486 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -139,36 +139,39 @@ public final class ConfigConstants {
        public static final String RESOURCE_MANAGER_IPC_PORT_KEY = 
"resourcemanager.rpc.port";
 
        /**
-        * The config parameter defining the storage directory to be used by 
the blob server.
+        * @deprecated use {@link BlobServerOptions#STORAGE_DIRECTORY} instead
         */
+       @Deprecated
        public static final String BLOB_STORAGE_DIRECTORY_KEY = 
"blob.storage.directory";
 
        /**
-        * The config parameter defining number of retires for failed BLOB 
fetches.
+        * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead
         */
+       @Deprecated
        public static final String BLOB_FETCH_RETRIES_KEY = 
"blob.fetch.retries";
 
        /**
-        * The config parameter defining the maximum number of concurrent BLOB 
fetches that the JobManager serves.
+        * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead
         */
+       @Deprecated
        public static final String BLOB_FETCH_CONCURRENT_KEY = 
"blob.fetch.num-concurrent";
 
        /**
-        * The config parameter defining the backlog of BLOB fetches on the 
JobManager
+        * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead
         */
+       @Deprecated
        public static final String BLOB_FETCH_BACKLOG_KEY = 
"blob.fetch.backlog";
 
        /**
-        * The config parameter defining the server port of the blob service.
-        * The port can either be a port, such as "9123",
-        * a range of ports: "50100-50200"
-        * or a list of ranges and or points: "50100-50200,50300-50400,51234"
-        *
-        * Setting the port to 0 will let the OS choose an available port.
+        * @deprecated use {@link BlobServerOptions#PORT} instead
         */
+       @Deprecated
        public static final String BLOB_SERVER_PORT = "blob.server.port";
 
-       /** Flag to override ssl support for the blob service transport */
+       /**
+        * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead
+        */
+       @Deprecated
        public static final String BLOB_SERVICE_SSL_ENABLED = 
"blob.service.ssl.enabled";
 
        /**
@@ -1094,28 +1097,33 @@ public final class ConfigConstants {
        public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
 
        /**
-        * The default value to override ssl support for blob service transport
+        * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead
         */
+       @Deprecated
        public static final boolean DEFAULT_BLOB_SERVICE_SSL_ENABLED = true;
 
        /**
-        * Default number of retries for failed BLOB fetches.
+        * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead
         */
+       @Deprecated
        public static final int DEFAULT_BLOB_FETCH_RETRIES = 5;
 
        /**
-        * Default number of concurrent BLOB fetch operations.
+        * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead
         */
+       @Deprecated
        public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50;
 
        /**
-        * Default BLOB fetch connection backlog.
+        * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead
         */
+       @Deprecated
        public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;
 
        /**
-        * Default BLOB server port. 0 means ephemeral port.
+        * @deprecated use {@link BlobServerOptions#PORT} instead
         */
+       @Deprecated
        public static final String DEFAULT_BLOB_SERVER_PORT = "0";
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 2587b15..23c7e63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.FileUtils;
@@ -129,19 +129,18 @@ public final class BlobCache implements BlobService {
                this.blobStore = blobStore;
 
                // configure and create the storage directory
-               String storageDirectory = 
blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+               String storageDirectory = 
blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB cache storage directory " + storageDir);
 
                // configure the number of fetch retries
-               final int fetchRetries = blobClientConfig.getInteger(
-                       ConfigConstants.BLOB_FETCH_RETRIES_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES);
+               final int fetchRetries = 
blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
                if (fetchRetries >= 0) {
                        this.numFetchRetries = fetchRetries;
                }
                else {
                        LOG.warn("Invalid value for {}. System will attempt no 
retires on failed fetches of BLOBs.",
-                               ConfigConstants.BLOB_FETCH_RETRIES_KEY);
+                               BlobServerOptions.FETCH_RETRIES.key());
                        this.numFetchRetries = 0;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index ea90f54..49e54a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -92,8 +92,7 @@ public final class BlobClient implements Closeable {
                        // Check if ssl is enabled
                        SSLContext clientSSLContext = null;
                        if (clientConfig != null &&
-                               
clientConfig.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
-                                               
ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
+                               
clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
 
                                clientSSLContext = 
SSLUtils.createSSLClientContext(clientConfig);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 8a70559..0e15777 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -111,34 +111,32 @@ public class BlobServer extends Thread implements 
BlobService {
                this.blobStore = checkNotNull(blobStore);
 
                // configure and create the storage directory
-               String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+               String storageDirectory = 
config.getString(BlobServerOptions.STORAGE_DIRECTORY);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB server storage directory {}", 
storageDir);
 
                // configure the maximum number of concurrent connections
-               final int maxConnections = config.getInteger(
-                               ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
+               final int maxConnections = 
config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
                if (maxConnections >= 1) {
                        this.maxConnections = maxConnections;
                }
                else {
                        LOG.warn("Invalid value for maximum connections in BLOB 
server: {}. Using default value of {}",
-                                       maxConnections, 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
-                       this.maxConnections = 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT;
+                                       maxConnections, 
BlobServerOptions.FETCH_CONCURRENT.defaultValue());
+                       this.maxConnections = 
BlobServerOptions.FETCH_CONCURRENT.defaultValue();
                }
 
                // configure the backlog of connections
-               int backlog = 
config.getInteger(ConfigConstants.BLOB_FETCH_BACKLOG_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
+               int backlog = 
config.getInteger(BlobServerOptions.FETCH_BACKLOG);
                if (backlog < 1) {
                        LOG.warn("Invalid value for BLOB connection backlog: 
{}. Using default value of {}",
-                                       backlog, 
ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
-                       backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
+                                       backlog, 
BlobServerOptions.FETCH_BACKLOG.defaultValue());
+                       backlog = 
BlobServerOptions.FETCH_BACKLOG.defaultValue();
                }
 
                this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 
-               if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
-                               
ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
+               if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
                        try {
                                serverSSLContext = 
SSLUtils.createSSLServerContext(config);
                        } catch (Exception e) {
@@ -148,7 +146,7 @@ public class BlobServer extends Thread implements 
BlobService {
 
                //  ----------------------- start the server -------------------
 
-               String serverPortRange = 
config.getString(ConfigConstants.BLOB_SERVER_PORT, 
ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
+               String serverPortRange = 
config.getString(BlobServerOptions.PORT);
 
                Iterator<Integer> ports = 
NetUtils.getPortRangeFromString(serverPortRange);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 5054107..27603d0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -31,6 +31,7 @@ import java.security.MessageDigest;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
@@ -91,7 +92,7 @@ public class BlobClientSslTest {
                try {
                        Configuration config = new Configuration();
                        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
-                       
config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+                       config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
                        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
                        
config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
                        
config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
@@ -104,7 +105,7 @@ public class BlobClientSslTest {
 
                clientConfig = new Configuration();
                clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
-               
clientConfig.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+               clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false);
                clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
                
clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"password");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index ea0eb94..c3762aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
@@ -38,7 +38,7 @@ public class BlobServerRangeTest extends TestLogger {
        @Test
        public void testOnEphemeralPort() throws IOException {
                Configuration conf = new Configuration();
-               conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
+               conf.setString(BlobServerOptions.PORT, "0");
                BlobServer srv = new BlobServer(conf);
                srv.shutdown();
        }
@@ -59,7 +59,7 @@ public class BlobServerRangeTest extends TestLogger {
                }
 
                Configuration conf = new Configuration();
-               conf.setString(ConfigConstants.BLOB_SERVER_PORT, 
String.valueOf(socket.getLocalPort()));
+               conf.setString(BlobServerOptions.PORT, 
String.valueOf(socket.getLocalPort()));
 
                // this thing is going to throw an exception
                try {
@@ -88,7 +88,7 @@ public class BlobServerRangeTest extends TestLogger {
                }
                int availablePort = NetUtils.getAvailablePort();
                Configuration conf = new Configuration();
-               conf.setString(ConfigConstants.BLOB_SERVER_PORT, 
sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + 
availablePort);
+               conf.setString(BlobServerOptions.PORT, 
sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + 
availablePort);
 
                // this thing is going to throw an exception
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 9ac6873..a906d9b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -30,7 +30,7 @@ import java.util.List;
 
 import com.google.common.io.Files;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.StartupUtils;
 import org.apache.flink.util.NetUtils;
@@ -130,7 +130,7 @@ public class JobManagerStartupTest extends TestLogger {
                }
                Configuration failConfig = new Configuration();
                String nonExistDirectory = new File(blobStorageDirectory, 
DOES_NOT_EXISTS_NO_SIR).getAbsolutePath();
-               
failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, 
nonExistDirectory);
+               failConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, 
nonExistDirectory);
 
                try {
                        JobManager.runJobManager(failConfig, 
JobManagerMode.CLUSTER, "localhost", portNum);

Reply via email to