HeartSaVioR commented on a change in pull request #25048: [SPARK-28247][SS] Fix
flaky test "query without test harness" on ContinuousSuite
URL: https://github.com/apache/spark/pull/25048#discussion_r302211118
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
##########
@@ -40,14 +40,43 @@ class ContinuousSuiteBase extends StreamTest {
protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers:
Int): Unit = {
query match {
case s: ContinuousExecution =>
- assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure
query is initialized")
- val reader = s.lastExecution.executedPlan.collectFirst {
- case ContinuousScanExec(_, _, r: RateStreamContinuousStream, _) => r
- }.get
+ s.awaitEpoch(0)
+ // This is called after waiting first epoch to be committed, so we can
just treat
+ // it as partition readers for rate source are already initialized.
+ val firstCommittedTime = System.currentTimeMillis()
val deltaMs = numTriggers * 1000 + 300
- while (System.currentTimeMillis < reader.creationTime + deltaMs) {
- Thread.sleep(reader.creationTime + deltaMs -
System.currentTimeMillis)
+ while (System.currentTimeMillis < firstCommittedTime + deltaMs) {
+ Thread.sleep(firstCommittedTime + deltaMs - System.currentTimeMillis)
+ }
+ }
+ }
+
+ protected def waitForRateSourceCommittedValue(
+ query: StreamExecution,
+ desiredValue: Long,
+ maxWaitTimeMs: Long): Unit = {
+ def readHighestCommittedValue(c: ContinuousExecution): Option[Long] = {
+ c.committedOffsets.lastOption.map { case (_, offset) =>
+ offset match {
+ case o: RateStreamOffset =>
+ o.partitionToValueAndRunTimeMs.map {
+ case (_, ValueRunTimeMsPair(value, _)) => value
+ }.max
+ }
+ }
+ }
+
+ query match {
+ case c: ContinuousExecution =>
Review comment:
It might make callers a bit more verbose than handling it here, but that's not
a big deal, and it's better from the perspective of avoiding a no-op when the
method is misused. I'll make the change.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]