mridulm commented on a change in pull request #31876:
URL: https://github.com/apache/spark/pull/31876#discussion_r608445517
##########
File path: core/src/main/scala/org/apache/spark/TaskEndReason.scala
##########
@@ -81,7 +81,7 @@ case object Resubmitted extends TaskFailedReason {
*/
@DeveloperApi
case class FetchFailed(
- bmAddress: BlockManagerId, // Note that bmAddress can be null
+ bmAddress: Location, // Note that bmAddress can be null
Review comment:
This is a backward-incompatible change, which also impacts event files.
Unfortunately, it looks like most folks who worked on this in the past are no
longer active.
+CC @tgravescs, @dongjoon-hyun for additional review.
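For illustration only, one shape that would sidestep the incompatibility (a hypothetical sketch, not what this PR does): keep `bmAddress` as a `BlockManagerId` so existing event-log consumers keep working, and carry the pluggable location as a new optional field. This assumes the surrounding `TaskEndReason.scala` context; `mapIndex` etc. are as in the current class.

```scala
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.shuffle.api.Location
import org.apache.spark.storage.BlockManagerId

// Hypothetical backward-compatible alternative: old event-log readers keep
// seeing a BlockManagerId; the pluggable Location rides along as a new field.
@DeveloperApi
case class FetchFailed(
    bmAddress: BlockManagerId,  // note that bmAddress can still be null
    shuffleId: Int,
    mapId: Long,
    mapIndex: Int,
    reduceId: Int,
    message: String,
    location: Option[Location] = None)  // default keeps old call sites compiling
  extends TaskFailedReason {
  override def toErrorString: String =
    s"FetchFailed($bmAddress, shuffleId=$shuffleId, mapIndex=$mapIndex, " +
      s"reduceId=$reduceId, location=${location.orNull}, message=$message)"
}
```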
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -161,7 +162,7 @@ private class ShuffleStatus(numPartitions: Int) extends Logging {
*/
def removeOutputsOnHost(host: String): Unit = withWriteLock {
logDebug(s"Removing outputs for host ${host}")
- removeOutputsByFilter(x => x.host == host)
+ removeOutputsByFilter(x => x.asInstanceOf[BlockManagerId].host == host)
Review comment:
This makes an assumption about the type: while it would work with the current
implementation, how is this going to work going forward?
(Same comment applies to all other methods where `asInstanceOf[BlockManagerId]`
is used.) A sketch of what I mean follows.
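To make the concern concrete, here is a sketch (inside `ShuffleStatus`, assuming `removeOutputsByFilter` takes a `Location => Boolean` after this PR) that matches on the runtime type instead of casting, so a future non-`BlockManagerId` `Location` degrades explicitly rather than throwing `ClassCastException`:

```scala
// Sketch only: match instead of casting. What a host-based removal means for,
// say, an HDFS- or S3-backed Location is a design decision; here such
// locations are simply left untouched.
def removeOutputsOnHost(host: String): Unit = withWriteLock {
  logDebug(s"Removing outputs for host ${host}")
  removeOutputsByFilter {
    case bmId: BlockManagerId => bmId.host == host
    case _ => false  // non-BlockManagerId locations are not tied to a host
  }
}
```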
##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -52,8 +53,20 @@ private[spark] sealed trait MapStatus {
* partitionId of the task or taskContext.taskAttemptId is used.
*/
def mapId: Long
-}
+ protected def loadLocation(in: ObjectInput): Location = {
+ val conf = SparkEnv.get.conf
+ conf.get(config.SHUFFLE_LOCATION_PLUGIN_CLASS).map { locClass =>
+ val loc = Utils.loadExtensions(
+ classOf[Location],
+ Seq(locClass),
+ conf
+ ).head
+ loc.readExternal(in)
+ loc
Review comment:
Do we want to explore a `LocationFactory` in addition to `Location` -
which is used here ?
`loadExtensions` is not cheap, and doing this for every instance creation is
going to be very expensive.
This will also allow for implementations to cache `Location` - else we will
have GC issues at driver.
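Something along these lines (just a sketch: the `LocationFactory` name is a placeholder, and it reuses this PR's `SHUFFLE_LOCATION_PLUGIN_CLASS` config, here pointing at a factory class rather than a `Location` class):

```scala
import java.io.ObjectInput

import org.apache.spark.SparkEnv
import org.apache.spark.internal.config
import org.apache.spark.shuffle.api.Location
import org.apache.spark.util.Utils

// Sketch: a factory resolved once per JVM instead of once per MapStatus.
// Implementations can also intern/deduplicate Location instances to keep
// driver-side GC in check.
trait LocationFactory {
  def readLocation(in: ObjectInput): Location
}

object LocationFactory {
  // Lazily load the plugin a single time; reuse it for every deserialization.
  lazy val instance: Option[LocationFactory] = {
    val conf = SparkEnv.get.conf
    conf.get(config.SHUFFLE_LOCATION_PLUGIN_CLASS).map { clazz =>
      Utils.loadExtensions(classOf[LocationFactory], Seq(clazz), conf).head
    }
  }
}
```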
##########
File path: core/src/main/java/org/apache/spark/shuffle/api/Location.java
##########
@@ -0,0 +1,28 @@
+package org.apache.spark.shuffle.api;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.Externalizable;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+
+/**
+ * :: Private ::
+ * An interface for plugging in the location of shuffle files, in order to support storing
+ * shuffle data in different storage systems, e.g., BlockManager, HDFS, S3. It is generated
+ * by {@link ShuffleMapOutputWriter} after writing a shuffle data file and used by
+ * ShuffleMapOutputReader to read the shuffle data back.
+ *
+ * Since the location is returned by {@link ShuffleMapOutputWriter#commitAllPartitions()}
+ * on the executor and is then sent to the driver, users must ensure the location is
+ * serializable by:
+ *
+ * - implementing a 0-arg constructor
+ * - implementing {@link java.io.Externalizable#readExternal(ObjectInput)} for deserialization
+ * - implementing {@link java.io.Externalizable#writeExternal(ObjectOutput)} for serialization
+ *
Review comment:
and `hashCode` (given their use as keys in `Map`s). Example below.
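For instance (purely illustrative; `HdfsLocation` is made up, and this assumes `Location` extends `Externalizable`, as its imports suggest), an implementation satisfying the contract above plus `equals`/`hashCode`, since hash-keyed lookups need both:

```scala
import java.io.{ObjectInput, ObjectOutput}
import java.util.Objects

import org.apache.spark.shuffle.api.Location

// Made-up example implementation of a pluggable shuffle location.
class HdfsLocation(private var path: String) extends Location {
  def this() = this(null)  // 0-arg constructor, required for deserialization

  override def writeExternal(out: ObjectOutput): Unit = out.writeUTF(path)
  override def readExternal(in: ObjectInput): Unit = { path = in.readUTF() }

  // Locations are used as map keys on the driver, so equals and hashCode
  // must agree and be value-based, not identity-based.
  override def hashCode: Int = Objects.hashCode(path)
  override def equals(other: Any): Boolean = other match {
    case that: HdfsLocation => this.path == that.path
    case _ => false
  }
}
```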