This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e28622  [SPARK-28211][CORE][SHUFFLE] Propose Shuffle Driver Components API
2e28622 is described below

commit 2e28622d8aeb9ce2460e803bb7d994196bcc0253
Author: Yifei Huang <yif...@palantir.com>
AuthorDate: Tue Oct 15 12:26:49 2019 -0500

    [SPARK-28211][CORE][SHUFFLE] Propose Shuffle Driver Components API
    
    ### What changes were proposed in this pull request?
    
    This is the next step of the SPARK-25299 effort to propose a new shuffle
    storage API. This patch adds the driver-side components of the plugin:
    driver shuffle initialization, application cleanup, and shuffle cleanup.
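
    As an illustration only (not part of this patch), a minimal custom driver-side
    plugin might look like the sketch below; the class name and the
    "storage-endpoint" key are hypothetical:

        import java.util.Collections;
        import java.util.Map;

        import org.apache.spark.shuffle.api.ShuffleDriverComponents;

        // Hypothetical plugin that registers the application with an external
        // shuffle storage service when the driver starts up.
        public class MyDriverComponents implements ShuffleDriverComponents {

          @Override
          public Map<String, String> initializeApplication() {
            // Entries returned here are written into the SparkConf under the
            // shuffle plugin prefix and later handed to
            // ShuffleExecutorComponents.initializeExecutor() on each executor.
            return Collections.singletonMap("storage-endpoint", "host:port");
          }

          @Override
          public void cleanupApplication() {
            // Tear down any application-wide shuffle state here; called from
            // SparkContext.stop().
          }

          @Override
          public void removeShuffle(int shuffleId, boolean blocking) {
            // Delete the data for a single shuffle; called from ContextCleaner
            // when the shuffle goes out of scope.
          }
        }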
    
    ### How was this patch tested?
    Existing unit tests, plus an additional test covering the interaction
    between driver and executor initialization.
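
    For reference, the executor side of such a plugin (again only a sketch with
    hypothetical names, omitting interface methods that keep their default
    implementations) receives the driver's extra configs through the new
    three-argument initializeExecutor signature:

        import java.io.IOException;
        import java.util.Map;

        import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
        import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;

        public class MyExecutorComponents implements ShuffleExecutorComponents {

          @Override
          public void initializeExecutor(String appId, String execId,
              Map<String, String> extraConfigs) {
            // "storage-endpoint" was returned by the driver's
            // initializeApplication() and forwarded here via the SparkConf
            // prefix defined in ShuffleDataIOUtils.
            String endpoint = extraConfigs.get("storage-endpoint");
            // ... connect to the external shuffle storage here ...
          }

          @Override
          public ShuffleMapOutputWriter createMapOutputWriter(
              int shuffleId, long mapTaskId, int numPartitions) throws IOException {
            // A real plugin would return a writer backed by its storage system.
            throw new UnsupportedOperationException("sketch only");
          }
        }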
    
    Closes #25823 from yifeih/yh/upstream/driver-lifecycle.
    
    Lead-authored-by: Yifei Huang <yif...@palantir.com>
    Co-authored-by: mccheah <mch...@palantir.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../apache/spark/shuffle/api/ShuffleDataIO.java    |  6 ++
 .../spark/shuffle/api/ShuffleDriverComponents.java | 64 +++++++++++++++
 .../shuffle/api/ShuffleExecutorComponents.java     | 12 ++-
 .../shuffle/sort/io/LocalDiskShuffleDataIO.java    |  8 +-
 ....java => LocalDiskShuffleDriverComponents.java} | 35 +++++---
 .../io/LocalDiskShuffleExecutorComponents.java     |  7 +-
 .../scala/org/apache/spark/ContextCleaner.scala    |  8 +-
 .../main/scala/org/apache/spark/Dependency.scala   |  1 +
 .../main/scala/org/apache/spark/SparkContext.scala | 17 +++-
 .../apache/spark/shuffle/ShuffleDataIOUtils.scala  | 42 ++++++++++
 .../spark/shuffle/sort/SortShuffleManager.scala    | 15 ++--
 .../apache/spark/InternalAccumulatorSuite.scala    |  3 +-
 .../shuffle/ShuffleDriverComponentsSuite.scala     | 94 ++++++++++++++++++++++
 13 files changed, 281 insertions(+), 31 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
index e9e50ec..e4554bd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
@@ -46,4 +46,10 @@ public interface ShuffleDataIO {
    * are only invoked on the executors.
    */
   ShuffleExecutorComponents executor();
+
+  /**
+   * Called once on the driver process to bootstrap the shuffle metadata modules that
+   * are maintained by the driver.
+   */
+  ShuffleDriverComponents driver();
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
new file mode 100644
index 0000000..b4cec17
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.api;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * :: Private ::
+ * An interface for building shuffle support modules for the Driver.
+ */
+@Private
+public interface ShuffleDriverComponents {
+
+  /**
+   * Called once in the driver to bootstrap this module that is specific to this application.
+   * This method is called before submitting executor requests to the cluster manager.
+   *
+   * This method should prepare the module with its shuffle components, i.e. registering
+   * against external file servers or shuffle services, or creating tables in a shuffle
+   * storage database.
+   *
+   * @return additional SparkConf settings necessary for initializing the executor components.
+   * This would include configurations that cannot be statically set on the application, like
+   * the host:port of external services for shuffle storage.
+   */
+  Map<String, String> initializeApplication();
+
+  /**
+   * Called once at the end of the Spark application to clean up any existing shuffle state.
+   */
+  void cleanupApplication();
+
+  /**
+   * Called once per shuffle id when the shuffle id is first generated for a shuffle stage.
+   *
+   * @param shuffleId The unique identifier for the shuffle stage.
+   */
+  default void registerShuffle(int shuffleId) {}
+
+  /**
+   * Removes shuffle data associated with the given shuffle.
+   *
+   * @param shuffleId The unique identifier for the shuffle stage.
+   * @param blocking Whether this call should block on the deletion of the data.
+   */
+  default void removeShuffle(int shuffleId, boolean blocking) {}
+}
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
index d30f3da..30ca177 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.api;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.spark.annotation.Private;
@@ -34,21 +35,26 @@ public interface ShuffleExecutorComponents {
   /**
   * Called once per executor to bootstrap this module with state that is specific to
   * that executor, specifically the application ID and executor ID.
+   *
+   * @param appId The Spark application id
+   * @param execId The unique identifier of the executor being initialized
+   * @param extraConfigs Extra configs that were returned by
+   *                     {@link ShuffleDriverComponents#initializeApplication()}
    */
-  void initializeExecutor(String appId, String execId);
+  void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs);
 
   /**
   * Called once per map task to create a writer that will be responsible for persisting all the
   * partitioned bytes written by that map task.
   *
   * @param shuffleId Unique identifier for the shuffle the map task is a part of
-   * @param mapId An ID of the map task. The ID is unique within this Spark application.
+   * @param mapTaskId An ID of the map task. The ID is unique within this Spark application.
   * @param numPartitions The number of partitions that will be written by the map task. Some of
   *                      these partitions may be empty.
    */
   ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int numPartitions) throws IOException;
 
   /**
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
index cabcb17..50eb2f1 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
@@ -18,8 +18,9 @@
 package org.apache.spark.shuffle.sort.io;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
 import org.apache.spark.shuffle.api.ShuffleDataIO;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
 
 /**
  * Implementation of the {@link ShuffleDataIO} plugin system that replicates the local shuffle
@@ -37,4 +38,9 @@ public class LocalDiskShuffleDataIO implements ShuffleDataIO {
   public ShuffleExecutorComponents executor() {
     return new LocalDiskShuffleExecutorComponents(sparkConf);
   }
+
+  @Override
+  public ShuffleDriverComponents driver() {
+    return new LocalDiskShuffleDriverComponents();
+  }
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java
similarity index 50%
copy from core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
copy to core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java
index cabcb17..92b4b31 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java
@@ -17,24 +17,33 @@
 
 package org.apache.spark.shuffle.sort.io;
 
-import org.apache.spark.SparkConf;
-import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
-import org.apache.spark.shuffle.api.ShuffleDataIO;
+import java.util.Collections;
+import java.util.Map;
 
-/**
- * Implementation of the {@link ShuffleDataIO} plugin system that replicates the local shuffle
- * storage and index file functionality that has historically been used from Spark 2.4 and earlier.
- */
-public class LocalDiskShuffleDataIO implements ShuffleDataIO {
+import org.apache.spark.SparkEnv;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.storage.BlockManagerMaster;
+
+public class LocalDiskShuffleDriverComponents implements ShuffleDriverComponents {
+
+  private BlockManagerMaster blockManagerMaster;
 
-  private final SparkConf sparkConf;
+  @Override
+  public Map<String, String> initializeApplication() {
+    blockManagerMaster = SparkEnv.get().blockManager().master();
+    return Collections.emptyMap();
+  }
 
-  public LocalDiskShuffleDataIO(SparkConf sparkConf) {
-    this.sparkConf = sparkConf;
+  @Override
+  public void cleanupApplication() {
+    // nothing to clean up
   }
 
   @Override
-  public ShuffleExecutorComponents executor() {
-    return new LocalDiskShuffleExecutorComponents(sparkConf);
+  public void removeShuffle(int shuffleId, boolean blocking) {
+    if (blockManagerMaster == null) {
+      throw new IllegalStateException("Driver components must be initialized before using");
+    }
+    blockManagerMaster.removeShuffle(shuffleId, blocking);
   }
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index a0c7d3c..eb4d9d9 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.sort.io;
 
+import java.util.Map;
 import java.util.Optional;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -50,7 +51,7 @@ public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorCompon
   }
 
   @Override
-  public void initializeExecutor(String appId, String execId) {
+  public void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs) {
     blockManager = SparkEnv.get().blockManager();
     if (blockManager == null) {
       throw new IllegalStateException("No blockManager available from the SparkEnv.");
@@ -61,14 +62,14 @@ public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorCompon
   @Override
   public ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int numPartitions) {
     if (blockResolver == null) {
       throw new IllegalStateException(
           "Executor components must be initialized before getting writers.");
     }
     return new LocalDiskShuffleMapOutputWriter(
-        shuffleId, mapId, numPartitions, blockResolver, sparkConf);
+        shuffleId, mapTaskId, numPartitions, blockResolver, sparkConf);
   }
 
   @Override
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index dfbd7d1..9506c36 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -27,6 +27,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
+import org.apache.spark.shuffle.api.ShuffleDriverComponents
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
 
 /**
@@ -58,7 +59,9 @@ private class CleanupTaskWeakReference(
  * to be processed when the associated object goes out of scope of the application. Actual
  * cleanup is performed in a separate daemon thread.
  */
-private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
+private[spark] class ContextCleaner(
+    sc: SparkContext,
+    shuffleDriverComponents: ShuffleDriverComponents) extends Logging {
 
   /**
   * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
@@ -221,7 +224,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     try {
       logDebug("Cleaning shuffle " + shuffleId)
       mapOutputTrackerMaster.unregisterShuffle(shuffleId)
-      blockManagerMaster.removeShuffle(shuffleId, blocking)
+      shuffleDriverComponents.removeShuffle(shuffleId, blocking)
       listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
       logDebug("Cleaned shuffle " + shuffleId)
     } catch {
@@ -269,7 +272,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
 }
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index f0ac9ac..ba8e4d6 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -96,6 +96,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     shuffleId, this)
 
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
 }
 
 
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4792c0a..2db8809 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,6 +58,8 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.shuffle.ShuffleDataIOUtils
+import org.apache.spark.shuffle.api.ShuffleDriverComponents
 import org.apache.spark.status.{AppStatusSource, AppStatusStore}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage._
@@ -217,6 +219,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _statusStore: AppStatusStore = _
   private var _heartbeater: Heartbeater = _
   private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
+  private var _shuffleDriverComponents: ShuffleDriverComponents = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -319,6 +322,8 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = ds
   }
 
+  private[spark] def shuffleDriverComponents: ShuffleDriverComponents = _shuffleDriverComponents
+
   /**
    * A unique identifier for the Spark application.
    * Its format depends on the scheduler implementation.
@@ -524,6 +529,11 @@ class SparkContext(config: SparkConf) extends Logging {
     executorEnvs ++= _conf.getExecutorEnv
     executorEnvs("SPARK_USER") = sparkUser
 
+    _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
+    _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
+      _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
+    }
+
     // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
     // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
     _heartbeatReceiver = env.rpcEnv.setupEndpoint(
@@ -576,7 +586,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     _cleaner =
       if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
-        Some(new ContextCleaner(this))
+        Some(new ContextCleaner(this, _shuffleDriverComponents))
       } else {
         None
       }
@@ -1975,6 +1985,11 @@ class SparkContext(config: SparkConf) extends Logging {
       }
       _heartbeater = null
     }
+    if (_shuffleDriverComponents != null) {
+      Utils.tryLogNonFatalError {
+        _shuffleDriverComponents.cleanupApplication()
+      }
+    }
     if (env != null && _heartbeatReceiver != null) {
       Utils.tryLogNonFatalError {
         env.rpcEnv.stop(_heartbeatReceiver)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleDataIOUtils.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleDataIOUtils.scala
new file mode 100644
index 0000000..e9507a7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleDataIOUtils.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS
+import org.apache.spark.shuffle.api.ShuffleDataIO
+import org.apache.spark.util.Utils
+
+private[spark] object ShuffleDataIOUtils {
+
+  /**
+   * The prefix of spark config keys that are passed from the driver to the executor.
+   */
+  val SHUFFLE_SPARK_CONF_PREFIX = "spark.shuffle.plugin.__config__."
+
+  def loadShuffleDataIO(conf: SparkConf): ShuffleDataIO = {
+    val configuredPluginClass = conf.get(SHUFFLE_IO_PLUGIN_CLASS)
+    val maybeIO = Utils.loadExtensions(
+      classOf[ShuffleDataIO], Seq(configuredPluginClass), conf)
+    require(maybeIO.nonEmpty, s"A valid shuffle plugin must be specified by config " +
+      s"${SHUFFLE_IO_PLUGIN_CLASS.key}, but $configuredPluginClass resulted in zero valid " +
+      s"plugins.")
+    maybeIO.head
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index b21ce9c..5adfd71 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,6 +19,8 @@ package org.apache.spark.shuffle.sort
 
 import java.util.concurrent.ConcurrentHashMap
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.shuffle._
@@ -236,12 +238,13 @@ private[spark] object SortShuffleManager extends Logging {
   }
 
   private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
-    val configuredPluginClasses = conf.get(config.SHUFFLE_IO_PLUGIN_CLASS)
-    val maybeIO = Utils.loadExtensions(
-      classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf)
-    require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses")
-    val executorComponents = maybeIO.head.executor()
-    executorComponents.initializeExecutor(conf.getAppId, SparkEnv.get.executorId)
+    val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
+    val extraConfigs = conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX)
+        .toMap
+    executorComponents.initializeExecutor(
+      conf.getAppId,
+      SparkEnv.get.executorId,
+      extraConfigs.asJava)
     executorComponents
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 8433a6f..b982626 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -211,7 +211,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
   /**
   * A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup.
    */
-  private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
+  private class SaveAccumContextCleaner(sc: SparkContext) extends
+      ContextCleaner(sc, null) {
     private val accumsRegistered = new ArrayBuffer[Long]
 
     override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala
new file mode 100644
index 0000000..d8657ec
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.util.{Map => JMap}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.common.collect.ImmutableMap
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS
+import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents, ShuffleMapOutputWriter}
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO
+
+class ShuffleDriverComponentsSuite
+    extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach {
+
+  test("test serialization of shuffle initialization conf to executors") {
+    val testConf = new SparkConf()
+      .setAppName("testing")
+      .set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + "test-plugin-key", "user-set-value")
+      .set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + "test-user-key", "user-set-value")
+      .setMaster("local-cluster[2,1,1024]")
+      .set(SHUFFLE_IO_PLUGIN_CLASS, "org.apache.spark.shuffle.TestShuffleDataIO")
+
+    sc = new SparkContext(testConf)
+
+    val out = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3)
+      .groupByKey()
+      .foreach { _ =>
+        if (!TestShuffleExecutorComponentsInitialized.initialized.get()) {
+          throw new RuntimeException("TestShuffleExecutorComponents wasn't initialized")
+        }
+      }
+  }
+}
+
+class TestShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {
+  private val delegate = new LocalDiskShuffleDataIO(sparkConf)
+
+  override def driver(): ShuffleDriverComponents = new TestShuffleDriverComponents()
+
+  override def executor(): ShuffleExecutorComponents =
+    new TestShuffleExecutorComponentsInitialized(delegate.executor())
+}
+
+class TestShuffleDriverComponents extends ShuffleDriverComponents {
+  override def initializeApplication(): JMap[String, String] = {
+    ImmutableMap.of("test-plugin-key", "plugin-set-value")
+  }
+
+  override def cleanupApplication(): Unit = {}
+}
+
+object TestShuffleExecutorComponentsInitialized {
+  val initialized = new AtomicBoolean(false)
+}
+
+class TestShuffleExecutorComponentsInitialized(delegate: ShuffleExecutorComponents)
+    extends ShuffleExecutorComponents {
+
+  override def initializeExecutor(
+      appId: String,
+      execId: String,
+      extraConfigs: JMap[String, String]): Unit = {
+    delegate.initializeExecutor(appId, execId, extraConfigs)
+    assert(extraConfigs.get("test-plugin-key") == "plugin-set-value", extraConfigs)
+    assert(extraConfigs.get("test-user-key") == "user-set-value")
+    TestShuffleExecutorComponentsInitialized.initialized.set(true)
+  }
+
+  override def createMapOutputWriter(
+      shuffleId: Int,
+      mapTaskId: Long,
+      numPartitions: Int): ShuffleMapOutputWriter = {
+    delegate.createMapOutputWriter(shuffleId, mapTaskId, numPartitions)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
