Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139803500

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
    @@ -114,6 +115,16 @@ class IncrementalExecution(
               stateInfo = Some(nextStatefulOperationStateInfo),
               batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
               eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
    +
    +      case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) =>
    +        j.copy(
    +          stateInfo = Some(nextStatefulOperationStateInfo),
    --- End diff --

    So, I think this may not be robust. We create a new IncrementalExecution at each trigger, so we are relying on `statefulOperatorId` being incremented in the same deterministic order every time. I can imagine adding rules to the Optimizer in the future that move an `EquiJoin` before an aggregation. In that case, the state store IDs of the aggregation and the join could be swapped between runs. I'm not sure whether anything protects us against that, so I just wanted to bring it up.
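    To make the concern concrete, here is a minimal sketch using hypothetical, simplified plan nodes (`Source`, `Aggregate`, `Join`, `assignStateIds` and `idsByKind` are illustrative names, not Spark classes or APIs). It assumes IDs come from a counter that restarts for each new execution and is consumed in pre-order traversal order, roughly like `statefulOperatorId`. If a rewrite changes where the join sits relative to the aggregation, the same logical operators end up with each other's IDs:

```scala
// Hypothetical, simplified stand-ins for physical plan nodes (NOT Spark's classes).
sealed trait Plan
case class Source(name: String) extends Plan
case class Aggregate(child: Plan, stateId: Option[Long] = None) extends Plan
case class Join(left: Plan, right: Plan, stateId: Option[Long] = None) extends Plan

// Assumption: a fresh counter per execution, handed out in pre-order
// (parent before children), loosely mimicking `statefulOperatorId`.
def assignStateIds(plan: Plan): Plan = {
  var nextId = 0L
  def visit(p: Plan): Plan = p match {
    case s: Source => s
    case a: Aggregate =>
      val id = nextId; nextId += 1
      a.copy(child = visit(a.child), stateId = Some(id))
    case j: Join =>
      val id = nextId; nextId += 1
      j.copy(left = visit(j.left), right = visit(j.right), stateId = Some(id))
  }
  visit(plan)
}

// Collects which ID each kind of stateful operator received.
def idsByKind(plan: Plan): Map[String, Long] = plan match {
  case Source(_)        => Map.empty
  case Aggregate(c, id) => idsByKind(c) ++ id.map(i => "aggregate" -> i).toList
  case Join(l, r, id)   => idsByKind(l) ++ idsByKind(r) ++ id.map(i => "join" -> i).toList
}

// Same logical operators, two different plan shapes (e.g. before and after
// a hypothetical future optimizer rule reorders them).
val before = Aggregate(Join(Source("a"), Source("b")))
val after  = Join(Aggregate(Source("a")), Source("b"))
```

    With `before`, the aggregation gets ID 0 and the join gets ID 1; with `after`, they are swapped, so a restarted query would read the wrong operator's state. Re-running on an unchanged plan shape is stable, which is why this only bites if the plan shape changes across triggers or restarts.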