[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-12-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r157111981
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -60,17 +60,17 @@ object SparkHadoopWriter extends Logging {
   config: HadoopWriteConfigUtil[K, V]): Unit = {
 // Extract context and configuration from RDD.
 val sparkContext = rdd.context
-val stageId = rdd.id
+val commitJobId = rdd.id
 
 // Set up a job.
 val jobTrackerId = createJobTrackerID(new Date())
--- End diff --

`jobTrackerId` is also not unique; is that OK?
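
For context, a minimal sketch of why `jobTrackerId` is only second-granular and therefore not unique across concurrent jobs. This is not the Spark source, just an illustrative stand-in assuming the timestamp-based format used by `createJobTrackerID`:

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Illustrative stand-in for createJobTrackerID (assumed second-granularity format):
// two jobs started in the same second get the same tracker id.
def createJobTrackerID(time: Date): String =
  new SimpleDateFormat("yyyyMMddHHmmss").format(time)

val first = createJobTrackerID(new Date())
val second = createJobTrackerID(new Date())
// first == second whenever both calls land in the same second, which is why
// jobTrackerId alone cannot distinguish concurrent jobs.
```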


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-12-14 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r157111526
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
--- End diff --

@cloud-fan yeah, but as I mentioned above, removing jobId from the 
signature of commitTask will cause a binary incompatibility error, since 
commitTask here is a public method. So although we no longer use it, we cannot 
remove it.
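
A rough sketch of the compatibility constraint being described here (simplified names, not the actual SparkHadoopMapRedUtil code): the public `commitTask` keeps its old `jobId` parameter so previously compiled callers still link, even though the value is no longer consulted.

```scala
object CommitTaskCompatSketch {
  // Old public signature kept for binary compatibility: jobId stays in place
  // even though the commit decision no longer depends on it.
  def commitTask(jobId: Int, splitId: Int, attemptNumber: Int): Unit =
    doCommit(splitId, attemptNumber)

  // New internal path; in the real code the stage id would come from
  // TaskContext.get().stageId() rather than from the caller.
  private def doCommit(splitId: Int, attemptNumber: Int): Unit = {
    // ... ask the OutputCommitCoordinator using the stage id, then commit ...
  }
}
```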


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-12-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r157110808
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
--- End diff --

 the `jobId` parameter is not used now


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-12-14 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r157068238
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }
 
+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
--- End diff --

@steveloughran, this isn't used by committers; it is just used for storing the jobId. For test purposes I used -1 here just to make sure this variable hasn't been set.


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-12-14 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r157067651
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+val stageId = TaskContext.get().stageId()
+val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Thanks @steveloughran for the comment. The reason will be logged by the handleAskPermissionToCommit method in org.apache.spark.scheduler.OutputCommitCoordinator.


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-12-14 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r157064132
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }
 
+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
--- End diff --

Committers may not like a negative job ID; they tend to assume, and for the v1 commit algorithm rely on, IDs starting at 0.


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-12-14 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r157063770
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+val stageId = TaskContext.get().stageId()
+val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Ever thought of returning why the commit was refused, e.g. unknown stage ID vs. another task attempt already committed vs. you are considered failed?
Not that the task committer should behave differently, but it might be nice to pass that info back, if only for logging.
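
A small sketch of what that could look like: a sealed result type instead of a bare Boolean, so the task side has a reason to log. The names and decision logic below are illustrative only, not Spark's actual OutputCommitCoordinator API:

```scala
// Stubs so the sketch compiles on its own; the real coordinator would track this state.
def knownStage(stageId: Int): Boolean = stageId >= 0
def alreadyCommitted(stageId: Int, splitId: Int): Boolean = false

sealed trait CommitResponse
case object CommitAllowed extends CommitResponse
final case class CommitDenied(reason: String) extends CommitResponse

// A coordinator answer that carries the denial reason back for the task to log.
def canCommit(stageId: Int, splitId: Int, attemptNumber: Int): CommitResponse =
  if (!knownStage(stageId)) {
    CommitDenied(s"unknown stage $stageId")
  } else if (alreadyCommitted(stageId, splitId)) {
    CommitDenied(s"another attempt of partition $splitId already committed")
  } else {
    CommitAllowed
  }
```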


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-12-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19848


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-12-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154403383
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+val stageId = TaskContext.get().stageId()
+val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Ah, great, another public class that has no good reason to be public...


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154256205
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+val stageId = TaskContext.get().stageId()
+val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Removing jobId from the signature of commitTask will cause a binary incompatibility error, since commitTask here is a public method. So we would end up with a parameter that stays unused.


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154238528
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+val stageId = TaskContext.get().stageId()
+val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

It makes sense to use stageId there, since previously jobId was used in place of stageId. I will test that.


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154237986
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -102,14 +103,15 @@ object SparkHadoopWriter extends Logging {
   context: TaskContext,
   config: HadoopWriteConfigUtil[K, V],
   jobTrackerId: String,
+  commitJobId: Int,
   sparkStageId: Int,
   sparkPartitionId: Int,
   sparkAttemptNumber: Int,
   committer: FileCommitProtocol,
   iterator: Iterator[(K, V)]): TaskCommitMessage = {
 // Set up a task.
 val taskContext = config.createTaskAttemptContext(
-  jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
+  jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
--- End diff --

I removed it a few min ago. 


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154235818
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+val stageId = TaskContext.get().stageId()
+val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Shouldn't `CommitDeniedException` (below) be updated to use the stage ID 
also? Otherwise the exception might have incomplete information.

With that change it's possible that `jobId` might become unused in this 
method.
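
For illustration, a hedged sketch of the adjustment being suggested, reusing names from the surrounding method (`performCommit`, `mrTaskAttemptID`, and the `CommitDeniedException(message, id, splitId, attemptNumber)` shape are assumed from the existing code, not verified here):

```scala
// Sketch only: reuse the id that was passed to canCommit in the denial path,
// so the exception carries the id the coordinator actually saw.
if (canCommit) {
  performCommit()
} else {
  val message = s"$mrTaskAttemptID: Not committed because the driver did not authorize commit"
  logInfo(message)
  throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
}
```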


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154236366
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -102,14 +103,15 @@ object SparkHadoopWriter extends Logging {
   context: TaskContext,
   config: HadoopWriteConfigUtil[K, V],
   jobTrackerId: String,
+  commitJobId: Int,
   sparkStageId: Int,
   sparkPartitionId: Int,
   sparkAttemptNumber: Int,
   committer: FileCommitProtocol,
   iterator: Iterator[(K, V)]): TaskCommitMessage = {
 // Set up a task.
 val taskContext = config.createTaskAttemptContext(
-  jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
+  jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
--- End diff --

`sparkStageId` is now unused in this method.


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154168397
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +79,14 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+var canCommit: Boolean = true
+// This checks whether the commitTask provided by stageId, which if not the canCommit
+// will use jobId as stageId to decide whether the commit should be possible
+if (stageId != -1) {
--- End diff --

Thank you very much, Marcelo. I had a misunderstanding about Mridul's comment. We can get the stageId from TaskContext.get as he suggested there; that way everything is much easier. I will update this PR soon after I finish testing. Thank you again.


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154156234
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -106,6 +106,12 @@ abstract class FileCommitProtocol {
*/
   def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
 
+  /**
+   * Commits a task which blongs to a specific stage after the writes succeed.
--- End diff --

belongs


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154157813
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +79,14 @@ object SparkHadoopMapRedUtil extends Logging {
   if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
 val taskAttemptNumber = TaskContext.get().attemptNumber()
-val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+var canCommit: Boolean = true
+// This checks whether the commitTask provided by stageId, which if not the canCommit
+// will use jobId as stageId to decide whether the commit should be possible
+if (stageId != -1) {
--- End diff --

In which case would this happen? Would it be hard to change the API so that 
the stage id is always provided to `commitTask`?

Mridul suggested in the previous PR to use the MR job configuration to 
propagate this (which you can access in the `mrTaskContext` parameter above). 
Any reason why you didn't go that route?
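
For reference, a minimal sketch of the configuration-based alternative mentioned here; the key name is made up for illustration and is not an actual Spark setting:

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical key, used only in this sketch.
val StageIdKey = "spark.internal.commit.stage.id"

// Driver side: stash the stage id in the Hadoop job configuration.
def writeStageId(conf: Configuration, stageId: Int): Unit =
  conf.setInt(StageIdKey, stageId)

// Executor side: read it back from the task attempt's configuration,
// falling back to -1 when it was never set.
def readStageId(conf: Configuration): Int =
  conf.getInt(StageIdKey, -1)
```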


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154156530
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -524,6 +525,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
 pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on driver and executor should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2).
+      map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }.filter { p => p._1 > 0 }
--- End diff --

nit: move `.filter` to next line for readability.
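
i.e. something along the lines of:

```scala
val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
  .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }
  .filter { p => p._1 > 0 }
```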


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154156477
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -524,6 +525,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
 pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on driver and executor should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2).
--- End diff --

`.` goes on next line


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154158671
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -524,6 +525,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
 pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on driver and executor should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2).
+      map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }.filter { p => p._1 > 0 }
+    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
+  }
--- End diff --

Add `assert(JobID.jobid != -1)` to make sure the test code is actually 
running.
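
With that addition the test body would look roughly like this (names taken from the quoted diff):

```scala
test("The JobId on driver and executor should be the same during the commit") {
  // Create more than one rdd to mimic stageId not equal to rddId
  val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
    .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }
    .filter { p => p._1 > 0 }
  pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
  // Guards against the committer never running at all.
  assert(JobID.jobid != -1)
}
```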


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r154137939
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -106,6 +106,12 @@ abstract class FileCommitProtocol {
*/
   def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
 
+  /**
+   * Commits a task which blongs to a specific stage after the writes succeed.
+   * Must be called on the executors when running tasks.
+   */
+  private[spark] def commitTask(taskContext: TaskAttemptContext, stageId: Int): TaskCommitMessage
--- End diff --

These classes are in an `.internal` package so there's no need for 
`private[spark]`. Or at least that's what I see in the sbt build scripts.


---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19848#discussion_r153957791
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -106,6 +106,13 @@ abstract class FileCommitProtocol {
*/
   def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
 
+  /**
+   * Commits a task which blongs to a specific stage after the writes succeed.
+   * Must be called on the executors when running tasks.
+   */
+  private[spark]
+  def commitTask(taskContext: TaskAttemptContext, stageId: Int): TaskCommitMessage
--- End diff --

Modifiers should be on the same line as the method declaration, throughout your patch. Just look at existing code and follow the style.
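
For example (sketch based on the declaration quoted above):

```scala
// Instead of:
private[spark]
def commitTask(taskContext: TaskAttemptContext, stageId: Int): TaskCommitMessage

// write:
private[spark] def commitTask(taskContext: TaskAttemptContext, stageId: Int): TaskCommitMessage
```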



---




[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

2017-11-29 Thread rezasafi
GitHub user rezasafi opened a pull request:

https://github.com/apache/spark/pull/19848

[SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol

I have modified SparkHadoopWriter so that executors and the driver always use consistent JobIds during the Hadoop commit. Before SPARK-18191, Spark always used the rddId; it just incorrectly named the variable stageId. After SPARK-18191, it used the rddId on the driver and the stageId on the executors. With this change, FileCommitProtocol now has a commitTask method that receives the rddId as the JobId in addition to the stageId. During the Hadoop commit protocol, Hadoop uses the jobId while Spark can still use the stageId as before. This way executors and the driver consistently use the same JobId.
In addition to the existing unit tests, a test has been added to check whether executors and the driver use the same JobId. The test failed before this change and passed after applying the fix.
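
A hedged sketch of the resulting flow, pieced together from the diff fragments quoted in the review comments above (simplified fragments, not the complete methods):

```scala
// Driver side (SparkHadoopWriter.write, simplified): the Hadoop "job id"
// comes from the RDD id and the tracker id from a timestamp; both are
// passed down to every task.
val commitJobId = rdd.id
val jobTrackerId = createJobTrackerID(new Date())

// Executor side (executeTask, simplified): the same commitJobId is used to
// build the Hadoop TaskAttemptContext, so driver and executors agree.
val taskContext = config.createTaskAttemptContext(
  jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)

// Commit coordination with the driver still keys off the stage id, now read
// from the running TaskContext instead of being passed in as "jobId".
val stageId = TaskContext.get().stageId()
val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
```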

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rezasafi/spark stagerddsimple

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19848.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19848


commit 4dbdbe77435630e3b35581c59189ec75c9c2484d
Author: Reza Safi 
Date:   2017-11-28T23:03:37Z

[SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol




---
