Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20828#discussion_r179599245
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
    @@ -53,32 +53,24 @@ class ContinuousSuiteBase extends StreamTest {
       // A continuous trigger that will only fire the initial time for the 
duration of a test.
       // This allows clean testing with manual epoch advancement.
       protected val longContinuousTrigger = Trigger.Continuous("1 hour")
    +
    +  override protected implicit val defaultTrigger = Trigger.Continuous(100)
    +  override protected val defaultUseV2Sink = true
     }
     
     class ContinuousSuite extends ContinuousSuiteBase {
       import testImplicits._
     
    -  test("basic rate source") {
    -    val df = spark.readStream
    -      .format("rate")
    -      .option("numPartitions", "5")
    -      .option("rowsPerSecond", "5")
    -      .load()
    -      .select('value)
    +  test("basic") {
    +    val input = MemoryStream[Int]
    --- End diff --
    
    Looking at this test code, its very confusing that this is using continuous 
memory stream, and not ordinary memory stream. The implicit ContinuousTrigger 
magic is not intuitive. I would rather have an explicitly 
    `ContinuousMemoryStream` rather than `MemoryStream` magically generating 
two different kinds based on different implicit values. And because of this 
polymorphism, multiple testsuites that do not have continuous stream tests had 
to be changed (functions returning MemoryStream had to return MemoryStreamBase).
    
    
    
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to