Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12725#discussion_r62758276
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with 
SharedSQLContext {
         }
       }
     
    -  // This would fail for now -- error is "Timed out waiting for stream"
    -  // Root cause is that data generated in batch 0 may not get processed in 
batch 1
    -  // Let's enable this after SPARK-14942: Reduce delay between batch 
construction and execution
    -  ignore("minimize delay between batch construction and execution") {
    +  test("minimize delay between batch construction and execution") {
    +
    +    // For each batch, we would retrieve new data's offsets and log them 
before we run the execution
    +    // This checks whether the key of the offset log is the expected batch 
id
    +    def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery =
    +      AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
    +        s"offsetLog's latest should be $expectedId")
    +
    +    // For each batch, we would log the state change during the execution
    +    // This checks whether the key of the state change log is the expected 
batch id
    +    def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): 
AssertOnQuery =
    +      
AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId 
== expectedId,
    +        s"lastExecution's currentBatchId should be $expectedId")
    +
    +    // For each batch, we would log the sink change after the execution
    +    // This checks whether the key of the sink change log is the expected 
batch id
    +    def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery =
    +      AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == 
expectedId,
    +        s"sink's lastBatchId should be $expectedId")
    +
         val inputData = MemoryStream[Int]
         testStream(inputData.toDS())(
           StartStream(ProcessingTime("10 seconds"), new ManualClock),
    +
           /* -- batch 0 ----------------------- */
    -      AddData(inputData, 1),
    -      AddData(inputData, 2),
    -      AddData(inputData, 3),
    +      // Add some data in batch 0
    +      AddData(inputData, 1, 2, 3),
           AdvanceManualClock(10 * 1000), // 10 seconds
    +
           /* -- batch 1 ----------------------- */
    -      CheckAnswer(1, 2, 3))
    +      // Check the results of batch 0
    +      CheckAnswer(1, 2, 3),
    +      CheckIncrementalExecutionCurrentBatchId(0),
    +      CheckOffsetLogLatestBatchId(0),
    +      CheckSinkLatestBatchId(0),
    +      // Add some data in batch 1
    +      AddData(inputData, 4, 5, 6),
    +      AdvanceManualClock(10 * 1000),
    +
    +      /* -- batch _ ----------------------- */
    +      // Check the results of batch 1
    +      CheckAnswer(1, 2, 3, 4, 5, 6),
    +      CheckIncrementalExecutionCurrentBatchId(1),
    +      CheckOffsetLogLatestBatchId(1),
    +      CheckSinkLatestBatchId(1),
    +
    +      AdvanceManualClock(10 * 1000),
    +      AdvanceManualClock(10 * 1000),
    +      AdvanceManualClock(10 * 1000),
    +
    +      /* -- batch __ ---------------------- */
    +      // Check the results of batch 1 again; this is to make sure that, 
when there's no new data,
    +      // the currentId does not get logged (e.g. as 2) even if the clock 
has advanced many times
    +      CheckAnswer(1, 2, 3, 4, 5, 6),
    +      CheckIncrementalExecutionCurrentBatchId(1),
    +      CheckOffsetLogLatestBatchId(1),
    +      CheckSinkLatestBatchId(1),
    +
    +      /* Stop then restart the Stream  */
    +      StopStream,
    +      StartStream(ProcessingTime("10 seconds"), new ManualClock),
    +
    +      /* -- batch 1 rerun ----------------- */
    --- End diff --
    
    I'm wondering if we can avoid rerunning a batch that has already finished 
before stopping. How about storing the offsets after finishing a batch instead 
of storing them before running a batch? @marmbrus what do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to