Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7471#discussion_r35177402
  
    --- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
 ---
    @@ -72,8 +78,62 @@ class ReceiverTrackerSuite extends TestSuiteBase {
         assert(locations(0).length === 1)
         assert(locations(3).length === 1)
       }
    +
    +  test("Receiver tracker - propagates rate limit") {
    +    object streamingListener extends StreamingListener {
    +      @volatile
    +      var started = false
    +
    +      override def onReceiverStarted(receiverStarted: 
StreamingListenerReceiverStarted): Unit = {
    +        started = true
    +      }
    +    }
    +
    +    ssc.addStreamingListener(streamingListener)
    +    ssc.scheduler.listenerBus.start(ssc.sc)
    +
    +    val newRateLimit = 100L
    +    val ids = new TestReceiverInputDStream(ssc)
    +    val tracker = new ReceiverTracker(ssc)
    +    tracker.start()
    +
    +    // we wait until the Receiver has registered with the tracker,
    +    // otherwise our rate update is lost
    +    eventually(timeout(5 seconds)) {
    +      assert(streamingListener.started)
    +    }
    +    tracker.sendRateUpdate(ids.id, newRateLimit)
    +    // this is an async message, we need to wait a bit for it to be 
processed
    +    eventually(timeout(3 seconds)) {
    +      assert(ids.getCurrentRateLimit.get === newRateLimit)
    +    }
    +  }
     }
     
    +/** An input DStream with a hard-coded receiver that gives access to 
internals for testing. */
    +private class TestReceiverInputDStream(@transient ssc_ : StreamingContext)
    +  extends ReceiverInputDStream[Int](ssc_) {
    +
    +  override def getReceiver(): DummyReceiver = TestDummyReceiver
    +
    +  def getCurrentRateLimit: Option[Long] = {
    +    invokeExecutorMethod.getCurrentRateLimit
    +  }
    +
    +  private def invokeExecutorMethod: ReceiverSupervisor = {
    +    val c = classOf[Receiver[_]]
    +    val ex = c.getDeclaredMethod("executor")
    +    ex.setAccessible(true)
    +    ex.invoke(TestDummyReceiver).asInstanceOf[ReceiverSupervisor]
    +  }
    +}
    +
    +/**
    + * We need the receiver to be an object, otherwise serialization will 
create another one
    --- End diff --
    
    nit: make it more scala docs style to begin with. "A receiver as an object, 
so that we can read its rate limit. Otherwise,.."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to