GitHub user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/20824#discussion_r174801694
--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -145,15 +146,23 @@ object FileCommitProtocol {
jobId: String,
outputPath: String,
dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
+
+ logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
+ s" dynamic=$dynamicPartitionOverwrite")
val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
// First try the constructor with arguments (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean).
// If that doesn't exist, try the one with (jobId: string, outputPath: String).
try {
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
+ logDebug("Using (String, String, Boolean) constructor")
ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
} catch {
case _: NoSuchMethodException =>
+ logDebug("Falling back to (String, String) constructor")
+ require(!dynamicPartitionOverwrite,
+ "Dynamic Partition Overwrite is enabled but" +
+ s" the committer ${className} does not have the appropriate constructor")
--- End diff --
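BTW, why don't we log a warning and continue instead of failing here? I just
want to make sure we have taken this case into account. For example, wouldn't
this `require` invalidate a committer like the one below, which only exposes a
two-argument constructor but still enables dynamic partition overwrite itself?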
```scala
private class CommitProtocol(arg1: String, arg2: String)
extends HadoopMapReduceCommitProtocol(arg1, arg2, true) {
}
```
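
To make the concern concrete, here is a minimal, self-contained sketch of the two behaviours being compared. It does not use Spark: `Committer`, `ThreeArgCommitter`, `TwoArgCommitter`, `instantiate`, and the `strict` flag are all hypothetical names invented for illustration, and the warning is printed to stderr rather than going through Spark's logging. With `strict = true` it mimics the `require` added in the diff; with `strict = false` it mimics "warn and continue".

```scala
object CommitterSketch {
  // Hypothetical stand-in for FileCommitProtocol; not the Spark class.
  trait Committer { def dynamic: Boolean }

  // Exposes the three-argument constructor that the reflective lookup tries first.
  class ThreeArgCommitter(jobId: String, path: String, val dynamic: Boolean)
    extends Committer

  // Mirrors the case above: only a two-argument constructor,
  // with dynamic partition overwrite hard-coded to true.
  class TwoArgCommitter(jobId: String, path: String) extends Committer {
    val dynamic: Boolean = true
  }

  // strict = true  -> fail fast, as the `require` in the diff does.
  // strict = false -> print a warning and fall back to the two-argument constructor.
  def instantiate[T <: Committer](
      clazz: Class[T],
      jobId: String,
      path: String,
      dynamic: Boolean,
      strict: Boolean): T = {
    try {
      val ctor = clazz.getDeclaredConstructor(
        classOf[String], classOf[String], classOf[Boolean])
      ctor.newInstance(jobId, path, java.lang.Boolean.valueOf(dynamic))
    } catch {
      case _: NoSuchMethodException =>
        if (strict) {
          require(!dynamic,
            s"Dynamic partition overwrite is enabled but ${clazz.getName}" +
              " has no (String, String, Boolean) constructor")
        } else if (dynamic) {
          System.err.println(
            s"WARN: ${clazz.getName} has no (String, String, Boolean) constructor;" +
              " falling back to (String, String)")
        }
        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
        ctor.newInstance(jobId, path)
    }
  }
}
```

Under these assumptions, `instantiate(classOf[CommitterSketch.TwoArgCommitter], "job", "/out", dynamic = true, strict = true)` throws an `IllegalArgumentException`, even though the committer itself would have handled dynamic partition overwrite. The same call with `strict = false` returns a working instance after printing the warning.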
---