GitHub user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/22404#discussion_r217154988
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -193,19 +193,51 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
val newBids = broadcastVars.map(_.id).toSet
// number of different broadcasts
val toRemove = oldBids.diff(newBids)
- val cnt = toRemove.size + newBids.diff(oldBids).size
+ val addedBids = newBids.diff(oldBids)
+ val cnt = toRemove.size + addedBids.size
+ val needsDecryptionServer =
env.serializerManager.encryptionEnabled && addedBids.nonEmpty
+ dataOut.writeBoolean(needsDecryptionServer)
dataOut.writeInt(cnt)
- for (bid <- toRemove) {
- // remove the broadcast from worker
- dataOut.writeLong(- bid - 1) // bid >= 0
- oldBids.remove(bid)
+ def sendBidsToRemove(): Unit = {
+ for (bid <- toRemove) {
+ // remove the broadcast from worker
+ dataOut.writeLong(-bid - 1) // bid >= 0
+ oldBids.remove(bid)
+ }
}
- for (broadcast <- broadcastVars) {
- if (!oldBids.contains(broadcast.id)) {
+ if (needsDecryptionServer) {
+ // if there is encryption, we setup a server which reads the
encrypted files, and sends
+ // the decrypted data to python
+ val idsAndFiles = broadcastVars.flatMap { broadcast =>
+ if (!oldBids.contains(broadcast.id)) {
--- End diff --
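Nit: could you flip this `if` condition for clarity, i.e. test `oldBids.contains(broadcast.id)` directly and swap the branches, rather than testing its negation?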
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]