[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks
mridulm commented on a change in pull request #29855: URL: https://github.com/apache/spark/pull/29855#discussion_r500719679 ## File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ## @@ -181,6 +182,17 @@ public void onFailure(Throwable e) { private void processStreamUpload(final UploadStream req) { assert (req.body() == null); try { + // Retain the original metadata buffer, since it will be used during the invocation of + // this method. Will be released later. + req.meta.retain(); + // Make a copy of the original metadata buffer. In benchmark, we noticed that + // we cannot respond the original metadata buffer back to the client, otherwise + // in cases where multiple concurrent shuffles are present, a wrong metadata might + // be sent back to client. This is related to the eager release of the metadata buffer, + // i.e., we always release the original buffer by the time the invocation of this + // method ends, instead of by the time we respond it to the client. This is necessary, + // otherwise we start seeing memory issues very quickly in benchmarks. + ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer()); Review comment: Do we have cases where a large number of callbacks are created, causing memory pressure/GC issues? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
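The `cloneBuffer` helper referenced in the diff hunk above is not shown in this excerpt; a minimal sketch of what such a deep copy might look like (the class name and method here are illustrative, not the actual Spark implementation):

```java
import java.nio.ByteBuffer;

public class BufferUtils {
  /**
   * Deep-copies the readable bytes of a buffer into a newly allocated one,
   * so the copy remains valid after the original is released. duplicate()
   * is used so the source buffer's position is left untouched.
   */
  public static ByteBuffer cloneBuffer(ByteBuffer original) {
    ByteBuffer copy = ByteBuffer.allocate(original.remaining());
    copy.put(original.duplicate());
    copy.flip(); // make the copy readable from position 0
    return copy;
  }
}
```

Because the clone is backed by a fresh heap array, responding with it to the client is safe even after the original reference-counted buffer has been released.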
[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks
mridulm commented on a change in pull request #29855: URL: https://github.com/apache/spark/pull/29855#discussion_r494003389 ## File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java ## @@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) { } } + /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */ + public static class Bitmaps { +public static int encodedLength(RoaringBitmap b) { + // Compress the bitmap before serializing it + b.trim(); + b.runOptimize(); + return 4 + b.serializedSizeInBytes(); +} + +public static void encode(ByteBuf buf, RoaringBitmap b) { + ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes()); + try { +b.serialize(new DataOutputStream(new OutputStream() { + ByteBuffer buffer; + + OutputStream init(ByteBuffer buffer) { +this.buffer = buffer; +return this; + } + + @Override + public void close() { + } + + @Override + public void flush() { + } + + @Override + public void write(int b) { +buffer.put((byte) b); + } + + @Override + public void write(byte[] b) { +buffer.put(b); + } + + @Override + public void write(byte[] b, int off, int l) { +buffer.put(b, off, l); + } +}.init(outBuffer))); + } catch (IOException e) { +throw new RuntimeException("Exception while encoding bitmap", e); + } Review comment: Replace this with something more concise - for example see `UnsafeShuffleWriter.MyByteArrayOutputStream`. 
To illustrate, something like:
```java
MyBaos out = new MyBaos(b.serializedSizeInBytes());
b.serialize(new DataOutputStream(out));
int size = out.size();
buf.writeInt(size);
buf.writeBytes(out.getBuf(), 0, size);
```
The last part could also be moved as `ByteArrays.encode(byte[] arr, int offset, int len)` ## File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java ## @@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) { } } + /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */ + public static class Bitmaps { +public static int encodedLength(RoaringBitmap b) { + // Compress the bitmap before serializing it + b.trim(); + b.runOptimize(); Review comment: `BitmapArrays` results in calling `trim` and `runOptimize` twice - refactor so that it is only done once for this codepath ? ## File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ## @@ -209,12 +225,17 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { public void onComplete(String streamId) throws IOException { try { streamHandler.onComplete(streamId); - callback.onSuccess(ByteBuffer.allocate(0)); + callback.onSuccess(meta.duplicate()); Review comment: Can you add a comment on why we are making this change ? From sending empty buffer to meta. ## File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ## @@ -181,6 +182,17 @@ public void onFailure(Throwable e) { private void processStreamUpload(final UploadStream req) { assert (req.body() == null); try { + // Retain the original metadata buffer, since it will be used during the invocation of + // this method. Will be released later. + req.meta.retain(); + // Make a copy of the original metadata buffer. In benchmark, we noticed that + // we cannot respond the original metadata buffer back to the client, otherwise + // in cases where multiple concurrent shuffles are present, a wrong metadata might + // be sent back to client. This is related to the eager release of the metadata buffer, + // i.e., we always release the original buffer by the time the invocation of this + // method ends, instead of by the time we respond it to the client. This is necessary, + // otherwise we start seeing memory issues very quickly in benchmarks. + ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer()); Review comment: Since we are always making a copy of meta here, can we remove the `retain` + `release` below and instead always release it here, relying only on the cloned buffer within this method? ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file
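The `MyByteArrayOutputStream` pattern referenced in the suggestion above can be sketched as follows; since RoaringBitmap and Netty are third-party dependencies, this stdlib-only sketch shows just the exposed-buffer stream itself, with the intended call pattern kept as a comment (the class name here is illustrative):

```java
import java.io.ByteArrayOutputStream;

/**
 * Sketch of the pattern behind UnsafeShuffleWriter.MyByteArrayOutputStream:
 * a ByteArrayOutputStream subclass that exposes its protected backing array,
 * avoiding the defensive copy that toByteArray() would make. Intended use
 * (per the suggestion above; `b` is a RoaringBitmap, `buf` a Netty ByteBuf):
 *
 *   ExposedByteArrayOutputStream out =
 *       new ExposedByteArrayOutputStream(b.serializedSizeInBytes());
 *   b.serialize(new DataOutputStream(out));
 *   int size = out.size();
 *   buf.writeInt(size);
 *   buf.writeBytes(out.getBuf(), 0, size);
 */
public class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
  public ExposedByteArrayOutputStream(int size) {
    super(size);
  }

  /** The backing array; only the first size() bytes hold written data. */
  public byte[] getBuf() {
    return buf;
  }
}
```

Compared with the anonymous OutputStream in the diff, this keeps the serialization path to a single buffer and a handful of lines.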
[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks
mridulm commented on a change in pull request #29855: URL: https://github.com/apache/spark/pull/29855#discussion_r494658398 ## File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java ## @@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) { } } + /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */ + public static class Bitmaps { +public static int encodedLength(RoaringBitmap b) { + // Compress the bitmap before serializing it + b.trim(); + b.runOptimize(); + return 4 + b.serializedSizeInBytes(); +} + +public static void encode(ByteBuf buf, RoaringBitmap b) { + ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes()); + try { +b.serialize(new DataOutputStream(new OutputStream() { Review comment: We will still have a memory copy, right? Serialize to the local ByteBuffer, then copy from the local ByteBuffer to the ByteBuf.
[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks
mridulm commented on a change in pull request #29855: URL: https://github.com/apache/spark/pull/29855#discussion_r494657482 ## File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java ## @@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) { } } + /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */ + public static class Bitmaps { +public static int encodedLength(RoaringBitmap b) { + // Compress the bitmap before serializing it + b.trim(); + b.runOptimize(); + return 4 + b.serializedSizeInBytes(); +} + +public static void encode(ByteBuf buf, RoaringBitmap b) { + ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes()); + try { +b.serialize(new DataOutputStream(new OutputStream() { + ByteBuffer buffer; + + OutputStream init(ByteBuffer buffer) { +this.buffer = buffer; +return this; + } + + @Override + public void close() { + } + + @Override + public void flush() { + } + + @Override + public void write(int b) { +buffer.put((byte) b); + } + + @Override + public void write(byte[] b) { +buffer.put(b); + } + + @Override + public void write(byte[] b, int off, int l) { +buffer.put(b, off, l); + } +}.init(outBuffer))); + } catch (IOException e) { +throw new RuntimeException("Exception while encoding bitmap", e); + } + byte[] bytes = outBuffer.array(); + buf.writeInt(bytes.length); + buf.writeBytes(bytes); +} + +public static RoaringBitmap decode(ByteBuf buf) { + int length = buf.readInt(); + byte[] bytes = new byte[length]; + buf.readBytes(bytes); Review comment: Yes, `ByteArrays.encode` would give a nice symmetry - that is why I suggested it above :) Given I was looking at the li branch, I did not see the serialize(ByteBuffer) that @Ngone51 suggested - that is definitely much better!
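The `ByteArrays.encode`/`decode` symmetry discussed here is ordinary length-prefixed framing: write a 4-byte length, then the payload, and read it back the same way. A minimal stdlib sketch of that framing (Spark's actual `Encoders.ByteArrays` targets a Netty `ByteBuf`; `java.nio.ByteBuffer` stands in here, and the class name is illustrative):

```java
import java.nio.ByteBuffer;

public class ByteArraysSketch {
  /** 4 bytes of length prefix plus the payload itself. */
  public static int encodedLength(byte[] arr) {
    return 4 + arr.length;
  }

  /** Writes the array's length followed by its bytes. */
  public static void encode(ByteBuffer buf, byte[] arr) {
    buf.putInt(arr.length);
    buf.put(arr);
  }

  /** Reads back exactly what encode() wrote. */
  public static byte[] decode(ByteBuffer buf) {
    int length = buf.getInt();
    byte[] bytes = new byte[length];
    buf.get(bytes);
    return bytes;
  }
}
```

With helpers like these, the bitmap codepath could serialize the RoaringBitmap to a byte array once and delegate the framing, keeping encode and decode trivially symmetric.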
[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks
mridulm commented on a change in pull request #29855: URL: https://github.com/apache/spark/pull/29855#discussion_r494083495 ## File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java ## @@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) { } } + /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */ + public static class Bitmaps { +public static int encodedLength(RoaringBitmap b) { + // Compress the bitmap before serializing it + b.trim(); + b.runOptimize(); Review comment: I might have misread the code here, but I want to make sure I am not missing anything.