Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20675#discussion_r170665692
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
---
@@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
spark.sparkContext.addSparkListener(listener)
try {
testStream(df, useV2Sink = true)(
- StartStream(Trigger.Continuous(100)),
+ StartStream(longContinuousTrigger),
+ AwaitEpoch(0),
Execute(waitForRateSourceTriggers(_, 2)),
+ IncrementEpoch(),
Execute { _ =>
// Wait until a task is started, then kill its first attempt.
eventually(timeout(streamingTimeout)) {
assert(taskId != -1)
}
spark.sparkContext.killTaskAttempt(taskId)
},
- ExpectFailure[SparkException] { e =>
- e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
- })
+ Execute(waitForRateSourceTriggers(_, 4)),
+ IncrementEpoch(),
+ // Check the answer exactly, if there's duplicated result, CheckAnswerRowsContains
+ // will also return true.
+ CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
--- End diff --
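Checking the exact answer can just be `CheckAnswer(0 until 20: _*)`. Note that `0 until 20` matches `scala.Range(0, 20)` (rows 0–19), whereas `0 to 20` would also include 20.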
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]