HeartSaVioR commented on a change in pull request #21506: [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider
URL: https://github.com/apache/spark/pull/21506#discussion_r346672308
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ##########
 @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     if (loadedCurrentVersionMap.isDefined) {
       return loadedCurrentVersionMap.get
     }
-    val snapshotCurrentVersionMap = readSnapshotFile(version)
-    if (snapshotCurrentVersionMap.isDefined) {
-      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
-      return snapshotCurrentVersionMap.get
-    }
 
-    // Find the most recent map before this version that we can.
-    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
-    var lastAvailableVersion = version
-    var lastAvailableMap: Option[MapType] = None
-    while (lastAvailableMap.isEmpty) {
-      lastAvailableVersion -= 1
+    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
 
 Review comment:
   This is not normal unless the state has just been restored from a
checkpoint. If someone encounters this warning while batches are running, it
should be taken seriously: the full state is being loaded from HDFS even
though we expect the state cache to already contain it.
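
   To illustrate the concern, here is a minimal sketch of the cache-miss path,
not the actual provider code. `LoadedMapsSketch`, `loadFromFiles`, and the
`warnings` buffer are hypothetical names standing in for the real class, the
snapshot/delta file reads, and `logWarning`; only the start of the log message
is taken from the diff above.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the provider's in-memory state cache.
object LoadedMapsSketch {
  type MapType = Map[String, String]

  // Assumed shape of the cache: version -> state map.
  private val loadedMaps = mutable.TreeMap.empty[Long, MapType]

  // Collects messages instead of calling a real logger, so the sketch is testable.
  val warnings = mutable.ArrayBuffer.empty[String]

  // Placeholder for the slow path that reads files from HDFS.
  private def loadFromFiles(version: Long): MapType =
    Map("version" -> version.toString)

  def loadMap(version: Long): MapType = synchronized {
    loadedMaps.get(version) match {
      case Some(cached) =>
        // Expected path while batches are running: no file access.
        cached
      case None =>
        // Expected only right after restoring from a checkpoint.
        warnings += s"The state for version $version doesn't exist in loadedMaps."
        val loaded = loadFromFiles(version)
        loadedMaps.put(version, loaded)
        loaded
    }
  }
}
```

   In this sketch the first `loadMap(3L)` call records one warning and fills
the cache; a second call for the same version hits the cache and stays silent.
Repeated warnings during steady-state batches would therefore point to the
cache not holding the versions we expect.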

