Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139803500
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
    @@ -114,6 +115,16 @@ class IncrementalExecution(
               stateInfo = Some(nextStatefulOperationStateInfo),
               batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
               eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
    +
    +      case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) =>
    +        j.copy(
    +          stateInfo = Some(nextStatefulOperationStateInfo),
    --- End diff --
    
    So, I think this may not be robust. At each trigger, we create a new
    IncrementalExecution, so we are relying on the `statefulOperatorId` being
    incremented in a deterministic manner, i.e. on the stateful operators
    showing up in the same order in the plan every time.
    I can imagine adding things to the Optimizer in the future which may move
    an `EquiJoin` before an aggregation.
    In this case, the state store IDs of the aggregation and the join may
    switch. I'm not sure if we're protected against that somehow, so I just
    wanted to bring it up.
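
    To make the concern concrete, here is a toy sketch, not Spark code. The
    names `OperatorIdSketch`, `Op`, `Stateful`, `Stateless` and `assignIds`
    are made up for illustration, and the plan is flattened to a simple
    sequence. It only assumes that ids come from a counter that advances once
    per stateful operator, in visiting order:

```scala
import java.util.concurrent.atomic.AtomicInteger

object OperatorIdSketch {
  // Hypothetical toy plan nodes; these are NOT Spark classes.
  sealed trait Op { def name: String }
  case class Stateful(name: String) extends Op
  case class Stateless(name: String) extends Op

  // A fresh counter per call mimics building a new execution at each trigger.
  // Each stateful operator takes the next id in the order it is visited.
  def assignIds(plan: Seq[Op]): Map[String, Int] = {
    val nextId = new AtomicInteger(0)
    plan.collect { case Stateful(n) => n -> nextId.getAndIncrement() }.toMap
  }

  // Plan as seen by one trigger.
  val before: Seq[Op] =
    Seq(Stateful("aggregate"), Stateless("project"), Stateful("join"))

  // Same query after a hypothetical optimizer rule moved the join first.
  val after: Seq[Op] =
    Seq(Stateful("join"), Stateless("project"), Stateful("aggregate"))

  def main(args: Array[String]): Unit = {
    println(assignIds(before)) // aggregate gets 0, join gets 1
    println(assignIds(after))  // join gets 0, aggregate gets 1
  }
}
```

    If state is looked up by these ids, the aggregation would then read what
    the join wrote and vice versa, which is the switch I'm worried about.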

