Repository: spark
Updated Branches:
  refs/heads/master 8a72734f3 -> 8a1efe307


[SPARK-23683][SQL] FileCommitProtocol.instantiate() hardening

## What changes were proposed in this pull request?

With SPARK-20236, `FileCommitProtocol.instantiate()` looks for a three-argument
constructor, passing in the `dynamicPartitionOverwrite` parameter. If there is
no such constructor, it falls back to the classic two-arg one.

When `InsertIntoHadoopFsRelationCommand` passes that
`dynamicPartitionOverwrite` flag down to `FileCommitProtocol.instantiate()`, it
assumes that the instantiated protocol supports the specific requirements of
dynamic partition overwrite. It does not notice when this does not hold, and so
the output generated may be incorrect.

This patch changes `FileCommitProtocol.instantiate()` so that when
`dynamicPartitionOverwrite == true`, it requires the protocol implementation to
have a 3-arg constructor. Classic two-arg constructors are still supported when
the flag is false.
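
The selection rule described above can be sketched without a Spark dependency
roughly as follows. This is an illustration only: `Protocol`, `TwoArg`,
`ThreeArg` and `InstantiateSketch` are hypothetical stand-ins, and plain
`Class.forName` is used in place of Spark's `Utils.classForName`.

```scala
// Hypothetical stand-ins for FileCommitProtocol and two implementations.
abstract class Protocol

// Only offers the classic (String, String) constructor.
class TwoArg(jobId: String, path: String) extends Protocol

// Offers the (String, String, Boolean) constructor.
class ThreeArg(jobId: String, path: String, val dynamic: Boolean) extends Protocol

object InstantiateSketch {
  def instantiate(
      className: String,
      jobId: String,
      path: String,
      dynamic: Boolean = false): Protocol = {
    // Assumption: Class.forName stands in for Utils.classForName.
    val clazz = Class.forName(className).asInstanceOf[Class[Protocol]]
    try {
      // Prefer the three-arg constructor when the class declares one.
      val ctor = clazz.getDeclaredConstructor(
        classOf[String], classOf[String], classOf[Boolean])
      ctor.newInstance(jobId, path, java.lang.Boolean.valueOf(dynamic))
    } catch {
      case _: NoSuchMethodException =>
        // The two-arg fallback is only acceptable when dynamic overwrite is off.
        require(!dynamic,
          s"Dynamic Partition Overwrite is enabled but $className" +
            " does not have the appropriate constructor")
        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
        ctor.newInstance(jobId, path)
    }
  }
}
```

With these stand-ins, calling `InstantiateSketch.instantiate` on `TwoArg` with
`dynamic = true` fails with an `IllegalArgumentException` from `require`, while
the same call with `dynamic = false` returns a `TwoArg` instance.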

It also adds some debug-level logging for anyone trying to understand which
constructor is being used.

## How was this patch tested?

Unit tests verify that:

* classes with only a 2-arg constructor cannot be used with dynamic overwrite,
* classes with only a 2-arg constructor can be used without dynamic overwrite,
* classes with 3-arg constructors can be used with both,
* the fallback to any 2-arg ctor takes place after the attempt to load the 3-arg ctor,
* passing in invalid class types fails as expected (regression tests on expected behavior).

Author: Steve Loughran <ste...@hortonworks.com>

Closes #20824 from steveloughran/stevel/SPARK-23683-protocol-instantiate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a1efe30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a1efe30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a1efe30

Branch: refs/heads/master
Commit: 8a1efe3076f29259151f1fba2ff894487efb6c4e
Parents: 8a72734
Author: Steve Loughran <ste...@hortonworks.com>
Authored: Fri Mar 16 15:40:21 2018 -0700
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Mar 16 15:40:21 2018 -0700

----------------------------------------------------------------------
 .../spark/internal/io/FileCommitProtocol.scala  |  11 +-
 .../FileCommitProtocolInstantiationSuite.scala  | 148 +++++++++++++++++++
 2 files changed, 158 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8a1efe30/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 6d0059b..e6e9c9e 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapreduce._
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
 
@@ -132,7 +133,7 @@ abstract class FileCommitProtocol {
 }
 
 
-object FileCommitProtocol {
+object FileCommitProtocol extends Logging {
   class TaskCommitMessage(val obj: Any) extends Serializable
 
   object EmptyTaskCommitMessage extends TaskCommitMessage(null)
@@ -145,15 +146,23 @@ object FileCommitProtocol {
       jobId: String,
       outputPath: String,
       dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
+
+    logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
+      s" dynamic=$dynamicPartitionOverwrite")
     val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
     // First try the constructor with arguments (jobId: String, outputPath: String,
     // dynamicPartitionOverwrite: Boolean).
     // If that doesn't exist, try the one with (jobId: string, outputPath: String).
     try {
       val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
+      logDebug("Using (String, String, Boolean) constructor")
       ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
     } catch {
       case _: NoSuchMethodException =>
+        logDebug("Falling back to (String, String) constructor")
+        require(!dynamicPartitionOverwrite,
+          "Dynamic Partition Overwrite is enabled but" +
+            s" the committer ${className} does not have the appropriate constructor")
         val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
         ctor.newInstance(jobId, outputPath)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a1efe30/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala b/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala
new file mode 100644
index 0000000..2bd32fc
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Unit tests for instantiation of FileCommitProtocol implementations.
+ */
+class FileCommitProtocolInstantiationSuite extends SparkFunSuite {
+
+  test("Dynamic partitions require appropriate constructor") {
+
+    // you cannot instantiate a two-arg client with dynamic partitions
+    // enabled.
+    val ex = intercept[IllegalArgumentException] {
+      instantiateClassic(true)
+    }
+    // check the contents of the message and rethrow if unexpected.
+    // this preserves the stack trace of the unexpected
+    // exception.
+    if (!ex.toString.contains("Dynamic Partition Overwrite")) {
+      fail(s"Wrong text in caught exception $ex", ex)
+    }
+  }
+
+  test("Standard partitions work with classic constructor") {
+    instantiateClassic(false)
+  }
+
+  test("Three arg constructors have priority") {
+    assert(3 == instantiateNew(false).argCount,
+      "Wrong constructor argument count")
+  }
+
+  test("Three arg constructors have priority when dynamic") {
+    assert(3 == instantiateNew(true).argCount,
+      "Wrong constructor argument count")
+  }
+
+  test("The protocol must be of the correct class") {
+    intercept[ClassCastException] {
+      FileCommitProtocol.instantiate(
+        classOf[Other].getCanonicalName,
+        "job",
+        "path",
+        false)
+    }
+  }
+
+  test("If there is no matching constructor, class hierarchy is irrelevant") {
+    intercept[NoSuchMethodException] {
+      FileCommitProtocol.instantiate(
+        classOf[NoMatchingArgs].getCanonicalName,
+        "job",
+        "path",
+        false)
+    }
+  }
+
+  /**
+   * Create a classic two-arg protocol instance.
+   * @param dynamic dynamic partitioning mode
+   * @return the instance
+   */
+  private def instantiateClassic(dynamic: Boolean): ClassicConstructorCommitProtocol = {
+    FileCommitProtocol.instantiate(
+      classOf[ClassicConstructorCommitProtocol].getCanonicalName,
+      "job",
+      "path",
+      dynamic).asInstanceOf[ClassicConstructorCommitProtocol]
+  }
+
+  /**
+   * Create a three-arg protocol instance.
+   * @param dynamic dynamic partitioning mode
+   * @return the instance
+   */
+  private def instantiateNew(
+    dynamic: Boolean): FullConstructorCommitProtocol = {
+    FileCommitProtocol.instantiate(
+      classOf[FullConstructorCommitProtocol].getCanonicalName,
+      "job",
+      "path",
+      dynamic).asInstanceOf[FullConstructorCommitProtocol]
+  }
+
+}
+
+/**
+ * This protocol implementation does not have the new three-arg
+ * constructor.
+ */
+private class ClassicConstructorCommitProtocol(arg1: String, arg2: String)
+  extends HadoopMapReduceCommitProtocol(arg1, arg2) {
+}
+
+/**
+ * This protocol implementation does have the new three-arg constructor
+ * alongside the original, and a 4 arg one for completeness.
+ * The final value of the real constructor is the number of arguments
+ * used in the 2- and 3- constructor, for test assertions.
+ */
+private class FullConstructorCommitProtocol(
+  arg1: String,
+  arg2: String,
+  b: Boolean,
+  val argCount: Int)
+  extends HadoopMapReduceCommitProtocol(arg1, arg2, b) {
+
+  def this(arg1: String, arg2: String) = {
+    this(arg1, arg2, false, 2)
+  }
+
+  def this(arg1: String, arg2: String, b: Boolean) = {
+    this(arg1, arg2, b, 3)
+  }
+}
+
+/**
+ * This has the 2-arity constructor, but isn't the right class.
+ */
+private class Other(arg1: String, arg2: String) {
+
+}
+
+/**
+ * This has no matching arguments as well as being the wrong class.
+ */
+private class NoMatchingArgs() {
+
+}
+
