mridulm commented on a change in pull request #31876:
URL: https://github.com/apache/spark/pull/31876#discussion_r608445517



##########
File path: core/src/main/scala/org/apache/spark/TaskEndReason.scala
##########
@@ -81,7 +81,7 @@ case object Resubmitted extends TaskFailedReason {
  */
 @DeveloperApi
 case class FetchFailed(
-    bmAddress: BlockManagerId,  // Note that bmAddress can be null
+    bmAddress: Location,  // Note that bmAddress can be null

Review comment:
       This is a backwardly incompatible change, which also impacts event files.
   Unfortunately it looks like most folks who worked on this in past are no 
longer active.
   +CC @tgravescs, @dongjoon-hyun for additional review.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -161,7 +162,7 @@ private class ShuffleStatus(numPartitions: Int) extends 
Logging {
    */
   def removeOutputsOnHost(host: String): Unit = withWriteLock {
     logDebug(s"Removing outputs for host ${host}")
-    removeOutputsByFilter(x => x.host == host)
+    removeOutputsByFilter(x => x.asInstanceOf[BlockManagerId].host == host)

Review comment:
       This is making assumption about the type - while it would work currently 
with existing implementation, how is this going to work going forward ?
   
   (Same comment for all other methods where `asInstanceOf[BlockManagerId]` is 
being done).

##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -52,8 +53,20 @@ private[spark] sealed trait MapStatus {
    * partitionId of the task or taskContext.taskAttemptId is used.
    */
   def mapId: Long
-}
 
+  protected def loadLocation(in: ObjectInput): Location = {
+    val conf = SparkEnv.get.conf
+    conf.get(config.SHUFFLE_LOCATION_PLUGIN_CLASS).map { locClass =>
+      val loc = Utils.loadExtensions(
+        classOf[Location],
+        Seq(locClass),
+        conf
+      ).head
+      loc.readExternal(in)
+      loc

Review comment:
       Do we want to explore a `LocationFactory` in addition to `Location` - 
which is used here ?
   `loadExtensions` is not cheap, and doing this for every instance creation is 
going to be very expensive.
   
   This will also allow for implementations to cache `Location` - else we will 
have GC issues at driver.

##########
File path: core/src/main/java/org/apache/spark/shuffle/api/Location.java
##########
@@ -0,0 +1,28 @@
+package org.apache.spark.shuffle.api;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.Externalizable;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+
+/**
+ * :: Private ::
+ * An interface for plugging in the location of shuffle files, in order to 
support store shuffle
+ * data in different storage, e.g., BlockManager, HDFS, S3. It would be 
generated by
+ * {@link ShuffleMapOutputWriter} after writing a shuffle data file and used 
by ShuffleMapOutputReader
+ * to read the shuffle data.
+ *
+ * Since the location is returned by {@link 
ShuffleMapOutputWriter#commitAllPartitions()} at executor
+ * and would be sent to driver, users must ensure the location is serializable 
by
+ *
+ *  - implement a 0-arg constructor
+ *  - implement {@link java.io.Externalizable#readExternal(ObjectInput)} for 
deserialization
+ *  - implement {@link java.io.Externalizable#writeExternal(ObjectOutput)} for 
serialization
+ *

Review comment:
       and `hashCode` (given use as keys in `Map`'s)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to