Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19925#discussion_r156779156
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.continuous.{ContinuousRateStreamPartitionOffset, ContinuousRateStreamReader, RateStreamDataReader, RateStreamReadTask}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
+import org.apache.spark.sql.streaming.StreamTest
+
+class RateSourceV2Suite extends StreamTest {
+  test("microbatch in registry") {
+    DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
+      case ds: MicroBatchReadSupport =>
+        val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceV2Options.empty())
+        assert(reader.isInstanceOf[RateStreamV2Reader])
+      case _ =>
+        throw new IllegalStateException("Could not find v2 read support for rate")
+    }
+  }
+
+ test("microbatch - options propagated") {
+ val reader = new RateStreamV2Reader(
+ new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond"
-> "33").asJava))
+ reader.setOffsetRange(Optional.empty(),
+ Optional.of(LongOffset(System.currentTimeMillis() + 1001)))
+ val tasks = reader.createReadTasks()
+ assert(tasks.size == 11)
+ tasks.asScala.foreach {
+ // for 1 second, size of each task is (rowsPerSecond / numPartitions)
+ case RateStreamBatchTask(vals) => vals.size == 3
+ case _ => throw new IllegalStateException("Unexpected task type")
+ }
+ }
+
+ test("microbatch - set offset") {
+ val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+ reader.setOffsetRange(Optional.of(LongOffset(12345)),
Optional.of(LongOffset(54321)))
+ assert(reader.getStartOffset() == LongOffset(12345))
+ assert(reader.getEndOffset() == LongOffset(54321))
+ }
+
+ test("microbatch - infer offsets") {
+ val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+ reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
+ reader.setOffsetRange(Optional.empty(), Optional.empty())
+ assert(reader.getStartOffset() == LongOffset(reader.creationTimeMs))
+ assert(reader.getEndOffset().asInstanceOf[LongOffset].offset >=
reader.creationTimeMs + 100)
+ }
+
+ test("microbatch - data read") {
+ val reader = new RateStreamV2Reader(
+ new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond"
-> "33").asJava))
+ reader.setOffsetRange(Optional.empty(),
+ Optional.of(LongOffset(System.currentTimeMillis() + 1001)))
--- End diff ---
This will be flaky when Jenkins is overloaded. Could you add a clock
parameter to `RateStreamV2Reader` and use a `ManualClock` instead, so that we
can control the clock in the test?
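
Roughly what I have in mind (just a sketch; the two-argument constructor and
the test wiring below are hypothetical, not code from this PR):

```scala
import java.util.Optional

import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.util.{Clock, ManualClock, SystemClock}

// Hypothetical change: thread a Clock through the constructor, defaulting to
// the wall clock so production behavior is unchanged.
//   class RateStreamV2Reader(
//       options: DataSourceV2Options,
//       val clock: Clock = new SystemClock()) extends MicroBatchReader { ... }

// The test then advances time explicitly instead of racing against
// System.currentTimeMillis(), so a loaded Jenkins machine can't affect it:
val clock = new ManualClock()
val reader = new RateStreamV2Reader(DataSourceV2Options.empty(), clock)
clock.advance(1001)  // a full second "passes", entirely under test control
reader.setOffsetRange(Optional.empty(), Optional.empty())
val tasks = reader.createReadTasks()
```

That way the inferred offset range, and hence the number of rows in each
task, is fully deterministic.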