attilapiros commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r498714105



##########
File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -181,6 +182,17 @@ public void onFailure(Throwable e) {
   private void processStreamUpload(final UploadStream req) {
     assert (req.body() == null);
     try {
+      // Retain the original metadata buffer, since it will be used during the invocation of
+      // this method. Will be released later.
+      req.meta.retain();
+      // Make a copy of the original metadata buffer. In benchmark, we noticed that
+      // we cannot respond the original metadata buffer back to the client, otherwise
+      // in cases where multiple concurrent shuffles are present, a wrong metadata might
+      // be sent back to client. This is related to the eager release of the metadata buffer,
+      // i.e., we always release the original buffer by the time the invocation of this
+      // method ends, instead of by the time we respond it to the client. This is necessary,
+      // otherwise we start seeing memory issues very quickly in benchmarks.
+      ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer());

Review comment:
   I have thought about this a lot, and I do not think we need to send the meta back, as it could easily be made available on the client side.
   
   So the complete use case starts with sending an `UploadStream` by calling `TransportClient#uploadStream`. There we already store something (the callback) for each request ID, which uniquely identifies the request:
   
   
https://github.com/apache/spark/blob/7b48c50b88f43a548bb845ca994b9e8794ef593e/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java#L223
   
   From the meta, what we really use is the index, but even if you need the whole `PushBlockStream` as metadata, you can store it there for the request ID. This way, when an `RpcFailure` comes back with the request ID, we can simply use the stored data.
   
   I suggest extending the `TransportClient#uploadStream` arguments with the necessary data (the index) and storing this data for the request ID, NOT the meta as a `ManagedBuffer`.
   This new parameter could be an object and null by default.
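
   To illustrate the idea, here is a minimal, self-contained sketch of keeping per-request data on the client side. It is not Spark's actual `TransportClient`: the class `UploadContextRegistry`, the `FailureCallback` interface, the counter-based request IDs, and the method names other than `uploadStream` are all hypothetical and only show the bookkeeping.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the client-side bookkeeping; NOT Spark's TransportClient.
class UploadContextRegistry {

  // Hypothetical callback shape: on failure the locally stored context is handed
  // back, so the server does not have to echo the metadata in its response.
  interface FailureCallback {
    void onFailure(Object context, String error);
  }

  // The callback plus the optional caller-supplied context (e.g. the block index).
  private static final class Pending {
    final FailureCallback callback;
    final Object context; // may be null

    Pending(FailureCallback callback, Object context) {
      this.callback = callback;
      this.context = context;
    }
  }

  // Assumption: a simple counter is enough to make request IDs unique in this sketch.
  private final AtomicLong nextRequestId = new AtomicLong();
  private final Map<Long, Pending> pending = new ConcurrentHashMap<>();

  // Registers the callback and the extra context under a fresh request ID.
  long uploadStream(FailureCallback callback, Object context) {
    long requestId = nextRequestId.getAndIncrement();
    pending.put(requestId, new Pending(callback, context));
    // A real client would write the upload message to the channel here.
    return requestId;
  }

  // Overload for callers that need no extra data: the context defaults to null.
  long uploadStream(FailureCallback callback) {
    return uploadStream(callback, null);
  }

  // Called when a failure response arrives carrying only the request ID.
  void handleRpcFailure(long requestId, String error) {
    Pending p = pending.remove(requestId);
    if (p != null) {
      p.callback.onFailure(p.context, error);
    }
  }

  // Called on success: the stored entry is simply dropped.
  void handleSuccess(long requestId) {
    pending.remove(requestId);
  }

  int pendingCount() {
    return pending.size();
  }
}
```

   With something like this, the failure path needs only the request ID from the response; the index (or any other object) is looked up locally rather than decoded from an error string.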
   
   As `uploadStream` is used in only two places in production, this is not a huge change.
   
   This way you can get rid of not only sending the metadata back, but also the `cloneBuffer`, `JavaUtils.encodeHeaderIntoErrorString`, and `BlockPushException#decodeException` methods.
   
   
   
   
   



