This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b71f71e196 [SPARK-42567][SS][SQL] Track load time for state store provider and log warning if it exceeds threshold
6b71f71e196 is described below

commit 6b71f71e196718474a0e204a6b29aec2d2f8530d
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Sat Feb 25 18:40:52 2023 +0900

    [SPARK-42567][SS][SQL] Track load time for state store provider and log warning if it exceeds threshold
    
    ### What changes were proposed in this pull request?
    Track the load time of the state store provider and log a warning if it exceeds a threshold (2 seconds).
    
    ### Why are the changes needed?
    We have seen that the initial state store provider load can be blocked by external factors such as
    filesystem initialization. Logging a warning when the load exceeds a threshold lets us identify
    the cases where it takes too long.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Augmented some of the tests to verify that the logging works as expected.
    Sample logs:
    ```
    14:58:51.784 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Loaded state store provider in loadTimeMs=2049 for storeId=StateStoreId[ checkpointRootLocation=file:/Users/anish.shrigondekar/spark/spark/target/tmp/streaming.metadata-1f2ff296-1ece-4a0c-b4b4-48aa0e909b49/state, operatorId=0, partitionId=2, storeName=default ] and queryRunId=a4063603-3929-4340-9920-eca206ebec36
    14:58:53.838 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Loaded state store provider in loadTimeMs=2046 for storeId=StateStoreId[ checkpointRootLocation=file:/Users/anish.shrigondekar/spark/spark/target/tmp/streaming.metadata-1f2ff296-1ece-4a0c-b4b4-48aa0e909b49/state, operatorId=0, partitionId=3, storeName=default ] and queryRunId=a4063603-3929-4340-9920-eca206ebec36
    14:58:55.885 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Loaded state store provider in loadTimeMs=2044 for storeId=StateStoreId[ checkpointRootLocation=file:/Users/anish.shrigondekar/spark/spark/target/tmp/streaming.metadata-1f2ff296-1ece-4a0c-b4b4-48aa0e909b49/state, operatorId=0, partitionId=4, storeName=default ] and queryRunId=a4063603-3929-4340-9920-eca206ebec36
    ```
    
    Closes #40163 from anishshri-db/task/SPARK-42567.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/StateStore.scala | 27 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 787f4e390e5..beb6500fe3a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -398,6 +398,12 @@ case class StateStoreId(
       new Path(checkpointRootLocation, s"$operatorId/$partitionId/$storeName")
     }
   }
+
+  override def toString: String = {
+    s"""StateStoreId[ checkpointRootLocation=$checkpointRootLocation, 
operatorId=$operatorId,
+       | partitionId=$partitionId, storeName=$storeName ]
+       |""".stripMargin.replaceAll("\n", "")
+  }
 }
 
 object StateStoreId {
@@ -533,11 +539,22 @@ object StateStore extends Logging {
         }
       }
 
-      val provider = loadedProviders.getOrElseUpdate(
-        storeProviderId,
-        StateStoreProvider.createAndInit(
-          storeProviderId, keySchema, valueSchema, numColsPrefixKey, 
storeConf, hadoopConf)
-      )
+      // SPARK-42567 - Track load time for state store provider and log 
warning if takes longer
+      // than 2s.
+      val (provider, loadTimeMs) = Utils.timeTakenMs {
+        loadedProviders.getOrElseUpdate(
+          storeProviderId,
+          StateStoreProvider.createAndInit(
+            storeProviderId, keySchema, valueSchema, numColsPrefixKey, 
storeConf, hadoopConf)
+        )
+      }
+
+      if (loadTimeMs > 2000L) {
+        logWarning(s"Loaded state store provider in loadTimeMs=$loadTimeMs " +
+          s"for storeId=${storeProviderId.storeId.toString} and " +
+          s"queryRunId=${storeProviderId.queryRunId}")
+      }
+
       val otherProviderIds = loadedProviders.keys.filter(_ != 
storeProviderId).toSeq
       val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, 
otherProviderIds)
       providerIdsToUnload.foreach(unload(_))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to