WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014302073
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
}
}
+ def hasRangeExpr(e: Expression): Boolean = e.exists {
+ case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _:
GreaterThan) =>
+ hasEventTimeColNeq(neq)
+ case _ => false
+ }
+
+ def hasEventTimeColNeq(neq: Expression): Boolean = {
+ val exp = neq.asInstanceOf[BinaryComparison]
+ hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+ }
+
+ def hasEventTimeCol(exps: Expression): Boolean =
+ exps.exists {
+ case a: AttributeReference =>
a.metadata.contains(EventTimeWatermark.delayKey)
+ case _ => false
+ }
+
+ // TODO: This function and hasRangeExpr
+ // should be deleted after we support range join with states
+ def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+ plan match {
+ case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+ left.isStreaming && right.isStreaming
+ otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+ case _ => false
+ }
+ }
+
/**
* Checks for possible correctness issue in chained stateful operators. The
behavior is
* controlled by SQL config
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise,
Spark will just
* print a warning message.
*/
def checkStreamingQueryGlobalWatermarkLimit(
- plan: LogicalPlan,
- outputMode: OutputMode): Unit = {
+ plan: LogicalPlan): Unit = {
def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p
match {
- case s: Aggregate
- if s.isStreaming && outputMode == InternalOutputModes.Append => true
case Join(left, right, joinType, _, _)
Review Comment:
Will do!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]