Github user brad-kaiser commented on a diff in the pull request:
https://github.com/apache/spark/pull/19041#discussion_r177129232
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/CacheRecoveryIntegrationSuite.scala
---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.util.Try
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage._
+
+class CacheRecoveryIntegrationSuite
+ extends SparkFunSuite with Matchers with BeforeAndAfterEach with
Eventually {
+
+ var shuffleService: TransportServer = _
+ var conf: SparkConf = _
+ var sc: SparkContext = _
+
+ private def makeBaseConf() = new SparkConf()
+ .setAppName("test")
+ .setMaster("local-cluster[4, 1, 512]")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.executorIdleTimeout", "1s") // always
+ .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1s")
+ .set(EXECUTOR_MEMORY.key, "512m")
+ .set(SHUFFLE_SERVICE_ENABLED.key, "true")
+ .set(DYN_ALLOCATION_CACHE_RECOVERY.key, "true")
+ .set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "500s")
+ .set(EXECUTOR_INSTANCES.key, "1")
+ .set(DYN_ALLOCATION_INITIAL_EXECUTORS.key, "4")
+ .set(DYN_ALLOCATION_MIN_EXECUTORS.key, "3")
+
+ override def beforeEach(): Unit = {
+ conf = makeBaseConf()
+ val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle",
numUsableCores = 4)
+ val rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
+ val transportContext = new TransportContext(transportConf, rpcHandler)
+ shuffleService = transportContext.createServer()
--- End diff --
Updated test so we only start one shuffle service.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]