Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/7471#discussion_r35048780
--- Diff:
streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
---
@@ -72,15 +77,50 @@ class ReceiverTrackerSuite extends TestSuiteBase {
assert(locations(0).length === 1)
assert(locations(3).length === 1)
}
+
+ test("Receiver tracker - propagates rate limit") {
+ val newRateLimit = 100L
+ val ids = new TestReceiverInputDStream(ssc)
+ val tracker = new ReceiverTracker(ssc)
+ tracker.start()
+ eventually(timeout(5 seconds)) {
+ assert(TestDummyReceiver.started)
+ }
+ tracker.sendRateUpdate(ids.id, newRateLimit)
+ // this is an async message, we need to wait a bit for it to be
processed
+ eventually(timeout(3 seconds)) {
+ assert(ids.getCurrentRateLimit.get === newRateLimit)
+ }
+ }
}
+/** An input DStream with a hard-coded receiver that gives access to
internals for testing. */
+private class TestReceiverInputDStream(@transient ssc_ : StreamingContext)
+ extends ReceiverInputDStream[Int](ssc_) {
+
+ override def getReceiver(): DummyReceiver = TestDummyReceiver
+
+ def getCurrentRateLimit: Option[Long] = {
+ TestDummyReceiver.executor.getCurrentRateLimit
+ }
+}
+
+/**
+ * We need the receiver to be an object, otherwise serialization will
create another one
+ * and we won't be able to read its rate limit.
+ */
+private object TestDummyReceiver extends DummyReceiver
+
/**
* Dummy receiver implementation
*/
private class DummyReceiver(host: Option[String] = None)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+ var started = false
--- End diff --
This needs to be @volatile, otherwise waiting threads (one calling
`eventually`) may never see the changed value ---> flakiness
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]