mridulm commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1063031369


##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -287,6 +288,9 @@ public void onFailure(Throwable e) {
 
     try {
       return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      logger.warn("RPC {} timed-out", rpcId);
+      throw Throwables.propagate(new SaslTimeoutException(e));

Review Comment:
   `SaslTimeoutException` -> `RpcTimeoutException`? (There is one in `org.apache.spark.rpc`.)
   
   This API is not specific to SASL - we cannot assume that a timeout for a sync send is due to a SASL timeout.
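   A self-contained sketch of the suggested shape (the exception class here is a hypothetical stand-in, not Spark's actual `RpcTimeoutException`, and the future is a generic `CompletableFuture` rather than the transport's result future):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a transport-agnostic timeout exception.
class RpcTimeoutDemoException extends RuntimeException {
  RpcTimeoutDemoException(Throwable cause) { super(cause); }
}

public class SyncRpcTimeout {
  static String awaitResult(CompletableFuture<String> result, long timeoutMs) {
    try {
      return result.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // Wrap in a transport-level timeout type rather than a SASL-specific one,
      // since a sync send can time out for reasons unrelated to SASL.
      throw new RpcTimeoutDemoException(e);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    CompletableFuture<String> never = new CompletableFuture<>();
    try {
      awaitResult(never, 10);
      System.out.println("no timeout");
    } catch (RpcTimeoutDemoException e) {
      System.out.println("timed out: " + e.getCause().getClass().getSimpleName());
    }
  }
}
```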



##########
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala:
##########
@@ -251,6 +251,10 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         }
         handleResult(PushResult(blockId, exception))
       }
+
+      override def onSaslTimeout(): Unit = {
+        TaskContext.get().taskMetrics().incSaslRequestRetries(1)

Review Comment:
   This is not within the task thread - you should be getting an NPE, since the task context will be null here.
   
   If the tests are not failing, we have to make sure we are testing this fix appropriately - some codepaths (like the push-related suites) might not be exercised by this?
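   For illustration: `TaskContext.get()` is backed by a thread-local, so a callback running on a transport thread sees no value. A minimal stand-alone sketch of that behavior (names are stand-ins, not Spark's classes):

```java
public class ThreadLocalContextDemo {
  static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

  // Read CONTEXT from a freshly started thread, as a transport callback would.
  static String readFromCallbackThread() {
    final String[] seen = new String[1];
    Thread t = new Thread(() -> seen[0] = CONTEXT.get());
    t.start();
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return seen[0];
  }

  public static void main(String[] args) {
    CONTEXT.set("task-0");  // set on the "task" thread
    System.out.println("task thread sees: " + CONTEXT.get());
    // A thread-local set on the task thread is invisible to other threads,
    // so the callback thread reads null - hence the NPE risk above.
    System.out.println("callback thread sees: " + readFromCallbackThread());
  }
}
```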



##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -287,6 +288,9 @@ public void onFailure(Throwable e) {
 
     try {
       return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      logger.warn("RPC {} timed-out", rpcId);

Review Comment:
   This should be at `trace` level - `debug` at best



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -245,8 +269,9 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
     throws IOException, InterruptedException {
 
     MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
-      "spark.shuffle.io.maxRetries", "2",
-      "spark.shuffle.io.retryWait", "0"));
+        "spark.shuffle.io.maxRetries", "2",
+        "spark.shuffle.io.retryWait", "0",
+        "spark.shuffle.sasl.enableRetries", "true"));

Review Comment:
   nit: fix indentation.
   
   Also, we want to make sure the tests handle the `spark.shuffle.sasl.enableRetries = false` case correctly as well. You can extend the test and specify `enableRetries = true` there.
   
   See `LevelDBHybridStoreSuite` as an example.
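   A rough sketch of running the suite over both values (the config keys are copied from the diff above; `confFor` is a hypothetical helper, and the loop body is a placeholder for the actual call into `performInteractions`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EnableRetriesMatrix {
  // Build the provider config for a given enableRetries value.
  static Map<String, String> confFor(String enableRetries) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("spark.shuffle.io.maxRetries", "2");
    conf.put("spark.shuffle.io.retryWait", "0");
    conf.put("spark.shuffle.sasl.enableRetries", enableRetries);
    return conf;
  }

  public static void main(String[] args) {
    for (String enableRetries : new String[] {"true", "false"}) {
      // performInteractions(interactions, confFor(enableRetries));  // placeholder
      System.out.println("would run interactions with " + confFor(enableRetries));
    }
  }
}
```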



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java:
##########
@@ -41,4 +41,6 @@ public interface BlockTransferListener extends EventListener {
   * Return a string indicating the type of the listener such as fetch, push, or something else
    */
   String getTransferType();
+
+  void onSaslTimeout();

Review Comment:
   Add a default implementation for it.
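   Something like the following (a sketch; the interface name is a stand-in for `BlockTransferListener`). A no-op default keeps existing implementations compiling without an override:

```java
import java.util.EventListener;

interface TransferListenerSketch extends EventListener {
  String getTransferType();

  // Default no-op: only listeners that care about SASL timeouts override it.
  default void onSaslTimeout() { }
}

public class DefaultMethodDemo {
  public static void main(String[] args) {
    // getTransferType() is now the only abstract method, so a lambda suffices.
    TransferListenerSketch fetchListener = () -> "fetch";
    fetchListener.onSaslTimeout();  // safe no-op without an override
    System.out.println(fetchListener.getTransferType());
  }
}
```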



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -289,5 +301,11 @@ public String getTransferType() {
       throw new RuntimeException(
        "Invocation on RetryingBlockTransferListener.getTransferType is unexpected.");
     }
+
+    @Override
+    public void onSaslTimeout() {
+      throw new RuntimeException(

Review Comment:
   `RuntimeException` -> `IllegalStateException` or something specific
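   e.g., a sketch of the suggested shape - `IllegalStateException` signals "this method should never be reached on this delegate" more precisely than a bare `RuntimeException`:

```java
public class UnexpectedInvocationDemo {
  static void onSaslTimeout() {
    // IllegalStateException makes the "should never happen" intent explicit.
    throw new IllegalStateException(
      "Invocation on RetryingBlockTransferListener.onSaslTimeout is unexpected.");
  }

  public static void main(String[] args) {
    try {
      onSaslTimeout();
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```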



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

