GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/3417#discussion_r20761166
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -824,3 +807,56 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
}
}
}
+
+/**
+ * A wrapper for a Python Broadcast, which is written to disk by Python. It will
+ * also write the data back to disk after deserialization, so that Python can
+ * read it from disk.
+ */
+private[spark] class PythonBroadcast(@transient var path: String)
+ extends Serializable with Closeable {
+
+  /**
+   * Read the data from disk, then copy it to `out`.
+   */
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
+ val in = new FileInputStream(new File(path))
+ try {
+ Utils.copyStream(in, out)
+ } finally {
+ in.close()
+ }
+ }
+
+  /**
+   * Write the data to disk, using a randomly generated file name.
+   * @param in stream containing the serialized broadcast data
+   */
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ val localDirs = SparkEnv.get.blockManager.diskBlockManager.localDirs
+ val random = new scala.util.Random(System.currentTimeMillis())
--- End diff --
Also, I think there might be a `Random.choice(collection)` method to return
a random entry from a list; this might be a little more succinct.
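
If there is no such method in `scala.util.Random` (I don't see one in the
standard library), here is a minimal sketch of the same idea, assuming
`localDirs` is the `Array[File]` returned by `diskBlockManager.localDirs`
(the helper name here is hypothetical):

```scala
import java.io.File
import scala.util.Random

object RandomDirExample {
  // scala.util.Random has no `choice` method, so indexing with
  // `nextInt` is the usual one-expression equivalent.
  def chooseRandomDir(localDirs: Array[File], random: Random): File =
    localDirs(random.nextInt(localDirs.length))
}
```

Either way, it keeps the call site to a single expression instead of
spelling out the index arithmetic inline.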