Repository: spark
Updated Branches:
  refs/heads/master 3352d6fe9 -> 4c388bccf


[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch measures and logs the elapsed time for each operation that communicates 
with the file system (mostly a remote HDFS in production) in 
HDFSBackedStateStoreProvider, to help investigate any latency issues.
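
A minimal, self-contained sketch of the pattern this patch applies, assuming a 
simulated filesystem call (the helper mirrors the `Utils.timeTakenMs` added below; 
no real HDFS client is used here):

    import java.util.concurrent.TimeUnit.NANOSECONDS

    object MeasureAndLog {
      // Mirrors the Utils.timeTakenMs helper added below: the body's result
      // plus its elapsed wall-clock time in milliseconds.
      def timeTakenMs[T](body: => T): (T, Long) = {
        val start = System.nanoTime()
        val result = body
        (result, math.max(NANOSECONDS.toMillis(System.nanoTime() - start), 0))
      }

      def main(args: Array[String]): Unit = {
        val (contents, elapsedMs) = timeTakenMs {
          Thread.sleep(10) // stand-in for a remote HDFS read
          "checkpoint-bytes"
        }
        println(s"Read ${contents.length} bytes in $elapsedMs ms.")
      }
    }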

## How was this patch tested?

Manually tested.

Author: Jungtaek Lim <kabh...@gmail.com>

Closes #21506 from HeartSaVioR/SPARK-24485.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c388bcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c388bcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c388bcc

Branch: refs/heads/master
Commit: 4c388bccf1bcac8f833fd9214096dd164c3ea065
Parents: 3352d6f
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Wed Jun 13 12:36:20 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Wed Jun 13 12:36:20 2018 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 11 ++-
 .../state/HDFSBackedStateStoreProvider.scala    | 83 ++++++++++++--------
 .../execution/streaming/statefulOperators.scala |  9 +--
 3 files changed, 62 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4c388bcc/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7428db2..c139db4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -31,6 +31,7 @@ import java.nio.file.Files
 import java.security.SecureRandom
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
+import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.zip.GZIPInputStream
 
@@ -434,7 +435,7 @@ private[spark] object Utils extends Logging {
     new URI("file:///" + rawFileName).getPath.substring(1)
   }
 
-    /**
+  /**
   * Download a file or directory to target directory. Supports fetching the file in a variety of
   * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
   * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
@@ -507,6 +508,14 @@ private[spark] object Utils extends Logging {
     targetFile
   }
 
+  /** Records the duration of running `body`. */
+  def timeTakenMs[T](body: => T): (T, Long) = {
+    val startTime = System.nanoTime()
+    val result = body
+    val endTime = System.nanoTime()
+    (result, math.max(NANOSECONDS.toMillis(endTime - startTime), 0))
+  }
+
   /**
    * Download `in` to `tempFile`, then move it to `destFile`.
    *

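The helper returns both the body's result and its elapsed wall-clock time in 
milliseconds, clamped at zero to guard against clock anomalies. A hedged usage 
sketch from inside Spark's own code (Utils is private[spark], so it is not 
callable from user applications); doFilesystemWork() is a hypothetical stand-in 
for any HDFS-touching operation:

    val (result, elapsedMs) = Utils.timeTakenMs { doFilesystemWork() }
    logDebug(s"doFilesystemWork() took $elapsedMs ms.")
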
http://git-wip-us.apache.org/repos/asf/spark/blob/4c388bcc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index df722b9..118c82a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.sql.execution.streaming.state
 
 import java.io._
-import java.nio.channels.ClosedChannelException
 import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.Random
 import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging
     if (loadedCurrentVersionMap.isDefined) {
       return loadedCurrentVersionMap.get
     }
-    val snapshotCurrentVersionMap = readSnapshotFile(version)
-    if (snapshotCurrentVersionMap.isDefined) {
-      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
-      return snapshotCurrentVersionMap.get
-    }
 
-    // Find the most recent map before this version that we can.
-    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
-    var lastAvailableVersion = version
-    var lastAvailableMap: Option[MapType] = None
-    while (lastAvailableMap.isEmpty) {
-      lastAvailableVersion -= 1
+    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
+      "Reading snapshot file and delta files if needed... " +
+      "Note that this is normal for the first batch of a starting query.")
 
-      if (lastAvailableVersion <= 0) {
-        // Use an empty map for versions 0 or less.
-        lastAvailableMap = Some(new MapType)
-      } else {
-        lastAvailableMap =
-          synchronized { loadedMaps.get(lastAvailableVersion) }
-            .orElse(readSnapshotFile(lastAvailableVersion))
+    val (result, elapsedMs) = Utils.timeTakenMs {
+      val snapshotCurrentVersionMap = readSnapshotFile(version)
+      if (snapshotCurrentVersionMap.isDefined) {
+        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
+        return snapshotCurrentVersionMap.get
+      }
+
+      // Find the most recent map before this version that we can.
+      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+      var lastAvailableVersion = version
+      var lastAvailableMap: Option[MapType] = None
+      while (lastAvailableMap.isEmpty) {
+        lastAvailableVersion -= 1
+
+        if (lastAvailableVersion <= 0) {
+          // Use an empty map for versions 0 or less.
+          lastAvailableMap = Some(new MapType)
+        } else {
+          lastAvailableMap =
+            synchronized { loadedMaps.get(lastAvailableVersion) }
+              .orElse(readSnapshotFile(lastAvailableVersion))
+        }
+      }
+
+      // Load all the deltas from the version after the last available one up to the target version.
+      // The last available version is the one with a full snapshot, so it doesn't need deltas.
+      val resultMap = new MapType(lastAvailableMap.get)
+      for (deltaVersion <- lastAvailableVersion + 1 to version) {
+        updateFromDeltaFile(deltaVersion, resultMap)
       }
-    }
 
-    // Load all the deltas from the version after the last available one up to the target version.
-    // The last available version is the one with a full snapshot, so it doesn't need deltas.
-    val resultMap = new MapType(lastAvailableMap.get)
-    for (deltaVersion <- lastAvailableVersion + 1 to version) {
-      updateFromDeltaFile(deltaVersion, resultMap)
+      synchronized { loadedMaps.put(version, resultMap) }
+      resultMap
     }
 
-    synchronized { loadedMaps.put(version, resultMap) }
-    resultMap
+    logDebug(s"Loading state for $version takes $elapsedMs ms.")
+
+    result
   }
 
   private def writeUpdateToDeltaFile(
@@ -490,7 +499,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging
   /** Perform a snapshot of the store to allow delta files to be consolidated */
   private def doSnapshot(): Unit = {
     try {
-      val files = fetchFiles()
+      val (files, e1) = Utils.timeTakenMs(fetchFiles())
+      logDebug(s"fetchFiles() took $e1 ms.")
+
       if (files.nonEmpty) {
         val lastVersion = files.last.version
         val deltaFilesForLastVersion =
@@ -498,7 +509,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging
         synchronized { loadedMaps.get(lastVersion) } match {
           case Some(map) =>
             if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
-              writeSnapshotFile(lastVersion, map)
+              val (_, e2) = Utils.timeTakenMs(writeSnapshotFile(lastVersion, map))
+              logDebug(s"writeSnapshotFile() took $e2 ms.")
             }
           case None =>
            // The last map is not loaded, probably some other instance is in charge
@@ -517,7 +529,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging
    */
   private[state] def cleanup(): Unit = {
     try {
-      val files = fetchFiles()
+      val (files, e1) = Utils.timeTakenMs(fetchFiles())
+      logDebug(s"fetchFiles() took $e1 ms.")
+
       if (files.nonEmpty) {
         val earliestVersionToRetain = files.last.version - storeConf.minVersionsToRetain
         if (earliestVersionToRetain > 0) {
@@ -527,9 +541,12 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
             mapsToRemove.foreach(loadedMaps.remove)
           }
           val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
-          filesToDelete.foreach { f =>
-            fm.delete(f.path)
+          val (_, e2) = Utils.timeTakenMs {
+            filesToDelete.foreach { f =>
+              fm.delete(f.path)
+            }
           }
+          logDebug(s"deleting files took $e2 ms.")
           logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
             filesToDelete.mkString(", "))
         }

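Two provider-level details are worth noting in the hunks above: on a cache miss, 
load() times the entire reconstruction (snapshot read plus delta replay) as a 
single unit, and cleanup() likewise times the whole delete loop rather than each 
file, so heavy maintenance runs still produce one debug line apiece. A hedged 
fragment restating the cleanup shape, using the patch's own names:

    val (_, e2) = Utils.timeTakenMs {
      filesToDelete.foreach { f => fm.delete(f.path) }
    }
    logDebug(s"deleting files took $e2 ms.")
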
http://git-wip-us.apache.org/repos/asf/spark/blob/4c388bcc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 1691a63..6759fb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress}
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{CompletionIterator, NextIterator}
+import org.apache.spark.util.{CompletionIterator, NextIterator, Utils}
 
 
 /** Used to identify the state store for a given operator. */
@@ -97,12 +97,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
   }
 
   /** Records the duration of running `body` for the next query progress update. */
-  protected def timeTakenMs(body: => Unit): Long = {
-    val startTime = System.nanoTime()
-    val result = body
-    val endTime = System.nanoTime()
-    math.max(NANOSECONDS.toMillis(endTime - startTime), 0)
-  }
+  protected def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2
 
   /**
    * Set the SQL metrics related to the state store.

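With the shared helper in place, StateStoreWriter.timeTakenMs just delegates and 
keeps the duration. A hedged sketch of how an operator might feed that duration 
into a metric; allUpdatesTimeMs and the update body are illustrative stand-ins, 
not code copied from this patch:

    // Inside a StateStoreWriter operator (illustrative member names):
    val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")  // assumed SQLMetric
    allUpdatesTimeMs += timeTakenMs {
      store.put(keyRow, valueRow)  // stand-in for the real per-row update loop
    }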
