Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/21194#discussion_r185851172
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
---
@@ -173,55 +173,154 @@ class RateSourceSuite extends StreamTest {
assert(readData.map(_.getLong(1)).sorted == Range(0, 33))
}
- test("valueAtSecond") {
+ test("valueAtSecond without ramp-up") {
import RateStreamProvider._
+ val rowsPerSec = Seq(1,10,50,100,1000,10000)
+ val secs = Seq(1, 10, 100, 1000, 10000, 100000)
+ for {
+ sec <- secs
+ rps <- rowsPerSec
+ } yield {
+ assert(valueAtSecond(seconds = sec, rowsPerSecond = rps,
rampUpTimeSeconds = 0) === sec * rps)
+ }
+ }
- assert(valueAtSecond(seconds = 0, rowsPerSecond = 5, rampUpTimeSeconds
= 0) === 0)
- assert(valueAtSecond(seconds = 1, rowsPerSecond = 5, rampUpTimeSeconds
= 0) === 5)
+ test("valueAtSecond with ramp-up") {
+ import RateStreamProvider._
+ val rowsPerSec = Seq(1, 5, 10, 50, 100, 1000, 10000)
+ val rampUpSec = Seq(10, 100, 1000)
+
+ // for any combination, value at zero = 0
+ for {
+ rps <- rowsPerSec
+ rampUp <- rampUpSec
+ } yield {
+ assert(valueAtSecond(seconds = 0, rowsPerSecond = rps,
rampUpTimeSeconds = rampUp) === 0)
+ }
- assert(valueAtSecond(seconds = 0, rowsPerSecond = 5, rampUpTimeSeconds
= 2) === 0)
- assert(valueAtSecond(seconds = 1, rowsPerSecond = 5, rampUpTimeSeconds
= 2) === 1)
- assert(valueAtSecond(seconds = 2, rowsPerSecond = 5, rampUpTimeSeconds
= 2) === 3)
- assert(valueAtSecond(seconds = 3, rowsPerSecond = 5, rampUpTimeSeconds
= 2) === 8)
--- End diff --
I try your implement local and it changes the original behavior
```
valueAtSecond(seconds = 1, rowsPerSecond = 5, rampUpTimeSeconds = 2) = 1
valueAtSecond(seconds = 2, rowsPerSecond = 5, rampUpTimeSeconds = 2) = 5
valueAtSecond(seconds = 3, rowsPerSecond = 5, rampUpTimeSeconds = 2) = 10
valueAtSecond(seconds = 4, rowsPerSecond = 5, rampUpTimeSeconds = 2) = 15
```
I think the bug fix should not change the value on `seconds >
rampUpTimeSeconds`, just my opinion, you can ping other committers to review.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]