dongjoon-hyun commented on code in PR #56721:
URL: https://github.com/apache/spark/pull/56721#discussion_r3464763320


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala:
##########
@@ -1062,8 +1063,26 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest
           ProcessAllAvailable(),
           AddData(inputData, (17, 1L), (18, 2L), (19, 3L), (20, 4L)),
           ProcessAllAvailable(),
-          // Ensure that we get a chance to upload created snapshots
-          Execute { _ => Thread.sleep(5000) },
+          // Wait deterministically for the maintenance thread to upload the snapshot files the
+          // snapshotStartBatchId reader needs (snapshot version 2 for the partitions read below),
+          // instead of a fixed sleep which is flaky under CI load (the snapshot upload is
+          // asynchronous, so a too-short sleep leaves `2.zip` missing -> FileNotFoundException).
+          Execute { _ =>
+            val opStateDir = new File(tmpDir, "state/0")
+            eventually(timeout(60.seconds), interval(100.milliseconds)) {

Review Comment:
   So, technically, we increase the maximum timeout from `5s` to `60s`?
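
   For context on the question, here is a minimal sketch of how a bounded poll differs from a fixed sleep. It is not ScalaTest's `eventually` implementation; `pollUntil` and `BoundedWaitSketch` are hypothetical names used only for illustration, and the 300 ms delay is an assumed stand-in for the asynchronous snapshot upload. The point it illustrates is that the timeout is an upper bound: the wait ends as soon as the condition holds.

```scala
import scala.concurrent.duration._

object BoundedWaitSketch {
  // Hypothetical helper (not ScalaTest's `eventually`): re-checks `condition`
  // every `interval` until it holds or `timeout` elapses.
  // Returns the elapsed time on success; throws if the deadline passes first.
  def pollUntil(timeout: FiniteDuration, interval: FiniteDuration)(
      condition: => Boolean): FiniteDuration = {
    val start = System.nanoTime()
    val deadline = start + timeout.toNanos
    while (!condition) {
      if (System.nanoTime() >= deadline) {
        throw new java.util.concurrent.TimeoutException(
          s"condition not met within $timeout")
      }
      Thread.sleep(interval.toMillis)
    }
    (System.nanoTime() - start).nanos
  }

  def main(args: Array[String]): Unit = {
    // Assumed stand-in for the asynchronous upload: true roughly 300 ms from now.
    val readyAt = System.nanoTime() + 300.millis.toNanos
    val waited = pollUntil(60.seconds, 100.milliseconds) {
      System.nanoTime() >= readyAt
    }
    // The wait ends shortly after the condition holds, well below the 60 s bound.
    println(s"waited ${waited.toMillis} ms")
  }
}
```

   Under these assumptions the common case finishes in well under the old 5 s sleep, while only a slow run approaches the 60 s ceiling before failing.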


