Repository: hive Updated Branches: refs/heads/master 7cf791472 -> 7795c0a7d
HIVE-19008: Improve Spark session id logging (Sahil Takiar, reviewed by Aihua Xu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7795c0a7 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7795c0a7 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7795c0a7 Branch: refs/heads/master Commit: 7795c0a7dc59941671f8845d78b16d9e5ddc9ea3 Parents: 7cf7914 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Sun Apr 1 21:21:43 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Mon Aug 6 11:34:00 2018 +0200 ---------------------------------------------------------------------- .../ql/exec/spark/HiveSparkClientFactory.java | 13 +++--- .../ql/exec/spark/session/SparkSessionImpl.java | 11 ++--- .../spark/session/SparkSessionManagerImpl.java | 3 +- .../hadoop/hive/ql/session/SessionState.java | 6 +++ .../session/TestSparkSessionManagerImpl.java | 43 ++++++++++++++++++-- 5 files changed, 58 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 5ed5d42..0aae0d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -62,8 +62,9 @@ public class HiveSparkClientFactory { @VisibleForTesting public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf"; - public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception { - Map<String, String> sparkConf = initiateSparkConf(hiveconf, sessionId); + public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sparkSessionId, + String hiveSessionId) throws Exception { + Map<String, String> sparkConf = initiateSparkConf(hiveconf, hiveSessionId); // Submit spark job through local spark context while spark master is local mode, otherwise submit // spark job through remote spark context. @@ -72,11 +73,11 @@ public class HiveSparkClientFactory { // With local spark context, all user sessions share the same spark context. return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf), hiveconf); } else { - return new RemoteHiveSparkClient(hiveconf, sparkConf, sessionId); + return new RemoteHiveSparkClient(hiveconf, sparkConf, hiveSessionId + "_" + sparkSessionId); } } - public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String sessionId) { + public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String hiveSessionId) { Map<String, String> sparkConf = new HashMap<String, String>(); HBaseConfiguration.addHbaseResources(hiveConf); @@ -84,9 +85,9 @@ public class HiveSparkClientFactory { sparkConf.put("spark.master", SPARK_DEFAULT_MASTER); final String appNameKey = "spark.app.name"; String appName = hiveConf.get(appNameKey); - final String sessionIdString = " (sessionId = " + sessionId + ")"; + final String sessionIdString = " (hiveSessionId = " + hiveSessionId + ")"; if (appName == null) { - if (sessionId == null) { + if (hiveSessionId == null) { appName = SPARK_DEFAULT_APP_NAME; } else { appName = SPARK_DEFAULT_APP_NAME + sessionIdString; http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index 0f2f031..2015810 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -72,8 +72,8 @@ public class SparkSessionImpl implements SparkSession { private Path scratchDir; private final Object dirLock = new Object(); - public SparkSessionImpl() { - sessionId = makeSessionId(); + SparkSessionImpl(String sessionId) { + this.sessionId = sessionId; initErrorPatterns(); } @@ -83,7 +83,8 @@ public class SparkSessionImpl implements SparkSession { this.conf = conf; isOpen = true; try { - hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId); + hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId, + SessionState.get().getSessionId()); } catch (Throwable e) { // It's possible that user session is closed while creating Spark client. HiveException he; @@ -260,10 +261,6 @@ public class SparkSessionImpl implements SparkSession { return scratchDir; } - public static String makeSessionId() { - return UUID.randomUUID().toString(); - } - @VisibleForTesting HiveSparkClient getHiveSparkClient() { return hiveSparkClient; http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java index 46cee0d..68c9e04 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,7 +111,7 @@ public class SparkSessionManagerImpl implements SparkSessionManager { return existingSession; } - SparkSession sparkSession = new SparkSessionImpl(); + SparkSession sparkSession = new SparkSessionImpl(SessionState.get().getNewSparkSessionId()); if (doOpen) { sparkSession.open(conf); } http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 262bbb9..71e130b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -43,6 +43,7 @@ import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; @@ -315,6 +316,8 @@ public class SessionState { private List<Closeable> cleanupItems = new LinkedList<Closeable>(); + private final AtomicLong sparkSessionId = new AtomicLong(); + public HiveConf getConf() { return sessionConf; } @@ -2059,6 +2062,9 @@ public class SessionState { return currentFunctionsInUse; } + public String getNewSparkSessionId() { + return Long.toString(this.sparkSessionId.getAndIncrement()); + } } class ResourceMaps { http://git-wip-us.apache.org/repos/asf/hive/blob/7795c0a7/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java index 6964764..853e4f4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java @@ -17,13 +17,17 @@ */ package org.apache.hadoop.hive.ql.exec.spark.session; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.StringUtils; + +import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; @@ -34,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient; import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.spark.SparkConf; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -41,10 +46,17 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class TestSparkSessionManagerImpl { + private static final Logger LOG = LoggerFactory.getLogger(TestSparkSessionManagerImpl.class); private SparkSessionManagerImpl sessionManagerHS2 = null; private boolean anyFailedSessionThread; // updated only when a thread has failed. + private static HiveConf SESSION_HIVE_CONF = new HiveConf(); + + @BeforeClass + public static void setup() { + SessionState.start(SESSION_HIVE_CONF); + } /** Tests CLI scenario where we get a single session and use it multiple times. */ @@ -82,7 +94,7 @@ public class TestSparkSessionManagerImpl { List<Thread> threadList = new ArrayList<Thread>(); for (int i = 0; i < 10; i++) { - Thread t = new Thread(new SessionThread(), "Session thread " + i); + Thread t = new Thread(new SessionThread(SessionState.get()), "Session thread " + i); t.start(); threadList.add(t); } @@ -185,6 +197,23 @@ public class TestSparkSessionManagerImpl { "java.lang.NoClassDefFoundError: org/apache/spark/SparkConf"); } + @Test + public void testGetSessionId() throws HiveException { + SessionState ss = SessionState.start(SESSION_HIVE_CONF); + SparkSessionManager ssm = SparkSessionManagerImpl.getInstance(); + + ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true)); + assertEquals("0", ss.getSparkSession().getSessionId()); + + ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true)); + assertEquals("1", ss.getSparkSession().getSessionId()); + + ss = SessionState.start(SESSION_HIVE_CONF); + + ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true)); + assertEquals("0", ss.getSparkSession().getSessionId()); + } + private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) { checkHiveException(ss, e, expectedErrMsg, null); } @@ -220,10 +249,16 @@ public class TestSparkSessionManagerImpl { /* Thread simulating a user session in HiveServer2. */ public class SessionThread implements Runnable { + private final SessionState ss; + + private SessionThread(SessionState ss) { + this.ss = ss; + } @Override public void run() { try { + SessionState.setCurrentSessionState(ss); Random random = new Random(Thread.currentThread().getId()); String threadName = Thread.currentThread().getName(); System.out.println(threadName + " started.");