grundprinzip commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r993783613
##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -74,4 +82,64 @@ class SparkConnectCommandPlanner(session: SparkSession,
command: proto.Command)
session.udf.registerPython(cf.getPartsList.asScala.head, udf)
}
/**
 * Transforms the write operation and executes it.
 *
 * The input write operation contains a reference to the input plan and transforms it to the
 * corresponding logical plan. Afterwards, creates the DataFrameWriter and translates the
 * parameters of the WriteOperation into the corresponding method calls.
 *
 * @param writeOperation the proto WriteOperation to translate and execute.
 */
def handleWriteOperation(writeOperation: WriteOperation): Unit = {
  // Transform the input plan into the logical plan.
  val planner = new SparkConnectPlanner(writeOperation.getInput, session)
  val plan = planner.transform()
  // And create a Dataset from the plan.
  val dataset = Dataset.ofRows(session, logicalPlan = plan)

  val w = dataset.write
  // Only apply a save mode if the client explicitly set one; otherwise keep the writer default.
  if (writeOperation.getMode != proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED) {
    w.mode(DataTypeProtoConverter.toSaveMode(writeOperation.getMode))
  }

  if (writeOperation.getOptionsCount > 0) {
    writeOperation.getOptionsMap.asScala.foreach { case (key, value) => w.option(key, value) }
  }

  if (writeOperation.getSortColumnNamesCount > 0) {
    // sortBy requires at least one column; the count guard above ensures head is safe.
    val names = writeOperation.getSortColumnNamesList.asScala
    w.sortBy(names.head, names.tail.toSeq: _*)
  }

  if (writeOperation.hasBucketBy) {
    val op = writeOperation.getBucketBy
    val cols = op.getColumnsList.asScala
    if (op.getBucketCount <= 0) {
      throw InvalidCommandInput(
        s"BucketBy must specify a bucket count > 0, received ${op.getBucketCount} instead.")
    }
    w.bucketBy(op.getBucketCount, cols.head, cols.tail.toSeq: _*)
  }

  if (writeOperation.getPartitionByColumnsCount > 0) {
    val names = writeOperation.getPartitionByColumnsList.asScala
    w.partitionBy(names.toSeq: _*)
  }

  // Proto3 string getters never return null; an unset field is the empty string,
  // so check nonEmpty instead of != null to avoid calling w.format("").
  if (writeOperation.getFormat.nonEmpty) {
    w.format(writeOperation.getFormat)
  }

  writeOperation.getSaveTypeCase match {
    case proto.WriteOperation.SaveTypeCase.PATH => w.save(writeOperation.getPath)
    case proto.WriteOperation.SaveTypeCase.TABLE_NAME =>
      w.saveAsTable(writeOperation.getTableName)
    case _ =>
      // Report the numeric tag: an unrecognized enum value has no symbolic name.
      // The whole message must be a single s-interpolated string so ${...} is evaluated.
      throw new UnsupportedOperationException(
        s"WriteOperation:SaveTypeCase not supported " +
          s"${writeOperation.getSaveTypeCase.getNumber}")
  }
}
Review Comment:
If the proto message carries an enum value that this client version does not recognize, the value has no symbolic name — only its numeric tag is available, so the message should print `getNumber` rather than the enum's name.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]