[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-31 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19611


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147824694
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
--- End diff --

We should put the map read from the snapshot file into `loadedMaps`.
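The suggestion, in sketch form: once a map has been deserialized from a snapshot file, store it back into the in-memory cache so the next lookup for that version is a map hit instead of a second file read. Everything below is a hypothetical stand-in (toy `MapType`, stubbed `readSnapshotFile`, a read counter added for illustration), not the real provider:

```scala
import scala.collection.mutable

object SnapshotCachingSketch {
  type MapType = mutable.HashMap[String, String]

  private val loadedMaps = mutable.HashMap[Long, MapType]()
  private var snapshotReads = 0 // counts simulated file reads

  // Stub I/O: always "finds" a snapshot, and counts how often it is called.
  private def readSnapshotFile(version: Long): Option[MapType] = {
    snapshotReads += 1
    Some(mutable.HashMap("version" -> version.toString))
  }

  /** Check the in-memory cache first; on a snapshot hit, cache the result. */
  def lookup(version: Long): Option[MapType] =
    synchronized { loadedMaps.get(version) }.orElse {
      val fromFile = readSnapshotFile(version)
      fromFile.foreach(m => synchronized { loadedMaps.put(version, m) })
      fromFile
    }

  def reads: Int = snapshotReads
}
```

Without the `put` inside `orElse`, every `lookup` for an uncached version would deserialize the snapshot file again.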


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147784280
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) {
+      return currentVersionMap.get
+    }
+
--- End diff --

nit: extra empty line


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147824364
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) {
+      return currentVersionMap.get
+    }
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion -= 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    val resultMap = lastAvailableMap.get
--- End diff --

We should create a new map here to not change the previous map in case the 
maintenance task is using it.
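The hazard being flagged: `lastAvailableMap.get` may be the very object stored in `loadedMaps`, which the maintenance (snapshotting) task can be reading concurrently, so replaying deltas into it in place would mutate shared state. A minimal sketch of the copy-before-mutate fix, using a toy map type and a hypothetical `applyDeltas` helper rather than the provider's real code:

```scala
import scala.collection.mutable

object CopyBeforeMutateSketch {
  type MapType = mutable.HashMap[String, String]

  /** Replay deltas into a fresh copy so the shared base map is never mutated. */
  def applyDeltas(base: MapType, deltas: Seq[(String, String)]): MapType = {
    // Defensive copy: `base` may be a cached map another thread is reading.
    val result = new MapType
    result ++= base
    deltas.foreach { case (k, v) => result.put(k, v) }
    result
  }
}
```

After the call, the base map is unchanged and only the copy carries the replayed deltas.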


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147826631
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) {
+      return currentVersionMap.get
+    }
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion -= 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    val resultMap = lastAvailableMap.get
+    for (deltaVersion <- lastAvailableVersion + 1 to version) {
+      updateFromDeltaFile(deltaVersion, resultMap)
+    }
+
+    loadedMaps.put(version, resultMap)
--- End diff --

`loadedMaps.put(version, resultMap)` -> `synchronized { loadedMaps.put(version, resultMap) }`

This is a different issue but since you are touching this, it's better to fix it as well.


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147761708
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,39 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) return currentVersionMap.get
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion = lastAvailableVersion - 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
      }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    var resultMap = lastAvailableMap.get
--- End diff --

val?


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147761361
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,39 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) return currentVersionMap.get
--- End diff --

I'd unroll this onto two lines with braces for consistency


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147761102
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,39 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) return currentVersionMap.get
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion = lastAvailableVersion - 1
--- End diff --

Nit: -= 1?


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147761742
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,39 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) return currentVersionMap.get
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion = lastAvailableVersion - 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
      }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    var resultMap = lastAvailableMap.get
+    for (deltaVersion <- lastAvailableVersion + 1 to version) {
+      updateFromDeltaFile(deltaVersion, resultMap)
+    }
+
+    loadedMaps.put(version, resultMap)
+    return resultMap
--- End diff --

Don't need return


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19611

[SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively

## What changes were proposed in this pull request?
Write HDFSBackedStateStoreProvider.loadMap non-recursively. This prevents stack overflow if too many deltas stack up in a low-memory environment.

## How was this patch tested?

Existing unit tests verify functional equivalence; a new unit test checks for stack overflow.
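For readers skimming the thread, the shape of the rewrite can be sketched in miniature. Everything below is a simplified stand-in, not the real provider: `MapType`, `loadedMaps`, `readSnapshotFile`, and `updateFromDeltaFile` are toy versions that only mimic the control flow of the patch (including the review suggestions: copy before mutating, and a synchronized `put`).

```scala
import scala.collection.mutable

object LoadMapSketch {
  type MapType = mutable.HashMap[String, String]

  private val loadedMaps = mutable.HashMap[Long, MapType]()

  // Toy I/O: pretend no snapshot files exist, so every load replays deltas.
  private def readSnapshotFile(version: Long): Option[MapType] = None
  private def updateFromDeltaFile(version: Long, map: MapType): Unit =
    map.put(s"key$version", s"value$version")

  /** Iterative load: walk back to the newest available map, then replay deltas forward. */
  def loadMap(version: Long): MapType = {
    // Shortcut if this exact version is already cached or snapshotted.
    val currentVersionMap =
      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
    if (currentVersionMap.isDefined) {
      return currentVersionMap.get
    }

    // Walk backwards with a loop, not recursion, so stack depth stays constant.
    var lastAvailableVersion = version
    var lastAvailableMap: Option[MapType] = None
    while (lastAvailableMap.isEmpty) {
      lastAvailableVersion -= 1
      if (lastAvailableVersion <= 0) {
        lastAvailableMap = Some(new MapType) // empty map for version 0 or less
      } else {
        lastAvailableMap = synchronized { loadedMaps.get(lastAvailableVersion) }
          .orElse(readSnapshotFile(lastAvailableVersion))
      }
    }

    // Copy before mutating, then replay every delta up to the target version.
    val resultMap = new MapType
    resultMap ++= lastAvailableMap.get
    for (deltaVersion <- lastAvailableVersion + 1 to version) {
      updateFromDeltaFile(deltaVersion, resultMap)
    }
    synchronized { loadedMaps.put(version, resultMap) }
    resultMap
  }
}
```

The point of the change: the old recursive `loadMap(version - 1)` consumed one stack frame per missing delta, so a long run of deltas between snapshots could overflow the stack; the loop bounds stack usage regardless of how many versions must be walked back.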

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22305

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19611


commit 6366347516853abd7afd7e89452e656b1011cf6e
Author: Jose Torres 
Date:   2017-10-30T15:48:13Z

rewrite loadMap iteratively

commit 33ea2fb59f5ad47ed4713ca73945a9630486677c
Author: Jose Torres 
Date:   2017-10-30T16:28:28Z

add test exercising stack overflow




---
