Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]
HeartSaVioR commented on PR #46233:
URL: https://github.com/apache/spark/pull/46233#issuecomment-2078732698

Thanks! Merging to master.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]
HeartSaVioR commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580548906

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ##
@@ -437,6 +437,23 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val earliestLoadedVersion: Option[Long] = if (loadedEntries > 0) {
+      Some(loadedMaps.lastKey())
+    } else {
+      None
+    }
+
+    if (earliestLoadedVersion.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and earliest_loaded_version=${earliestLoadedVersion.get} " +

Review Comment:
Yeah, we could probably calculate it back. Since we don't need to look at this message every time (probably only when investigating), some effort on calculating back is probably OK.
Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]
anishshri-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580520020

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ##
@@ -437,6 +437,23 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val earliestLoadedVersion: Option[Long] = if (loadedEntries > 0) {
+      Some(loadedMaps.lastKey())
+    } else {
+      None
+    }
+
+    if (earliestLoadedVersion.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and earliest_loaded_version=${earliestLoadedVersion.get} " +

Review Comment:
I think it's OK to just know the earliest? We know the current size anyway, and versions are supposed to be sequential here since we call this function on each commit?
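Editorial sketch of the "calculate back" idea discussed above: if the cached versions really are contiguous (an assumption taken from the comment, not verified here), the loaded range follows from the earliest version and the current size. The object and method names below are hypothetical and are not part of the PR.

```scala
object LoadedRangeSketch {
  // Hypothetical helper: derive the inclusive (earliest, latest) range of
  // cached versions from the two values the log line already prints.
  // Assumes versions are sequential with no gaps.
  def loadedVersionRange(earliestLoadedVersion: Long, currentSize: Int): Option[(Long, Long)] =
    if (currentSize <= 0) None
    else Some((earliestLoadedVersion, earliestLoadedVersion + currentSize - 1))

  def main(args: Array[String]): Unit = {
    // earliest_loaded_version=3 and current_size=3 imply versions 3, 4, 5
    println(loadedVersionRange(3L, 3))
  }
}
```

If the cache ever holds non-contiguous versions, this arithmetic no longer gives the true latest version, which is the trade-off the reviewers accept for a message that is only read during investigation.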
Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]
HeartSaVioR commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580489153

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ##
@@ -437,6 +437,23 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val earliestLoadedVersion: Option[Long] = if (loadedEntries > 0) {
+      Some(loadedMaps.lastKey())
+    } else {
+      None
+    }
+
+    if (earliestLoadedVersion.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and earliest_loaded_version=${earliestLoadedVersion.get} " +

Review Comment:
nit: Would it be sufficient to know only the earliest loaded version, or do we want to know the "range"?
Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]
anishshri-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580445447

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ##
@@ -437,6 +437,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val lastKey: Option[Long] = if (loadedEntries > 0) Some(loadedMaps.lastKey()) else None
+    if (lastKey.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and last_loaded_version=${lastKey.get} and " +

Review Comment:
Yup, agreed. Updated the diff. Thx
Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]
huanliwang-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580437124

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ##
@@ -437,6 +437,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val lastKey: Option[Long] = if (loadedEntries > 0) Some(loadedMaps.lastKey()) else None
+    if (lastKey.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and last_loaded_version=${lastKey.get} and " +

Review Comment:
The keys are stored in reverse order, so the last key is the earliest version:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L385-L386
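Editorial sketch of the ordering point made above: in a `java.util.TreeMap` built with a reversed comparator, `firstKey()` is the highest key and `lastKey()` is the lowest, so `lastKey()` yields the earliest cached version. The map below is a simplified stand-in with `String` values; how the real `loadedMaps` field is constructed is assumed from the comment rather than quoted from the Spark source, and all names in the block are hypothetical.

```scala
import java.util.{Collections, SortedMap, TreeMap}

object ReverseOrderedCacheSketch {
  // Stand-in for the state cache: version -> placeholder value,
  // sorted in descending version order (assumption based on the review comment).
  def newCache(): SortedMap[Long, String] =
    Collections.synchronizedSortedMap(new TreeMap[Long, String](Ordering[Long].reverse))

  // With descending order, the last key is the smallest (earliest) version.
  def earliestLoadedVersion(cache: SortedMap[Long, String]): Option[Long] =
    if (cache.isEmpty) None else Some(cache.lastKey())

  // ...and the first key is the largest (latest) version.
  def latestLoadedVersion(cache: SortedMap[Long, String]): Option[Long] =
    if (cache.isEmpty) None else Some(cache.firstKey())

  def main(args: Array[String]): Unit = {
    val cache = newCache()
    Seq(3L, 5L, 4L).foreach(v => cache.put(v, s"state-for-version-$v"))
    println(s"earliest=${earliestLoadedVersion(cache)} latest=${latestLoadedVersion(cache)}")
  }
}
```

This is why a field named `last_loaded_version` would be misleading for this map, and why the rename to `earliest_loaded_version` was suggested.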
Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]
anishshri-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580169570

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ##
@@ -437,6 +437,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val lastKey: Option[Long] = if (loadedEntries > 0) Some(loadedMaps.lastKey()) else None
+    if (lastKey.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and last_loaded_version=${lastKey.get} and " +

Review Comment:
This should return the last/highest key/version present in the map, right? So maybe it's OK to keep this as it is?
Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]
huanliwang-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580155250

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ##
@@ -437,6 +437,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val lastKey: Option[Long] = if (loadedEntries > 0) Some(loadedMaps.lastKey()) else None
+    if (lastKey.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and last_loaded_version=${lastKey.get} and " +

Review Comment:
maybe "earliest_loaded_version"?
Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]
anishshri-db commented on PR #46233:
URL: https://github.com/apache/spark/pull/46233#issuecomment-2078196043

cc @HeartSaVioR - PTAL, thx!