WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1437952483
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
}
}
- test("stream-stream time interval join - output watermark for various intervals") {
- def testOutputWatermarkInJoin(
- df: DataFrame,
- input: MemoryStream[(String, Timestamp)],
- expectedOutputWatermark: Long): Unit = {
- testStream(df)(
- // dummy row to trigger execution
- AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
- CheckAnswer(),
- Execute { query =>
- val lastExecution = query.lastExecution
- val joinOperator = lastExecution.executedPlan.collect {
- case j: StreamingSymmetricHashJoinExec => j
- }.head
+ test("SPARK-45637-1 join on window, append mode") {
Review Comment:
```
-=-=-=-=-=-=-=
currentBatchId = 0, lastExecutionRequiresAnotherBatch = false,
isNewDataAvailable = true, shouldConstructNextBatch (with currbatchID) = true
stateKeyWatermarkPredicateFunc: (input[0,
struct<start:timestamp,end:timestamp>, false].end <= 0)
stateKeyWatermarkPredicateFunc: (input[0,
struct<start:timestamp,end:timestamp>, false].end <= 0)
wei==thisrow: [0,1000000018,0,0,4c4b40]
this row filtered: [0,1000000018,0,0,4c4b40]
getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 0
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,0,4c4b40]
wei==thisrow: [0,1000000018,0,4c4b40,989680]
this row filtered: [0,1000000018,0,4c4b40,989680]
getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 0
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,4c4b40,989680]
wei==thisrow: [0,1000000018,0,0,4c4b40]
this row filtered: [0,1000000018,0,0,4c4b40]
getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,0,4c4b40]
wei==thisrow: [0,1000000018,0,0,4c4b40]
this row filtered: [0,1000000018,0,0,4c4b40]
getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,0,4c4b40]
wei==thisrow: [0,1000000018,0,0,4c4b40]
this row filtered: [0,1000000018,0,0,4c4b40]
getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,0,4c4b40]
wei==thisrow: [0,1000000018,0,0,4c4b40]
this row filtered: [0,1000000018,0,0,4c4b40]
getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,0,4c4b40]
wei==thisrow: [0,1000000018,0,0,4c4b40]
this row filtered: [0,1000000018,0,0,4c4b40]
getJoinedRows ------ key: [0,1000000018,0,0,4c4b40] numValues: 1
joinedRow: {[0,1000000018,0,0,4c4b40] + [0,1000000018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,0,4c4b40]
wei==thisrow: [0,1000000018,0,4c4b40,989680]
this row filtered: [0,1000000018,0,4c4b40,989680]
getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,4c4b40,989680]
wei==thisrow: [0,1000000018,0,4c4b40,989680]
this row filtered: [0,1000000018,0,4c4b40,989680]
getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,4c4b40,989680]
wei==thisrow: [0,1000000018,0,4c4b40,989680]
this row filtered: [0,1000000018,0,4c4b40,989680]
getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,4c4b40,989680]
wei==thisrow: [0,1000000018,0,4c4b40,989680]
this row filtered: [0,1000000018,0,4c4b40,989680]
getJoinedRows ------ key: [0,1000000018,0,4c4b40,989680] numValues: 1
joinedRow: {[0,1000000018,0,4c4b40,989680] + [0,1000000018,0,4c4b40,989680]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true
!stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,1000000018,0,4c4b40,989680]
allKeyToNumValues.hasNext: true
---------------removeByKeyCondition--------------- allKeyToNumValues.hasNext
18:21:47.427 ERROR org.apache.spark.util.Utils: Aborting task
java.lang.NullPointerException: Cannot invoke
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()"
because the return value of
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()"
is null
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:178)
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1149)
at
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:479)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
18:21:47.429 ERROR
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborting
commit for partition 0 (task 2, attempt 0, stage 2.0)
18:21:47.429 ERROR
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborted
commit for partition 0 (task 2, attempt 0, stage 2.0)
18:21:47.438 ERROR org.apache.spark.executor.Executor: Exception in task 0.0
in stage 2.0 (TID 2)
java.lang.NullPointerException: Cannot invoke
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()"
because the return value of
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()"
is null
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:178)
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1149)
at
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:479)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
18:21:47.445 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0
in stage 2.0 (TID 2) (10.0.0.11 executor driver):
java.lang.NullPointerException: Cannot invoke
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()"
because the return value of
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()"
is null
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
at org.apache...
18:21:47.446 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in
stage 2.0 failed 1 times; aborting job
18:21:47.450 ERROR
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec: Data
source write support MicroBatchWrite[epoch: 0, writer:
org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@553911d0]
is aborting.
18:21:47.450 ERROR
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec: Data
source write support MicroBatchWrite[epoch: 0, writer:
org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@553911d0]
aborted.
18:21:47.496 ERROR
org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id =
716405ff-bb4c-4d94-8604-6d160c7b6ab9, runId =
39081f98-c9a5-4def-a219-d5a119cbda74] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID
2) (10.0.0.11 executor driver): java.lang.NullPointerException: Cannot invoke
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyAndNumValues.key()"
because the return value of
"org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKeyToNumValue()"
is null
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.currentKey(SymmetricHashJoinStateManager.scala:155)
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:178)
at
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$1.getNext(SymmetricHashJoinStateManager.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1149)
at
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:479)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]