mridulm commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1063031369
##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -287,6 +288,9 @@ public void onFailure(Throwable e) {
try {
return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ logger.warn("RPC {} timed-out", rpcId);
+ throw Throwables.propagate(new SaslTimeoutException(e));
Review Comment:
`SaslTimeoutException` -> `RpcTimeoutException`? (there is one in `org.apache.spark.rpc`)
This API is not specific to SASL; we cannot assume that a timeout on a synchronous send is due to a SASL timeout.
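A minimal sketch of the direction, assuming `network-common` would need its own generic type since it does not depend on `core` (the name `TransportTimeoutException` is hypothetical, not something in this PR):

```java
import java.util.concurrent.TimeoutException;

// Hypothetical transport-level timeout, not tied to SASL; the reviewer's
// pointer is org.apache.spark.rpc.RpcTimeoutException, which lives in core.
public class TransportTimeoutException extends RuntimeException {
  public TransportTimeoutException(String message, TimeoutException cause) {
    super(message, cause);
  }
}
```

The catch block would then wrap the `TimeoutException` in this (or a similarly generic) type instead of `SaslTimeoutException`.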
##########
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala:
##########
@@ -251,6 +251,10 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
}
handleResult(PushResult(blockId, exception))
}
+
+ override def onSaslTimeout(): Unit = {
+ TaskContext.get().taskMetrics().incSaslRequestRetries(1)
Review Comment:
This is not within the task thread, so the task context should be null and you should be getting an NPE here.
If the tests are not failing, we have to make sure we are testing this fix appropriately; some code paths (like the push-related suites) might not be exercising this at all.
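One possible shape for a fix, sketched under the assumption that the pusher is created on the task thread; the class name and structure here are illustrative, not the PR's:

```java
import org.apache.spark.TaskContext;

// Capture the TaskContext eagerly, on the thread that creates the pusher;
// the transfer listener's callbacks later run on an RPC client thread where
// TaskContext.get() returns null.
class SaslRetryMetricsSketch {
  private final TaskContext capturedContext = TaskContext.get();

  void onSaslTimeout() {
    if (capturedContext != null) {
      // incSaslRequestRetries is the accessor this PR adds to TaskMetrics.
      capturedContext.taskMetrics().incSaslRequestRetries(1);
    }
  }
}
```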
##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -287,6 +288,9 @@ public void onFailure(Throwable e) {
try {
return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ logger.warn("RPC {} timed-out", rpcId);
Review Comment:
This should be at `trace` level, `debug` at best.
##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -245,8 +269,9 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
throws IOException, InterruptedException {
MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
- "spark.shuffle.io.maxRetries", "2",
- "spark.shuffle.io.retryWait", "0"));
+ "spark.shuffle.io.maxRetries", "2",
+ "spark.shuffle.io.retryWait", "0",
+ "spark.shuffle.sasl.enableRetries", "true"));
Review Comment:
nit: fix the indentation.
Also, we want to make sure the tests handle the `spark.shuffle.sasl.enableRetries = false` case correctly as well.
You can extend the test suite and set `enableRetries = true` in the subclass; see `LevelDBHybridStoreSuite` for an example of this pattern.
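A rough sketch of that structure, with hypothetical suite names (the real split would mirror how `LevelDBHybridStoreSuite` reuses a shared base suite):

```java
import com.google.common.collect.ImmutableMap;
import org.apache.spark.network.util.MapConfigProvider;

// Base suite runs with SASL retries disabled; a hook lets a subclass flip it.
public class RetryingBlockTransferorSuiteBase {
  protected String saslRetriesEnabled() {
    return "false";
  }

  protected MapConfigProvider configProvider() {
    return new MapConfigProvider(ImmutableMap.of(
      "spark.shuffle.io.maxRetries", "2",
      "spark.shuffle.io.retryWait", "0",
      "spark.shuffle.sasl.enableRetries", saslRetriesEnabled()));
  }
}

// All inherited tests rerun with retries enabled.
class SaslRetryEnabledTransferorSuite extends RetryingBlockTransferorSuiteBase {
  @Override
  protected String saslRetriesEnabled() {
    return "true";
  }
}
```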
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java:
##########
@@ -41,4 +41,6 @@ public interface BlockTransferListener extends EventListener {
 * Return a string indicating the type of the listener such as fetch, push, or something else
*/
String getTransferType();
+
+ void onSaslTimeout();
Review Comment:
Add a default (no-op) implementation for it, so existing listeners are not forced to implement the new method.
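A minimal sketch, assuming a no-op default is acceptable (other members of the interface elided):

```java
import java.util.EventListener;

public interface BlockTransferListener extends EventListener {
  String getTransferType();

  // No-op by default, so existing listeners compile unchanged; only
  // listeners that track SASL retries need to override this.
  default void onSaslTimeout() { }
}
```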
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -289,5 +301,11 @@ public String getTransferType() {
throw new RuntimeException(
"Invocation on RetryingBlockTransferListener.getTransferType is
unexpected.");
}
+
+ @Override
+ public void onSaslTimeout() {
+ throw new RuntimeException(
Review Comment:
`RuntimeException` -> `IllegalStateException`, or something more specific.
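For example, mirroring the message style of the `getTransferType` override above:

```java
@Override
public void onSaslTimeout() {
  throw new IllegalStateException(
    "Invocation on RetryingBlockTransferListener.onSaslTimeout is unexpected.");
}
```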