This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e58f12d1843 [SPARK-41411][SS] Multi-Stateful Operator watermark support bug fix
e58f12d1843 is described below

commit e58f12d1843af39ed4ac0c2ff108490ae303e7e4
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Wed Dec 7 08:35:21 2022 +0900

    [SPARK-41411][SS] Multi-Stateful Operator watermark support bug fix
    
    ### What changes were proposed in this pull request?
    
    Fix a typo in passing event time watermark to `StreamingSymmetricHashJoinExec` that causes logic errors.
    
    ### Why are the changes needed?
    
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Closes #38945 from WweiL/multi-stateful-ops-fix.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/execution/streaming/IncrementalExecution.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 574709d05b0..e5e4dc7d0dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -242,7 +242,7 @@ class IncrementalExecution(
         j.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           eventTimeWatermarkForLateEvents = Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForLateEvents),
+          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
           stateWatermarkPredicates =
             StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
               j.left.output, j.right.output, j.leftKeys, j.rightKeys, j.condition.full,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
