Repository: hive
Updated Branches:
  refs/heads/master 84e5b9391 -> a3b7a2452


HIVE-19814: RPC Server port is always random for spark (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3b7a245
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3b7a245
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3b7a245

Branch: refs/heads/master
Commit: a3b7a2452bacf6d7eeeb42bd9dd68109c90e27a2
Parents: 84e5b93
Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com>
Authored: Wed Sep 12 17:50:29 2018 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Wed Sep 12 17:50:54 2018 -0700

----------------------------------------------------------------------
 .../ql/exec/spark/HiveSparkClientFactory.java   |  8 ++++-
 .../session/TestSparkSessionManagerImpl.java    | 35 ++++++++++++++++++++
 .../hive/spark/client/SparkClientFactory.java   |  6 ++++
 .../hive/spark/client/rpc/RpcConfiguration.java | 17 +++++-----
 4 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a3b7a245/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 0aae0d8..a49e72d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -198,7 +199,12 @@ public class HiveSparkClientFactory {
         LOG.debug(String.format(
           "Pass Oozie configuration (%s -> %s).", propertyName, 
LogUtils.maskIfPassword(propertyName,value)));
       }
-
+      if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
+        String value = RpcConfiguration.getValue(hiveConf, propertyName);
+        sparkConf.put(propertyName, value);
+        LOG.debug(String.format("load RPC property from hive configuration (%s 
-> %s).", propertyName,
+            LogUtils.maskIfPassword(propertyName, value)));
+      }
     }
 
     final boolean optShuffleSerDe = hiveConf.getBoolVar(

http://git-wip-us.apache.org/repos/asf/hive/blob/a3b7a245/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index 853e4f4..3882b58 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 
+import org.apache.hive.spark.client.SparkClientFactory;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -31,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeoutException;
 
@@ -214,6 +217,38 @@ public class TestSparkSessionManagerImpl {
     assertEquals("0", ss.getSparkSession().getSessionId());
   }
 
+  @Test
+  public void testConfigsForInitialization() {
+    //Test to make sure that configs listed in 
RpcConfiguration.HIVE_SPARK_RSC_CONFIGS which are passed
+    // through HiveConf are included in the Spark configuration.
+    HiveConf hiveConf = getHiveConf();
+    hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, 
"49152-49222,49223,49224-49333");
+    hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS, 
"test-rpc-server-address");
+    Map<String, String> sparkConf = 
HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
+    assertEquals("49152-49222,49223,49224-49333", 
sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname));
+    assertEquals("test-rpc-server-address", 
sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname));
+  }
+
+  @Test
+  public void testServerPortAssignment() throws Exception {
+    HiveConf conf = getHiveConf();
+    conf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, 
"49152-49222,49223,49224-49333");
+    SparkSessionManagerImpl testSessionManager = 
SparkSessionManagerImpl.getInstance();
+    testSessionManager.setup(conf);
+
+    assertTrue("Port should be within configured port range:" + 
SparkClientFactory.getServerPort(),
+        SparkClientFactory.getServerPort() >= 49152 && 
SparkClientFactory.getServerPort() <= 49333);
+
+    //Verify that new spark session can be created to ensure that new 
SparkSession
+    // is successfully able to connect to the RpcServer with custom port.
+    try {
+      testSessionManager.getSession(null, conf, true);
+    } catch (HiveException e) {
+      Assert.fail("Failed test to connect to the RpcServer with custom port");
+    }
+
+    testSessionManager.shutdown();
+  }
   private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg 
expectedErrMsg) {
     checkHiveException(ss, e, expectedErrMsg, null);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/a3b7a245/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index 54ecdf0..640d058 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -20,6 +20,7 @@ package org.apache.hive.spark.client;
 import java.io.IOException;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.client.rpc.RpcServer;
@@ -94,4 +95,9 @@ public final class SparkClientFactory {
               HiveConf.HIVE_SPARK_SUBMIT_CLIENT + " or " + 
HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
     }
   }
+
+  @VisibleForTesting
+  public static int getServerPort() {
+    return server.getPort();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a3b7a245/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index bd3a7a7..eb824ef 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -46,14 +46,15 @@ public final class RpcConfiguration {
   private static final Logger LOG = 
LoggerFactory.getLogger(RpcConfiguration.class);
 
   public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = 
ImmutableSet.of(
-    HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,
-    HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
-    HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
-    HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
-    HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
-    HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
-    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname,
-    HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
+      HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
+      HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
+      HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
+      HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
+      HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname,
+      HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname,
+      HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname
   );
   public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = 
ImmutableSet.of(
     HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,

Reply via email to