cloud-fan commented on a change in pull request #32881:
URL: https://github.com/apache/spark/pull/32881#discussion_r655183467



##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * An interface to define how a single Spark job names its outputs. Two notes:
+ *
+ * 1. Implementations must be serializable, as the instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. An instance should not be reused across multiple Spark jobs.
+ *
+ * The proper way to call is:
+ *
+ * As part of each task's execution, whenever a new output file needs to be created, the
+ * executor calls [[getTaskTempPath]] to get a valid relative file path before commit.
+ */
+abstract class FileNamingProtocol {
+
+  /**
+   * Gets the relative path that should be used for the output file.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+}
+
+/**
+ * The context for a Spark output file. This is used by [[FileNamingProtocol]] to create the
+ * file path.
+ *
+ * @param ext Source specific file extension, e.g. ".snappy.parquet".
+ * @param relativeDir Relative directory of the file. Can be used for writing dynamic partitions.
+ *                    E.g., "a=1/b=2" is the directory for partition (a=1, b=2).
+ * @param prefix File prefix.
+ */
+final case class FileContext(
+  ext: String,
+  relativeDir: Option[String],

Review comment:
       I'm not very sure about it. It seems clearer to me to let `FileNamingProtocol` only
       generate the file name, and have the caller side construct the proper relative path
       w.r.t. the partition dir.
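
       Roughly what I have in mind, as a sketch (the `getTaskFileName` name and the caller-side
       snippet are illustrative, not this PR's code):

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// The protocol only produces a unique file name; the caller composes the
// relative path w.r.t. the partition dir.
abstract class FileNamingProtocol extends Serializable {
  def getTaskFileName(taskContext: TaskAttemptContext, ext: String): String
}

// Caller side:
// val name = namingProtocol.getTaskFileName(taskContext, ".snappy.parquet")
// val relativePath = partitionDir.map(d => s"$d/$name").getOrElse(name)
```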

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingFileNamingProtocol.scala
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.io.{FileContext, FileNamingProtocol}
+
+/**
+ * A [[FileNamingProtocol]] implementation to write output data in streaming processing.
+ */
+class StreamingFileNamingProtocol(jobId: String) extends FileNamingProtocol with Serializable {
+
+  override def getTaskTempPath(
+      taskContext: TaskAttemptContext, fileContext: FileContext): String = {
+    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+    // the file name is fine and won't overflow.
+    val split = taskContext.getTaskAttemptID.getTaskID.getId
+    val uuid = UUID.randomUUID.toString
+    val ext = fileContext.ext
+    val filename = f"part-$split%05d-$uuid$ext"

Review comment:
       Shall we fail here if `prefix` is not `None`, or support `prefix` here?
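
       For the latter, a minimal sketch (assuming `prefix` is an `Option[String]` field on
       `FileContext`; `split`, `uuid` and `ext` are as in the diff above):

```scala
// Prepend the prefix when present.
val prefix = fileContext.prefix.getOrElse("")
val filename = f"${prefix}part-$split%05d-$uuid$ext"

// Or, if streaming writes should never see a prefix, fail fast instead:
// require(fileContext.prefix.isEmpty, "prefix is not supported for streaming writes")
```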

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * An interface to define how a single Spark job names its outputs. Two notes:
+ *
+ * 1. Implementations must be serializable, as the instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. An instance should not be reused across multiple Spark jobs.
+ *
+ * The proper way to call is:
+ *
+ * As part of each task's execution, whenever a new output file needs to be created, the
+ * executor calls [[getTaskTempPath]] to get a valid relative file path before commit.
+ */
+abstract class FileNamingProtocol {
+
+  /**
+   * Gets the relative path that should be used for the output file.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+}
+
+/**
+ * The context for a Spark output file. This is used by [[FileNamingProtocol]] to create the
+ * file path.
+ *
+ * @param ext Source specific file extension, e.g. ".snappy.parquet".
+ * @param relativeDir Relative directory of the file. Can be used for writing dynamic partitions.
+ *                    E.g., "a=1/b=2" is the directory for partition (a=1, b=2).
+ * @param prefix File prefix.
+ */
+final case class FileContext(
+  ext: String,
+  relativeDir: Option[String],

Review comment:
       Same for `ext` and `prefix`: can we let the caller side prepend/append `prefix`/`ext`
       to the generated file name?
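
       i.e. something like this sketch (all names are illustrative):

```scala
// The protocol returns only a bare unique name; the caller adds the
// prefix, the extension, and the partition dir.
val bareName = namingProtocol.getTaskFileName(taskContext)
val fileName = s"${fileContext.prefix.getOrElse("")}$bareName${fileContext.ext}"
val relativePath = fileContext.relativeDir.map(d => s"$d/$fileName").getOrElse(fileName)
```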

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * An interface to define how a single Spark job names its outputs. Two notes:
+ *
+ * 1. Implementations must be serializable, as the instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. An instance should not be reused across multiple Spark jobs.
+ *
+ * The proper way to call is:
+ *
+ * As part of each task's execution, whenever a new output file needs to be created, the
+ * executor calls [[getTaskTempPath]] to get a valid relative file path before commit.
+ */
+abstract class FileNamingProtocol {
+
+  /**
+   * Gets the relative path that should be used for the output file.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+}
+
+/**
+ * The context for a Spark output file. This is used by [[FileNamingProtocol]] to create the
+ * file path.
+ *
+ * @param ext Source specific file extension, e.g. ".snappy.parquet".
+ * @param relativeDir Relative directory of the file. Can be used for writing dynamic partitions.
+ *                    E.g., "a=1/b=2" is the directory for partition (a=1, b=2).
+ * @param prefix File prefix.
+ */
+final case class FileContext(
+  ext: String,
+  relativeDir: Option[String],

Review comment:
       None of the information in `FileContext` is something that the impl can customize: the
       generated file name must have `ext` at the end, `prefix` at the beginning, and
       `relativeDir` as the parent dir. Then it's better to let the caller side guarantee it.

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
##########
@@ -73,35 +73,31 @@ abstract class FileCommitProtocol extends Logging {
    * Notifies the commit protocol to add a new file, and gets back the full path that should be
    * used. Must be called on the executors when running tasks.
    *
-   * Note that the returned temp file may have an arbitrary path. The commit protocol only
-   * promises that the file will be at the location specified by the arguments after job commit.
+   * Note that the "relativePath" parameter specifies the relative path of the returned temp file.
+   * The full path is left to the commit protocol to decide. The commit protocol only promises
+   * that the file will be at the location specified by the relative path after the job commits.
    *
-   * A full file path consists of the following parts:
-   *  1. the base path
-   *  2. some sub-directory within the base path, used to specify partitioning
-   *  3. file prefix, usually some unique job id with the task id
-   *  4. bucket id
-   *  5. source specific file extension, e.g. ".snappy.parquet"
-   *
-   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
-   * are left to the commit protocol implementation to decide.
-   *
-   * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files to the same dir. The file commit protocol only
-   * guarantees that files written by different tasks will not conflict.
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "relativePath" if a task is going to write out multiple files to the same directory. The file
+   * commit protocol only guarantees that files written by different tasks will not conflict.
    */
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+  def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String
 
   /**
    * Similar to newTaskTempFile(), but allows files to be committed to an absolute output location.
    * Depending on the implementation, there may be weaker guarantees around adding files this way.
    *
-   * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files to the same dir. The file commit protocol only
-   * guarantees that files written by different tasks will not conflict.
+   * The "relativePath" parameter specifies the relative path of the returned temp file, and the
+   * "finalPath" parameter specifies the full path of the file after job commit. The commit
+   * protocol promises that the file will be at the location specified by "finalPath" after the
+   * job commits.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "relativePath" and "finalPath" if a task is going to write out multiple files to the same
+   * directory. The file commit protocol only guarantees that files written by different tasks
+   * will not conflict.
    */
   def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+      taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String

Review comment:
       note: I can see that this API makes the code simpler, but it makes the semantics a bit
       more complicated: what if the final path doesn't have the same file name as the
       `relativePath`? Maybe it's better to have `fileName: String, finalDir: String`. Then the
       semantics are clear: the impl should commit the new file to the target dir.
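
       i.e. a sketch of what I mean:

```scala
// The file name is fixed by the caller, and the impl only promises to commit
// the file into finalDir, so the temp path and the final path can never
// disagree on the file name.
def newTaskTempFileAbsPath(
    taskContext: TaskAttemptContext,
    fileName: String,
    finalDir: String): String

// The final location is then unambiguously s"$finalDir/$fileName".
```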

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
##########
@@ -73,35 +73,31 @@ abstract class FileCommitProtocol extends Logging {
    * Notifies the commit protocol to add a new file, and gets back the full path that should be
    * used. Must be called on the executors when running tasks.
    *
-   * Note that the returned temp file may have an arbitrary path. The commit protocol only
-   * promises that the file will be at the location specified by the arguments after job commit.
+   * Note that the "relativePath" parameter specifies the relative path of the returned temp file.
+   * The full path is left to the commit protocol to decide. The commit protocol only promises
+   * that the file will be at the location specified by the relative path after the job commits.
    *
-   * A full file path consists of the following parts:
-   *  1. the base path
-   *  2. some sub-directory within the base path, used to specify partitioning
-   *  3. file prefix, usually some unique job id with the task id
-   *  4. bucket id
-   *  5. source specific file extension, e.g. ".snappy.parquet"
-   *
-   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
-   * are left to the commit protocol implementation to decide.
-   *
-   * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files to the same dir. The file commit protocol only
-   * guarantees that files written by different tasks will not conflict.
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "relativePath" if a task is going to write out multiple files to the same directory. The file
+   * commit protocol only guarantees that files written by different tasks will not conflict.
    */
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+  def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String
 
   /**
    * Similar to newTaskTempFile(), but allows files to be committed to an absolute output location.
    * Depending on the implementation, there may be weaker guarantees around adding files this way.
    *
-   * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files to the same dir. The file commit protocol only
-   * guarantees that files written by different tasks will not conflict.
+   * The "relativePath" parameter specifies the relative path of the returned temp file, and the
+   * "finalPath" parameter specifies the full path of the file after job commit. The commit
+   * protocol promises that the file will be at the location specified by "finalPath" after the
+   * job commits.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "relativePath" and "finalPath" if a task is going to write out multiple files to the same
+   * directory. The file commit protocol only guarantees that files written by different tasks
+   * will not conflict.
    */
   def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+      taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String

Review comment:
       note: I can see that this API makes the code simpler, but it makes the semantics a bit
       more complicated. What if the final path doesn't have the same file name as the
       `relativePath`? Maybe it's better to have `fileName: String, finalDir: String`. Then the
       semantics are clear: the impl should commit the new file to the target dir.

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
##########
@@ -73,35 +73,31 @@ abstract class FileCommitProtocol extends Logging {
    * Notifies the commit protocol to add a new file, and gets back the full path that should be
    * used. Must be called on the executors when running tasks.
    *
-   * Note that the returned temp file may have an arbitrary path. The commit protocol only
-   * promises that the file will be at the location specified by the arguments after job commit.
+   * Note that the "relativePath" parameter specifies the relative path of the returned temp file.
+   * The full path is left to the commit protocol to decide. The commit protocol only promises
+   * that the file will be at the location specified by the relative path after the job commits.
    *
-   * A full file path consists of the following parts:
-   *  1. the base path
-   *  2. some sub-directory within the base path, used to specify partitioning
-   *  3. file prefix, usually some unique job id with the task id
-   *  4. bucket id
-   *  5. source specific file extension, e.g. ".snappy.parquet"
-   *
-   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
-   * are left to the commit protocol implementation to decide.
-   *
-   * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files to the same dir. The file commit protocol only
-   * guarantees that files written by different tasks will not conflict.
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "relativePath" if a task is going to write out multiple files to the same directory. The file
+   * commit protocol only guarantees that files written by different tasks will not conflict.
    */
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+  def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String
 
   /**
    * Similar to newTaskTempFile(), but allows files to be committed to an absolute output location.
    * Depending on the implementation, there may be weaker guarantees around adding files this way.
    *
-   * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files to the same dir. The file commit protocol only
-   * guarantees that files written by different tasks will not conflict.
+   * The "relativePath" parameter specifies the relative path of the returned temp file, and the
+   * "finalPath" parameter specifies the full path of the file after job commit. The commit
+   * protocol promises that the file will be at the location specified by "finalPath" after the
+   * job commits.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "relativePath" and "finalPath" if a task is going to write out multiple files to the same
+   * directory. The file commit protocol only guarantees that files written by different tasks
+   * will not conflict.
    */
   def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+      taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String

Review comment:
       note: I can see that this API makes the code simpler, but it makes the semantics a bit
       more complicated. What if the final path doesn't have the same file name as the
       `relativePath`? Maybe it's better to have `fileName: String, finalDir: String`. Then the
       semantics are clear: the impl should commit the new file to the target dir.

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
##########
@@ -73,35 +73,31 @@ abstract class FileCommitProtocol extends Logging {
    * Notifies the commit protocol to add a new file, and gets back the full path that should be
    * used. Must be called on the executors when running tasks.
    *
-   * Note that the returned temp file may have an arbitrary path. The commit protocol only
-   * promises that the file will be at the location specified by the arguments after job commit.
+   * Note that the "relativePath" parameter specifies the relative path of the returned temp file.
+   * The full path is left to the commit protocol to decide. The commit protocol only promises
+   * that the file will be at the location specified by the relative path after the job commits.
    *
-   * A full file path consists of the following parts:
-   *  1. the base path
-   *  2. some sub-directory within the base path, used to specify partitioning
-   *  3. file prefix, usually some unique job id with the task id
-   *  4. bucket id
-   *  5. source specific file extension, e.g. ".snappy.parquet"
-   *
-   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
-   * are left to the commit protocol implementation to decide.
-   *
-   * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files to the same dir. The file commit protocol only
-   * guarantees that files written by different tasks will not conflict.
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "relativePath" if a task is going to write out multiple files to the same directory. The file
+   * commit protocol only guarantees that files written by different tasks will not conflict.
    */
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+  def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String
 
   /**
    * Similar to newTaskTempFile(), but allows files to be committed to an absolute output location.
    * Depending on the implementation, there may be weaker guarantees around adding files this way.
    *
-   * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
-   * if a task is going to write out multiple files to the same dir. The file commit protocol only
-   * guarantees that files written by different tasks will not conflict.
+   * The "relativePath" parameter specifies the relative path of the returned temp file, and the
+   * "finalPath" parameter specifies the full path of the file after job commit. The commit
+   * protocol promises that the file will be at the location specified by "finalPath" after the
+   * job commits.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "relativePath" and "finalPath" if a task is going to write out multiple files to the same
+   * directory. The file commit protocol only guarantees that files written by different tasks
+   * will not conflict.
    */
   def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+      taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String

Review comment:
       note: I can see that this API makes the code simpler, but it makes the semantics a bit
       more complicated. What if the final path doesn't have the same file name as the
       `relativePath`? Maybe it's better to have `fileName: String, targetDir: String`. Then the
       semantics are clear: the impl should commit the new file to the target dir.

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * An interface to define how a single Spark job names its outputs. Two notes:
+ *
+ * 1. Implementations must be serializable, as the instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. An instance should not be reused across multiple Spark jobs.
+ *
+ * The proper way to call is:
+ *
+ * As part of each task's execution, whenever a new output file needs to be created, the
+ * executor calls [[getTaskTempPath]] to get a valid relative file path before commit.
+ */
+abstract class FileNamingProtocol {
+
+  /**
+   * Gets the relative path that should be used for the output file.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+}
+
+/**
+ * The context for a Spark output file. This is used by [[FileNamingProtocol]] to create the
+ * file path.
+ *
+ * @param ext Source specific file extension, e.g. ".snappy.parquet".
+ * @param relativeDir Relative directory of the file. Can be used for writing dynamic partitions.
+ *                    E.g., "a=1/b=2" is the directory for partition (a=1, b=2).
+ * @param prefix File prefix.
+ */
+final case class FileContext(
+  ext: String,
+  relativeDir: Option[String],

Review comment:
       nvm, `FileNamingProtocol` needs to return an absolute path, so we must provide all this
       info as a `FileContext` and ask the impl to follow it.
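
       To spell out that contract, an impl would look roughly like this (illustrative; assumes
       `prefix: Option[String]` on `FileContext` and a `jobId` from the impl's constructor):

```scala
// ext at the end, prefix at the beginning, relativeDir as the parent dir.
override def getTaskTempPath(
    taskContext: TaskAttemptContext, fileContext: FileContext): String = {
  val split = taskContext.getTaskAttemptID.getTaskID.getId
  val name =
    f"${fileContext.prefix.getOrElse("")}part-$split%05d-$jobId${fileContext.ext}"
  fileContext.relativeDir.map(d => s"$d/$name").getOrElse(name)
}
```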

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * An interface to define how a single Spark job names its outputs. Two notes:
+ *
+ * 1. Implementations must be serializable, as the instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. An instance should not be reused across multiple Spark jobs.
+ *
+ * The proper way to call is:
+ *
+ * As part of each task's execution, whenever a new output file needs to be created, the
+ * executor calls [[getTaskTempPath]] to get a valid relative file path before commit.
+ */
+abstract class FileNamingProtocol {
+
+  /**
+   * Gets the relative path that should be used for the output file.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+}
+
+/**
+ * The context for a Spark output file. This is used by [[FileNamingProtocol]] to create the
+ * file path.
+ *
+ * @param ext Source specific file extension, e.g. ".snappy.parquet".
+ * @param relativeDir Relative directory of the file. Can be used for writing dynamic partitions.
+ *                    E.g., "a=1/b=2" is the directory for partition (a=1, b=2).
+ * @param prefix File prefix.
+ */
+final case class FileContext(
+  ext: String,
+  relativeDir: Option[String],

Review comment:
       And the API can be simply named `getTaskFileName`.

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * An interface to define how a single Spark job names its outputs. Two notes:
+ *
+ * 1. Implementations must be serializable, as the instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. An instance should not be reused across multiple Spark jobs.
+ *
+ * The proper way to call is:
+ *
+ * As part of each task's execution, whenever a new output file needs to be created, the
+ * executor calls [[getTaskTempPath]] to get a valid relative file path before commit.
+ */
+abstract class FileNamingProtocol {
+
+  /**
+   * Gets the relative path that should be used for the output file.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+}
+
+/**
+ * The context for a Spark output file. This is used by [[FileNamingProtocol]] to create the
+ * file path.
+ *
+ * @param ext Source specific file extension, e.g. ".snappy.parquet".
+ * @param relativeDir Relative directory of the file. Can be used for writing dynamic partitions.
+ *                    E.g., "a=1/b=2" is the directory for partition (a=1, b=2).
+ * @param prefix File prefix.
+ */
+final case class FileContext(
+  ext: String,
+  relativeDir: Option[String],

Review comment:
       This API exists only to abstract the naming differences between batch and streaming file
       writing. If that's not necessary, maybe we can remove this abstraction entirely.

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * An interface to define how a single Spark job names its outputs. Two notes:
+ *
+ * 1. Implementations must be serializable, as the instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. An instance should not be reused across multiple Spark jobs.
+ *
+ * The proper way to call is:
+ *
+ * As part of each task's execution, whenever a new output file needs to be created, the
+ * executor calls [[getTaskTempPath]] to get a valid relative file path before commit.
+ */
+abstract class FileNamingProtocol {
+
+  /**
+   * Gets the relative path that should be used for the output file.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+}
+
+/**
+ * The context for a Spark output file. This is used by [[FileNamingProtocol]] to create the
+ * file path.
+ *
+ * @param ext Source specific file extension, e.g. ".snappy.parquet".
+ * @param relativeDir Relative directory of the file. Can be used for writing dynamic partitions.
+ *                    E.g., "a=1/b=2" is the directory for partition (a=1, b=2).
+ * @param prefix File prefix.
+ */
+final case class FileContext(
+  ext: String,
+  relativeDir: Option[String],

Review comment:
       `jobId` is generated with a UUID as well; I don't see why streaming write needs to
       generate a new UUID per file instead of using the job id.
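
       i.e. a sketch of reusing the job-level UUID (illustrative; uniqueness across multiple
       files of one task would then have to come from caller-added content, e.g. a counter in
       `ext`):

```scala
val split = taskContext.getTaskAttemptID.getTaskID.getId
// Reuse the job-level UUID instead of a fresh per-file UUID.
val filename = f"part-$split%05d-$jobId$ext"
```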

##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * An interface to define how a single Spark job names its outputs. Two notes:
+ *
+ * 1. Implementations must be serializable, as the instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. An instance should not be reused across multiple Spark jobs.
+ *
+ * The proper way to call is:
+ *
+ * As part of each task's execution, whenever a new output file needs to be created, the
+ * executor calls [[getTaskTempPath]] to get a valid relative file path before commit.
+ */
+abstract class FileNamingProtocol {
+
+  /**
+   * Gets the relative path that should be used for the output file.
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+}
+
+/**
+ * The context for a Spark output file. This is used by [[FileNamingProtocol]] to create the
+ * file path.
+ *
+ * @param ext Source specific file extension, e.g. ".snappy.parquet".
+ * @param relativeDir Relative directory of the file. Can be used for writing dynamic partitions.
+ *                    E.g., "a=1/b=2" is the directory for partition (a=1, b=2).
+ * @param prefix File prefix.
+ */
+final case class FileContext(
+  ext: String,
+  relativeDir: Option[String],

Review comment:
       I'm cleaning this up in https://github.com/apache/spark/pull/33002




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]