Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184006570 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase { val input = ContinuousMemoryStream[Int] testStream(input.toDF())( - AddData(input, 0, 1, 2), - CheckAnswer(0, 1, 2), + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2): _*), StopStream, - AddData(input, 3, 4, 5), + AddData(input, 3.to(5): _*), StartStream(), - CheckAnswer(0, 1, 2, 3, 4, 5)) + CheckAnswer(0.to(5): _*)) } test("map") { - val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .map(r => r.getLong(0) * 2) + val input = ContinuousMemoryStream[Int] + val df = input.toDF().map(_.getInt(0) * 2) - testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_)))) + testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).map(_ * 2): _*), + StopStream, + AddData(input, 3.to(5): _*), + StartStream(), + CheckAnswer(0.to(5).map(_ * 2): _*)) } test("flatMap") { - val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2)) + val input = ContinuousMemoryStream[Int] + val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2)) - testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 
2)).map(Row(_)))) + testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*), + StopStream, + AddData(input, 3.to(5): _*), + StartStream(), + CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*)) } test("filter") { - val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .where('value > 5) + val input = ContinuousMemoryStream[Int] + val df = input.toDF().where('value > 5) --- End diff -- I intended to use the untyped filter because of SPARK-24061. Once #21136 is merged we could change this, but I am not sure we want to have both untyped and typed variants for every test.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org