HIVE-18533: Add option to use InProcessLauncher to submit Spark jobs (Sahil Takiar, reviewed by Rui Li)
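This change adds a new configuration, hive.spark.client.type, for choosing how the Spark application is launched: the existing spark-submit shell-script path (the default, spark-submit) or Spark's InProcessLauncher (spark-launcher). A minimal usage sketch, mirroring the spark_in_process_launcher.q test added below (it assumes a session already configured to run Hive on Spark):

    set hive.spark.client.type=spark-launcher;
    select key, count(*) from src group by key order by key limit 10;

Note that hive.spark.client.type is also appended to hive.spark.rsc.conf.list in this change, so switching the value re-creates the Spark session.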
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/da663866 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/da663866 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/da663866 Branch: refs/heads/master Commit: da66386662fbbcbde9501b4a7b27d076bcc790d4 Parents: f80cff9 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Tue Jun 5 08:00:54 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Tue Jun 5 08:00:54 2018 -0500 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 12 +- itests/qtest-spark/pom.xml | 6 + .../test/resources/testconfiguration.properties | 3 +- .../clientpositive/spark_in_process_launcher.q | 6 + .../spark/spark_in_process_launcher.q.out | 96 +++ spark-client/pom.xml | 6 + .../hive/spark/client/AbstractSparkClient.java | 600 ++++++++++++++++ .../apache/hive/spark/client/JobHandleImpl.java | 8 +- .../apache/hive/spark/client/SparkClient.java | 7 + .../hive/spark/client/SparkClientFactory.java | 16 +- .../hive/spark/client/SparkClientImpl.java | 703 ------------------- .../spark/client/SparkLauncherSparkClient.java | 220 ++++++ .../spark/client/SparkSubmitSparkClient.java | 237 +++++++ .../apache/hive/spark/client/TestJobHandle.java | 2 +- .../hive/spark/client/TestSparkClient.java | 25 +- .../client/TestSparkLauncherSparkClient.java | 77 ++ 16 files changed, 1306 insertions(+), 718 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3295d1d..56d2de0 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -316,6 +316,9 @@ public class HiveConf extends Configuration { public static final String HIVE_SERVER2_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME = "hive.server2.authentication.ldap.userMembershipKey"; + public static final String HIVE_SPARK_SUBMIT_CLIENT = "spark-submit"; + public static final String HIVE_SPARK_LAUNCHER_CLIENT = "spark-launcher"; + /** * dbVars are the parameters can be set per database. If these * parameters are set as a database property, when switching to that @@ -4245,6 +4248,11 @@ public class HiveConf extends Configuration { "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."), SPARK_STAGE_MAX_TASKS("hive.spark.stage.max.tasks", -1, "The maximum number of tasks a stage in a Spark job may have.\n" + "If a Spark job stage contains more tasks than the maximum, the job will be cancelled. A value of -1 means no limit."), + SPARK_CLIENT_TYPE("hive.spark.client.type", HIVE_SPARK_SUBMIT_CLIENT, + "Controls how the Spark application is launched. If " + HIVE_SPARK_SUBMIT_CLIENT + " is " + + "specified (default) then the spark-submit shell script is used to launch the Spark " + + "app. 
If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " + + "InProcessLauncher is used to programmatically launch the app."), NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true, @@ -4335,7 +4343,8 @@ public class HiveConf extends Configuration { "Comma separated list of variables which are used internally and should not be configurable."), HIVE_SPARK_RSC_CONF_LIST("hive.spark.rsc.conf.list", SPARK_OPTIMIZE_SHUFFLE_SERDE.varname + "," + - SPARK_CLIENT_FUTURE_TIMEOUT.varname, + SPARK_CLIENT_FUTURE_TIMEOUT.varname + "," + + SPARK_CLIENT_TYPE.varname, "Comma separated list of variables which are related to remote spark context.\n" + "Changing these variables will result in re-creating the spark session."), HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s", @@ -5802,5 +5811,4 @@ public class HiveConf extends Configuration { } return ret; } - } http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/itests/qtest-spark/pom.xml ---------------------------------------------------------------------- diff --git a/itests/qtest-spark/pom.xml b/itests/qtest-spark/pom.xml index d0e7eb8..8ed3171 100644 --- a/itests/qtest-spark/pom.xml +++ b/itests/qtest-spark/pom.xml @@ -64,6 +64,12 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-yarn_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> <version>${jetty.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index f3cb9de..b6a1c9b 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1586,7 +1586,8 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\ spark_use_ts_stats_for_mapjoin.q,\ spark_use_op_stats.q,\ spark_explain_groupbyshuffle.q,\ - spark_opt_shuffle_serde.q + spark_opt_shuffle_serde.q,\ + spark_in_process_launcher.q miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\ bucket4.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/ql/src/test/queries/clientpositive/spark_in_process_launcher.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/spark_in_process_launcher.q b/ql/src/test/queries/clientpositive/spark_in_process_launcher.q new file mode 100644 index 0000000..368e135 --- /dev/null +++ b/ql/src/test/queries/clientpositive/spark_in_process_launcher.q @@ -0,0 +1,6 @@ +--! 
qt:dataset:src + +set hive.spark.client.type=spark-launcher; + +explain select key, count(*) from src group by key order by key limit 10; +select key, count(*) from src group by key order by key limit 10; http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out b/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out new file mode 100644 index 0000000..5ae2f26 --- /dev/null +++ b/ql/src/test/results/clientpositive/spark/spark_in_process_launcher.q.out @@ -0,0 +1,96 @@ +PREHOOK: query: explain select key, count(*) from src group by key order by key limit 10 +PREHOOK: type: QUERY +POSTHOOK: query: explain select key, count(*) from src group by key order by key limit 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (GROUP, 4) + Reducer 3 <- Reducer 2 (SORT, 1) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + TopN Hash Memory Usage: 0.1 + value expressions: _col1 (type: bigint) + Execution mode: vectorized + Reducer 2 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + TopN Hash Memory Usage: 0.1 + value expressions: _col1 (type: bigint) + Reducer 3 + Execution mode: vectorized + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 10 + Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: select key, count(*) from src group by key order by key limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: 
select key, count(*) from src group by key order by key limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 3 +10 1 +100 2 +103 2 +104 2 +105 1 +11 1 +111 1 +113 2 +114 1 http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/pom.xml ---------------------------------------------------------------------- diff --git a/spark-client/pom.xml b/spark-client/pom.xml index c4d8178..9a1e1c2 100644 --- a/spark-client/pom.xml +++ b/spark-client/pom.xml @@ -100,6 +100,12 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-yarn_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java new file mode 100644 index 0000000..ed9222c --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java @@ -0,0 +1,600 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.spark.client; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_REG_NAME; + +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import com.google.common.io.Resources; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.io.Writer; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hive.spark.client.rpc.Rpc; +import org.apache.hive.spark.client.rpc.RpcConfiguration; +import org.apache.hive.spark.client.rpc.RpcServer; +import org.apache.spark.SparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract implementation of {@link SparkClient} that allows sub-classes to override how the + * spark application is launched. It provides the following functionality: (1) creating the client + * connection to the {@link RemoteDriver} and managing its lifecycle, (2) monitoring the thread + * used to submit the Spark application, (3) safe shutdown of the {@link RemoteDriver}, and (4) + * configuration handling for submitting the Spark application. + * + * <p> + * This class contains the client protocol used to communicate with the {@link RemoteDriver}. + * It uses this protocol to submit {@link Job}s to the {@link RemoteDriver}. 
+ * </p> + */ +abstract class AbstractSparkClient implements SparkClient { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkClient.class); + + private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds + + private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS"; + private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions"; + private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions"; + private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath"; + private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"; + private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode"; + + protected final Map<String, String> conf; + private final HiveConf hiveConf; + private final Future<Void> driverFuture; + private final Map<String, JobHandleImpl<?>> jobs; + private final Rpc driverRpc; + private final ClientProtocol protocol; + protected volatile boolean isAlive; + + protected AbstractSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf, + String sessionid) throws IOException { + this.conf = conf; + this.hiveConf = hiveConf; + this.jobs = Maps.newConcurrentMap(); + + String secret = rpcServer.createSecret(); + this.driverFuture = startDriver(rpcServer, sessionid, secret); + this.protocol = new ClientProtocol(); + + try { + // The RPC server will take care of timeouts here. + this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get(); + } catch (Throwable e) { + String errorMsg; + if (e.getCause() instanceof TimeoutException) { + errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " + + "include network issues, errors in remote driver, cluster has no available resources, etc." + + "\nPlease check YARN or Spark driver's logs for further information."; + } else if (e.getCause() instanceof InterruptedException) { + errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " + + "that the query was cancelled which would cause the Spark Session to close."; + } else { + errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2."; + } + LOG.error(errorMsg, e); + driverFuture.cancel(true); + try { + driverFuture.get(); + } catch (InterruptedException ie) { + // Give up. 
+ LOG.warn("Interrupted before driver thread was finished.", ie); + } catch (ExecutionException ee) { + LOG.error("Driver thread failed", ee); + } + throw Throwables.propagate(e); + } + + LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress()); + + driverRpc.addListener(new Rpc.Listener() { + @Override + public void rpcClosed(Rpc rpc) { + if (isAlive) { + LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress()); + isAlive = false; + } + } + + @Override + public String toString() { + return "Connection to Remote Spark Driver Closed Unexpectedly"; + } + }); + isAlive = true; + } + + @Override + public <T extends Serializable> JobHandle<T> submit(Job<T> job) { + return protocol.submit(job, Collections.<JobHandle.Listener<T>>emptyList()); + } + + @Override + public <T extends Serializable> JobHandle<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) { + return protocol.submit(job, listeners); + } + + @Override + public <T extends Serializable> Future<T> run(Job<T> job) { + return protocol.run(job); + } + + @Override + public void stop() { + if (isAlive) { + isAlive = false; + try { + protocol.endSession(); + } catch (Exception e) { + LOG.warn("Exception while waiting for end session reply.", e); + } finally { + driverRpc.close(); + } + } + + try { + driverFuture.get(DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + LOG.error("Exception while waiting for driver future to complete", e); + } catch (TimeoutException e) { + LOG.warn("Timed out shutting down remote driver, cancelling..."); + driverFuture.cancel(true); + } catch (InterruptedException ie) { + LOG.debug("Interrupted before driver thread was finished."); + driverFuture.cancel(true); + } + } + + @Override + public Future<?> addJar(URI uri) { + return run(new AddJarJob(uri.toString())); + } + + @Override + public Future<?> addFile(URI uri) { + return run(new AddFileJob(uri.toString())); + } + + @Override + public Future<Integer> getExecutorCount() { + return run(new GetExecutorCountJob()); + } + + @Override + public Future<Integer> getDefaultParallelism() { + return run(new GetDefaultParallelismJob()); + } + + @Override + public boolean isActive() { + return isAlive && driverRpc.isActive(); + } + + @Override + public void cancel(String jobId) { + protocol.cancel(jobId); + } + + private Future<Void> startDriver(final RpcServer rpcServer, final String clientId, + final String secret) throws IOException { + final String serverAddress = rpcServer.getAddress(); + final String serverPort = String.valueOf(rpcServer.getPort()); + + String sparkHome = getSparkHome(); + + String sparkLogDir = conf.get("hive.spark.log.dir"); + if (sparkLogDir == null) { + if (sparkHome == null) { + sparkLogDir = "./target/"; + } else { + sparkLogDir = sparkHome + "/logs/"; + } + } + + String osxTestOpts = ""; + if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) { + osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS)); + } + + String driverJavaOpts = Joiner.on(" ").skipNulls().join( + "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY)); + String executorJavaOpts = Joiner.on(" ").skipNulls().join( + "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY)); + + // Create a file with all the job properties to be read by spark-submit. Change the + // file's permissions so that only the owner can read it. 
This avoid having the + // connection secret show up in the child process's command line. + File properties = File.createTempFile("spark-submit.", ".properties"); + if (!properties.setReadable(false) || !properties.setReadable(true, true)) { + throw new IOException("Cannot change permissions of job properties file."); + } + properties.deleteOnExit(); + + Properties allProps = new Properties(); + // first load the defaults from spark-defaults.conf if available + try { + URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf"); + if (sparkDefaultsUrl != null) { + LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl); + allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl))); + } + } catch (Exception e) { + String msg = "Exception trying to load spark-defaults.conf: " + e; + throw new IOException(msg, e); + } + // then load the SparkClientImpl config + for (Map.Entry<String, String> e : conf.entrySet()) { + allProps.put(e.getKey(), conf.get(e.getKey())); + } + allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId); + allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret); + allProps.put(DRIVER_OPTS_KEY, driverJavaOpts); + allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts); + + String isTesting = conf.get("spark.testing"); + if (isTesting != null && isTesting.equalsIgnoreCase("true")) { + String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH")); + if (!hiveHadoopTestClasspath.isEmpty()) { + String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH)); + if (extraDriverClasspath.isEmpty()) { + allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath); + } else { + extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator; + allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath); + } + + String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH)); + if (extraExecutorClasspath.isEmpty()) { + allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath); + } else { + extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator; + allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath); + } + } + } + + Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8); + try { + allProps.store(writer, "Spark Context configuration"); + } finally { + writer.close(); + } + + // Define how to pass options to the child process. If launching in client (or local) + // mode, the driver options need to be passed directly on the command line. Otherwise, + // SparkSubmit will take care of that for us. 
+ String master = conf.get("spark.master"); + Preconditions.checkArgument(master != null, "spark.master is not defined."); + String deployMode = conf.get(SPARK_DEPLOY_MODE); + + if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) { + String executorCores = conf.get("spark.executor.cores"); + if (executorCores != null) { + addExecutorCores(executorCores); + } + + String executorMemory = conf.get("spark.executor.memory"); + if (executorMemory != null) { + addExecutorMemory(executorMemory); + } + + String numOfExecutors = conf.get("spark.executor.instances"); + if (numOfExecutors != null) { + addNumExecutors(numOfExecutors); + } + } + // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh + // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive could only support doAs or + // delegation token renewal, but not both. Since doAs is a more common case, if both + // are needed, we choose to favor doAs. So when doAs is enabled, we use kinit command, + // otherwise, we pass the principal/keypad to spark to support the token renewal for + // long-running application. + if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) { + String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), + "0.0.0.0"); + String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); + boolean isDoAsEnabled = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); + if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) { + addKeytabAndPrincipal(isDoAsEnabled, keyTabFile, principal); + } + } + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + try { + String currentUser = Utils.getUGI().getShortUserName(); + // do not do impersonation in CLI mode + if (!currentUser.equals(System.getProperty("user.name"))) { + LOG.info("Attempting impersonation of " + currentUser); + addProxyUser(currentUser); + } + } catch (Exception e) { + String msg = "Cannot obtain username: " + e; + throw new IllegalStateException(msg, e); + } + } + + String regStr = conf.get("spark.kryo.registrator"); + if (HIVE_KRYO_REG_NAME.equals(regStr)) { + addJars(SparkClientUtilities.findKryoRegistratorJar(hiveConf)); + } + + addPropertiesFile(properties.getAbsolutePath()); + addClass(RemoteDriver.class.getName()); + + String jar = "spark-internal"; + if (SparkContext.jarOfClass(this.getClass()).isDefined()) { + jar = SparkContext.jarOfClass(this.getClass()).get(); + } + addExecutableJar(jar); + + + addAppArg(RemoteDriver.REMOTE_DRIVER_HOST_CONF); + addAppArg(serverAddress); + addAppArg(RemoteDriver.REMOTE_DRIVER_PORT_CONF); + addAppArg(serverPort); + + //hive.spark.* keys are passed down to the RemoteDriver via REMOTE_DRIVER_CONF + // so that they are not used in sparkContext but only in remote driver, + //as --properties-file contains the spark.* keys that are meant for SparkConf object. 
+ for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) { + String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey); + addAppArg(RemoteDriver.REMOTE_DRIVER_CONF); + addAppArg(String.format("%s=%s", hiveSparkConfKey, value)); + } + + return launchDriver(isTesting, rpcServer, clientId); + } + + protected abstract Future<Void> launchDriver(String isTesting, RpcServer rpcServer, String + clientId) throws IOException; + + protected abstract String getSparkHome(); + + protected abstract void addAppArg(String arg); + + protected abstract void addExecutableJar(String jar); + + protected abstract void addPropertiesFile(String absolutePath); + + protected abstract void addClass(String name); + + protected abstract void addJars(String jars); + + protected abstract void addProxyUser(String proxyUser); + + protected abstract void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, + String principal); + + protected abstract void addNumExecutors(String numOfExecutors); + + protected abstract void addExecutorMemory(String executorMemory); + + protected abstract void addExecutorCores(String executorCores); + + private class ClientProtocol extends BaseProtocol { + + <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) { + final String jobId = UUID.randomUUID().toString(); + final Promise<T> promise = driverRpc.createPromise(); + final JobHandleImpl<T> handle = + new JobHandleImpl<T>(AbstractSparkClient.this, promise, jobId, listeners); + jobs.put(jobId, handle); + + final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job)); + LOG.debug("Send JobRequest[{}].", jobId); + + // Link the RPC and the promise so that events from one are propagated to the other as + // needed. 
+ rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() { + @Override + public void operationComplete(io.netty.util.concurrent.Future<Void> f) { + if (f.isSuccess()) { + // If the spark job finishes before this listener is called, the QUEUED status will not be set + handle.changeState(JobHandle.State.QUEUED); + } else if (!promise.isDone()) { + promise.setFailure(f.cause()); + } + } + }); + promise.addListener(new GenericFutureListener<Promise<T>>() { + @Override + public void operationComplete(Promise<T> p) { + if (jobId != null) { + jobs.remove(jobId); + } + if (p.isCancelled() && !rpc.isDone()) { + rpc.cancel(true); + } + } + }); + return handle; + } + + <T extends Serializable> Future<T> run(Job<T> job) { + @SuppressWarnings("unchecked") + final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>) + driverRpc.call(new SyncJobRequest(job), Serializable.class); + return rpc; + } + + void cancel(String jobId) { + driverRpc.call(new CancelJob(jobId)); + } + + Future<?> endSession() { + return driverRpc.call(new EndSession()); + } + + private void handle(ChannelHandlerContext ctx, Error msg) { + LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause); + } + + private void handle(ChannelHandlerContext ctx, JobMetrics msg) { + JobHandleImpl<?> handle = jobs.get(msg.jobId); + if (handle != null) { + handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics); + } else { + LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId); + } + } + + private void handle(ChannelHandlerContext ctx, JobResult msg) { + JobHandleImpl<?> handle = jobs.remove(msg.id); + if (handle != null) { + LOG.debug("Received result for client job {}", msg.id); + handle.setSparkCounters(msg.sparkCounters); + Throwable error = msg.error; + if (error == null) { + handle.setSuccess(msg.result); + } else { + handle.setFailure(error); + } + } else { + LOG.warn("Received result for unknown client job {}", msg.id); + } + } + + private void handle(ChannelHandlerContext ctx, JobStarted msg) { + JobHandleImpl<?> handle = jobs.get(msg.id); + if (handle != null) { + handle.changeState(JobHandle.State.STARTED); + } else { + LOG.warn("Received event for unknown client job {}", msg.id); + } + } + + private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { + JobHandleImpl<?> handle = jobs.get(msg.clientJobId); + if (handle != null) { + LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId); + handle.addSparkJobId(msg.sparkJobId); + } else { + LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId); + } + } + + @Override + protected String name() { + return "HiveServer2 to Remote Spark Driver Connection"; + } + } + + private static class AddJarJob implements Job<Serializable> { + private static final long serialVersionUID = 1L; + + private final String path; + + AddJarJob() { + this(null); + } + + AddJarJob(String path) { + this.path = path; + } + + @Override + public Serializable call(JobContext jc) throws Exception { + jc.sc().addJar(path); + // Following remote job may refer to classes in this jar, and the remote job would be executed + // in a different thread, so we add this jar path to JobContext for further usage. 
+ jc.getAddedJars().put(path, System.currentTimeMillis()); + return null; + } + + } + + private static class AddFileJob implements Job<Serializable> { + private static final long serialVersionUID = 1L; + + private final String path; + + AddFileJob() { + this(null); + } + + AddFileJob(String path) { + this.path = path; + } + + @Override + public Serializable call(JobContext jc) throws Exception { + jc.sc().addFile(path); + return null; + } + + } + + private static class GetExecutorCountJob implements Job<Integer> { + private static final long serialVersionUID = 1L; + + @Override + public Integer call(JobContext jc) throws Exception { + // minus 1 here otherwise driver is also counted as an executor + int count = jc.sc().sc().getExecutorMemoryStatus().size() - 1; + return Integer.valueOf(count); + } + + } + + private static class GetDefaultParallelismJob implements Job<Integer> { + private static final long serialVersionUID = 1L; + + @Override + public Integer call(JobContext jc) throws Exception { + return jc.sc().sc().defaultParallelism(); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java index 2881252..61489a3 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java @@ -34,7 +34,7 @@ import org.apache.hive.spark.counter.SparkCounters; */ class JobHandleImpl<T extends Serializable> implements JobHandle<T> { - private final SparkClientImpl client; + private final SparkClient client; private final String jobId; private final MetricsCollection metrics; private final Promise<T> promise; @@ -43,8 +43,8 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> { private volatile State state; private volatile SparkCounters sparkCounters; - JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId, - List<Listener<T>> listeners) { + JobHandleImpl(SparkClient client, Promise<T> promise, String jobId, + List<Listener<T>> listeners) { this.client = client; this.jobId = jobId; this.promise = promise; @@ -233,7 +233,7 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> { } } - /** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. */ + /** Last attempt at preventing stray jobs from accumulating in SparkClient. */ @Override protected void finalize() { if (!isDone()) { http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java index 1922e41..9138899 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java @@ -110,4 +110,11 @@ public interface SparkClient extends Serializable { * Check if remote context is still active. 
*/ boolean isActive(); + + /** + * Cancel the specified jobId + * + * @param jobId the jobId to cancel + */ + void cancel(String jobId); } http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index 88b5c95..1974e88 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -18,13 +18,11 @@ package org.apache.hive.spark.client; import java.io.IOException; -import java.io.PrintStream; import java.util.Map; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.spark.client.rpc.RpcServer; -import org.apache.spark.SparkException; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -82,10 +80,18 @@ public final class SparkClientFactory { * @param hiveConf Configuration for Hive, contains hive.* properties. */ public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf, - String sessionId) - throws IOException, SparkException { + String sessionId) throws IOException { Preconditions.checkState(server != null, "Invalid state: Hive on Spark RPC Server has not been initialized"); - return new SparkClientImpl(server, sparkConf, hiveConf, sessionId); + switch (hiveConf.getVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE)) { + case HiveConf.HIVE_SPARK_SUBMIT_CLIENT: + return new SparkSubmitSparkClient(server, sparkConf, hiveConf, sessionId); + case HiveConf.HIVE_SPARK_LAUNCHER_CLIENT: + return new SparkLauncherSparkClient(server, sparkConf, hiveConf, sessionId); + default: + throw new IllegalArgumentException("Unknown Hive on Spark launcher type " + hiveConf.getVar( + HiveConf.ConfVars.SPARK_CLIENT_TYPE) + " valid options are " + + HiveConf.HIVE_SPARK_SUBMIT_CLIENT + " or " + HiveConf.HIVE_SPARK_LAUNCHER_CLIENT); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java deleted file mode 100644 index 847c82b..0000000 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ /dev/null @@ -1,703 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.spark.client; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_REG_NAME; - -import com.google.common.base.Charsets; -import com.google.common.base.Joiner; -import com.google.common.base.Strings; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Resources; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Serializable; -import java.io.Writer; -import java.net.URI; -import java.net.URL; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hive.common.log.LogRedirector; -import org.apache.hadoop.hive.conf.Constants; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hive.spark.client.rpc.Rpc; -import org.apache.hive.spark.client.rpc.RpcConfiguration; -import org.apache.hive.spark.client.rpc.RpcServer; -import org.apache.spark.SparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class SparkClientImpl implements SparkClient { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class); - - private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds - - private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS"; - private static final String SPARK_HOME_ENV = "SPARK_HOME"; - private static final String SPARK_HOME_KEY = "spark.home"; - private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions"; - private static final String EXECUTOR_OPTS_KEY = "spark.executor.extraJavaOptions"; - private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath"; - private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"; - - private final Map<String, String> conf; - private final HiveConf hiveConf; - private final Thread driverThread; - private final Map<String, JobHandleImpl<?>> jobs; - private final Rpc driverRpc; - private final ClientProtocol protocol; - private volatile boolean isAlive; - - SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf, - String sessionid) throws IOException { - this.conf = conf; - this.hiveConf = hiveConf; - this.jobs = Maps.newConcurrentMap(); - - String secret = rpcServer.createSecret(); - this.driverThread = startDriver(rpcServer, sessionid, secret); - this.protocol = new ClientProtocol(); - - try { - // The RPC server will take care of timeouts here. 
- this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get(); - } catch (Throwable e) { - String errorMsg; - if (e.getCause() instanceof TimeoutException) { - errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " + - "include network issues, errors in remote driver, cluster has no available resources, etc." + - "\nPlease check YARN or Spark driver's logs for further information."; - } else if (e.getCause() instanceof InterruptedException) { - errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " + - "that the query was cancelled which would cause the Spark Session to close."; - } else { - errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2."; - } - LOG.error(errorMsg, e); - driverThread.interrupt(); - try { - driverThread.join(); - } catch (InterruptedException ie) { - // Give up. - LOG.warn("Interrupted before driver thread was finished.", ie); - } - throw Throwables.propagate(e); - } - - LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress()); - - driverRpc.addListener(new Rpc.Listener() { - @Override - public void rpcClosed(Rpc rpc) { - if (isAlive) { - LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress()); - isAlive = false; - } - } - - @Override - public String toString() { - return "Connection to Remote Spark Driver Closed Unexpectedly"; - } - }); - isAlive = true; - } - - @Override - public <T extends Serializable> JobHandle<T> submit(Job<T> job) { - return protocol.submit(job, Collections.<JobHandle.Listener<T>>emptyList()); - } - - @Override - public <T extends Serializable> JobHandle<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) { - return protocol.submit(job, listeners); - } - - @Override - public <T extends Serializable> Future<T> run(Job<T> job) { - return protocol.run(job); - } - - @Override - public void stop() { - if (isAlive) { - isAlive = false; - try { - protocol.endSession(); - } catch (Exception e) { - LOG.warn("Exception while waiting for end session reply.", e); - } finally { - driverRpc.close(); - } - } - - long endTime = System.currentTimeMillis() + DEFAULT_SHUTDOWN_TIMEOUT; - try { - driverThread.join(DEFAULT_SHUTDOWN_TIMEOUT); - } catch (InterruptedException ie) { - LOG.debug("Interrupted before driver thread was finished."); - } - if (endTime - System.currentTimeMillis() <= 0) { - LOG.warn("Timed out shutting down remote driver, interrupting..."); - driverThread.interrupt(); - } - } - - @Override - public Future<?> addJar(URI uri) { - return run(new AddJarJob(uri.toString())); - } - - @Override - public Future<?> addFile(URI uri) { - return run(new AddFileJob(uri.toString())); - } - - @Override - public Future<Integer> getExecutorCount() { - return run(new GetExecutorCountJob()); - } - - @Override - public Future<Integer> getDefaultParallelism() { - return run(new GetDefaultParallelismJob()); - } - - @Override - public boolean isActive() { - return isAlive && driverRpc.isActive(); - } - - void cancel(String jobId) { - protocol.cancel(jobId); - } - - private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret) - throws IOException { - Runnable runnable; - final String serverAddress = rpcServer.getAddress(); - final String serverPort = String.valueOf(rpcServer.getPort()); - - // If a Spark installation is provided, use the spark-submit script. 
Otherwise, call the - // SparkSubmit class directly, which has some caveats (like having to provide a proper - // version of Guava on the classpath depending on the deploy mode). - String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY)); - if (sparkHome == null) { - sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV)); - } - if (sparkHome == null) { - sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY)); - } - String sparkLogDir = conf.get("hive.spark.log.dir"); - if (sparkLogDir == null) { - if (sparkHome == null) { - sparkLogDir = "./target/"; - } else { - sparkLogDir = sparkHome + "/logs/"; - } - } - - String osxTestOpts = ""; - if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) { - osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS)); - } - - String driverJavaOpts = Joiner.on(" ").skipNulls().join( - "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY)); - String executorJavaOpts = Joiner.on(" ").skipNulls().join( - "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY)); - - // Create a file with all the job properties to be read by spark-submit. Change the - // file's permissions so that only the owner can read it. This avoid having the - // connection secret show up in the child process's command line. - File properties = File.createTempFile("spark-submit.", ".properties"); - if (!properties.setReadable(false) || !properties.setReadable(true, true)) { - throw new IOException("Cannot change permissions of job properties file."); - } - properties.deleteOnExit(); - - Properties allProps = new Properties(); - // first load the defaults from spark-defaults.conf if available - try { - URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf"); - if (sparkDefaultsUrl != null) { - LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl); - allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl))); - } - } catch (Exception e) { - String msg = "Exception trying to load spark-defaults.conf: " + e; - throw new IOException(msg, e); - } - // then load the SparkClientImpl config - for (Map.Entry<String, String> e : conf.entrySet()) { - allProps.put(e.getKey(), conf.get(e.getKey())); - } - allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId); - allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret); - allProps.put(DRIVER_OPTS_KEY, driverJavaOpts); - allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts); - - String isTesting = conf.get("spark.testing"); - if (isTesting != null && isTesting.equalsIgnoreCase("true")) { - String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH")); - if (!hiveHadoopTestClasspath.isEmpty()) { - String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH)); - if (extraDriverClasspath.isEmpty()) { - allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath); - } else { - extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? 
extraDriverClasspath : extraDriverClasspath + File.pathSeparator; - allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath); - } - - String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH)); - if (extraExecutorClasspath.isEmpty()) { - allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath); - } else { - extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator; - allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath); - } - } - } - - Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8); - try { - allProps.store(writer, "Spark Context configuration"); - } finally { - writer.close(); - } - - // Define how to pass options to the child process. If launching in client (or local) - // mode, the driver options need to be passed directly on the command line. Otherwise, - // SparkSubmit will take care of that for us. - String master = conf.get("spark.master"); - Preconditions.checkArgument(master != null, "spark.master is not defined."); - String deployMode = conf.get("spark.submit.deployMode"); - - List<String> argv = Lists.newLinkedList(); - - if (sparkHome != null) { - argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath()); - } else { - LOG.info("No spark.home provided, calling SparkSubmit directly."); - argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath()); - - if (master.startsWith("local") || master.startsWith("mesos") || - SparkClientUtilities.isYarnClientMode(master, deployMode) || - master.startsWith("spark")) { - String mem = conf.get("spark.driver.memory"); - if (mem != null) { - argv.add("-Xms" + mem); - argv.add("-Xmx" + mem); - } - - String cp = conf.get("spark.driver.extraClassPath"); - if (cp != null) { - argv.add("-classpath"); - argv.add(cp); - } - - String libPath = conf.get("spark.driver.extraLibPath"); - if (libPath != null) { - argv.add("-Djava.library.path=" + libPath); - } - - String extra = conf.get(DRIVER_OPTS_KEY); - if (extra != null) { - for (String opt : extra.split("[ ]")) { - if (!opt.trim().isEmpty()) { - argv.add(opt.trim()); - } - } - } - } - - argv.add("org.apache.spark.deploy.SparkSubmit"); - } - - if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) { - String executorCores = conf.get("spark.executor.cores"); - if (executorCores != null) { - argv.add("--executor-cores"); - argv.add(executorCores); - } - - String executorMemory = conf.get("spark.executor.memory"); - if (executorMemory != null) { - argv.add("--executor-memory"); - argv.add(executorMemory); - } - - String numOfExecutors = conf.get("spark.executor.instances"); - if (numOfExecutors != null) { - argv.add("--num-executors"); - argv.add(numOfExecutors); - } - } - // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh - // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive could only support doAs or - // delegation token renewal, but not both. Since doAs is a more common case, if both - // are needed, we choose to favor doAs. So when doAs is enabled, we use kinit command, - // otherwise, we pass the principal/keypad to spark to support the token renewal for - // long-running application. 
- if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) { - String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), - "0.0.0.0"); - String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); - if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) { - if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { - List<String> kinitArgv = Lists.newLinkedList(); - kinitArgv.add("kinit"); - kinitArgv.add(principal); - kinitArgv.add("-k"); - kinitArgv.add("-t"); - kinitArgv.add(keyTabFile + ";"); - kinitArgv.addAll(argv); - argv = kinitArgv; - } else { - // if doAs is not enabled, we pass the principal/keypad to spark-submit in order to - // support the possible delegation token renewal in Spark - argv.add("--principal"); - argv.add(principal); - argv.add("--keytab"); - argv.add(keyTabFile); - } - } - } - if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { - try { - String currentUser = Utils.getUGI().getShortUserName(); - // do not do impersonation in CLI mode - if (!currentUser.equals(System.getProperty("user.name"))) { - LOG.info("Attempting impersonation of " + currentUser); - argv.add("--proxy-user"); - argv.add(currentUser); - } - } catch (Exception e) { - String msg = "Cannot obtain username: " + e; - throw new IllegalStateException(msg, e); - } - } - - String regStr = conf.get("spark.kryo.registrator"); - if (HIVE_KRYO_REG_NAME.equals(regStr)) { - argv.add("--jars"); - argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf)); - } - - argv.add("--properties-file"); - argv.add(properties.getAbsolutePath()); - argv.add("--class"); - argv.add(RemoteDriver.class.getName()); - - String jar = "spark-internal"; - if (SparkContext.jarOfClass(this.getClass()).isDefined()) { - jar = SparkContext.jarOfClass(this.getClass()).get(); - } - argv.add(jar); - - argv.add(RemoteDriver.REMOTE_DRIVER_HOST_CONF); - argv.add(serverAddress); - argv.add(RemoteDriver.REMOTE_DRIVER_PORT_CONF); - argv.add(serverPort); - - //hive.spark.* keys are passed down to the RemoteDriver via REMOTE_DRIVER_CONF - // so that they are not used in sparkContext but only in remote driver, - //as --properties-file contains the spark.* keys that are meant for SparkConf object. - for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) { - String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey); - argv.add(RemoteDriver.REMOTE_DRIVER_CONF); - argv.add(String.format("%s=%s", hiveSparkConfKey, value)); - } - - String cmd = Joiner.on(" ").join(argv); - LOG.info("Running client driver with argv: {}", cmd); - ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd); - - // Prevent hive configurations from being visible in Spark. 
- pb.environment().remove("HIVE_HOME"); - pb.environment().remove("HIVE_CONF_DIR"); - // Add credential provider password to the child process's environment - // In case of Spark the credential provider location is provided in the jobConf when the job is submitted - String password = getSparkJobCredentialProviderPassword(); - if(password != null) { - pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password); - } - if (isTesting != null) { - pb.environment().put("SPARK_TESTING", isTesting); - } - - final Process child = pb.start(); - String threadName = Thread.currentThread().getName(); - final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>()); - final LogRedirector.LogSourceCallback callback = () -> {return isAlive;}; - - LogRedirector.redirect("spark-submit-stdout-redir-" + threadName, - new LogRedirector(child.getInputStream(), LOG, callback)); - LogRedirector.redirect("spark-submit-stderr-redir-" + threadName, - new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback)); - - runnable = new Runnable() { - @Override - public void run() { - try { - int exitCode = child.waitFor(); - if (exitCode != 0) { - StringBuilder errStr = new StringBuilder(); - synchronized(childErrorLog) { - Iterator iter = childErrorLog.iterator(); - while(iter.hasNext()){ - errStr.append(iter.next()); - errStr.append('\n'); - } - } - - LOG.warn("Child process exited with code {}", exitCode); - rpcServer.cancelClient(clientId, - "Child process (spark-submit) exited before connecting back with error log " + errStr.toString()); - } - } catch (InterruptedException ie) { - LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process."); - rpcServer.cancelClient(clientId, "Thread waiting on the child porcess (spark-submit) is interrupted"); - Thread.interrupted(); - child.destroy(); - } catch (Exception e) { - String errMsg = "Exception while waiting for child process (spark-submit)"; - LOG.warn(errMsg, e); - rpcServer.cancelClient(clientId, errMsg); - } - } - }; - - Thread thread = new Thread(runnable); - thread.setDaemon(true); - thread.setName("Driver"); - thread.start(); - return thread; - } - - private String getSparkJobCredentialProviderPassword() { - if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) { - return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD"); - } else if (conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) { - return conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD"); - } - return null; - } - - private class ClientProtocol extends BaseProtocol { - - <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) { - final String jobId = UUID.randomUUID().toString(); - final Promise<T> promise = driverRpc.createPromise(); - final JobHandleImpl<T> handle = - new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId, listeners); - jobs.put(jobId, handle); - - final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job)); - LOG.debug("Send JobRequest[{}].", jobId); - - // Link the RPC and the promise so that events from one are propagated to the other as - // needed. 
- rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() { - @Override - public void operationComplete(io.netty.util.concurrent.Future<Void> f) { - if (f.isSuccess()) { - // If the spark job finishes before this listener is called, the QUEUED status will not be set - handle.changeState(JobHandle.State.QUEUED); - } else if (!promise.isDone()) { - promise.setFailure(f.cause()); - } - } - }); - promise.addListener(new GenericFutureListener<Promise<T>>() { - @Override - public void operationComplete(Promise<T> p) { - if (jobId != null) { - jobs.remove(jobId); - } - if (p.isCancelled() && !rpc.isDone()) { - rpc.cancel(true); - } - } - }); - return handle; - } - - <T extends Serializable> Future<T> run(Job<T> job) { - @SuppressWarnings("unchecked") - final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>) - driverRpc.call(new SyncJobRequest(job), Serializable.class); - return rpc; - } - - void cancel(String jobId) { - driverRpc.call(new CancelJob(jobId)); - } - - Future<?> endSession() { - return driverRpc.call(new EndSession()); - } - - private void handle(ChannelHandlerContext ctx, Error msg) { - LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause); - } - - private void handle(ChannelHandlerContext ctx, JobMetrics msg) { - JobHandleImpl<?> handle = jobs.get(msg.jobId); - if (handle != null) { - handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics); - } else { - LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId); - } - } - - private void handle(ChannelHandlerContext ctx, JobResult msg) { - JobHandleImpl<?> handle = jobs.remove(msg.id); - if (handle != null) { - LOG.debug("Received result for client job {}", msg.id); - handle.setSparkCounters(msg.sparkCounters); - Throwable error = msg.error; - if (error == null) { - handle.setSuccess(msg.result); - } else { - handle.setFailure(error); - } - } else { - LOG.warn("Received result for unknown client job {}", msg.id); - } - } - - private void handle(ChannelHandlerContext ctx, JobStarted msg) { - JobHandleImpl<?> handle = jobs.get(msg.id); - if (handle != null) { - handle.changeState(JobHandle.State.STARTED); - } else { - LOG.warn("Received event for unknown client job {}", msg.id); - } - } - - private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { - JobHandleImpl<?> handle = jobs.get(msg.clientJobId); - if (handle != null) { - LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId); - handle.addSparkJobId(msg.sparkJobId); - } else { - LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId); - } - } - - @Override - protected String name() { - return "HiveServer2 to Remote Spark Driver Connection"; - } - } - - private static class AddJarJob implements Job<Serializable> { - private static final long serialVersionUID = 1L; - - private final String path; - - AddJarJob() { - this(null); - } - - AddJarJob(String path) { - this.path = path; - } - - @Override - public Serializable call(JobContext jc) throws Exception { - jc.sc().addJar(path); - // Following remote job may refer to classes in this jar, and the remote job would be executed - // in a different thread, so we add this jar path to JobContext for further usage. 
- jc.getAddedJars().put(path, System.currentTimeMillis()); - return null; - } - - } - - private static class AddFileJob implements Job<Serializable> { - private static final long serialVersionUID = 1L; - - private final String path; - - AddFileJob() { - this(null); - } - - AddFileJob(String path) { - this.path = path; - } - - @Override - public Serializable call(JobContext jc) throws Exception { - jc.sc().addFile(path); - return null; - } - - } - - private static class GetExecutorCountJob implements Job<Integer> { - private static final long serialVersionUID = 1L; - - @Override - public Integer call(JobContext jc) throws Exception { - // minus 1 here otherwise driver is also counted as an executor - int count = jc.sc().sc().getExecutorMemoryStatus().size() - 1; - return Integer.valueOf(count); - } - - } - - private static class GetDefaultParallelismJob implements Job<Integer> { - private static final long serialVersionUID = 1L; - - @Override - public Integer call(JobContext jc) throws Exception { - return jc.sc().sc().defaultParallelism(); - } - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java new file mode 100644 index 0000000..cf52c4f --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.spark.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.spark.client.rpc.RpcServer; + +import org.apache.spark.launcher.AbstractLauncher; +import org.apache.spark.launcher.InProcessLauncher; +import org.apache.spark.launcher.SparkAppHandle; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + + +/** + * Extends the {@link AbstractSparkClient} and uses Spark's + * {@link org.apache.spark.launcher.SparkLauncher} to submit the HoS application. Specifically, + * it uses the {@link InProcessLauncher} to avoid spawning a sub-process to submit the Spark app. 
+ * It uses a {@link Thread} to monitor when the Spark app has been successfully submitted. The + * thread can be interrupted, in which case the {@link RpcServer} client will be cancelled and + * the Spark app will be stopped. + */ +public class SparkLauncherSparkClient extends AbstractSparkClient { + + private static final Logger LOG = LoggerFactory.getLogger( + SparkLauncherSparkClient.class.getName()); + + private static final long serialVersionUID = 2153000661341457380L; + + private static final Set<SparkAppHandle.State> FAILED_SPARK_STATES = Sets.newHashSet( + SparkAppHandle.State.FAILED, + SparkAppHandle.State.KILLED, + SparkAppHandle.State.LOST); + + private AbstractLauncher<InProcessLauncher> sparkLauncher; + + SparkLauncherSparkClient(RpcServer rpcServer, + Map<String, String> conf, + HiveConf hiveConf, + String sessionid) throws IOException { + super(rpcServer, conf, hiveConf, sessionid); + } + + @Override + protected Future<Void> launchDriver(String isTesting, RpcServer rpcServer, + String clientId) throws IOException { + if (isTesting != null) { + System.setProperty("spark.testing", "true"); + } + + // Only allow the spark.master to be local in unit tests + if (isTesting == null) { + Preconditions.checkArgument(SparkClientUtilities.isYarnClusterMode( + this.conf.get("spark.master"), this.conf.get("spark.submit.deployMode")), + getClass().getName() + " is only supported in yarn-cluster mode"); + } + + // Monitors when the Spark app has been successfully started + CountDownLatch shutdownLatch = new CountDownLatch(1); + + // Submit the app + SparkAppHandle sparkAppHandle = getSparkLauncher().startApplication( + new SparkAppListener(shutdownLatch, rpcServer, clientId)); + + return createSparkLauncherFuture(shutdownLatch, sparkAppHandle, rpcServer, clientId); + } + + @VisibleForTesting + static Future<Void> createSparkLauncherFuture(CountDownLatch shutdownLatch, + SparkAppHandle sparkAppHandle, RpcServer rpcServer, + String clientId) { + // Monitor the countdown latch + Callable<Void> runnable = () -> { + try { + shutdownLatch.await(); + } catch (InterruptedException e) { + rpcServer.cancelClient(clientId, "Spark app launcher interrupted"); + sparkAppHandle.stop(); + } + return null; + }; + + FutureTask<Void> futureTask = new FutureTask<>(runnable); + + Thread driverThread = new Thread(futureTask); + driverThread.setDaemon(true); + driverThread.setName("SparkLauncherMonitor"); + driverThread.start(); + + return futureTask; + } + + @Override + protected String getSparkHome() { + return null; + } + + @Override + protected void addAppArg(String arg) { + getSparkLauncher().addAppArgs(arg); + } + + @Override + protected void addExecutableJar(String jar) { + getSparkLauncher().setAppResource(jar); + } + + @Override + protected void addPropertiesFile(String absolutePath) { + getSparkLauncher().setPropertiesFile(absolutePath); + } + + @Override + protected void addClass(String name) { + getSparkLauncher().setMainClass(name); + } + + @Override + protected void addJars(String jars) { + getSparkLauncher().addJar(jars); + } + + @Override + protected void addProxyUser(String proxyUser) { + throw new UnsupportedOperationException(); +// getSparkLauncher().addSparkArg("--proxy-user", proxyUser); + } + + @Override + protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) { + throw new UnsupportedOperationException(); +// getSparkLauncher().addSparkArg("--principal", principal); +// getSparkLauncher().addSparkArg("--keytab", keyTabFile); + } + + @Override + 
protected void addNumExecutors(String numOfExecutors) { + getSparkLauncher().addSparkArg("--num-executors", numOfExecutors); + } + + @Override + protected void addExecutorMemory(String executorMemory) { + getSparkLauncher().addSparkArg("--executor-memory", executorMemory); + } + + @Override + protected void addExecutorCores(String executorCores) { + getSparkLauncher().addSparkArg("--executor-cores", executorCores); + } + + private AbstractLauncher<InProcessLauncher> getSparkLauncher() { + if (this.sparkLauncher == null) { + this.sparkLauncher = new InProcessLauncher(); + } + return this.sparkLauncher; + } + + @VisibleForTesting + static final class SparkAppListener implements SparkAppHandle.Listener { + + private final CountDownLatch shutdownLatch; + private final RpcServer rpcServer; + private final String clientId; + + SparkAppListener(CountDownLatch shutdownLatch, RpcServer rpcServer, String clientId) { + this.shutdownLatch = shutdownLatch; + this.rpcServer = rpcServer; + this.clientId = clientId; + } + + @Override + public void stateChanged(SparkAppHandle sparkAppHandle) { + LOG.info("Spark app transitioned to state = " + sparkAppHandle.getState()); + if (sparkAppHandle.getState().isFinal() || sparkAppHandle.getState().equals( + SparkAppHandle.State.RUNNING)) { + this.shutdownLatch.countDown(); + sparkAppHandle.disconnect(); + LOG.info("Successfully disconnected from Spark app handle"); + } + if (FAILED_SPARK_STATES.contains(sparkAppHandle.getState())) { + this.rpcServer.cancelClient(this.clientId, "Spark app launcher failed," + + " transitioned to state " + sparkAppHandle.getState()); + } + } + + @Override + public void infoChanged(SparkAppHandle sparkAppHandle) { + // Do nothing + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java new file mode 100644 index 0000000..1a524b9 --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.spark.client; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +import org.apache.hadoop.hive.common.log.LogRedirector; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.spark.client.rpc.RpcServer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Extends the {@link AbstractSparkClient} and launches a child process to run Spark's {@code + * bin/spark-submit} script. Logs from the child process are redirected to this process's logs. + */ +class SparkSubmitSparkClient extends AbstractSparkClient { + + private static final Logger LOG = LoggerFactory.getLogger(SparkSubmitSparkClient.class); + + private static final String SPARK_HOME_ENV = "SPARK_HOME"; + private static final String SPARK_HOME_KEY = "spark.home"; + + private static final long serialVersionUID = -4272763023516238171L; + + private List<String> argv; + + SparkSubmitSparkClient(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf, + String sessionid) throws IOException { + super(rpcServer, conf, hiveConf, sessionid); + } + + @Override + protected String getSparkHome() { + String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY)); + if (sparkHome == null) { + sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV)); + } + if (sparkHome == null) { + sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY)); + } + + Preconditions.checkNotNull(sparkHome, "Cannot use " + HiveConf.HIVE_SPARK_SUBMIT_CLIENT + + " without setting Spark Home"); + String master = conf.get("spark.master"); + Preconditions.checkArgument(master != null, "spark.master is not defined."); + + argv = Lists.newLinkedList(); + argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath()); + + return sparkHome; + } + + @Override + protected void addAppArg(String arg) { + argv.add(arg); + } + + @Override + protected void addExecutableJar(String jar) { + argv.add(jar); + } + + @Override + protected void addPropertiesFile(String absolutePath) { + argv.add("--properties-file"); + argv.add(absolutePath); + } + + @Override + protected void addClass(String name) { + argv.add("--class"); + argv.add(name); + } + + @Override + protected void addJars(String jars) { + argv.add("--jars"); + argv.add(jars); + } + + @Override + protected void addProxyUser(String proxyUser) { + argv.add("--proxy-user"); + argv.add(proxyUser); + } + + @Override + protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) { + if (isDoAsEnabled) { + List<String> kinitArgv = Lists.newLinkedList(); + kinitArgv.add("kinit"); + kinitArgv.add(principal); + kinitArgv.add("-k"); + kinitArgv.add("-t"); + kinitArgv.add(keyTabFile + ";"); + kinitArgv.addAll(argv); + argv = kinitArgv; + } else { + // if doAs is not enabled, we pass the principal/keytab to spark-submit in order to + // support the possible delegation token renewal in Spark + argv.add("--principal"); + argv.add(principal); + argv.add("--keytab"); + argv.add(keyTabFile); + } + } + + @Override + protected void addNumExecutors(String numOfExecutors) { + 
argv.add("--num-executors"); + argv.add(numOfExecutors); + } + + @Override + protected void addExecutorMemory(String executorMemory) { + argv.add("--executor-memory"); + argv.add(executorMemory); + } + + @Override + protected void addExecutorCores(String executorCores) { + argv.add("--executor-cores"); + argv.add(executorCores); + } + + private String getSparkJobCredentialProviderPassword() { + if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) { + return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD"); + } else if (conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) { + return conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD"); + } + return null; + } + + @Override + protected Future<Void> launchDriver(String isTesting, RpcServer rpcServer, String clientId) throws + IOException { + Callable<Void> runnable; + + String cmd = Joiner.on(" ").join(argv); + LOG.info("Running client driver with argv: {}", cmd); + ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd); + + // Prevent hive configurations from being visible in Spark. + pb.environment().remove("HIVE_HOME"); + pb.environment().remove("HIVE_CONF_DIR"); + // Add credential provider password to the child process's environment + // In case of Spark the credential provider location is provided in the jobConf when the job is submitted + String password = getSparkJobCredentialProviderPassword(); + if(password != null) { + pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password); + } + if (isTesting != null) { + pb.environment().put("SPARK_TESTING", isTesting); + } + + final Process child = pb.start(); + String threadName = Thread.currentThread().getName(); + final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>()); + final LogRedirector.LogSourceCallback callback = () -> isAlive; + + LogRedirector.redirect("spark-submit-stdout-redir-" + threadName, + new LogRedirector(child.getInputStream(), LOG, callback)); + LogRedirector.redirect("spark-submit-stderr-redir-" + threadName, + new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback)); + + runnable = () -> { + try { + int exitCode = child.waitFor(); + if (exitCode != 0) { + StringBuilder errStr = new StringBuilder(); + synchronized(childErrorLog) { + for (Object aChildErrorLog : childErrorLog) { + errStr.append(aChildErrorLog); + errStr.append('\n'); + } + } + + LOG.warn("Child process exited with code {}", exitCode); + rpcServer.cancelClient(clientId, + "Child process (spark-submit) exited before connecting back with error log " + errStr.toString()); + } + } catch (InterruptedException ie) { + LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process."); + rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted"); + Thread.interrupted(); + child.destroy(); + } catch (Exception e) { + String errMsg = "Exception while waiting for child process (spark-submit)"; + LOG.warn(errMsg, e); + rpcServer.cancelClient(clientId, errMsg); + } + return null; + }; + + FutureTask<Void> futureTask = new FutureTask<>(runnable); + + Thread driverThread = new Thread(futureTask); + driverThread.setDaemon(true); + driverThread.setName("SparkSubmitMonitor"); + driverThread.start(); + + return futureTask; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java 
http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java ---------------------------------------------------------------------- diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java index d6b627b..b81a34b 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java @@ -32,7 +32,7 @@ import static org.mockito.Mockito.*; @RunWith(MockitoJUnitRunner.class) public class TestJobHandle { - @Mock private SparkClientImpl client; + @Mock private SparkClient client; @Mock private Promise<Serializable> promise; @Mock private JobHandle.Listener<Serializable> listener; @Mock private JobHandle.Listener<Serializable> listener2; http://git-wip-us.apache.org/repos/asf/hive/blob/da663866/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java ---------------------------------------------------------------------- diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java index c134625..681463e 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java @@ -34,7 +34,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.InputStream; -import java.io.PrintStream; import java.io.Serializable; import java.net.URI; import java.nio.file.Paths; @@ -54,7 +53,7 @@ import com.google.common.base.Strings; import com.google.common.io.ByteStreams; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.spark.counter.SparkCounters; -import org.apache.spark.SparkException; +import org.apache.spark.SparkContext$; import org.apache.spark.SparkFiles; import org.apache.spark.api.java.JavaFutureAction; import org.apache.spark.api.java.JavaRDD; @@ -72,6 +71,7 @@ public class TestSparkClient { static { HIVECONF.set("hive.spark.client.connect.timeout", "30000ms"); + HIVECONF.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT); } private Map<String, String> createConf() { @@ -82,6 +82,7 @@ public class TestSparkClient { conf.put("spark.app.name", "SparkClientSuite Remote App"); conf.put("spark.driver.extraClassPath", classpath); conf.put("spark.executor.extraClassPath", classpath); + conf.put("spark.testing", "true"); if (!Strings.isNullOrEmpty(System.getProperty("spark.home"))) { conf.put("spark.home", System.getProperty("spark.home")); @@ -342,6 +343,26 @@ public class TestSparkClient { client.stop(); } SparkClientFactory.stop(); + waitForSparkContextShutdown(); + } + } + + /** + * This was added to avoid a race condition where we try to create multiple SparkContexts in + * the same process. Since spark.master = local, everything runs in the same JVM. Since we + * don't wait for the RemoteDriver to shut down its SparkContext, it's possible that a test + * finishes before the SparkContext has been shut down. To avoid the "multiple SparkContexts + * in a single JVM" exception, we wait for the SparkContext to shut down after each test. 
+ */ + private void waitForSparkContextShutdown() throws InterruptedException { + for (int i = 0; i < 100; i++) { + if (SparkContext$.MODULE$.getActive().isEmpty()) { + break; + } + Thread.sleep(100); + } + if (!SparkContext$.MODULE$.getActive().isEmpty()) { + throw new IllegalStateException("SparkContext did not shut down in time"); } }
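Both new client classes have package-private constructors, so instantiation is expected to go through SparkClientFactory (whose dispatch code is not shown in this section). A hedged sketch of how a caller opts in to the new launcher, using only the HiveConf constants this patch adds:

import org.apache.hadoop.hive.conf.HiveConf;

public class ClientTypeSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // The same switch TestSparkClient flips in its static block above.
    conf.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);

    String type = conf.getVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE);
    if (HiveConf.HIVE_SPARK_LAUNCHER_CLIENT.equals(type)) {
      // SparkLauncherSparkClient: in-process submission via InProcessLauncher;
      // yarn-cluster mode only (see the Preconditions check in its launchDriver).
      System.out.println("would launch via InProcessLauncher");
    } else {
      // SparkSubmitSparkClient: forks bin/spark-submit and redirects its output.
      System.out.println("would launch via bin/spark-submit");
    }
  }
}

Note from SparkLauncherSparkClient above that the in-process path currently rejects proxy-user and keytab/principal submission (both throw UnsupportedOperationException), so the spark-submit client remains the one to use for those scenarios.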