attilapiros commented on code in PR #37468:
URL: https://github.com/apache/spark/pull/37468#discussion_r964124161


##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -114,10 +112,33 @@ class PathOutputCommitProtocol(
         // failures. Warn
         logTrace(s"Committer $committer may not be tolerant of task commit failures")
       }
+    } else {
+      // if required other committers need to be checked for dynamic partition
+      // compatibility through a StreamCapabilities probe.
+      if (dynamicPartitionOverwrite) {
+        if (supportsDynamicPartitions) {
+          logDebug(
+            s"Committer $committer has declared compatibility with dynamic partition overwrite")
+        } else {
+          throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
+        }
+      }
     }
     committer
   }
 
+
+  /**
+   * Does the instantiated committer support dynamic partitions?
+   * @return true if the committer declares itself compatible.
+   */
+  private def supportsDynamicPartitions = {
+    committer.isInstanceOf[FileOutputCommitter] ||

Review Comment:
   Does the `committer.isInstanceOf[FileOutputCommitter]` part of the condition
change the old behaviour? Before this PR we simply threw an exception when
dynamic partition overwrite was requested.
   If this is a behaviour change, then the `Does this PR introduce any
user-facing change?` section of the PR description must be updated too.
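
   To make the question concrete, here is a minimal sketch of the two behaviours as I read them. It is not the PR's code: `Committer`, `FileCommitter`, `OtherCommitter`, `DynamicPartitionCheck` and the message text are hypothetical stand-ins, the second operand of `supportsDynamicPartitions` is cut off in the hunk above and is modelled as a plain flag, and the surrounding `if`/`else` branch is ignored.

```scala
import java.io.IOException
import scala.util.Try

// Stand-ins for the Hadoop committer classes so the sketch compiles on its own.
sealed trait Committer
case object FileCommitter extends Committer // plays the role of FileOutputCommitter
final case class OtherCommitter(declaresSupport: Boolean) extends Committer

object DynamicPartitionCheck {
  // Hypothetical message; the real text lives in PathOutputCommitProtocol.UNSUPPORTED.
  val Unsupported = "dynamic partition overwrite is not supported"

  // Old behaviour as described above: any request for dynamic partition
  // overwrite fails, whatever the committer is.
  def before(committer: Committer, dynamicPartitionOverwrite: Boolean): Committer = {
    if (dynamicPartitionOverwrite) {
      throw new IOException(Unsupported + ": " + committer)
    }
    committer
  }

  // New check: the FileOutputCommitter stand-in always passes. The remaining
  // operand is not visible in the hunk, so a flag stands in for it (assumption).
  def supportsDynamicPartitions(committer: Committer): Boolean = committer match {
    case FileCommitter => true
    case OtherCommitter(declaresSupport) => declaresSupport
  }

  // New behaviour: fail only when the committer does not declare support.
  def after(committer: Committer, dynamicPartitionOverwrite: Boolean): Committer = {
    if (dynamicPartitionOverwrite && !supportsDynamicPartitions(committer)) {
      throw new IOException(Unsupported + ": " + committer)
    }
    committer
  }
}
```

   Under these assumptions, `Try(DynamicPartitionCheck.before(FileCommitter, true))` is a failure while `DynamicPartitionCheck.after(FileCommitter, true)` returns the committer, which is the difference I am asking about.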



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

