WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1437952483


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
     }
   }
 
-  test("stream-stream time interval join - output watermark for various 
intervals") {
-    def testOutputWatermarkInJoin(
-        df: DataFrame,
-        input: MemoryStream[(String, Timestamp)],
-        expectedOutputWatermark: Long): Unit = {
-      testStream(df)(
-        // dummy row to trigger execution
-        AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-        CheckAnswer(),
-        Execute { query =>
-          val lastExecution = query.lastExecution
-          val joinOperator = lastExecution.executedPlan.collect {
-            case j: StreamingSymmetricHashJoinExec => j
-          }.head
+  test("SPARK-45637-1 join on window, append mode") {

Review Comment:
   ```
   -=-=-=-=-=-=-=
   currentBatchId = 0, lastExecutionRequiresAnotherBatch = false, 
isNewDataAvailable = true, shouldConstructNextBatch (with currbatchID) = true
   stateKeyWatermarkPredicateFunc: (input[0, 
struct<start:timestamp,end:timestamp>, false].end <= 0)
   stateKeyWatermarkPredicateFunc: (input[0, 
struct<start:timestamp,end:timestamp>, false].end <= 0)
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 0
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 0
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,0,4c4b40]
   this row filtered: [0,1000000018,0,0,4c4b40]
   getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,0,4c4b40]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   wei==thisrow: [0,1000000018,0,4c4b40,989680]
   this row filtered: [0,1000000018,0,4c4b40,989680]
   getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true 
!stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,1000000018,0,4c4b40,989680]
   allKeyToNumValues.hasNext: true
   ---------------removeByKeyCondition--------------- allKeyToNumValues.hasNext
   18:21:47.427 ERROR org.apache.spark.util.Utils: Aborting task
   java.lang.NullPointerException: Cannot invoke 
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()"
 because the return value of 
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()"
 is null
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:178)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:148)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1149)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:479)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   18:21:47.429 ERROR 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborting 
commit for partition 0 (task 2, attempt 0, stage 2.0)
   18:21:47.429 ERROR 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborted 
commit for partition 0 (task 2, attempt 0, stage 2.0)
   18:21:47.438 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 
in stage 2.0 (TID 2)
   java.lang.NullPointerException: Cannot invoke 
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()"
 because the return value of 
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()"
 is null
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:178)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:148)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1149)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:479)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   18:21:47.445 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 
in stage 2.0 (TID 2) (10.0.0.11 executor driver): 
java.lang.NullPointerException: Cannot invoke 
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()"
 because the return value of 
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()"
 is null
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
        at org.apache...
   18:21:47.446 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in 
stage 2.0 failed 1 times; aborting job
   18:21:47.450 ERROR 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec: Data 
source write support MicroBatchWrite[epoch: 0, writer: 
org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@553911d0] 
is aborting.
   18:21:47.450 ERROR 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec: Data 
source write support MicroBatchWrite[epoch: 0, writer: 
org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@553911d0] 
aborted.
   18:21:47.496 ERROR 
org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
716405ff-bb4c-4d94-8604-6d160c7b6ab9, runId = 
39081f98-c9a5-4def-a219-d5a119cbda74] terminated with error
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 
2) (10.0.0.11 executor driver): java.lang.NullPointerException: Cannot invoke 
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()"
 because the return value of 
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()"
 is null
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:178)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:148)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1149)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:479)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to the list to respond to this message.