jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052963344


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##########
@@ -1762,4 +1344,173 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
       }
     }
   }
+
+  test("test gaps in offset log") {
+    val inputData = MemoryStream[Int]
+    val streamEvent = inputData.toDF().select("value")
+
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/").toURI
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run,
+    // but failing on subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    testStream(streamEvent, extraOptions = Map(
+      ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
+      ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS -> "0"
+    ))(
+      AddData(inputData, 0),
+      AddData(inputData, 1),
+      AddData(inputData, 2),
+      AddData(inputData, 3),
+      AddData(inputData, 4),
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+      CheckAnswer(3, 4)
+    )
+
+  }
+
+  test("test multiple gaps in offset and commit logs") {
+    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val ds = inputData.toDS()
+
+    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+    // create a scenario in which the offset log only
+    // contains batches 0, 2, 5 and the commit log only contains 0, 2
+    testStream(ds, extraOptions = Map(
+      ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
+      ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS -> "0"
+    ))(
+      StartStream(checkpointLocation = checkpointLocation),
+      AddData(inputData, 0),
+      CheckNewAnswer(0),
+      AddData(inputData, 1),
+      CheckNewAnswer(1),
+      AddData(inputData, 2),
+      CheckNewAnswer(2),
+      AddData(inputData, 3),
+      CheckNewAnswer(3),
+      AddData(inputData, 4),
+      CheckNewAnswer(4),
+      AddData(inputData, 5),
+      CheckNewAnswer(5),
+
+      StopStream
+    )
+
+    // delete all offset files except for batch 0, 2, 5
+    getListOfFiles(checkpointLocation + "/offsets")
+      .filterNot(f => f.getName.startsWith("0")
+        || f.getName.startsWith("2")
+        || f.getName.startsWith("5"))
+      .foreach(_.delete())
+
+    // delete all commit log files except for batch 0, 2
+    getListOfFiles(checkpointLocation + "/commits")
+      .filterNot(f => f.getName.startsWith("0") || f.getName.startsWith("2"))
+      .foreach(_.delete())
+
+    getBatchIdsSortedFromLog(checkpointLocation + "/offsets") should equal(Array(0, 2, 5))
+    getBatchIdsSortedFromLog(checkpointLocation + "/commits") should equal(Array(0, 2))
+
+    // start a new stream
+    val inputData2 = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val ds2 = inputData2.toDS()
+    testStream(ds2, extraOptions = Map(
+      ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
+      ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS -> "0"
+    ))(
+      // add back old data
+      AddData(inputData2, 0),
+      AddData(inputData2, 1),
+      AddData(inputData2, 2),
+      AddData(inputData2, 3),
+      AddData(inputData2, 4),
+      AddData(inputData2, 5),
+      StartStream(checkpointLocation = checkpointLocation),
+      // Since the offset log contains batches 0, 2, 5 and the commit log contains
+      // batches 0, 2, we have successfully processed up to batch 2.
+      // Thus the batches we need to process / re-process are 3, 4, 5.
+      CheckNewAnswer(3, 4, 5),
+      Execute { q =>
+        waitPendingOffsetWrites(q)
+        eventually(timeout(Span(5, Seconds))) {
+          getBatchIdsSortedFromLog(checkpointLocation + "/offsets") should equal(Array(0, 2, 5))
+          getBatchIdsSortedFromLog(checkpointLocation + "/commits") should equal(Array(0, 2, 5))
+        }
+      },
+      StopStream
+    )
+
+    getBatchIdsSortedFromLog(checkpointLocation + "/offsets") should equal(Array(0, 2, 5))
+    getBatchIdsSortedFromLog(checkpointLocation + "/commits") should equal(Array(0, 2, 5))
+  }
+
+  test("recovery when gaps exist in offset and commit log") {
+    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val ds = inputData.toDS()
+
+    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+    // create a scenario in which the offset log only
+    // contains batches 0, 2 and the commit log only contains 9

Review Comment:
   will fix
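
   For context on what these tests exercise: the offset log records batches
   that were planned, while the commit log records batches that finished, so
   on restart the query resumes just after the highest committed batch and
   replays everything up to the highest offset. A minimal sketch of that
   arithmetic (illustrative values taken from the scenario above, not Spark's
   actual MicroBatchExecution logic):

       val offsetLogBatches = Seq(0L, 2L, 5L) // batch ids present in offsets/
       val commitLogBatches = Seq(0L, 2L)     // batch ids present in commits/

       val lastCommitted = commitLogBatches.max // 2: last fully processed batch
       val latestPlanned = offsetLogBatches.max // 5: last batch with offsets written
       val batchesToRerun = (lastCommitted + 1) to latestPlanned // 3, 4, 5

       assert(batchesToRerun == (3L to 5L)) // matches CheckNewAnswer(3, 4, 5)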
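
   Also for context, since they are not part of this diff: getListOfFiles and
   getBatchIdsSortedFromLog are small helpers defined elsewhere in the test
   suite. A rough sketch of what such helpers might look like, assuming each
   visible file name under offsets/ and commits/ is a numeric batch id:

       import java.io.File

       // Sketch only: list the plain files in a checkpoint log directory.
       def getListOfFiles(dir: String): List[File] = {
         val d = new File(dir)
         if (d.exists && d.isDirectory) d.listFiles.filter(_.isFile).toList
         else List.empty[File]
       }

       // Sketch only: return the batch ids found in a log directory, sorted
       // ascending. Hidden files (e.g. temp/CRC files) are skipped.
       def getBatchIdsSortedFromLog(logPath: String): Array[Long] =
         getListOfFiles(logPath)
           .filterNot(_.isHidden)
           .map(_.getName.toLong)
           .sorted
           .toArray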



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

