mridulm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494003389



##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {
+          ByteBuffer buffer;
+
+          OutputStream init(ByteBuffer buffer) {
+            this.buffer = buffer;
+            return this;
+          }
+
+          @Override
+          public void close() {
+          }
+
+          @Override
+          public void flush() {
+          }
+
+          @Override
+          public void write(int b) {
+            buffer.put((byte) b);
+          }
+
+          @Override
+          public void write(byte[] b) {
+            buffer.put(b);
+          }
+
+          @Override
+          public void write(byte[] b, int off, int l) {
+            buffer.put(b, off, l);
+          }
+        }.init(outBuffer)));
+      } catch (IOException e) {
+        throw new RuntimeException("Exception while encoding bitmap", e);
+      }

Review comment:
       Replace this with something more concise - for example, see `UnsafeShuffleWriter.MyByteArrayOutputStream`.
   To illustrate, something like:
   ```
   MyBaos out = new MyBaos(b.serializedSizeInBytes());
   b.serialize(new DataOutputStream(out));
   int size = out.size();
   buf.writeInt(size);
   buf.writeBytes(out.getBuf(), 0, size);
   ```
   
   The last part could also be moved into a `ByteArrays.encode(byte[] arr, int offset, int len)` helper.
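   For reference, a minimal sketch of such a helper (modeled on `UnsafeShuffleWriter.MyByteArrayOutputStream`; `MyBaos` and `getBuf` are illustrative names, not code from this PR):
   ```
   import java.io.ByteArrayOutputStream;

   // Exposes the backing array directly, so callers can copy out of it
   // without the extra allocation that toByteArray() would make.
   class MyBaos extends ByteArrayOutputStream {
     MyBaos(int size) { super(size); }
     byte[] getBuf() { return buf; }  // buf is the protected backing array
   }
   ```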

##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();

Review comment:
       `BitmapArrays` ends up calling `trim` and `runOptimize` twice - can we refactor so that it is only done once for this codepath?
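   One possible shape for that refactor (hypothetical; `compress` is an illustrative name): pull the compression into its own step so callers such as `BitmapArrays` run it exactly once before measuring and encoding.
   ```
   // Called once per bitmap, before encodedLength()/encode().
   public static void compress(RoaringBitmap b) {
     b.trim();
     b.runOptimize();
   }

   public static int encodedLength(RoaringBitmap b) {
     // Assumes compress(b) has already been called.
     return 4 + b.serializedSizeInBytes();
   }
   ```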

##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -209,12 +225,17 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
         public void onComplete(String streamId) throws IOException {
            try {
              streamHandler.onComplete(streamId);
-             callback.onSuccess(ByteBuffer.allocate(0));
+             callback.onSuccess(meta.duplicate());

Review comment:
       Can you add a comment on why we are making this change from sending an empty buffer back to sending `meta`?

##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -181,6 +182,17 @@ public void onFailure(Throwable e) {
   private void processStreamUpload(final UploadStream req) {
     assert (req.body() == null);
     try {
+      // Retain the original metadata buffer, since it will be used during the invocation of
+      // this method. Will be released later.
+      req.meta.retain();
+      // Make a copy of the original metadata buffer. In benchmarks, we noticed that
+      // we cannot respond with the original metadata buffer to the client, otherwise
+      // in cases where multiple concurrent shuffles are present, the wrong metadata might
+      // be sent back to the client. This is related to the eager release of the metadata buffer,
+      // i.e., we always release the original buffer by the time the invocation of this
+      // method ends, instead of by the time we respond to the client. This is necessary,
+      // otherwise we start seeing memory issues very quickly in benchmarks.
+      ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer());

Review comment:
       Since we are always making a copy of `meta` here, can we remove the `retain` + `release` below, always release it here instead, and rely only on the cloned buffer within this method?
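   A sketch of that simplification (hypothetical, assuming nothing after the copy reads `req.meta` again):
   ```
   // Copy the metadata up front and release the original immediately;
   // everything below operates only on the clone.
   ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer());
   req.meta.release();  // no matching retain() needed anymore
   ```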

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+/**
+ * Similar to {@link OneForOneBlockFetcher}, but for pushing blocks to a remote shuffle service to
+ * be merged instead of fetching them from remote shuffle services. This is used by
+ * ShuffleWriter when the block push process is initiated. The supplied BlockFetchingListener
+ * is used to handle the success or failure in pushing each block.
+ */
+public class OneForOneBlockPusher {
+  private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
+
+  private final TransportClient client;
+  private final String appId;
+  private final String[] blockIds;
+  private final BlockFetchingListener listener;
+  private final RpcResponseCallback callback;
+  private final Map<String, ManagedBuffer> buffers;
+
+  public OneForOneBlockPusher(
+      TransportClient client,
+      String appId,
+      String[] blockIds,
+      BlockFetchingListener listener,

Review comment:
       Instead of reusing `BlockFetchingListener`, add a new interface for push?
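   A hypothetical shape for such a push-specific listener (the interface and method names below are illustrative, not from this PR):
   ```
   import java.util.EventListener;
   import org.apache.spark.network.buffer.ManagedBuffer;

   public interface BlockPushingListener extends EventListener {
     // Called once per block as soon as its push succeeds.
     void onBlockPushSuccess(String blockId, ManagedBuffer data);

     // Called at most once per block when its push fails.
     void onBlockPushFailure(String blockId, Throwable exception);
   }
   ```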
   

##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {
+          ByteBuffer buffer;
+
+          OutputStream init(ByteBuffer buffer) {
+            this.buffer = buffer;
+            return this;
+          }
+
+          @Override
+          public void close() {
+          }
+
+          @Override
+          public void flush() {
+          }
+
+          @Override
+          public void write(int b) {
+            buffer.put((byte) b);
+          }
+
+          @Override
+          public void write(byte[] b) {
+            buffer.put(b);
+          }
+
+          @Override
+          public void write(byte[] b, int off, int l) {
+            buffer.put(b, off, l);
+          }
+        }.init(outBuffer)));
+      } catch (IOException e) {
+        throw new RuntimeException("Exception while encoding bitmap", e);
+      }
+      byte[] bytes = outBuffer.array();
+      buf.writeInt(bytes.length);
+      buf.writeBytes(bytes);
+    }
+
+    public static RoaringBitmap decode(ByteBuf buf) {
+      int length = buf.readInt();
+      byte[] bytes = new byte[length];
+      buf.readBytes(bytes);

Review comment:
       Use `ByteArrays.decode` here ?
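   For illustration, `decode` rewritten on top of `ByteArrays.decode` (a sketch, assuming the usual `java.io` imports; it relies on the same length-prefixed layout, which matches the encode side here):
   ```
   public static RoaringBitmap decode(ByteBuf buf) {
     RoaringBitmap bitmap = new RoaringBitmap();
     try {
       // ByteArrays.decode reads the int length prefix followed by the bytes.
       bitmap.deserialize(new DataInputStream(
           new ByteArrayInputStream(ByteArrays.decode(buf))));
     } catch (IOException e) {
       throw new RuntimeException("Exception while decoding bitmap", e);
     }
     return bitmap;
   }
   ```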

##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -238,12 +259,26 @@ public String getID() {
       }
     } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      try {
+        // It's OK to respond the original metadata buffer here, because this is still inside
+        // the invocation of this method.
+        respond(new RpcFailure(req.requestId,
+            JavaUtils.encodeHeaderIntoErrorString(req.meta.nioByteBuffer(), e)));
+      } catch (IOException ioe) {
+        // No exception will be thrown here. req.meta.nioByteBuffer will not throw IOException
+        // because it's a NettyManagedBuffer. This try-catch block is to make the compiler happy.
+        logger.error("Error in handling failure while invoking RpcHandler#receive() on RPC id {}",

Review comment:
       If this is not expected, `assert` on it instead ?
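   A minimal sketch of that (illustrative; it assumes this path really is unreachable for a NettyManagedBuffer):
   ```
   } catch (IOException ioe) {
     // nioByteBuffer() is not expected to throw for a NettyManagedBuffer, so
     // treat this as a programming error instead of logging and continuing.
     throw new AssertionError("Unexpected IOException from req.meta.nioByteBuffer()", ioe);
   }
   ```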

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+/**
+ * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
+ * and logged.
+ * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
+ * - remaining retries < max retries
+ * - exception is an IOException
+ */
+
+public interface ErrorHandler {
+  boolean shouldRetryError(Throwable t);
+
+  default boolean shouldLogError(Throwable t) {
+    return true;
+  }
+
+  /**
+   * A no-op error handler instance.
+   */
+  ErrorHandler NOOP_ERROR_HANDLER = t -> true;
+
+  /**
+   * The error handler for pushing shuffle blocks to remote shuffle services.
+   */
+  class BlockPushErrorHandler implements ErrorHandler {
+
+    @Override
+    public boolean shouldRetryError(Throwable t) {
+      // If it is a connection time out or a connection closed exception, no need to retry.
+      if (t.getCause() != null && t.getCause() instanceof ConnectException) {
+        return false;
+      }
+      // If the block is too late, there is no need to retry it
+      return (t.getMessage() == null || !t.getMessage()
+          .contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX)) && (t.getCause() == null
+          || t.getCause().getMessage() == null || !t.getCause().getMessage()
+          .contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX));
+    }
+
+    @Override
+    public boolean shouldLogError(Throwable t) {
+      return (t.getMessage() == null || (
+          !t.getMessage().contains(BlockPushException.COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX)
+              && !t.getMessage().contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX))) && (
+          t.getCause() == null || t.getCause().getMessage() == null || (!t.getCause()
+              .getMessage()
+              .contains(BlockPushException.COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX) && !t.getCause()
+              .getMessage()
+              .contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX)));
+    }

Review comment:
       nit: Fix style/indentation in this class.
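   For example, one cleaner formulation (a sketch only, with a hypothetical `messageContains` helper) that factors out the repeated null-safe message checks:
   ```
   private static boolean messageContains(Throwable t, String fragment) {
     boolean inMessage = t.getMessage() != null && t.getMessage().contains(fragment);
     boolean inCause = t.getCause() != null && t.getCause().getMessage() != null
         && t.getCause().getMessage().contains(fragment);
     return inMessage || inCause;
   }

   @Override
   public boolean shouldRetryError(Throwable t) {
     // Do not retry on connection failures or on blocks that arrived too late.
     if (t.getCause() instanceof ConnectException) {
       return false;
     }
     return !messageContains(t, BlockPushException.TOO_LATE_MESSAGE_SUFFIX);
   }
   ```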

##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();

Review comment:
       I might have misread the code here, but I want to make sure I am not missing anything.

##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {
+          ByteBuffer buffer;
+
+          OutputStream init(ByteBuffer buffer) {
+            this.buffer = buffer;
+            return this;
+          }
+
+          @Override
+          public void close() {
+          }
+
+          @Override
+          public void flush() {
+          }
+
+          @Override
+          public void write(int b) {
+            buffer.put((byte) b);
+          }
+
+          @Override
+          public void write(byte[] b) {
+            buffer.put(b);
+          }
+
+          @Override
+          public void write(byte[] b, int off, int l) {
+            buffer.put(b, off, l);
+          }
+        }.init(outBuffer)));
+      } catch (IOException e) {
+        throw new RuntimeException("Exception while encoding bitmap", e);
+      }
+      byte[] bytes = outBuffer.array();
+      buf.writeInt(bytes.length);
+      buf.writeBytes(bytes);
+    }
+
+    public static RoaringBitmap decode(ByteBuf buf) {
+      int length = buf.readInt();
+      byte[] bytes = new byte[length];
+      buf.readBytes(bytes);

Review comment:
       Yes, `ByteArrays.encode` would give a nice symmetry - that is why I suggested it above as well :)
   Since I was looking at the li branch, I did not see the `serialize(ByteBuffer)` that @Ngone51 suggested - that is definitely much better!

##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {

Review comment:
       We will still have a memory copy, right? We serialize to a local ByteBuffer, and then copy from the local ByteBuffer to the ByteBuf.
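   For reference, a sketch of the `serialize(ByteBuffer)` variant (assuming a RoaringBitmap version that provides this overload); it removes the stream adapter but, as noted, still leaves one copy from the heap buffer into the ByteBuf:
   ```
   ByteBuffer bb = ByteBuffer.allocate(b.serializedSizeInBytes());
   b.serialize(bb);        // advances bb's position past the serialized bytes
   bb.flip();
   buf.writeInt(bb.remaining());
   buf.writeBytes(bb);     // the remaining copy: heap ByteBuffer -> Netty ByteBuf
   ```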



