Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/20675#discussion_r170830121
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
     spark.sparkContext.addSparkListener(listener)
     try {
       testStream(df, useV2Sink = true)(
-        StartStream(Trigger.Continuous(100)),
+        StartStream(longContinuousTrigger),
+        AwaitEpoch(0),
         Execute(waitForRateSourceTriggers(_, 2)),
+        IncrementEpoch(),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
         },
-        ExpectFailure[SparkException] { e =>
-          e.getCause != null &&
-            e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
-        })
+        Execute(waitForRateSourceTriggers(_, 4)),
+        IncrementEpoch(),
+        // Check the answer exactly; if there were duplicated results,
+        // CheckAnswerRowsContains would also return true.
+        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
--- End diff ---
Actually, I first used `CheckAnswer(0 to 19: _*)` here, but the test case failed intermittently, probably because the continuous query does not always stop exactly at Range(0, 20); the rate source can emit a few extra rows (20 to 24 in this run) before the query stops. See the logs below:
```
== Plan ==
== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Analyzed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Optimized Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- *(1) Project [value#13L]
   +- *(1) DataSourceV2Scan [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

ScalaTestFailureLocation: org.apache.spark.sql.streaming.StreamTest$class at (StreamTest.scala:436)
org.scalatest.exceptions.TestFailedException:
== Results ==
!== Correct Answer - 20 ==   == Spark Answer - 25 ==
!struct<value:int>           struct<value:bigint>
 [0]                         [0]
 [10]                        [10]
 [11]                        [11]
 [12]                        [12]
 [13]                        [13]
 [14]                        [14]
 [15]                        [15]
 [16]                        [16]
 [17]                        [17]
 [18]                        [18]
 [19]                        [19]
 [1]                         [1]
![2]                         [20]
![3]                         [21]
![4]                         [22]
![5]                         [23]
![6]                         [24]
![7]                         [2]
![8]                         [3]
![9]                         [4]
!                            [5]
!                            [6]
!                            [7]
!                            [8]
!                            [9]

== Progress ==
   StartStream(ContinuousTrigger(3600000),org.apache.spark.util.SystemClock@343e225a,Map(),null)
   AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, )
=> CheckAnswer: [0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19]
   StopStream
```
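
To illustrate the semantics the new step relies on, here is a minimal sketch of a containment-with-no-duplicates check, written independently of the StreamTest harness. The names `containsOnlyOnce` and `sink` are hypothetical, not part of this PR:

```scala
import org.apache.spark.sql.Row

// Hypothetical helper: every expected row must appear exactly once in the
// rows collected from the sink. Extra, unexpected rows (e.g. 20..24 in the
// failure log above) are tolerated; duplicates of expected rows are not.
def containsOnlyOnce(sinkRows: Seq[Row], expected: Seq[Row]): Boolean = {
  val counts = sinkRows.groupBy(identity).mapValues(_.size)
  expected.forall(row => counts.getOrElse(row, 0) == 1)
}

// Mirrors the failing run: the sink saw 25 rows, but each of 0..19 occurs
// exactly once, so this check passes where an exact CheckAnswer fails.
val sink = scala.Range(0, 25).map(Row(_))
assert(containsOnlyOnce(sink, scala.Range(0, 20).map(Row(_))))

// If a killed task were retried and replayed its partition, expected rows
// would appear twice and the check would fail, as intended.
assert(!containsOnlyOnce(sink ++ Seq(Row(0)), scala.Range(0, 20).map(Row(_))))
```

In the real test the harness collects the sink rows itself; the point is only that the membership check must count occurrences rather than test plain containment, since a duplicate-producing retry would otherwise go undetected.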