Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/4338#discussion_r24029758
--- Diff:
streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
---
@@ -319,6 +346,38 @@ object TestReceiver {
val counter = new AtomicInteger(1)
}
+/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
+class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
+
+ var receivingThreadOption: Option[Thread] = None
+
+ def onStart() {
+ val thread = new Thread() {
+ override def run() {
+ logInfo("Receiving started")
+ for(i <- 1 to totalRecords) {
+ Thread.sleep(recordsPerSecond * 1000)
--- End diff --
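I don't get the logic here. Shouldn't the delay be the inverse of
recordsPerSecond? The higher the rate, the lower the delay. As written,
Thread.sleep(recordsPerSecond * 1000) waits longer as the rate goes up.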
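
To make the point concrete, here is a minimal sketch of the inverse relationship, assuming recordsPerSecond is a positive Int. The `RateDelay` object and `delayMillis` helper are hypothetical names used only for illustration; they are not part of the PR.

```scala
object RateDelay {
  // Hypothetical helper (not in the PR): milliseconds to wait between
  // records so that roughly `recordsPerSecond` records are emitted per second.
  // Integer division means any rate above 1000 records/sec yields a 0 ms delay.
  def delayMillis(recordsPerSecond: Int): Long = {
    require(recordsPerSecond > 0, "recordsPerSecond must be positive")
    1000L / recordsPerSecond
  }
}
```

With something like this, the loop body could call Thread.sleep(RateDelay.delayMillis(recordsPerSecond)), so a rate of 10 records per second sleeps about 100 ms per record instead of 10 seconds.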