This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 792f2a2d9c0b5c8bba881455dedbed09b02d363c
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Mon May 13 14:52:19 2019 +0200

    [FLINK-12468][yarn] Derive HistoryServer's URL from HistoryServerOptions
    
    This commit derives the HistoryServer's URL from the availabe 
HistoryServerOptions.
    
    This closes #8396.
---
 .../generated/yarn_config_configuration.html       |   5 -
 .../runtime/webmonitor/history/HistoryServer.java  |   2 +-
 .../webmonitor/history/HistoryServerUtils.java     |  82 +++++++++++++++
 .../webmonitor/history/HistoryServerUtilsTest.java | 114 +++++++++++++++++++++
 .../org/apache/flink/yarn/YarnResourceManager.java |  10 +-
 .../yarn/configuration/YarnConfigOptions.java      |   9 --
 6 files changed, 205 insertions(+), 17 deletions(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html 
b/docs/_includes/generated/yarn_config_configuration.html
index 4943ef8..40dfc09 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -48,11 +48,6 @@
             <td>Time between heartbeats with the ResourceManager in 
seconds.</td>
         </tr>
         <tr>
-            <td><h5>yarn.history.server.address</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>The address of Flink HistoryServer.</td>
-        </tr>
-        <tr>
             <td><h5>yarn.maximum-failed-containers</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Maximum number of containers the system is going to reallocate 
in case of a failure.</td>
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 0bb98e8..12a5b56 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -144,7 +144,7 @@ public class HistoryServer {
                Preconditions.checkNotNull(numFinishedPolls);
 
                this.config = config;
-               if 
(config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && 
SSLUtils.isRestSSLEnabled(config)) {
+               if (HistoryServerUtils.isSSLEnabled(config)) {
                        LOG.info("Enabling SSL for the history server.");
                        try {
                                this.serverSSLFactory = 
SSLUtils.createRestServerSSLEngineFactory(config);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java
new file mode 100644
index 0000000..e790ba6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+/**
+ * Utility class for the HistoryServer.
+ */
+public enum HistoryServerUtils {
+       ;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerUtils.class);
+
+       public static boolean isSSLEnabled(Configuration config) {
+               return 
config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && 
SSLUtils.isRestSSLEnabled(config);
+       }
+
+       public static Optional<URL> getHistoryServerURL(Configuration 
configuration) {
+               final String hostname = getHostname(configuration);
+
+               if (hostname != null) {
+                       final String protocol = getProtocol(configuration);
+                       final int port = getPort(configuration);
+
+                       try {
+                               return Optional.of(new URL(protocol, hostname, 
port, ""));
+                       } catch (MalformedURLException e) {
+                               LOG.debug("Could not create the HistoryServer's 
URL from protocol: {}, hostname: {} and port: {}.", protocol, hostname, port, 
e);
+                               return Optional.empty();
+                       }
+               } else {
+                       LOG.debug("Not hostname has been specified for the 
HistoryServer. This indicates that it has not been started.");
+                       return Optional.empty();
+               }
+       }
+
+       private static int getPort(Configuration configuration) {
+               return 
configuration.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT);
+       }
+
+       @Nullable
+       private static String getHostname(Configuration configuration) {
+               return 
configuration.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
+       }
+
+       private static String getProtocol(Configuration configuration) {
+               if (isSSLEnabled(configuration)) {
+                       return "https";
+               } else {
+                       return "http";
+               }
+       }
+
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java
new file mode 100644
index 0000000..dbee4e5
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link HistoryServerUtils}.
+ */
+public class HistoryServerUtilsTest extends TestLogger {
+
+       private static final String HOSTNAME = "foobar";
+       private static final int PORT = 1234;
+
+       @Test
+       public void testIsSSLEnabledDefault() {
+               final Configuration configuration = new Configuration();
+
+               assertThat(HistoryServerUtils.isSSLEnabled(configuration), 
is(false));
+       }
+
+       @Test
+       public void testIsSSLEnabledWithoutRestSSL() {
+               final Configuration configuration = new Configuration();
+               
configuration.setBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED, 
true);
+
+               assertThat(HistoryServerUtils.isSSLEnabled(configuration), 
is(false));
+       }
+
+       @Test
+       public void testIsSSLEnabledOnlyRestSSL() {
+               final Configuration configuration = new Configuration();
+               configuration.setBoolean(SecurityOptions.SSL_REST_ENABLED, 
true);
+
+               assertThat(HistoryServerUtils.isSSLEnabled(configuration), 
is(false));
+       }
+
+       @Test
+       public void testIsSSLEnabled() {
+               final Configuration configuration = new Configuration();
+               enableSSL(configuration);
+
+               assertThat(HistoryServerUtils.isSSLEnabled(configuration), 
is(true));
+       }
+
+       private void enableSSL(Configuration configuration) {
+               
configuration.setBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED, 
true);
+               configuration.setBoolean(SecurityOptions.SSL_REST_ENABLED, 
true);
+       }
+
+       @Test
+       public void testGetHistoryServerURL() throws MalformedURLException {
+               final Configuration configuration = 
createDefaultConfiguration();
+
+               final Optional<URL> historyServerURL = 
HistoryServerUtils.getHistoryServerURL(configuration);
+
+               assertThat(historyServerURL.get(), is(new URL("http", HOSTNAME, 
PORT, "")));
+       }
+
+       @Test
+       public void testGetHistoryServerURLWithSSL() throws 
MalformedURLException {
+               final Configuration configuration = 
createDefaultConfiguration();
+               enableSSL(configuration);
+
+               final Optional<URL> historyServerURL = 
HistoryServerUtils.getHistoryServerURL(configuration);
+
+               assertThat(historyServerURL.get(), is(new URL("https", 
HOSTNAME, PORT, "")));
+       }
+
+       @Test
+       public void testGetHistoryServerURLWithoutHS() {
+               final Configuration configuration = new Configuration();
+
+               
assertThat(HistoryServerUtils.getHistoryServerURL(configuration).isPresent(), 
is(false));
+       }
+
+       @Nonnull
+       private Configuration createDefaultConfiguration() {
+               final Configuration configuration = new Configuration();
+               
configuration.setString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS, 
HOSTNAME);
+               
configuration.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, PORT);
+               return configuration;
+       }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index b9ea02b..65baab5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -70,6 +72,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -292,9 +295,12 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
                log.info("Unregister application from the YARN Resource Manager 
with final status {}.", yarnStatus);
 
-               String historyServerAddress = 
flinkConfig.getString(YarnConfigOptions.APPLICATION_HISTORY_SERVER_ADDRESS);
+               final Optional<URL> historyServerURL = 
HistoryServerUtils.getHistoryServerURL(flinkConfig);
+
+               final String appTrackingUrl = 
historyServerURL.map(URL::toString).orElse("");
+
                try {
-                       
resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, 
historyServerAddress);
+                       
resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, 
appTrackingUrl);
                } catch (Throwable t) {
                        log.error("Could not unregister the application 
master.", t);
                }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index ff6697fd..0f46a57 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -180,15 +180,6 @@ public class YarnConfigOptions {
                .defaultValue("")
                .withDescription("A comma-separated list of tags to apply to 
the Flink YARN application.");
 
-       /**
-        * The address of Flink HistoryServer.
-        * This configuration parameter allows setting the appTrackingUrl for 
finished YARN applications.
-        */
-       public static final ConfigOption<String> 
APPLICATION_HISTORY_SERVER_ADDRESS =
-               key("yarn.history.server.address")
-               .defaultValue("")
-               .withDescription("The address of Flink HistoryServer.");
-
        // 
------------------------------------------------------------------------
 
        /** This class is not meant to be instantiated. */

Reply via email to