JoshRosen opened a new pull request, #37110:
URL: https://github.com/apache/spark/pull/37110
### What changes were proposed in this pull request?
This patch aims to reduce the memory overhead of
`TransportCipher$EncryptedMessage`. In the current code, the `EncryptedMessage`
constructor eagerly initializes a `ByteArrayWritableChannel byteRawChannel`
(which consumes ~32 KiB of memory). If there are many `EncryptedMessage`
instances on the heap (e.g. because there is a long queue of outgoing messages
on a channel), then this overhead adds up and can cause OOMs or GC problems.
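To make the cost concrete, here is a small, self-contained Java sketch (simplified and illustrative, not the actual Spark classes) of what per-message eager allocation means for a long outgoing queue; the 32 KiB buffer size and the 10,000-message queue depth are assumptions chosen for illustration:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class EagerBufferDemo {
    // Illustrative stand-in for the ~32 KiB staging buffer mentioned above.
    static final int STREAM_BUFFER_SIZE = 32 * 1024;

    // Simplified stand-in for EncryptedMessage: the buffer is allocated in the
    // constructor and therefore lives for as long as the message sits in the
    // outgoing queue, even though it is only needed while the message is
    // actually being written out.
    static final class QueuedMessage {
        final byte[] stagingBuffer = new byte[STREAM_BUFFER_SIZE];
    }

    public static void main(String[] args) {
        Queue<QueuedMessage> outgoing = new ArrayDeque<>();
        int queued = 10_000;  // hypothetical backlog on a slow channel
        for (int i = 0; i < queued; i++) {
            outgoing.add(new QueuedMessage());
        }
        // 10,000 * 32 KiB is roughly 312 MiB of heap held by staging buffers alone.
        System.out.printf("Heap held by staging buffers: ~%d MiB%n",
            (long) queued * STREAM_BUFFER_SIZE / (1024 * 1024));
    }
}
```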
In SPARK-24801 / #21811 we fixed a similar issue in `SaslEncryption`. There,
the fix was to lazily initialize the buffer: the buffer isn't accessed until
`transferTo()` is called (and is only used there), so deferring its allocation
to that point reduces the memory footprint of queued outgoing messages.
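For context, that lazy-initialization pattern looks roughly like the following Java sketch (class and field names are illustrative, not the actual `SaslEncryption` code): nothing large is allocated in the constructor, and the buffer is created on first use inside `transferTo()`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Illustrative sketch of lazy buffer initialization; not the actual Spark class.
final class LazilyBufferedMessage {
    private static final int STREAM_BUFFER_SIZE = 32 * 1024;

    private final ByteBuffer payload;
    private byte[] stagingBuffer;  // left null until transferTo() needs it

    LazilyBufferedMessage(ByteBuffer payload) {
        this.payload = payload;
        // No large allocation here, so queued messages stay cheap.
    }

    long transferTo(WritableByteChannel target) throws IOException {
        if (stagingBuffer == null) {
            // Allocated only when the message is actually being written out.
            stagingBuffer = new byte[STREAM_BUFFER_SIZE];
        }
        // ... copy/encrypt via stagingBuffer, then write to target (omitted) ...
        return target.write(payload);
    }
}
```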
In principle we could apply a similar lazy initialization fix here. In this
PR, however, I have taken a different approach: I construct a single shared
`ByteArrayWritableChannel byteRawChannel` in
`TransportCipher$EncryptionHandler` and pass that shared instance to the
`EncryptedMessage` constructor. I believe that this is safe because we are
already doing this for the `byteEncChannel` channel buffer. That shared
`byteEncChannel` gets `reset()` when `EncryptedMessage.deallocate()` is called.
If we assume that the existing sharing is correct, then it should be equally
safe to share the `byteRawChannel` buffer, because its scope of use and
lifecycle are similar.
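A rough Java sketch of the sharing described above (names are illustrative; the real code lives in `TransportCipher`'s `EncryptionHandler` and `EncryptedMessage`): the handler owns one buffer per channel, every message it creates borrows that buffer, and the buffer is cleared when a message is deallocated, mirroring how the shared `byteEncChannel` is `reset()`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of per-handler buffer sharing; not the actual Spark code.
final class SharedBufferSketch {
    static final int STREAM_BUFFER_SIZE = 32 * 1024;

    // One staging buffer per handler (i.e. per channel), not per message.
    static final class EncryptionHandlerSketch {
        private final byte[] sharedRawBuffer = new byte[STREAM_BUFFER_SIZE];
        private final Queue<MessageSketch> outgoing = new ArrayDeque<>();

        void enqueue(byte[] payload) {
            // Every queued message references the same shared buffer instead of
            // allocating its own, so queue depth no longer multiplies the cost.
            outgoing.add(new MessageSketch(payload, sharedRawBuffer));
        }
    }

    static final class MessageSketch {
        private final byte[] payload;
        private final byte[] rawBuffer;  // shared; only touched while transferring

        MessageSketch(byte[] payload, byte[] rawBuffer) {
            this.payload = payload;
            this.rawBuffer = rawBuffer;
        }

        void deallocate() {
            // Analogous to resetting the shared channel buffer so the next
            // message starts from a clean state.
            java.util.Arrays.fill(rawBuffer, (byte) 0);
        }
    }
}
```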
### Why are the changes needed?
Improve performance and reduce a source of OOMs when encryption is enabled.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
**Correctness**: Existing unit tests.
**Performance**: I observed memory usage and performance improvements by
running an artificial workload that heavily stresses the shuffle send path. On
a two-host Spark Standalone cluster where each host had an external shuffle
service (with a 1 GB heap) and a 64-core executor, I ran the following
code:
```scala
val numMapTasks = 25000
val numReduceTasks = 256
val random = new java.util.Random()
val inputData = spark.range(1, numMapTasks * numReduceTasks, 1, numMapTasks).map { x =>
  val bytes = new Array[Byte](10 * 1024)
  random.nextBytes(bytes)
  bytes
}
inputData.repartition(numReduceTasks).write.mode("overwrite").format("noop").save()
```
Prior to this patch, this job reliably failed because the Worker (where the
shuffle service runs) would fill its heap and go into long GC pauses,
eventually causing it to become disassociated from the Master. After this
patch's changes, this job smoothly runs to completion.