Repository: hive Updated Branches: refs/heads/master 219538701 -> 189d3fec2
HIVE-14162: Allow disabling of long running job on Hive On Spark On YARN (Sahil Takiar, reviewed by Adam Szita) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/189d3fec Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/189d3fec Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/189d3fec Branch: refs/heads/master Commit: 189d3fec25dfb94b209b1a34c1be674ce9d85bc5 Parents: 2195387 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Mon Jul 16 10:26:21 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Fri Aug 31 15:42:57 2018 -0500 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 6 + .../ql/exec/spark/TestSparkSessionTimeout.java | 145 +++++++++++ .../java/org/apache/hadoop/hive/ql/Driver.java | 10 + .../ql/exec/spark/session/SparkSession.java | 27 ++ .../ql/exec/spark/session/SparkSessionImpl.java | 246 ++++++++++++++----- .../spark/session/SparkSessionManagerImpl.java | 63 +++-- 6 files changed, 423 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8c39de3..40ea3ac 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4311,6 +4311,12 @@ public class HiveConf extends Configuration { "specified (default) then the spark-submit shell script is used to launch the Spark " + "app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " + "InProcessLauncher is used to programmatically launch the app."), + SPARK_SESSION_TIMEOUT("hive.spark.session.timeout", "30m", new TimeValidator(TimeUnit.MINUTES, + 30L, true, null, true), "Amount of time the Spark Remote Driver should wait for " + + "a Spark job to be submitted before shutting down. Minimum value is 30 minutes."), + SPARK_SESSION_TIMEOUT_PERIOD("hive.spark.session.timeout.period", "60s", + new TimeValidator(TimeUnit.SECONDS, 60L, true, null, true), + "How frequently to check for idle Spark sessions. Minimum value is 60 seconds."), NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true, http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java new file mode 100644 index 0000000..c887297 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.net.MalformedURLException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class TestSparkSessionTimeout { + + @Test + public void testSparkSessionTimeout() throws HiveException, InterruptedException, MalformedURLException { + String confDir = "../../data/conf/spark/standalone/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); + + HiveConf conf = new HiveConf(); + conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), + "TestSparkSessionTimeout-testSparkSessionTimeout-local-dir").toString()); + + SessionState.start(conf); + + runTestSparkSessionTimeout(conf); + } + + @Test + public void testMultiSessionSparkSessionTimeout() throws InterruptedException, + ExecutionException { + List<Future<Void>> futures = new ArrayList<>(); + ExecutorService es = Executors.newFixedThreadPool(10); + for (int i = 0; i < 10; i++) { + futures.add(es.submit(() -> { + String confDir = "../../data/conf/spark/local/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); + + HiveConf conf = new HiveConf(); + conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false); + conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), + "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString()); + + SessionState.start(conf); + + runTestSparkSessionTimeout(conf); + return null; + })); + } + for (Future<Void> future : futures) { + future.get(); + } + } + + @Test + public void testMultiSparkSessionTimeout() throws ExecutionException, InterruptedException { + List<Future<Void>> futures = new ArrayList<>(); + ExecutorService es = Executors.newFixedThreadPool(10); + for (int i = 0; i < 10; i++) { + futures.add(es.submit(() -> { + String confDir = "../../data/conf/spark/local/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); + + HiveConf conf = new HiveConf(); + conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false); + conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), + "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString()); + + SessionState.start(conf); + + 
runTestSparkSessionTimeout(conf); + return null; + })); + } + for (Future<Void> future : futures) { + future.get(); + } + } + + private void runTestSparkSessionTimeout(HiveConf conf) throws HiveException, + InterruptedException { + conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "5s"); + conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s"); + + String tableName = "test" + UUID.randomUUID().toString().replace("-", ""); + + Driver driver = null; + + try { + driver = new Driver(new QueryState.Builder() + .withGenerateNewQueryId(true) + .withHiveConf(conf).build(), + null, null); + + SparkSession sparkSession = SparkUtilities.getSparkSession(conf, SparkSessionManagerImpl + .getInstance()); + + Assert.assertEquals(0, + driver.run("create table " + tableName + " (col int)").getResponseCode()); + Assert.assertEquals(0, + driver.run("select * from " + tableName + " order by col").getResponseCode()); + + Thread.sleep(10000); + + Assert.assertFalse(sparkSession.isOpen()); + + Assert.assertEquals(0, + driver.run("select * from " + tableName + " order by col").getResponseCode()); + } finally { + if (driver != null) { + Assert.assertEquals(0, driver.run("drop table if exists " + tableName).getResponseCode()); + driver.destroy(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 737debd..dad2035 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.TaskResult; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.Entity.Type; @@ -544,6 +545,11 @@ public class Driver implements IDriver { String queryId = queryState.getQueryId(); + SparkSession ss = SessionState.get().getSparkSession(); + if (ss != null) { + ss.onQuerySubmission(queryId); + } + if (ctx != null) { setTriggerContext(queryId); } @@ -2570,6 +2576,10 @@ public class Driver implements IDriver { queryState.setNumModifiedRows(numModifiedRows); console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu)); } + SparkSession ss = SessionState.get().getSparkSession(); + if (ss != null) { + ss.onQueryCompletion(queryId); + } lDrvState.stateLock.lock(); try { lDrvState.driverState = executionError ? 
DriverState.ERROR : DriverState.EXECUTED; http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java index f96a8f7..62f88c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.plan.SparkWork; import java.io.IOException; public interface SparkSession { + /** * Initializes a Spark session for DAG execution. * @param conf Hive configuration. */ @@ -75,4 +76,30 @@ public interface SparkSession { * Get an HDFS dir specific to the SparkSession * */ Path getHDFSSessionDir() throws IOException; + + /** + * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a + * query has completed. + * + * @param queryId the id of the query that completed + */ + void onQueryCompletion(String queryId); + + /** + * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a + * query has been submitted. + * + * @param queryId the id of the query that was submitted + */ + void onQuerySubmission(String queryId); + + /** + * Checks if a session has timed out, and closes the session if the timeout has occurred; + * returns true if the session timed out, and false otherwise. + * + * @param sessionTimeout the session timeout + * + * @return true if the session timed out and was closed, false otherwise + */ + boolean triggerTimeout(long sessionTimeout); } http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index 2015810..6a8b42e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hive.ql.exec.spark.session; import java.io.IOException; import java.util.Map; -import java.util.UUID; +import java.util.Set; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -28,13 +30,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.session.SessionState; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; @@ -43,12 +49,31 @@ import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; import
org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.SparkWork; + import org.apache.spark.SparkConf; import org.apache.spark.util.Utils; import com.google.common.base.Preconditions; +/** + * Implementation of {@link SparkSession} that treats each Spark session as a separate Spark + * application. + * + * <p> + * It uses a {@link HiveSparkClient} to submit a Spark application and to submit Spark jobs to + * the Spark app. + * </p> + * + * <p> + * This class contains logic to trigger a timeout of this {@link SparkSession} if certain + * conditions are met (e.g. a job hasn't been submitted in the past "x" seconds). Since we use + * a threadpool to schedule a task that regularly checks if a session has timed out, we need to + * properly synchronize the {@link #open(HiveConf)} and {@link #close()} methods. We use a + * series of volatile variables and read-write locks to ensure this. + * </p> + */ public class SparkSessionImpl implements SparkSession { + private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class); private static final String SPARK_DIR = "_spark_session_dir"; @@ -65,13 +90,33 @@ public class SparkSessionImpl implements SparkSession { /** Pre-compiled error patterns. Shared between all Spark sessions */ private static Map<String, Pattern> errorPatterns; - private HiveConf conf; - private boolean isOpen; + // Several of the following variables need to be volatile so they can be accessed by the timeout + // thread + + private volatile HiveConf conf; + private volatile boolean isOpen; private final String sessionId; - private HiveSparkClient hiveSparkClient; - private Path scratchDir; + private volatile HiveSparkClient hiveSparkClient; + private volatile Path scratchDir; private final Object dirLock = new Object(); + /** + * The timestamp of the last completed Spark job. + */ + private volatile long lastSparkJobCompletionTime; + + /** + * A {@link Set} of currently running queries. Each job is identified by its query id. + */ + private final Set<String> activeJobs = Sets.newConcurrentHashSet(); + + /** + * True if at least a single query has been run by this session, false otherwise. + */ + private volatile boolean queryCompleted; + + private ReadWriteLock closeLock = new ReentrantReadWriteLock(); + SparkSessionImpl(String sessionId) { this.sessionId = sessionId; initErrorPatterns(); @@ -79,67 +124,87 @@ public class SparkSessionImpl implements SparkSession { @Override public void open(HiveConf conf) throws HiveException { - LOG.info("Trying to open Hive on Spark session {}", sessionId); - this.conf = conf; - isOpen = true; + closeLock.readLock().lock(); try { - hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId, + LOG.info("Trying to open Hive on Spark session {}", sessionId); + this.conf = conf; + isOpen = true; + try { + hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId, SessionState.get().getSessionId()); - } catch (Throwable e) { - // It's possible that user session is closed while creating Spark client. - HiveException he; - if (isOpen) { - he = getHiveException(e); - } else { - he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId); + } catch (Throwable e) { + // It's possible that user session is closed while creating Spark client. 
+ HiveException he; + if (isOpen) { + he = getHiveException(e); + } else { + he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId); + } + throw he; } - throw he; + LOG.info("Hive on Spark session {} successfully opened", sessionId); + } finally { + closeLock.readLock().unlock(); } - LOG.info("Hive on Spark session {} successfully opened", sessionId); } @Override public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception { - Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs."); - return hiveSparkClient.execute(driverContext, sparkWork); + closeLock.readLock().lock(); + try { + Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs."); + return hiveSparkClient.execute(driverContext, sparkWork); + } finally { + closeLock.readLock().unlock(); + } } @Override public ObjectPair<Long, Integer> getMemoryAndCores() throws Exception { - SparkConf sparkConf = hiveSparkClient.getSparkConf(); - int numExecutors = hiveSparkClient.getExecutorCount(); - // at start-up, we may be unable to get number of executors - if (numExecutors <= 0) { - return new ObjectPair<Long, Integer>(-1L, -1); - } - int executorMemoryInMB = Utils.memoryStringToMb( - sparkConf.get("spark.executor.memory", "512m")); - double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6); - long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024); - int totalCores; - String masterURL = sparkConf.get("spark.master"); - if (masterURL.startsWith("spark") || masterURL.startsWith("local")) { - totalCores = sparkConf.contains("spark.default.parallelism") ? - sparkConf.getInt("spark.default.parallelism", 1) : - hiveSparkClient.getDefaultParallelism(); - totalCores = Math.max(totalCores, numExecutors); - } else { - int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1); - totalCores = numExecutors * coresPerExecutor; + closeLock.readLock().lock(); + try { + SparkConf sparkConf = hiveSparkClient.getSparkConf(); + int numExecutors = hiveSparkClient.getExecutorCount(); + // at start-up, we may be unable to get number of executors + if (numExecutors <= 0) { + return new ObjectPair<Long, Integer>(-1L, -1); + } + int executorMemoryInMB = Utils.memoryStringToMb( + sparkConf.get("spark.executor.memory", "512m")); + double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6); + long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024); + int totalCores; + String masterURL = sparkConf.get("spark.master"); + if (masterURL.startsWith("spark") || masterURL.startsWith("local")) { + totalCores = sparkConf.contains("spark.default.parallelism") ? 
+ sparkConf.getInt("spark.default.parallelism", 1) : + hiveSparkClient.getDefaultParallelism(); + totalCores = Math.max(totalCores, numExecutors); + } else { + int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1); + totalCores = numExecutors * coresPerExecutor; + } + totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1); + + long memoryPerTaskInBytes = totalMemory / totalCores; + LOG.info("Hive on Spark application currently has number of executors: " + numExecutors + + ", total cores: " + totalCores + ", memory per executor: " + + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction); + return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes), + Integer.valueOf(totalCores)); + } finally { + closeLock.readLock().unlock(); } - totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1); - - long memoryPerTaskInBytes = totalMemory / totalCores; - LOG.info("Hive on Spark application currently has number of executors: " + numExecutors - + ", total cores: " + totalCores + ", memory per executor: " - + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction); - return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes), - Integer.valueOf(totalCores)); } @Override public boolean isOpen() { - return isOpen; + closeLock.readLock().lock(); + try { + return isOpen; + } finally { + closeLock.readLock().unlock(); + } } @Override @@ -154,18 +219,29 @@ public class SparkSessionImpl implements SparkSession { @Override public void close() { - LOG.info("Trying to close Hive on Spark session {}", sessionId); - isOpen = false; - if (hiveSparkClient != null) { + if (isOpen) { + closeLock.writeLock().lock(); try { - hiveSparkClient.close(); - LOG.info("Hive on Spark session {} successfully closed", sessionId); - cleanScratchDir(); - } catch (IOException e) { - LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e); + if (isOpen) { + LOG.info("Trying to close Hive on Spark session {}", sessionId); + isOpen = false; + if (hiveSparkClient != null) { + try { + hiveSparkClient.close(); + LOG.info("Hive on Spark session {} successfully closed", sessionId); + cleanScratchDir(); + } catch (IOException e) { + LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e); + } + } + hiveSparkClient = null; + queryCompleted = false; + lastSparkJobCompletionTime = 0; + } + } finally { + closeLock.writeLock().unlock(); } } - hiveSparkClient = null; } private Path createScratchDir() throws IOException { @@ -261,6 +337,60 @@ public class SparkSessionImpl implements SparkSession { return scratchDir; } + @Override + public void onQuerySubmission(String queryId) { + activeJobs.add(queryId); + } + + /** + * Check if a session has timed out, and if it has, close the session. + */ + @Override + public boolean triggerTimeout(long sessionTimeout) { + if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) { + closeLock.writeLock().lock(); + try { + if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) { + LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " + + "been run in the past " + sessionTimeout / 1000 + " seconds"); + close(); + return true; + } + } finally { + closeLock.writeLock().unlock(); + } + } + return false; + } + + /** + * Returns true if a session has timed out, false otherwise.
The following conditions must be met + * in order to consider a session as timed out: (1) the session must have run at least one + * query, (2) there can be no actively running Spark jobs, and (3) the last completed Spark job + * must have been more than sessionTimeout seconds ago. + */ + private static boolean hasTimedOut(boolean queryCompleted, Set<String> activeJobs, + long lastSparkJobCompletionTime, long sessionTimeout) { + return queryCompleted && + activeJobs.isEmpty() && + lastSparkJobCompletionTime > 0 && + (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout; + } + + /** + * When this session completes the execution of a query, set the {@link #queryCompleted} flag + * to true if it hasn't already been set, remove the query from the list of actively running jobs, + * and set the {@link #lastSparkJobCompletionTime} to the current timestamp. + */ + @Override + public void onQueryCompletion(String queryId) { + if (!queryCompleted) { + queryCompleted = true; + } + activeJobs.remove(queryId); + lastSparkJobCompletionTime = System.currentTimeMillis(); + } + @VisibleForTesting HiveSparkClient getHiveSparkClient() { return hiveSparkClient; http://git-wip-us.apache.org/repos/asf/hive/blob/189d3fec/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java index 68c9e04..79a56bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java @@ -18,19 +18,23 @@ package org.apache.hadoop.hive.ql.exec.spark.session; import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Sets; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hive.common.util.ShutdownHookManager; import org.apache.hive.spark.client.SparkClientFactory; /** @@ -42,8 +46,16 @@ import org.apache.hive.spark.client.SparkClientFactory; public class SparkSessionManagerImpl implements SparkSessionManager { private static final Logger LOG = LoggerFactory.getLogger(SparkSessionManagerImpl.class); - private Set<SparkSession> createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>()); + private final Set<SparkSession> createdSessions = Sets.newConcurrentHashSet(); + + /** + * A {@link Future} that tracks the status of the scheduled time out thread launched via the + * {@link #startTimeoutThread()} method. 
+ */ + private volatile Future<?> timeoutFuture; + private volatile boolean inited = false; + private volatile HiveConf conf; private static SparkSessionManagerImpl instance; @@ -80,9 +92,11 @@ public class SparkSessionManagerImpl implements SparkSessionManager { synchronized (this) { if (!inited) { LOG.info("Setting up the session manager."); - Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null); + conf = hiveConf; + startTimeoutThread(); + Map<String, String> sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null); try { - SparkClientFactory.initialize(conf); + SparkClientFactory.initialize(sparkConf); inited = true; } catch (IOException e) { throw new HiveException("Error initializing SparkClientFactory", e); @@ -136,7 +150,7 @@ public class SparkSessionManagerImpl implements SparkSessionManager { } if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Closing session (%s).", sparkSession.getSessionId())); + LOG.debug(String.format("Closing Spark session (%s).", sparkSession.getSessionId())); } sparkSession.close(); createdSessions.remove(sparkSession); @@ -145,15 +159,32 @@ public class SparkSessionManagerImpl implements SparkSessionManager { @Override public void shutdown() { LOG.info("Closing the session manager."); - synchronized (createdSessions) { - Iterator<SparkSession> it = createdSessions.iterator(); - while (it.hasNext()) { - SparkSession session = it.next(); - session.close(); - } - createdSessions.clear(); + if (timeoutFuture != null) { + timeoutFuture.cancel(false); } + createdSessions.forEach(SparkSession::close); + createdSessions.clear(); inited = false; SparkClientFactory.stop(); } + + /** + * Starts a scheduled thread that periodically calls {@link SparkSession#triggerTimeout(long)} + * on each {@link SparkSession} managed by this class. + */ + private void startTimeoutThread() { + long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, + TimeUnit.MILLISECONDS); + long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, + TimeUnit.MILLISECONDS); + ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor(); + + // Schedules a thread that does the following: iterates through all the active SparkSessions + // and calls #triggerTimeout(long) on each one. If #triggerTimeout(long) returns true, then + // the SparkSession is removed from the set of active sessions managed by this class. + timeoutFuture = es.scheduleAtFixedRate(() -> createdSessions.stream() + .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout)) + .forEach(createdSessions::remove), + 0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS); + } }
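
For readers tuning the feature above: the two new properties are ordinary time-based HiveConf variables. A minimal sketch of setting and reading them (the ConfVars names come from the HiveConf.java hunk; the 30m/60s values shown are the validators' minimums):

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;

public class SparkSessionTimeoutConfig {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Idle time before the Hive on Spark session is shut down. The TimeValidator
    // registered in HiveConf enforces a 30 minute minimum for this variable.
    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "30m");

    // How often sessions are checked for idleness; 60 second minimum.
    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "60s");

    // SparkSessionManagerImpl#startTimeoutThread() reads both values in milliseconds.
    System.out.println(conf.getTimeVar(
        HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, TimeUnit.MILLISECONDS) + " ms");
  }
}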
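The session-side bookkeeping in SparkSessionImpl reduces to a small state machine: the submission and completion callbacks record activity, and triggerTimeout() uses a check / write-lock / re-check pattern so a session is never closed underneath a query that arrives between the two checks. A self-contained sketch of that pattern, not Hive's actual class (JDK-only, so ConcurrentHashMap.newKeySet() stands in for the patch's Guava Sets.newConcurrentHashSet(); names are ours):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class IdleTracker {

  private final Set<String> activeJobs = ConcurrentHashMap.newKeySet();
  private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
  private volatile boolean queryCompleted;   // at least one query has finished
  private volatile long lastCompletionTime;  // epoch millis of the last completion
  private volatile boolean open = true;

  // Mirrors SparkSession#onQuerySubmission: record the query as active.
  public void onQuerySubmission(String queryId) {
    activeJobs.add(queryId);
  }

  // Mirrors SparkSession#onQueryCompletion: mark activity and stamp the time.
  public void onQueryCompletion(String queryId) {
    queryCompleted = true;
    activeJobs.remove(queryId);
    lastCompletionTime = System.currentTimeMillis();
  }

  // Check, take the write lock, then re-check, so a query submitted between
  // the two checks prevents the close.
  public boolean triggerTimeout(long timeoutMs) {
    if (hasTimedOut(timeoutMs)) {
      closeLock.writeLock().lock();
      try {
        if (hasTimedOut(timeoutMs)) {
          open = false; // a real session would also close its Spark client here
          return true;
        }
      } finally {
        closeLock.writeLock().unlock();
      }
    }
    return false;
  }

  // Timed out only if: some query has run, none are running now, and the last
  // completion was longer ago than the timeout.
  private boolean hasTimedOut(long timeoutMs) {
    return open && queryCompleted && activeJobs.isEmpty()
        && lastCompletionTime > 0
        && System.currentTimeMillis() - lastCompletionTime > timeoutMs;
  }
}

A Driver-like caller invokes onQuerySubmission(queryId) before execution and onQueryCompletion(queryId) afterwards, exactly as the Driver.java hunks above do.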
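On the manager side, startTimeoutThread() amounts to one single-threaded ScheduledExecutorService sweeping the live sessions. A sketch under the same assumptions (IdleTracker is the hypothetical class from the previous sketch; Collection.removeIf is an equivalent spelling of the patch's stream-filter-remove over a concurrent set):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IdleReaper {

  private final Set<IdleTracker> sessions = ConcurrentHashMap.newKeySet();
  private volatile Future<?> timeoutFuture;

  public void register(IdleTracker session) {
    sessions.add(session);
  }

  // One scheduled thread periodically sweeps all live sessions; any session
  // whose triggerTimeout(...) fires is dropped from the managed set.
  public void start(long timeoutMs, long periodMs) {
    ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
    timeoutFuture = es.scheduleAtFixedRate(
        () -> sessions.removeIf(s -> s.triggerTimeout(timeoutMs)),
        0, periodMs, TimeUnit.MILLISECONDS);
  }

  // Cancel without interrupting, so an in-flight sweep can finish; the patch's
  // shutdown() makes the same choice with cancel(false).
  public void stop() {
    if (timeoutFuture != null) {
      timeoutFuture.cancel(false);
    }
  }
}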