[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks

2020-10-06 Thread GitBox


mridulm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r500719679



##
File path: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##
@@ -181,6 +182,17 @@ public void onFailure(Throwable e) {
   private void processStreamUpload(final UploadStream req) {
 assert (req.body() == null);
 try {
+  // Retain the original metadata buffer, since it will be used during the invocation of
+  // this method. It will be released later.
+  req.meta.retain();
+  // Make a copy of the original metadata buffer. In benchmarks, we noticed that
+  // we cannot respond with the original metadata buffer back to the client; otherwise,
+  // in cases where multiple concurrent shuffles are present, the wrong metadata might
+  // be sent back to the client. This is related to the eager release of the metadata
+  // buffer, i.e., we always release the original buffer by the time the invocation of
+  // this method ends, instead of by the time we respond to the client. This is necessary,
+  // otherwise we start seeing memory issues very quickly in benchmarks.
+  ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer());

Review comment:
   Do we have cases where a large number of callbacks are created, causing memory pressure/GC issues?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks

2020-09-25 Thread GitBox


mridulm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494003389



##
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
 }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+public static int encodedLength(RoaringBitmap b) {
+  // Compress the bitmap before serializing it
+  b.trim();
+  b.runOptimize();
+  return 4 + b.serializedSizeInBytes();
+}
+
+public static void encode(ByteBuf buf, RoaringBitmap b) {
+  ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+  try {
+b.serialize(new DataOutputStream(new OutputStream() {
+  ByteBuffer buffer;
+
+  OutputStream init(ByteBuffer buffer) {
+this.buffer = buffer;
+return this;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void flush() {
+  }
+
+  @Override
+  public void write(int b) {
+buffer.put((byte) b);
+  }
+
+  @Override
+  public void write(byte[] b) {
+buffer.put(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int l) {
+buffer.put(b, off, l);
+  }
+}.init(outBuffer)));
+  } catch (IOException e) {
+throw new RuntimeException("Exception while encoding bitmap", e);
+  }

Review comment:
   Replace this with something more concise - for example, see `UnsafeShuffleWriter.MyByteArrayOutputStream`.
   To illustrate, something like:
   ```
   MyBaos out = new MyBaos(b.serializedSizeInBytes());
   b.serialize(new DataOutputStream(out));
   int size = out.size();
   buf.writeInt(size);
   buf.writeBytes(out.getBuf(), 0, size);
   ```
   
   The last part could also be moved as `ByteArrays.encode(byte[] arr, int offset, int len)`
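For reference, a minimal sketch of the kind of exposed-buffer stream the comment refers to. The real `UnsafeShuffleWriter.MyByteArrayOutputStream` lives in Spark core; the class name `ExposedByteArrayOutputStream` below is illustrative, not the actual Spark code.

```java
import java.io.ByteArrayOutputStream;

// A ByteArrayOutputStream subclass that exposes its internal buffer, in the
// spirit of UnsafeShuffleWriter.MyByteArrayOutputStream. getBuf() avoids the
// defensive copy that toByteArray() makes; only the first size() bytes of the
// returned array are valid.
class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
  ExposedByteArrayOutputStream(int initialSize) {
    super(initialSize);
  }

  byte[] getBuf() {
    return buf;  // protected field inherited from ByteArrayOutputStream
  }
}
```

With such a stream, the bitmap can be serialized once and its bytes written to the ByteBuf without the intermediate copy that `toByteArray()` would make.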

##
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
 }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+public static int encodedLength(RoaringBitmap b) {
+  // Compress the bitmap before serializing it
+  b.trim();
+  b.runOptimize();

Review comment:
   `BitmapArrays` results in calling `trim` and `runOptimize` twice - refactor so that it is only done once for this codepath?

##
File path: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##
@@ -209,12 +225,17 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
 public void onComplete(String streamId) throws IOException {
try {
  streamHandler.onComplete(streamId);
- callback.onSuccess(ByteBuffer.allocate(0));
+ callback.onSuccess(meta.duplicate());

Review comment:
   Can you add a comment on why we are making this change from sending an empty buffer to sending meta?
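As background for this question, a small stdlib-only sketch of what `duplicate()` buys here: the duplicate shares the metadata bytes but tracks its own position and limit, so handing it to the callback cannot disturb the original buffer's read position. The variable names below are illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DuplicateDemo {
  public static void main(String[] args) {
    ByteBuffer meta = ByteBuffer.wrap("metadata".getBytes(StandardCharsets.UTF_8));
    // The duplicate shares contents with meta but has independent position/limit.
    ByteBuffer response = meta.duplicate();
    while (response.hasRemaining()) {
      response.get();  // fully consume the duplicate
    }
    System.out.println(meta.remaining());      // prints 8: original untouched
    System.out.println(response.remaining());  // prints 0: duplicate consumed
  }
}
```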

##
File path: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##
@@ -181,6 +182,17 @@ public void onFailure(Throwable e) {
   private void processStreamUpload(final UploadStream req) {
 assert (req.body() == null);
 try {
+  // Retain the original metadata buffer, since it will be used during the invocation of
+  // this method. It will be released later.
+  req.meta.retain();
+  // Make a copy of the original metadata buffer. In benchmarks, we noticed that
+  // we cannot respond with the original metadata buffer back to the client; otherwise,
+  // in cases where multiple concurrent shuffles are present, the wrong metadata might
+  // be sent back to the client. This is related to the eager release of the metadata
+  // buffer, i.e., we always release the original buffer by the time the invocation of
+  // this method ends, instead of by the time we respond to the client. This is necessary,
+  // otherwise we start seeing memory issues very quickly in benchmarks.
+  ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer());

Review comment:
   Since we are always making a copy of meta here, can we remove the `retain` + `release` below, always release it here instead, and only rely on the cloned buffer within this method?
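A sketch of what such a clone-then-release flow could rely on; `cloneBuffer` here is a guess at the helper's shape, and the PR's actual implementation may differ.

```java
import java.nio.ByteBuffer;

// Illustrative sketch of a cloneBuffer helper: copy the remaining bytes of the
// source into a fresh heap buffer whose lifetime is independent of the
// (possibly pooled/refcounted) original, so the original can be released
// eagerly while the copy is used for the response.
public final class BufferUtil {
  public static ByteBuffer cloneBuffer(ByteBuffer src) {
    ByteBuffer copy = ByteBuffer.allocate(src.remaining());
    copy.put(src.duplicate());  // duplicate() so src's position is untouched
    copy.flip();                // make the copy ready for reading
    return copy;
  }
}
```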

##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file 

[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks

2020-09-24 Thread GitBox


mridulm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494658398



##
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
 }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+public static int encodedLength(RoaringBitmap b) {
+  // Compress the bitmap before serializing it
+  b.trim();
+  b.runOptimize();
+  return 4 + b.serializedSizeInBytes();
+}
+
+public static void encode(ByteBuf buf, RoaringBitmap b) {
+  ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+  try {
+b.serialize(new DataOutputStream(new OutputStream() {

Review comment:
   We will still have a memory copy, right? Serialize to a local ByteBuffer, then copy from the local ByteBuffer to the ByteBuf.








[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks

2020-09-24 Thread GitBox


mridulm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494657482



##
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
 }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+public static int encodedLength(RoaringBitmap b) {
+  // Compress the bitmap before serializing it
+  b.trim();
+  b.runOptimize();
+  return 4 + b.serializedSizeInBytes();
+}
+
+public static void encode(ByteBuf buf, RoaringBitmap b) {
+  ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+  try {
+b.serialize(new DataOutputStream(new OutputStream() {
+  ByteBuffer buffer;
+
+  OutputStream init(ByteBuffer buffer) {
+this.buffer = buffer;
+return this;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void flush() {
+  }
+
+  @Override
+  public void write(int b) {
+buffer.put((byte) b);
+  }
+
+  @Override
+  public void write(byte[] b) {
+buffer.put(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int l) {
+buffer.put(b, off, l);
+  }
+}.init(outBuffer)));
+  } catch (IOException e) {
+throw new RuntimeException("Exception while encoding bitmap", e);
+  }
+  byte[] bytes = outBuffer.array();
+  buf.writeInt(bytes.length);
+  buf.writeBytes(bytes);
+}
+
+public static RoaringBitmap decode(ByteBuf buf) {
+  int length = buf.readInt();
+  byte[] bytes = new byte[length];
+  buf.readBytes(bytes);

Review comment:
   Yes, `ByteArrays.encode` would give a nice symmetry - that is why I suggested it above as well :)
   Given I was looking at the li branch, I did not see the serialize(ByteBuffer) that @Ngone51 suggested - that is definitely much better!
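To make the suggested symmetry concrete, a self-contained sketch of a length-prefixed `ByteArrays.encode`/`decode` pair. Spark's Encoders operate on a Netty ByteBuf; java.nio.ByteBuffer stands in below so the sketch is stdlib-only, and the names are illustrative.

```java
import java.nio.ByteBuffer;

// Length-prefixed byte-array codec mirroring the encode/decode symmetry
// discussed above: encode writes a 4-byte length followed by the bytes;
// decode reads the length and then exactly that many bytes.
public final class ByteArrays {
  public static void encode(ByteBuffer buf, byte[] arr, int offset, int len) {
    buf.putInt(len);
    buf.put(arr, offset, len);
  }

  public static byte[] decode(ByteBuffer buf) {
    int length = buf.getInt();
    byte[] bytes = new byte[length];
    buf.get(bytes);
    return bytes;
  }
}
```

A round trip (encode, flip the buffer, decode) recovers the original bytes, which is the symmetry the comment is asking for.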








[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks

2020-09-24 Thread GitBox


mridulm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494083495



##
File path: 
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
 }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+public static int encodedLength(RoaringBitmap b) {
+  // Compress the bitmap before serializing it
+  b.trim();
+  b.runOptimize();

Review comment:
   I might have misread the code here, but I want to make sure I am not missing anything.







