WweiL commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1014302073
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
   Will do!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org