[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...
Github user brkyvz closed the pull request at: https://github.com/apache/spark/pull/20673

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r173156595

--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
```diff
@@ -100,7 +102,24 @@ private[spark] object JsonProtocol {
         executorMetricsUpdateToJson(metricsUpdate)
       case blockUpdate: SparkListenerBlockUpdated =>
         blockUpdateToJson(blockUpdate)
-      case _ => parse(mapper.writeValueAsString(event))
+      case _ =>
+        // Use piped streams to avoid extra memory consumption
+        val outputStream = new PipedOutputStream()
+        val inputStream = new PipedInputStream(outputStream)
+        try {
+          val thread = new Thread("SparkListenerEvent json writer") {
```
--- End diff --

Hey @brkyvz, this approach with a thread seems a little bit overkill ..
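The diff above is truncated right at the dedicated writer thread. A minimal, self-contained sketch of the same pattern in plain `java.io` (the class name, payload size, and thread name here are illustrative, not taken from the PR) shows why the producer has to run on its own thread: a pipe's internal buffer is small, so a single-threaded write of a large event would fill it and block before the reader ever gets a chance to drain it.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipedWriterThreadDemo {
    public static void main(String[] args) throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        // Payload larger than the pipe's default 1024-byte buffer: writing
        // this from the reading thread would block forever, which is why
        // the PR moves the producer onto its own thread.
        byte[] payload = new byte[8192];
        for (int i = 0; i < payload.length; i++) payload[i] = (byte) ('a' + i % 26);

        Thread writer = new Thread("json writer") {
            @Override
            public void run() {
                try {
                    out.write(payload);
                    out.close();  // closing signals end-of-stream to the reader
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        writer.start();

        // Meanwhile the consumer drains the pipe incrementally.
        byte[] buf = new byte[1024];
        int total = 0;
        int n;
        while ((n = in.read(buf)) != -1) total += n;
        writer.join();
        System.out.println("read " + total + " bytes");
    }
}
```

Whether a dedicated thread per serialized event is worth it is exactly the trade-off being debated in this comment.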
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r170646787

--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
```diff
@@ -100,7 +102,18 @@ private[spark] object JsonProtocol {
         executorMetricsUpdateToJson(metricsUpdate)
       case blockUpdate: SparkListenerBlockUpdated =>
         blockUpdateToJson(blockUpdate)
-      case _ => parse(mapper.writeValueAsString(event))
+      case _ =>
+        // Use piped streams to avoid extra memory consumption
+        val outputStream = new PipedOutputStream()
+        val inputStream = new PipedInputStream(outputStream)
+        try {
+          mapper.writeValue(outputStream, event)
```
--- End diff --

I was actually hoping for a test to fail, but none did (the test suite only checks very specific events). This code will likely block forever if the serialized event is larger than the pipe buffer. Going to add a test
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r170537137

--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
```diff
@@ -100,7 +102,18 @@ private[spark] object JsonProtocol {
         executorMetricsUpdateToJson(metricsUpdate)
       case blockUpdate: SparkListenerBlockUpdated =>
         blockUpdateToJson(blockUpdate)
-      case _ => parse(mapper.writeValueAsString(event))
+      case _ =>
+        // Use piped streams to avoid extra memory consumption
+        val outputStream = new PipedOutputStream()
+        val inputStream = new PipedInputStream(outputStream)
+        try {
+          mapper.writeValue(outputStream, event)
```
--- End diff --

Wait wait .. does this lazily work for sure? Can we add a test (or a manual test in the PR description) that reads some more data (maybe more than the buffer size in that pipe)?
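For context on the buffer-size concern: `PipedInputStream`'s internal buffer defaults to 1024 bytes, and a second constructor argument can enlarge it. A small sketch (the 2048-byte size is chosen arbitrarily for illustration):

```java
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeBufferDemo {
    public static void main(String[] args) throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        // The second argument sets the pipe's internal buffer size; the
        // single-argument constructor uses a 1024-byte default.
        PipedInputStream in = new PipedInputStream(out, 2048);

        // Filling the buffer exactly from one thread succeeds...
        out.write(new byte[2048]);
        // ...but one more byte would block until a reader drains the pipe,
        // e.g. out.write(0) here would never return in a single thread.

        System.out.println("available = " + in.available());
    }
}
```

This is why a test that serializes an event larger than the pipe buffer, as requested above, would hang rather than fail with the single-threaded `writeValue` call.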
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r170477999

--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
```diff
@@ -100,7 +102,16 @@ private[spark] object JsonProtocol {
         executorMetricsUpdateToJson(metricsUpdate)
       case blockUpdate: SparkListenerBlockUpdated =>
         blockUpdateToJson(blockUpdate)
-      case _ => parse(mapper.writeValueAsString(event))
+      case _ =>
+        val outputStream = new PipedOutputStream()
+        val inputStream = new PipedInputStream(outputStream)
+        try {
+          mapper.writeValue(outputStream, event)
+          parse(inputStream)
+        } finally {
+          IOUtils.closeQuietly(outputStream)
```
--- End diff --

and .. another note for `IOUtils.closeQuietly`, saying that it's intentionally used because the close might already have been attempted by Jackson's library, if I understood correctly?
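For readers unfamiliar with it: Commons IO's `IOUtils.closeQuietly` swallows any exception thrown by `close()`, which is the desired behavior when Jackson may already have closed the stream. A stand-in with the same semantics (the helper below is a hypothetical re-implementation for illustration, not the Commons IO source):

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseQuietlyDemo {
    // Minimal stand-in for IOUtils.closeQuietly: failures on close()
    // (including a double-close) are deliberately swallowed.
    static void closeQuietly(Closeable c) {
        if (c == null) return;
        try {
            c.close();
        } catch (IOException ignored) {
            // intentionally ignored: covers the "Jackson already closed it" case
        }
    }

    public static void main(String[] args) {
        // A Closeable whose close() always fails, simulating a double-close.
        Closeable alreadyClosed = () -> { throw new IOException("already closed"); };
        closeQuietly(alreadyClosed);  // exception does not propagate
        closeQuietly(null);           // null-safe as well
        System.out.println("no exception escaped");
    }
}
```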
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r170478050

--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
```diff
@@ -17,13 +17,15 @@

 package org.apache.spark.util

+import java.io.{ByteArrayOutputStream, PipedInputStream, PipedOutputStream}
```
--- End diff --

`ByteArrayOutputStream` seems not to be used here.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r170477844

--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
```diff
@@ -100,7 +102,16 @@ private[spark] object JsonProtocol {
         executorMetricsUpdateToJson(metricsUpdate)
       case blockUpdate: SparkListenerBlockUpdated =>
         blockUpdateToJson(blockUpdate)
-      case _ => parse(mapper.writeValueAsString(event))
+      case _ =>
+        val outputStream = new PipedOutputStream()
```
--- End diff --

Hi @brkyvz, how about adding a note that `PipedOutputStream` is intentionally used to avoid additional memory consumption, if I get this correctly?
GitHub user brkyvz opened a pull request: https://github.com/apache/spark/pull/20673

[SPARK-23515] Use input/output streams for large events in JsonProtocol.sparkEventToJson

## What changes were proposed in this pull request?

`def sparkEventToJson(event: SparkListenerEvent)` has a fallback path that converts an unrecognized event to JSON by serializing it to a string and then parsing that string again. This materializes the entire serialized event in memory just to parse the JSON record, which is unnecessary and can cause OOMs, as seen in the stack trace below:

```
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.String.<init>(String.java:207)
    at java.lang.StringBuilder.toString(StringBuilder.java:407)
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:356)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:235)
    at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:20)
    at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:42)
    at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:35)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
    at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:20)
    at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:50)
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:103)
```

We should use stream parsing instead to avoid such OOMs.

## How was this patch tested?
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/brkyvz/spark eventLoggingJson

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20673.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20673

commit 774188003c5b1c1a000d69f5996dce580c7a1432
Author: Burak Yavuz
Date: 2018-02-25T20:07:22Z

    use streams for large events
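The OOM trace in the description comes from materializing the entire serialized event as one `String` before re-parsing it. A toy illustration of the two call shapes (the serializer below is hypothetical, standing in for Jackson's `writeValueAsString` vs. `writeValue(OutputStream, ...)`): the string path must hold the whole document in a single buffer, while the streaming path only ever has small chunks in flight.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class StreamVsStringDemo {
    // Hypothetical serializer, string-returning flavor: the whole JSON
    // document is accumulated in one in-memory buffer before returning.
    static String writeValueAsString(int fields) {
        StringBuilder sb = new StringBuilder();
        sb.append('{');
        for (int i = 0; i < fields; i++) {
            if (i > 0) sb.append(',');
            sb.append("\"k").append(i).append("\":").append(i);
        }
        return sb.append('}').toString();
    }

    // Same output, streaming flavor: each small chunk is pushed into the
    // OutputStream and can be consumed (or written to disk) immediately.
    static void writeValue(OutputStream out, int fields) throws IOException {
        out.write('{');
        for (int i = 0; i < fields; i++) {
            if (i > 0) out.write(',');
            out.write(("\"k" + i + "\":" + i).getBytes());
        }
        out.write('}');
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        writeValue(sink, 3);
        // Both flavors produce identical JSON; only peak memory differs.
        System.out.println(writeValueAsString(3).equals(sink.toString()));
    }
}
```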