Gerard Maas created SPARK-21710:
-----------------------------------

             Summary: ConsoleSink causes OOM crashes with large inputs.
                 Key: SPARK-21710
                 URL: https://issues.apache.org/jira/browse/SPARK-21710
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.0
         Environment: affects all environments
            Reporter: Gerard Maas


ConsoleSink does a full collect of the streaming dataset in order to show a few 
lines on screen. This is problematic with large inputs, such as a Kafka backlog 
or a file source with files larger than the driver's memory.

Here's an example:

{code:scala}
import spark.implicits._
import org.apache.spark.sql.types._

// The text source expects a schema with a single string column.
val schema = StructType(StructField("text", StringType, true) :: Nil)

val lines = spark
  .readStream
  .format("text")
  .option("path", "/tmp/data")
  .schema(schema)
  .load()

val base = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()
{code}

When a file larger than the available driver memory is fed through this 
streaming job, we get:

{code:java}
-------------------------------------------
Batch: 0
-------------------------------------------

[Stage 0:>                                                        (0 + 8) / 111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 6)
java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:3236)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
  at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
  at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
  at java.io.DataOutputStream.write(DataOutputStream.java:107)
  at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:237)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
17/08/11 15:10:45 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 6,5,main]
java.lang.OutOfMemoryError: Java heap space
{code}

This issue can be traced back to a `collect` on the source `DataFrame`:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L52

A fairly simple solution would be to call `take(numRows)` instead of 
`collect`, so that only the rows to be displayed are brought to the driver. 
(PR in progress)
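
As a rough illustration of the difference, here is a minimal sketch. It is not 
the actual ConsoleSink code: the names `ConsolePreviewSketch`, 
`previewWithCollect` and `previewWithTake` are made up for this example, and a 
small local batch DataFrame stands in for a streaming batch.

{code:scala}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Hypothetical helpers for illustration only; not part of Spark.
object ConsolePreviewSketch {

  // Materialises every row of the batch on the driver, then keeps the
  // first numRows. Driver memory use grows with the size of the batch.
  def previewWithCollect(data: DataFrame, numRows: Int): Array[Row] =
    data.collect().take(numRows)

  // Asks Spark for at most numRows rows, so the amount of data returned
  // to the driver is bounded by numRows rather than by the batch size.
  def previewWithTake(data: DataFrame, numRows: Int): Array[Row] =
    data.take(numRows)

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough to show the API difference.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("console-preview-sketch")
      .getOrCreate()
    import spark.implicits._

    // Small stand-in for a large input batch.
    val df = (1 to 1000).map(i => s"line $i").toDF("text")

    previewWithTake(df, 20).foreach(println)

    spark.stop()
  }
}
{code}

Both helpers return the same number of rows for display; the difference is 
only in how much data is pulled out of the batch to produce them.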


