Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/7600#discussion_r35696749
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import org.scalatest.Matchers._
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+class RateControllerSuite extends TestSuiteBase {
+
+ override def actuallyWait: Boolean = true
+
+ test("rate controller publishes updates") {
+ val ssc = new StreamingContext(conf, batchDuration)
+ withStreamingContext(ssc) { ssc =>
+ val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1)
+ val output = new TestOutputStreamWithPartitions(dstream)
+ output.register()
+ runStreams(ssc, 1, 1)
+
+ eventually(timeout(2.seconds)) {
+ assert(dstream.publishCalls === 1)
+ }
+ }
+ }
+
+ test("publish rates reach receivers") {
+ val ssc = new StreamingContext(conf, batchDuration)
+ withStreamingContext(ssc) { ssc =>
+ val dstream = new RateLimitInputDStream(ssc) {
+ override val rateController =
+ Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
+ }
+ SingletonDummyReceiver.reset()
+
+ val output = new TestOutputStreamWithPartitions(dstream)
+ output.register()
+ runStreams(ssc, 2, 2)
+
+ eventually(timeout(5.seconds)) {
+ assert(dstream.getCurrentRateLimit === Some(200))
+ }
+ }
+ }
+
+ test("multiple publish rates reach receivers") {
+ val ssc = new StreamingContext(conf, batchDuration)
+ withStreamingContext(ssc) { ssc =>
+ val rates = Seq(100L, 200L, 300L)
+
+ val dstream = new RateLimitInputDStream(ssc) {
+ override val rateController =
+ Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
+ }
+ SingletonDummyReceiver.reset()
+
+ val output = new TestOutputStreamWithPartitions(dstream)
+ output.register()
+
+ val observedRates = mutable.HashSet.empty[Long]
+
+ @volatile var done = false
+ runInBackground {
+ while (!done) {
+ try {
+ dstream.getCurrentRateLimit.foreach(observedRates += _)
+ } catch {
+ case NonFatal(_) => () // don't stop if the executor wasn't installed yet
+ }
+ Thread.sleep(20)
--- End diff --
This is going to be a flaky test. There is no guarantee that this 20 ms
sleep will actually be 20 ms; it can very well be 100 ms. And there is no
guarantee that all of the rates 100, 200, 300 will have been received by the
receiver by the time the 4 batches are over through `runStreams`. This is
going to be SUPER FLAKY. The non-flaky way of doing this (see the sketch
after this list):
1. Run the StreamingContext normally in real time (there is no real need for
`runStreams`).
2. `eventually { observedRates should contain theSameElementsAs (rates :+
Long.MaxValue) }`
3. `observedRates` must be a synchronized hash set.
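
For illustration, a minimal sketch of that structure. It assumes the
`RateLimitInputDStream`, `ConstantEstimator`, `ReceiverRateController`, and
`runInBackground` test helpers from this PR, plus the ScalaTest
`Eventually`/`SpanSugar` imports already at the top of this file; the 20-second
timeout is a guess, not a recommendation:

```scala
val rates = Seq(100L, 200L, 300L)
val dstream = new RateLimitInputDStream(ssc) {
  override val rateController =
    Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
}

// Synchronized set, so the polling thread and the assertion thread don't race.
val observedRates = java.util.Collections.synchronizedSet(new java.util.HashSet[Long]())

@volatile var done = false
runInBackground {
  while (!done) {
    try {
      dstream.getCurrentRateLimit.foreach(observedRates.add(_))
    } catch {
      case scala.util.control.NonFatal(_) => // receiver may not be up yet
    }
    Thread.sleep(20)
  }
}

ssc.start()  // run in real time; no fixed batch count to outrun

try {
  eventually(timeout(20.seconds)) {
    import scala.collection.JavaConverters._
    // Long.MaxValue is the receiver's initial (unthrottled) rate limit.
    observedRates.asScala should contain theSameElementsAs (rates :+ Long.MaxValue)
  }
} finally {
  done = true
}
```

Because `eventually` retries until the timeout, the test no longer depends on
the sleep interval or on the batches finishing at any particular moment.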