This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 7aad7d9 [ZEPPELIN-5644] Reduce redundancy during the interpreter execution process (#4294) 7aad7d9 is described below commit 7aad7d92d255cf19a56272b2d3ac40f47373a7ec Author: Leomax_Sun <282130...@qq.com> AuthorDate: Fri Mar 18 07:58:25 2022 +0800 [ZEPPELIN-5644] Reduce redundancy during the interpreter execution process (#4294) --- .../apache/zeppelin/spark/SparkInterpreter.java | 57 +++++++++++++--------- .../zeppelin/spark/SparkInterpreterTest.java | 5 +- .../zeppelin/spark/SparkScala211Interpreter.scala | 1 + .../zeppelin/spark/SparkScala212Interpreter.scala | 1 + .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 3 +- .../remote/RemoteInterpreterServer.java | 19 +++++++- .../remote/RemoteInterpreterServerTest.java | 32 +++++++++++- 7 files changed, 91 insertions(+), 27 deletions(-) diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index fef0998..b13d67f 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -66,6 +66,7 @@ public class SparkInterpreter extends AbstractInterpreter { } private static AtomicInteger SESSION_NUM = new AtomicInteger(0); + private static Class innerInterpreterClazz; private AbstractSparkScalaInterpreter innerInterpreter; private Map<String, String> innerInterpreterClassMap = new HashMap<>(); private SparkContext sc; @@ -151,38 +152,45 @@ public class SparkInterpreter extends AbstractInterpreter { */ private AbstractSparkScalaInterpreter loadSparkScalaInterpreter(SparkConf conf) throws Exception { scalaVersion = extractScalaVersion(conf); - ClassLoader scalaInterpreterClassLoader = Thread.currentThread().getContextClassLoader(); - - String zeppelinHome = System.getenv("ZEPPELIN_HOME"); - if (zeppelinHome != null) { 
- // ZEPPELIN_HOME is null in yarn-cluster mode, load it directly via current ClassLoader. - // otherwise, load from the specific folder ZEPPELIN_HOME/interpreter/spark/scala-<version> - - File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion); - List<URL> urls = new ArrayList<>(); - for (File file : scalaJarFolder.listFiles()) { - LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: " - + scalaJarFolder); - urls.add(file.toURI().toURL()); + // Make sure the innerInterpreter Class is loaded only once into JVM + // Use double lock to ensure thread safety + if (innerInterpreterClazz == null) { + synchronized (SparkInterpreter.class) { + if (innerInterpreterClazz == null) { + LOGGER.debug("innerInterpreterClazz is null, thread:{}", Thread.currentThread().getName()); + ClassLoader scalaInterpreterClassLoader = Thread.currentThread().getContextClassLoader(); + String zeppelinHome = System.getenv("ZEPPELIN_HOME"); + if (zeppelinHome != null) { + // ZEPPELIN_HOME is null in yarn-cluster mode, load it directly via current ClassLoader. 
+ // otherwise, load from the specific folder ZEPPELIN_HOME/interpreter/spark/scala-<version> + File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion); + List<URL> urls = new ArrayList<>(); + for (File file : scalaJarFolder.listFiles()) { + LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: " + + scalaJarFolder); + urls.add(file.toURI().toURL()); + } + scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]), + Thread.currentThread().getContextClassLoader()); + } + String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion); + innerInterpreterClazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName); + } } - scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]), - Thread.currentThread().getContextClassLoader()); } - - String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion); - Class clazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName); return (AbstractSparkScalaInterpreter) - clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class, File.class) - .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader, scalaShellOutputDir); + innerInterpreterClazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class, File.class) + .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), innerInterpreterClazz.getClassLoader(), scalaShellOutputDir); } - @Override + @Override public void close() throws InterpreterException { LOGGER.info("Close SparkInterpreter"); if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) { innerInterpreter.close(); - innerInterpreter = null; + innerInterpreterClazz = null; } + innerInterpreter = null; } @Override @@ -306,4 +314,9 @@ public class SparkInterpreter extends AbstractInterpreter { public 
boolean isUnsupportedSparkVersion() { return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion(); } + + public AbstractSparkScalaInterpreter getInnerInterpreter() { + return innerInterpreter; + } + } diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 7b3d1df..297b55d 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -544,7 +544,7 @@ public class SparkInterpreterTest { } @Test - public void testScopedMode() throws InterpreterException { + public void testScopedMode() throws Exception { Properties properties = new Properties(); properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); @@ -567,6 +567,9 @@ public class SparkInterpreterTest { interpreter1.open(); interpreter2.open(); + // check if there is any duplicated loaded class + assertEquals(true, interpreter1.getInnerInterpreter().getClass()==interpreter2.getInnerInterpreter().getClass()); + InterpreterContext context = getInterpreterContext(); InterpreterResult result1 = interpreter1.interpret("sc.range(1, 10).sum", context); diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala index 41470f1..d1a3b08 100644 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala +++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala @@ -120,6 +120,7 @@ class SparkScala211Interpreter(override val conf: SparkConf, super.close() if (sparkILoop != null) { sparkILoop.closeInterpreter() + sparkILoop = null } } diff --git 
a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala index e9c127d..4918e4b 100644 --- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala @@ -111,6 +111,7 @@ class SparkScala212Interpreter(override val conf: SparkConf, super.close() if (sparkILoop != null) { sparkILoop.closeInterpreter() + sparkILoop = null } } diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index f984e15..46100da 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -202,13 +202,14 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, } if (sc != null) { sc.stop() + sc = null } - sc = null if (sparkSession != null) { sparkSession.getClass.getMethod("stop").invoke(sparkSession) sparkSession = null } sqlContext = null + z = null } private def cleanupStagingDirInternal(stagingDirPath: Path, hadoopConf: Configuration): Unit = { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index aff8139..6b499fa 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -95,6 +95,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.Optional; import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META; @@ -379,6 +380,13 @@ public class RemoteInterpreterServer extends Thread Integer.parseInt(properties.getOrDefault("zeppelin.interpreter.result.cache", "0")); } + boolean isPresent = Optional.ofNullable(interpreterGroup.get(sessionId)).orElse(new ArrayList<>()).stream() + .filter(m -> m.getClassName().equals(className)).findAny().isPresent(); + if (isPresent) { + LOGGER.info("interpreter {} is existing", className); + return; + } + Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className); Properties p = new Properties(); p.putAll(properties); @@ -482,9 +490,18 @@ public class RemoteInterpreterServer extends Thread Iterator<Interpreter> it = interpreters.iterator(); while (it.hasNext()) { Interpreter inp = it.next(); - if (inp.getClassName().equals(className)) { + boolean isOpen = false; + if (inp instanceof LazyOpenInterpreter) { + LazyOpenInterpreter lazy = (LazyOpenInterpreter) inp; + isOpen = lazy.isOpen(); + } + // only remove the open and matched interpreter + if (inp.getClassName().equals(className) && isOpen) { try { + LOGGER.debug("Trying to close interpreter {} with scheduler thread{}", inp.getClassName(), inp.getScheduler().getName()); inp.close(); + // close the thread + inp.getScheduler().stop(); } catch (InterpreterException e) { LOGGER.warn("Fail to close interpreter", e); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java index 20be866..7f6e0e5 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java +++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; @@ -119,8 +120,13 @@ public class RemoteInterpreterServerTest { assertEquals(2, interpreter1.getProperties().size()); assertEquals("value_1", interpreter1.getProperty("property_1")); - // create Test2Interpreter in session_1 + // create duplicated Test1Interpreter in session_1 server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(), + intpProperties, "user_1"); + assertEquals(1, server.getInterpreterGroup().get("session_1").size()); + + // create Test2Interpreter in session_1 + server.createInterpreter("group_1", "session_1", Test2Interpreter.class.getName(), intpProperties, "user_1"); assertEquals(2, server.getInterpreterGroup().get("session_1").size()); @@ -188,9 +194,31 @@ public class RemoteInterpreterServerTest { assertEquals(10, server.getProgress("session_1", Test1Interpreter.class.getName(), intpContext)); - // close + // before close -> thread of Test1Interpreter is running + assertEquals(true, isThreadRunning(interpreter1.getScheduler().getName())); + + // close opened Test1Interpreter -> remove from interpreterGroup server.close("session_1", Test1Interpreter.class.getName()); assertTrue(interpreter1.closed.get()); + assertEquals(1, server.getInterpreterGroup().get("session_1").size()); + + // close unopened Test2Interpreter -> keep in interpreterGroup + server.close("session_1", Test2Interpreter.class.getName()); + assertEquals(1, server.getInterpreterGroup().get("session_1").size()); + + // after close -> thread of Test1Interpreter is not running + assertEquals(false, isThreadRunning(interpreter1.getScheduler().getName())); + } + + private boolean isThreadRunning(String schedulerName) 
{ + boolean res = false; + Set<Thread> threads = Thread.getAllStackTraces().keySet(); + for (Thread t : threads) { + if (!t.getName().contains(schedulerName)) continue; + res = true; + break; + } + return res; } public static class Test1Interpreter extends Interpreter {