Victsm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r498985394
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -181,6 +182,17 @@ public void onFailure(Throwable e) {
private void processStreamUpload(final UploadStream req) {
assert (req.body() == null);
try {
+ // Retain the original metadata buffer, since it will be used during the
invocation of
+ // this method. Will be released later.
+ req.meta.retain();
+ // Make a copy of the original metadata buffer. In benchmark, we noticed
that
+ // we cannot respond the original metadata buffer back to the client,
otherwise
+ // in cases where multiple concurrent shuffles are present, a wrong
metadata might
+ // be sent back to client. This is related to the eager release of the
metadata buffer,
+ // i.e., we always release the original buffer by the time the
invocation of this
+ // method ends, instead of by the time we respond it to the client. This
is necessary,
+ // otherwise we start seeing memory issues very quickly in benchmarks.
+ ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer());
Review comment:
I still do not want to change the `TransportClient#uploadStream` API
itself.
This transport layer utility was previously used for transferring large RDD
partition blocks, and now reused for doing shuffle block push.
In the future, it is possible that other use cases might benefit from this
utility as well.
I believe keeping this API generic and not specific to one use case is
important.
For the change you proposed to keep the `PushBlockStream` as metadata
tracked on the client side, I also thought about doing that during
implementation.
It's cleaner than the current approach.
One way to do this without incurring any potential protocol change would be
to make `BlockPushCallback` inside `OneForOneBlockPusher` stateful.
Currently, that callback is stateless, so multiple invocations to
`TransportClient#uploadStream` for the same batch of blocks would reuse the
same callback object.
If we make that callback object stateful, to keep track of the additional
metadata ManagedBuffer, then the callback object would have what we need built
into it during object creation.
My concern during the implementation was the potential JVM pressure this
approach might generate, since we will create one callback object per block to
be pushed.
What do you think?
Also CC @mridulm @tgravescs @squito @Ngone51 @jiangxb1987 for your inputs on
this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]