venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r628401105
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -254,6 +259,28 @@ private[spark] class DAGScheduler(
private val blockManagerMasterDriverHeartbeatTimeout =
sc.getConf.get(config.STORAGE_BLOCKMANAGER_MASTER_DRIVER_HEARTBEAT_TIMEOUT).millis
+ private val shuffleMergeResultsTimeoutSec =
+   JavaUtils.timeStringAsSec(sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT))
+
+ private val shuffleMergeFinalizeWaitSec =
+   JavaUtils.timeStringAsSec(sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT))
+
+ // lazy initialized so that the shuffle client can be properly initialized
+ private lazy val externalShuffleClient: Option[ExternalBlockStoreClient] =
+ if (pushBasedShuffleEnabled) {
+ val transConf = SparkTransportConf.fromSparkConf(sc.conf, "shuffle", 1)
+ val shuffleClient = new ExternalBlockStoreClient(transConf,
+   env.securityManager,
+   env.securityManager.isAuthenticationEnabled(),
+   sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
+ shuffleClient.init(sc.conf.getAppId)
+ Some(shuffleClient)
+ } else {
+ None
+ }
+
+ private val shuffleMergeFinalizeScheduler =
+   ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-merge-finalizer")
Review comment:
Changed it to use `newDaemonThreadPoolScheduledExecutor` with 8 threads.
Don't think we need this to be configurable, let me know your thoughts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]