Victsm commented on a change in pull request #29855: URL: https://github.com/apache/spark/pull/29855#discussion_r494487660
##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {
+          ByteBuffer buffer;
+
+          OutputStream init(ByteBuffer buffer) {
+            this.buffer = buffer;
+            return this;
+          }
+
+          @Override
+          public void close() {
+          }
+
+          @Override
+          public void flush() {
+          }
+
+          @Override
+          public void write(int b) {
+            buffer.put((byte) b);
+          }
+
+          @Override
+          public void write(byte[] b) {
+            buffer.put(b);
+          }
+
+          @Override
+          public void write(byte[] b, int off, int l) {
+            buffer.put(b, off, l);
+          }
+        }.init(outBuffer)));
+      } catch (IOException e) {
+        throw new RuntimeException("Exception while encoding bitmap", e);
+      }
+      byte[] bytes = outBuffer.array();
+      buf.writeInt(bytes.length);
+      buf.writeBytes(bytes);
+    }
+
+    public static RoaringBitmap decode(ByteBuf buf) {
+      int length = buf.readInt();
+      byte[] bytes = new byte[length];
+      buf.readBytes(bytes);

Review comment:
This would require using ByteArrays.encode to encode the original byte arrays. I think @Ngone51's earlier recommendation makes sense: we should use RoaringBitmap#serialize(ByteBuffer) to avoid the one additional memory copy during encoding. With that change we would serialize directly into the ByteBuf, so there would no longer be an intermediate byte array for ByteArrays.encode to encode.
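To make the copy under discussion concrete, here is a minimal sketch that uses only java.nio. It rests on several assumptions: `SizedSerializable` is a hypothetical stand-in for the two RoaringBitmap calls named in the comment (`serializedSizeInBytes()` and `serialize(ByteBuffer)`), a plain `ByteBuffer` stands in for Netty's `ByteBuf`, and the 4-byte length prefix follows the Javadoc in the diff above. It is not the code that ended up in the PR.

```java
import java.nio.ByteBuffer;

public class DirectEncodeSketch {
  /** Hypothetical stand-in for the two RoaringBitmap calls mentioned in the comment. */
  public interface SizedSerializable {
    int serializedSizeInBytes();
    void serialize(ByteBuffer out);
  }

  /** 4-byte length prefix plus payload, as described in the Javadoc of the diff. */
  public static int encodedLength(SizedSerializable s) {
    return 4 + s.serializedSizeInBytes();
  }

  /** Writes the payload straight into dst, with no intermediate byte[] to copy from. */
  public static void encode(ByteBuffer dst, SizedSerializable s) {
    int lengthPos = dst.position();
    dst.putInt(0);                                  // placeholder for the length prefix
    int start = dst.position();
    s.serialize(dst);                               // serialize directly into the destination
    dst.putInt(lengthPos, dst.position() - start);  // back-fill the actual payload length
  }

  /** Toy payload used only for this illustration: a fixed list of ints. */
  public static SizedSerializable ints(int... values) {
    return new SizedSerializable() {
      @Override
      public int serializedSizeInBytes() {
        return 4 * values.length;
      }

      @Override
      public void serialize(ByteBuffer out) {
        for (int v : values) {
          out.putInt(v);
        }
      }
    };
  }

  public static void main(String[] args) {
    SizedSerializable payload = ints(1, 2, 3);
    // The caller sizes the buffer up front, which is why encodedLength must be known first.
    ByteBuffer buf = ByteBuffer.allocate(encodedLength(payload));
    encode(buf, payload);
    buf.flip();
    System.out.println("length prefix = " + buf.getInt());
    System.out.println("payload bytes = " + buf.remaining());
  }
}
```

Because the payload never exists as a standalone byte array in this shape, a helper that takes a `byte[]` (such as ByteArrays.encode in the comment) has nothing to operate on, which is the trade-off the comment describes.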
##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {

Review comment:
Good point. I think this also avoids one more memory copy.

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());

Review comment:
Yes, BlockTransferMessage.toByteBuffer ensures that. We need to know the encodedLength in order to create the encoding ByteBuf in the first place. Will add a comment to clarify this.

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes.
*/
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();

Review comment:
It should be invoked only once. BlockTransferMessage.toByteBuffer is where the initial call to encodedLength happens, and it is called only once for each RoaringBitmap in the bitmap array.

##########
File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -209,12 +225,17 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
       public void onComplete(String streamId) throws IOException {
         try {
           streamHandler.onComplete(streamId);
-          callback.onSuccess(ByteBuffer.allocate(0));
+          callback.onSuccess(meta.duplicate());

Review comment:
Sure, just to clarify: previously, this callback only served to acknowledge a successful data transfer when transferring large RDD partition blocks (> 2 GB). For block push, we have implemented an additional BlockPushCallback that decides what to do when the push of a block succeeds or fails. That callback needs the original header metadata on the client side to handle the result properly.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+/**
+ * Similar to {@link OneForOneBlockFetcher}, but for pushing blocks to remote shuffle service to
+ * be merged instead of for fetching them from remote shuffle services. This is used by
+ * ShuffleWriter when the block push process is initiated. The supplied BlockFetchingListener
+ * is used to handle the success or failure in pushing each blocks.
+ */
+public class OneForOneBlockPusher {
+  private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
+
+  private final TransportClient client;
+  private final String appId;
+  private final String[] blockIds;
+  private final BlockFetchingListener listener;
+  private final RpcResponseCallback callback;
+  private final Map<String, ManagedBuffer> buffers;
+
+  public OneForOneBlockPusher(
+      TransportClient client,
+      String appId,
+      String[] blockIds,
+      BlockFetchingListener listener,

Review comment:
It's going to be the exact same API.
As commented in this PR, we are considering renaming BlockFetchingListener and RetryingBlockFetcher, both of which are reused for block fetch and block push, to BlockTransferListener and RetryingBlockTransferor, which are more appropriate names. That rename is left out of this PR to reduce the number of files we touch.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push based shuffle.

Review comment:
That's right. PushBlockStream is the metadata header associated with a shuffle block being pushed. It is created on the pusher (client) side, and the shuffle service (server) side also includes the same metadata header when it acknowledges the success or failure of a block push request.
Will update this comment to reflect both scenarios.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push based shuffle.
+ */
+public class PushBlockStream extends BlockTransferMessage {
+  public final String appId;
+  public final String blockId;
+  public final int index;

Review comment:
Will add comments to clarify. The block push API pushes a batch of blocks in a single invocation, similar to block fetch. `index` identifies a pushed block's position within that batch, so that we can properly handle block push failures within a batch. This is similar to the `chunkIndex` in `StreamChunkId` used by `ChunkFetchRequest`.
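As a rough illustration of the role `index` plays, the sketch below builds one header per block in a batch and then uses the index of an echoed header to find the block that failed. Everything in it is an assumption made for the example: `PushHeader` is a simplified, hypothetical stand-in for PushBlockStream (only the three fields shown in the diff, no wire encoding), `headersFor` and `failedBlockId` are invented helper names, and the block IDs are made up. The PR's actual failure handling may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchIndexSketch {
  /** Hypothetical, simplified header: only the three fields shown in the diff. */
  public static final class PushHeader {
    public final String appId;
    public final String blockId;
    public final int index; // position of this block within the pushed batch

    public PushHeader(String appId, String blockId, int index) {
      this.appId = appId;
      this.blockId = blockId;
      this.index = index;
    }
  }

  /** Builds one header per block, recording each block's position in the batch. */
  public static List<PushHeader> headersFor(String appId, String[] blockIds) {
    List<PushHeader> headers = new ArrayList<>();
    for (int i = 0; i < blockIds.length; i++) {
      headers.add(new PushHeader(appId, blockIds[i], i));
    }
    return headers;
  }

  /** Maps a header echoed back with a failure to the block it refers to. */
  public static String failedBlockId(String[] blockIds, PushHeader echoed) {
    if (echoed.index < 0 || echoed.index >= blockIds.length) {
      throw new IllegalArgumentException("index outside batch: " + echoed.index);
    }
    return blockIds[echoed.index];
  }

  public static void main(String[] args) {
    String[] batch = {"block-a", "block-b", "block-c"}; // made-up block IDs
    List<PushHeader> headers = headersFor("app-1", batch);
    // Pretend the server reported a failure for the second block in the batch.
    System.out.println("failed block: " + failedBlockId(batch, headers.get(1)));
  }
}
```

Carrying the position in the header means the client can resolve a failure with a constant-time array lookup instead of searching the batch by block ID.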