liuzqt commented on code in PR #38064:
URL: https://github.com/apache/spark/pull/38064#discussion_r991524342
##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -659,25 +660,27 @@ private[spark] class Executor(
val accumUpdates = task.collectAccumulatorUpdates()
val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
// TODO: do not serialize value twice
-      val directResult = new DirectTaskResult(valueBytes, accumUpdates, metricPeaks)
-      val serializedDirectResult = ser.serialize(directResult)
-      val resultSize = serializedDirectResult.limit()
+      val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
+      val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult)
+      val resultSize = serializedDirectResult.size
       // directSend = sending directly back to the driver
-      val serializedResult: ByteBuffer = {
+      val serializedResult: ChunkedByteBuffer = {
         if (maxResultSize > 0 && resultSize > maxResultSize) {
           logWarning(s"Finished $taskName. Result is larger than maxResultSize " +
             s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
             s"dropping it.")
-          ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
+          SerializerHelper.serializeToChunkedBuffer(ser,
+            new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
Review Comment:
Good point. The `serializedDirectResult` in [line 668](https://github.com/apache/spark/pull/38064/files#diff-d7a989c491f3cb77cca02c701496a9e2a3443f70af73b0d1ab0899239f3a789dR668) is the one that might exceed 2GB, so that is the one we want to replace with `ChunkedByteBuffer`.
The `serializedResult` here, by contrast, is guaranteed to stay below 2GB in all cases:
- larger than `maxResultSize`: we drop the result and send an `IndirectTaskResult` indicating failure, which is small
- larger than `maxDirectResultSize`: we send an `IndirectTaskResult` carrying the blockId, which is small
- directly sent back to the driver: always smaller than 2GB, guarded by the direct result size check I added [here](https://github.com/apache/spark/pull/38064/files#diff-888b19a7ec330288db98c7cb699b26bcb8667bb9eba2533019f9d67b597fab95R805)
So let's just use `ByteBuffer` for `serializedResult`.
Thanks for pointing this out!
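For reference, the three branches above can be sketched as follows. This is a minimal illustration, not Spark's actual API: `TaskResultPath` and `chooseResultPath` are hypothetical names introduced here, while `maxResultSize` / `maxDirectResultSize` mirror the config-derived values used in `Executor.scala`:

```scala
// Hypothetical sketch of the three result-path branches; not Spark's real types.
sealed trait TaskResultPath
case object DroppedIndirect extends TaskResultPath // > maxResultSize: small IndirectTaskResult marking failure
case object BlockIndirect   extends TaskResultPath // > maxDirectResultSize: small IndirectTaskResult with blockId
case object Direct          extends TaskResultPath // small enough to send inline, guarded to stay under 2GB

def chooseResultPath(resultSize: Long,
                     maxResultSize: Long,
                     maxDirectResultSize: Long): TaskResultPath = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    DroppedIndirect // result is dropped; only a small failure marker goes back
  } else if (resultSize > maxDirectResultSize) {
    BlockIndirect // result stored in the block manager; only the blockId goes back
  } else {
    Direct // serialized result itself goes back to the driver
  }
}
```

In the first two branches `serializedResult` only wraps a small `IndirectTaskResult`, and in the third branch the size check above already bounds it, which is why a plain `ByteBuffer` suffices there.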
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]