cloud-fan commented on a change in pull request #32881: URL: https://github.com/apache/spark/pull/32881#discussion_r655183467
########## File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * An interface to define how a single Spark job names its outputs. Two notes: + * + * 1. Implementations must be serializable, as the instance instantiated on the driver + * will be used for tasks on executors. + * 2. An instance should not be reused across multiple Spark jobs. + * + * The proper way to call is: + * + * As part of each task's execution, whenever a new output file needs be created, executor calls + * [[getTaskTempPath]] to get a valid relative file path before commit. + */ +abstract class FileNamingProtocol { + + /** + * Gets the relative path should be used for the output file. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "fileContext" if a task is going to write out multiple files to the same directory. The file + * naming protocol only guarantees that files written by different tasks will not conflict. 
+ */ + def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String +} + +/** + * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. + * + * @param ext Source specific file extension, e.g. ".snappy.parquet". + * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. + * E.g., "a=1/b=2" is directory for partition (a=1, b=2). + * @param prefix file prefix. + */ +final case class FileContext( + ext: String, + relativeDir: Option[String], Review comment: I'm not very sure about it. It seems more clear to me to let `FileNamingProtocol` only generate filename, and the caller side should construct the proper relative path w.r.t. the partition dir. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingFileNamingProtocol.scala ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.UUID + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.TaskAttemptContext + +import org.apache.spark.internal.io.{FileContext, FileNamingProtocol} + +/** + * A [[FileNamingProtocol]] implementation to write output data in streaming processing. + */ +class StreamingFileNamingProtocol(jobId: String) extends FileNamingProtocol with Serializable { + + override def getTaskTempPath( + taskContext: TaskAttemptContext, fileContext: FileContext): String = { + // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + val uuid = UUID.randomUUID.toString + val ext = fileContext.ext + val filename = f"part-$split%05d-$uuid$ext" Review comment: shall we fail here if `prefix` is not None? or support `prefix` here? ########## File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * An interface to define how a single Spark job names its outputs. Two notes: + * + * 1. Implementations must be serializable, as the instance instantiated on the driver + * will be used for tasks on executors. + * 2. An instance should not be reused across multiple Spark jobs. + * + * The proper way to call is: + * + * As part of each task's execution, whenever a new output file needs be created, executor calls + * [[getTaskTempPath]] to get a valid relative file path before commit. + */ +abstract class FileNamingProtocol { + + /** + * Gets the relative path should be used for the output file. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "fileContext" if a task is going to write out multiple files to the same directory. The file + * naming protocol only guarantees that files written by different tasks will not conflict. + */ + def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String +} + +/** + * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. + * + * @param ext Source specific file extension, e.g. ".snappy.parquet". + * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. + * E.g., "a=1/b=2" is directory for partition (a=1, b=2). + * @param prefix file prefix. + */ +final case class FileContext( + ext: String, + relativeDir: Option[String], Review comment: The same applies to `ext` and `prefix`: can we let the caller side prepend `prefix` and append `ext` to the generated file name? ########## File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * An interface to define how a single Spark job names its outputs. Two notes: + * + * 1. Implementations must be serializable, as the instance instantiated on the driver + * will be used for tasks on executors. + * 2. An instance should not be reused across multiple Spark jobs. + * + * The proper way to call is: + * + * As part of each task's execution, whenever a new output file needs be created, executor calls + * [[getTaskTempPath]] to get a valid relative file path before commit. + */ +abstract class FileNamingProtocol { + + /** + * Gets the relative path should be used for the output file. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "fileContext" if a task is going to write out multiple files to the same directory. The file + * naming protocol only guarantees that files written by different tasks will not conflict. + */ + def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String +} + +/** + * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. + * + * @param ext Source specific file extension, e.g. ".snappy.parquet". 
+ * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. + * E.g., "a=1/b=2" is directory for partition (a=1, b=2). + * @param prefix file prefix. + */ +final case class FileContext( + ext: String, + relativeDir: Option[String], Review comment: None of the information in `FileContext` is something that the impl can customize: the generated file name must have `ext` at the end, `prefix` at the beginning, and `relativeDir` as the parent dir. So it's better to let the caller side guarantee it. ########## File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ########## @@ -73,35 +73,31 @@ abstract class FileCommitProtocol extends Logging { * Notifies the commit protocol to add a new file, and gets back the full path that should be * used. Must be called on the executors when running tasks. * - * Note that the returned temp file may have an arbitrary path. The commit protocol only - * promises that the file will be at the location specified by the arguments after job commit. + * Note that "relativePath" parameter specifies the relative path of returned temp file. The full + * path is left to the commit protocol to decide. The commit protocol only promises that the file + * will be at the location specified by the relative path after job commits. * - * A full file path consists of the following parts: - * 1. the base path - * 2. some sub-directory within the base path, used to specify partitioning - * 3. file prefix, usually some unique job id with the task id - * 4. bucket id - * 5. source specific file extension, e.g. ".snappy.parquet" - * - * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest - * are left to the commit protocol implementation to decide. - * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. 
The file commit protocol only - * guarantees that files written by different tasks will not conflict. + * Important: it is the caller's responsibility to add uniquely identifying content to + * "relativePath" if a task is going to write out multiple files to the same directory. The file + * commit protocol only guarantees that files written by different tasks will not conflict. */ - def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String /** * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. * Depending on the implementation, there may be weaker guarantees around adding files this way. * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. The file commit protocol only - * guarantees that files written by different tasks will not conflict. + * "relativePath" parameter specifies the relative path of returned temp file, and "finalPath" + * parameter specifies the full path of file after job commit. The commit protocol promises that + * the file will be at the location specified by the "finalPath" after job commits. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "relativePath" and "finalPath" if a task is going to write out multiple files to the same + * directory. The file commit protocol only guarantees that files written by different tasks will + * not conflict. */ def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String + taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String Review comment: note: I can see that, this API makes the code simpler, but it makes the semantic a bit more complicated: what if the final path doesn't have the same name as the `relativePath`? 
Maybe it's better to have `fileName: String, finalDir: String`. Then the semantics are pretty clear here: the impl should commit the new file to the target dir. ########## File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ########## @@ -73,35 +73,31 @@ abstract class FileCommitProtocol extends Logging { * Notifies the commit protocol to add a new file, and gets back the full path that should be * used. Must be called on the executors when running tasks. * - * Note that the returned temp file may have an arbitrary path. The commit protocol only - * promises that the file will be at the location specified by the arguments after job commit. + * Note that "relativePath" parameter specifies the relative path of returned temp file. The full + * path is left to the commit protocol to decide. The commit protocol only promises that the file + * will be at the location specified by the relative path after job commits. * - * A full file path consists of the following parts: - * 1. the base path - * 2. some sub-directory within the base path, used to specify partitioning - * 3. file prefix, usually some unique job id with the task id - * 4. bucket id - * 5. source specific file extension, e.g. ".snappy.parquet" - * - * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest - * are left to the commit protocol implementation to decide. - * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. The file commit protocol only - * guarantees that files written by different tasks will not conflict. + * Important: it is the caller's responsibility to add uniquely identifying content to + * "relativePath" if a task is going to write out multiple files to the same directory. The file + * commit protocol only guarantees that files written by different tasks will not conflict. 
*/ - def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String /** * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. * Depending on the implementation, there may be weaker guarantees around adding files this way. * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. The file commit protocol only - * guarantees that files written by different tasks will not conflict. + * "relativePath" parameter specifies the relative path of returned temp file, and "finalPath" + * parameter specifies the full path of file after job commit. The commit protocol promises that + * the file will be at the location specified by the "finalPath" after job commits. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "relativePath" and "finalPath" if a task is going to write out multiple files to the same + * directory. The file commit protocol only guarantees that files written by different tasks will + * not conflict. */ def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String + taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String Review comment: note: I can see that this API makes the code simpler, but it makes the semantics a bit more complicated. What if the final path doesn't have the same name as the `relativePath`? Maybe it's better to have `fileName: String, finalDir: String`. Then the semantics are pretty clear here: the impl should commit the new file to the target dir. 
########## File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ########## @@ -73,35 +73,31 @@ abstract class FileCommitProtocol extends Logging { * Notifies the commit protocol to add a new file, and gets back the full path that should be * used. Must be called on the executors when running tasks. * - * Note that the returned temp file may have an arbitrary path. The commit protocol only - * promises that the file will be at the location specified by the arguments after job commit. + * Note that "relativePath" parameter specifies the relative path of returned temp file. The full + * path is left to the commit protocol to decide. The commit protocol only promises that the file + * will be at the location specified by the relative path after job commits. * - * A full file path consists of the following parts: - * 1. the base path - * 2. some sub-directory within the base path, used to specify partitioning - * 3. file prefix, usually some unique job id with the task id - * 4. bucket id - * 5. source specific file extension, e.g. ".snappy.parquet" - * - * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest - * are left to the commit protocol implementation to decide. - * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. The file commit protocol only - * guarantees that files written by different tasks will not conflict. + * Important: it is the caller's responsibility to add uniquely identifying content to + * "relativePath" if a task is going to write out multiple files to the same directory. The file + * commit protocol only guarantees that files written by different tasks will not conflict. 
*/ - def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String /** * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. * Depending on the implementation, there may be weaker guarantees around adding files this way. * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. The file commit protocol only - * guarantees that files written by different tasks will not conflict. + * "relativePath" parameter specifies the relative path of returned temp file, and "finalPath" + * parameter specifies the full path of file after job commit. The commit protocol promises that + * the file will be at the location specified by the "finalPath" after job commits. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "relativePath" and "finalPath" if a task is going to write out multiple files to the same + * directory. The file commit protocol only guarantees that files written by different tasks will + * not conflict. */ def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String + taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String Review comment: note: I can see that this API makes the code simpler, but it makes the semantics a bit more complicated. What if the final path doesn't have the same file name as the `relativePath`? Maybe it's better to have `fileName: String, finalDir: String`. Then the semantics are pretty clear here: the impl should commit the new file to the target dir. 
########## File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ########## @@ -73,35 +73,31 @@ abstract class FileCommitProtocol extends Logging { * Notifies the commit protocol to add a new file, and gets back the full path that should be * used. Must be called on the executors when running tasks. * - * Note that the returned temp file may have an arbitrary path. The commit protocol only - * promises that the file will be at the location specified by the arguments after job commit. + * Note that "relativePath" parameter specifies the relative path of returned temp file. The full + * path is left to the commit protocol to decide. The commit protocol only promises that the file + * will be at the location specified by the relative path after job commits. * - * A full file path consists of the following parts: - * 1. the base path - * 2. some sub-directory within the base path, used to specify partitioning - * 3. file prefix, usually some unique job id with the task id - * 4. bucket id - * 5. source specific file extension, e.g. ".snappy.parquet" - * - * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest - * are left to the commit protocol implementation to decide. - * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. The file commit protocol only - * guarantees that files written by different tasks will not conflict. + * Important: it is the caller's responsibility to add uniquely identifying content to + * "relativePath" if a task is going to write out multiple files to the same directory. The file + * commit protocol only guarantees that files written by different tasks will not conflict. 
*/ - def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String /** * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. * Depending on the implementation, there may be weaker guarantees around adding files this way. * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. The file commit protocol only - * guarantees that files written by different tasks will not conflict. + * "relativePath" parameter specifies the relative path of returned temp file, and "finalPath" + * parameter specifies the full path of file after job commit. The commit protocol promises that + * the file will be at the location specified by the "finalPath" after job commits. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "relativePath" and "finalPath" if a task is going to write out multiple files to the same + * directory. The file commit protocol only guarantees that files written by different tasks will + * not conflict. */ def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String + taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String Review comment: note: I can see that this API makes the code simpler, but it makes the semantics a bit more complicated. What if the final path doesn't have the same file name as the `relativePath`? Maybe it's better to have `fileName: String, targetDir: String`. Then the semantics are pretty clear here: the impl should commit the new file to the target dir. 
########## File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * An interface to define how a single Spark job names its outputs. Two notes: + * + * 1. Implementations must be serializable, as the instance instantiated on the driver + * will be used for tasks on executors. + * 2. An instance should not be reused across multiple Spark jobs. + * + * The proper way to call is: + * + * As part of each task's execution, whenever a new output file needs be created, executor calls + * [[getTaskTempPath]] to get a valid relative file path before commit. + */ +abstract class FileNamingProtocol { + + /** + * Gets the relative path should be used for the output file. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "fileContext" if a task is going to write out multiple files to the same directory. The file + * naming protocol only guarantees that files written by different tasks will not conflict. 
+ */ + def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String +} + +/** + * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. + * + * @param ext Source specific file extension, e.g. ".snappy.parquet". + * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. + * E.g., "a=1/b=2" is directory for partition (a=1, b=2). + * @param prefix file prefix. + */ +final case class FileContext( + ext: String, + relativeDir: Option[String], Review comment: nvm, `FileNamingProtocol` needs to return an absolute path, so we must provide all these info as a `FileContext` and ask the impl to follow. ########## File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * An interface to define how a single Spark job names its outputs. Two notes: + * + * 1. Implementations must be serializable, as the instance instantiated on the driver + * will be used for tasks on executors. + * 2. 
An instance should not be reused across multiple Spark jobs. + * + * The proper way to call is: + * + * As part of each task's execution, whenever a new output file needs be created, executor calls + * [[getTaskTempPath]] to get a valid relative file path before commit. + */ +abstract class FileNamingProtocol { + + /** + * Gets the relative path should be used for the output file. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "fileContext" if a task is going to write out multiple files to the same directory. The file + * naming protocol only guarantees that files written by different tasks will not conflict. + */ + def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String +} + +/** + * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. + * + * @param ext Source specific file extension, e.g. ".snappy.parquet". + * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. + * E.g., "a=1/b=2" is directory for partition (a=1, b=2). + * @param prefix file prefix. + */ +final case class FileContext( + ext: String, + relativeDir: Option[String], Review comment: And the API can be simply named `getTaskFileName`. ########## File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * An interface to define how a single Spark job names its outputs. Two notes: + * + * 1. Implementations must be serializable, as the instance instantiated on the driver + * will be used for tasks on executors. + * 2. An instance should not be reused across multiple Spark jobs. + * + * The proper way to call is: + * + * As part of each task's execution, whenever a new output file needs be created, executor calls + * [[getTaskTempPath]] to get a valid relative file path before commit. + */ +abstract class FileNamingProtocol { + + /** + * Gets the relative path should be used for the output file. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "fileContext" if a task is going to write out multiple files to the same directory. The file + * naming protocol only guarantees that files written by different tasks will not conflict. + */ + def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String +} + +/** + * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. + * + * @param ext Source specific file extension, e.g. ".snappy.parquet". + * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. + * E.g., "a=1/b=2" is directory for partition (a=1, b=2). + * @param prefix file prefix. 
+ */ +final case class FileContext( + ext: String, + relativeDir: Option[String], Review comment: This API is only to abstract the naming differences between batch and streaming file writing. If that's not necessary, maybe we can remove this abstraction entirely. ########## File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * An interface to define how a single Spark job names its outputs. Two notes: + * + * 1. Implementations must be serializable, as the instance instantiated on the driver + * will be used for tasks on executors. + * 2. An instance should not be reused across multiple Spark jobs. + * + * The proper way to call is: + * + * As part of each task's execution, whenever a new output file needs be created, executor calls + * [[getTaskTempPath]] to get a valid relative file path before commit. + */ +abstract class FileNamingProtocol { + + /** + * Gets the relative path should be used for the output file. 
+ * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "fileContext" if a task is going to write out multiple files to the same directory. The file + * naming protocol only guarantees that files written by different tasks will not conflict. + */ + def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String +} + +/** + * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. + * + * @param ext Source specific file extension, e.g. ".snappy.parquet". + * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. + * E.g., "a=1/b=2" is directory for partition (a=1, b=2). + * @param prefix file prefix. + */ +final case class FileContext( + ext: String, + relativeDir: Option[String], Review comment: `jobId` is generated with UUID as well, I don't see why streaming write needs to generate a new UUID per file, instead of using job id. ########## File path: core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * An interface to define how a single Spark job names its outputs. Two notes: + * + * 1. Implementations must be serializable, as the instance instantiated on the driver + * will be used for tasks on executors. + * 2. An instance should not be reused across multiple Spark jobs. + * + * The proper way to call is: + * + * As part of each task's execution, whenever a new output file needs be created, executor calls + * [[getTaskTempPath]] to get a valid relative file path before commit. + */ +abstract class FileNamingProtocol { + + /** + * Gets the relative path should be used for the output file. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "fileContext" if a task is going to write out multiple files to the same directory. The file + * naming protocol only guarantees that files written by different tasks will not conflict. + */ + def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String +} + +/** + * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. + * + * @param ext Source specific file extension, e.g. ".snappy.parquet". + * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. + * E.g., "a=1/b=2" is directory for partition (a=1, b=2). + * @param prefix file prefix. + */ +final case class FileContext( + ext: String, + relativeDir: Option[String], Review comment: I'm cleaning this up in https://github.com/apache/spark/pull/33002 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org
