HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1013649051
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
}
}
+ def hasRangeExpr(e: Expression): Boolean = e.exists {
+ case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _:
GreaterThan) =>
+ hasEventTimeColNeq(neq)
+ case _ => false
+ }
+
+ def hasEventTimeColNeq(neq: Expression): Boolean = {
+ val exp = neq.asInstanceOf[BinaryComparison]
+ hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+ }
+
+ def hasEventTimeCol(exps: Expression): Boolean =
+ exps.exists {
+ case a: AttributeReference =>
a.metadata.contains(EventTimeWatermark.delayKey)
+ case _ => false
+ }
+
+ // TODO: This function and hasRangeExpr
+ // should be deleted after we support range join with states
+ def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+ plan match {
+ case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+ left.isStreaming && right.isStreaming
+ otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+ case _ => false
+ }
+ }
+
/**
* Checks for possible correctness issue in chained stateful operators. The
behavior is
* controlled by SQL config
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise,
Spark will just
* print a warning message.
*/
def checkStreamingQueryGlobalWatermarkLimit(
- plan: LogicalPlan,
- outputMode: OutputMode): Unit = {
+ plan: LogicalPlan): Unit = {
def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p
match {
- case s: Aggregate
- if s.isStreaming && outputMode == InternalOutputModes.Append => true
case Join(left, right, joinType, _, _)
Review Comment:
We can remove this line, as we support outer joins as well. We only have an
issue with stream-stream time-interval joins (of all join types) and
flatMapGroupsWithState.
(Arguably, flatMapGroupsWithState followed by another stateful operator should
be disallowed in all output modes, but I believe we will have a separate
check for output mode, so this is OK.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]