Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]

2024-04-25 Thread via GitHub


HeartSaVioR commented on PR #46233:
URL: https://github.com/apache/spark/pull/46233#issuecomment-2078732698

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org




HeartSaVioR commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580548906


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -437,6 +437,23 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val earliestLoadedVersion: Option[Long] = if (loadedEntries > 0) {
+      Some(loadedMaps.lastKey())
+    } else {
+      None
+    }
+
+    if (earliestLoadedVersion.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and earliest_loaded_version=${earliestLoadedVersion.get} " +

Review Comment:
   Yeah, we could probably calculate it back. Since we don't need to look at this message every time (probably only when investigating), some effort to calculate it back is probably OK.




anishshri-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580520020


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -437,6 +437,23 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val earliestLoadedVersion: Option[Long] = if (loadedEntries > 0) {
+      Some(loadedMaps.lastKey())
+    } else {
+      None
+    }
+
+    if (earliestLoadedVersion.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and earliest_loaded_version=${earliestLoadedVersion.get} " +

Review Comment:
   I think it's OK to just know the earliest? We know the current size anyway, and versions are supposed to be sequential here since we call this function on each commit?
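
   To make the "calculate back" idea concrete: if the cached versions really are consecutive (one entry per commit, no gaps), the earliest loaded version plus the current size is enough to recover the full range. This is a minimal sketch; the object and function names are hypothetical and not part of Spark, and if the consecutive-versions assumption is ever violated the computed upper bound would be wrong.

```scala
object CacheRangeSketch {
  // Hypothetical helper: derives (earliest, latest) cached versions from the
  // two values the log line records. ASSUMES cached versions are consecutive
  // (no gaps); returns None when the cache is empty.
  def versionRange(earliest: Option[Long], size: Int): Option[(Long, Long)] =
    earliest.filter(_ => size > 0).map(e => (e, e + size - 1))

  def main(args: Array[String]): Unit = {
    println(versionRange(Some(5L), 3)) // Some((5,7))
    println(versionRange(None, 0))     // None
  }
}
```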




HeartSaVioR commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580489153


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -437,6 +437,23 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val earliestLoadedVersion: Option[Long] = if (loadedEntries > 0) {
+      Some(loadedMaps.lastKey())
+    } else {
+      None
+    }
+
+    if (earliestLoadedVersion.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and earliest_loaded_version=${earliestLoadedVersion.get} " +

Review Comment:
   nit: Would it be sufficient to know only the earliest loaded version, or do we want to know the "range"?




anishshri-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580445447


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -437,6 +437,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val lastKey: Option[Long] = if (loadedEntries > 0) Some(loadedMaps.lastKey()) else None
+    if (lastKey.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and last_loaded_version=${lastKey.get} and " +

Review Comment:
   Yup, agreed. Updated the diff. Thx




huanliwang-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580437124


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -437,6 +437,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val lastKey: Option[Long] = if (loadedEntries > 0) Some(loadedMaps.lastKey()) else None
+    if (lastKey.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and last_loaded_version=${lastKey.get} and " +

Review Comment:
   The keys are stored in reverse order, so the last key is the earliest version:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L385-L386
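
   To illustrate why lastKey() yields the earliest version here: a java.util.TreeMap built with a reversed comparator sorts its keys in descending order, so firstKey() is the newest version and lastKey() the oldest. This is a minimal sketch, assuming (from the linked source lines) that loadedMaps is constructed with a descending Ordering[Long]; the object and method names are hypothetical, and the values are placeholder strings rather than real state maps.

```scala
import java.util.TreeMap

object ReverseOrderedCacheDemo {
  // Hypothetical stand-in for the cache map: a reversed comparator sorts the
  // Long version keys in descending order (newest first, oldest last).
  def buildCache(versions: Seq[Long]): TreeMap[Long, String] = {
    val cache = new TreeMap[Long, String](Ordering[Long].reverse)
    versions.foreach(v => cache.put(v, s"state-v$v")) // placeholder values
    cache
  }

  def main(args: Array[String]): Unit = {
    val cache = buildCache(Seq(5L, 6L, 7L))
    // Prints: firstKey=7 lastKey=5
    println(s"firstKey=${cache.firstKey()} lastKey=${cache.lastKey()}")
  }
}
```

   With this ordering, "last" refers to position in the map, not to recency, which is why earliest_loaded_version is the more accurate name for the logged value.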




anishshri-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580169570


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -437,6 +437,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val lastKey: Option[Long] = if (loadedEntries > 0) Some(loadedMaps.lastKey()) else None
+    if (lastKey.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and last_loaded_version=${lastKey.get} and " +

Review Comment:
   This should return the last/highest key/version present in the map, right? So maybe it's OK to keep this as it is?




huanliwang-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580155250


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -437,6 +437,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
       newVersion: Long,
       map: HDFSBackedStateStoreMap): Unit = synchronized {
+    val loadedEntries = loadedMaps.size()
+    val lastKey: Option[Long] = if (loadedEntries > 0) Some(loadedMaps.lastKey()) else None
+    if (lastKey.isDefined) {
+      logInfo(s"Trying to add version=$newVersion to state cache map with " +
+        s"current_size=$loadedEntries and last_loaded_version=${lastKey.get} and " +

Review Comment:
   maybe "earliest_loaded_version"?
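
   A minimal sketch of the log line with the suggested earliest_loaded_version name, written with a pattern match on the Option instead of isDefined/get. The helper name addVersionMessage is hypothetical; the message stops at earliest_loaded_version because the rest of the string is cut off in the quoted hunk, and the empty-cache wording is an assumption since the diff's else branch is not shown.

```scala
object CacheLogSketch {
  // Hypothetical helper: builds the message logged before inserting
  // `newVersion`. Matching on the Option avoids the isDefined/get pair.
  def addVersionMessage(newVersion: Long, size: Int, earliest: Option[Long]): String =
    earliest match {
      case Some(v) =>
        s"Trying to add version=$newVersion to state cache map with " +
          s"current_size=$size and earliest_loaded_version=$v"
      case None =>
        // ASSUMED wording for the empty-cache case (not shown in the diff).
        s"Trying to add version=$newVersion to state cache map with current_size=$size"
    }

  def main(args: Array[String]): Unit = {
    println(addVersionMessage(8L, 3, Some(5L)))
    println(addVersionMessage(1L, 0, None))
  }
}
```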




anishshri-db commented on PR #46233:
URL: https://github.com/apache/spark/pull/46233#issuecomment-2078196043

   cc - @HeartSaVioR - PTAL, thx!

