GitHub user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20675#discussion_r170830121
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
    @@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
         spark.sparkContext.addSparkListener(listener)
         try {
           testStream(df, useV2Sink = true)(
    -        StartStream(Trigger.Continuous(100)),
    +        StartStream(longContinuousTrigger),
    +        AwaitEpoch(0),
             Execute(waitForRateSourceTriggers(_, 2)),
    +        IncrementEpoch(),
             Execute { _ =>
               // Wait until a task is started, then kill its first attempt.
               eventually(timeout(streamingTimeout)) {
                 assert(taskId != -1)
               }
               spark.sparkContext.killTaskAttempt(taskId)
             },
    -        ExpectFailure[SparkException] { e =>
    -          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
    -        })
    +        Execute(waitForRateSourceTriggers(_, 4)),
    +        IncrementEpoch(),
    +        // Check the answer exactly; if there were duplicated results,
    +        // CheckAnswerRowsContains would still pass.
    +        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
    --- End diff --
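
    For context, the `taskId` asserted in the hunk above is captured by a listener registered at the top of the test (the first line of the quoted context). A minimal sketch of that plumbing, assuming a plain `SparkListener`; the names are illustrative, not necessarily the suite's exact code:

    ```scala
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

    // Field on the suite: the id of the first task attempt seen, -1 until then.
    @volatile var taskId: Long = -1L

    val listener = new SparkListener {
      override def onTaskStart(start: SparkListenerTaskStart): Unit = {
        // Remember only the first attempt; the test waits for taskId != -1
        // and then calls spark.sparkContext.killTaskAttempt(taskId).
        if (taskId == -1L) taskId = start.taskInfo.taskId
      }
    }
    spark.sparkContext.addSparkListener(listener)
    ```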
    
    Actually, I first used `CheckAnswer(0 to 19: _*)` here, but the test failed intermittently, probably because continuous processing does not always stop exactly at the Range(0, 20) boundary, so the sink can contain extra rows from later epochs. See the logs below:
    ```
    == Plan ==
    == Parsed Logical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
    +- Project [value#13L]
       +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
    
    == Analyzed Logical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
    +- Project [value#13L]
       +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
    
    == Optimized Logical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
    +- Project [value#13L]
       +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
    
    == Physical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
    +- *(1) Project [value#13L]
       +- *(1) DataSourceV2Scan [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
             
             
    ScalaTestFailureLocation: org.apache.spark.sql.streaming.StreamTest$class at (StreamTest.scala:436)
    org.scalatest.exceptions.TestFailedException: 
    
    == Results ==
    !== Correct Answer - 20 ==   == Spark Answer - 25 ==
    !struct<value:int>           struct<value:bigint>
     [0]                         [0]
     [10]                        [10]
     [11]                        [11]
     [12]                        [12]
     [13]                        [13]
     [14]                        [14]
     [15]                        [15]
     [16]                        [16]
     [17]                        [17]
     [18]                        [18]
     [19]                        [19]
     [1]                         [1]
    ![2]                         [20]
    ![3]                         [21]
    ![4]                         [22]
    ![5]                         [23]
    ![6]                         [24]
    ![7]                         [2]
    ![8]                         [3]
    ![9]                         [4]
    !                            [5]
    !                            [6]
    !                            [7]
    !                            [8]
    !                            [9]
        
    
    == Progress ==
       StartStream(ContinuousTrigger(3600000),org.apache.spark.util.SystemClock@343e225a,Map(),null)
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
    => CheckAnswer: [0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19]
       StopStream
    ```
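
    In other words, the sink can legitimately hold rows beyond Range(0, 20) (25 rows in the run above), so the check only needs to assert that each expected row appears exactly once: extra rows from later epochs are fine, duplicates from the retried task are not. A minimal sketch of such a containment check (the helper name and plumbing are assumptions, not the PR's exact `CheckAnswerRowsContainsOnlyOnce` implementation):

    ```scala
    import org.apache.spark.sql.Row

    // Assert that every expected row occurs in the actual output exactly once.
    // Extra rows beyond the expected set are tolerated (the rate source keeps
    // producing); a count of 0 means lost data, a count > 1 means the retried
    // task re-emitted rows for an already-committed epoch.
    def assertContainsOnlyOnce(expected: Seq[Row], actual: Seq[Row]): Unit = {
      val counts = actual.groupBy(identity).mapValues(_.size)
      expected.foreach { row =>
        val n = counts.getOrElse(row, 0)
        assert(n == 1, s"expected $row exactly once, but found it $n times")
      }
    }

    // e.g. assertContainsOnlyOnce(Range(0, 20).map(Row(_)), sinkOutput)
    ```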

