ulysses-you commented on code in PR #38939:
URL: https://github.com/apache/spark/pull/38939#discussion_r1042862929


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala:
##########
@@ -223,6 +224,19 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     doExecuteColumnar()
   }
 
+  /**
+   * Returns the result of writes as an RDD[WriterCommitMessage] variable by delegating to
+   * `doExecuteWrite` after preparations.
+   *
+   * Concrete implementations of SparkPlan should override `doExecuteWrite`.
+   */
+  def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery {

Review Comment:
   Let me list what the current v1 file write path requires:
   
   - WriteJobDescription, which includes the hadoop job (hadoop conf), fileFormat, outputSpec, partitionColumns, bucketSpec, options, and statsTrackers
   - FileCommitProtocol, which includes the output path and the dynamic partition overwrite flag
   - ConcurrentOutputWriterSpec, which includes requiredOrdering, bucketSpec, and the physical sortPlan
   
   Based on the existing datasource v1 write command, `WriteFiles` should hold at least: FileFormat, OutputSpec, partitionColumns, bucketSpec, options, and requiredOrdering.
   ```scala
   case class InsertIntoHadoopFsRelationCommand(
       outputPath: Path,
       staticPartitions: TablePartitionSpec,
       ifPartitionNotExists: Boolean,
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       fileFormat: FileFormat,
       options: Map[String, String],
       query: LogicalPlan,
       mode: SaveMode,
       catalogTable: Option[CatalogTable],
       fileIndex: Option[FileIndex],
       outputColumnNames: Seq[String])
   ```
   
   Because we cannot get the physical plan on the logical side, and ConcurrentOutputWriterSpec depends on the physical plan, it should be held in `WriteFilesSpec`.
   
   FileCommitProtocol should be held in `WriteFilesSpec`, because `WriteFiles` only does the task-level part of the work in the pipeline `setup job -> setup task -> commit task -> commit job`. The same reasoning applies to statsTrackers.
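
   As a rough illustration of that pipeline, the sketch below separates the task-level steps from the job-level steps. `SketchCommitProtocol`, `TaskCommit`, `runTask` and `runJob` are hypothetical names for this example only; they are a simplified stand-in and not Spark's actual `FileCommitProtocol` API.

   ```scala
   // Simplified stand-in for a commit protocol; NOT Spark's FileCommitProtocol.
   final case class TaskCommit(taskId: Int, files: Seq[String])

   class SketchCommitProtocol {
     def setupJob(): Unit = ()              // job level: runs once before any task
     def setupTask(taskId: Int): Unit = ()  // task level: runs once per task
     def commitTask(taskId: Int, files: Seq[String]): TaskCommit =
       TaskCommit(taskId, files)
     // Job level: collects the files reported by every task commit.
     def commitJob(commits: Seq[TaskCommit]): Seq[String] =
       commits.sortBy(_.taskId).flatMap(_.files)
   }

   object WriteLifecycleSketch {
     // The part a `WriteFiles`-like node would cover: task setup and task commit.
     def runTask(protocol: SketchCommitProtocol, taskId: Int): TaskCommit = {
       protocol.setupTask(taskId)
       val written = Seq(s"part-$taskId") // pretend one file was written
       protocol.commitTask(taskId, written)
     }

     // The part left to the caller: job setup and job commit around the tasks.
     def runJob(protocol: SketchCommitProtocol, numTasks: Int): Seq[String] = {
       protocol.setupJob()
       val commits = (0 until numTasks).map(taskId => runTask(protocol, taskId))
       protocol.commitJob(commits)
     }
   }
   ```

   Since the caller drives both ends of the job, it needs the same protocol instance that the tasks use, which is why passing it in through a spec object seems natural here.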
   
   Given how the hadoop job is used (FileCommitProtocol.setup(Job)), I tend to make `WriteFilesSpec` hold the hadoop job and hadoop conf.
   
   -------------
   In sum:
   
   - `WriteFiles`: FileFormat, OutputSpec, partitionColumns, bucketSpec, options, and requiredOrdering.
   - `WriteFilesSpec`: FileCommitProtocol, statsTrackers, ConcurrentOutputWriterSpec, hadoop job, and hadoop conf.
   
   Note: the above does not consider how hard this is to implement; it is based only on the semantic level.
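   
   A minimal sketch of how that split might look as data, assuming placeholder types. Every `*Stub` and `*Sketch` name, and every field type, is hypothetical and stands in for the real Spark/Hadoop classes; only the grouping of fields follows the summary.

   ```scala
   // Placeholder types; the real classes live in Spark and Hadoop.
   final case class FileFormatStub(name: String)
   final case class OutputSpecStub(outputPath: String)
   final case class BucketSpecStub(numBuckets: Int, bucketColumns: Seq[String])
   final case class CommitProtocolStub(outputPath: String, dynamicPartitionOverwrite: Boolean)
   final case class ConcurrentWriterSpecStub(sortPlan: String)

   // Information that is already known on the logical side.
   final case class WriteFilesSketch(
       fileFormat: FileFormatStub,
       outputSpec: OutputSpecStub,
       partitionColumns: Seq[String],
       bucketSpec: Option[BucketSpecStub],
       options: Map[String, String],
       requiredOrdering: Seq[String])

   // Information supplied by the caller at execution time.
   final case class WriteFilesSpecSketch(
       committer: CommitProtocolStub,
       statsTrackers: Seq[String],
       concurrentOutputWriterSpec: Option[ConcurrentWriterSpecStub],
       hadoopConf: Map[String, String]) // stands in for the hadoop job and conf

   object SplitSketch {
     val node: WriteFilesSketch = WriteFilesSketch(
       fileFormat = FileFormatStub("parquet"),
       outputSpec = OutputSpecStub("/tmp/out"),
       partitionColumns = Seq("dt"),
       bucketSpec = None,
       options = Map.empty,
       requiredOrdering = Seq("dt"))

     val spec: WriteFilesSpecSketch = WriteFilesSpecSketch(
       committer = CommitProtocolStub("/tmp/out", dynamicPartitionOverwrite = false),
       statsTrackers = Seq.empty,
       concurrentOutputWriterSpec = None,
       hadoopConf = Map.empty)
   }
   ```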
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

