ulysses-you commented on a change in pull request #34568:
URL: https://github.com/apache/spark/pull/34568#discussion_r808658263



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
##########
@@ -204,29 +158,26 @@ object FileFormatWriter extends Logging {
     // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
     committer.setupJob(job)
 
+    val sortColumns = getBucketSortColumns(bucketSpec, dataColumns)
     try {
-      val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+      val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
+      val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
+      val concurrentOutputWriterSpec = if (concurrentWritersEnabled) {
+        val output = empty2NullPlan.output
+        val enableRadixSort = sparkSession.sessionState.conf.enableRadixSort
+        val outputSchema = empty2NullPlan.schema
+        Some(ConcurrentOutputWriterSpec(maxWriters,
+          () => SortExec.createSorter(

Review comment:
       Look at the previous code: we create and evaluate a SortExec mainly for
       the ordering of the dynamic partitions. For the concurrent writers, we
       only need the sorter. Now that we have pulled the sort out, creating a
       new SortExec seems like overkill.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


