GitHub user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/6964#discussion_r33104219
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---
@@ -356,34 +360,46 @@ private[sql] abstract class BaseWriterContainer(
   }

   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-    Option(committerClass).map { clazz =>
-      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-      // has an associated output committer. To override this output committer,
-      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-      // If a data source needs to override the output committer, it needs to set the
-      // output committer in the prepareForWrite method.
-      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-        // The specified output committer is a FileOutputCommitter.
-        // So, we will use the FileOutputCommitter-specific constructor.
-        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        ctor.newInstance(new Path(outputPath), context)
-      } else {
-        // The specified output committer is just an OutputCommitter.
-        // So, we will use the no-argument constructor.
-        val ctor = clazz.getDeclaredConstructor()
-        ctor.newInstance()
-      }
-    }.getOrElse {
-      // If the output committer class is not set, we will use the one associated with the
-      // file output format.
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the output committer
+      // associated with the file output format, since it is not safe to use a custom
+      // committer for appending. For example, in S3, a direct parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
       val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-      logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
+      logInfo(
+        s"Using output committer class ${outputCommitter.getClass.getCanonicalName} " +
+          "for appending.")
       outputCommitter
+    } else {
+      val committerClass = context.getConfiguration.getClass(
+        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+      Option(committerClass).map { clazz =>
+        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output committer,
+        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs to set the
+        // output committer in the prepareForWrite method.
+        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specific constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(outputPath), context)
+        } else {
+          // The specified output committer is just an OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      }.getOrElse {
+        // If the output committer class is not set, we will use the one associated with the
+        // file output format.
+        val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+        logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
+        outputCommitter
+      }
--- End diff ---
A minor refactoring of this `if` to avoid the duplicated code:
```scala
val defaultOutputCommitter = ???
if (isAppend) {
  logInfo(s"Using output committer class ...")
  defaultOutputCommitter
} else {
  val committerClass = ???
  Option(committerClass).map {
    ???
  }.getOrElse {
    logInfo(s"Using output committer class ...")
    defaultOutputCommitter
  }
}
```
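For concreteness, a rough, self-contained sketch of how the refactored method might look once the `???` placeholders are filled in from the diff above. The wrapper class, the `println`-backed `logInfo`, and the `committerKey` parameter (standing in for `SQLConf.OUTPUT_COMMITTER_CLASS.key`) are just stand-ins so the snippet compiles on its own; they are not part of the actual `BaseWriterContainer`:
```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, OutputFormat, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}

// Hypothetical stand-in for BaseWriterContainer: only the members the method body needs.
class CommitterSelectionSketch(
    outputPath: String,
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    isAppend: Boolean,
    committerKey: String) {

  // Stand-in for Spark's Logging trait.
  private def logInfo(msg: String): Unit = println(msg)

  def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
    // Build the format's default committer lazily, so it is only instantiated
    // when one of the branches actually needs it.
    lazy val defaultOutputCommitter =
      outputFormatClass.newInstance().getOutputCommitter(context)

    if (isAppend) {
      // Appending: always fall back to the format's own committer, since a
      // custom committer (e.g. a direct S3 committer) may leave partial data
      // behind if the appending job fails.
      logInfo(s"Using output committer class " +
        s"${defaultOutputCommitter.getClass.getCanonicalName} for appending.")
      defaultOutputCommitter
    } else {
      val committerClass = context.getConfiguration.getClass(
        committerKey, null, classOf[OutputCommitter])

      Option(committerClass).map { clazz =>
        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
          // FileOutputCommitter subclasses expect a (Path, TaskAttemptContext) constructor.
          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
          ctor.newInstance(new Path(outputPath), context)
        } else {
          // Any other OutputCommitter is built with its no-argument constructor.
          clazz.getDeclaredConstructor().newInstance()
        }
      }.getOrElse {
        logInfo(s"Using output committer class " +
          s"${defaultOutputCommitter.getClass.getCanonicalName}")
        defaultOutputCommitter
      }
    }
  }
}
```
Making `defaultOutputCommitter` a `lazy val` keeps the single definition the suggestion asks for while still avoiding an unnecessary `outputFormatClass.newInstance()` call when a user-defined committer is configured.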