c21 commented on a change in pull request #32881:
URL: https://github.com/apache/spark/pull/32881#discussion_r654110998
##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
##########
@@ -100,9 +101,34 @@ abstract class FileCommitProtocol extends Logging {
   * if a task is going to write out multiple files to the same dir. The file commit protocol
   * only guarantees that files written by different tasks will not conflict.
   */
+ @deprecated("use newTaskFile", "3.2.0")
  def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+  /**
+   * Notifies the commit protocol that a new file is added. Must be called on the executors
+   * when running tasks.
+   *
+   * The "stagingPath" parameter is the current path of the new file. The "finalPath"
+   * parameter, if specified, is the final path of the file; it is optional because the
+   * caller can leave it up to the file commit protocol to decide the final path. The
+   * "stagingDir" parameter, if specified, is the sub-directory used for dynamic
+   * partitioning; it is optional for non-dynamic partitioning.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * `stagingPath` and `finalPath`. The file commit protocol only guarantees that files
+   * written by different tasks will not conflict. This API should be preferred over the
+   * deprecated [[newTaskTempFile]] and [[newTaskTempFileAbsPath]].
+   */
+ def newTaskFile(
+ taskContext: TaskAttemptContext,
+ stagingPath: String,
+ finalPath: Option[String],
+ stagingDir: Option[String]): Unit = {
+    // No-op as the default implementation, to stay backward compatible with custom
+    // [[FileCommitProtocol]] implementations that predate Spark 3.2.0.
Review comment:
    @cloud-fan - I think it's impossible to keep backward compatibility cleanly here. Updated to just introduce the new method and break the API.
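
    As a concrete illustration, here is a minimal sketch of how a custom protocol could build on the new hook. It assumes `newTaskFile` lands with the signature shown in this diff; `TrackingCommitProtocol` and its bookkeeping are hypothetical, not part of the PR:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical sketch: records every file a task reports via the proposed
// newTaskFile hook, assuming the hook's signature matches this diff.
class TrackingCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  // Accumulated on the executor side, per task attempt.
  private val trackedFiles = ArrayBuffer.empty[(String, Option[String])]

  override def newTaskFile(
      taskContext: TaskAttemptContext,
      stagingPath: String,
      finalPath: Option[String],
      stagingDir: Option[String]): Unit = {
    // Record the staging path and, when the caller pinned one, the final path.
    trackedFiles += ((stagingPath, finalPath))
  }
}
```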
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1192,6 +1192,13 @@ object SQLConf {
.createWithDefault(
"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+ val FILE_NAMING_PROTOCOL_CLASS =
+ buildConf("spark.sql.sources.namingProtocolClass")
Review comment:
    @cloud-fan - yes, I think so. Before this PR, file naming was part of the
commit protocol, and each commit protocol checked into the Spark code base has its
own naming specification - `HadoopMapReduceCommitProtocol`,
`PathOutputCommitProtocol` and `ManifestFileCommitProtocol`. So we should expect
that external commit protocols may already have their own custom way of naming,
and we should allow them to implement their own naming protocol.
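
    To make that split concrete, a pluggable naming protocol could look roughly like the sketch below. The `FileNamingProtocol` trait and `PartFileNamingProtocol` class are hypothetical names for illustration, not the interface this PR introduces:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Hypothetical interface: each commit protocol (built-in or external) could
// plug in its own implementation instead of hard-coding a naming scheme.
trait FileNamingProtocol extends Serializable {
  /** Returns the file name a task should write its output under. */
  def getTaskFileName(taskContext: TaskAttemptContext, ext: String): String
}

// Sketch mimicking the familiar "part-<split>-<jobId><ext>" style naming;
// an external commit protocol could substitute its own scheme here.
class PartFileNamingProtocol(jobId: String) extends FileNamingProtocol {
  override def getTaskFileName(taskContext: TaskAttemptContext, ext: String): String = {
    val split = taskContext.getTaskAttemptID.getTaskID.getId
    f"part-$split%05d-$jobId$ext"
  }
}
```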
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1192,6 +1192,13 @@ object SQLConf {
.createWithDefault(
"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+ val FILE_NAMING_PROTOCOL_CLASS =
+ buildConf("spark.sql.sources.namingProtocolClass")
Review comment:
    Updated per offline discussion: keep the naming protocol internal rather than
user-facing via a config.
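
    If a conf entry survives at all in internal form, one plausible shape uses SQLConf's existing `.internal()` marker, as in the sketch below; the default class name is hypothetical:

```scala
// Sketch only (inside object SQLConf): `.internal()` hides the entry from
// user-facing documentation. The default class name here is hypothetical.
val FILE_NAMING_PROTOCOL_CLASS =
  buildConf("spark.sql.sources.namingProtocolClass")
    .internal()
    .version("3.2.0")
    .stringConf
    .createWithDefault(
      "org.apache.spark.sql.execution.datasources.HadoopFileNamingProtocol")
```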