[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19374
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r145391350

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -135,22 +135,24 @@ private[spark] class MesosClusterScheduler(
   private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
+  // Keyed by submission id
   private val finishedDrivers =
     new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
   private var frameworkId: String = null
-  // Holds all the launched drivers and current launch state, keyed by driver id.
+  // Holds all the launched drivers and current launch state, keyed by submission id.
   private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
   // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
   // All drivers that are loaded after failover are added here, as we need get the latest
-  // state of the tasks from Mesos.
+  // state of the tasks from Mesos. Keyed by task Id.
   private val pendingRecover = new mutable.HashMap[String, SlaveID]()
-  // Stores all the submitted drivers that hasn't been launched.
+  // Stores all the submitted drivers that hasn't been launched, keyed by submission id
   private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
-  // All supervised drivers that are waiting to retry after termination.
+  // All supervised drivers that are waiting to retry after termination, keyed by submission id
   private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
   private val queuedDriversState = engineFactory.createEngine("driverQueue")
   private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
   private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  private final val RETRY_ID = "-retry-"
--- End diff --

np will update.
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user ArtRand commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r145292254

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -135,22 +135,24 @@ private[spark] class MesosClusterScheduler(
   private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
+  // Keyed by submission id
   private val finishedDrivers =
     new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
   private var frameworkId: String = null
-  // Holds all the launched drivers and current launch state, keyed by driver id.
+  // Holds all the launched drivers and current launch state, keyed by submission id.
   private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
   // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
   // All drivers that are loaded after failover are added here, as we need get the latest
-  // state of the tasks from Mesos.
+  // state of the tasks from Mesos. Keyed by task Id.
   private val pendingRecover = new mutable.HashMap[String, SlaveID]()
-  // Stores all the submitted drivers that hasn't been launched.
+  // Stores all the submitted drivers that hasn't been launched, keyed by submission id
   private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
-  // All supervised drivers that are waiting to retry after termination.
+  // All supervised drivers that are waiting to retry after termination, keyed by submission id
   private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
   private val queuedDriversState = engineFactory.createEngine("driverQueue")
   private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
   private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  private final val RETRY_ID = "-retry-"
--- End diff --

Sorry, super nit, maybe `RETRY_SEP`. Feel free to ignore.
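The constant and the two helpers it supports are small enough to sketch in isolation. Below is a minimal, runnable sketch of the id scheme under discussion, not the PR's code: a plain `Option[Int]` retry count stands in for `MesosClusterRetryState`, and the constant name follows the `RETRY_SEP` suggestion above.

```scala
object RetryIds {
  // Separator between the stable submission id and the retry counter;
  // the name follows the RETRY_SEP suggestion above.
  private final val RETRY_SEP = "-retry-"

  // Task id sent to Mesos: the submission id, plus a retry suffix on
  // relaunches, so every attempt gets a fresh task id.
  def driverTaskId(submissionId: String, retries: Option[Int]): String =
    retries.map(r => s"$submissionId$RETRY_SEP$r").getOrElse(submissionId)

  // Recover the stable submission id from any task id.
  def submissionIdFromTaskId(taskId: String): String =
    taskId.split(RETRY_SEP).head

  def main(args: Array[String]): Unit = {
    val sub = "driver-20170927224038-0001"
    assert(driverTaskId(sub, None) == sub)
    assert(driverTaskId(sub, Some(2)) == s"${sub}-retry-2")
    assert(submissionIdFromTaskId(s"${sub}-retry-2") == sub)
  }
}
```

Keeping the separator in one named constant means the build and parse sides cannot drift apart, which is the point of the nit.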
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r145066047

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -276,8 +276,8 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
+        launchedDrivers(state.driverDescription.submissionId) = state
--- End diff --

ok
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r144947372

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -896,8 +913,8 @@ private[spark] class MesosClusterScheduler(
     revive()
   }

-  private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
-    pendingRetryDriversState.persist(taskId, desc)
+  private def addDriverToPending(desc: MesosDriverDescription, id: String) = {
+    pendingRetryDriversState.persist(id, desc)
--- End diff --

ok.
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r144947305

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler(
     s"${frameworkId}-${desc.submissionId}${retries}"
   }

+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSumbmissionIdFromTaskId(taskId: String): String = {
+    taskId.split("-retry-").head
--- End diff --

sure.
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r144947255

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -373,10 +374,16 @@ class SparkContext(config: SparkConf) extends Logging {
     // log out spark.app.name in the Spark driver logs
     logInfo(s"Submitted application: $appName")

-    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
-    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
-      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
-        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
+    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster or
+    // System property spark.mesos.driver.frameworkId must be set if user code ran by
+    // Mesos Dispatcher on a MESOS cluster
+    if (deployMode == "cluster") {
--- End diff --

Ok, you are probably right; we do a spark-submit again. I will remove this part.
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user ArtRand commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r144682661

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -896,8 +913,8 @@ private[spark] class MesosClusterScheduler(
     revive()
   }

-  private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
-    pendingRetryDriversState.persist(taskId, desc)
+  private def addDriverToPending(desc: MesosDriverDescription, id: String) = {
+    pendingRetryDriversState.persist(id, desc)
--- End diff --

Maybe keep the name as `subId` because it could be confusing otherwise.
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user ArtRand commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r144682434

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -373,10 +374,16 @@ class SparkContext(config: SparkConf) extends Logging {
     // log out spark.app.name in the Spark driver logs
     logInfo(s"Submitted application: $appName")

-    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
-    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
-      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
-        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
+    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster or
+    // System property spark.mesos.driver.frameworkId must be set if user code ran by
+    // Mesos Dispatcher on a MESOS cluster
+    if (deployMode == "cluster") {
--- End diff --

FWIW, I _believe_ that when we submit a job with the dispatcher, `deployMode` is actually set to `client`, so this logic may not be invoked as expected.
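For reference, here is a sketch of what such a guard could look like pulled out of `SparkContext`, reading the standard `spark.master` and `spark.submit.deployMode` keys. The helper name and structure are hypothetical, not the PR's code; and per the comment above, drivers launched by the Mesos dispatcher may see `client` here, in which case the cluster branch never fires.

```scala
import org.apache.spark.{SparkConf, SparkException}

object ClusterModeCheck {
  // Hypothetical guard: in cluster deploy mode, the cluster manager is
  // expected to have set a marker property before user code creates a
  // SparkContext. Under the Mesos dispatcher, deployMode may be "client"
  // (see the comment above), so this check may never trigger there.
  def verify(conf: SparkConf): Unit = {
    val master = conf.get("spark.master", "")
    val deployMode = conf.get("spark.submit.deployMode", "client")
    if (deployMode == "cluster") {
      val yarnMissing = master == "yarn" && !conf.contains("spark.yarn.app.id")
      val mesosMissing =
        master.startsWith("mesos") && !conf.contains("spark.mesos.driver.frameworkId")
      if (yarnMissing || mesosMissing) {
        throw new SparkException("Detected cluster deploy mode, but not running " +
          "on a cluster. Please use spark-submit.")
      }
    }
  }
}
```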
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user ArtRand commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r144682554

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler(
     s"${frameworkId}-${desc.submissionId}${retries}"
   }

+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSumbmissionIdFromTaskId(taskId: String): String = {
+    taskId.split("-retry-").head
--- End diff --

Maybe make this a constant?
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r143969946

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -804,45 +814,52 @@ private[spark] class MesosClusterScheduler(
     logInfo(s"Received status update: taskId=${taskId}" +
       s" state=${status.getState}" +
       s" message=${status.getMessage}" +
-      s" reason=${status.getReason}");
+      s" reason=${status.getReason}")

     stateLock.synchronized {
-      if (launchedDrivers.contains(taskId)) {
+      val subId = getSumbmissionIdFromTaskId(taskId)
+      if (launchedDrivers.contains(subId)) {
         if (status.getReason == Reason.REASON_RECONCILIATION &&
           !pendingRecover.contains(taskId)) {
           // Task has already received update and no longer requires reconciliation.
           return
         }
-        val state = launchedDrivers(taskId)
+        val state = launchedDrivers(subId)
         // Check if the driver is supervise enabled and can be relaunched.
         if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
-          removeFromLaunchedDrivers(taskId)
+          removeFromLaunchedDrivers(subId)
           state.finishDate = Some(new Date())
           val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
           val (retries, waitTimeSec) = retryState
             .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
             .getOrElse{ (1, 1) }
           val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
-
           val newDriverDescription = state.driverDescription.copy(
             retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
-          addDriverToPending(newDriverDescription, taskId);
+          addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
         } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
-          removeFromLaunchedDrivers(taskId)
-          state.finishDate = Some(new Date())
-          if (finishedDrivers.size >= retainedDrivers) {
-            val toRemove = math.max(retainedDrivers / 10, 1)
-            finishedDrivers.trimStart(toRemove)
-          }
-          finishedDrivers += state
+          retireDriver(subId, state, status)
         }
         state.mesosTaskStatus = Option(status)
       } else {
-        logError(s"Unable to find driver $taskId in status update")
+        logError(s"Unable to find driver with $taskId in status update")
      }
    }
  }

+  private def retireDriver(
+      submissionId: String,
+      state: MesosClusterSubmissionState,
+      status: TaskStatus) = {
--- End diff --

Will fix, thanx.
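The retry bookkeeping in the diff above doubles the wait between supervised relaunches and caps it at `maxRetryWaitTime` (in seconds). A standalone sketch of just that computation, with a simple case class standing in for `MesosClusterRetryState`:

```scala
object RetryBackoff {
  // Stand-in for MesosClusterRetryState: attempt count and current wait.
  final case class RetryState(retries: Int, waitTimeSec: Int)

  // Mirrors the diff: the first failure waits 1s, each later failure
  // doubles the previous wait, capped at maxRetryWaitTime.
  def next(prev: Option[RetryState], maxRetryWaitTime: Int): RetryState =
    prev
      .map(rs => RetryState(rs.retries + 1, math.min(maxRetryWaitTime, rs.waitTimeSec * 2)))
      .getOrElse(RetryState(1, 1))

  def main(args: Array[String]): Unit = {
    // Waits progress 1, 2, 4, 8, ... until they hit the 60s cap.
    val states = Iterator.iterate(next(None, 60))(s => next(Some(s), 60)).take(8).toList
    println(states.map(_.waitTimeSec))  // List(1, 2, 4, 8, 16, 32, 60, 60)
  }
}
```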
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r143969875

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler(
     s"${frameworkId}-${desc.submissionId}${retries}"
   }

+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSumbmissionIdFromTaskId(taskId: String): String = {
--- End diff --

Will fix.
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r143969837

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -276,8 +276,8 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
+        launchedDrivers(state.driverDescription.submissionId) = state
+        pendingRecover(state.taskId.toString) = state.slaveId
--- End diff --

I will check; I probably missed the getValue call here.
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user susanxhuynh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r143344688

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler(
     s"${frameworkId}-${desc.submissionId}${retries}"
   }

+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSumbmissionIdFromTaskId(taskId: String): String = {
--- End diff --

typo: "Submission"
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user susanxhuynh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r143484031

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -804,45 +814,52 @@ private[spark] class MesosClusterScheduler(
     logInfo(s"Received status update: taskId=${taskId}" +
       s" state=${status.getState}" +
       s" message=${status.getMessage}" +
-      s" reason=${status.getReason}");
+      s" reason=${status.getReason}")

     stateLock.synchronized {
-      if (launchedDrivers.contains(taskId)) {
+      val subId = getSumbmissionIdFromTaskId(taskId)
+      if (launchedDrivers.contains(subId)) {
         if (status.getReason == Reason.REASON_RECONCILIATION &&
           !pendingRecover.contains(taskId)) {
           // Task has already received update and no longer requires reconciliation.
           return
         }
-        val state = launchedDrivers(taskId)
+        val state = launchedDrivers(subId)
         // Check if the driver is supervise enabled and can be relaunched.
         if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
-          removeFromLaunchedDrivers(taskId)
+          removeFromLaunchedDrivers(subId)
           state.finishDate = Some(new Date())
           val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
           val (retries, waitTimeSec) = retryState
             .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
             .getOrElse{ (1, 1) }
           val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
-
           val newDriverDescription = state.driverDescription.copy(
             retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
-          addDriverToPending(newDriverDescription, taskId);
+          addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
        } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
-          removeFromLaunchedDrivers(taskId)
-          state.finishDate = Some(new Date())
-          if (finishedDrivers.size >= retainedDrivers) {
-            val toRemove = math.max(retainedDrivers / 10, 1)
-            finishedDrivers.trimStart(toRemove)
-          }
-          finishedDrivers += state
+          retireDriver(subId, state, status)
         }
         state.mesosTaskStatus = Option(status)
       } else {
-        logError(s"Unable to find driver $taskId in status update")
+        logError(s"Unable to find driver with $taskId in status update")
      }
    }
  }

+  private def retireDriver(
+      submissionId: String,
+      state: MesosClusterSubmissionState,
+      status: TaskStatus) = {
--- End diff --

This parameter, `status`, is not used.
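The block being factored into `retireDriver` is a simple retention policy: once `finishedDrivers` reaches `retainedDrivers`, evict the oldest ~10% (at least one) before appending. A minimal sketch of that policy, with the unused `status` parameter dropped per the comment above and `String` ids standing in for `MesosClusterSubmissionState`:

```scala
import scala.collection.mutable.ArrayBuffer

class FinishedDriverLog(retainedDrivers: Int) {
  private val finishedDrivers = new ArrayBuffer[String]()

  def retire(state: String): Unit = {
    // At capacity: drop the oldest ~10% of retained entries, at least one.
    if (finishedDrivers.size >= retainedDrivers) {
      val toRemove = math.max(retainedDrivers / 10, 1)
      finishedDrivers.trimStart(toRemove)
    }
    finishedDrivers += state
  }

  def retained: Seq[String] = finishedDrivers.toSeq
}
```

Trimming a batch rather than one entry at a time keeps the buffer from thrashing at the boundary when drivers finish frequently.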
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user susanxhuynh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r143487275

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -276,8 +276,8 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
+        launchedDrivers(state.driverDescription.submissionId) = state
--- End diff --

Maybe modify the comments up in L.138-150 ^^ to clarify which data structures are keyed by submission ID vs. task ID:
- Keyed by submission ID: `launchedDrivers`, `queuedDrivers`, `pendingRetryDrivers`, `finishedDrivers`
- Keyed by task ID: `pendingRecover`
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

Github user susanxhuynh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r143361887

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -276,8 +276,8 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
+        launchedDrivers(state.driverDescription.submissionId) = state
+        pendingRecover(state.taskId.toString) = state.slaveId
--- End diff --

Why did this change? I think the original `getValue` is the standard way to get the value of this TaskID proto. http://mesos.apache.org/api/latest/java/org/apache/mesos/Protos.TaskID.html
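The distinction matters because `TaskID` is a protobuf message: `toString` renders the protobuf text format rather than the raw id, so map entries keyed with one will never match lookups keyed with the other. A small illustration, assuming the Mesos Java protos are on the classpath:

```scala
import org.apache.mesos.Protos.TaskID

object TaskIdKeys {
  def main(args: Array[String]): Unit = {
    val taskId = TaskID.newBuilder().setValue("driver-20170927224038-0001").build()
    // Raw id, suitable as a map key:
    println(taskId.getValue)   // driver-20170927224038-0001
    // Protobuf text format, NOT the raw id:
    println(taskId.toString)   // value: "driver-20170927224038-0001"
  }
}
```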
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

GitHub user skonto opened a pull request:

    https://github.com/apache/spark/pull/19374

    [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

## What changes were proposed in this pull request?

- Fixes the issue of the frameworkId being recovered from checkpointed data.
- Makes the submission id the only index for all data structures in the dispatcher, and allocates a distinct task id for each driver retry to satisfy the Mesos requirements. See the relevant ticket (SPARK-22145) for details.

## How was this patch tested?

Manually tested. Launched a streaming job with checkpointing to HDFS, made the driver fail several times, and observed the behavior:

![image](https://user-images.githubusercontent.com/7945591/30940500-f7d2a744-a3e9-11e7-8c56-f2ccbb271e80.png)
![image](https://user-images.githubusercontent.com/7945591/30940550-19bc15de-a3ea-11e7-8a11-f48abfe36720.png)
![image](https://user-images.githubusercontent.com/7945591/30940524-083ea308-a3ea-11e7-83ae-00d3fa17b928.png)
![image](https://user-images.githubusercontent.com/7945591/30940579-2f0fb242-a3ea-11e7-82f9-86179da28b8c.png)
![image](https://user-images.githubusercontent.com/7945591/30940591-3b561b0e-a3ea-11e7-9dbd-e71912bb2ef3.png)
![image](https://user-images.githubusercontent.com/7945591/30940605-49c810ca-a3ea-11e7-8af5-67930851fd38.png)
![image](https://user-images.githubusercontent.com/7945591/30940631-59f4a288-a3ea-11e7-88cb-c3741b72bb13.png)
![image](https://user-images.githubusercontent.com/7945591/30940642-62346c9e-a3ea-11e7-8935-82e494925f67.png)
![image](https://user-images.githubusercontent.com/7945591/30940653-6c46d53c-a3ea-11e7-8dd1-5840d484d28c.png)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skonto/spark fix_retry

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19374.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19374

commit 0e5e5e0ef0d2ba030af71132955c63aadf4ca970
Author: Stavros Kontopoulos
Date: 2017-09-27T22:04:38Z

    fix supervise with checkpointing