Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7471#discussion_r35177346
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala ---
    @@ -72,8 +78,62 @@ class ReceiverTrackerSuite extends TestSuiteBase {
         assert(locations(0).length === 1)
         assert(locations(3).length === 1)
       }
    +
    +  test("Receiver tracker - propagates rate limit") {
    +    object streamingListener extends StreamingListener {
    +      @volatile
    +      var started = false
    +
    +      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
    +        started = true
    +      }
    +    }
    +
    +    ssc.addStreamingListener(streamingListener)
    +    ssc.scheduler.listenerBus.start(ssc.sc)
    +
    +    val newRateLimit = 100L
    +    val ids = new TestReceiverInputDStream(ssc)
    +    val tracker = new ReceiverTracker(ssc)
    +    tracker.start()
    +
    +    // we wait until the Receiver has registered with the tracker,
    +    // otherwise our rate update is lost
    +    eventually(timeout(5 seconds)) {
    +      assert(streamingListener.started)
    +    }
    +    tracker.sendRateUpdate(ids.id, newRateLimit)
    +    // this is an async message, we need to wait a bit for it to be processed
    +    eventually(timeout(3 seconds)) {
    +      assert(ids.getCurrentRateLimit.get === newRateLimit)
    +    }
    +  }
     }
     
    +/** An input DStream with a hard-coded receiver that gives access to internals for testing. */
    +private class TestReceiverInputDStream(@transient ssc_ : StreamingContext)
    --- End diff --
    
    Suggested rename: `TestReceiverInputDStream` --> `DummyReceiverInputDStream`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to