[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-09-10 Thread brkyvz
Github user brkyvz closed the pull request at:

https://github.com/apache/spark/pull/20673


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-03-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20673#discussion_r173156595
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -100,7 +102,24 @@ private[spark] object JsonProtocol {
 executorMetricsUpdateToJson(metricsUpdate)
   case blockUpdate: SparkListenerBlockUpdated =>
 blockUpdateToJson(blockUpdate)
-  case _ => parse(mapper.writeValueAsString(event))
+  case _ =>
+// Use piped streams to avoid extra memory consumption
+val outputStream = new PipedOutputStream()
+val inputStream = new PipedInputStream(outputStream)
+try {
+  val thread = new Thread("SparkListenerEvent json writer") {
--- End diff --

Hey @brkyvz, this approach with a thread seems a little bit overkill ..


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-02-26 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/20673#discussion_r170646787
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -100,7 +102,18 @@ private[spark] object JsonProtocol {
 executorMetricsUpdateToJson(metricsUpdate)
   case blockUpdate: SparkListenerBlockUpdated =>
 blockUpdateToJson(blockUpdate)
-  case _ => parse(mapper.writeValueAsString(event))
+  case _ =>
+// Use piped streams to avoid extra memory consumption
+val outputStream = new PipedOutputStream()
+val inputStream = new PipedInputStream(outputStream)
+try {
+  mapper.writeValue(outputStream, event)
--- End diff --

I was actually hoping for a test to fail, but none did (the test suite has 
a bunch of very specific stuff). This code will likely block forever if the 
block size is larger. Going to add a test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-02-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20673#discussion_r170537137
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -100,7 +102,18 @@ private[spark] object JsonProtocol {
 executorMetricsUpdateToJson(metricsUpdate)
   case blockUpdate: SparkListenerBlockUpdated =>
 blockUpdateToJson(blockUpdate)
-  case _ => parse(mapper.writeValueAsString(event))
+  case _ =>
+// Use piped streams to avoid extra memory consumption
+val outputStream = new PipedOutputStream()
+val inputStream = new PipedInputStream(outputStream)
+try {
+  mapper.writeValue(outputStream, event)
--- End diff --

Wait wait .. does this lazily work for sure? Can we add a test (or manual 
test in the PR description) that reads some more data (maybe more then the 
buffer size in that pipe)?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-02-25 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20673#discussion_r170477999
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -100,7 +102,16 @@ private[spark] object JsonProtocol {
 executorMetricsUpdateToJson(metricsUpdate)
   case blockUpdate: SparkListenerBlockUpdated =>
 blockUpdateToJson(blockUpdate)
-  case _ => parse(mapper.writeValueAsString(event))
+  case _ =>
+val outputStream = new PipedOutputStream()
+val inputStream = new PipedInputStream(outputStream)
+try {
+  mapper.writeValue(outputStream, event)
+  parse(inputStream)
+} finally {
+  IOUtils.closeQuietly(outputStream)
--- End diff --

and .. another note for  `IOUtils.closeQuietly` saying that it's 
intentionally used in case that the close might be already attempted by 
Jackson's library if I understood correctly?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-02-25 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20673#discussion_r170478050
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -17,13 +17,15 @@
 
 package org.apache.spark.util
 
+import java.io.{ByteArrayOutputStream, PipedInputStream, PipedOutputStream}
--- End diff --

`ByteArrayOutputStream` seems not used here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-02-25 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20673#discussion_r170477844
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -100,7 +102,16 @@ private[spark] object JsonProtocol {
 executorMetricsUpdateToJson(metricsUpdate)
   case blockUpdate: SparkListenerBlockUpdated =>
 blockUpdateToJson(blockUpdate)
-  case _ => parse(mapper.writeValueAsString(event))
+  case _ =>
+val outputStream = new PipedOutputStream()
--- End diff --

Hi @brkyvz, how about adding a note that `PipedOutputStream` is 
intentionally used to get rid of additional consumption of memory if I get this 
correctly?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...

2018-02-25 Thread brkyvz
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/20673

[SPARK-23515] Use input/output streams for large events in 
JsonProtocol.sparkEventToJson

## What changes were proposed in this pull request?

`def sparkEventToJson(event: SparkListenerEvent)`

has a fallback method which creates a JSON object by turning an 
unrecognized event to Json and then parsing it again. This method materializes 
the whole string to parse the json record, which is unnecessary, and can cause 
OOMs as seen in the stack trace below:

```
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.(String.java:207)
at java.lang.StringBuilder.toString(StringBuilder.java:407)
at 
com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:356)
at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:235)
at 
org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:20)
at 
org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:42)
at 
org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:35)
at 
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:20)
at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:50)
at 
org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:103)
```

We should just use the stream parsing to avoid such OOMs.

## How was this patch tested?




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark eventLoggingJson

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20673.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20673


commit 774188003c5b1c1a000d69f5996dce580c7a1432
Author: Burak Yavuz 
Date:   2018-02-25T20:07:22Z

use streams for large events




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org