This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 5522ef0  PHOENIX-5810 PhoenixMRJobSubmitter is not working on a 
cluster with a single yarn RM
5522ef0 is described below

commit 5522ef075a5d7155ac2a563e4b79d455847c7964
Author: Richard Antal <antal97rich...@gmail.com>
AuthorDate: Tue Mar 31 17:26:21 2020 +0200

    PHOENIX-5810 PhoenixMRJobSubmitter is not working on a cluster with a 
single yarn RM
---
 .../index/automation/PhoenixMRJobSubmitter.java    |  5 +--
 .../org/apache/phoenix/util/PhoenixMRJobUtil.java  | 42 +++++++++++--------
 .../apache/phoenix/util/PhoenixMRJobUtilTest.java  | 48 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 21 deletions(-)

diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
index c0fdcbb..3d8962c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
@@ -306,13 +306,12 @@ public class PhoenixMRJobSubmitter {
     }
 
     public Set<String> getSubmittedYarnApps() throws Exception {
-        String rmHost = PhoenixMRJobUtil.getActiveResourceManagerHost(conf, 
zkQuorum);
+        String rmAddress = 
PhoenixMRJobUtil.getActiveResourceManagerAddress(conf, zkQuorum);
         Map<String, String> urlParams = new HashMap<String, String>();
         urlParams.put(YarnApplication.APP_STATES_ELEMENT, 
YarnApplication.state.NEW.toString()
                 + "," + YarnApplication.state.ACCEPTED + "," + 
YarnApplication.state.SUBMITTED
                 + "," + YarnApplication.state.RUNNING);
-        int rmPort = PhoenixMRJobUtil.getRMPort(conf);
-        String response = PhoenixMRJobUtil.getJobsInformationFromRM(rmHost, 
rmPort, urlParams);
+        String response = PhoenixMRJobUtil.getJobsInformationFromRM(rmAddress, 
urlParams);
         LOGGER.debug("Already Submitted/Running Apps = " + response);
         JSONObject jobsJson = new JSONObject(response);
         JSONObject appsJson = 
jobsJson.optJSONObject(YarnApplication.APPS_ELEMENT);
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
index 4422103..e0ed22d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
@@ -79,9 +79,27 @@ public class PhoenixMRJobUtil {
         CAPACITY, FAIR, NONE
     };
 
-    public static String getActiveResourceManagerHost(Configuration config, 
String zkQuorum)
+    public static String getRMWebAddress(Configuration config){
+        return config.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
+                YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
+    }
+
+    public static String getRMWebAddress(Configuration config, String Rmid){
+        return config.get(YarnConfiguration.RM_WEBAPP_ADDRESS + "." + Rmid,
+                YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
+    }
+
+    public static String getActiveResourceManagerAddress(Configuration config, 
String zkQuorum)
             throws IOException, InterruptedException, JSONException, 
KeeperException,
             InvalidProtocolBufferException, ZooKeeperConnectionException {
+        // In case of yarn HA is NOT enabled
+        String resourceManager = PhoenixMRJobUtil.getRMWebAddress(config);
+
+        LOGGER.info("ResourceManagerAddress from config = " + resourceManager);
+        
if(!resourceManager.equals(YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS)){
+            return resourceManager;
+        }
+        // In case of yarn HA is enabled
         ZooKeeperWatcher zkw = null;
         ZooKeeper zk = null;
         String activeRMHost = null;
@@ -100,11 +118,10 @@ public class PhoenixMRJobUtil {
                                         + ACTIVE_STANDBY_ELECTOR_LOCK;
                         byte[] data = zk.getData(path, zkw, new Stat());
                         ActiveRMInfoProto proto = 
ActiveRMInfoProto.parseFrom(data);
-                        proto.getRmId();
-                        LOGGER.info("Active RmId : " + proto.getRmId());
+                        String RmId = proto.getRmId();
+                        LOGGER.info("Active RmId : " + RmId);
 
-                        activeRMHost =
-                                config.get(YarnConfiguration.RM_HOSTNAME + "." 
+ proto.getRmId());
+                        activeRMHost = 
PhoenixMRJobUtil.getRMWebAddress(config, RmId);
                         LOGGER.info("activeResourceManagerHostname = " + 
activeRMHost);
 
                     }
@@ -118,7 +135,7 @@ public class PhoenixMRJobUtil {
         return activeRMHost;
     }
 
-    public static String getJobsInformationFromRM(String rmhost, int rmport,
+    public static String getJobsInformationFromRM(String rmAddress,
             Map<String, String> urlParams) throws MalformedURLException, 
ProtocolException,
             UnsupportedEncodingException, IOException {
         HttpURLConnection con = null;
@@ -128,7 +145,7 @@ public class PhoenixMRJobUtil {
         try {
             StringBuilder urlBuilder = new StringBuilder();
 
-            urlBuilder.append(RM_HTTP_SCHEME + 
"://").append(rmhost).append(":").append(rmport)
+            urlBuilder.append(RM_HTTP_SCHEME + "://").append(rmAddress)
                     .append(RM_APPS_GET_ENDPOINT);
 
             if (urlParams != null && urlParams.size() != 0) {
@@ -195,17 +212,6 @@ public class PhoenixMRJobUtil {
         }
     }
 
-    public static int getRMPort(Configuration conf) throws IOException {
-        String rmHostPortStr = conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS);
-        String[] rmHostPort = rmHostPortStr.split(":");
-        if (rmHostPort == null || rmHostPort.length != 2) {
-            throw new IOException("Invalid value for property "
-                    + YarnConfiguration.RM_WEBAPP_ADDRESS + " = " + 
rmHostPortStr);
-        }
-        int rmPort = Integer.parseInt(rmHostPort[1]);
-        return rmPort;
-    }
-
     /**
      * This method set the configuration values for Capacity scheduler.
      * @param conf - Configuration to which Capacity Queue information to be 
added
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixMRJobUtilTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixMRJobUtilTest.java
new file mode 100644
index 0000000..27d66a2
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixMRJobUtilTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PhoenixMRJobUtilTest {
+    @Test
+    public void testGetRMHostName(){
+        Configuration config = HBaseConfiguration.create();
+        String testRMAddress = "testRMhostName:portnumber";
+
+        String address = PhoenixMRJobUtil.getRMWebAddress(config);
+        assertEquals(YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, address);
+
+        config.set(YarnConfiguration.RM_WEBAPP_ADDRESS, testRMAddress);
+        address = PhoenixMRJobUtil.getRMWebAddress(config);
+        assertEquals(testRMAddress, address);
+
+        //HA mode
+        address = PhoenixMRJobUtil.getRMWebAddress(config, "rm11");
+        assertEquals(YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, address);
+
+        config.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm11", 
testRMAddress);
+        address = PhoenixMRJobUtil.getRMWebAddress(config, "rm11");
+        assertEquals(testRMAddress, address);
+    }
+}

Reply via email to