This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new e5b5b7e [SPARK-32175][CORE] Fix the order between initialization for ExecutorPlugin and starting heartbeat thread e5b5b7e is described below commit e5b5b7e507ab974bbca3abb0bbf56bf67696d53e Author: Kousuke Saruta <saru...@oss.nttdata.com> AuthorDate: Wed Jul 29 08:44:56 2020 -0500 [SPARK-32175][CORE] Fix the order between initialization for ExecutorPlugin and starting heartbeat thread ### What changes were proposed in this pull request? This PR changes the order between initialization for ExecutorPlugin and starting heartbeat thread in Executor. ### Why are the changes needed? In the current master, heartbeat thread in a executor starts after plugin initialization so if the initialization takes long time, heartbeat is not sent to driver and the executor will be removed from cluster. ### Does this PR introduce _any_ user-facing change? Yes. Plugins for executors will be allowed to take long time for initialization. ### How was this patch tested? New testcase. Closes #29002 from sarutak/fix-heartbeat-issue. Authored-by: Kousuke Saruta <saru...@oss.nttdata.com> Signed-off-by: Thomas Graves <tgra...@apache.org> (cherry picked from commit 9be088357eff4328248b29a3a49a816756745345) Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../main/scala/org/apache/spark/TestUtils.scala | 15 ++++- .../scala/org/apache/spark/executor/Executor.scala | 12 ++-- .../org/apache/spark/executor/ExecutorSuite.scala | 72 +++++++++++++++++++++- 3 files changed, 89 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index d459627..1e00769 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -179,11 +179,20 @@ private[spark] object TestUtils { destDir: File, toStringValue: String = "", baseClass: String = null, - classpathUrls: Seq[URL] = Seq.empty): File = { + classpathUrls: Seq[URL] = Seq.empty, + implementsClasses: Seq[String] = Seq.empty, + extraCodeBody: String = ""): File = { val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") + val implementsText = + "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ") val sourceFile = new JavaSourceFromString(className, - "public class " + className + extendsText + " implements java.io.Serializable {" + - " @Override public String toString() { return \"" + toStringValue + "\"; }}") + s""" + |public class $className $extendsText $implementsText { + | @Override public String toString() { return "$toStringValue"; } + | + | $extraCodeBody + |} + """.stripMargin) createCompiledClass(className, destDir, sourceFile, classpathUrls) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 8aeb16f..e9f1d9c 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -153,11 +153,6 @@ private[spark] class Executor( // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too. env.serializerManager.setDefaultClassLoader(replClassLoader) - // Plugins need to load using a class loader that includes the executor's user classpath - private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) { - PluginContainer(env, resources.asJava) - } - // Max size of direct result. If task result is bigger than this, we use the block manager // to send the result back. private val maxDirectResultSize = Math.min( @@ -218,6 +213,13 @@ private[spark] class Executor( heartbeater.start() + // Plugins need to load using a class loader that includes the executor's user classpath. + // Plugins also needs to be initialized after the heartbeater started + // to avoid blocking to send heartbeat (see SPARK-32175). + private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) { + PluginContainer(env, resources.asJava) + } + metricsPoller.start() private[executor] def numRunningTasks: Int = runningTasks.size() diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 31049d1..b198448 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import java.io.{Externalizable, ObjectInput, ObjectOutput} +import java.io.{Externalizable, File, ObjectInput, ObjectOutput} import java.lang.Thread.UncaughtExceptionHandler import java.nio.ByteBuffer import java.util.Properties @@ -41,6 +41,7 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.{SimpleApplicationTest, SparkSubmitSuite} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.memory.TestMemoryManager @@ -52,7 +53,7 @@ import org.apache.spark.scheduler.{DirectTaskResult, FakeTask, ResultTask, Task, import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockManager, BlockManagerId} -import org.apache.spark.util.{LongAccumulator, UninterruptibleThread} +import org.apache.spark.util.{LongAccumulator, UninterruptibleThread, Utils} class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester { @@ -402,6 +403,73 @@ class ExecutorSuite extends SparkFunSuite assert(taskMetrics.getMetricValue("JVMHeapMemory") > 0) } + test("SPARK-32175: Plugin initialization should start after heartbeater started") { + withTempDir { tempDir => + val sparkPluginCodeBody = + """ + |@Override + |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() { + | return new TestExecutorPlugin(); + |} + | + |@Override + |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; } + """.stripMargin + val executorPluginBody = + """ + |@Override + |public void init( + | org.apache.spark.api.plugin.PluginContext ctx, + | java.util.Map<String, String> extraConf) { + | try { + | Thread.sleep(8 * 1000); + | } catch (InterruptedException e) { + | throw new RuntimeException(e); + | } + |} + """.stripMargin + + val compiledExecutorPlugin = TestUtils.createCompiledClass( + "TestExecutorPlugin", + tempDir, + "", + null, + Seq.empty, + Seq("org.apache.spark.api.plugin.ExecutorPlugin"), + executorPluginBody) + + val thisClassPath = + sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL) + val compiledSparkPlugin = TestUtils.createCompiledClass( + "TestSparkPlugin", + tempDir, + "", + null, + Seq(tempDir.toURI.toURL) ++ thisClassPath, + Seq("org.apache.spark.api.plugin.SparkPlugin"), + sparkPluginCodeBody) + + val jarUrl = TestUtils.createJar( + Seq(compiledSparkPlugin, compiledExecutorPlugin), + new File(tempDir, "testPlugin.jar")) + + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--master", "local-cluster[1,1,1024]", + "--conf", "spark.plugins=TestSparkPlugin", + "--conf", "spark.storage.blockManagerSlaveTimeoutMs=" + 5 * 1000, + "--conf", "spark.network.timeoutInterval=" + 1000, + "--conf", "spark.executor.heartbeatInterval=" + 1000, + "--conf", "spark.executor.extraClassPath=" + jarUrl.toString, + "--conf", "spark.driver.extraClassPath=" + jarUrl.toString, + "--conf", "spark.ui.enabled=false", + unusedJar.toString) + SparkSubmitSuite.runSparkSubmit(args, timeout = 30.seconds) + } + } + private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = { val mockEnv = mock[SparkEnv] val mockRpcEnv = mock[RpcEnv] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org