cloud-fan commented on a change in pull request #26969:
URL: https://github.com/apache/spark/pull/26969#discussion_r472044596
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -500,6 +501,67 @@ class Dataset[T] private[sql](
select(newCols : _*)
}
+  /**
+   * Returns a new Dataset where each record has been mapped onto the specified type.
+   *
+   * This is similar to `as[U]`, which is lazy in terms of the schema. With `as[U]`, the result
+   * `Dataset[U]` still has the schema of `T`, not `U`. Hence, it may contain additional columns or
+   * the columns may be in a different order. The schema of `U` manifests only after operations that
+   * utilize the encoder, e.g. `map` or `collect`, but not `save`.
+   *
+   * This method returns a `Dataset[U]` that is strictly derived from `U`. This works for any case
+   * class with a standard encoder. This is done through projection of `T`'s columns onto `U`'s
+   * schema, if possible. Otherwise, it throws an `AnalysisException`. Columns are matched by name
+   * and type, where the case sensitivity of column names is determined by `spark.sql.caseSensitive`.
+   *
+   * Where `as[U]` supports types with inner classes that have extra fields or a different field
+   * order, this is not possible through projection and hence not supported by this method.
+   * An example for an inner type is `Inner`:
+   * {{{
+   *   case class Inner(a: Int)
+   *   case class Outer(i: Inner)
+   * }}}
+   * In that case you should use `map(identity)`, if this is really needed.
+   *
+   * As with `as[U]`, if the schema of the Dataset does not match the desired `U` type, you can use
+   * `select` along with `alias` or `as` to rearrange or rename as required.
+   *
+   * @group basic
+   * @since 3.1.0
+   */
+  def toDS[U : Encoder]: Dataset[U] = {
+    // column name case sensitivity is configurable
+    def considerCase(field: StructField): StructField =
+      if (SQLConf.get.caseSensitiveAnalysis) field
+      else field.copy(name = field.name.toLowerCase(Locale.ROOT))
+
+
+    // we can project this Dataset[T] to Dataset[U] when it provides all of the Encoder[U]'s columns
+    val encoder = implicitly[Encoder[U]]
+    val projectedColumns = encoder.schema.fields
+    val availableColumns = this.schema.fields.map(considerCase).map(col => col.name -> col).toMap
+    val columnsMissing = projectedColumns.map(considerCase).exists(proj =>
+      !availableColumns.get(proj.name).exists(avail => proj.dataType.acceptsType(avail.dataType))
+    )
+    if (columnsMissing) {
+      // give precedence to `as[U]`'s exception if that one would fail as well
+      this.as[U]
+
+      // helper to pretty-print columns in the exception message
+      def toString(columns: Iterable[StructField]): String =
+        s"(${columns.map(c => s"${c.name}: ${c.dataType}").mkString(", ")})"
+
+      throw new AnalysisException(
+        s"Dataset type not supported by toDS[U]: ${encoder.clsTag}, use map(identity)\n" +
+          "Columns\n" +
+          s"\t${toString(availableColumns.values)} cannot be projected to\n" +
+          s"\t${toString(projectedColumns)}"
+      )
+    }
+
+    this.select(projectedColumns.map(c => col(c.name)): _*).as[U]
+  }
Review comment:
After a second thought, I don't think it's possible to do this, as the
encoder is a black box here. We can't just assume the encoder expression is
simply selecting columns; that's not even true for case classes, as the case
class encoder does an `Upcast` on the input columns.
As I said before, `as[U].map(identity)` should be good enough, and its
performance is as good as this approach: if there are other typed operations
before `as[U]`, the `identity` function will be merged into the previous typed
operations and has nearly no overhead.
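
The recommended alternative can be sketched as follows (a minimal, hypothetical example; the `Full` and `Narrow` case classes are made up for illustration, and `spark` is assumed to be a local `SparkSession`):
```scala
import org.apache.spark.sql.SparkSession

// Hypothetical schemas: `Narrow` is a projection of `Full`.
case class Full(id: Long, name: String, extra: Int)
case class Narrow(id: Long, name: String)

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val ds = Seq(Full(1L, "a", 42)).toDS()

// Lazy: `as[Narrow]` only changes the type parameter; the schema
// still contains the `extra` column of `Full`.
val lazyDs = ds.as[Narrow]

// Forcing the encoder with `map(identity)` yields a Dataset whose
// schema is exactly that of `Narrow`.
val strictDs = ds.as[Narrow].map(identity)
```
When `as[Narrow]` follows other typed operations, the `identity` lambda is fused into the preceding typed plan, so the extra step costs nearly nothing.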
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]