hvanhovell commented on code in PR #40061:
URL: https://github.com/apache/spark/pull/40061#discussion_r1109152425


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Interface used to write a [[Dataset]] to external storage systems (e.g. 
file systems,
+ * key-value stores, etc). Use `Dataset.write` to access this.
+ *
+ * @since 3.4.0
+ */
+@Stable
+final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   * <ul>
+   * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
+   * <li>`SaveMode.Append`: append the data.</li>
+   * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
+   * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
+   * </ul>
+   * <p>
+   * The default option is `ErrorIfExists`.
+   *
+   * @since 3.4.0
+   */
+  def mode(saveMode: SaveMode): DataFrameWriter[T] = {
+    this.mode = saveMode
+    this
+  }
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   * <ul>
+   * <li>`overwrite`: overwrite the existing data.</li>
+   * <li>`append`: append the data.</li>
+   * <li>`ignore`: ignore the operation (i.e. no-op).</li>
+   * <li>`error` or `errorifexists`: default option, throw an exception at 
runtime.</li>
+   * </ul>
+   *
+   * @since 3.4.0
+   */
+  def mode(saveMode: String): DataFrameWriter[T] = {
+    saveMode.toLowerCase(Locale.ROOT) match {
+      case "overwrite" => mode(SaveMode.Overwrite)
+      case "append" => mode(SaveMode.Append)
+      case "ignore" => mode(SaveMode.Ignore)
+      case "error" | "errorifexists" | "default" => 
mode(SaveMode.ErrorIfExists)
+      case _ => throw new IllegalArgumentException(s"Unknown save mode: 
$saveMode. Accepted " +
+        "save modes are 'overwrite', 'append', 'ignore', 'error', 
'errorifexists', 'default'.")
+    }
+  }
+
+  /**
+   * Specifies the underlying output data source. Built-in options include 
"parquet", "json", etc.
+   *
+   * @since 3.4.0
+   */
+  def format(source: String): DataFrameWriter[T] = {
+    this.source = Some(source)
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key 
names.
+   * If a new option has the same key case-insensitively, it will override the 
existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: String): DataFrameWriter[T] = {
+    this.extraOptions = this.extraOptions + (key -> value)
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key 
names.
+   * If a new option has the same key case-insensitively, it will override the 
existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: Boolean): DataFrameWriter[T] = option(key, 
value.toString)
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key 
names.
+   * If a new option has the same key case-insensitively, it will override the 
existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: Long): DataFrameWriter[T] = option(key, 
value.toString)
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key 
names.
+   * If a new option has the same key case-insensitively, it will override the 
existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: Double): DataFrameWriter[T] = option(key, 
value.toString)
+
+  /**
+   * (Scala-specific) Adds output options for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key 
names.
+   * If a new option has the same key case-insensitively, it will override the 
existing option.
+   *
+   * @since 3.4.0
+   */
+  def options(options: scala.collection.Map[String, String]): 
DataFrameWriter[T] = {
+    this.extraOptions ++= options
+    this
+  }
+
+  /**
+   * Adds output options for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key 
names.
+   * If a new option has the same key case-insensitively, it will override the 
existing option.
+   *
+   * @since 3.4.0
+   */
+  def options(options: java.util.Map[String, String]): DataFrameWriter[T] = {
+    this.options(options.asScala)
+    this
+  }
+
+  /**
+   * Partitions the output by the given columns on the file system. If 
specified, the output is
+   * laid out on the file system similar to Hive's partitioning scheme. As an 
example, when we
+   * partition a dataset by year and then month, the directory layout would 
look like:
+   * <ul>
+   * <li>year=2016/month=01/</li>
+   * <li>year=2016/month=02/</li>
+   * </ul>
+   *
+   * Partitioning is one of the most widely used techniques to optimize 
physical data layout.
+   * It provides a coarse-grained index for skipping unnecessary data reads 
when queries have
+   * predicates on the partitioned columns. In order for partitioning to work 
well, the number
+   * of distinct values in each column should typically be less than tens of 
thousands.
+   *
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting with Spark
+   * 2.1.0.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colNames: String*): DataFrameWriter[T] = {
+    this.partitioningColumns = Option(colNames)
+    this
+  }
+
+  /**
+   * Buckets the output by the given columns. If specified, the output is laid 
out on the file
+   * system similar to Hive's bucketing scheme, but with a different bucket 
hash function
+   * and is not compatible with Hive's bucketing.
+   *
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting with Spark
+   * 2.1.0.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def bucketBy(numBuckets: Int, colName: String, colNames: String*): 
DataFrameWriter[T] = {
+    this.numBuckets = Option(numBuckets)
+    this.bucketColumnNames = Option(colName +: colNames)
+    this
+  }
+
+  /**
+   * Sorts the output in each bucket by the given columns.
+   *
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting with Spark
+   * 2.1.0.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def sortBy(colName: String, colNames: String*): DataFrameWriter[T] = {
+    this.sortColumnNames = Option(colName +: colNames)
+    this
+  }
+
+  /**
+   * Saves the content of the `DataFrame` at the specified path.
+   *
+   * @since 3.4.0
+   */
+  def save(path: String): Unit = {
+    saveInternal(Some(path))
+  }
+
+  /**
+   * Saves the content of the `DataFrame` as the specified table.
+   *
+   * @since 3.4.0
+   */
+  def save(): Unit = saveInternal(None)
+
+  private def saveInternal(path: Option[String]): Unit = {
+    executeWriteOperation(builder => path.foreach(builder.setPath))
+  }
+
+  private def executeWriteOperation(f: proto.WriteOperation.Builder => Unit): 
Unit = {
+    val builder = proto.WriteOperation.newBuilder()
+
+    builder.setInput(df.plan.getRoot)
+
+    // Set path or table
+    f(builder)
+    require(builder.hasPath ^ builder.hasTable) // Only one can be set
+
+    builder.setMode(mode match {
+      case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
+      case SaveMode.Overwrite => 
proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE
+      case SaveMode.Ignore => proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE
+      case SaveMode.ErrorIfExists => 
proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS
+    })
+
+    source.foreach(builder.setSource)
+    sortColumnNames.foreach(names => 
builder.addAllSortColumnNames(names.asJava))
+    partitioningColumns.foreach(cols => 
builder.addAllPartitioningColumns(cols.asJava))
+
+    numBuckets.foreach(n => {

Review Comment:
   Should we validate somewhere that bucket columns is not empty. We might also 
want to check that numBuckets > 0?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to