Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/7913#discussion_r36265676
--- Diff:
streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
---
@@ -67,69 +62,73 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}
-/**
- * An input DStream with a hard-coded receiver that gives access to
internals for testing.
- *
- * @note Make sure to call {{{SingletonDummyReceiver.reset()}}} before
using this in a test,
- * or otherwise you may get {{{NotSerializableException}}} when
trying to serialize
- * the receiver.
- * @see [[[SingletonDummyReceiver]]].
- */
-private[streaming] class RateLimitInputDStream(@transient ssc_ :
StreamingContext)
+/** An input DStream with for testing rate controlling */
+private[streaming] class RateTestInputDStream(@transient ssc_ :
StreamingContext)
extends ReceiverInputDStream[Int](ssc_) {
- override def getReceiver(): RateTestReceiver = SingletonTestRateReceiver
-
- def getCurrentRateLimit: Option[Long] = {
- invokeExecutorMethod.getCurrentRateLimit
- }
+ override def getReceiver(): Receiver[Int] = new RateTestReceiver(id)
@volatile
- var publishCalls = 0
+ var publishedRates = 0
override val rateController: Option[RateController] = {
- Some(new RateController(id, new ConstantEstimator(100.0)) {
+ Some(new RateController(id, new ConstantEstimator(100)) {
override def publish(rate: Long): Unit = {
- publishCalls += 1
+ publishedRates += 1
}
})
}
+}
- private def invokeExecutorMethod: ReceiverSupervisor = {
- val c = classOf[Receiver[_]]
- val ex = c.getDeclaredMethod("executor")
- ex.setAccessible(true)
- ex.invoke(SingletonTestRateReceiver).asInstanceOf[ReceiverSupervisor]
+/** A receiver implementation for testing rate controlling */
+private[streaming] class RateTestReceiver(receiverId: Int, host:
Option[String] = None)
+ extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+ private lazy val customBlockGenerator = supervisor.createBlockGenerator(
+ new BlockGeneratorListener {
+ override def onPushBlock(blockId: StreamBlockId, arrayBuffer:
ArrayBuffer[_]): Unit = {}
+ override def onError(message: String, throwable: Throwable): Unit =
{}
+ override def onGenerateBlock(blockId: StreamBlockId): Unit = {}
+ override def onAddData(data: Any, metadata: Any): Unit = {}
+ }
+ )
+
+ setReceiverId(receiverId)
+
+ override def onStart(): Unit = {
+ customBlockGenerator
+ RateTestReceiver.registerReceiver(this)
}
-}
-/**
- * A Receiver as an object so we can read its rate limit. Make sure to
call `reset()` when
- * reusing this receiver, otherwise a non-null `executor_` field will
prevent it from being
- * serialized when receivers are installed on executors.
- *
- * @note It's necessary to be a top-level object, or else serialization
would create another
- * one on the executor side and we won't be able to read its rate
limit.
- */
-private[streaming] object SingletonTestRateReceiver extends
RateTestReceiver(0) {
--- End diff --
@dragos I removed the singleton because it made serialization and reuse
issues hard to reason about. Instead, I reimplemented this test so that a
newly started receiver registers itself with the RateTestReceiver object,
letting us access the currently running receiver and its rate through that
object's methods.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]