HeartSaVioR commented on a change in pull request #25725:
[SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slow receiver
has been initialized, but with hard timeout
URL: https://github.com/apache/spark/pull/25725#discussion_r322515449
##########
File path:
streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
##########
@@ -974,6 +977,7 @@ class SlowTestReceiver(totalRecords: Int,
recordsPerSecond: Int)
}
receivingThreadOption = Some(thread)
thread.start()
+ SlowTestReceiver.initialized = true
Review comment:
If we consider only timing, technically it can be placed anywhere, even
first line of `onStart()`, as whether receiver is registered within time or not
is the key. For verification, it doesn't make existing test failing without
patch even we add `Thread.sleep(1000)` in first line of `onStart()`.
So no strong opinion on where to put.
Btw, maybe we can apply "more verbose but clearer" solution (without this
flag) via adding below code in test side:
```
// tracks whether the receiver is started or not
var isReceiverStarted = false
val listener = new StreamingListener {
override def onReceiverStarted(receiverStarted:
StreamingListenerReceiverStarted): Unit = {
isReceiverStarted = receiverStarted.receiverInfo.name.startsWith(
input.getReceiver().getClass.getSimpleName)
}
}
ssc.addStreamingListener(listener)
ssc.start()
ssc.awaitTerminationOrTimeout(500)
eventually(timeout(10.seconds), interval(10.millis)) {
assert(isReceiverStarted)
}
```
Which one do you think is better?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]