JoshRosen opened a new pull request, #37110:
URL: https://github.com/apache/spark/pull/37110

   ### What changes were proposed in this pull request?
   
   This patch aims to reduce the memory overhead of 
`TransportCipher$EncryptedMessage`. In the current code, the `EncryptedMessage` 
constructor eagerly initializes a `ByteArrayWritableChannel byteRawChannel` 
(which consumes ~32 KB of memory). If there are many `EncryptedMessage`
instances on the heap (e.g. because there is a long queue of outgoing messages 
on a channel) then this overhead adds up and can cause OOMs or GC problems.
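   To make the overhead concrete, here is a minimal sketch (illustrative names only, not the actual `TransportCipher` code) of the eager-allocation pattern and how it multiplies across a deep outbound queue:
   
   ```scala
   // Illustrative stand-in for a message that eagerly allocates its staging buffer
   // in the constructor, even though the buffer is only needed during transferTo().
   class EagerMessageSketch(payload: Array[Byte]) {
     val rawChannel = new Array[Byte](32 * 1024)  // ~32 KB per queued message
     def transferTo(): Unit = { /* ... encrypt payload via rawChannel and write it out ... */ }
   }
   
   // A long queue of outgoing messages multiplies the overhead before any data is sent:
   // e.g. 10,000 queued messages * 32 KB ≈ 320 MB of heap just for staging buffers.
   val queued = Seq.fill(10000)(new EagerMessageSketch(Array.emptyByteArray))
   ```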
   
   In SPARK-24801 / #21811 we fixed a similar issue in `SaslEncryption`. There, the fix was to lazily initialize the buffer: the buffer isn't accessed until `transferTo()` is called (and is only used there), so deferring its allocation to that point reduces the memory footprint of queued outgoing messages.
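   For comparison, here is a minimal sketch of that lazy-initialization approach (illustrative names, not the actual `SaslEncryption` fields): the buffer is created only when `transferTo()` first runs, so messages that are merely queued carry no buffer at all.
   
   ```scala
   // Illustrative stand-in: defer the staging-buffer allocation until the message is actually sent.
   class LazyMessageSketch(payload: Array[Byte]) {
     private var rawChannel: Array[Byte] = _  // stays null while the message sits in the outbound queue
   
     def transferTo(): Unit = {
       if (rawChannel == null) {
         rawChannel = new Array[Byte](32 * 1024)  // allocated only once sending begins
       }
       // ... encrypt payload via rawChannel and write it to the socket ...
     }
   }
   ```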
   
   In principle we could apply a similar lazy initialization fix here. In this 
PR, however, I have taken a different approach: I construct a single shared 
`ByteArrayWritableChannel byteRawChannel` in 
`TransportCipher$EncryptionHandler` and pass that shared instance to the
`EncryptedMessage` constructor. I believe that this is safe because we are 
already doing this for the `byteEncChannel` channel buffer. That shared 
`byteEncChannel` gets `reset()` when `EncryptedMessage.deallocate()` is called. 
If we assume that existing sharing is correct then I think it's okay to apply 
similar sharing of the `byteRawChannel` buffer because its scope of use and 
lifecycle are similar.
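   The following is a hedged sketch of that sharing approach (again with illustrative names, not the actual `TransportCipher` code): the handler owns one staging buffer per channel and lends it to every message it creates, and each message resets the buffer when it is deallocated, mirroring the existing handling of `byteEncChannel`.
   
   ```scala
   // Illustrative stand-in for a resettable staging buffer (the real code uses
   // ByteArrayWritableChannel): reset() just rewinds the write position so the
   // same backing array can be reused by the next message on the channel.
   class StagingBufferSketch(size: Int) {
     private val data = new Array[Byte](size)
     private var length = 0
     def write(src: Array[Byte]): Unit = {
       System.arraycopy(src, 0, data, length, src.length)
       length += src.length
     }
     def reset(): Unit = { length = 0 }
   }
   
   class EncryptedMessageSketch(payload: Array[Byte], rawChannel: StagingBufferSketch) {
     def transferTo(): Unit = rawChannel.write(payload)  // the buffer is only touched while this message is being sent
     def deallocate(): Unit = rawChannel.reset()         // hand the shared buffer back for the next message
   }
   
   // One ~32 KB buffer per handler (i.e. per channel), shared by all of its messages,
   // instead of one buffer per queued message.
   class EncryptionHandlerSketch {
     private val sharedRawChannel = new StagingBufferSketch(32 * 1024)
     def newMessage(payload: Array[Byte]): EncryptedMessageSketch =
       new EncryptedMessageSketch(payload, sharedRawChannel)
   }
   ```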
   
   ### Why are the changes needed?
   
   Improve performance and reduce a source of OOMs when encryption is enabled.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   **Correctness**: Existing unit tests.
   
   **Performance**: I observed memory usage and performance improvements by running an artificial workload that significantly stresses the shuffle sending path. On a two-host Spark Standalone cluster where each host had an external shuffle service (with a 1 GB heap) and a 64-core executor, I ran the following code:
   
   ```scala
   val numMapTasks = 25000
   val numReduceTasks = 256
   val random = new java.util.Random()
   val inputData = spark.range(1, numMapTasks * numReduceTasks, 1, numMapTasks).map { x =>
     val bytes = new Array[Byte](10 * 1024)
     random.nextBytes(bytes)
     bytes
   }
   
   inputData.repartition(numReduceTasks).write.mode("overwrite").format("noop").save()
   ```
   
   Prior to this patch, this job reliably failed because the Worker (where the 
shuffle service runs) would fill its heap and go into long GC pauses, 
eventually causing it to become disassociated from the Master. After this 
patch's changes, this job runs smoothly to completion.

