[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-16 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r456115333



##
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -55,6 +58,24 @@ private[spark] class IndexShuffleBlockResolver(
 
   def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, 
mapId, None)
 
+  /**
+   * Get the shuffle files that are stored locally. Used for block migrations.
+   */
+  override def getStoredShuffles(): Set[ShuffleBlockInfo] = {
+// Matches ShuffleIndexBlockId name
+val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r
+val rootDirs = blockManager.diskBlockManager.localDirs
+// ExecutorDiskUtil puts things inside one level hashed sub directories
+val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) 
++ rootDirs

Review comment:
   Following up on your other comment, I managed to drop this part, so we can avoid this code and the regex :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-16 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r456115190



##
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -55,6 +58,24 @@ private[spark] class IndexShuffleBlockResolver(
 
   def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, 
mapId, None)
 
+  /**
+   * Get the shuffle files that are stored locally. Used for block migrations.
+   */
+  override def getStoredShuffles(): Set[ShuffleBlockInfo] = {
+// Matches ShuffleIndexBlockId name
+val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r

Review comment:
   I did some poking and it seems like we can drop this regex and instead do some pattern matching on the result of getAllBlocks from the disk block manager. Thanks for pointing out that this was weird.
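   As a rough sketch of that direction (not necessarily the code that landed, and assuming `DiskBlockManager.getAllBlocks()` returns the `BlockId`s it currently tracks), the regex could be replaced with something like:

```scala
// Sketch only: enumerate the block ids the disk block manager already knows about and
// keep one ShuffleBlockInfo per index block, instead of re-parsing file names with a regex.
override def getStoredShuffles(): Set[ShuffleBlockInfo] = {
  blockManager.diskBlockManager.getAllBlocks().collect {
    case ShuffleIndexBlockId(shuffleId, mapId, _) => ShuffleBlockInfo(shuffleId, mapId)
  }.toSet
}
```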








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-16 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r456063110



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  /**
+   * This runnable consumes any shuffle blocks in the queue for migration. This is part of a
+   * producer/consumer model where the main migration loop periodically updates the queue of
+   * blocks to be migrated. On migration failure, the current thread will reinsert the block
+   * for another thread to consume. Each thread migrates blocks to a different particular
+   * executor, in order to distribute the blocks as quickly as possible without overwhelming
+   * any particular executor.
+   *
+   * There is no preference for which peer a given block is migrated to.
+   * This is notably different from the RDD cache block migration (further down in this file),
+   * which uses the existing priority mechanism for determining where to replicate blocks to.
+   * Generally speaking, cache blocks are less impactful as they normally represent narrow
+   * transformations and we normally have less cache present than shuffle data.
+   *
+   * The producer/consumer model is chosen for shuffle block migration to maximize
+   * the chance of migrating all shuffle blocks before the executor is forced to exit.
+   */
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[(ShuffleBlockInfo, Int)] = None
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running && !Thread.interrupted()) {
+  migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logDebug("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some((shuffleBlockInfo, retryCount)) =>
+  if (retryCount < maxReplicationFailuresForDecommission) {
+logInfo(s"Trying to migrate shuffle ${shuffleBlockInfo} to 
${peer}")
+val blocks =
+  bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+logDebug(s"Got migration sub-blocks ${blocks}")
+blocks.foreach { case (blockId, buffer) =>
+  logDebug(s"Migrating sub-block ${blockId}")
+  bm.blockTransferService.uploadBlockSync(
+peer.host,
+peer.port,
+peer.executorId,
+blockId,
+buffer,
+StorageLevel.DISK_ONLY,
+null)// class tag, we don't need for shuffle
+  logDebug(s"Migrated sub block ${blockId}")
+}
+logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+  } else {
+logError(s"Skipping block ${shuffleBlockInfo} because it has 

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-16 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r456062915



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -650,6 +662,23 @@ private[spark] class BlockManager(
   blockId: BlockId,
   level: StorageLevel,
   classTag: ClassTag[_]): StreamCallbackWithID = {
+
+if (isDecommissioning()) {
+   throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
+}
+
+if (blockId.isShuffle) {
+  logInfo(s"Putting shuffle block ${blockId}")

Review comment:
   Sure, I can drop these two down to debug.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-16 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r456062494



##
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##
@@ -38,7 +38,10 @@ sealed abstract class BlockId {
   // convenience methods
   def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) 
else None
   def isRDD: Boolean = isInstanceOf[RDDBlockId]
-  def isShuffle: Boolean = isInstanceOf[ShuffleBlockId] || 
isInstanceOf[ShuffleBlockBatchId]
+  def isShuffle: Boolean = {
+(isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId] ||

Review comment:
   I think it's easier to read
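   For readers of the archive: the quoted diff is cut off above. A hedged guess at the full predicate, treating the data/index block ids as the assumed additions (they are the on-disk blocks this PR migrates):

```scala
// Assumed shape of the expanded check inside BlockId; the truncated diff above only shows
// the first two cases.
def isShuffle: Boolean = {
  (isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId] ||
    isInstanceOf[ShuffleDataBlockId] || isInstanceOf[ShuffleIndexBlockId])
}
```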








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-16 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r456061843



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks

Review comment:
   Yeah, so I've been thinking about that a bit. The reason for the configurable-size pool for shuffle block migrations is that shuffle blocks tend to be large, and we want to allow the user to control the parallelism to match their network setup and general decommissioning time.
   
   I think having the management threads separate is OK since otherwise this gets a bit confusing. We could do the same thing with some complicated locking, but I think keeping the thread pools distinct is easier to read.
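   A rough sketch of the wiring being described, assuming the `maxThreads` config that appears later in this thread and Spark's `ThreadUtils` helpers; the exact pool construction in the PR may differ:

```scala
// One bounded, dedicated pool drives the per-peer shuffle migration runnables,
// kept separate from the RDD cache migration thread.
private lazy val shuffleMigrationPool = ThreadUtils.newDaemonCachedThreadPool(
  "migrate-shuffles",
  conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_MAX_THREADS))

private def startShuffleBlockMigration(peers: Seq[BlockManagerId]): Unit = {
  peers.foreach { peer =>
    // One runnable per peer; the runnables all poll the shared shufflesToMigrate queue.
    shuffleMigrationPool.submit(new ShuffleMigrationRunnable(peer))
  }
}
```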








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-16 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r456062326



##
File path: 
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##
@@ -168,7 +168,10 @@ private[spark] class NettyBlockTransferService(
 // Everything else is encoded using our binary protocol.
 val metadata = 
JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
 
-val asStream = blockData.size() > 
conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+// We always transfer shuffle blocks as a stream for simplicity with the 
receiving code since
+// they are always written to disk. Otherwise we check the block size.
+val asStream = (blockData.size() > 
conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) ||

Review comment:
   I think it's easier to read as-is.
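   For readers of the archive, the condition above is wrapped mid-expression; a hedged reading of it, with names simplified into a standalone helper:

```scala
// Upload as a stream when the block exceeds the fetch-to-memory limit, or whenever it is a
// shuffle block, since the receiving side writes shuffle blocks straight to disk anyway.
def shouldUploadAsStream(blockSize: Long, maxFetchToMemSize: Long, isShuffleBlock: Boolean): Boolean =
  blockSize > maxFetchToMemSize || isShuffleBlock
```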








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r455369270



##
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -55,6 +58,24 @@ private[spark] class IndexShuffleBlockResolver(
 
   def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, 
mapId, None)
 
+  /**
+   * Get the shuffle files that are stored locally. Used for block migrations.
+   */
+  override def getStoredShuffles(): Set[ShuffleBlockInfo] = {
+// Matches ShuffleIndexBlockId name
+val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r

Review comment:
   Sounds reasonable. I think there is a similar regex somewhere else in the codebase; I'll see if I can unify them (or that could just be excess coffee).








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r455367802



##
File path: core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.StreamCallbackWithID
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.storage.BlockId
+
+/**
+ * :: Experimental ::
+ * An experimental trait to allow Spark to migrate shuffle blocks.
+ */
+@Experimental
+trait MigratableResolver {

Review comment:
   Sure
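   For context, an approximate shape of the trait pieced together from the methods referenced elsewhere in this review (`getStoredShuffles`, `putShuffleBlockAsStream`, `getMigrationBlocks`); the exact signatures in the merged code may differ:

```scala
@Experimental
trait MigratableResolver {
  /** Shuffles currently stored locally; used to seed the migration queue. */
  def getStoredShuffles(): Set[ShuffleBlockInfo]

  /** Write a shuffle block arriving as a stream on the receiving executor. */
  def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): StreamCallbackWithID

  /** The index/data sub-blocks (and their buffers) that make up one migratable shuffle. */
  def getMigrationBlocks(shuffleBlockInfo: ShuffleBlockInfo): List[(BlockId, ManagedBuffer)]
}
```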








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r455368622



##
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -44,9 +47,9 @@ import org.apache.spark.util.Utils
 // 
org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
 private[spark] class IndexShuffleBlockResolver(
 conf: SparkConf,
-_blockManager: BlockManager = null)
+var _blockManager: BlockManager = null)

Review comment:
   Yeah, it's for the new tests in the `BlockManagerSuite`.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r455337945



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,29 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED =
+ConfigBuilder("spark.storage.decommission.shuffleBlocks.enabled")

Review comment:
   I was planning on saving that for once we've agreed it's ready for 
general usage. I know the SPIP is approved, but I still view this as more of a 
developer feature (e.g. one we would expect a cloud vendor to build on top of) 
than ready for end user feature. What do you think?
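   For anyone following along, the feature is currently switched on the way the integration suite later in this thread does it; until it is documented, those internal config constants are the reference:

```scala
// Mirrors BlockManagerDecommissionIntegrationSuite: worker decommissioning plus storage
// decommissioning, with both shuffle and RDD block migration enabled.
val conf = new SparkConf()
  .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
  .set(config.STORAGE_DECOMMISSION_ENABLED, true)
  .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
  .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true)
```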








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r455188725



##
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##
@@ -1866,13 +1903,57 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
 assert(master.getLocations(blockIdSmall) === Seq(store1.blockManagerId))
 
-store1.decommissionRddCacheBlocks()
+val decomManager = new BlockManagerDecommissioner(conf, store1)
+decomManager.decommissionRddCacheBlocks()
 // Smaller block offloaded to store2
 assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId))
 // Larger block still present in store1 as it can't be offloaded
 assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
   }
 
+  test("test migration of shuffle blocks during decommissioning") {
+val shuffleManager1 = makeSortShuffleManager()
+val bm1 = makeBlockManager(3500, "exec1", shuffleManager = shuffleManager1)
+shuffleManager1.shuffleBlockResolver._blockManager = bm1
+
+val shuffleManager2 = makeSortShuffleManager()
+val bm2 = makeBlockManager(3500, "exec2", shuffleManager = shuffleManager2)
+shuffleManager2.shuffleBlockResolver._blockManager = bm2
+
+val blockSize = 5
+val shuffleDataBlockContent = Array[Byte](0, 1, 2, 3, 4)
+val shuffleData = ShuffleDataBlockId(0, 0, 0)
+Files.write(bm1.diskBlockManager.getFile(shuffleData).toPath(), 
shuffleDataBlockContent)
+val shuffleIndexBlockContent = Array[Byte](5, 6, 7, 8, 9)
+val shuffleIndex = ShuffleIndexBlockId(0, 0, 0)
+Files.write(bm1.diskBlockManager.getFile(shuffleIndex).toPath(), 
shuffleIndexBlockContent)
+
+mapOutputTracker.registerShuffle(0, 1)
+val decomManager = new BlockManagerDecommissioner(conf, bm1)
+try {
+  mapOutputTracker.registerMapOutput(0, 0, MapStatus(bm1.blockManagerId, 
Array(blockSize), 0))
+  assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === 
bm1.blockManagerId)
+
+  val env = mock(classOf[SparkEnv])
+  when(env.conf).thenReturn(conf)
+  SparkEnv.set(env)
+
+  decomManager.refreshOffloadingShuffleBlocks()
+
+  eventually(timeout(1.second), interval(10.milliseconds)) {
+assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === 
bm2.blockManagerId)

Review comment:
   `refreshOffloadingShuffleBlocks` triggers the migration. We don't ever explicitly trigger a delete of the file on bm1, since this is for decommissioning.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r455184586



##
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.Semaphore
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
+import org.apache.spark.internal.config
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
+
+class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with 
LocalSparkContext
+with ResetSystemProperties with Eventually {
+
+  val numExecs = 3
+  val numParts = 3
+
+  test(s"verify that an already running task which is going to cache data 
succeeds " +
+s"on a decommissioned executor") {
+runDecomTest(true, false, true)
+  }
+
+  test(s"verify that shuffle blocks are migrated") {
+runDecomTest(false, true, false)
+  }
+
+  test(s"verify that both migrations can work at the same time.") {
+runDecomTest(true, true, false)
+  }
+
+  private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: 
Boolean) = {
+
+val master = s"local-cluster[${numExecs}, 1, 1024]"
+val conf = new SparkConf().setAppName("test").setMaster(master)
+  .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+  .set(config.STORAGE_DECOMMISSION_ENABLED, true)
+  .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
+  .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
+// Just replicate blocks as fast as we can during testing, there isn't 
another
+// workload we need to worry about.
+  .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+
+sc = new SparkContext(master, "test", conf)
+
+// Wait for the executors to start
+TestUtils.waitUntilExecutorsUp(sc = sc,
+  numExecutors = numExecs,
+  timeout = 6) // 60s
+
+// Create input RDD with 10 partitions
+val input = sc.parallelize(1 to numParts, numParts)
+val accum = sc.longAccumulator("mapperRunAccumulator")
+input.count()
+
+// Create a new RDD where we have sleep in each partition, we are also 
increasing
+// the value of accumulator in each partition
+val baseRdd = input.mapPartitions { x =>
+  if (migrateDuring) {
+Thread.sleep(1000)
+  }
+  accum.add(1)
+  x.map(y => (y, y))
+}
+val testRdd = shuffle match {
+  case true => baseRdd.reduceByKey(_ + _)
+  case false => baseRdd
+}
+
+// Listen for the job & block updates
+val taskStartSem = new Semaphore(0)
+val broadcastSem = new Semaphore(0)
+val executorRemovedSem = new Semaphore(0)
+val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
+sc.addSparkListener(new SparkListener {
+
+  override def onExecutorRemoved(execRemoved: 
SparkListenerExecutorRemoved): Unit = {
+executorRemovedSem.release()
+  }
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+taskStartSem.release()
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+taskEndEvents.append(taskEnd)
+  }
+
+  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): 
Unit = {
+// Once broadcast start landing on the executors we're good to proceed.
+// We don't only use task start as it can occur before the work is on 
the executor.
+if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
+  broadcastSem.release()
+}
+blocksUpdated.append(blockUpdated)
+  }
+})
+
+
+// Cache the RDD lazily
+if (persist) {
+  testRdd.persist()
+}
+
+// Start the computation of RDD - this step will also cache the RDD
+val asyncCount = testRdd.countAsync()
+
+// Wait for 

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r455184084



##
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.Semaphore
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
+import org.apache.spark.internal.config
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
+
+class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with 
LocalSparkContext
+with ResetSystemProperties with Eventually {
+
+  val numExecs = 3
+  val numParts = 3
+
+  test(s"verify that an already running task which is going to cache data 
succeeds " +
+s"on a decommissioned executor") {
+runDecomTest(true, false, true)
+  }
+
+  test(s"verify that shuffle blocks are migrated") {
+runDecomTest(false, true, false)
+  }
+
+  test(s"verify that both migrations can work at the same time.") {
+runDecomTest(true, true, false)
+  }
+
+  private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: 
Boolean) = {
+
+val master = s"local-cluster[${numExecs}, 1, 1024]"
+val conf = new SparkConf().setAppName("test").setMaster(master)
+  .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+  .set(config.STORAGE_DECOMMISSION_ENABLED, true)
+  .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
+  .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
+// Just replicate blocks as fast as we can during testing, there isn't 
another
+// workload we need to worry about.
+  .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+
+sc = new SparkContext(master, "test", conf)
+
+// Wait for the executors to start
+TestUtils.waitUntilExecutorsUp(sc = sc,
+  numExecutors = numExecs,
+  timeout = 6) // 60s
+
+// Create input RDD with 10 partitions
+val input = sc.parallelize(1 to numParts, numParts)
+val accum = sc.longAccumulator("mapperRunAccumulator")
+input.count()
+
+// Create a new RDD where we have sleep in each partition, we are also 
increasing
+// the value of accumulator in each partition
+val baseRdd = input.mapPartitions { x =>
+  if (migrateDuring) {
+Thread.sleep(1000)
+  }
+  accum.add(1)
+  x.map(y => (y, y))
+}
+val testRdd = shuffle match {
+  case true => baseRdd.reduceByKey(_ + _)
+  case false => baseRdd
+}
+
+// Listen for the job & block updates
+val taskStartSem = new Semaphore(0)
+val broadcastSem = new Semaphore(0)
+val executorRemovedSem = new Semaphore(0)
+val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
+sc.addSparkListener(new SparkListener {
+
+  override def onExecutorRemoved(execRemoved: 
SparkListenerExecutorRemoved): Unit = {
+executorRemovedSem.release()
+  }
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+taskStartSem.release()
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+taskEndEvents.append(taskEnd)
+  }
+
+  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): 
Unit = {
+// Once broadcast start landing on the executors we're good to proceed.
+// We don't only use task start as it can occur before the work is on 
the executor.
+if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
+  broadcastSem.release()
+}
+blocksUpdated.append(blockUpdated)
+  }
+})
+
+
+// Cache the RDD lazily
+if (persist) {
+  testRdd.persist()
+}
+
+// Start the computation of RDD - this step will also cache the RDD
+val asyncCount = testRdd.countAsync()
+
+// Wait for 

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r455181019



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -1285,6 +1314,9 @@ private[spark] class BlockManager(
 
 require(blockId != null, "BlockId is null")
 require(level != null && level.isValid, "StorageLevel is null or invalid")
+if (isDecommissioning()) {
+  throw new BlockSavedOnDecommissionedBlockManagerException(blockId)

Review comment:
   There is a test case for that situation; take a look at `BlockManagerDecommissionIntegrationSuite`. It does not appear to cause task failure.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-07-13 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r454058089



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -242,8 +244,7 @@ private[spark] class BlockManager(
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
-  private var blockManagerDecommissioning: Boolean = false
-  private var decommissionManager: Option[BlockManagerDecommissionManager] = 
None
+  @volatile private var decommissioner: Option[BlockManagerDecommissioner] = 
None

Review comment:
   I think I'm going to leave it volatile for now. I'd like to avoid remote block puts once we're in decommissioning, because we depend on not getting new blocks (except from tasks) to figure out when it is safe to exit.
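   A minimal, self-contained sketch (not the PR's `BlockManager`) of the behavior being described: the field is flipped once when decommissioning starts, and every remote put path must observe the latest value so that "no new remote blocks arriving" can be used to decide when it is safe to exit:

```scala
class MiniBlockManager {
  @volatile private var decommissioner: Option[AnyRef] = None

  private def isDecommissioning(): Boolean = decommissioner.isDefined

  def decommissionSelf(): Unit = {
    decommissioner = Some(new AnyRef) // stands in for the real BlockManagerDecommissioner
  }

  def putBlockDataAsStream(blockId: String): Unit = {
    if (isDecommissioning()) {
      // The real code throws BlockSavedOnDecommissionedBlockManagerException here.
      throw new IllegalStateException(s"Refusing block $blockId on a decommissioning block manager")
    }
    // ... normal put path ...
  }
}
```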








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-29 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r447249716



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -242,8 +244,7 @@ private[spark] class BlockManager(
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
-  private var blockManagerDecommissioning: Boolean = false
-  private var decommissionManager: Option[BlockManagerDecommissionManager] = 
None
+  @volatile private var decommissioner: Option[BlockManagerDecommissioner] = 
None

Review comment:
   That's true. If we drop it, we might also accept remote block puts after we've started decommissioning, though. Depends on how much we want to avoid that.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-29 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r447247802



##
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##
@@ -40,6 +40,9 @@ sealed abstract class BlockId {
   def isRDD: Boolean = isInstanceOf[RDDBlockId]
   def isShuffle: Boolean = isInstanceOf[ShuffleBlockId] || 
isInstanceOf[ShuffleBlockBatchId]
   def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
+  def isInternalShuffle: Boolean = {

Review comment:
   Looking at it, it's not widely used; I'll audit each use case and then decide.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-29 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r447247346



##
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -148,6 +170,82 @@ private[spark] class IndexShuffleBlockResolver(
 }
   }
 
+  /**
+   * Write a provided shuffle block as a stream. Used for block migrations.
+   * ShuffleBlockBatchIds must contain the full range represented in the 
ShuffleIndexBlock.
+   * Requires the caller to delete any shuffle index blocks where the shuffle 
block fails to
+   * put.
+   */
+  override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: 
SerializerManager):
+  StreamCallbackWithID = {
+val file = blockId match {
+  case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+getIndexFile(shuffleId, mapId)
+  case ShuffleDataBlockId(shuffleId, mapId, _) =>
+getDataFile(shuffleId, mapId)
+  case _ =>
+throw new Exception(s"Unexpected shuffle block transfer ${blockId} as 
" +

Review comment:
   sgtm
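   For readers of the archive, a simplified sketch of the kind of `StreamCallbackWithID` such a method returns; the merged code likely does more (for example, writing to a temporary file first), so this is only the streaming skeleton:

```scala
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

import org.apache.spark.network.client.StreamCallbackWithID

def writeBlockAsStream(blockId: String, file: File): StreamCallbackWithID = {
  val channel = FileChannel.open(file.toPath,
    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)
  new StreamCallbackWithID {
    override def getID: String = blockId
    override def onData(streamId: String, buf: ByteBuffer): Unit = {
      // Append the incoming chunk; loop because a single write may be partial.
      while (buf.hasRemaining) {
        channel.write(buf)
      }
    }
    override def onComplete(streamId: String): Unit = channel.close()
    override def onFailure(streamId: String, cause: Throwable): Unit = {
      // Leave nothing half-written; the sender can re-send the block.
      channel.close()
      file.delete()
    }
  }
}
```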








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-29 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r447247000



##
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -55,6 +58,25 @@ private[spark] class IndexShuffleBlockResolver(
 
   def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, 
mapId, None)
 
+  /**
+   * Get the shuffle files that are stored locally. Used for block migrations.
+   */
+  override def getStoredShuffles(): Set[ShuffleBlockInfo] = {
+// Matches ShuffleIndexBlockId name
+val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r
+val rootDirs = blockManager.diskBlockManager.localDirs
+// ExecutorDiskUtil puts things inside one level hashed sub directories
+val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) 
++ rootDirs
+val filenames = searchDirs.flatMap(_.list())
+logDebug(s"Got block files ${filenames.toList}")
+filenames.flatMap { fname =>
+  pattern.findAllIn(fname).matchData.map {
+matched => ShuffleBlockInfo(matched.group(1).toInt, 
matched.group(2).toLong)
+  }
+}.toSet

Review comment:
   There shouldn't be any duplicates, but if there are, we only need to transfer one anyway.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-29 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r447190973



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  /**
+   * This runnable consumes any shuffle blocks in the queue for migration. This is part of a
+   * producer/consumer model where the main migration loop periodically updates the queue of
+   * blocks to be migrated. On migration failure, the current thread will reinsert the block
+   * for another thread to consume. Each thread migrates blocks to a different particular
+   * executor, in order to distribute the blocks as quickly as possible without overwhelming
+   * any particular executor.
+   *
+   * There is no preference for which peer a given block is migrated to.
+   * This is notably different from the RDD cache block migration (further down in this file),
+   * which uses the existing priority mechanism for determining where to replicate blocks to.
+   * Generally speaking, cache blocks are less impactful as they normally represent narrow
+   * transformations and we normally have less cache present than shuffle data.
+   *
+   * The producer/consumer model is chosen for shuffle block migration to maximize
+   * the chance of migrating all shuffle blocks before the executor is forced to exit.
+   */
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[ShuffleBlockInfo] = None
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running && !Thread.interrupted()) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some(shuffleBlockInfo) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleBlockInfo} to 
${peer}")
+  val blocks =
+bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+bm.blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  StorageLevel.DISK_ONLY,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+   

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-26 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r446475360



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -57,7 +57,7 @@ import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
StandaloneSchedulerBackend}

Review comment:
   Good point; yeah, I'll take this change out.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-24 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r445108940



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -157,7 +158,8 @@ class BlockManagerMasterEndpoint(
   context.reply(true)
 
 case DecommissionBlockManagers(executorIds) =>

Review comment:
   Right, we don't do anything in that situation.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-24 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r445108498



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -489,6 +491,24 @@ class BlockManagerMasterEndpoint(
   storageLevel: StorageLevel,
   memSize: Long,
   diskSize: Long): Boolean = {
+logInfo(s"Updating block info on master ${blockId} for ${blockManagerId}")
+
+if (blockId.isInternalShuffle) {
+  blockId match {
+case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+  // Don't update the map output on just the index block
+  logDebug(s"Received shuffle index block update for ${shuffleId} 
${mapId}, ignoring.")
+  return true
+case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
+  logInfo(s"Received shuffle data block update for ${shuffleId} 
${mapId}, updating.")

Review comment:
   Maybe? I was thinking that info might be the right level for 
successfully migrated shuffle blocks, but if you think debug would be better 
I'm happy to drop it down a level.








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-24 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r445073751



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  /**
+   * This runnable consumes any shuffle blocks in the queue for migration. This is part of a
+   * producer/consumer model where the main migration loop periodically updates the queue of
+   * blocks to be migrated. On migration failure, the current thread will reinsert the block
+   * for another thread to consume. Each thread migrates blocks to a different particular
+   * executor, in order to distribute the blocks as quickly as possible without overwhelming
+   * any particular executor.
+   *
+   * There is no preference for which peer a given block is migrated to.
+   * This is notably different from the RDD cache block migration (further down in this file),
+   * which uses the existing priority mechanism for determining where to replicate blocks to.
+   * Generally speaking, cache blocks are less impactful as they normally represent narrow
+   * transformations and we normally have less cache present than shuffle data.
+   *
+   * The producer/consumer model is chosen for shuffle block migration to maximize
+   * the chance of migrating all shuffle blocks before the executor is forced to exit.
+   */
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[ShuffleBlockInfo] = None
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running && !Thread.interrupted()) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logDebug("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some(shuffleBlockInfo) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleBlockInfo} to 
${peer}")
+  val blocks =
+bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+bm.blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  StorageLevel.DISK_ONLY,
+  null)// class tag, we don't need for shuffle
+logDebug(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+ 

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-23 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r53446



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  /**
+   * This runnable consumes any shuffle blocks in the queue for migration. This is part of a
+   * producer/consumer model where the main migration loop periodically updates the queue of
+   * blocks to be migrated. On migration failure, the current thread will reinsert the block
+   * for another thread to consume. Each thread migrates blocks to a different particular
+   * executor, in order to distribute the blocks as quickly as possible without overwhelming
+   * any particular executor.
+   *
+   * There is no preference for which peer a given block is migrated to.
+   * This is notably different from the RDD cache block migration (further down in this file),
+   * which uses the existing priority mechanism for determining where to replicate blocks to.
+   * Generally speaking, cache blocks are less impactful as they normally represent narrow
+   * transformations and we normally have less cache present than shuffle data.
+   *
+   * The producer/consumer model is chosen for shuffle block migration to maximize
+   * the chance of migrating all shuffle blocks before the executor is forced to exit.
+   */
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[ShuffleBlockInfo] = None
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running && !Thread.interrupted()) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logDebug("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some(shuffleBlockInfo) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleBlockInfo} to 
${peer}")
+  val blocks =
+bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+bm.blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  StorageLevel.DISK_ONLY,
+  null)// class tag, we don't need for shuffle
+logDebug(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+ 

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-18 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r442604350



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -775,7 +802,12 @@ private[spark] class MapOutputTrackerMaster(
   override def stop(): Unit = {
 mapOutputRequests.offer(PoisonPill)
 threadpool.shutdown()
-sendTracker(StopMapOutputTracker)
+try {

Review comment:
   This is only needed because we call shutdown during the tests; it doesn't throw exceptions normally.

##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,30 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
+ConfigBuilder("spark.storage.decommission.shuffleBlocks.enabled")
+  .doc("Whether to transfer shuffle blocks during block manager 
decommissioning. Requires " +
+"a migratable shuffle resolver (like sort based shuffe)")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_MAX_THREADS =
+ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxThreads")
+  .doc("Maximum number of threads to use in migrating shuffle files.")
+  .version("3.1.0")
+  .intConf
+  .checkValue(_ > 0, "The maximum number of threads should be positive")
+  .createWithDefault(10)

Review comment:
   Sure, why not. I'll change this to default to 
`SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS` if it's not configured. Sound good?
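   A minimal sketch of what that fallback could look like with Spark's `ConfigBuilder.fallbackConf`, reusing the entry quoted above (names taken from this thread; the final patch may well differ):

       // Sketch only: default the migration thread count to the map-output
       // dispatcher thread count rather than a hard-coded value. fallbackConf
       // takes its type from the fallback entry, so the explicit intConf and
       // checkValue calls above would be dropped.
       private[spark] val STORAGE_SHUFFLE_DECOMMISSION_MAX_THREADS =
         ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxThreads")
           .doc("Maximum number of threads to use in migrating shuffle files.")
           .version("3.1.0")
           .fallbackConf(SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS)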

##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,30 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =

Review comment:
   Sure sounds good, I'll rename it.

##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -650,6 +658,23 @@ private[spark] class BlockManager(
   blockId: BlockId,
   level: StorageLevel,
   classTag: ClassTag[_]): StreamCallbackWithID = {
+
+if (decommissioner.isDefined) {
+   throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
+}
+
+if (blockId.isShuffle || blockId.isInternalShuffle) {
+  logInfo(s"Putting shuffle block ${blockId}")
+  try {
+return migratableResolver.putShuffleBlockAsStream(blockId, 
serializerManager)
+  } catch {
+case e: ClassCastException => throw new SparkException(

Review comment:
   Yeah, we're logging which kind of block we received. We only really care about the type, but it's easier to just print the blockId itself.

##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-16 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r441027429



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440443890



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440442024



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440407328



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,21 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
+ConfigBuilder("spark.storage.decommission.shuffle_blocks")
+  .doc("Whether to transfer shuffle blocks during block manager 
decommissioning. Requires " +
+"an indexed shuffle resolver (like sort based shuffe)")

Review comment:
   Oh, actually I should change this wording; it can work a little more generally now: any implementation that provides the migratable resolver trait will work. I'm not sure about adding an assert here; we do log it when we first try to use it, though.
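   For context, a rough sketch of what "works with any implementation of the trait" can look like on the block manager side (the lazy val and its placement are assumptions, not quoted from the patch); the cast is also why a non-migratable resolver shows up as a `ClassCastException` that `putBlockDataAsStream` re-wraps as a `SparkException`:

       // Sketch: the block manager only requires that the configured shuffle
       // manager's resolver implements the MigratableResolver trait.
       private[storage] lazy val migratableResolver: MigratableResolver = {
         shuffleManager.shuffleBlockResolver.asInstanceOf[MigratableResolver]
       }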





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440405708



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -775,7 +802,12 @@ private[spark] class MapOutputTrackerMaster(
   override def stop(): Unit = {
 mapOutputRequests.offer(PoisonPill)
 threadpool.shutdown()
-sendTracker(StopMapOutputTracker)
+try {
+  sendTracker(StopMapOutputTracker)
+} catch {
+  case e: Exception =>

Review comment:
   The exception I want to catch is a `SparkException`, so I'll just narrow it to that.
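   A short sketch of the narrowed handler (the log message here is assumed): only the "tracker already stopped" case seen with repeated stop() calls in tests is swallowed, while anything else still propagates.

       override def stop(): Unit = {
         mapOutputRequests.offer(PoisonPill)
         threadpool.shutdown()
         try {
           sendTracker(StopMapOutputTracker)
         } catch {
           case e: SparkException =>
             logError("Could not tell the map output tracker that we are stopping.", e)
         }
         // ... remainder of stop() unchanged
       }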





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440378850



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -121,12 +121,28 @@ private class ShuffleStatus(numPartitions: Int) {
 mapStatuses(mapIndex) = status
   }
 
+  /**
+   * Update the map output location (e.g. during migration).
+   */
+  def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = 
withWriteLock {
+val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
+mapStatusOpt match {
+  case Some(mapStatus) =>
+logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
+mapStatus.updateLocation(bmAddress)

Review comment:
   No reason to; this is pretty close to a no-op if we get a duplicated update message.
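   To make the no-op argument concrete, a sketch of the update path (the None arm and its log message are assumptions; the rest follows the hunk above). Re-applying the same location just overwrites the map status with the address it already holds.

       def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
         mapStatuses.find(_.mapId == mapId) match {
           case Some(mapStatus) =>
             logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
             // Idempotent: a duplicated message writes the same address again.
             mapStatus.updateLocation(bmAddress)
           case None =>
             logWarning(s"Asked to update map output ${mapId} for an untracked map status.")
         }
       }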





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440378160



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -334,13 +336,14 @@ class BlockManagerMasterEndpoint(
 val info = blockManagerInfo(blockManagerId)
 
 val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
-rddBlocks.map { blockId =>
+val result = rddBlocks.map { blockId =>
   val currentBlockLocations = blockLocations.get(blockId)
   val maxReplicas = currentBlockLocations.size + 1
   val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != 
blockManagerId)
   val replicateMsg = ReplicateBlock(blockId, remainingLocations, 
maxReplicas)
   replicateMsg
 }.toSeq
+result

Review comment:
   No, I can revert this. It's left over from when I was debugging some stuff in here.

##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -489,6 +492,24 @@ class BlockManagerMasterEndpoint(
   storageLevel: StorageLevel,
   memSize: Long,
   diskSize: Long): Boolean = {
+logInfo(s"Updating block info on master ${blockId} for ${blockManagerId}")
+
+if (blockId.isInternalShuffle) {
+  blockId match {
+case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+  // Don't update the map output on just the index block
+  logDebug("Received shuffle index block update for ${shuffleId} 
${mapId}, ignoring.")

Review comment:
   thanks :)
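   For readers following along: the hunk above stops at the index-block arm. A rough sketch of how the matching data-block arm could drive the map output update, assuming this endpoint holds a reference to the MapOutputTrackerMaster (arm shapes and log messages may differ from the actual patch):

       blockId match {
         case ShuffleIndexBlockId(shuffleId, mapId, _) =>
           // The index block alone says nothing about where the data now lives.
           logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, ignoring.")
         case ShuffleDataBlockId(shuffleId, mapId, _) =>
           // The data block arriving on a new block manager is what moves the map output.
           logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, updating.")
           mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
         case _ =>
           logDebug(s"Unexpected shuffle block type ${blockId}")
       }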





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440377849



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
##
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-

Review comment:
   Sure





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440377043



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440377415



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440376585



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440375923



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -650,6 +658,23 @@ private[spark] class BlockManager(
   blockId: BlockId,
   level: StorageLevel,
   classTag: ClassTag[_]): StreamCallbackWithID = {
+
+if (decommissioner.isDefined) {
+   throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
+}
+
+if (blockId.isShuffle || blockId.isInternalShuffle) {
+  logInfo(s"Putting shuffle block ${blockId}")
+  try {
+return migratableResolver.putShuffleBlockAsStream(blockId, 
serializerManager)
+  } catch {
+case e: ClassCastException => throw new Exception(

Review comment:
   Good point, I don't. I'll add a unit test for this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440375111



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  /**
+   * This runnable consumes any shuffle blocks in the queue for migration. It is the consumer
+   * side of a producer/consumer pair in which the main migration loop periodically updates the
+   * queue of blocks to be migrated. On migration failure, the current thread will reinsert the
+   * block for another thread to consume. Each thread migrates blocks to its own particular
+   * executor, so the blocks are spread across peers as quickly as possible without overwhelming
+   * any single executor.
+   *
+   * There is no preference for which peer a given block is migrated to.
+   * This is notably different from the RDD cache block migration (further down in this file),
+   * which uses the existing priority mechanism for determining where to replicate blocks to.
+   * Generally speaking, cache blocks are less impactful as they normally represent narrow
+   * transformations and we normally have less cache present than shuffle data.
+   *
+   * The producer/consumer model is chosen for shuffle block migration to 
maximize
+   * the chance of migrating all shuffle blocks before the executor is forced 
to exit.
+   */
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[ShuffleBlockInfo] = None
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running && !Thread.interrupted()) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")

Review comment:
   I'll switch the log level to debug.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440374116



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1725,6 +1725,17 @@ class SparkContext(config: SparkConf) extends Logging {
 }
   }
 
+
+  private[spark] def decommissionExecutors(executorIds: Seq[String]): Unit = {

Review comment:
   Good catch, I'll drop this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440373566



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440372693



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440372013



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  /**
+   * This runnable consumes any shuffle blocks in the queue for migration. This is part of a
+   * producer/consumer model, where the main migration loop periodically updates the queue of
+   * blocks to be migrated. On migration failure, the current thread will reinsert the block for
+   * another thread to consume. Each thread migrates blocks to a different executor so as to
+   * distribute the blocks as quickly as possible without overwhelming any particular executor.
+   *
+   * There is no preference for which peer a given block is migrated to.
+   * This is notably different from the RDD cache block migration (further down in this file),
+   * which uses the existing priority mechanism for determining where to replicate blocks to.
+   * Generally speaking, cache blocks are less impactful as they normally represent narrow
+   * transformations and we normally have less cache present than shuffle data.
+   *
+   * The producer/consumer model is chosen for shuffle block migration to maximize
+   * the chance of migrating all shuffle blocks before the executor is forced to exit.
+   */
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[ShuffleBlockInfo] = None
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running && !Thread.interrupted()) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some(shuffleBlockInfo) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleBlockInfo} to 
${peer}")
+  val blocks =
+bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+bm.blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  StorageLevel.DISK_ONLY,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+   

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440372154



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  /**
+   * This runnable consumes any shuffle blocks in the queue for migration. This is part of a
+   * producer/consumer model, where the main migration loop periodically updates the queue of
+   * blocks to be migrated. On migration failure, the current thread will reinsert the block for
+   * another thread to consume. Each thread migrates blocks to a different executor so as to
+   * distribute the blocks as quickly as possible without overwhelming any particular executor.
+   *
+   * There is no preference for which peer a given block is migrated to.
+   * This is notably different from the RDD cache block migration (further down in this file),
+   * which uses the existing priority mechanism for determining where to replicate blocks to.
+   * Generally speaking, cache blocks are less impactful as they normally represent narrow
+   * transformations and we normally have less cache present than shuffle data.
+   *
+   * The producer/consumer model is chosen for shuffle block migration to maximize
+   * the chance of migrating all shuffle blocks before the executor is forced to exit.
+   */
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[ShuffleBlockInfo] = None
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running && !Thread.interrupted()) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some(shuffleBlockInfo) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleBlockInfo} to 
${peer}")
+  val blocks =
+bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+bm.blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  StorageLevel.DISK_ONLY,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+   

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440370903



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,21 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
+ConfigBuilder("spark.storage.decommission.shuffle_blocks")

Review comment:
   Sounds good, I'll swap out the underscores in the config strings for camelCase.

##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,21 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
+ConfigBuilder("spark.storage.decommission.shuffle_blocks")
+  .doc("Whether to transfer shuffle blocks during block manager 
decommissioning. Requires " +
+"an indexed shuffle resolver (like sort based shuffe)")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val STORAGE_RDD_DECOMMISSION_ENABLED =
+ConfigBuilder("spark.storage.decommission.rdd_blocks")

Review comment:
   Sure
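
   For reference, a sketch of how these two entries might read after that rename. The camelCase keys and the RDD doc string below are one possible rendering I'm assuming here, not necessarily the names that were finally merged:

    private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
      ConfigBuilder("spark.storage.decommission.shuffleBlocks")  // key name assumed
        .doc("Whether to transfer shuffle blocks during block manager decommissioning. Requires " +
          "an indexed shuffle resolver (like sort based shuffle)")
        .version("3.1.0")
        .booleanConf
        .createWithDefault(false)

    private[spark] val STORAGE_RDD_DECOMMISSION_ENABLED =
      ConfigBuilder("spark.storage.decommission.rddBlocks")  // key name assumed
        .doc("Whether to transfer RDD blocks during block manager decommissioning.")
        .version("3.1.0")
        .booleanConf
        .createWithDefault(false)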





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-15 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r440370509



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -775,7 +802,12 @@ private[spark] class MapOutputTrackerMaster(
   override def stop(): Unit = {
 mapOutputRequests.offer(PoisonPill)
 threadpool.shutdown()
-sendTracker(StopMapOutputTracker)
+try {
+  sendTracker(StopMapOutputTracker)
+} catch {
+  case e: Exception =>

Review comment:
   Good catch, I'll narrow this down.
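
   A minimal sketch of what that narrowing might look like, assuming the only expected failure here is a SparkException thrown by sendTracker when the endpoint has already gone away during shutdown (the rest of stop() is unchanged):

    override def stop(): Unit = {
      mapOutputRequests.offer(PoisonPill)
      threadpool.shutdown()
      try {
        sendTracker(StopMapOutputTracker)
      } catch {
        // Only tolerate the expected shutdown-time failure; anything else still propagates.
        case e: SparkException =>
          logError("Could not tell tracker we are stopping.", e)
      }
      // ... rest of stop() unchanged
    }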





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-12 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r439700057



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.MigratableResolver
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[(Int, Long)] = None
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running && !Thread.interrupted()) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some((shuffleId, mapId)) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to 
${peer}")
+  val blocks =
+bm.migratableResolver.getMigrationBlocks(shuffleId, mapId)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+bm.blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  StorageLevel.DISK_ONLY,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+  migrating match {
+case Some(shuffleMap) =>
+  logError(s"Error ${e} during migration, adding ${shuffleMap} 
back to migration queue")
+  shufflesToMigrate.add(shuffleMap)
+case None =>
+  logError(s"Error ${e} while waiting for block to migrate")
+  }
+  }
+}
+  }
+
+  // Shuffles which are either in queue for migrations or migrated
+  private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+
+  // Shuffles which are queued for migration
+  private[storage] val shufflesToMigrate =
+new java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()
+
+  @volatile private var stopped = false
+
+  private val migrationPeers =
+mutable.HashMap[BlockManagerId, (ShuffleMigrationRunnable, 
ExecutorService)]()
+
+  private lazy val blockMigrationExecutor =
+ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission")
+
+  private val blockMigrationRunnable = new Runnable {
+val sleepInterval = 
conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
+
+override def run(): Unit = {
+  var failures = 0
+   

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-12 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r439386843



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.MigratableResolver
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[(Int, Long)] = None
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running && !Thread.interrupted()) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some((shuffleId, mapId)) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to 
${peer}")
+  val blocks =
+bm.migratableResolver.getMigrationBlocks(shuffleId, mapId)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+bm.blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  StorageLevel.DISK_ONLY,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+  migrating match {
+case Some(shuffleMap) =>
+  logError(s"Error ${e} during migration, adding ${shuffleMap} 
back to migration queue")
+  shufflesToMigrate.add(shuffleMap)
+case None =>
+  logError(s"Error ${e} while waiting for block to migrate")
+  }
+  }
+}
+  }
+
+  // Shuffles which are either in queue for migrations or migrated
+  private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+
+  // Shuffles which are queued for migration
+  private[storage] val shufflesToMigrate =
+new java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()
+
+  @volatile private var stopped = false
+
+  private val migrationPeers =
+mutable.HashMap[BlockManagerId, (ShuffleMigrationRunnable, 
ExecutorService)]()
+
+  private lazy val blockMigrationExecutor =
+ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission")
+
+  private val blockMigrationRunnable = new Runnable {
+val sleepInterval = 
conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
+
+override def run(): Unit = {
+  var failures = 0
+   

[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-12 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r439384531



##
File path: core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.StreamCallbackWithID
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.storage.BlockId
+
+/**
+ * :: Experimental ::
+ * An experimental trait to allow Spark to migrate shuffle blocks.
+ */
+@Experimental
+trait MigratableResolver {
+  /**
+   * Get the shuffle ids that are stored locally. Used for block migrations.

Review comment:
   Sure, sounds reasonable.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-10 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r438436447



##
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.concurrent.duration._
+
+import org.mockito.{ArgumentMatchers => mc}
+import org.mockito.Mockito.{mock, times, verify, when}
+import org.scalatest._
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.internal.config
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.shuffle.MigratableResolver
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+
+class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
+
+  private val bmPort = 12345
+
+  private val sparkConf = new SparkConf(false)
+.set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, true)
+.set(config.STORAGE_RDD_DECOMMISSION_ENABLED, true)
+
+  private def registerShuffleBlocks(
+  mockMigratableShuffleResolver: MigratableResolver,
+  ids: Set[(Int, Long, Int)]): Unit = {
+
+when(mockMigratableShuffleResolver.getStoredShuffles())
+  .thenReturn(ids.map(triple => (triple._1, triple._2)).toSet)
+
+ids.foreach { case (shuffleId: Int, mapId: Long, reduceId: Int) =>
+  when(mockMigratableShuffleResolver.getMigrationBlocks(mc.any(), 
mc.any()))
+.thenReturn(List(
+  (ShuffleIndexBlockId(shuffleId, mapId, reduceId), 
mock(classOf[ManagedBuffer])),
+  (ShuffleDataBlockId(shuffleId, mapId, reduceId), 
mock(classOf[ManagedBuffer]
+}
+  }
+
+  test("test shuffle and cached rdd migration without any error") {
+val blockTransferService = mock(classOf[BlockTransferService])
+val bm = mock(classOf[BlockManager])
+
+val storedBlockId1 = RDDBlockId(0, 0)
+val storedBlock1 =
+  new ReplicateBlock(storedBlockId1, Seq(BlockManagerId("replicaHolder", 
"host1", bmPort)), 1)
+
+val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+when(bm.getPeers(mc.any()))
+  .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+when(bm.blockTransferService).thenReturn(blockTransferService)
+when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+when(bm.getMigratableRDDBlocks())
+  .thenReturn(Seq(storedBlock1))
+
+val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
+
+bmDecomManager.start()

Review comment:
   Note to self: I need to come back and add the matching stop here so we don't leak the thread (found it when working on my follow-on work).
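
   As a sketch, the matching stop could wrap the existing assertions so the migration thread is always torn down, assuming the decommissioner exposes a stop() that shuts its threads down (the assertion and verify lines follow the version of this test quoted elsewhere in this thread):

    val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
    try {
      bmDecomManager.start()
      eventually(timeout(5.second), interval(10.milliseconds)) {
        assert(bmDecomManager.shufflesToMigrate.isEmpty)
        verify(bm, times(1)).replicateBlock(
          mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3)))
      }
    } finally {
      bmDecomManager.stop() // avoid leaking the migration thread between tests
    }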





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-10 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r438269803



##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissionManager.scala
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.MigratableResolver
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissionManager(

Review comment:
   Yeah, that's a good name.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-10 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r438269652



##
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.concurrent.duration._
+
+import org.mockito.{ArgumentMatchers => mc}
+import org.mockito.Mockito.{mock, times, verify, when}
+import org.scalatest._
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.internal.config
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.shuffle.MigratableResolver
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+
+class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
+
+  private val bmPort = 12345
+
+  private val sparkConf = new SparkConf(false)
+.set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, true)
+.set(config.STORAGE_RDD_DECOMMISSION_ENABLED, true)
+
+  private def registerShuffleBlocks(
+  mockMigratableShuffleResolver: MigratableResolver,
+  ids: Set[(Int, Long, Int)]): Unit = {
+
+when(mockMigratableShuffleResolver.getStoredShuffles())
+  .thenReturn(ids.map(triple => (triple._1, triple._2)).toSet)
+
+ids.foreach { case (shuffleId: Int, mapId: Long, reduceId: Int) =>
+  when(mockMigratableShuffleResolver.getMigrationBlocks(mc.any(), 
mc.any()))
+.thenReturn(List(
+  (ShuffleIndexBlockId(shuffleId, mapId, reduceId), 
mock(classOf[ManagedBuffer])),
+  (ShuffleDataBlockId(shuffleId, mapId, reduceId), 
mock(classOf[ManagedBuffer]
+}
+  }
+
+  test("test shuffle and cached rdd migration without any error") {
+val blockTransferService = mock(classOf[BlockTransferService])
+val bm = mock(classOf[BlockManager])
+
+val storedBlockId1 = RDDBlockId(0, 0)
+val storedBlock1 =
+  new ReplicateBlock(storedBlockId1, Seq(BlockManagerId("replicaHolder", 
"host1", bmPort)), 1)
+
+val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+when(bm.getPeers(mc.any()))
+  .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+when(bm.blockTransferService).thenReturn(blockTransferService)
+when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+when(bm.getMigratableRDDBlocks())
+  .thenReturn(Seq(storedBlock1))
+
+val bmDecomManager = new BlockManagerDecommissionManager(
+  sparkConf,
+  bm)
+
+bmDecomManager.start()
+
+eventually(timeout(5.second), interval(10.milliseconds)) {
+  assert(bmDecomManager.shufflesToMigrate.isEmpty == true)
+  verify(bm, times(1)).replicateBlock(
+mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3)))
+  verify(blockTransferService, times(2))
+.uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"), 
mc.any(), mc.any(),
+  mc.eq(StorageLevel.DISK_ONLY), mc.isNull())

Review comment:
   So no, because the initial block set can also be empty if we haven't yet done a run-through of the decommissioning code. I've got a follow-on PR that tracks when migration has completed, which we can use to get rid of the eventually. But for now I get spurious test failures if I don't retry the whole block.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-05 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r436071464



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -1790,6 +1813,107 @@ private[spark] class BlockManager(
 }
   }
 
+
+  // Shuffles which are either in queue for migrations or migrated
+  private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+  // Shuffles which are queued for migration
+  private val shufflesToMigrate = new 
java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()
+
+
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[(Int, Long)] = None
+  val storageLevel = StorageLevel(
+useDisk = true,
+useMemory = false,
+useOffHeap = false,
+deserialized = false,
+replication = 1)
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some((shuffleId, mapId)) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to 
${peer}")
+  val blocks =
+migratableResolver.getMigrationBlocks(shuffleId, mapId)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  storageLevel,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+  migrating match {
+case Some(shuffleMap) =>
+  logError("Error ${e} during migration, adding ${shuffleMap} back 
to migration queue")
+  shufflesToMigrate.add(shuffleMap)
+case None =>
+  logError("Error ${e} while waiting for block to migrate")
+  }
+  }
+}
+  }
+
+  private val migrationPeers = mutable.HashMap[BlockManagerId, 
ShuffleMigrationRunnable]()
+
+  /**
+   * Tries to offload all shuffle blocks that are registered with the shuffle service locally.
+   * Note: this does not delete the shuffle files in case there is an in-progress fetch,
+   * but rather shadows them.
+   * Requires an index-based shuffle resolver.
+   */
+  def offloadShuffleBlocks(): Unit = {
+// Update the queue of shuffles to be migrated
+logInfo("Offloading shuffle blocks")
+val localShuffles = migratableResolver.getStoredShuffles()
+logInfo(s"My local shuffles are ${localShuffles.toList}")
+val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq
+logInfo(s"My new shuffles to migrate ${newShufflesToMigrate.toList}")
+shufflesToMigrate.addAll(newShufflesToMigrate.asJava)
+migratingShuffles ++= newShufflesToMigrate
+
+// Update the threads doing migrations
+// TODO: Sort & only start as many threads as min(||blocks||, ||targets||) 
using location pref
+val livePeerSet = getPeers(false).toSet
+val currentPeerSet = migrationPeers.keys.toSet
+val deadPeers = currentPeerSet.&~(livePeerSet)
+val newPeers = livePeerSet.&~(currentPeerSet)
+migrationPeers ++= newPeers.map{peer =>
+  logDebug(s"Starting thread to migrate shuffle blocks to ${peer}")
+  val executor = 
ThreadUtils.newDaemonSingleThreadExecutor(s"migrate-shuffle-to-${peer}")
+  val runnable = new ShuffleMigrationRunnable(peer)
+  executor.submit(runnable)

Review comment:
   I don't think we need that change specifically; right now setting `running` to false seems to do the job.
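
   For context, a rough sketch of the stop path this implies. The helper name is hypothetical, and it assumes the later shape of migrationPeers where the per-peer ExecutorService is kept alongside the runnable so it can also be shut down:

    // Stop all per-peer migration threads: flip the flag the run() loop checks,
    // then shut the executor down so the thread can exit promptly.
    def stopShuffleBlockMigration(): Unit = {
      migrationPeers.values.foreach { case (runnable, executor) =>
        runnable.running = false
        executor.shutdownNow()
      }
    }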





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-04 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r435575853



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -1790,6 +1822,108 @@ private[spark] class BlockManager(
 }
   }
 
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[(Int, Long)] = None
+  val storageLevel = StorageLevel(
+useDisk = true,
+useMemory = false,
+useOffHeap = false,
+deserialized = false,
+replication = 1)
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some((shuffleId, mapId)) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to 
${peer}")
+  val blocks =
+migratableResolver.getMigrationBlocks(shuffleId, mapId)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  storageLevel,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+  migrating match {
+case Some(shuffleMap) =>
+  logError("Error ${e} during migration, adding ${shuffleMap} back 
to migration queue")
+  shufflesToMigrate.add(shuffleMap)
+case None =>
+  logError(s"Error ${e} while waiting for block to migrate")
+  }
+  }
+}
+  }
+
+  private val migrationPeers = mutable.HashMap[BlockManagerId, 
ShuffleMigrationRunnable]()
+
+  /**
+   * Tries to offload all shuffle blocks that are registered with the shuffle service locally.
+   * Note: this does not delete the shuffle files in case there is an in-progress fetch,
+   * but rather shadows them.
+   * Requires an index-based shuffle resolver.
+   */
+  def offloadShuffleBlocks(): Unit = {
+// Update the queue of shuffles to be migrated
+logInfo("Offloading shuffle blocks")
+val localShuffles = migratableResolver.getStoredShuffles()
+logInfo(s"My local shuffles are ${localShuffles.toList}")
+val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq

Review comment:
   This is just computing the set difference (the newly stored shuffles that still need to be queued for migration), so readability isn't a big concern.
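
   For readers unfamiliar with the operator, `&~` on Scala sets is set difference, so an equivalent, slightly more verbose spelling of that line would be:

    // Equivalent to localShuffles.&~(migratingShuffles): the shuffles stored locally
    // that are not already queued or migrated.
    val newShufflesToMigrate = localShuffles.diff(migratingShuffles).toSeq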





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-04 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r435575539



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -1790,6 +1822,108 @@ private[spark] class BlockManager(
 }
   }
 
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[(Int, Long)] = None
+  val storageLevel = StorageLevel(
+useDisk = true,
+useMemory = false,
+useOffHeap = false,
+deserialized = false,
+replication = 1)
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some((shuffleId, mapId)) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to 
${peer}")
+  val blocks =
+migratableResolver.getMigrationBlocks(shuffleId, mapId)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  storageLevel,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+  migrating match {
+case Some(shuffleMap) =>
+  logError("Error ${e} during migration, adding ${shuffleMap} back 
to migration queue")
+  shufflesToMigrate.add(shuffleMap)
+case None =>
+  logError(s"Error ${e} while waiting for block to migrate")
+  }
+  }
+}
+  }
+
+  private val migrationPeers = mutable.HashMap[BlockManagerId, 
ShuffleMigrationRunnable]()
+
+  /**
+   * Tries to offload all shuffle blocks that are registered with the shuffle service locally.
+   * Note: this does not delete the shuffle files in case there is an in-progress fetch,
+   * but rather shadows them.
+   * Requires an index-based shuffle resolver.
+   */
+  def offloadShuffleBlocks(): Unit = {
+// Update the queue of shuffles to be migrated
+logInfo("Offloading shuffle blocks")
+val localShuffles = migratableResolver.getStoredShuffles()
+logInfo(s"My local shuffles are ${localShuffles.toList}")

Review comment:
   Looking at it now, I think I'll just take it out; it was useful while I was doing dev but shouldn't be needed for any operational work. Good call on it maybe being too long in production environments.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-04 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r435575094



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -1790,6 +1822,108 @@ private[spark] class BlockManager(
 }
   }
 
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[(Int, Long)] = None
+  val storageLevel = StorageLevel(
+useDisk = true,
+useMemory = false,
+useOffHeap = false,
+deserialized = false,
+replication = 1)
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some((shuffleId, mapId)) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to 
${peer}")
+  val blocks =
+migratableResolver.getMigrationBlocks(shuffleId, mapId)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  storageLevel,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")
+  }
+}
+// This catch is intentionally outside of the while running block.
+// if we encounter errors migrating to an executor we want to stop.
+  } catch {
+case e: Exception =>
+  migrating match {
+case Some(shuffleMap) =>
+  logError("Error ${e} during migration, adding ${shuffleMap} back 
to migration queue")
+  shufflesToMigrate.add(shuffleMap)
+case None =>
+  logError(s"Error ${e} while waiting for block to migrate")
+  }
+  }
+}
+  }
+
+  private val migrationPeers = mutable.HashMap[BlockManagerId, 
ShuffleMigrationRunnable]()
+
+  /**
+   * Tries to offload all shuffle blocks that are registered with the shuffle service locally.
+   * Note: this does not delete the shuffle files in case there is an in-progress fetch,
+   * but rather shadows them.
+   * Requires an index-based shuffle resolver.
+   */
+  def offloadShuffleBlocks(): Unit = {
+// Update the queue of shuffles to be migrated
+logInfo("Offloading shuffle blocks")
+val localShuffles = migratableResolver.getStoredShuffles()

Review comment:
   No, if we get a class cast exception we want to bubble it up because 
there isn't anything we can do in that situation besides report it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-04 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r43809



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -1790,6 +1822,108 @@ private[spark] class BlockManager(
 }
   }
 
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends 
Runnable {
+@volatile var running = true
+override def run(): Unit = {
+  var migrating: Option[(Int, Long)] = None
+  val storageLevel = StorageLevel(
+useDisk = true,
+useMemory = false,
+useOffHeap = false,
+deserialized = false,
+replication = 1)
+  logInfo(s"Starting migration thread for ${peer}")
+  // Once a block fails to transfer to an executor stop trying to transfer 
more blocks
+  try {
+while (running) {
+  val migrating = Option(shufflesToMigrate.poll())
+  migrating match {
+case None =>
+  logInfo("Nothing to migrate")
+  // Nothing to do right now, but maybe a transfer will fail or a 
new block
+  // will finish being committed.
+  val SLEEP_TIME_SECS = 1
+  Thread.sleep(SLEEP_TIME_SECS * 1000L)
+case Some((shuffleId, mapId)) =>
+  logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to 
${peer}")
+  val blocks =
+migratableResolver.getMigrationBlocks(shuffleId, mapId)
+  logInfo(s"Got migration sub-blocks ${blocks}")
+  blocks.foreach { case (blockId, buffer) =>
+logInfo(s"Migrating sub-block ${blockId}")
+blockTransferService.uploadBlockSync(
+  peer.host,
+  peer.port,
+  peer.executorId,
+  blockId,
+  buffer,
+  storageLevel,
+  null)// class tag, we don't need for shuffle
+logInfo(s"Migrated sub block ${blockId}")
+  }
+  logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}")

Review comment:
   We don't delete the file from the current host right away. Once the BlockUpdate message is processed on the master, fetches will go to the peer the block has been migrated to.
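
   In other words, reads keep hitting the old copy until the driver repoints the map output. A hypothetical sketch of that driver-side step, using the updateMapOutput method quoted elsewhere in this thread (the helper name here is invented):

    import org.apache.spark.MapOutputTrackerMaster
    import org.apache.spark.storage.BlockManagerId

    // Once the peer holds the migrated shuffle block, repoint the map output so new
    // reduce tasks fetch from the peer rather than the decommissioning executor.
    def repointShuffleOutput(
        tracker: MapOutputTrackerMaster,
        shuffleId: Int,
        mapId: Long,
        newLocation: BlockManagerId): Unit = {
      tracker.updateMapOutput(shuffleId, mapId, newLocation)
    }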





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-04 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r435447040



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1725,6 +1725,16 @@ class SparkContext(config: SparkConf) extends Logging {
 }
   }
 
+
+  private[spark] def decommissionExecutors(executorIds: Seq[String]): Unit = {
+schedulerBackend match {
+  case b: CoarseGrainedSchedulerBackend =>
+executorIds.foreach(b.decommissionExecutor)
+  case _ =>
+logWarning("Decommissioning executors is not supported by current 
scheduler.")

Review comment:
   Sure :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-04 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r435446889



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -479,6 +497,16 @@ private[spark] class MapOutputTrackerMaster(
 }
   }
 
+  def updateMapOutput(shuffleId: Int, mapId: Long, bmAddress: BlockManagerId): 
Unit = {
+shuffleStatuses.get(shuffleId) match {
+  case Some(shuffleStatus) =>
+shuffleStatus.updateMapOutput(mapId, bmAddress)
+shuffleStatus.invalidateSerializedMapOutputStatusCache()

Review comment:
   Good catch, I'll drop it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-04 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r435425845



##
File path: 
core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala
##
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+class RDDBlockSavedOnDecommissionedBlockManagerException(blockId: RDDBlockId)

Review comment:
   Sure
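   As a toy illustration of how an exception like this might be declared and raised, here is one plausible shape; the message text and the guard below are assumptions, not lines from the patch:

    class RDDBlockSavedOnDecommissionedBlockManagerException(blockId: RDDBlockId)
      extends Exception(s"Block $blockId cannot be saved on a decommissioning BlockManager")

    // Hypothetical guard in the block manager's put path ('isDecommissioning'
    // is an invented flag name):
    if (isDecommissioning && blockId.isRDD) {
      throw new RDDBlockSavedOnDecommissionedBlockManagerException(blockId.asRDDId.get)
    }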








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-04 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r435425022



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -650,6 +657,19 @@ private[spark] class BlockManager(
   blockId: BlockId,
   level: StorageLevel,
   classTag: ClassTag[_]): StreamCallbackWithID = {
+// Delegate shuffle blocks here to resolver if supported
+if (blockId.isShuffle || blockId.isInternalShuffle) {

Review comment:
   So given that the goal is supporting more than just Index shuffles, I don't agree. If it were an entire chunk of code, maybe, but it's just a boolean check that allows other implementations outside of core Spark.
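   To illustrate, the branch amounts to a dispatch roughly like the sketch below; the resolver trait and method names are approximations used only to show why the check is not tied to the index-based shuffle:

    // Sketch only: any shuffle block, index-based or not, is handed to the
    // shuffle resolver as long as that resolver can accept migrated blocks;
    // everything else continues down the normal block-store path.
    if (blockId.isShuffle || blockId.isInternalShuffle) {
      shuffleManager.shuffleBlockResolver match {
        case resolver: MigratableResolver =>
          return resolver.putShuffleBlockAsStream(blockId, serializerManager)
        case other =>
          throw new SparkException(
            s"Resolver $other cannot accept migrated shuffle block $blockId")
      }
    }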








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-03 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r434896155



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -650,6 +657,19 @@ private[spark] class BlockManager(
   blockId: BlockId,
   level: StorageLevel,
   classTag: ClassTag[_]): StreamCallbackWithID = {
+// Delegate shuffle blocks here to resolver if supported

Review comment:
   Cool :)








[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-03 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r434895820



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -650,6 +657,19 @@ private[spark] class BlockManager(
   blockId: BlockId,
   level: StorageLevel,
   classTag: ClassTag[_]): StreamCallbackWithID = {
+// Delegate shuffle blocks here to resolver if supported
+if (blockId.isShuffle || blockId.isInternalShuffle) {

Review comment:
   It is for the current implementation of the trait, but if someone else 
implemented this for something beyond IndexShuffle then it might not be.




