HeartSaVioR opened a new pull request #34333:
URL: https://github.com/apache/spark/pull/34333
### What changes were proposed in this pull request?
This PR proposes to introduce a new data source with the short name
"rate-epoch", which produces input rows similar to "rate" (incrementing long
values with timestamps), but ensures that each micro-batch has a "predictable"
set of input rows.
The rate-epoch data source takes an option specifying the number of rows per
epoch, which determines the set of input rows for each micro-batch. For
example, if the number of rows per epoch is set to 1000, the first batch will
have 1000 rows with values 0 to 999, the second batch will have 1000 rows with
values 1000 to 1999, and so on. This characteristic enables use cases the rate
data source cannot serve, since the input rows of the rate data source cannot
be predicted this way.
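The per-epoch value range described above can be sketched as follows (a minimal illustration, not code from the PR; the helper name is hypothetical):

```scala
// Hypothetical helper showing which values a given micro-batch (epoch)
// would contain for a configured rowsPerEpoch. Epochs are 0-indexed.
def valueRange(epoch: Long, rowsPerEpoch: Long): (Long, Long) = {
  val start = epoch * rowsPerEpoch
  val end = start + rowsPerEpoch - 1 // inclusive upper bound
  (start, end)
}
```

With `rowsPerEpoch = 1000`, epoch 0 yields the range (0, 999) and epoch 1 yields (1000, 1999), matching the example above.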
For the generated time (timestamp column), the data source applies the same
mechanism to make the column value predictable. The `startTimestamp` option
defines the starting value of the generated time, and the `advanceMillisPerEpoch`
option defines how much the generated time advances per epoch. All input rows
in the same micro-batch have the same timestamp.
This source supports the following options:
* `rowsPerEpoch` (e.g. 100): How many rows should be generated per epoch.
* `numPartitions` (e.g. 10, default: Spark's default parallelism): The number
of partitions for the generated rows.
* `startTimestamp` (e.g. 1000, default: 0): The starting value of the
generated time.
* `advanceMillisPerEpoch` (e.g. 1000, default: 1000): The amount of time the
generated time advances on each epoch.
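Based on the two timestamp options above, the shared timestamp of every row in a given epoch can be sketched as (a hypothetical helper for illustration, not part of the PR; values are epoch milliseconds):

```scala
// Hypothetical helper: the timestamp every row in a given micro-batch
// (epoch) would share, derived from startTimestamp and
// advanceMillisPerEpoch. Epochs are 0-indexed; defaults mirror the
// option defaults listed above.
def epochTimestamp(epoch: Long,
                   startTimestamp: Long = 0L,
                   advanceMillisPerEpoch: Long = 1000L): Long =
  startTimestamp + epoch * advanceMillisPerEpoch
```

For example, with the defaults, epoch 3 would carry the timestamp 3000 ms; with `startTimestamp = 1000`, it would carry 4000 ms.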
### Why are the changes needed?
The "rate" data source is commonly used to benchmark streaming queries.
While this helps push a query to its limit (how many rows the query can
process per second), the rate data source does not feed a consistent set of
rows into each batch, which makes two environments hard to compare.
For example, you may often want to compare per-batch metrics between test
environments (e.g. running the same streaming query with different options).
These metrics are strongly affected if the distribution of input rows across
batches changes, especially when a micro-batch lags (for any reason) and the
rate data source produces more input rows in the next batch.
Also, when testing streaming aggregation, you may want the data source to
produce the same set of input rows per batch (deterministically), so that you
can plan how these input rows will be aggregated and how state rows will be
evicted, and craft the test query based on that plan.
### Does this PR introduce _any_ user-facing change?
Yes, end users can leverage a new data source in the micro-batch mode of a
streaming query for testing/benchmarking.
### How was this patch tested?
New UTs, and manually tested via the below query in spark-shell:
```scala
spark.readStream.format("rate-epoch")
  .option("rowsPerEpoch", 20)
  .option("numPartitions", 3)
  .load()
  .writeStream.format("console")
  .start()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]