Github user dragos commented on a diff in the pull request:
https://github.com/apache/spark/pull/8199#discussion_r37077457
--- Diff:
streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
---
@@ -36,72 +36,89 @@ class PIDRateEstimatorSuite extends SparkFunSuite with
Matchers {
test("estimator checks ranges") {
intercept[IllegalArgumentException] {
- new PIDRateEstimator(0, 1, 2, 3)
+ new PIDRateEstimator(batchIntervalMillis = 0, 1, 2, 3, 10)
}
intercept[IllegalArgumentException] {
- new PIDRateEstimator(100, -1, 2, 3)
+ new PIDRateEstimator(100, proportional = -1, 2, 3, 10)
}
intercept[IllegalArgumentException] {
- new PIDRateEstimator(100, 0, -1, 3)
+ new PIDRateEstimator(100, 0, integral = -1, 3, 10)
}
intercept[IllegalArgumentException] {
- new PIDRateEstimator(100, 0, 0, -1)
+ new PIDRateEstimator(100, 0, 0, derivative = -1, 10)
+ }
+ intercept[IllegalArgumentException] {
+ new PIDRateEstimator(100, 0, 0, 0, minRate = 0)
+ }
+ intercept[IllegalArgumentException] {
+ new PIDRateEstimator(100, 0, 0, 0, minRate = -10)
}
}
- private def createDefaultEstimator: PIDRateEstimator = {
- new PIDRateEstimator(20, 1D, 0D, 0D)
- }
-
- test("first bound is None") {
- val p = createDefaultEstimator
+ test("first estimate is None") {
+ val p = createDefaultEstimator()
p.compute(0, 10, 10, 0) should equal(None)
}
- test("second bound is rate") {
- val p = createDefaultEstimator
+ test("second estimate is not None") {
+ val p = createDefaultEstimator()
p.compute(0, 10, 10, 0)
// 1000 elements / s
p.compute(10, 10, 10, 0) should equal(Some(1000))
}
- test("works even with no time between updates") {
- val p = createDefaultEstimator
+ test("no estimate when no time difference between successive calls") {
+ val p = createDefaultEstimator()
+ p.compute(0, 10, 10, 0)
+ p.compute(time = 10, 10, 10, 0) shouldNot equal(None)
+ p.compute(time = 10, 10, 10, 0) should equal(None)
+ }
+
+ test("no estimate when no records in previous batch") {
+ val p = createDefaultEstimator()
p.compute(0, 10, 10, 0)
- p.compute(10, 10, 10, 0)
- p.compute(10, 10, 10, 0) should equal(None)
+ p.compute(10, numElements = 0, 10, 0) should equal(None)
+ p.compute(20, numElements = -10, 10, 0) should equal(None)
}
- test("bound is never negative") {
- val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+ test("no estimate when there is no processing delay") {
+ val p = createDefaultEstimator()
+ p.compute(0, 10, 10, 0)
+ p.compute(10, 10, processingDelay = 0, 0) should equal(None)
+ p.compute(20, 10, processingDelay = -10, 0) should equal(None)
+ }
+
+ test("estimate is never less than min rate") {
+ val minRate = 5D
+ val p = new PIDRateEstimator(20, 1D, 1D, 0D, minRate)
// prepare a series of batch updates, one every 20ms, 0 processed
elements, 2ms of processing
// this might point the estimator to try and decrease the bound, but
we test it never
- // goes below zero, which would be nonsensical.
+ // goes below the min rate, which would be nonsensical.
val times = List.tabulate(50)(x => x * 20) // every 20ms
- val elements = List.fill(50)(0) // no processing
+ val elements = List.fill(50)(1) // no processing
--- End diff --
Oh, I missed it, the diff algorithm is a bit confusing. Ok then.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]