Repository: hive Updated Branches: refs/heads/master 84e5b9391 -> a3b7a2452
HIVE-19814: RPC Server port is always random for spark (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3b7a245 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3b7a245 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3b7a245 Branch: refs/heads/master Commit: a3b7a2452bacf6d7eeeb42bd9dd68109c90e27a2 Parents: 84e5b93 Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com> Authored: Wed Sep 12 17:50:29 2018 -0700 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Wed Sep 12 17:50:54 2018 -0700 ---------------------------------------------------------------------- .../ql/exec/spark/HiveSparkClientFactory.java | 8 ++++- .../session/TestSparkSessionManagerImpl.java | 35 ++++++++++++++++++++ .../hive/spark/client/SparkClientFactory.java | 6 ++++ .../hive/spark/client/rpc/RpcConfiguration.java | 17 +++++----- 4 files changed, 57 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a3b7a245/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 0aae0d8..a49e72d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.spark.client.SparkClientUtilities; +import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -198,7 +199,12 @@ public class HiveSparkClientFactory { LOG.debug(String.format( "Pass Oozie configuration (%s -> %s).", propertyName, LogUtils.maskIfPassword(propertyName,value))); } - + if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) { + String value = RpcConfiguration.getValue(hiveConf, propertyName); + sparkConf.put(propertyName, value); + LOG.debug(String.format("load RPC property from hive configuration (%s -> %s).", propertyName, + LogUtils.maskIfPassword(propertyName, value))); + } } final boolean optShuffleSerDe = hiveConf.getBoolVar( http://git-wip-us.apache.org/repos/asf/hive/blob/a3b7a245/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java index 853e4f4..3882b58 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java @@ -22,6 +22,8 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.StringUtils; +import org.apache.hive.spark.client.SparkClientFactory; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -31,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeoutException; @@ -214,6 +217,38 @@ public class TestSparkSessionManagerImpl { assertEquals("0", ss.getSparkSession().getSessionId()); } + @Test + public void testConfigsForInitialization() { + //Test to make sure that configs listed in RpcConfiguration.HIVE_SPARK_RSC_CONFIGS which are passed + // through HiveConf are included in the Spark configuration. + HiveConf hiveConf = getHiveConf(); + hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, "49152-49222,49223,49224-49333"); + hiveConf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS, "test-rpc-server-address"); + Map<String, String> sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null); + assertEquals("49152-49222,49223,49224-49333", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname)); + assertEquals("test-rpc-server-address", sparkConf.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname)); + } + + @Test + public void testServerPortAssignment() throws Exception { + HiveConf conf = getHiveConf(); + conf.setVar(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT, "49152-49222,49223,49224-49333"); + SparkSessionManagerImpl testSessionManager = SparkSessionManagerImpl.getInstance(); + testSessionManager.setup(conf); + + assertTrue("Port should be within configured port range:" + SparkClientFactory.getServerPort(), + SparkClientFactory.getServerPort() >= 49152 && SparkClientFactory.getServerPort() <= 49333); + + //Verify that new spark session can be created to ensure that new SparkSession + // is successfully able to connect to the RpcServer with custom port. + try { + testSessionManager.getSession(null, conf, true); + } catch (HiveException e) { + Assert.fail("Failed test to connect to the RpcServer with custom port"); + } + + testSessionManager.shutdown(); + } private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) { checkHiveException(ss, e, expectedErrMsg, null); } http://git-wip-us.apache.org/repos/asf/hive/blob/a3b7a245/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index 54ecdf0..640d058 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -20,6 +20,7 @@ package org.apache.hive.spark.client; import java.io.IOException; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.spark.client.rpc.RpcServer; @@ -94,4 +95,9 @@ public final class SparkClientFactory { HiveConf.HIVE_SPARK_SUBMIT_CLIENT + " or " + HiveConf.HIVE_SPARK_LAUNCHER_CLIENT); } } + + @VisibleForTesting + public static int getServerPort() { + return server.getPort(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/a3b7a245/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index bd3a7a7..eb824ef 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -46,14 +46,15 @@ public final class RpcConfiguration { private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class); public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of( - HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, - HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, - HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, - HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, - HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, - HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname + HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname, + HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, + HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, + HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, + HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname, + HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname ); public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of( HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,