wankunde commented on PR #37533:
URL: https://github.com/apache/spark/pull/37533#issuecomment-1220198154

   > @wankunde
   > 
   > > Send finalize RPCs will block the main thread due to creating connection to some unreachable nodes.
   > 
   > Which main thread are you referring to here? Could you please explain which thread is being blocked. AFAICT this is already being done by `shuffle-merge-finalizer` threads.
   > 
   > Why can't you reduce the `SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY` default for the cluster? One of the reasons this configuration was introduced was because creating connection should have a lower timeout than connection idle time
   
   Yes, the DAGScheduler finalizes each shuffle map stage in its own `shuffle-merge-finalizer` thread, and that thread holds `clientPool.locks[clientIndex]` while creating a connection to the ESS merger node. So the other `shuffle-merge-finalizer` threads (one stage per thread) wait on that lock for up to `SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY`.
   Although reducing `SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY` helps, the total wait time (`SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY * lostMergerNodesSize * stageSize`) can still be long. This PR runs `scheduleShuffleMergeFinalize()` and sends the `finalizeShuffleMerge` RPCs in two threads, and stops all work after `PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT`, regardless of success or failure.
   
   Currently we only call `removeShufflePushMergerLocation` when a shuffle fetch fails. This PR also prevents merger nodes from being selected as mergeLocations when connection creation fails: those bad merger nodes are added to `finalizeBlackNodes`, so subsequent shuffle map stages will not try to connect to them.
   
   
   
   ```
   "shuffle-merge-finalizer-4" #1842 daemon prio=5 os_prio=0 tid=0x00007f19440d8000 nid=0x2be822 in Object.wait() [0x00007f19ea7f7000]
      java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:460)
        at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:679)
        - locked <0x00007f3eb8244598> (a io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise)
        at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:298)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:283)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
        - locked <0x00007f1d7b0c0ba8> (a java.lang.Object)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
        at org.apache.spark.network.shuffle.ExternalBlockStoreClient.finalizeShuffleMerge(ExternalBlockStoreClient.java:229)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$finalizeShuffleMerge$5(DAGScheduler.scala:2437)
   
   "shuffle-merge-finalizer-3" #1647 daemon prio=5 os_prio=0 tid=0x00007f19440d2800 nid=0x2be52e waiting for monitor entry [0x00007f1688ff2000]
      java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:198)
        - waiting to lock <0x00007f1d7b0c0ba8> (a java.lang.Object)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
        at org.apache.spark.network.shuffle.ExternalBlockStoreClient.finalizeShuffleMerge(ExternalBlockStoreClient.java:229)
        at org.apache.spark.scheduler.DAGScheduler$$anon$7.$anonfun$run$2(DAGScheduler.scala:2419)
   ...
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
