Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/21066#discussion_r186474366
--- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
+
+import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+
+/**
+ * Spark Commit protocol for Path Output Committers.
+ * This committer will work with the `FileOutputCommitter` and subclasses.
+ * All implementations *must* be serializable.
+ *
+ * Rather than ask the `FileOutputFormat` for a committer, it uses the
+ * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory
+ * API to create the committer.
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does,
+ * but as [[HadoopMapReduceCommitProtocol]] still uses the original
+ * `org.apache.hadoop.mapred.FileOutputFormat` binding, its subclasses do not
+ * pick this up; this class overrides committer creation to use the factory
+ * mechanism now supported in the base class.
+ *
+ * In `setupCommitter` the factory is located and the committer for
+ * the destination path is chosen.
+ *
+ * @constructor Instantiate. Dynamic partition overwrite is not supported,
+ * so that committers for stores which do not support rename
+ * will not get confused.
+ * @param jobId job
+ * @param destination destination
+ * @param dynamicPartitionOverwrite does the caller want support for dynamic
+ * partition overwrite. If so, it will be refused.
+ * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied.
+ */
+class PathOutputCommitProtocol(
+ jobId: String,
+ destination: String,
+ dynamicPartitionOverwrite: Boolean = false)
+ extends HadoopMapReduceCommitProtocol(
+ jobId,
+ destination,
+ false) with Serializable {
+
+ @transient var committer: PathOutputCommitter = _
+
+ require(destination != null, "Null destination specified")
+
+ val destPath = new Path(destination)
+
+ logInfo(s"Instantiated committer with job ID=$jobId;" +
+ s" destination=$destPath;" +
+ s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite")
+
+ if (dynamicPartitionOverwrite) {
+ // until there are explicit extensions to the PathOutputCommitProtocols
+ // to support the Spark mechanism, it's left to the individual committer
+ // choice to handle partitioning.
+ throw new IOException("PathOutputCommitProtocol does not support dynamicPartitionOverwrite")
+ }
+
+ import PathOutputCommitProtocol._
+
+ /**
+ * Set up the committer.
+ * This creates it by talking directly to the Hadoop factories, instead
+ * of the V1 `mapred.FileOutputFormat` methods.
+ * @param context task attempt
+ * @return the committer to use. This will always be a subclass of
+ * [[PathOutputCommitter]].
+ */
+ override protected def setupCommitter(
+ context: TaskAttemptContext): PathOutputCommitter = {
+
+ logInfo(s"Setting up committer for path $destination")
+ committer = PathOutputCommitterFactory.createCommitter(destPath, context)
+
+ // Special feature to force out the FileOutputCommitter, so as to guarantee
+ // that the binding is working properly.
+ val rejectFileOutput = context.getConfiguration
+ .getBoolean(REJECT_FILE_OUTPUT, REJECT_FILE_OUTPUT_DEFVAL)
+ if (rejectFileOutput && committer.isInstanceOf[FileOutputCommitter]) {
+ // the output format returned a file output format committer, which
+ // is exactly what we do not want. So switch back to the factory.
+ val factory = PathOutputCommitterFactory.getCommitterFactory(
+ destPath,
+ context.getConfiguration)
+ logInfo(s"Using committer factory $factory")
+ committer = factory.createOutputCommitter(destPath, context)
+ }
+
+ logInfo(s"Using committer ${committer.getClass}")
+ logInfo(s"Committer details: $committer")
+ if (committer.isInstanceOf[FileOutputCommitter]) {
+ require(!rejectFileOutput,
+ s"Committer created is the FileOutputCommitter $committer")
+
+ if (committer.isCommitJobRepeatable(context)) {
+ // If FileOutputCommitter says its job commit is repeatable, it means
+ // it is using the v2 algorithm, which is not safe for task commit
+ // failures. Warn
+ logWarning(s"Committer $committer may not be tolerant of task commit failures")
+ }
+ }
+ committer
+ }
+
+ /**
+ * Create a temporary file for a task.
+ *
+ * @param taskContext task context
+ * @param dir optional subdirectory
+ * @param ext file extension
+ * @return a path as a string
+ */
+ override def newTaskTempFile(
+ taskContext: TaskAttemptContext,
+ dir: Option[String],
+ ext: String): String = {
+
+ val workDir = committer.getWorkPath
+ val parent = dir.map(d => new Path(workDir, d)).getOrElse(workDir)
+ val file = new Path(parent, buildFilename(taskContext, ext))
+ logInfo(s"Creating task file $file for dir $dir and ext $ext")
+ file.toString
+ }
+
+ /**
+ * Absolute files are still renamed into place with a warning.
--- End diff --
Added in the comments, not the message:
1. It's inefficient on any store where the copy is `O(data)`.
1. If the store caches negative HEAD/GET responses briefly (i.e. S3 load balancers), then an attempt to copy a file almost immediately after completing the PUT for that file may actually fail with "file not found". This can only happen here if the job is committed immediately after this task runs, so that the interval between `newTaskTempFileAbsPath()` and `HadoopMapReduceCommitProtocol.commitJob()` falls within the negative cache interval, *and* they all share the same load balancer.

Risk #2 is the kind of problem which rarely surfaces in testing. It could be handled in `HadoopMapReduceCommitProtocol` with some retries around FileNotFoundException, if it was really found to be a problem; a rough sketch is below. I've also promised the hotels.com circus-train team an option to create a file without doing that existence check...
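To be clear about what "some retries" would mean, here is a minimal sketch, purely illustrative: the helper name, attempt count and backoff interval are invented and not part of `HadoopMapReduceCommitProtocol`; it just wraps `FileSystem.rename()` and retries briefly on `FileNotFoundException`.

```scala
import java.io.{FileNotFoundException, IOException}

import org.apache.hadoop.fs.{FileSystem, Path}

object RenameWithRetry {

  /**
   * Rename src to dst, retrying a few times on FileNotFoundException so a
   * brief negative-caching window on the source object has time to expire.
   * Hypothetical helper, for illustration only.
   */
  def rename(
      fs: FileSystem,
      src: Path,
      dst: Path,
      attempts: Int = 3,
      delayMs: Long = 500): Unit = {
    var remaining = attempts
    var done = false
    while (!done) {
      try {
        if (!fs.rename(src, dst)) {
          throw new IOException(s"Failed to rename $src to $dst")
        }
        done = true
      } catch {
        case _: FileNotFoundException if remaining > 1 =>
          // source not yet visible to this client; back off and retry
          remaining -= 1
          Thread.sleep(delayMs)
      }
    }
  }
}
```

The key point is that only FileNotFoundException is retried, and only a bounded number of times; any other rename failure still surfaces immediately.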
---