[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19611

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
  For additional commands, e-mail: reviews-h...@spark.apache.org
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19611#discussion_r147824694

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
--- End diff --

We should put the map read from the snapshot file into `loadedMaps`.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19611#discussion_r147784280

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) {
+      return currentVersionMap.get
+    }
+
--- End diff --

nit: extra empty line
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19611#discussion_r147824364

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) {
+      return currentVersionMap.get
+    }
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion -= 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    val resultMap = lastAvailableMap.get
--- End diff --

We should create a new map here so we don't change the previous map, in case the maintenance task is using it.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19611#discussion_r147826631

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) {
+      return currentVersionMap.get
+    }
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion -= 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    val resultMap = lastAvailableMap.get
+    for (deltaVersion <- lastAvailableVersion + 1 to version) {
+      updateFromDeltaFile(deltaVersion, resultMap)
+    }
+
+    loadedMaps.put(version, resultMap)
--- End diff --

`loadedMaps.put(version, resultMap)` -> `synchronized { loadedMaps.put(version, resultMap) }`

This is a different issue, but since you are touching this code it's better to fix it as well.
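The logic under review, checking the cache and snapshots for the requested version, otherwise walking backwards to the nearest available map and replaying delta files forward, can be sketched as a small in-memory model. This is a hypothetical Python model, not Spark code: `snapshots`, `deltas`, and `loaded_maps` stand in for snapshot files, delta files, and the provider's cache, and it folds in the reviewers' suggestions (copy before replaying deltas; cache the result under a lock).

```python
import threading

class StateStoreModel:
    """Illustrative model of the iterative loadMap shape (not a Spark API)."""

    def __init__(self, snapshots, deltas):
        self.snapshots = snapshots   # version -> full dict ("snapshot files")
        self.deltas = deltas         # version -> dict of key updates ("delta files")
        self.loaded_maps = {}        # in-memory cache, guarded by a lock
        self._lock = threading.Lock()

    def load_map(self, version):
        # Shortcut: the requested version is already cached or snapshotted.
        with self._lock:
            current = self.loaded_maps.get(version)
        if current is None:
            current = self.snapshots.get(version)
        if current is not None:
            return current

        # Walk backwards iteratively (a loop, not recursion, so no stack
        # overflow) until a cached map, a snapshot, or version 0 is found.
        last_version = version
        last_map = None
        while last_map is None:
            last_version -= 1
            if last_version <= 0:
                last_map = {}  # empty map for versions 0 or less
            else:
                with self._lock:
                    last_map = self.loaded_maps.get(last_version)
                if last_map is None:
                    last_map = self.snapshots.get(last_version)

        # Copy before replaying deltas, so the map we found is not mutated
        # under a concurrent reader (the "maintenance task" concern above).
        result = dict(last_map)
        for delta_version in range(last_version + 1, version + 1):
            result.update(self.deltas[delta_version])

        # Cache the result under the lock, as suggested in the review.
        with self._lock:
            self.loaded_maps[version] = result
        return result
```

For example, with a snapshot at version 2 and deltas for versions 3 and 4, `load_map(4)` finds the version-2 snapshot, replays deltas 3 and 4 on a copy, and caches the result under version 4.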
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19611#discussion_r147761708

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,39 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) return currentVersionMap.get
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion = lastAvailableVersion - 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    var resultMap = lastAvailableMap.get
--- End diff --

val?
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19611#discussion_r147761361

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,39 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) return currentVersionMap.get
--- End diff --

I'd unroll this onto two lines with braces, for consistency.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19611#discussion_r147761102

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,39 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) return currentVersionMap.get
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion = lastAvailableVersion - 1
--- End diff --

Nit: `-= 1`?
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19611#discussion_r147761742

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,39 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) return currentVersionMap.get
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion = lastAvailableVersion - 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    var resultMap = lastAvailableMap.get
+    for (deltaVersion <- lastAvailableVersion + 1 to version) {
+      updateFromDeltaFile(deltaVersion, resultMap)
+    }
+
+    loadedMaps.put(version, resultMap)
+    return resultMap
--- End diff --

Don't need `return`.
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19611

[SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively

## What changes were proposed in this pull request?

Write HDFSBackedStateStoreProvider.loadMap non-recursively. This prevents a stack overflow if too many deltas stack up in a low-memory environment.

## How was this patch tested?

Existing unit tests for functional equivalence, plus a new unit test to check for stack overflow.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-22305

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19611.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19611

commit 6366347516853abd7afd7e89452e656b1011cf6e
Author: Jose Torres
Date:   2017-10-30T15:48:13Z

    rewrite loadMap iteratively

commit 33ea2fb59f5ad47ed4713ca73945a9630486677c
Author: Jose Torres
Date:   2017-10-30T16:28:28Z

    add test exercising stack overflow
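The motivation above, one stack frame per delta leading to overflow, is easy to reproduce in miniature. The following is a hedged Python sketch, not the Spark code: `load_recursive` mirrors the old one-frame-per-version shape and `load_iterative` mirrors the fix; on a long run of deltas the recursive form exhausts the interpreter's stack (Python's recursion limit playing the role of the JVM thread stack), while the loop runs at constant stack depth.

```python
def load_recursive(version, deltas):
    # Old shape: reconstruct version - 1 recursively, then apply one delta.
    # Depth grows with the number of versions since the last snapshot.
    if version <= 0:
        return {}
    state = dict(load_recursive(version - 1, deltas))
    state.update(deltas.get(version, {}))
    return state

def load_iterative(version, deltas):
    # New shape: a single loop replaying deltas forward; constant stack depth.
    state = {}
    for v in range(1, version + 1):
        state.update(deltas.get(v, {}))
    return state

# A long chain of deltas with no snapshot in between.
deltas = {v: {"k%d" % v: v} for v in range(1, 50_000)}

try:
    load_recursive(49_999, deltas)
    recursive_ok = True
except RecursionError:
    recursive_ok = False

iterative_result = load_iterative(49_999, deltas)
print(recursive_ok, len(iterative_result))  # prints: False 49999
```

The recursive version fails long before reaching version 0, while the iterative version reconstructs all 49,999 keys, which is the functional-equivalence-plus-no-overflow property the new unit test checks.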