EnricoMi commented on a change in pull request #26969:
URL: https://github.com/apache/spark/pull/26969#discussion_r484965678



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -500,6 +501,67 @@ class Dataset[T] private[sql](
     select(newCols : _*)
   }
 
+  /**
+   * Returns a new Dataset where each record has been mapped on to the 
specified type.
+   *
+   * This is similar to `as[U]`, which is lazy in terms of the schema. With 
`as[U]`, the result
+   * `Dataset[U]` still has the schema of `T`, not `U`. Hence, it may contain 
additional columns or
+   * the columns may be in different order. The schema of `U` manifests only 
after operations that
+   * utilize the encoder, e.g. `map` or `collect`, but not `save`.
+   *
+   * This method returns a `Dataset[U]` that is strictly derived from `U`. 
This works for any case
+   * class with standard encoder. This is done through projection of `T`'s 
columns onto `U`'s
+   * schema, if possible. Otherwise, it throws an `AnalysisException`. Columns 
are matched by name
+   * and type, where column names' case sensitivity is determined by 
`spark.sql.caseSensitive`.
+   *
+   * Where `as[U]` supports types with inner classes that have extra fields or 
different field
+   * order, this is not possible through projection and hence not supported by 
this method.
+   * An example for an inner type is `Inner`:
+   * {{{
+   *   case class Inner(a: Int)
+   *   case class Outer(i: Inner)
+   * }}}
+   * In that case you should use `map(identity)`, if this is really needed.
+   *
+   * As for `as[U]`, if the schema of the Dataset does not match the desired 
`U` type, you can use
+   * `select` along with `alias` or `as` to rearrange or rename as required.
+   *
+   * @group basic
+   * @since 3.1.0
+   */
+  def toDS[U : Encoder]: Dataset[U] = {
+    // column names case-sensitivity is configurable
+    def considerCase(field: StructField): StructField = 
SQLConf.get.caseSensitiveAnalysis match {
+        case true => field
+        case false => field.copy(name = field.name.toLowerCase(Locale.ROOT))
+      }
+
+    // we can project this dataset[T] to Dataset[U] when it provides all of 
the Encoder[U]'s columns
+    val encoder = implicitly[Encoder[U]]
+    val projectedColumns = encoder.schema.fields
+    val availableColumns = this.schema.fields.map(considerCase).map(col => 
col.name -> col).toMap
+    val columnsMissing = projectedColumns.map(considerCase).exists(proj =>
+      ! availableColumns.get(proj.name).exists(avail => 
proj.dataType.acceptsType(avail.dataType))
+    )
+    if (columnsMissing) {
+      // give precedence to `as[U]`s Exception if that one would fail either
+      this.as[U]
+
+      // helper to pretty-print columns in exception message
+      def toString(columns: Iterable[StructField]): String =
+        s"(${ columns.map(c => s"${c.name}: ${c.dataType}").mkString(", ") })"
+
+      throw new AnalysisException(
+        s"Dataset type not supported by toDS[T]: ${encoder.clsTag}, use 
map(identity)\n" +
+          "Columns\n" +
+          s"\t${toString(availableColumns.values)} cannot be projected to\n" +
+          s"\t${toString(projectedColumns)}"
+      )
+    }
+
+    this.select(projectedColumns.map(c => col(c.name)): _*).as[U]

Review comment:
       And if we define the `toDS[U]` operation as applying the schema of `U` 
only and not any other semantics of the encoder. The encoder has an output 
schema, and this is the output schema of `toDS[U]`. I think that is a sensible 
scope and a common use case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to