This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f8ff786 [SPARK-37984][SHUFFLE] Avoid calculating all outstanding requests to improve performance f8ff786 is described below commit f8ff7863e792b833afb2ff603878f29d4a9888e6 Author: weixiuli <weixi...@jd.com> AuthorDate: Sun Jan 23 20:23:20 2022 -0600 [SPARK-37984][SHUFFLE] Avoid calculating all outstanding requests to improve performance ### What changes were proposed in this pull request? Avoid calculating all outstanding requests to improve performance. ### Why are the changes needed? Following the comment (https://github.com/apache/spark/pull/34711#pullrequestreview-835520984), we can implement a "has outstanding requests" method in the response handler that doesn't even need to get a count. Let's do this with this PR. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. Closes #35276 from weixiuli/SPARK-37984. Authored-by: weixiuli <weixi...@jd.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../apache/spark/network/client/TransportResponseHandler.java | 10 ++++++++-- .../apache/spark/network/server/TransportChannelHandler.java | 3 +-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index 576c088..261f205 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -140,7 +140,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { @Override public void channelInactive() { - if (numOutstandingRequests() > 0) { + if (hasOutstandingRequests()) { String remoteAddress = getRemoteAddress(channel); logger.error("Still have {} requests outstanding when 
connection from {} is closed", numOutstandingRequests(), remoteAddress); @@ -150,7 +150,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { @Override public void exceptionCaught(Throwable cause) { - if (numOutstandingRequests() > 0) { + if (hasOutstandingRequests()) { String remoteAddress = getRemoteAddress(channel); logger.error("Still have {} requests outstanding when connection from {} is closed", numOutstandingRequests(), remoteAddress); @@ -275,6 +275,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { (streamActive ? 1 : 0); } + /** Check if there are any outstanding requests (fetch requests + rpcs) */ + public Boolean hasOutstandingRequests() { + return streamActive || !outstandingFetches.isEmpty() || !outstandingRpcs.isEmpty() || + !streamCallbacks.isEmpty(); + } + /** Returns the time in nanoseconds of when the last request was sent out. */ public long getTimeOfLastRequestNs() { return timeOfLastRequestNs.get(); diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index 275e64e..d197032 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -161,8 +161,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message boolean isActuallyOverdue = System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs; if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) { - boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0; - if (hasInFlightRequests) { + if (responseHandler.hasOutstandingRequests()) { String address = getRemoteAddress(ctx.channel()); logger.error("Connection to {} has been quiet for {} ms while there are 
outstanding " + "requests. Assuming connection is dead; please adjust" + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org