attilapiros commented on a change in pull request #28331:
URL: https://github.com/apache/spark/pull/28331#discussion_r419477550
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1786,6 +1808,105 @@ private[spark] class BlockManager(
}
}
+
+ // Shuffles which are either in queue for migrations or migrated
+ private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+ // Shuffles which are queued for migration
+ private val shufflesToMigrate = new
java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()
+
+
+ private class ShuffleMigrationRunnable(peer: BlockManagerId) extends
Runnable {
+ @volatile var running = true
+ override def run(): Unit = {
+ var migrating: Option[(Int, Long)] = None
+ val storageLevel = StorageLevel(
+ useDisk = true,
+ useMemory = false,
+ useOffHeap = false,
+ deserialized = false,
+ replication = 1)
+ // Once a block fails to transfer to an executor stop trying to transfer
more blocks
+ try {
+ while (running) {
+ val migrating = Option(shufflesToMigrate.poll())
+ migrating match {
+ case None =>
+ // Nothing to do right now, but maybe a transfer will fail or a
new block
+ // will finish being committed.
+ val SLEEP_TIME_SECS = 5
+ Thread.sleep(SLEEP_TIME_SECS * 1000L)
+ case Some((shuffleId, mapId)) =>
+ logInfo(s"Trying to migrate ${shuffleId},${mapId} to ${peer}")
+ val ((indexBlockId, indexBuffer), (dataBlockId, dataBuffer)) =
+ indexShuffleResolver.getMigrationBlocks(shuffleId, mapId)
+ blockTransferService.uploadBlockSync(
+ peer.host,
+ peer.port,
+ peer.executorId,
+ indexBlockId,
+ indexBuffer,
+ storageLevel,
+ null)// class tag, we don't need for shuffle
+ blockTransferService.uploadBlockSync(
+ peer.host,
+ peer.port,
+ peer.executorId,
+ dataBlockId,
+ dataBuffer,
+ storageLevel,
+ null)// class tag, we don't need for shuffle
+ }
+ }
+ } catch {
+ case e: Exception =>
+ migrating match {
+ case Some(shuffleMap) =>
+ logError("Error ${e} during migration, adding ${shuffleMap} back
to migration queue")
+ shufflesToMigrate.add(shuffleMap)
+ case None =>
+ logError("Error ${e} while waiting for block to migrate")
+ }
+ }
+ }
+ }
+
+ private val migrationPeers = mutable.HashMap[BlockManagerId,
ShuffleMigrationRunnable]()
+
+ /**
+ * Tries to offload all shuffle blocks that are registered with the shuffle
service locally.
+ * Note: this does not delete the shuffle files in-case there is an
in-progress fetch
+ * but rather shadows them.
+ * Requires an Indexed based shuffle resolver.
+ */
+ def offloadShuffleBlocks(): Unit = {
+ // Update the queue of shuffles to be migrated
+ logDebug("Offloading shuffle blocks")
+ val localShuffles = indexShuffleResolver.getStoredShuffles()
+ logDebug(s"My local shuffles are ${localShuffles.toList}")
+ val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq
+ logDebug(s"My new shuffles to migrate ${newShufflesToMigrate.toList}")
+ shufflesToMigrate.addAll(newShufflesToMigrate.asJava)
+ migratingShuffles ++= newShufflesToMigrate
+
+ // Update the threads doing migrations
+ // TODO: Sort & only start as many threads as min(||blocks||, ||targets||)
using location pref
+ val livePeerSet = getPeers(false).toSet
+ val currentPeerSet = migrationPeers.keys.toSet
+ val deadPeers = currentPeerSet.&~(livePeerSet)
+ val newPeers = livePeerSet.&~(currentPeerSet)
+ migrationPeers ++= newPeers.map{peer =>
+ logDebug(s"Starting thread to migrate shuffle blocks to ${peer}")
+ val executor =
ThreadUtils.newDaemonSingleThreadExecutor(s"migrate-shuffle-to-${peer}")
+ val runnable = new ShuffleMigrationRunnable(peer)
+ executor.submit(runnable)
+ (peer, runnable)
+ }
+ // A peer may have entered a decommissioning state, don't transfer any new
blocks
+ deadPeers.map{peer =>
+ migrationPeers.get(peer).map(_.running = false)
Review comment:
In both lines, use `foreach` instead of `map`: the results are discarded and only the side effects are needed.
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -148,6 +170,86 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ /**
+ * Write a provided shuffle block as a stream. Used for block migrations.
+ * ShuffleBlockBatchIds must contain the full range represented in the
ShuffleIndexBlock.
+ * Requires the caller to delete any shuffle index blocks where the shuffle
block fails to
+ * put.
+ */
+ def putShuffleBlockAsStream(blockId: BlockId, serializerManager:
SerializerManager):
+ StreamCallbackWithID = {
+ val file = blockId match {
+ case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ getIndexFile(shuffleId, mapId)
+ case ShuffleBlockBatchId(shuffleId, mapId, _, _) =>
+ getDataFile(shuffleId, mapId)
+ case _ =>
+ throw new Exception(s"Unexpected shuffle block transfer ${blockId}")
+ }
+ val fileTmp = Utils.tempFileWith(file)
+ val channel = Channels.newChannel(
+ serializerManager.wrapStream(blockId,
+ new FileOutputStream(fileTmp)))
+
+ new StreamCallbackWithID {
+
+ override def getID: String = blockId.name
+
+ override def onData(streamId: String, buf: ByteBuffer): Unit = {
+ while (buf.hasRemaining) {
+ channel.write(buf)
+ }
+ }
+
+ override def onComplete(streamId: String): Unit = {
+ logTrace(s"Done receiving block $blockId, now putting into local
shuffle service")
+ channel.close()
+ val diskSize = fileTmp.length()
+ this.synchronized {
+ if (file.exists()) {
+ file.delete()
+ }
+ if (!fileTmp.renameTo(file)) {
+ throw new IOException(s"fail to rename file ${fileTmp} to ${file}")
+ }
+ }
+ blockManager.reportBlockStatus(blockId, BlockStatus(
+ StorageLevel(
+ useDisk = true,
+ useMemory = false,
+ useOffHeap = false,
+ deserialized = false,
+ replication = 0)
+ , 0, diskSize))
+ }
+
+ override def onFailure(streamId: String, cause: Throwable): Unit = {
+ // the framework handles the connection itself, we just need to do
local cleanup
+ channel.close()
+ fileTmp.delete()
+ }
+ }
+ }
+
+ /**
+ * Get the index & data block for migration.
+ */
+ def getMigrationBlocks(shuffleId: Int, mapId: Long):
+ ((BlockId, ManagedBuffer), (BlockId, ManagedBuffer)) = {
+ // Load the index block
+ val indexFile = getIndexFile(shuffleId, mapId)
+ val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, 0)
Review comment:
Use `NOOP_REDUCE_ID` instead of 0
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -650,6 +656,19 @@ private[spark] class BlockManager(
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[_]): StreamCallbackWithID = {
+ // Delegate shuffle blocks here to resolver if supported
+ if (blockId.isShuffle || blockId.isInternalShuffle) {
Review comment:
The `(blockId.isShuffle || blockId.isInternalShuffle)` covers:
- `ShuffleBlockId`
- `ShuffleBlockBatchId`
- `ShuffleDataBlockId`
- `ShuffleIndexBlockId`
But the `putShuffleBlockAsStream()` is prepared for only these two:
- `ShuffleIndexBlockId`
https://github.com/apache/spark/blob/be2a5e736e051ca0497906b2a2e904c7b4033596/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala#L182
- `ShuffleBlockBatchId`
https://github.com/apache/spark/blob/be2a5e736e051ca0497906b2a2e904c7b4033596/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala#L184
As far as I know, shuffle block batch IDs are used when fetching blocks where,
for the same map ID, contiguous reducer IDs are requested: those contiguous
block IDs are merged together and fetched with one request. So I assume they
are not what we need here.
Finally, `getMigrationBlocks()` returns only a `ShuffleIndexBlockId` and
a `ShuffleDataBlockId`, which is correct.
We should harmonise these three places (methods) with respect to the block IDs
they handle: use only `isInternalShuffle` within this condition and fix
`putShuffleBlockAsStream()`.
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -148,6 +170,86 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ /**
+ * Write a provided shuffle block as a stream. Used for block migrations.
+ * ShuffleBlockBatchIds must contain the full range represented in the
ShuffleIndexBlock.
+ * Requires the caller to delete any shuffle index blocks where the shuffle
block fails to
+ * put.
+ */
+ def putShuffleBlockAsStream(blockId: BlockId, serializerManager:
SerializerManager):
+ StreamCallbackWithID = {
+ val file = blockId match {
+ case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ getIndexFile(shuffleId, mapId)
+ case ShuffleBlockBatchId(shuffleId, mapId, _, _) =>
+ getDataFile(shuffleId, mapId)
+ case _ =>
+ throw new Exception(s"Unexpected shuffle block transfer ${blockId}")
+ }
+ val fileTmp = Utils.tempFileWith(file)
+ val channel = Channels.newChannel(
+ serializerManager.wrapStream(blockId,
+ new FileOutputStream(fileTmp)))
+
+ new StreamCallbackWithID {
+
+ override def getID: String = blockId.name
+
+ override def onData(streamId: String, buf: ByteBuffer): Unit = {
+ while (buf.hasRemaining) {
+ channel.write(buf)
+ }
+ }
+
+ override def onComplete(streamId: String): Unit = {
+ logTrace(s"Done receiving block $blockId, now putting into local
shuffle service")
+ channel.close()
+ val diskSize = fileTmp.length()
+ this.synchronized {
+ if (file.exists()) {
+ file.delete()
+ }
+ if (!fileTmp.renameTo(file)) {
+ throw new IOException(s"fail to rename file ${fileTmp} to ${file}")
+ }
+ }
+ blockManager.reportBlockStatus(blockId, BlockStatus(
+ StorageLevel(
+ useDisk = true,
+ useMemory = false,
+ useOffHeap = false,
+ deserialized = false,
+ replication = 0)
+ , 0, diskSize))
+ }
+
+ override def onFailure(streamId: String, cause: Throwable): Unit = {
+ // the framework handles the connection itself, we just need to do
local cleanup
+ channel.close()
+ fileTmp.delete()
+ }
+ }
+ }
+
+ /**
+ * Get the index & data block for migration.
+ */
+ def getMigrationBlocks(shuffleId: Int, mapId: Long):
+ ((BlockId, ManagedBuffer), (BlockId, ManagedBuffer)) = {
+ // Load the index block
+ val indexFile = getIndexFile(shuffleId, mapId)
+ val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, 0)
+ val indexFileSize = indexFile.length()
+ val indexBlockData = new FileSegmentManagedBuffer(transportConf,
indexFile, 0, indexFileSize)
+
+ // Load the data block
+ val dataFile = getDataFile(shuffleId, mapId)
+ val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, 0)
Review comment:
Use `NOOP_REDUCE_ID` instead of 0
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]