[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-11-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19374


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-18 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r145391350
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -135,22 +135,24 @@ private[spark] class MesosClusterScheduler(
   private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
+  // Keyed by submission id
   private val finishedDrivers =
     new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
   private var frameworkId: String = null
-  // Holds all the launched drivers and current launch state, keyed by driver id.
+  // Holds all the launched drivers and current launch state, keyed by submission id.
   private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
   // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
   // All drivers that are loaded after failover are added here, as we need get the latest
-  // state of the tasks from Mesos.
+  // state of the tasks from Mesos. Keyed by task Id.
   private val pendingRecover = new mutable.HashMap[String, SlaveID]()
-  // Stores all the submitted drivers that hasn't been launched.
+  // Stores all the submitted drivers that hasn't been launched, keyed by submission id
   private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
-  // All supervised drivers that are waiting to retry after termination.
+  // All supervised drivers that are waiting to retry after termination, keyed by submission id
   private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
   private val queuedDriversState = engineFactory.createEngine("driverQueue")
   private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
   private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  private final val RETRY_ID = "-retry-"
--- End diff --

No problem, will update.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r145292254
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -135,22 +135,24 @@ private[spark] class MesosClusterScheduler(
   private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
+  // Keyed by submission id
   private val finishedDrivers =
     new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
   private var frameworkId: String = null
-  // Holds all the launched drivers and current launch state, keyed by driver id.
+  // Holds all the launched drivers and current launch state, keyed by submission id.
   private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
   // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
   // All drivers that are loaded after failover are added here, as we need get the latest
-  // state of the tasks from Mesos.
+  // state of the tasks from Mesos. Keyed by task Id.
   private val pendingRecover = new mutable.HashMap[String, SlaveID]()
-  // Stores all the submitted drivers that hasn't been launched.
+  // Stores all the submitted drivers that hasn't been launched, keyed by submission id
   private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
-  // All supervised drivers that are waiting to retry after termination.
+  // All supervised drivers that are waiting to retry after termination, keyed by submission id
   private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
   private val queuedDriversState = engineFactory.createEngine("driverQueue")
   private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
   private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  private final val RETRY_ID = "-retry-"
--- End diff --

Sorry, super nit: maybe name this `RETRY_SEP`. Feel free to ignore.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-17 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r145066047
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -276,8 +276,8 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
+        launchedDrivers(state.driverDescription.submissionId) = state
--- End diff --

ok





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-16 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r144947372
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -896,8 +913,8 @@ private[spark] class MesosClusterScheduler(
     revive()
   }
 
-  private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
-    pendingRetryDriversState.persist(taskId, desc)
+  private def addDriverToPending(desc: MesosDriverDescription, id: String) = {
+    pendingRetryDriversState.persist(id, desc)
--- End diff --

ok.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-16 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r144947305
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler(
     s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSumbmissionIdFromTaskId(taskId: String): String = {
+    taskId.split("-retry-").head
--- End diff --

sure.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-16 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r144947255
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -373,10 +374,16 @@ class SparkContext(config: SparkConf) extends Logging {
     // log out spark.app.name in the Spark driver logs
     logInfo(s"Submitted application: $appName")
 
-    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
-    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
-      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
-        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
+    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster or
+    // System property spark.mesos.driver.frameworkId must be set if user code ran by
+    // Mesos Dispatcher on a MESOS cluster
+    if (deployMode == "cluster") {
--- End diff --

OK, you are probably right: we do a spark-submit again. I will remove this
part.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r144682661
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -896,8 +913,8 @@ private[spark] class MesosClusterScheduler(
     revive()
   }
 
-  private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
-    pendingRetryDriversState.persist(taskId, desc)
+  private def addDriverToPending(desc: MesosDriverDescription, id: String) = {
+    pendingRetryDriversState.persist(id, desc)
--- End diff --

Maybe keep the name as `subId` because it could be confusing otherwise.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r144682434
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -373,10 +374,16 @@ class SparkContext(config: SparkConf) extends Logging {
     // log out spark.app.name in the Spark driver logs
     logInfo(s"Submitted application: $appName")
 
-    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
-    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
-      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
-        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
+    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster or
+    // System property spark.mesos.driver.frameworkId must be set if user code ran by
+    // Mesos Dispatcher on a MESOS cluster
+    if (deployMode == "cluster") {
--- End diff --

FWIW, I _believe_ that when we submit a job with the dispatcher,
`deployMode` is actually set to `client`, so this logic may not be invoked as
expected.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r144682554
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler(
     s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSumbmissionIdFromTaskId(taskId: String): String = {
+    taskId.split("-retry-").head
--- End diff --

Maybe make this a constant? 





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-11 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r143969946
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -804,45 +814,52 @@ private[spark] class MesosClusterScheduler(
     logInfo(s"Received status update: taskId=${taskId}" +
       s" state=${status.getState}" +
       s" message=${status.getMessage}" +
-      s" reason=${status.getReason}");
+      s" reason=${status.getReason}")
 
     stateLock.synchronized {
-      if (launchedDrivers.contains(taskId)) {
+      val subId = getSumbmissionIdFromTaskId(taskId)
+      if (launchedDrivers.contains(subId)) {
         if (status.getReason == Reason.REASON_RECONCILIATION &&
           !pendingRecover.contains(taskId)) {
           // Task has already received update and no longer requires reconciliation.
           return
         }
-        val state = launchedDrivers(taskId)
+        val state = launchedDrivers(subId)
         // Check if the driver is supervise enabled and can be relaunched.
         if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
-          removeFromLaunchedDrivers(taskId)
+          removeFromLaunchedDrivers(subId)
           state.finishDate = Some(new Date())
           val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
           val (retries, waitTimeSec) = retryState
             .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
             .getOrElse{ (1, 1) }
           val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
-
           val newDriverDescription = state.driverDescription.copy(
             retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
-          addDriverToPending(newDriverDescription, taskId);
+          addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
         } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
-          removeFromLaunchedDrivers(taskId)
-          state.finishDate = Some(new Date())
-          if (finishedDrivers.size >= retainedDrivers) {
-            val toRemove = math.max(retainedDrivers / 10, 1)
-            finishedDrivers.trimStart(toRemove)
-          }
-          finishedDrivers += state
+          retireDriver(subId, state, status)
         }
         state.mesosTaskStatus = Option(status)
       } else {
-        logError(s"Unable to find driver $taskId in status update")
+        logError(s"Unable to find driver with $taskId in status update")
       }
     }
   }
 
+  private def retireDriver(
+      submissionId: String,
+      state: MesosClusterSubmissionState,
+      status: TaskStatus) = {
--- End diff --

Will fix, thanks.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-11 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r143969875
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler(
     s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSumbmissionIdFromTaskId(taskId: String): String = {
--- End diff --

Will fix.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-11 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r143969837
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -276,8 +276,8 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
+        launchedDrivers(state.driverDescription.submissionId) = state
+        pendingRecover(state.taskId.toString) = state.slaveId
--- End diff --

I will check; I probably missed the `getValue` call here.





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-09 Thread susanxhuynh
Github user susanxhuynh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r143344688
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler(
     s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSumbmissionIdFromTaskId(taskId: String): String = {
--- End diff --

typo: "Submission"





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-09 Thread susanxhuynh
Github user susanxhuynh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r143484031
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -804,45 +814,52 @@ private[spark] class MesosClusterScheduler(
     logInfo(s"Received status update: taskId=${taskId}" +
       s" state=${status.getState}" +
       s" message=${status.getMessage}" +
-      s" reason=${status.getReason}");
+      s" reason=${status.getReason}")
 
     stateLock.synchronized {
-      if (launchedDrivers.contains(taskId)) {
+      val subId = getSumbmissionIdFromTaskId(taskId)
+      if (launchedDrivers.contains(subId)) {
         if (status.getReason == Reason.REASON_RECONCILIATION &&
           !pendingRecover.contains(taskId)) {
           // Task has already received update and no longer requires reconciliation.
           return
         }
-        val state = launchedDrivers(taskId)
+        val state = launchedDrivers(subId)
         // Check if the driver is supervise enabled and can be relaunched.
         if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
-          removeFromLaunchedDrivers(taskId)
+          removeFromLaunchedDrivers(subId)
           state.finishDate = Some(new Date())
           val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
           val (retries, waitTimeSec) = retryState
             .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
             .getOrElse{ (1, 1) }
           val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
-
           val newDriverDescription = state.driverDescription.copy(
             retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
-          addDriverToPending(newDriverDescription, taskId);
+          addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
         } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
-          removeFromLaunchedDrivers(taskId)
-          state.finishDate = Some(new Date())
-          if (finishedDrivers.size >= retainedDrivers) {
-            val toRemove = math.max(retainedDrivers / 10, 1)
-            finishedDrivers.trimStart(toRemove)
-          }
-          finishedDrivers += state
+          retireDriver(subId, state, status)
         }
         state.mesosTaskStatus = Option(status)
       } else {
-        logError(s"Unable to find driver $taskId in status update")
+        logError(s"Unable to find driver with $taskId in status update")
       }
     }
   }
 
+  private def retireDriver(
+      submissionId: String,
+      state: MesosClusterSubmissionState,
+      status: TaskStatus) = {
--- End diff --

This parameter, `status`, is not used.
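
A minimal sketch of the helper without the unused parameter. It assumes simplified stand-ins (`State`, a fixed `retainedDrivers`, plain in-memory maps) rather than the real dispatcher classes, and it uses `remove(0, n)` where the diff uses `trimStart`:

```scala
import java.util.Date
import scala.collection.mutable

object Retire {
  // Hypothetical stand-in for MesosClusterSubmissionState.
  final class State(val submissionId: String) {
    var finishDate: Option[Date] = None
  }

  // Assumed cap on retained finished drivers; a field of the scheduler in the diff.
  val retainedDrivers = 20
  val launchedDrivers = mutable.HashMap.empty[String, State]
  val finishedDrivers = mutable.ArrayBuffer.empty[State]

  // Same steps as the removed inline block; no TaskStatus argument is needed.
  def retireDriver(submissionId: String, state: State): Unit = {
    launchedDrivers.remove(submissionId)
    state.finishDate = Some(new Date())
    if (finishedDrivers.size >= retainedDrivers) {
      // Drop the oldest 10% (at least one entry) to make room.
      val toRemove = math.max(retainedDrivers / 10, 1)
      finishedDrivers.remove(0, toRemove)
    }
    finishedDrivers += state
  }
}
```

The real `removeFromLaunchedDrivers` presumably also updates persisted state; the sketch only touches the in-memory map.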





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-09 Thread susanxhuynh
Github user susanxhuynh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r143487275
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -276,8 +276,8 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
+        launchedDrivers(state.driverDescription.submissionId) = state
--- End diff --

Maybe modify the comments up in L.138-150 ^^ to clarify which data 
structures are keyed by submission ID vs. task ID:
- Keyed by submission ID: `launchedDrivers`, `queuedDrivers`, 
`pendingRetryDrivers`, `finishedDrivers`
- Keyed by task ID: `pendingRecover`
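
To illustrate the split, here is a rough sketch with two of the maps. `Submission` is a hypothetical stand-in for `MesosClusterSubmissionState`, and the slave id is a plain string rather than a Mesos `SlaveID`:

```scala
import scala.collection.mutable

object DispatcherIndex {
  // Hypothetical stand-in for MesosClusterSubmissionState.
  final case class Submission(submissionId: String, taskId: String, slaveId: String)

  // Keyed by submission id: stays the same across driver retries.
  val launchedDrivers = mutable.HashMap.empty[String, Submission]
  // Keyed by task id: reconciliation concerns one concrete Mesos task.
  val pendingRecover = mutable.HashMap.empty[String, String]

  // Mirrors the shape of recoverState() in the diff, without locking or persistence.
  def recover(states: Seq[Submission]): Unit = states.foreach { s =>
    launchedDrivers(s.submissionId) = s
    pendingRecover(s.taskId) = s.slaveId
  }
}
```

With this layout, a status update for task `driver-1-retry-3` would have to be mapped back to `driver-1` before looking it up in `launchedDrivers`, while `pendingRecover` is checked with the task id as received.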





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-09 Thread susanxhuynh
Github user susanxhuynh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r143361887
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -276,8 +276,8 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
+        launchedDrivers(state.driverDescription.submissionId) = state
+        pendingRecover(state.taskId.toString) = state.slaveId
--- End diff --

Why did this change? I think the original `getValue` is the standard way to 
get the value of this TaskID proto. 
http://mesos.apache.org/api/latest/java/org/apache/mesos/Protos.TaskID.html
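
A small sketch of why the two calls are not interchangeable as map keys. `FakeTaskId` is a stand-in, not the real `Protos.TaskID`; its `toString` only imitates the protobuf text-format rendering, which is an assumption about how the generated class prints:

```scala
object TaskIdKeys {
  // Stand-in for org.apache.mesos.Protos.TaskID (NOT the real class).
  final class FakeTaskId(value: String) {
    // Returns the raw id string.
    def getValue: String = value
    // Imitates a text-format dump of the message: field name, quotes, newline.
    override def toString: String = "value: \"" + value + "\"\n"
  }

  val id = new FakeTaskId("driver-1")
  // Keyed the way the original code did it.
  val byValue = Map(id.getValue -> "slave-A")
  // Keyed the way the changed line does it.
  val byToString = Map(id.toString -> "slave-A")
}
```

A later lookup such as `pendingRecover.contains(taskId)` with the plain id would hit `byValue` but miss `byToString`.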





[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-09-27 Thread skonto
GitHub user skonto opened a pull request:

https://github.com/apache/spark/pull/19374

[SPARK-22145][MESOS] fix supervise with checkpointing on mesos

## What changes were proposed in this pull request?

- Fixes the issue with frameworkId being recovered from checkpointed data.
- Keeps the driver submission id as the only index for all data structures in
the dispatcher.
- Allocates a different task id per driver retry to satisfy the Mesos
requirements.

See the relevant ticket (SPARK-22145) for details.
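
As a rough sketch of the id scheme (not the patch itself): `DriverDesc` is a hypothetical stand-in for `MesosDriverDescription`, and `RetrySep` is an illustrative name for the `"-retry-"` separator.

```scala
object TaskIds {
  // Separator between a submission id and its retry counter (name is hypothetical).
  val RetrySep = "-retry-"

  // Hypothetical stand-in for MesosDriverDescription: only the fields needed here.
  final case class DriverDesc(submissionId: String, retries: Option[Int])

  // A first launch reuses the submission id; each retry gets a distinct task id.
  def driverTaskId(desc: DriverDesc): String =
    desc.retries
      .map(n => s"${desc.submissionId}$RetrySep$n")
      .getOrElse(desc.submissionId)

  // Strip the retry suffix, if any, to get back the submission id.
  def submissionIdFromTaskId(taskId: String): String =
    taskId.split(RetrySep).head
}
```

The reverse mapping relies on the submission id never containing the separator itself; otherwise `split(...).head` would cut it short.
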
## How was this patch tested?

Manually tested this. Launched a streaming job with checkpointing to hdfs, 
made the driver fail several times and observed behavior:

![image](https://user-images.githubusercontent.com/7945591/30940500-f7d2a744-a3e9-11e7-8c56-f2ccbb271e80.png)


![image](https://user-images.githubusercontent.com/7945591/30940550-19bc15de-a3ea-11e7-8a11-f48abfe36720.png)


![image](https://user-images.githubusercontent.com/7945591/30940524-083ea308-a3ea-11e7-83ae-00d3fa17b928.png)


![image](https://user-images.githubusercontent.com/7945591/30940579-2f0fb242-a3ea-11e7-82f9-86179da28b8c.png)


![image](https://user-images.githubusercontent.com/7945591/30940591-3b561b0e-a3ea-11e7-9dbd-e71912bb2ef3.png)


![image](https://user-images.githubusercontent.com/7945591/30940605-49c810ca-a3ea-11e7-8af5-67930851fd38.png)


![image](https://user-images.githubusercontent.com/7945591/30940631-59f4a288-a3ea-11e7-88cb-c3741b72bb13.png)


![image](https://user-images.githubusercontent.com/7945591/30940642-62346c9e-a3ea-11e7-8935-82e494925f67.png)


![image](https://user-images.githubusercontent.com/7945591/30940653-6c46d53c-a3ea-11e7-8dd1-5840d484d28c.png)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skonto/spark fix_retry

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19374


commit 0e5e5e0ef0d2ba030af71132955c63aadf4ca970
Author: Stavros Kontopoulos 
Date:   2017-09-27T22:04:38Z

fix supervise with checkpointing



