[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21506


---




[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21506#discussion_r194295068
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     if (loadedCurrentVersionMap.isDefined) {
       return loadedCurrentVersionMap.get
     }
-    val snapshotCurrentVersionMap = readSnapshotFile(version)
-    if (snapshotCurrentVersionMap.isDefined) {
-      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
-      return snapshotCurrentVersionMap.get
-    }
 
-    // Find the most recent map before this version that we can.
-    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
-    var lastAvailableVersion = version
-    var lastAvailableMap: Option[MapType] = None
-    while (lastAvailableMap.isEmpty) {
-      lastAvailableVersion -= 1
+    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
+      "Reading snapshot file and delta files if needed..." +
+      "Note that this is normal for the first batch of starting query.")
 
-      if (lastAvailableVersion <= 0) {
-        // Use an empty map for versions 0 or less.
-        lastAvailableMap = Some(new MapType)
-      } else {
-        lastAvailableMap =
-          synchronized { loadedMaps.get(lastAvailableVersion) }
-            .orElse(readSnapshotFile(lastAvailableVersion))
+    val (result, elapsedMs) = Utils.timeTakenMs {
+      val snapshotCurrentVersionMap = readSnapshotFile(version)
+      if (snapshotCurrentVersionMap.isDefined) {
+        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
+        return snapshotCurrentVersionMap.get
+      }
+
+      // Find the most recent map before this version that we can.
+      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+      var lastAvailableVersion = version
+      var lastAvailableMap: Option[MapType] = None
+      while (lastAvailableMap.isEmpty) {
+        lastAvailableVersion -= 1
+
+        if (lastAvailableVersion <= 0) {
+          // Use an empty map for versions 0 or less.
+          lastAvailableMap = Some(new MapType)
+        } else {
+          lastAvailableMap =
+            synchronized { loadedMaps.get(lastAvailableVersion) }
+              .orElse(readSnapshotFile(lastAvailableVersion))
+        }
+      }
+
+      // Load all the deltas from the version after the last available one up to the target version.
+      // The last available version is the one with a full snapshot, so it doesn't need deltas.
+      val resultMap = new MapType(lastAvailableMap.get)
+      for (deltaVersion <- lastAvailableVersion + 1 to version) {
+        updateFromDeltaFile(deltaVersion, resultMap)
       }
-    }
 
-    // Load all the deltas from the version after the last available one up to the target version.
-    // The last available version is the one with a full snapshot, so it doesn't need deltas.
-    val resultMap = new MapType(lastAvailableMap.get)
-    for (deltaVersion <- lastAvailableVersion + 1 to version) {
-      updateFromDeltaFile(deltaVersion, resultMap)
+      synchronized { loadedMaps.put(version, resultMap) }
+      resultMap
     }
 
-    synchronized { loadedMaps.put(version, resultMap) }
-    resultMap
+    logWarning(s"Loading state for $version takes $elapsedMs ms.")
--- End diff --

Changed log level to DEBUG.


---




[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21506#discussion_r194293481
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
 if (loadedCurrentVersionMap.isDefined) {
   return loadedCurrentVersionMap.get
 }
-val snapshotCurrentVersionMap = readSnapshotFile(version)
-if (snapshotCurrentVersionMap.isDefined) {
-  synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
-  return snapshotCurrentVersionMap.get
-}
 
-// Find the most recent map before this version that we can.
-// [SPARK-22305] This must be done iteratively to avoid stack overflow.
-var lastAvailableVersion = version
-var lastAvailableMap: Option[MapType] = None
-while (lastAvailableMap.isEmpty) {
-  lastAvailableVersion -= 1
+logWarning(s"The state for version $version doesn't exist in 
loadedMaps. " +
+  "Reading snapshot file and delta files if needed..." +
+  "Note that this is normal for the first batch of starting query.")
 
-  if (lastAvailableVersion <= 0) {
-// Use an empty map for versions 0 or less.
-lastAvailableMap = Some(new MapType)
-  } else {
-lastAvailableMap =
-  synchronized { loadedMaps.get(lastAvailableVersion) }
-.orElse(readSnapshotFile(lastAvailableVersion))
+val (result, elapsedMs) = Utils.timeTakenMs {
+  val snapshotCurrentVersionMap = readSnapshotFile(version)
+  if (snapshotCurrentVersionMap.isDefined) {
+synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
+return snapshotCurrentVersionMap.get
+  }
+
+  // Find the most recent map before this version that we can.
+  // [SPARK-22305] This must be done iteratively to avoid stack 
overflow.
+  var lastAvailableVersion = version
+  var lastAvailableMap: Option[MapType] = None
+  while (lastAvailableMap.isEmpty) {
+lastAvailableVersion -= 1
+
+if (lastAvailableVersion <= 0) {
+  // Use an empty map for versions 0 or less.
+  lastAvailableMap = Some(new MapType)
+} else {
+  lastAvailableMap =
+synchronized { loadedMaps.get(lastAvailableVersion) }
+  .orElse(readSnapshotFile(lastAvailableVersion))
+}
+  }
+
+  // Load all the deltas from the version after the last available one 
up to the target version.
+  // The last available version is the one with a full snapshot, so it 
doesn't need deltas.
+  val resultMap = new MapType(lastAvailableMap.get)
+  for (deltaVersion <- lastAvailableVersion + 1 to version) {
+updateFromDeltaFile(deltaVersion, resultMap)
   }
-}
 
-// Load all the deltas from the version after the last available one 
up to the target version.
-// The last available version is the one with a full snapshot, so it 
doesn't need deltas.
-val resultMap = new MapType(lastAvailableMap.get)
-for (deltaVersion <- lastAvailableVersion + 1 to version) {
-  updateFromDeltaFile(deltaVersion, resultMap)
+  synchronized { loadedMaps.put(version, resultMap) }
+  resultMap
 }
 
-synchronized { loadedMaps.put(version, resultMap) }
-resultMap
+logWarning(s"Loading state for $version takes $elapsedMs ms.")
--- End diff --

I was thinking of keeping this paired with the warning message above, but since we're guiding end users to turn on DEBUG level to see information about additional latencies, changing this to DEBUG is also fine.


---




[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21506#discussion_r194293251
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     if (loadedCurrentVersionMap.isDefined) {
       return loadedCurrentVersionMap.get
     }
-    val snapshotCurrentVersionMap = readSnapshotFile(version)
-    if (snapshotCurrentVersionMap.isDefined) {
-      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
-      return snapshotCurrentVersionMap.get
-    }
 
-    // Find the most recent map before this version that we can.
-    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
-    var lastAvailableVersion = version
-    var lastAvailableMap: Option[MapType] = None
-    while (lastAvailableMap.isEmpty) {
-      lastAvailableVersion -= 1
+    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
+      "Reading snapshot file and delta files if needed..." +
+      "Note that this is normal for the first batch of starting query.")
 
-      if (lastAvailableVersion <= 0) {
-        // Use an empty map for versions 0 or less.
-        lastAvailableMap = Some(new MapType)
-      } else {
-        lastAvailableMap =
-          synchronized { loadedMaps.get(lastAvailableVersion) }
-            .orElse(readSnapshotFile(lastAvailableVersion))
+    val (result, elapsedMs) = Utils.timeTakenMs {
--- End diff --

Yup, right. Most of the code change is just wrapping the existing code in timeTakenMs.
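
For readers who haven't seen the helper, here's a minimal, self-contained sketch of that pattern. It's an approximation, not the patch itself: the `timeTakenMs` below just mirrors the shape I understand Spark's `Utils.timeTakenMs` to have (run the body, return its result plus elapsed milliseconds), and `main` uses a made-up stand-in for the snapshot/delta reads.

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

object TimeTakenSketch {
  // Approximation of Utils.timeTakenMs: evaluate the by-name body once and
  // return its result together with the elapsed wall-clock time in milliseconds.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, NANOSECONDS.toMillis(System.nanoTime() - start))
  }

  def main(args: Array[String]): Unit = {
    // Made-up stand-in for the expensive work (reading snapshot and delta files).
    val (result, elapsedMs) = timeTakenMs {
      Thread.sleep(50)
      "loaded state map"
    }
    println(s"Loading state takes $elapsedMs ms, result = $result")
  }
}
```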


---




[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-10 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21506#discussion_r194266029
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     if (loadedCurrentVersionMap.isDefined) {
       return loadedCurrentVersionMap.get
     }
-    val snapshotCurrentVersionMap = readSnapshotFile(version)
-    if (snapshotCurrentVersionMap.isDefined) {
-      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
-      return snapshotCurrentVersionMap.get
-    }
 
-    // Find the most recent map before this version that we can.
-    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
-    var lastAvailableVersion = version
-    var lastAvailableMap: Option[MapType] = None
-    while (lastAvailableMap.isEmpty) {
-      lastAvailableVersion -= 1
+    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
+      "Reading snapshot file and delta files if needed..." +
+      "Note that this is normal for the first batch of starting query.")
 
-      if (lastAvailableVersion <= 0) {
-        // Use an empty map for versions 0 or less.
-        lastAvailableMap = Some(new MapType)
-      } else {
-        lastAvailableMap =
-          synchronized { loadedMaps.get(lastAvailableVersion) }
-            .orElse(readSnapshotFile(lastAvailableVersion))
+    val (result, elapsedMs) = Utils.timeTakenMs {
--- End diff --

GitHub has an... interesting idea of how to display this diff. The only changes were moving the existing code inside timeTakenMs and adding the logWarning statements, correct?


---




[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-10 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21506#discussion_r194266060
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
 if (loadedCurrentVersionMap.isDefined) {
   return loadedCurrentVersionMap.get
 }
-val snapshotCurrentVersionMap = readSnapshotFile(version)
-if (snapshotCurrentVersionMap.isDefined) {
-  synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
-  return snapshotCurrentVersionMap.get
-}
 
-// Find the most recent map before this version that we can.
-// [SPARK-22305] This must be done iteratively to avoid stack overflow.
-var lastAvailableVersion = version
-var lastAvailableMap: Option[MapType] = None
-while (lastAvailableMap.isEmpty) {
-  lastAvailableVersion -= 1
+logWarning(s"The state for version $version doesn't exist in 
loadedMaps. " +
+  "Reading snapshot file and delta files if needed..." +
+  "Note that this is normal for the first batch of starting query.")
 
-  if (lastAvailableVersion <= 0) {
-// Use an empty map for versions 0 or less.
-lastAvailableMap = Some(new MapType)
-  } else {
-lastAvailableMap =
-  synchronized { loadedMaps.get(lastAvailableVersion) }
-.orElse(readSnapshotFile(lastAvailableVersion))
+val (result, elapsedMs) = Utils.timeTakenMs {
+  val snapshotCurrentVersionMap = readSnapshotFile(version)
+  if (snapshotCurrentVersionMap.isDefined) {
+synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
+return snapshotCurrentVersionMap.get
+  }
+
+  // Find the most recent map before this version that we can.
+  // [SPARK-22305] This must be done iteratively to avoid stack 
overflow.
+  var lastAvailableVersion = version
+  var lastAvailableMap: Option[MapType] = None
+  while (lastAvailableMap.isEmpty) {
+lastAvailableVersion -= 1
+
+if (lastAvailableVersion <= 0) {
+  // Use an empty map for versions 0 or less.
+  lastAvailableMap = Some(new MapType)
+} else {
+  lastAvailableMap =
+synchronized { loadedMaps.get(lastAvailableVersion) }
+  .orElse(readSnapshotFile(lastAvailableVersion))
+}
+  }
+
+  // Load all the deltas from the version after the last available one 
up to the target version.
+  // The last available version is the one with a full snapshot, so it 
doesn't need deltas.
+  val resultMap = new MapType(lastAvailableMap.get)
+  for (deltaVersion <- lastAvailableVersion + 1 to version) {
+updateFromDeltaFile(deltaVersion, resultMap)
   }
-}
 
-// Load all the deltas from the version after the last available one 
up to the target version.
-// The last available version is the one with a full snapshot, so it 
doesn't need deltas.
-val resultMap = new MapType(lastAvailableMap.get)
-for (deltaVersion <- lastAvailableVersion + 1 to version) {
-  updateFromDeltaFile(deltaVersion, resultMap)
+  synchronized { loadedMaps.put(version, resultMap) }
+  resultMap
 }
 
-synchronized { loadedMaps.put(version, resultMap) }
-resultMap
+logWarning(s"Loading state for $version takes $elapsedMs ms.")
--- End diff --

I'm not sure this should be a warning.


---




[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-07 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21506

[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch measures and logs the elapsed time for each operation that communicates with the file system (mostly remote HDFS in production) in HDFSBackedStateStoreProvider, to help investigate any latency issues.
## How was this patch tested?

Manually tested.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24485

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21506.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21506


commit d84f98fc978262f4165f78b3b223b8bb3151f735
Author: Jungtaek Lim 
Date:   2018-06-07T14:14:46Z

[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider




---
