Victsm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494487660



##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {
+          ByteBuffer buffer;
+
+          OutputStream init(ByteBuffer buffer) {
+            this.buffer = buffer;
+            return this;
+          }
+
+          @Override
+          public void close() {
+          }
+
+          @Override
+          public void flush() {
+          }
+
+          @Override
+          public void write(int b) {
+            buffer.put((byte) b);
+          }
+
+          @Override
+          public void write(byte[] b) {
+            buffer.put(b);
+          }
+
+          @Override
+          public void write(byte[] b, int off, int l) {
+            buffer.put(b, off, l);
+          }
+        }.init(outBuffer)));
+      } catch (IOException e) {
+        throw new RuntimeException("Exception while encoding bitmap", e);
+      }
+      byte[] bytes = outBuffer.array();
+      buf.writeInt(bytes.length);
+      buf.writeBytes(bytes);
+    }
+
+    public static RoaringBitmap decode(ByteBuf buf) {
+      int length = buf.readInt();
+      byte[] bytes = new byte[length];
+      buf.readBytes(bytes);

Review comment:
       This would require using ByteArrays.encode to encode the original byte arrays. I think @Ngone51's earlier recommendation makes sense: we should use RoaringBitmap#serialize(ByteBuffer) to avoid the one additional memory copy during encoding. By doing that, we would serialize directly into the ByteBuf, so it would no longer be possible to use ByteArrays.encode for the corresponding byte arrays.
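
   For concreteness, a minimal sketch of that direct-serialization approach (assuming a RoaringBitmap version that provides `serialize(ByteBuffer)`; this is illustrative, not this PR's final code):

```java
// Sketch: serialize the bitmap straight into the ByteBuf's backing buffer,
// skipping the intermediate byte[] used in the current encode().
// Requires io.netty.buffer.ByteBuf, java.nio.ByteBuffer, org.roaringbitmap.RoaringBitmap.
public static void encode(ByteBuf buf, RoaringBitmap b) {
  int encodedLength = b.serializedSizeInBytes();
  buf.writeInt(encodedLength);
  buf.ensureWritable(encodedLength);
  int writerIndex = buf.writerIndex();
  // Expose the writable region as an NIO ByteBuffer and serialize in place.
  ByteBuffer byteBuffer = buf.nioBuffer(writerIndex, encodedLength);
  b.serialize(byteBuffer);
  // Writes through the NIO view don't advance writerIndex; advance it manually.
  buf.writerIndex(writerIndex + encodedLength);
}
```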

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {

Review comment:
       Good point, I think this also avoids one more memory copy.

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());

Review comment:
       Yes, BlockTransferMessage.toByteBuffer ensures that.
   We need to know the encodedLength in order to create the encoding ByteBuf in the first place.
   Will add a comment to clarify this.
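
   For context, `BlockTransferMessage.toByteBuffer` is roughly the following (paraphrased from the existing class, not new code in this PR): it sizes the `ByteBuf` from `encodedLength()` up front, which is why `encodedLength` must be known before encoding.

```java
public ByteBuffer toByteBuffer() {
  // Allow room for the encoded message, plus the type byte.
  ByteBuf buf = Unpooled.buffer(encodedLength() + 1);
  buf.writeByte(type().id);
  encode(buf);
  assert buf.writableBytes() == 0 : "Writable bytes remain: " + buf.writableBytes();
  return buf.nioBuffer();
}
```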

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();

Review comment:
       It should be invoked only once.
   BlockTransferMessage.toByteBuffer is where the initial call to encodedLength happens.
   It's only called once for each RoaringBitmap in the bitmap array.
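
   To illustrate (a hypothetical sketch mirroring the other `Encoders.*Arrays` helpers in this file, not necessarily this PR's exact code), the array encoder would call `Bitmaps.encodedLength` once per bitmap:

```java
public static class BitmapArrays {
  public static int encodedLength(RoaringBitmap[] bitmaps) {
    int totalLength = 4; // 4 bytes for the array-length prefix
    for (RoaringBitmap b : bitmaps) {
      // Bitmaps.encodedLength trims and run-optimizes b exactly once here.
      totalLength += Bitmaps.encodedLength(b);
    }
    return totalLength;
  }

  public static void encode(ByteBuf buf, RoaringBitmap[] bitmaps) {
    buf.writeInt(bitmaps.length);
    for (RoaringBitmap b : bitmaps) {
      Bitmaps.encode(buf, b);
    }
  }
}
```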

##########
File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -209,12 +225,17 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
         public void onComplete(String streamId) throws IOException {
            try {
              streamHandler.onComplete(streamId);
-             callback.onSuccess(ByteBuffer.allocate(0));
+             callback.onSuccess(meta.duplicate());

Review comment:
       Sure.
   Just to clarify: previously, this only served to acknowledge a successful data transfer for the use case of transferring large RDD partition blocks (> 2G).
   For block push, we have an additional BlockPushCallback implemented to handle what to do when the push of a block succeeds or fails.
   This callback needs the original header metadata on the client side to handle things properly.
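
   A hedged sketch of what that looks like on the client side (illustrative only; the real handling lives in the push callback in OneForOneBlockPusher): the echoed header lets the callback identify which pushed block the ack belongs to.

```java
RpcResponseCallback callback = new RpcResponseCallback() {
  @Override
  public void onSuccess(ByteBuffer response) {
    // The server now echoes the PushBlockStream header instead of an empty
    // buffer, so we can decode it and map the ack to the pushed block.
    PushBlockStream header =
      (PushBlockStream) BlockTransferMessage.Decoder.fromByteBuffer(response);
    listener.onBlockFetchSuccess(header.blockId, null /* no data in the ack */);
  }

  @Override
  public void onFailure(Throwable e) {
    logger.error("Error while pushing blocks", e);
  }
};
```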

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+/**
+ * Similar to {@link OneForOneBlockFetcher}, but for pushing blocks to a remote shuffle
+ * service to be merged instead of fetching them from remote shuffle services. This is used by
+ * ShuffleWriter when the block push process is initiated. The supplied BlockFetchingListener
+ * is used to handle the success or failure of pushing each block.
+ */
+public class OneForOneBlockPusher {
+  private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
+
+  private final TransportClient client;
+  private final String appId;
+  private final String[] blockIds;
+  private final BlockFetchingListener listener;
+  private final RpcResponseCallback callback;
+  private final Map<String, ManagedBuffer> buffers;
+
+  public OneForOneBlockPusher(
+      TransportClient client,
+      String appId,
+      String[] blockIds,
+      BlockFetchingListener listener,

Review comment:
       It's going to be the exact same API. As commented in this PR, we are thinking of renaming BlockFetchingListener and RetryingBlockFetcher, both of which are reused for block fetch/push, to BlockTransferListener and RetryingBlockTransferor for more appropriate naming.
   This change is not included in this PR to reduce the number of files we touch.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push-based shuffle.

Review comment:
       That's right. PushBlockStream is the metadata header associated with a shuffle block being pushed. It's created on the pusher/client side, and the shuffle service/server side will also include the same metadata header when acknowledging the success or failure of a block push request. Will update this comment to reflect both scenarios.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push-based shuffle.
+ */
+public class PushBlockStream extends BlockTransferMessage {
+  public final String appId;
+  public final String blockId;
+  public final int index;

Review comment:
       Will add comments to clarify.
   The block push API pushes a batch of blocks in a single invocation, similar to block fetch.
   `index` identifies the position of a given pushed block within that batch, so we can properly handle block push failures in a batch.
   This is similar to the `chunkIndex` in `StreamChunkId` used by `ChunkFetchRequest`.
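
   For illustration, a hypothetical push loop (assuming the `(appId, blockId, index)` constructor shown in this diff; not necessarily the PR's exact code) that tags each block with its position in the batch:

```java
for (int i = 0; i < blockIds.length; i++) {
  // index == i lets a failure ack be mapped back to blockIds[i].
  PushBlockStream header = new PushBlockStream(appId, blockIds[i], i);
  client.uploadStream(new NioManagedBuffer(header.toByteBuffer()),
    buffers.get(blockIds[i]), callback);
}
```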



