turboFei commented on a change in pull request #25863:
[WIP][SPARK-29037][CORE][SQL] For static partition overwrite, spark may give duplicate result.
URL: https://github.com/apache/spark/pull/25863#discussion_r327463933
##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
##########
@@ -169,4 +184,176 @@ object FileCommitProtocol extends Logging {
ctor.newInstance(jobId, outputPath)
}
}
+
+  /**
+   * Commit all committed task output to the given outputPath directly.
+   * This code follows the implementation of [[FileOutputCommitter]];
+   * please keep it consistent with that class.
+   */
+  @throws[IOException]
+  def commitJob(
+      committer: FileOutputCommitter,
+      context: JobContext,
+      outputPath: Path): Unit = {
+    val algorithmVersion = getFieldValue(committer, "algorithmVersion").asInstanceOf[Int]
+    val maxAttemptsOnFailure = if (algorithmVersion == 2) {
+      context.getConfiguration.getInt("mapreduce.fileoutputcommitter.failures.attempts", 1)
+    } else {
+      1
+    }
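+    // Job commit is retried on failure only for algorithm 2 (controlled by
+    // "mapreduce.fileoutputcommitter.failures.attempts"); an algorithm 1 job
+    // commit is not repeatable once it has started moving task output, so it
+    // gets a single attempt.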
+    var attempt = 0
+    var jobCommitNotFinished = true
+    while (jobCommitNotFinished) {
+      try {
+        commitJobInternal(committer, context, algorithmVersion, outputPath)
+        jobCommitNotFinished = false
+      } catch {
+        case e: Exception =>
+          if (attempt + 1 > maxAttemptsOnFailure) {
+            throw e
+          } else {
+            attempt += 1
+            logWarning(s"Exception thrown in job commit, retrying (attempt $attempt).", e)
+          }
+      }
+    }
+    if (committer.getClass.getName.endsWith("ParquetOutputCommitter")) {
+      // Please keep consistent with the implementation of ParquetOutputCommitter.commitJob().
+      var parquetCommitter: Class[_] = null
+      var contextUtil: Class[_] = null
+
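+      // Parquet moved from the pre-Apache "parquet" package to
+      // "org.apache.parquet"; try the new namespace first and fall back to
+      // the old one for older Parquet jars on the classpath.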
+      try {
+        parquetCommitter = Utils.classForName("org.apache.parquet.hadoop.ParquetOutputCommitter")
+        contextUtil = Utils.classForName("org.apache.parquet.hadoop.util.ContextUtil")
+      } catch {
+        case _: ClassNotFoundException =>
+          parquetCommitter = Utils.classForName("parquet.hadoop.ParquetOutputCommitter")
+          contextUtil = Utils.classForName("parquet.hadoop.util.ContextUtil")
+      }
+
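+      // ParquetOutputCommitter.commitJob() also writes the Parquet summary
+      // metadata file(s) under the output path via writeMetaDataFile().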
+      val getConfiguration = contextUtil.getMethod("getConfiguration", classOf[JobContext])
+      val writeMetadata = parquetCommitter.getMethod("writeMetaDataFile",
+        classOf[Configuration], classOf[Path])
+      val configuration = getConfiguration.invoke(null, context)
+      writeMetadata.invoke(null, configuration, outputPath)
+    }
+  }
+
+  /**
+   * The job has completed, so finish the commit:
+   * 1. Move all committed task output to the final output dir (algorithm 1 only).
+   * 2. Delete the temporary directory, including all of the work directories.
+   * 3. Create a _SUCCESS file to mark the job as successful.
+   *
+   * Copied from [[FileOutputCommitter]]; please keep it consistent with that class.
+   */
+  @throws[IOException]
+  private def commitJobInternal(
+      committer: FileOutputCommitter,
+      context: JobContext,
+      algorithmVersion: Int,
+      outputPath: Path): Unit = {
+    if (outputPath != null) {
+      val fs = outputPath.getFileSystem(context.getConfiguration)
+
+      if (algorithmVersion == 1) {
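+        // Algorithm 1: each task committed its output under the job attempt
+        // dir; move every committed task's output into the final output dir.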
+        for (stat <- getAllCommittedTaskPaths(committer, context)) {
+          mergePaths(committer, fs, stat, outputPath)
+        }
+      } else {
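+        // Algorithm 2: task output has already been committed to the staging
+        // output dir; clean up the job temp dirs, then merge the staging dir
+        // into the final output path.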
+        invokeMethod(committer, "cleanupJob", Seq(classOf[JobContext]), Seq(context))
+        val stagingOutput = new Path(context.getConfiguration.get(FileOutputFormat.OUTDIR))
+        mergePaths(committer, fs, fs.getFileStatus(stagingOutput), outputPath)
Review comment:
Here, for algorithmVersion=2, the task output has already been committed to stagingOutputPath, so I merge paths from stagingOutputPath to tablePath. I think we don't need to implement our own `commitTask` for the InsertIntoHadoopFsRelation operation when algorithmVersion=2, which may produce a partial result.
@cloud-fan
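
For reviewers: the hunk above relies on two private reflection helpers,
getFieldValue and invokeMethod, which are outside this diff. A minimal,
hypothetical sketch of what they could look like (assuming they reflect on
Hadoop's FileOutputCommitter; the real helpers may differ):

    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

    // Sketch only: read a non-public field such as "algorithmVersion"
    // from a FileOutputCommitter instance via reflection.
    private def getFieldValue(committer: FileOutputCommitter, name: String): Any = {
      val field = classOf[FileOutputCommitter].getDeclaredField(name)
      field.setAccessible(true)
      field.get(committer)
    }

    // Sketch only: invoke a non-public method such as "cleanupJob"
    // on a FileOutputCommitter instance via reflection.
    private def invokeMethod(
        committer: FileOutputCommitter,
        name: String,
        argTypes: Seq[Class[_]],
        args: Seq[AnyRef]): Any = {
      val method = classOf[FileOutputCommitter].getDeclaredMethod(name, argTypes: _*)
      method.setAccessible(true)
      method.invoke(committer, args: _*)
    }

Similar reflection wrappers would be needed for mergePaths and
getAllCommittedTaskPaths, which are likewise non-public in FileOutputCommitter.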