Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22404#discussion_r217154988
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -193,19 +193,51 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             val newBids = broadcastVars.map(_.id).toSet
             // number of different broadcasts
             val toRemove = oldBids.diff(newBids)
    -        val cnt = toRemove.size + newBids.diff(oldBids).size
    +        val addedBids = newBids.diff(oldBids)
    +        val cnt = toRemove.size + addedBids.size
    +        val needsDecryptionServer = env.serializerManager.encryptionEnabled && addedBids.nonEmpty
    +        dataOut.writeBoolean(needsDecryptionServer)
             dataOut.writeInt(cnt)
    -        for (bid <- toRemove) {
    -          // remove the broadcast from worker
    -          dataOut.writeLong(- bid - 1)  // bid >= 0
    -          oldBids.remove(bid)
    +        def sendBidsToRemove(): Unit = {
    +          for (bid <- toRemove) {
    +            // remove the broadcast from worker
    +            dataOut.writeLong(-bid - 1) // bid >= 0
    +            oldBids.remove(bid)
    +          }
             }
    -        for (broadcast <- broadcastVars) {
    -          if (!oldBids.contains(broadcast.id)) {
    +        if (needsDecryptionServer) {
    +          // if there is encryption, we setup a server which reads the encrypted files, and sends
    +          // the decrypted data to python
    +          val idsAndFiles = broadcastVars.flatMap { broadcast =>
    +            if (!oldBids.contains(broadcast.id)) {
    --- End diff --
    
    Nit: flip the if condition for clarity?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to