sunchao commented on a change in pull request #35657:
URL: https://github.com/apache/spark/pull/35657#discussion_r829317532
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
##########
@@ -44,20 +52,108 @@ object V2ExpressionUtils extends SQLConfHelper {
refs.map(ref => resolveRef[T](ref, plan))
}
- def toCatalyst(expr: V2Expression, query: LogicalPlan): Expression = {
+ /**
+ * Converts the array of input V2 [[V2SortOrder]] into their counterparts in
catalyst.
+ */
+ def toCatalystOrdering(ordering: Array[V2SortOrder], query: LogicalPlan):
Seq[SortOrder] = {
+ sequenceToOption(ordering.map(toCatalyst(_,
query))).asInstanceOf[Option[Seq[SortOrder]]]
+ .getOrElse(Seq.empty)
+ }
+
+ /**
+ * Converts the V2 [[V2Distribution]] into its counterpart in catalyst
[[Distribution]].
+ *
+ * If `funCatalogOpt` is set, it will be used to lookup any V2 function
referenced
+ * by the input distribution. For instance, a bucket transform in the
distribution requires a
+ * corresponding function to exist in the catalog, in order for Spark to
leverage bucket join
+ * for the V2 data sources.
+ *
+ * This returns [[UnspecifiedDistribution]] if any non-identity transform is
used in the
+ * distribution, AND the `funCatalogOpt` is not set OR the corresponding
function is not
+ * defined in the catalog.
+ */
+ def toCatalystDistribution(
+ distribution: V2Distribution,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Distribution =
distribution match {
+ case d: V2OrderedDistribution =>
+ val resolvedOrdering = toCatalystOrdering(d.ordering(), query)
+ OrderedDistribution(resolvedOrdering)
+ case d: V2ClusteredDistribution =>
+ sequenceToOption(d.clustering.map(toCatalyst(_, query, funCatalogOpt)))
+ .map(ClusteredDistribution(_))
+ .getOrElse(UnspecifiedDistribution)
+ case _: V2UnspecifiedDistribution =>
+ UnspecifiedDistribution
+ }
+
+ def toCatalyst(
+ expr: V2Expression,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = {
expr match {
+ case t: Transform =>
+ toCatalystTransform(t, query, funCatalogOpt)
case SortValue(child, direction, nullOrdering) =>
- val catalystChild = toCatalyst(child, query)
- SortOrder(catalystChild, toCatalyst(direction),
toCatalyst(nullOrdering), Seq.empty)
- case IdentityTransform(ref) =>
- resolveRef[NamedExpression](ref, query)
+ toCatalyst(child, query, funCatalogOpt).map { catalystChild =>
+ SortOrder(catalystChild, toCatalyst(direction),
toCatalyst(nullOrdering), Seq.empty)
+ }
case ref: FieldReference =>
- resolveRef[NamedExpression](ref, query)
+ Some(resolveRef[NamedExpression](ref, query))
case _ =>
throw new AnalysisException(s"$expr is not currently supported")
}
}
+ def toCatalystTransform(
+ trans: Transform,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] =
trans match {
+ case IdentityTransform(ref) =>
+ Some(resolveRef[NamedExpression](ref, query))
+ case BucketTransform(numBuckets, refs, sorted) if sorted.isEmpty &&
refs.length == 1 =>
+ val resolvedRefs = refs.map(r => resolveRef[NamedExpression](r, query))
+ funCatalogOpt.flatMap { catalog =>
+ loadV2Function(catalog, "bucket", resolvedRefs).map { bound =>
+ DataSourceBucketTransformExpression(numBuckets, bound, resolvedRefs)
Review comment:
It's a function for the bucket transform: `bucket(num_buckets, c)` (it
could be `bucket(num_buckets, c1, c2, ...)` in the future).
The issue here is that `canonicalName` for the bucket `BoundFunction`, for
obvious reasons, doesn't consider the value of `numBuckets`. However, to check
if two bucket transforms are compatible, we need to take that into account.
That's why we need the extra `DataSourceBucketTransformExpression`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]