gaborgsomogyi commented on a change in pull request #21506: [SPARK-24485][SS]
Measure and log elapsed time for filesystem operations in
HDFSBackedStateStoreProvider
URL: https://github.com/apache/spark/pull/21506#discussion_r345270395
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
##########
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
if (loadedCurrentVersionMap.isDefined) {
return loadedCurrentVersionMap.get
}
- val snapshotCurrentVersionMap = readSnapshotFile(version)
- if (snapshotCurrentVersionMap.isDefined) {
- synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
- return snapshotCurrentVersionMap.get
- }
- // Find the most recent map before this version that we can.
- // [SPARK-22305] This must be done iteratively to avoid stack overflow.
- var lastAvailableVersion = version
- var lastAvailableMap: Option[MapType] = None
- while (lastAvailableMap.isEmpty) {
- lastAvailableVersion -= 1
+ logWarning(s"The state for version $version doesn't exist in loadedMaps. "
+
Review comment:
@HeartSaVioR this is a normal operation, so I don't yet understand why
`logWarning` is used here. Can we lower this to debug, or was there a reason
to use warning? It's overkill in unit tests...
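A minimal sketch of the suggested change, assuming level-gated logging with by-name messages (the way Spark's `org.apache.spark.internal.Logging` exposes `logDebug` and `logWarning`). `Level`, `SketchLogging`, `SketchLoader` and `readFromFiles` are hypothetical stand-ins so the example runs without Spark. It only shows that a routine cache miss logged at debug stays silent at the usual WARN threshold. It is not the provider's actual code.

```scala
import scala.collection.mutable

// Hypothetical log levels, ordered from most to least verbose.
object Level extends Enumeration {
  val Debug, Info, Warn = Value
}

// Hypothetical stand-in for a logging trait: messages are taken by name,
// so the string is only built when the level is enabled.
trait SketchLogging {
  def threshold: Level.Value
  val emitted: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  private def log(level: Level.Value, msg: => String): Unit =
    if (level >= threshold) emitted += s"[$level] $msg"

  protected def logDebug(msg: => String): Unit = log(Level.Debug, msg)
  protected def logWarning(msg: => String): Unit = log(Level.Warn, msg)
}

// Hypothetical, heavily simplified cache lookup modeled on the loadMap flow.
class SketchLoader(val threshold: Level.Value) extends SketchLogging {
  private val loadedMaps = mutable.Map.empty[Long, Map[String, String]]

  def loadMap(version: Long): Map[String, String] =
    loadedMaps.getOrElseUpdate(version, {
      // A cache miss is part of normal operation, so it is logged at debug.
      logDebug(s"The state for version $version doesn't exist in loadedMaps.")
      readFromFiles(version)
    })

  // Hypothetical placeholder for reading the snapshot and delta files.
  private def readFromFiles(version: Long): Map[String, String] =
    Map("version" -> version.toString)
}

object LogLevelDemo {
  def main(args: Array[String]): Unit = {
    val quiet = new SketchLoader(Level.Warn)
    quiet.loadMap(1L)
    println(quiet.emitted.size) // 0: debug output is suppressed at WARN

    val verbose = new SketchLoader(Level.Debug)
    verbose.loadMap(1L)
    verbose.loadMap(1L) // cache hit, nothing logged
    println(verbose.emitted.size) // 1
  }
}
```

One possible split is to keep `logWarning` for genuinely unexpected conditions and use `logDebug` for the routine miss. That split is only a suggestion here, not necessarily what the PR ended up doing.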