liuzqt commented on code in PR #38064:
URL: https://github.com/apache/spark/pull/38064#discussion_r999794248
##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -659,9 +659,10 @@ private[spark] class Executor(
val accumUpdates = task.collectAccumulatorUpdates()
val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
// TODO: do not serialize value twice
- val directResult = new DirectTaskResult(valueBytes, accumUpdates, metricPeaks)
- val serializedDirectResult = ser.serialize(directResult)
- val resultSize = serializedDirectResult.limit()
+ val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
+ val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult,
+   valueByteBuffer.size)
Review Comment:
This is also related to the `estimatedSize >> 3` in the chunk size estimation. Since the estimated size is hard to get exactly right anyway, if we set the chunk size to exactly `estimatedSize`, we risk wasting almost an entire chunk. For example, if `estimatedSize` is 8,000 but the exact serialized size is 8,001, the last chunk of size 8,000 contains only 1 useful byte.
By dividing into chunks of size `estimatedSize / k`, we get a reasonably low upper bound on the overhead: in the example above, with k = 8 we waste at most part of the last chunk of size 1,000.
Given this estimation heuristic, the estimate does not have to be super accurate (being exact is hardly possible anyway), so `valueByteBuffer.size` is used here for simplicity.
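The waste bound described above can be sketched with a small self-contained Scala snippet (the `wasted` helper is hypothetical, purely for illustration; it is not part of the Spark code in this PR):

```scala
object ChunkWasteDemo {
  // Bytes left unused in the last chunk when `actualSize` bytes are
  // written into fixed-size chunks of `chunkSize` bytes.
  def wasted(actualSize: Long, chunkSize: Long): Long = {
    val remainder = actualSize % chunkSize
    if (remainder == 0) 0L else chunkSize - remainder
  }

  def main(args: Array[String]): Unit = {
    val estimatedSize = 8000L
    val actualSize = 8001L // 1 byte over the estimate

    // Chunk size == estimatedSize: almost a whole chunk is wasted.
    println(wasted(actualSize, estimatedSize))      // 7999

    // Chunk size == estimatedSize >> 3: waste is bounded by chunkSize - 1.
    println(wasted(actualSize, estimatedSize >> 3)) // 999
  }
}
```

This is just the arithmetic behind the heuristic: shrinking the chunk size by a factor of k caps the worst-case overhead at roughly `estimatedSize / k` bytes, at the cost of more chunk allocations.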
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]