[ https://issues.apache.org/jira/browse/SPARK-21710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gerard Maas updated SPARK-21710:
Description:
ConsoleSink does a full collect of the streaming dataset in order to show a few
lines on screen. This is problematic with large inputs, such as a Kafka backlog
or a file source with files larger than the driver's memory.
Here's an example:
{code:scala}
import spark.implicits._
import org.apache.spark.sql.types._

// Single-column schema for the text source.
val schema = StructType(StructField("text", StringType, nullable = true) :: Nil)

val lines = spark
  .readStream
  .format("text")
  .option("path", "/tmp/data")
  .schema(schema)
  .load()

// The console sink is where the full collect happens.
val base = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()
{code}
When a file larger than the available driver memory is fed through this
streaming job, we get:
{code:java}
---
Batch: 0
---
[Stage 0:> (0 + 8) / 111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 6)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:237)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
17/08/11 15:10:45 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 6,5,main]
java.lang.OutOfMemoryError: Java heap space
{code}
This issue can be traced back to a {{collect}} on the source {{DataFrame}}:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L52
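The call in question looks roughly like this (a paraphrase of the linked file, not the exact code; {{numRowsToShow}} and {{isTruncated}} are fields of {{ConsoleSink}}):
{code:scala}
// Paraphrase of ConsoleSink.addBatch: the entire micro-batch is collected
// to the driver, then re-parallelized just so show() can print a few rows.
// Every row of the batch must therefore fit in driver memory at once.
data.sparkSession.createDataFrame(
    data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
  .show(numRowsToShow, isTruncated)
{code}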
A fairly simple solution would be to do a {{take(numRows)}} instead of the
collect (PR in progress).
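To see the difference in a spark-shell session (a hypothetical standalone illustration, not the sink code itself):
{code:scala}
// take(n) ships at most n rows to the driver, so memory use stays bounded;
// collect() ships every row and reproduces the OOM above on large inputs.
val big = spark.range(0, 500000000L).toDF("id")

big.take(20).foreach(println)  // safe: at most 20 rows reach the driver

// big.collect()               // unsafe: all 500M rows land on the driver heap
{code}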