allisonwang-db commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715076106



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -41,14 +41,18 @@ case class ReferenceEqualPlanWrapper(plan: LogicalPlan) {
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     renewDuplicatedRelations(mutable.HashSet.empty, 
plan)._1.resolveOperatorsUpWithPruning(
-      _.containsAnyPattern(JOIN, LATERAL_JOIN, INTERSECT, EXCEPT, UNION, 
COMMAND), ruleId) {
+      _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, 
UNION, COMMAND),
+      ruleId) {
       case p: LogicalPlan if !p.childrenResolved => p
       // To resolve duplicate expression IDs for Join.
       case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
         j.copy(right = dedupRight(left, right))
       // Resolve duplicate output for LateralJoin.
       case j @ LateralJoin(left, right, _, _) if right.resolved && 
!j.duplicateResolved =>
         j.copy(right = right.withNewPlan(dedupRight(left, right.plan)))
+      // Resolve duplicate output for AsOfJoin.
+      case j @ AsOfJoin(left, right, _, _, _, _) if right.resolved && 
!j.duplicateResolved =>

Review comment:
       Here we only need `!j.duplicateResolved` (LateralJoin needs it because 
it's a UnaryNode)

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and 
Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, 
right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= 
left.t - tolerance

Review comment:
       Should we make sure tolerance here is non-negative?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and 
Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, 
right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)

Review comment:
       `SELECT MIN_BY(STRUCT(right.*), left.t - right.t) AS __nearest_right__`
   
   
   

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
##########
@@ -121,3 +121,24 @@ object LeftSemiOrAnti {
     case _ => None
   }
 }
+
+object AsOfJoinDirection {
+
+  def apply(direction: String): AsOfJoinDirection = {
+    direction.toLowerCase(Locale.ROOT).replace("_", "") match {

Review comment:
       Why do we need to replace `_` with ""?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to