HyukjinKwon commented on code in PR #38192:
URL: https://github.com/apache/spark/pull/38192#discussion_r995398147


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, 
SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = 
s"table${UUID.randomUUID().toString.replace("-", "")}"

Review Comment:
   I believe we don't need this because the tests won't run in parallel, and 
`withTable` will drop the table.



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, 
SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"

Review Comment:
   This one too. I believe we don't need a random path now. We could just 
inline any path into the test



##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala:
##########
@@ -34,59 +36,109 @@ package object dsl {
       val identifier = CatalystSqlParser.parseMultipartIdentifier(s)
 
       def protoAttr: proto.Expression =
-        proto.Expression.newBuilder()
+        proto.Expression
+          .newBuilder()
           .setUnresolvedAttribute(
-            proto.Expression.UnresolvedAttribute.newBuilder()
+            proto.Expression.UnresolvedAttribute
+              .newBuilder()
               .addAllParts(identifier.asJava)
               .build())
           .build()
     }
 
     implicit class DslExpression(val expr: proto.Expression) {
-      def as(alias: String): proto.Expression = 
proto.Expression.newBuilder().setAlias(
-        
proto.Expression.Alias.newBuilder().setName(alias).setExpr(expr)).build()
-
-      def < (other: proto.Expression): proto.Expression =
-        proto.Expression.newBuilder().setUnresolvedFunction(
-          proto.Expression.UnresolvedFunction.newBuilder()
-            .addParts("<")
-            .addArguments(expr)
-            .addArguments(other)
-        ).build()
+      def as(alias: String): proto.Expression = proto.Expression
+        .newBuilder()
+        
.setAlias(proto.Expression.Alias.newBuilder().setName(alias).setExpr(expr))
+        .build()
+
+      def <(other: proto.Expression): proto.Expression =
+        proto.Expression
+          .newBuilder()
+          .setUnresolvedFunction(
+            proto.Expression.UnresolvedFunction
+              .newBuilder()
+              .addParts("<")
+              .addArguments(expr)
+              .addArguments(other))
+          .build()
     }
 
     implicit def intToLiteral(i: Int): proto.Expression =
-      proto.Expression.newBuilder().setLiteral(
-        proto.Expression.Literal.newBuilder().setI32(i)
-      ).build()
+      proto.Expression
+        .newBuilder()
+        .setLiteral(proto.Expression.Literal.newBuilder().setI32(i))
+        .build()
+  }
+
+  object commands { // scalastyle:ignore
+    implicit class DslCommands(val logicalPlan: proto.Relation) {
+      def write(
+          format: Option[String] = None,
+          path: Option[String] = None,
+          tableName: Option[String] = None,
+          mode: Option[String] = None,
+          sortByColumns: Seq[String] = Seq.empty,
+          partitionByCols: Seq[String] = Seq.empty,
+          bucketByCols: Seq[String] = Seq.empty,
+          numBuckets: Option[Int] = None): proto.Command = {
+        val writeOp = proto.WriteOperation.newBuilder()
+        format.foreach(writeOp.setSource(_))
+
+        mode
+          .map(SaveMode.valueOf(_))
+          .map(DataTypeProtoConverter.toSaveModeProto(_))
+          .foreach(writeOp.setMode(_))
+
+        if (tableName.nonEmpty) {
+          tableName.foreach(writeOp.setTableName(_))
+        } else {
+          path.foreach(writeOp.setPath(_))
+        }
+        sortByColumns.foreach(writeOp.addSortColumnNames(_))
+        partitionByCols.foreach(writeOp.addPartitioningColumns(_))
+
+        if (numBuckets.nonEmpty && bucketByCols.nonEmpty) {
+          val op = proto.WriteOperation.BucketBy.newBuilder()
+          numBuckets.foreach(op.setNumBuckets(_))
+          bucketByCols.foreach(op.addBucketColumnNames(_))
+          writeOp.setBucketBy(op.build())
+        }
+        writeOp.setInput(logicalPlan)
+        proto.Command.newBuilder().setWriteOperation(writeOp.build()).build()
+      }
+    }
   }
 
   object plans { // scalastyle:ignore
     implicit class DslLogicalPlan(val logicalPlan: proto.Relation) {
       def select(exprs: proto.Expression*): proto.Relation = {
-        proto.Relation.newBuilder().setProject(

Review Comment:
   I would remove these style changes since the previous code already follows 
the style guides we have.



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:
##########
@@ -88,16 +108,20 @@ class SparkConnectPlannerSuite extends SparkFunSuite with 
SparkConnectPlanTest {
   }
 
   test("Simple Project") {
-    val readWithTable = proto.Read.newBuilder()
+    val readWithTable = proto.Read
+      .newBuilder()
       .setNamedTable(proto.Read.NamedTable.newBuilder.addParts("name").build())
       .build()
     val project =
-      proto.Project.newBuilder()
+      proto.Project
+        .newBuilder()
         .setInput(proto.Relation.newBuilder().setRead(readWithTable).build())
         .addExpressions(
-          proto.Expression.newBuilder()
-            .setUnresolvedStar(UnresolvedStar.newBuilder().build()).build()
-        ).build()
+          proto.Expression
+            .newBuilder()
+            .setUnresolvedStar(UnresolvedStar.newBuilder().build())
+            .build())
+        .build()

Review Comment:
   Maybe we should get rid of these changes too?



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.nio.file.{Files, Paths}
+import java.util.UUID
+
+import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.connect.command.{InvalidCommandInput, 
SparkConnectCommandPlanner}
+import org.apache.spark.sql.connect.dsl.commands._
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SparkConnectCommandPlannerSuite
+    extends SQLTestUtils
+    with SparkConnectPlanTest
+    with SharedSparkSession {
+
+  lazy val localRelation = createLocalRelationProto(Seq($"id".int))
+
+  /**
+   * Returns a unique path name on every invocation.
+   * @return
+   */
+  private def path(): String = s"/tmp/${UUID.randomUUID()}"
+
+  /**
+   * Returns a unique valid table name identifier on each invocation.
+   * @return
+   */
+  private def table(): String = 
s"table${UUID.randomUUID().toString.replace("-", "")}"
+
+  def transform(cmd: proto.Command): Unit = {
+    new SparkConnectCommandPlanner(spark, cmd).process()
+  }
+
+  test("Writes fails without path or table") {
+    assertThrows[UnsupportedOperationException] {
+      transform(localRelation.write())
+    }
+  }
+
+  test("Write fails with unknown table - AnalysisException") {
+    val cmd = readRel.write(tableName = Some("dest"))
+    assertThrows[AnalysisException] {
+      transform(cmd)
+    }
+  }
+
+  test("Write with partitions") {
+    val name = table()

Review Comment:
   Maybe use `withTable` here instead of generating a table name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to