This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 5522ef0 PHOENIX-5810 PhoenixMRJobSubmitter is not working on a cluster with a single yarn RM 5522ef0 is described below commit 5522ef075a5d7155ac2a563e4b79d455847c7964 Author: Richard Antal <antal97rich...@gmail.com> AuthorDate: Tue Mar 31 17:26:21 2020 +0200 PHOENIX-5810 PhoenixMRJobSubmitter is not working on a cluster with a single yarn RM --- .../index/automation/PhoenixMRJobSubmitter.java | 5 +-- .../org/apache/phoenix/util/PhoenixMRJobUtil.java | 42 +++++++++++-------- .../apache/phoenix/util/PhoenixMRJobUtilTest.java | 48 ++++++++++++++++++++++ 3 files changed, 74 insertions(+), 21 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java index c0fdcbb..3d8962c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java @@ -306,13 +306,12 @@ public class PhoenixMRJobSubmitter { } public Set<String> getSubmittedYarnApps() throws Exception { - String rmHost = PhoenixMRJobUtil.getActiveResourceManagerHost(conf, zkQuorum); + String rmAddress = PhoenixMRJobUtil.getActiveResourceManagerAddress(conf, zkQuorum); Map<String, String> urlParams = new HashMap<String, String>(); urlParams.put(YarnApplication.APP_STATES_ELEMENT, YarnApplication.state.NEW.toString() + "," + YarnApplication.state.ACCEPTED + "," + YarnApplication.state.SUBMITTED + "," + YarnApplication.state.RUNNING); - int rmPort = PhoenixMRJobUtil.getRMPort(conf); - String response = PhoenixMRJobUtil.getJobsInformationFromRM(rmHost, rmPort, urlParams); + String response = PhoenixMRJobUtil.getJobsInformationFromRM(rmAddress, urlParams); LOGGER.debug("Already Submitted/Running Apps = " + response); JSONObject 
jobsJson = new JSONObject(response); JSONObject appsJson = jobsJson.optJSONObject(YarnApplication.APPS_ELEMENT); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java index 4422103..e0ed22d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java @@ -79,9 +79,27 @@ public class PhoenixMRJobUtil { CAPACITY, FAIR, NONE }; - public static String getActiveResourceManagerHost(Configuration config, String zkQuorum) + public static String getRMWebAddress(Configuration config){ + return config.get(YarnConfiguration.RM_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS); + } + + public static String getRMWebAddress(Configuration config, String Rmid){ + return config.get(YarnConfiguration.RM_WEBAPP_ADDRESS + "." + Rmid, + YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS); + } + + public static String getActiveResourceManagerAddress(Configuration config, String zkQuorum) throws IOException, InterruptedException, JSONException, KeeperException, InvalidProtocolBufferException, ZooKeeperConnectionException { + // In case of yarn HA is NOT enabled + String resourceManager = PhoenixMRJobUtil.getRMWebAddress(config); + + LOGGER.info("ResourceManagerAddress from config = " + resourceManager); + if(!resourceManager.equals(YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS)){ + return resourceManager; + } + // In case of yarn HA is enabled ZooKeeperWatcher zkw = null; ZooKeeper zk = null; String activeRMHost = null; @@ -100,11 +118,10 @@ public class PhoenixMRJobUtil { + ACTIVE_STANDBY_ELECTOR_LOCK; byte[] data = zk.getData(path, zkw, new Stat()); ActiveRMInfoProto proto = ActiveRMInfoProto.parseFrom(data); - proto.getRmId(); - LOGGER.info("Active RmId : " + proto.getRmId()); + String RmId = proto.getRmId(); + LOGGER.info("Active RmId : " + RmId); - activeRMHost = - 
config.get(YarnConfiguration.RM_HOSTNAME + "." + proto.getRmId()); + activeRMHost = PhoenixMRJobUtil.getRMWebAddress(config, RmId); LOGGER.info("activeResourceManagerHostname = " + activeRMHost); } @@ -118,7 +135,7 @@ public class PhoenixMRJobUtil { return activeRMHost; } - public static String getJobsInformationFromRM(String rmhost, int rmport, + public static String getJobsInformationFromRM(String rmAddress, Map<String, String> urlParams) throws MalformedURLException, ProtocolException, UnsupportedEncodingException, IOException { HttpURLConnection con = null; @@ -128,7 +145,7 @@ public class PhoenixMRJobUtil { try { StringBuilder urlBuilder = new StringBuilder(); - urlBuilder.append(RM_HTTP_SCHEME + "://").append(rmhost).append(":").append(rmport) + urlBuilder.append(RM_HTTP_SCHEME + "://").append(rmAddress) .append(RM_APPS_GET_ENDPOINT); if (urlParams != null && urlParams.size() != 0) { @@ -195,17 +212,6 @@ public class PhoenixMRJobUtil { } } - public static int getRMPort(Configuration conf) throws IOException { - String rmHostPortStr = conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS); - String[] rmHostPort = rmHostPortStr.split(":"); - if (rmHostPort == null || rmHostPort.length != 2) { - throw new IOException("Invalid value for property " - + YarnConfiguration.RM_WEBAPP_ADDRESS + " = " + rmHostPortStr); - } - int rmPort = Integer.parseInt(rmHostPort[1]); - return rmPort; - } - /** * This method set the configuration values for Capacity scheduler. * @param conf - Configuration to which Capacity Queue information to be added diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixMRJobUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixMRJobUtilTest.java new file mode 100644 index 0000000..27d66a2 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixMRJobUtilTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class PhoenixMRJobUtilTest { + @Test + public void testGetRMHostName(){ + Configuration config = HBaseConfiguration.create(); + String testRMAddress = "testRMhostName:portnumber"; + + String address = PhoenixMRJobUtil.getRMWebAddress(config); + assertEquals(YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, address); + + config.set(YarnConfiguration.RM_WEBAPP_ADDRESS, testRMAddress); + address = PhoenixMRJobUtil.getRMWebAddress(config); + assertEquals(testRMAddress, address); + + //HA mode + address = PhoenixMRJobUtil.getRMWebAddress(config, "rm11"); + assertEquals(YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, address); + + config.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm11", testRMAddress); + address = PhoenixMRJobUtil.getRMWebAddress(config, "rm11"); + assertEquals(testRMAddress, address); + } +}