This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 792f2a2d9c0b5c8bba881455dedbed09b02d363c Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Mon May 13 14:52:19 2019 +0200 [FLINK-12468][yarn] Derive HistoryServer's URL from HistoryServerOptions This commit derives the HistoryServer's URL from the availabe HistoryServerOptions. This closes #8396. --- .../generated/yarn_config_configuration.html | 5 - .../runtime/webmonitor/history/HistoryServer.java | 2 +- .../webmonitor/history/HistoryServerUtils.java | 82 +++++++++++++++ .../webmonitor/history/HistoryServerUtilsTest.java | 114 +++++++++++++++++++++ .../org/apache/flink/yarn/YarnResourceManager.java | 10 +- .../yarn/configuration/YarnConfigOptions.java | 9 -- 6 files changed, 205 insertions(+), 17 deletions(-) diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html index 4943ef8..40dfc09 100644 --- a/docs/_includes/generated/yarn_config_configuration.html +++ b/docs/_includes/generated/yarn_config_configuration.html @@ -48,11 +48,6 @@ <td>Time between heartbeats with the ResourceManager in seconds.</td> </tr> <tr> - <td><h5>yarn.history.server.address</h5></td> - <td style="word-wrap: break-word;">(none)</td> - <td>The address of Flink HistoryServer.</td> - </tr> - <tr> <td><h5>yarn.maximum-failed-containers</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>Maximum number of containers the system is going to reallocate in case of a failure.</td> diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 0bb98e8..12a5b56 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -144,7 +144,7 @@ public class HistoryServer { Preconditions.checkNotNull(numFinishedPolls); this.config = config; - if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) { + if (HistoryServerUtils.isSSLEnabled(config)) { LOG.info("Enabling SSL for the history server."); try { this.serverSSLFactory = SSLUtils.createRestServerSSLEngineFactory(config); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java new file mode 100644 index 0000000..e790ba6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.runtime.net.SSLUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Optional; + +/** + * Utility class for the HistoryServer. + */ +public enum HistoryServerUtils { + ; + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerUtils.class); + + public static boolean isSSLEnabled(Configuration config) { + return config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config); + } + + public static Optional<URL> getHistoryServerURL(Configuration configuration) { + final String hostname = getHostname(configuration); + + if (hostname != null) { + final String protocol = getProtocol(configuration); + final int port = getPort(configuration); + + try { + return Optional.of(new URL(protocol, hostname, port, "")); + } catch (MalformedURLException e) { + LOG.debug("Could not create the HistoryServer's URL from protocol: {}, hostname: {} and port: {}.", protocol, hostname, port, e); + return Optional.empty(); + } + } else { + LOG.debug("Not hostname has been specified for the HistoryServer. This indicates that it has not been started."); + return Optional.empty(); + } + } + + private static int getPort(Configuration configuration) { + return configuration.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT); + } + + @Nullable + private static String getHostname(Configuration configuration) { + return configuration.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS); + } + + private static String getProtocol(Configuration configuration) { + if (isSSLEnabled(configuration)) { + return "https"; + } else { + return "http"; + } + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java new file mode 100644 index 0000000..dbee4e5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Optional; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link HistoryServerUtils}. + */ +public class HistoryServerUtilsTest extends TestLogger { + + private static final String HOSTNAME = "foobar"; + private static final int PORT = 1234; + + @Test + public void testIsSSLEnabledDefault() { + final Configuration configuration = new Configuration(); + + assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(false)); + } + + @Test + public void testIsSSLEnabledWithoutRestSSL() { + final Configuration configuration = new Configuration(); + configuration.setBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED, true); + + assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(false)); + } + + @Test + public void testIsSSLEnabledOnlyRestSSL() { + final Configuration configuration = new Configuration(); + configuration.setBoolean(SecurityOptions.SSL_REST_ENABLED, true); + + assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(false)); + } + + @Test + public void testIsSSLEnabled() { + final Configuration configuration = new Configuration(); + enableSSL(configuration); + + assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(true)); + } + + private void enableSSL(Configuration configuration) { + configuration.setBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED, true); + configuration.setBoolean(SecurityOptions.SSL_REST_ENABLED, true); + } + + @Test + public void testGetHistoryServerURL() throws MalformedURLException { + final Configuration configuration = createDefaultConfiguration(); + + final Optional<URL> historyServerURL = HistoryServerUtils.getHistoryServerURL(configuration); + + assertThat(historyServerURL.get(), is(new URL("http", HOSTNAME, PORT, ""))); + } + + @Test + public void testGetHistoryServerURLWithSSL() throws MalformedURLException { + final Configuration configuration = createDefaultConfiguration(); + enableSSL(configuration); + + final Optional<URL> historyServerURL = HistoryServerUtils.getHistoryServerURL(configuration); + + assertThat(historyServerURL.get(), is(new URL("https", HOSTNAME, PORT, ""))); + } + + @Test + public void testGetHistoryServerURLWithoutHS() { + final Configuration configuration = new Configuration(); + + assertThat(HistoryServerUtils.getHistoryServerURL(configuration).isPresent(), is(false)); + } + + @Nonnull + private Configuration createDefaultConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS, HOSTNAME); + configuration.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, PORT); + return configuration; + } +} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index b9ea02b..65baab5 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -70,6 +72,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -292,9 +295,12 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus); log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus); - String historyServerAddress = flinkConfig.getString(YarnConfigOptions.APPLICATION_HISTORY_SERVER_ADDRESS); + final Optional<URL> historyServerURL = HistoryServerUtils.getHistoryServerURL(flinkConfig); + + final String appTrackingUrl = historyServerURL.map(URL::toString).orElse(""); + try { - resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, historyServerAddress); + resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, appTrackingUrl); } catch (Throwable t) { log.error("Could not unregister the application master.", t); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java index ff6697fd..0f46a57 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -180,15 +180,6 @@ public class YarnConfigOptions { .defaultValue("") .withDescription("A comma-separated list of tags to apply to the Flink YARN application."); - /** - * The address of Flink HistoryServer. - * This configuration parameter allows setting the appTrackingUrl for finished YARN applications. - */ - public static final ConfigOption<String> APPLICATION_HISTORY_SERVER_ADDRESS = - key("yarn.history.server.address") - .defaultValue("") - .withDescription("The address of Flink HistoryServer."); - // ------------------------------------------------------------------------ /** This class is not meant to be instantiated. */