Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20097#discussion_r159981274
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
---
@@ -28,17 +28,38 @@ import org.json4s.jackson.Serialization
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{RateStreamOffset,
ValueRunTimeMsPair}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader,
Offset}
import org.apache.spark.sql.types.{LongType, StructField, StructType,
TimestampType}
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{ManualClock, SystemClock}
-class RateStreamV2Reader(options: DataSourceV2Options)
+/**
+ * This is a temporary register as we build out v2 migration. Microbatch
read support should
+ * be implemented in the same register as v1.
+ */
+class RateSourceProviderV2 extends DataSourceV2 with MicroBatchReadSupport
with DataSourceRegister {
+ override def createMicroBatchReader(
+ schema: Optional[StructType],
+ checkpointLocation: String,
+ options: DataSourceV2Options): MicroBatchReader = {
+ new MicroBatchRateStreamReader(options)
+ }
+
+ override def shortName(): String = "ratev2"
+}
+
+class MicroBatchRateStreamReader(options: DataSourceV2Options)
--- End diff --
As with the other Kafka PR, can you rename these classes to start with
"RateStream"? Only if it is not too much refactoring; otherwise we can clean
this up later.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]