EnricoMi commented on a change in pull request #26969:
URL: https://github.com/apache/spark/pull/26969#discussion_r472090024



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -500,6 +501,67 @@ class Dataset[T] private[sql](
     select(newCols : _*)
   }
 
+  /**
+   * Returns a new Dataset where each record has been mapped on to the 
specified type.
+   *
+   * This is similar to `as[U]`, which is lazy in terms of the schema. With 
`as[U]`, the result
+   * `Dataset[U]` still has the schema of `T`, not `U`. Hence, it may contain 
additional columns or
+   * the columns may be in different order. The schema of `U` manifests only 
after operations that
+   * utilize the encoder, e.g. `map` or `collect`, but not `save`.
+   *
+   * This method returns a `Dataset[U]` that is strictly derived from `U`. 
This works for any case
+   * class with standard encoder. This is done through projection of `T`'s 
columns onto `U`'s
+   * schema, if possible. Otherwise, it throws an `AnalysisException`. Columns 
are matched by name
+   * and type, where column names' case sensitivity is determined by 
`spark.sql.caseSensitive`.
+   *
+   * Where `as[U]` supports types with inner classes that have extra fields or 
different field
+   * order, this is not possible through projection and hence not supported by 
this method.
+   * An example for an inner type is `Inner`:
+   * {{{
+   *   case class Inner(a: Int)
+   *   case class Outer(i: Inner)
+   * }}}
+   * In that case you should use `map(identity)`, if this is really needed.
+   *
+   * As for `as[U]`, if the schema of the Dataset does not match the desired 
`U` type, you can use
+   * `select` along with `alias` or `as` to rearrange or rename as required.
+   *
+   * @group basic
+   * @since 3.1.0
+   */
+  def toDS[U : Encoder]: Dataset[U] = {
+    // column names case-sensitivity is configurable
+    def considerCase(field: StructField): StructField = 
SQLConf.get.caseSensitiveAnalysis match {
+        case true => field
+        case false => field.copy(name = field.name.toLowerCase(Locale.ROOT))
+      }
+
+    // we can project this dataset[T] to Dataset[U] when it provides all of 
the Encoder[U]'s columns
+    val encoder = implicitly[Encoder[U]]
+    val projectedColumns = encoder.schema.fields
+    val availableColumns = this.schema.fields.map(considerCase).map(col => 
col.name -> col).toMap
+    val columnsMissing = projectedColumns.map(considerCase).exists(proj =>
+      ! availableColumns.get(proj.name).exists(avail => 
proj.dataType.acceptsType(avail.dataType))
+    )
+    if (columnsMissing) {
+      // give precedence to `as[U]`s Exception if that one would fail either
+      this.as[U]
+
+      // helper to pretty-print columns in exception message
+      def toString(columns: Iterable[StructField]): String =
+        s"(${ columns.map(c => s"${c.name}: ${c.dataType}").mkString(", ") })"
+
+      throw new AnalysisException(
+        s"Dataset type not supported by toDS[T]: ${encoder.clsTag}, use 
map(identity)\n" +
+          "Columns\n" +
+          s"\t${toString(availableColumns.values)} cannot be projected to\n" +
+          s"\t${toString(projectedColumns)}"
+      )
+    }
+
+    this.select(projectedColumns.map(c => col(c.name)): _*).as[U]

Review comment:
       I don't see why encoders that perform `UpCast` would not work here. The 
above line still does apply the encoder via `.as[U]`, and `UpCast` does not 
involve column names in any way, i.e. the projection. Can you given a code 
example showing why this should not work?
   
   With `map(identity)` we perform an encoder round-trip encoding the given 
columns into the black-box representation, and decoding with the same encoder 
to some columns again. The decoded representation (columns) must work as input 
to encode again, hence column names on both sides are identical. I would argue 
if column names exist it can be inferred that they can be projected. I'd be 
happy to see some example code that show this assumption is wrong.
   
   It is a common use-case to use `Dataset[T]` to create typed user code. 
Defining a function `def compute(data: DataFrame): DataFrame` would allow to 
call it with arbitrary `DataFrame`s and the function would have to assert the 
schema on every call. Defining the function typed instead like `def 
compute(data: Dataset[Raw]): Dataset[Compute]` removes those assertions from 
your code and makes your code not compile when your try to use the wrong kind 
of `DataFrame`. In that scenario, you are using `.as[T]` but might never call 
any method that actually uses the encoder (like `map` or `collect`). Thus, you 
never materialize that hidden schema.
   
   I think adding `.map(identity)` to `.as[T]` in user code looks redundant and 
meaningless from a user point of view and should be moved behind the Spark API.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to