Repository: spark
Updated Branches:
  refs/heads/master d8b81f778 -> 64fbdf1aa


[SPARK-18191][CORE][FOLLOWUP] Call `setConf` if `OutputFormat` is 
`Configurable`.

## What changes were proposed in this pull request?

We should call `setConf` if `OutputFormat` is `Configurable`; this should be 
done before we create `OutputCommitter` and `RecordWriter`.
This is a follow-up to #15769; see the discussion 
[here](https://github.com/apache/spark/pull/15769/files#r87064229).

## How was this patch tested?

Added a test for this case in `PairRDDFunctionsSuite`.

Author: jiangxingbo <jiangxb1...@gmail.com>

Closes #15823 from jiangxb1987/config-format.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64fbdf1a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64fbdf1a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64fbdf1a

Branch: refs/heads/master
Commit: 64fbdf1aa90b66269daec29f62dc9431c1173bab
Parents: d8b81f7
Author: jiangxingbo <jiangxb1...@gmail.com>
Authored: Wed Nov 9 13:14:26 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Nov 9 13:14:26 2016 -0800

----------------------------------------------------------------------
 .../internal/io/HadoopMapReduceCommitProtocol.scala  |  9 ++++++++-
 .../internal/io/SparkHadoopMapReduceWriter.scala     |  9 +++++++--
 .../org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 15 +++++++++++++++
 3 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64fbdf1a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index d643a32..6b0bcb8 100644
--- 
a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal.io
 
 import java.util.Date
 
+import org.apache.hadoop.conf.Configurable
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
@@ -42,7 +43,13 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
   @transient private var committer: OutputCommitter = _
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = 
{
-    context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    val format = context.getOutputFormatClass.newInstance()
+    // If OutputFormat is Configurable, we should set conf to it.
+    format match {
+      case c: Configurable => c.setConf(context.getConfiguration)
+      case _ => ()
+    }
+    format.getOutputCommitter(context)
   }
 
   override def newTaskTempFile(

http://git-wip-us.apache.org/repos/asf/spark/blob/64fbdf1a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
 
b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index a405c44..7964392 100644
--- 
a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ 
b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -23,7 +23,7 @@ import java.util.{Date, Locale}
 import scala.reflect.ClassTag
 import scala.util.DynamicVariable
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.{JobConf, JobID}
 import org.apache.hadoop.mapreduce._
@@ -140,7 +140,12 @@ object SparkHadoopMapReduceWriter extends Logging {
       SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
     // Initiate the writer.
-    val taskFormat = outputFormat.newInstance
+    val taskFormat = outputFormat.newInstance()
+    // If OutputFormat is Configurable, we should set conf to it.
+    taskFormat match {
+      case c: Configurable => c.setConf(hadoopConf)
+      case _ => ()
+    }
     val writer = taskFormat.getRecordWriter(taskContext)
       .asInstanceOf[RecordWriter[K, V]]
     require(writer != null, "Unable to obtain RecordWriter")

http://git-wip-us.apache.org/repos/asf/spark/blob/64fbdf1a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index fe547d4..02df157 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -509,6 +509,21 @@ class PairRDDFunctionsSuite extends SparkFunSuite with 
SharedSparkContext {
       (2, ArrayBuffer(1))))
   }
 
+  test("saveNewAPIHadoopFile should call setConf if format is configurable") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+
+    // No error, non-configurable formats still work
+    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
+
+    /*
+     * Check that configurable formats get configured:
+     * ConfigTestFormat throws an exception if we try to write
+     * to it when setConf hasn't been called first.
+     * Assertion is in ConfigTestFormat.getRecordWriter.
+     */
+    pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to