This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2e28622  [SPARK-28211][CORE][SHUFFLE] Propose Shuffle Driver Components API
2e28622 is described below

commit 2e28622d8aeb9ce2460e803bb7d994196bcc0253
Author:     Yifei Huang <yif...@palantir.com>
AuthorDate: Tue Oct 15 12:26:49 2019 -0500

    [SPARK-28211][CORE][SHUFFLE] Propose Shuffle Driver Components API

    ### What changes were proposed in this pull request?

    This is the next step of the SPARK-25299 work of proposing a new shuffle storage API. This patch adds the components of the plugin that hook into the driver: driver-side shuffle initialization, application cleanup, and shuffle cleanup.

    ### How was this patch tested?

    Existing unit tests, plus an additional test covering the interaction between driver and executor initialization.

    Closes #25823 from yifeih/yh/upstream/driver-lifecycle.

    Lead-authored-by: Yifei Huang <yif...@palantir.com>
    Co-authored-by: mccheah <mch...@palantir.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../apache/spark/shuffle/api/ShuffleDataIO.java    |  6 ++
 .../spark/shuffle/api/ShuffleDriverComponents.java | 64 +++++++++++++++
 .../shuffle/api/ShuffleExecutorComponents.java     | 12 ++-
 .../shuffle/sort/io/LocalDiskShuffleDataIO.java    |  8 +-
 ....java => LocalDiskShuffleDriverComponents.java} | 35 +++++---
 .../io/LocalDiskShuffleExecutorComponents.java     |  7 +-
 .../scala/org/apache/spark/ContextCleaner.scala    |  8 +-
 .../main/scala/org/apache/spark/Dependency.scala   |  1 +
 .../main/scala/org/apache/spark/SparkContext.scala | 17 +++-
 .../apache/spark/shuffle/ShuffleDataIOUtils.scala  | 42 ++++++++++
 .../spark/shuffle/sort/SortShuffleManager.scala    | 15 ++--
 .../apache/spark/InternalAccumulatorSuite.scala    |  3 +-
 .../shuffle/ShuffleDriverComponentsSuite.scala     | 94 ++++++++++++++++++++++
 13 files changed, 281 insertions(+), 31 deletions(-)
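To make the proposed driver hooks concrete before the diff, here is a minimal sketch of what a third-party implementation of the new API could look like. Everything here is hypothetical (the class name, the "endpoint" key, the external store) and not part of this patch; only the method signatures come from the ShuffleDriverComponents interface introduced below.

    import java.util.{Collections, Map => JMap}

    import org.apache.spark.shuffle.api.ShuffleDriverComponents

    // Hypothetical plugin that keeps shuffle state in an external service.
    // Whatever initializeApplication() returns is shipped to every executor's
    // ShuffleExecutorComponents.initializeExecutor() via the SparkConf.
    class RemoteStoreShuffleDriverComponents extends ShuffleDriverComponents {

      override def initializeApplication(): JMap[String, String] = {
        // e.g. provision storage for this application, then tell the
        // executors where to find it
        Collections.singletonMap("endpoint", "shuffle-store.example.com:7788")
      }

      override def cleanupApplication(): Unit = {
        // e.g. delete everything this application wrote to the external store
      }

      override def registerShuffle(shuffleId: Int): Unit = {
        // e.g. record the shuffle id so orphaned data can be found later
      }

      override def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
        // e.g. delete this shuffle's blocks, waiting for completion when
        // `blocking` is set
      }
    }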
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
index e9e50ec..e4554bd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
@@ -46,4 +46,10 @@ public interface ShuffleDataIO {
    * are only invoked on the executors.
    */
   ShuffleExecutorComponents executor();
+
+  /**
+   * Called once on the driver process to bootstrap the shuffle metadata modules that
+   * are maintained by the driver.
+   */
+  ShuffleDriverComponents driver();
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
new file mode 100644
index 0000000..b4cec17
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.api;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * :: Private ::
+ * An interface for building shuffle support modules for the driver.
+ */
+@Private
+public interface ShuffleDriverComponents {
+
+  /**
+   * Called once in the driver to bootstrap this module with state that is specific to
+   * this application. This method is called before submitting executor requests to the
+   * cluster manager.
+   *
+   * This method should prepare the module with its shuffle components, e.g. registering
+   * against external file servers or shuffle services, or creating tables in a shuffle
+   * storage database.
+   *
+   * @return additional SparkConf settings necessary for initializing the executor
+   *         components. This would include configurations that cannot be statically set
+   *         on the application, like the host:port of external services for shuffle
+   *         storage.
+   */
+  Map<String, String> initializeApplication();
+
+  /**
+   * Called once at the end of the Spark application to clean up any existing shuffle state.
+   */
+  void cleanupApplication();
+
+  /**
+   * Called once per shuffle id when the shuffle id is first generated for a shuffle stage.
+   *
+   * @param shuffleId The unique identifier for the shuffle stage.
+   */
+  default void registerShuffle(int shuffleId) {}
+
+  /**
+   * Removes shuffle data associated with the given shuffle.
+   *
+   * @param shuffleId The unique identifier for the shuffle stage.
+   * @param blocking Whether this call should block on the deletion of the data.
+   */
+  default void removeShuffle(int shuffleId, boolean blocking) {}
+}
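The executor-side counterpart (its updated signature is in the next hunk) receives the map returned by initializeApplication(). Continuing the hypothetical sketch above, with actual I/O delegated to the built-in local-disk implementation:

    import java.util.{Map => JMap}

    import org.apache.spark.SparkConf
    import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter}
    import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents

    // Hypothetical: reads the endpoint published by the driver components,
    // then delegates writes to the stock local-disk executor components.
    class RemoteStoreShuffleExecutorComponents(conf: SparkConf)
        extends ShuffleExecutorComponents {

      private val delegate = new LocalDiskShuffleExecutorComponents(conf)
      private var endpoint: String = _

      override def initializeExecutor(
          appId: String,
          execId: String,
          extraConfigs: JMap[String, String]): Unit = {
        // "endpoint" was returned by initializeApplication() on the driver
        endpoint = extraConfigs.get("endpoint")
        delegate.initializeExecutor(appId, execId, extraConfigs)
      }

      override def createMapOutputWriter(
          shuffleId: Int,
          mapTaskId: Long,
          numPartitions: Int): ShuffleMapOutputWriter =
        delegate.createMapOutputWriter(shuffleId, mapTaskId, numPartitions)
    }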
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
index d30f3da..30ca177 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.api;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.spark.annotation.Private;
@@ -34,21 +35,26 @@ public interface ShuffleExecutorComponents {
   /**
    * Called once per executor to bootstrap this module with state that is specific to
    * that executor, specifically the application ID and executor ID.
+   *
+   * @param appId The Spark application id
+   * @param execId The unique identifier of the executor being initialized
+   * @param extraConfigs Extra configs that were returned by
+   *                     {@link ShuffleDriverComponents#initializeApplication()}
    */
-  void initializeExecutor(String appId, String execId);
+  void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs);
 
   /**
    * Called once per map task to create a writer that will be responsible for persisting all the
    * partitioned bytes written by that map task.
    *
    * @param shuffleId Unique identifier for the shuffle the map task is a part of
-   * @param mapId An ID of the map task. The ID is unique within this Spark application.
+   * @param mapTaskId An ID of the map task. The ID is unique within this Spark application.
    * @param numPartitions The number of partitions that will be written by the map task. Some of
    *                      these partitions may be empty.
    */
   ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int numPartitions) throws IOException;
 
   /**
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
index cabcb17..50eb2f1 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
@@ -18,8 +18,9 @@
 package org.apache.spark.shuffle.sort.io;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
 import org.apache.spark.shuffle.api.ShuffleDataIO;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
 
 /**
  * Implementation of the {@link ShuffleDataIO} plugin system that replicates the local shuffle
@@ -37,4 +38,9 @@ public class LocalDiskShuffleDataIO implements ShuffleDataIO {
   public ShuffleExecutorComponents executor() {
     return new LocalDiskShuffleExecutorComponents(sparkConf);
   }
+
+  @Override
+  public ShuffleDriverComponents driver() {
+    return new LocalDiskShuffleDriverComponents();
+  }
 }
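Wiring the two halves together: a ShuffleDataIO implementation exposes both, and is selected at startup through the SHUFFLE_IO_PLUGIN_CLASS config entry (the same one the test suite at the end of this patch sets). A sketch with hypothetical class names; note the config entry is private[spark], so code outside the org.apache.spark packages would set the underlying string key instead:

    import org.apache.spark.SparkConf
    import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents}

    // Hypothetical top-level plugin tying the driver and executor sketches together.
    // Utils.loadExtensions instantiates it via the single-SparkConf-arg constructor.
    class RemoteStoreShuffleDataIO(conf: SparkConf) extends ShuffleDataIO {
      override def driver(): ShuffleDriverComponents =
        new RemoteStoreShuffleDriverComponents()

      override def executor(): ShuffleExecutorComponents =
        new RemoteStoreShuffleExecutorComponents(conf)
    }

Enabling it then looks like the test below:

    val conf = new SparkConf()
      .set(SHUFFLE_IO_PLUGIN_CLASS, classOf[RemoteStoreShuffleDataIO].getName)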
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java
similarity index 50%
copy from core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
copy to core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java
index cabcb17..92b4b31 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java
@@ -17,24 +17,33 @@
 
 package org.apache.spark.shuffle.sort.io;
 
-import org.apache.spark.SparkConf;
-import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
-import org.apache.spark.shuffle.api.ShuffleDataIO;
+import java.util.Collections;
+import java.util.Map;
 
-/**
- * Implementation of the {@link ShuffleDataIO} plugin system that replicates the local shuffle
- * storage and index file functionality that has historically been used from Spark 2.4 and earlier.
- */
-public class LocalDiskShuffleDataIO implements ShuffleDataIO {
+import org.apache.spark.SparkEnv;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.storage.BlockManagerMaster;
+
+public class LocalDiskShuffleDriverComponents implements ShuffleDriverComponents {
+
+  private BlockManagerMaster blockManagerMaster;
 
-  private final SparkConf sparkConf;
+  @Override
+  public Map<String, String> initializeApplication() {
+    blockManagerMaster = SparkEnv.get().blockManager().master();
+    return Collections.emptyMap();
+  }
 
-  public LocalDiskShuffleDataIO(SparkConf sparkConf) {
-    this.sparkConf = sparkConf;
+  @Override
+  public void cleanupApplication() {
+    // nothing to clean up
   }
 
   @Override
-  public ShuffleExecutorComponents executor() {
-    return new LocalDiskShuffleExecutorComponents(sparkConf);
+  public void removeShuffle(int shuffleId, boolean blocking) {
+    if (blockManagerMaster == null) {
+      throw new IllegalStateException("Driver components must be initialized before using");
+    }
+    blockManagerMaster.removeShuffle(shuffleId, blocking);
   }
 }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index a0c7d3c..eb4d9d9 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.sort.io;
 
+import java.util.Map;
 import java.util.Optional;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -50,7 +51,7 @@ public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorCompon
   }
 
   @Override
-  public void initializeExecutor(String appId, String execId) {
+  public void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs) {
     blockManager = SparkEnv.get().blockManager();
     if (blockManager == null) {
       throw new IllegalStateException("No blockManager available from the SparkEnv.");
@@ -61,14 +62,14 @@ public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorCompon
   @Override
   public ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
-      long mapId,
+      long mapTaskId,
       int numPartitions) {
     if (blockResolver == null) {
       throw new IllegalStateException(
           "Executor components must be initialized before getting writers.");
     }
     return new LocalDiskShuffleMapOutputWriter(
-        shuffleId, mapId, numPartitions, blockResolver, sparkConf);
+        shuffleId, mapTaskId, numPartitions, blockResolver, sparkConf);
   }
 
   @Override
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index dfbd7d1..9506c36 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -27,6 +27,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
+import org.apache.spark.shuffle.api.ShuffleDriverComponents
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
 
 /**
@@ -58,7 +59,9 @@ private class CleanupTaskWeakReference(
 * to be processed when the associated object goes out of scope of the application. Actual
 * cleanup is performed in a separate daemon thread.
 */
-private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
+private[spark] class ContextCleaner(
+    sc: SparkContext,
+    shuffleDriverComponents: ShuffleDriverComponents) extends Logging {
 
   /**
    * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
@@ -221,7 +224,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     try {
       logDebug("Cleaning shuffle " + shuffleId)
       mapOutputTrackerMaster.unregisterShuffle(shuffleId)
-      blockManagerMaster.removeShuffle(shuffleId, blocking)
+      shuffleDriverComponents.removeShuffle(shuffleId, blocking)
       listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
       logDebug("Cleaned shuffle " + shuffleId)
     } catch {
@@ -269,7 +272,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
 }
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index f0ac9ac..ba8e4d6 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -96,6 +96,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     shuffleId, this)
 
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
 }
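With these two changes, ShuffleDependency tells the plugin about every shuffle as soon as its id is generated, and ContextCleaner (above) routes per-shuffle cleanup through the same plugin. A plugin can pair the two hooks to keep an inventory of live shuffles; a hypothetical sketch:

    import java.util.{Collections, Map => JMap}
    import java.util.concurrent.ConcurrentHashMap

    import org.apache.spark.shuffle.api.ShuffleDriverComponents

    // Hypothetical: tracks registered shuffle ids so cleanupApplication()
    // can remove anything that was never cleaned up individually.
    class TrackingShuffleDriverComponents extends ShuffleDriverComponents {

      private val liveShuffles = ConcurrentHashMap.newKeySet[Integer]()

      override def initializeApplication(): JMap[String, String] =
        Collections.emptyMap()

      override def registerShuffle(shuffleId: Int): Unit =
        liveShuffles.add(shuffleId)

      override def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
        liveShuffles.remove(shuffleId)
        // delete this shuffle's data from the backing store here
      }

      override def cleanupApplication(): Unit = {
        // the concurrent key-set view is safe to mutate while iterating
        liveShuffles.forEach(id => removeShuffle(id, blocking = true))
      }
    }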
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4792c0a..2db8809 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,6 +58,8 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.shuffle.ShuffleDataIOUtils
+import org.apache.spark.shuffle.api.ShuffleDriverComponents
 import org.apache.spark.status.{AppStatusSource, AppStatusStore}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage._
@@ -217,6 +219,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _statusStore: AppStatusStore = _
   private var _heartbeater: Heartbeater = _
   private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
+  private var _shuffleDriverComponents: ShuffleDriverComponents = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -319,6 +322,8 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = ds
   }
 
+  private[spark] def shuffleDriverComponents: ShuffleDriverComponents = _shuffleDriverComponents
+
   /**
    * A unique identifier for the Spark application.
    * Its format depends on the scheduler implementation.
@@ -524,6 +529,11 @@ class SparkContext(config: SparkConf) extends Logging {
     executorEnvs ++= _conf.getExecutorEnv
     executorEnvs("SPARK_USER") = sparkUser
 
+    _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
+    _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
+      _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
+    }
+
     // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
     // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
     _heartbeatReceiver = env.rpcEnv.setupEndpoint(
@@ -576,7 +586,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     _cleaner =
       if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
-        Some(new ContextCleaner(this))
+        Some(new ContextCleaner(this, _shuffleDriverComponents))
       } else {
         None
       }
@@ -1975,6 +1985,11 @@ class SparkContext(config: SparkConf) extends Logging {
       }
       _heartbeater = null
     }
+    if (_shuffleDriverComponents != null) {
+      Utils.tryLogNonFatalError {
+        _shuffleDriverComponents.cleanupApplication()
+      }
+    }
     if (env != null && _heartbeatReceiver != null) {
       Utils.tryLogNonFatalError {
         env.rpcEnv.stop(_heartbeatReceiver)
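The handoff above is plain SparkConf plumbing: every key returned by initializeApplication() is stored under the spark.shuffle.plugin.__config__. prefix defined in ShuffleDataIOUtils (next file), and SortShuffleManager strips the prefix back off on the executor with getAllWithPrefix. A self-contained sketch of the round trip, using the same prefix literal:

    import org.apache.spark.SparkConf

    object ConfigPrefixRoundTrip {
      // same literal as ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX below
      private val prefix = "spark.shuffle.plugin.__config__."

      def main(args: Array[String]): Unit = {
        // driver side: prefix each plugin-provided setting before it is
        // shipped to the executors as part of the SparkConf
        val conf = new SparkConf(false)
        Map("endpoint" -> "shuffle-store.example.com:7788").foreach { case (k, v) =>
          conf.set(prefix + k, v)
        }

        // executor side: recover the original keys
        val extraConfigs = conf.getAllWithPrefix(prefix).toMap
        assert(extraConfigs("endpoint") == "shuffle-store.example.com:7788")
      }
    }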
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleDataIOUtils.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleDataIOUtils.scala
new file mode 100644
index 0000000..e9507a7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleDataIOUtils.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS
+import org.apache.spark.shuffle.api.ShuffleDataIO
+import org.apache.spark.util.Utils
+
+private[spark] object ShuffleDataIOUtils {
+
+  /**
+   * The prefix of spark config keys that are passed from the driver to the executor.
+   */
+  val SHUFFLE_SPARK_CONF_PREFIX = "spark.shuffle.plugin.__config__."
+
+  def loadShuffleDataIO(conf: SparkConf): ShuffleDataIO = {
+    val configuredPluginClass = conf.get(SHUFFLE_IO_PLUGIN_CLASS)
+    val maybeIO = Utils.loadExtensions(
+      classOf[ShuffleDataIO], Seq(configuredPluginClass), conf)
+    require(maybeIO.nonEmpty, s"A valid shuffle plugin must be specified by config " +
+      s"${SHUFFLE_IO_PLUGIN_CLASS.key}, but $configuredPluginClass resulted in zero valid " +
+      s"plugins.")
+    maybeIO.head
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index b21ce9c..5adfd71 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,6 +19,8 @@ package org.apache.spark.shuffle.sort
 
 import java.util.concurrent.ConcurrentHashMap
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.shuffle._
@@ -236,12 +238,13 @@ private[spark] object SortShuffleManager extends Logging {
   }
 
   private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
-    val configuredPluginClasses = conf.get(config.SHUFFLE_IO_PLUGIN_CLASS)
-    val maybeIO = Utils.loadExtensions(
-      classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf)
-    require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses")
-    val executorComponents = maybeIO.head.executor()
-    executorComponents.initializeExecutor(conf.getAppId, SparkEnv.get.executorId)
+    val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
+    val extraConfigs = conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX)
+      .toMap
+    executorComponents.initializeExecutor(
+      conf.getAppId,
+      SparkEnv.get.executorId,
+      extraConfigs.asJava)
     executorComponents
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 8433a6f..b982626 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -211,7 +211,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
   /**
    * A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup.
    */
-  private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
+  private class SaveAccumContextCleaner(sc: SparkContext) extends
+      ContextCleaner(sc, null) {
     private val accumsRegistered = new ArrayBuffer[Long]
 
     override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala
new file mode 100644
index 0000000..d8657ec
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.util.{Map => JMap}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.common.collect.ImmutableMap
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS
+import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents, ShuffleMapOutputWriter}
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO
+
+class ShuffleDriverComponentsSuite
+    extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach {
+
+  test("test serialization of shuffle initialization conf to executors") {
+    val testConf = new SparkConf()
+      .setAppName("testing")
+      .set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + "test-plugin-key", "user-set-value")
+      .set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + "test-user-key", "user-set-value")
+      .setMaster("local-cluster[2,1,1024]")
+      .set(SHUFFLE_IO_PLUGIN_CLASS, "org.apache.spark.shuffle.TestShuffleDataIO")
+
+    sc = new SparkContext(testConf)
+
+    sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3)
+      .groupByKey()
+      .foreach { _ =>
+        if (!TestShuffleExecutorComponentsInitialized.initialized.get()) {
+          throw new RuntimeException("TestShuffleExecutorComponents wasn't initialized")
+        }
+      }
+  }
+}
+
+class TestShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {
+  private val delegate = new LocalDiskShuffleDataIO(sparkConf)
+
+  override def driver(): ShuffleDriverComponents = new TestShuffleDriverComponents()
+
+  override def executor(): ShuffleExecutorComponents =
+    new TestShuffleExecutorComponentsInitialized(delegate.executor())
+}
+
+class TestShuffleDriverComponents extends ShuffleDriverComponents {
+  override def initializeApplication(): JMap[String, String] = {
+    ImmutableMap.of("test-plugin-key", "plugin-set-value")
+  }
+
+  override def cleanupApplication(): Unit = {}
+}
+
+object TestShuffleExecutorComponentsInitialized {
+  val initialized = new AtomicBoolean(false)
+}
+
+class TestShuffleExecutorComponentsInitialized(delegate: ShuffleExecutorComponents)
+    extends ShuffleExecutorComponents {
+
+  override def initializeExecutor(
+      appId: String,
+      execId: String,
+      extraConfigs: JMap[String, String]): Unit = {
+    delegate.initializeExecutor(appId, execId, extraConfigs)
+    assert(extraConfigs.get("test-plugin-key") == "plugin-set-value", extraConfigs)
+    assert(extraConfigs.get("test-user-key") == "user-set-value")
+    TestShuffleExecutorComponentsInitialized.initialized.set(true)
+  }
+
+  override def createMapOutputWriter(
+      shuffleId: Int,
+      mapTaskId: Long,
+      numPartitions: Int): ShuffleMapOutputWriter = {
+    delegate.createMapOutputWriter(shuffleId, mapTaskId, numPartitions)
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org