Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20689#discussion_r174585821
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -106,7 +106,19 @@ case class RateStreamContinuousDataReaderFactory(
partitionIndex: Int,
increment: Long,
rowsPerSecond: Double)
- extends DataReaderFactory[Row] {
+ extends ContinuousDataReaderFactory[Row] {
+
+ override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[Row] = {
+ val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset]
+ assert(rateStreamOffset.partition == partitionIndex)
--- End diff --
ditto
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]