[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21506

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194295068

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    +      val snapshotCurrentVersionMap = readSnapshotFile(version)
    +      if (snapshotCurrentVersionMap.isDefined) {
    +        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    +        return snapshotCurrentVersionMap.get
    +      }
    +
    +      // Find the most recent map before this version that we can.
    +      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    +      var lastAvailableVersion = version
    +      var lastAvailableMap: Option[MapType] = None
    +      while (lastAvailableMap.isEmpty) {
    +        lastAvailableVersion -= 1
    +
    +        if (lastAvailableVersion <= 0) {
    +          // Use an empty map for versions 0 or less.
    +          lastAvailableMap = Some(new MapType)
    +        } else {
    +          lastAvailableMap =
    +            synchronized { loadedMaps.get(lastAvailableVersion) }
    +              .orElse(readSnapshotFile(lastAvailableVersion))
    +        }
    +      }
    +
    +      // Load all the deltas from the version after the last available one up to the target version.
    +      // The last available version is the one with a full snapshot, so it doesn't need deltas.
    +      val resultMap = new MapType(lastAvailableMap.get)
    +      for (deltaVersion <- lastAvailableVersion + 1 to version) {
    +        updateFromDeltaFile(deltaVersion, resultMap)
           }
    -    }
    -    // Load all the deltas from the version after the last available one up to the target version.
    -    // The last available version is the one with a full snapshot, so it doesn't need deltas.
    -    val resultMap = new MapType(lastAvailableMap.get)
    -    for (deltaVersion <- lastAvailableVersion + 1 to version) {
    -      updateFromDeltaFile(deltaVersion, resultMap)
    +      synchronized { loadedMaps.put(version, resultMap) }
    +      resultMap
         }
    -    synchronized { loadedMaps.put(version, resultMap) }
    -    resultMap
    +    logWarning(s"Loading state for $version takes $elapsedMs ms.")
    --- End diff --

    Changed log level to DEBUG.
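The loading logic quoted in the diff above (walk back to the nearest snapshot, then replay deltas up to the target version) can be illustrated with a small in-memory sketch. This is only an illustration under stated assumptions, not the provider's implementation: `StateLoadSketch`, the `State` and `Delta` types, and the `snapshots` and `deltas` maps are hypothetical stand-ins for the snapshot and delta files.

```scala
object StateLoadSketch {
  // Hypothetical in-memory stand-ins for the state map and the delta files.
  type State = Map[String, Int]
  // In a delta, Some(v) upserts a key and None removes it (an assumption of this sketch).
  type Delta = Map[String, Option[Int]]

  def loadState(
      version: Long,
      snapshots: Map[Long, State],
      deltas: Map[Long, Delta]): State = {
    // Walk backwards iteratively (no recursion) until a snapshot is found.
    // Versions 0 or less are treated as an empty state.
    var base = version
    var baseState: Option[State] = snapshots.get(base)
    while (baseState.isEmpty) {
      base -= 1
      baseState = if (base <= 0) Some(Map.empty) else snapshots.get(base)
    }

    // Replay every delta after the base version, up to and including the target.
    val noChange: Delta = Map.empty
    (base + 1 to version).foldLeft(baseState.get) { (state, v) =>
      deltas.getOrElse(v, noChange).foldLeft(state) {
        case (s, (key, Some(value))) => s.updated(key, value)
        case (s, (key, None)) => s - key
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val snapshots: Map[Long, State] = Map(2L -> Map("a" -> 1, "b" -> 2))
    val deltas: Map[Long, Delta] = Map(
      3L -> Map("a" -> Some(10)),
      4L -> Map("b" -> None, "c" -> Some(3)))
    // Version 4 has no snapshot, so version 2 is the base and deltas 3 and 4 are replayed.
    println(loadState(4L, snapshots, deltas))
  }
}
```

The cost of such a load grows with the number of delta versions replayed, which is one reason the elapsed time of this path is worth logging.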
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194293481

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    +      val snapshotCurrentVersionMap = readSnapshotFile(version)
    +      if (snapshotCurrentVersionMap.isDefined) {
    +        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    +        return snapshotCurrentVersionMap.get
    +      }
    +
    +      // Find the most recent map before this version that we can.
    +      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    +      var lastAvailableVersion = version
    +      var lastAvailableMap: Option[MapType] = None
    +      while (lastAvailableMap.isEmpty) {
    +        lastAvailableVersion -= 1
    +
    +        if (lastAvailableVersion <= 0) {
    +          // Use an empty map for versions 0 or less.
    +          lastAvailableMap = Some(new MapType)
    +        } else {
    +          lastAvailableMap =
    +            synchronized { loadedMaps.get(lastAvailableVersion) }
    +              .orElse(readSnapshotFile(lastAvailableVersion))
    +        }
    +      }
    +
    +      // Load all the deltas from the version after the last available one up to the target version.
    +      // The last available version is the one with a full snapshot, so it doesn't need deltas.
    +      val resultMap = new MapType(lastAvailableMap.get)
    +      for (deltaVersion <- lastAvailableVersion + 1 to version) {
    +        updateFromDeltaFile(deltaVersion, resultMap)
           }
    -    }
    -    // Load all the deltas from the version after the last available one up to the target version.
    -    // The last available version is the one with a full snapshot, so it doesn't need deltas.
    -    val resultMap = new MapType(lastAvailableMap.get)
    -    for (deltaVersion <- lastAvailableVersion + 1 to version) {
    -      updateFromDeltaFile(deltaVersion, resultMap)
    +      synchronized { loadedMaps.put(version, resultMap) }
    +      resultMap
         }
    -    synchronized { loadedMaps.put(version, resultMap) }
    -    resultMap
    +    logWarning(s"Loading state for $version takes $elapsedMs ms.")
    --- End diff --

    I had just thought of pairing this with the warning message above, but since we are guiding end users to turn on the DEBUG level to see information about additional latencies, changing this to DEBUG would also be OK.
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194293251

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    --- End diff --

    Yup, right. Most of the code change is just wrapping the existing code in timeTakenMs.
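For readers unfamiliar with the wrapping pattern discussed above, a helper of this kind can be sketched as follows. This is a minimal stand-in written for illustration, not Spark's own `Utils.timeTakenMs`; the object name `TimingSketch` and the sample workload are assumptions.

```scala
import java.util.concurrent.TimeUnit

object TimingSketch {
  // Hypothetical stand-in for the helper named in the diff: evaluates `body`
  // exactly once and returns its result paired with the elapsed time in ms.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val startNs = System.nanoTime()
    val result = body
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
    (result, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative workload standing in for a filesystem read.
    val (sum, elapsedMs) = timeTakenMs { (1 to 1000).sum }
    println(s"Computed $sum in $elapsedMs ms.")
  }
}
```

Because the body is a by-name parameter, existing code can be moved inside the braces unchanged, which matches the observation that most of the diff is existing code relocated into the timed block.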
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194266029

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    --- End diff --

    Github has an... interesting idea of how to display this diff. The only change was the existing code moving inside timeTakenMs, and adding the logWarning statements, correct?
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194266060

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    +      val snapshotCurrentVersionMap = readSnapshotFile(version)
    +      if (snapshotCurrentVersionMap.isDefined) {
    +        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    +        return snapshotCurrentVersionMap.get
    +      }
    +
    +      // Find the most recent map before this version that we can.
    +      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    +      var lastAvailableVersion = version
    +      var lastAvailableMap: Option[MapType] = None
    +      while (lastAvailableMap.isEmpty) {
    +        lastAvailableVersion -= 1
    +
    +        if (lastAvailableVersion <= 0) {
    +          // Use an empty map for versions 0 or less.
    +          lastAvailableMap = Some(new MapType)
    +        } else {
    +          lastAvailableMap =
    +            synchronized { loadedMaps.get(lastAvailableVersion) }
    +              .orElse(readSnapshotFile(lastAvailableVersion))
    +        }
    +      }
    +
    +      // Load all the deltas from the version after the last available one up to the target version.
    +      // The last available version is the one with a full snapshot, so it doesn't need deltas.
    +      val resultMap = new MapType(lastAvailableMap.get)
    +      for (deltaVersion <- lastAvailableVersion + 1 to version) {
    +        updateFromDeltaFile(deltaVersion, resultMap)
           }
    -    }
    -    // Load all the deltas from the version after the last available one up to the target version.
    -    // The last available version is the one with a full snapshot, so it doesn't need deltas.
    -    val resultMap = new MapType(lastAvailableMap.get)
    -    for (deltaVersion <- lastAvailableVersion + 1 to version) {
    -      updateFromDeltaFile(deltaVersion, resultMap)
    +      synchronized { loadedMaps.put(version, resultMap) }
    +      resultMap
         }
    -    synchronized { loadedMaps.put(version, resultMap) }
    -    resultMap
    +    logWarning(s"Loading state for $version takes $elapsedMs ms.")
    --- End diff --

    I'm not sure this should be a warning.
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
GitHub user HeartSaVioR opened a pull request:

    https://github.com/apache/spark/pull/21506

    [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider

    ## What changes were proposed in this pull request?

    This patch measures and logs the elapsed time of each operation that communicates with the file system (mostly remote HDFS in production) in HDFSBackedStateStoreProvider, to help investigate latency issues.

    ## How was this patch tested?

    Manually tested.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/spark SPARK-24485

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21506.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21506

commit d84f98fc978262f4165f78b3b223b8bb3151f735
Author: Jungtaek Lim
Date:   2018-06-07T14:14:46Z

    [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider