EnricoMi commented on a change in pull request #26969:
URL: https://github.com/apache/spark/pull/26969#discussion_r484965678
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -500,6 +501,67 @@ class Dataset[T] private[sql](
     select(newCols : _*)
   }
+  /**
+   * Returns a new Dataset where each record has been mapped onto the specified type.
+   *
+   * This is similar to `as[U]`, which is lazy in terms of the schema: with `as[U]`, the resulting
+   * `Dataset[U]` still has the schema of `T`, not `U`. Hence, it may contain additional columns,
+   * or the columns may be in a different order. The schema of `U` manifests only after operations
+   * that utilize the encoder, e.g. `map` or `collect`, but not `save`.
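+   *
+   * For example, with hypothetical types `Wide` and `Narrow`:
+   * {{{
+   *   case class Wide(a: Int, b: String)
+   *   case class Narrow(a: Int)
+   *
+   *   val ds = Seq(Wide(1, "x")).toDS()
+   *   ds.as[Narrow].columns    // Array(a, b) -- still Wide's schema
+   *   ds.toDS[Narrow].columns  // Array(a)    -- exactly Narrow's schema
+   * }}}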
+   *
+   * This method returns a `Dataset[U]` that is strictly derived from `U`. This works for any case
+   * class with a standard encoder. It is done by projecting `T`'s columns onto `U`'s schema, if
+   * possible; otherwise, an `AnalysisException` is thrown. Columns are matched by name and type,
+   * where the case sensitivity of column names is determined by `spark.sql.caseSensitive`.
+   *
+   * Where `as[U]` supports types with inner classes that have extra fields or a different field
+   * order, this is not possible through projection and hence not supported by this method.
+   * An example of such an inner type is `Inner`:
+   * {{{
+   *   case class Inner(a: Int)
+   *   case class Outer(i: Inner)
+   * }}}
+   * In that case, use `map(identity)` if this is really needed.
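+   * For example, with a hypothetical `ds` whose source schema is wider than `Outer`:
+   * {{{
+   *   // applies the encoder, yielding exactly Outer's schema
+   *   ds.as[Outer].map(identity)
+   * }}}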
+   *
+   * As with `as[U]`, if the schema of the Dataset does not match the desired `U` type, you can
+   * use `select` along with `alias` or `as` to rearrange or rename as required.
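+   * For example, with a hypothetical column `value` that should map to field `a`:
+   * {{{
+   *   ds.select($"value".as("a")).toDS[U]
+   * }}}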
+   *
+   * @group basic
+   * @since 3.1.0
+   */
+  def toDS[U : Encoder]: Dataset[U] = {
+    // column-name case sensitivity is configurable via spark.sql.caseSensitive
+    def considerCase(field: StructField): StructField =
+      if (SQLConf.get.caseSensitiveAnalysis) field
+      else field.copy(name = field.name.toLowerCase(Locale.ROOT))
+
+    // we can project this Dataset[T] to a Dataset[U] when it provides
+    // all of the Encoder[U]'s columns
+    val encoder = implicitly[Encoder[U]]
+    val projectedColumns = encoder.schema.fields
+    val availableColumns = this.schema.fields.map(considerCase).map(f => f.name -> f).toMap
+    val columnsMissing = projectedColumns.map(considerCase).exists(proj =>
+      !availableColumns.get(proj.name).exists(avail => proj.dataType.acceptsType(avail.dataType))
+    )
+    if (columnsMissing) {
+      // give precedence to `as[U]`'s exception if that one would fail as well
+      this.as[U]
+
+      // helper to pretty-print columns in the exception message
+      def toString(columns: Iterable[StructField]): String =
+        s"(${columns.map(c => s"${c.name}: ${c.dataType}").mkString(", ")})"
+
+      throw new AnalysisException(
+        s"Dataset type not supported by toDS[U]: ${encoder.clsTag}, use map(identity)\n" +
+          "Columns\n" +
+          s"\t${toString(availableColumns.values)} cannot be projected to\n" +
+          s"\t${toString(projectedColumns)}"
+      )
+    }
+
+    this.select(projectedColumns.map(c => col(c.name)): _*).as[U]
Review comment:
And we could define the `toDS[U]` operation as applying only the schema of `U`, not any other semantics of the encoder: the encoder has an output schema, and that becomes the output schema of `toDS[U]`. I think that is a sensible scope and a common use case.
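
A minimal sketch of that contract, assuming this PR's `toDS[U]` and hypothetical types `Wide` and `Narrow`:

```scala
import org.apache.spark.sql.{Encoder, SparkSession}

case class Wide(a: Int, b: String)
case class Narrow(a: Int)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq(Wide(1, "x"), Wide(2, "y")).toDS()

// proposed contract: the output schema of toDS[U] is exactly the
// encoder's output schema, regardless of Wide's extra columns
assert(ds.toDS[Narrow].schema == implicitly[Encoder[Narrow]].schema)

// as[U], by contrast, keeps the source schema until an encoder-based
// operation such as map or collect runs
assert(ds.as[Narrow].schema == ds.schema)
```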