attilapiros commented on code in PR #37468:
URL: https://github.com/apache/spark/pull/37468#discussion_r949452000


##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -114,10 +108,38 @@ class PathOutputCommitProtocol(
         // failures. Warn
         logTrace(s"Committer $committer may not be tolerant of task commit 
failures")
       }
+    } else {
+

Review Comment:
   Nit: remove this line



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -114,10 +108,38 @@ class PathOutputCommitProtocol(
         // failures. Warn
         logTrace(s"Committer $committer may not be tolerant of task commit 
failures")
       }
+    } else {
+
+      // if required other committers need to be checked for dynamic partition
+      // compatibility through a StreamCapabilities probe.
+      if (dynamicPartitionOverwrite) {
+        logDebug(s"Checking dynamic partition overwrite support in committer 
$committer")

Review Comment:
   Nit: remove this debug line, as the next statement will either log at DEBUG
   or throw an exception, and both already include all the information about
   the committer.



##########
hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala:
##########
@@ -49,18 +49,27 @@ class CommitterBindingSuite extends SparkFunSuite {
    * [[BindingParquetOutputCommitter]] committer bind to the schema-specific
    * committer declared for the destination path? And that lifecycle events
    * are correctly propagated?
+   * This only works with a hadoop build where BindingPathOutputCommitter
+   * does passthrough of stream capabilities, so check that first.
    */
   test("BindingParquetOutputCommitter binds to the inner committer") {
+

Review Comment:
   Nit: extra lines



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -140,6 +162,28 @@ class PathOutputCommitProtocol(
     file.toString
   }
 
+  /**
+   * Reject any requests for an absolute path file on a committer which
+   * is not compatible with it.
+   *
+   * @param taskContext task context
+   * @param absoluteDir final directory
+   * @param spec output filename
+   * @return a path string
+   * @throws UnsupportedOperationException if incompatible
+   */
+  override def newTaskTempFileAbsPath(
+    taskContext: TaskAttemptContext,
+    absoluteDir: String,
+    spec: FileNameSpec): String =  {
+
+    if (supportsDynamicPartitions) {
+      super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
+    } else {
+      throw new UnsupportedOperationException(s"Absolute output locations not 
supported" +
+        s" by committer $committer")
+    }
+  }

Review Comment:
   Do we really need this? In `setupCommitter()` we have already checked
   `supportsDynamicPartitions`, and an exception is thrown there if the check fails.



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -51,14 +51,7 @@ class PathOutputCommitProtocol(
     jobId: String,
     dest: String,
     dynamicPartitionOverwrite: Boolean = false)
-  extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable {
-
-  if (dynamicPartitionOverwrite) {
-    // until there's explicit extensions to the PathOutputCommitProtocols
-    // to support the spark mechanism, it's left to the individual committer
-    // choice to handle partitioning.
-    throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
-  }
+  extends HadoopMapReduceCommitProtocol(jobId, dest, 
dynamicPartitionOverwrite) with Serializable {

Review Comment:
   Please update the comment above the class, which is now out of date. It still says:
   > dynamic partition overwrite is not supported, so that committers for 
   stores which do not support rename will not get confused.



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -88,6 +81,7 @@ class PathOutputCommitProtocol(
     logTrace(s"Setting up committer for path $destination")
     committer = PathOutputCommitterFactory.createCommitter(destPath, context)
 
+

Review Comment:
   Nit: remove this line



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -161,7 +205,18 @@ object PathOutputCommitProtocol {
   val REJECT_FILE_OUTPUT_DEFVAL = false
 
   /** Error string for tests. */
-  private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not 
support" +
+  private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not 
support" +
     " dynamicPartitionOverwrite"
 
+

Review Comment:
   Nit: excess line



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -161,7 +205,18 @@ object PathOutputCommitProtocol {
   val REJECT_FILE_OUTPUT_DEFVAL = false
 
   /** Error string for tests. */
-  private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not 
support" +
+  private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not 
support" +
     " dynamicPartitionOverwrite"
 
+
+  /**
+     * Stream Capabilities probe for spark dynamic partitioning compatibility.
+     */
+  private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING = 
"mapreduce.job.committer.dynamic.partitioning"
+
+

Review Comment:
   Nit: extra line
   



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -114,10 +108,38 @@ class PathOutputCommitProtocol(
         // failures. Warn
         logTrace(s"Committer $committer may not be tolerant of task commit 
failures")
       }
+    } else {
+
+      // if required other committers need to be checked for dynamic partition
+      // compatibility through a StreamCapabilities probe.
+      if (dynamicPartitionOverwrite) {
+        logDebug(s"Checking dynamic partition overwrite support in committer 
$committer")
+        if (supportsDynamicPartitions) {
+          logDebug(
+            s"Committer $committer has declared compatibility with dynamic 
partition overwrite")
+        } else {
+          throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + 
committer)
+        }
+      }
     }
+

Review Comment:
   Nit: remove these 3 lines.



##########
hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala:
##########
@@ -49,18 +49,27 @@ class CommitterBindingSuite extends SparkFunSuite {
    * [[BindingParquetOutputCommitter]] committer bind to the schema-specific
    * committer declared for the destination path? And that lifecycle events
    * are correctly propagated?
+   * This only works with a hadoop build where BindingPathOutputCommitter
+   * does passthrough of stream capabilities, so check that first.
    */
   test("BindingParquetOutputCommitter binds to the inner committer") {
+
+
    val path = new Path("http://example/data")
     val job = newJob(path)
     val conf = job.getConfiguration
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
     conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
 
-    StubPathOutputCommitterFactory.bind(conf, "http")
-    val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+
+
+    StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
+    val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, 
taskAttemptId0)
+
+

Review Comment:
   Nit: extra lines



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to