cloud-fan commented on a change in pull request #35657:
URL: https://github.com/apache/spark/pull/35657#discussion_r827775484
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
##########
@@ -44,20 +52,108 @@ object V2ExpressionUtils extends SQLConfHelper {
refs.map(ref => resolveRef[T](ref, plan))
}
- def toCatalyst(expr: V2Expression, query: LogicalPlan): Expression = {
+ /**
+ * Converts the array of input V2 [[V2SortOrder]] into their counterparts in
catalyst.
+ */
+ def toCatalystOrdering(ordering: Array[V2SortOrder], query: LogicalPlan):
Seq[SortOrder] = {
+ sequenceToOption(ordering.map(toCatalyst(_,
query))).asInstanceOf[Option[Seq[SortOrder]]]
+ .getOrElse(Seq.empty)
+ }
+
+ /**
+ * Converts the V2 [[V2Distribution]] into its counterpart in catalyst
[[Distribution]].
+ *
+ * If `funCatalogOpt` is set, it will be used to lookup any V2 function
referenced
+ * by the input distribution. For instance, a bucket transform in the
distribution requires a
+ * corresponding function to exist in the catalog, in order for Spark to
leverage bucket join
+ * for the V2 data sources.
+ *
+ * This returns [[UnspecifiedDistribution]] if any non-identity transform is
used in the
+ * distribution, AND the `funCatalogOpt` is not set OR the corresponding
function is not
+ * defined in the catalog.
+ */
+ def toCatalystDistribution(
+ distribution: V2Distribution,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Distribution =
distribution match {
+ case d: V2OrderedDistribution =>
+ val resolvedOrdering = toCatalystOrdering(d.ordering(), query)
+ OrderedDistribution(resolvedOrdering)
+ case d: V2ClusteredDistribution =>
+ sequenceToOption(d.clustering.map(toCatalyst(_, query, funCatalogOpt)))
+ .map(ClusteredDistribution(_))
+ .getOrElse(UnspecifiedDistribution)
+ case _: V2UnspecifiedDistribution =>
+ UnspecifiedDistribution
+ }
+
+ def toCatalyst(
+ expr: V2Expression,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = {
expr match {
+ case t: Transform =>
+ toCatalystTransform(t, query, funCatalogOpt)
case SortValue(child, direction, nullOrdering) =>
- val catalystChild = toCatalyst(child, query)
- SortOrder(catalystChild, toCatalyst(direction),
toCatalyst(nullOrdering), Seq.empty)
- case IdentityTransform(ref) =>
- resolveRef[NamedExpression](ref, query)
+ toCatalyst(child, query, funCatalogOpt).map { catalystChild =>
+ SortOrder(catalystChild, toCatalyst(direction),
toCatalyst(nullOrdering), Seq.empty)
+ }
case ref: FieldReference =>
- resolveRef[NamedExpression](ref, query)
+ Some(resolveRef[NamedExpression](ref, query))
case _ =>
throw new AnalysisException(s"$expr is not currently supported")
}
}
+ def toCatalystTransform(
+ trans: Transform,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] =
trans match {
+ case IdentityTransform(ref) =>
+ Some(resolveRef[NamedExpression](ref, query))
+ case BucketTransform(numBuckets, refs, sorted) if sorted.isEmpty &&
refs.length == 1 =>
+ val resolvedRefs = refs.map(r => resolveRef[NamedExpression](r, query))
+ funCatalogOpt.flatMap { catalog =>
+ loadV2Function(catalog, "bucket", resolvedRefs).map { bound =>
+ DataSourceBucketTransformExpression(numBuckets, bound, resolvedRefs)
Review comment:
`numBuckets` can be part of the transform function inputs as an int
literal. Seems we only need a single class `DataSourceTransform`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
##########
@@ -44,20 +52,108 @@ object V2ExpressionUtils extends SQLConfHelper {
refs.map(ref => resolveRef[T](ref, plan))
}
- def toCatalyst(expr: V2Expression, query: LogicalPlan): Expression = {
+ /**
+ * Converts the array of input V2 [[V2SortOrder]] into their counterparts in
catalyst.
+ */
+ def toCatalystOrdering(ordering: Array[V2SortOrder], query: LogicalPlan):
Seq[SortOrder] = {
+ sequenceToOption(ordering.map(toCatalyst(_,
query))).asInstanceOf[Option[Seq[SortOrder]]]
+ .getOrElse(Seq.empty)
+ }
+
+ /**
+ * Converts the V2 [[V2Distribution]] into its counterpart in catalyst
[[Distribution]].
+ *
+ * If `funCatalogOpt` is set, it will be used to lookup any V2 function
referenced
+ * by the input distribution. For instance, a bucket transform in the
distribution requires a
+ * corresponding function to exist in the catalog, in order for Spark to
leverage bucket join
+ * for the V2 data sources.
+ *
+ * This returns [[UnspecifiedDistribution]] if any non-identity transform is
used in the
+ * distribution, AND the `funCatalogOpt` is not set OR the corresponding
function is not
+ * defined in the catalog.
+ */
+ def toCatalystDistribution(
+ distribution: V2Distribution,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Distribution =
distribution match {
+ case d: V2OrderedDistribution =>
+ val resolvedOrdering = toCatalystOrdering(d.ordering(), query)
+ OrderedDistribution(resolvedOrdering)
+ case d: V2ClusteredDistribution =>
+ sequenceToOption(d.clustering.map(toCatalyst(_, query, funCatalogOpt)))
+ .map(ClusteredDistribution(_))
+ .getOrElse(UnspecifiedDistribution)
+ case _: V2UnspecifiedDistribution =>
+ UnspecifiedDistribution
+ }
+
+ def toCatalyst(
+ expr: V2Expression,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = {
expr match {
+ case t: Transform =>
+ toCatalystTransform(t, query, funCatalogOpt)
case SortValue(child, direction, nullOrdering) =>
- val catalystChild = toCatalyst(child, query)
- SortOrder(catalystChild, toCatalyst(direction),
toCatalyst(nullOrdering), Seq.empty)
- case IdentityTransform(ref) =>
- resolveRef[NamedExpression](ref, query)
+ toCatalyst(child, query, funCatalogOpt).map { catalystChild =>
+ SortOrder(catalystChild, toCatalyst(direction),
toCatalyst(nullOrdering), Seq.empty)
+ }
case ref: FieldReference =>
- resolveRef[NamedExpression](ref, query)
+ Some(resolveRef[NamedExpression](ref, query))
case _ =>
throw new AnalysisException(s"$expr is not currently supported")
}
}
+ def toCatalystTransform(
+ trans: Transform,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] =
trans match {
+ case IdentityTransform(ref) =>
+ Some(resolveRef[NamedExpression](ref, query))
+ case BucketTransform(numBuckets, refs, sorted) if sorted.isEmpty &&
refs.length == 1 =>
+ val resolvedRefs = refs.map(r => resolveRef[NamedExpression](r, query))
+ funCatalogOpt.flatMap { catalog =>
+ loadV2Function(catalog, "bucket", resolvedRefs).map { bound =>
+ DataSourceBucketTransformExpression(numBuckets, bound, resolvedRefs)
Review comment:
`numBuckets` can be part of the transform function inputs as an int
literal. Seems we only need a single class `DataSourceTransform`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]