[jira] [Comment Edited] (SPARK-17403) Fatal Error: Scan cached strings

2017-04-26 Thread Paul Lysak (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982595#comment-15982595 ]

Paul Lysak edited comment on SPARK-17403 at 4/26/17 3:23 PM:
-

Looks like we have the same issue with Spark 2.1 on YARN (Amazon EMR release emr-5.4.0).
A workaround that solves the issue for us (at the cost of some performance) is to use df.persist(StorageLevel.DISK_ONLY) instead of df.cache().
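
A minimal sketch of that workaround (the SparkSession named spark and the input path below are placeholders, not from our actual job):

{code}
import org.apache.spark.storage.StorageLevel

// placeholder input - any DataFrame that would otherwise be cached
val df = spark.read.parquet("/path/to/input")

// For DataFrames, df.cache() is equivalent to persist(MEMORY_AND_DISK);
// persisting with DISK_ONLY avoided the failure for us, at the cost of
// reading the persisted data back from disk.
df.persist(StorageLevel.DISK_ONLY)
df.count() // materialize the persisted blocks
{code}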

Depending on the node types, memory settings, storage level, and other factors I couldn't clearly identify, it may appear as

{noformat}
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 158 in stage 504.0 failed 4 times, most recent failure: Lost task 158.3 in stage 504.0 (TID 427365, ip-10-35-162-171.ec2.internal, executor 83): java.lang.NegativeArraySizeException
at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
at org.apache.spark.unsafe.types.UTF8String.clone(UTF8String.java:826)
at org.apache.spark.sql.execution.columnar.StringColumnStats.gatherStats(ColumnStats.scala:217)
at org.apache.spark.sql.execution.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:55)
at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.org$apache$spark$sql$execution$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:97)
at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78)
at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:97)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:122)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:97)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
{noformat}

or as 

{noformat}
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 27 in stage 61.0 failed 4 times, most recent failure: Lost task 27.3 in stage 61.0 (TID 36167, ip-10-35-162-149.ec2.internal, executor 1): java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_38$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:107)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:97)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
{noformat}

or as

{noformat}
2017-04-24 19:02:45,951 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_37$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:217)
{noformat}

[jira] [Commented] (SPARK-17403) Fatal Error: Scan cached strings

2017-04-26 Thread Paul Lysak (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15984987#comment-15984987 ]

Paul Lysak commented on SPARK-17403:


Hope that helps - I finally managed to reproduce it without using production data:

{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// assumes an existing SparkSession named "spark" (e.g. in spark-shell)
val leftRows = spark.sparkContext.parallelize(numSlices = 3000, seq = for (k <- 1 to 1000; j <- 1 to 1000) yield Row(k, j))
  .flatMap(r => (1 to 1000).map(l => Row.fromSeq(r.toSeq :+ l)))

val leftDf = spark.createDataFrame(leftRows, StructType(Seq(
  StructField("k", IntegerType),
  StructField("j", IntegerType),
  StructField("l", IntegerType)
)))
  .withColumn("combinedKey", expr("k*100 + j*1000 + l"))
  .withColumn("fixedCol", lit("sampleVal"))
  .withColumn("combKeyStr", format_number(col("combinedKey"), 0))
  .withColumn("k100", expr("k*100"))
  .withColumn("j100", expr("j*100"))
  .withColumn("l100", expr("l*100"))
  .withColumn("k_200", expr("k+200"))
  .withColumn("j_200", expr("j+200"))
  .withColumn("l_200", expr("l+200"))
  .withColumn("strCol1_1", concat(lit("value of sample column number one with which column k will be concatenated:" * 5), format_number(col("k"), 0)))
  .withColumn("strCol1_2", concat(lit("value of sample column two one with which column j will be concatenated:" * 5), format_number(col("j"), 0)))
  .withColumn("strCol1_3", concat(lit("value of sample column three one with which column r will be concatenated:" * 5), format_number(col("l"), 0)))
  .withColumn("strCol2_1", concat(lit("value of sample column number one with which column k will be concatenated:" * 5), format_number(col("k"), 0)))
  .withColumn("strCol2_2", concat(lit("value of sample column two one with which column j will be concatenated:" * 5), format_number(col("j"), 0)))
  .withColumn("strCol2_3", concat(lit("value of sample column three one with which column r will be concatenated:" * 5), format_number(col("l"), 0)))
  .withColumn("strCol3_1", concat(lit("value of sample column number one with which column k will be concatenated:" * 5), format_number(col("k"), 0)))
  .withColumn("strCol3_2", concat(lit("value of sample column two one with which column j will be concatenated:" * 5), format_number(col("j"), 0)))
  .withColumn("strCol3_3", concat(lit("value of sample column three one with which column r will be concatenated:" * 5), format_number(col("l"), 0)))
  // if these additional columns are commented out, the error disappears

leftDf.cache()
println("= leftDf count:" + leftDf.count())
leftDf.show(10)

val rightRows = spark.sparkContext.parallelize((1 to 800).map(i => Row(i, "k_" + i, "sampleVal")))
val rightDf = spark.createDataFrame(rightRows, StructType(Seq(
  StructField("k", IntegerType),
  StructField("kStr", StringType),
  StructField("sampleCol", StringType)
)))

rightDf.cache()
println("= rightDf count:" + rightDf.count())
rightDf.show(10)

val joinedDf = leftDf.join(broadcast(rightDf), usingColumns = Seq("k"), joinType = "left")

joinedDf.cache()
println("= joinedDf count:" + joinedDf.count())
joinedDf.show(10)
{code}

The ApplicationMaster fails with the following exception:
{noformat}
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 949 in stage 8.0 failed 4 times, most recent failure: Lost task 949.3 in stage 8.0 (TID 4922, ip-10-35-162-219.ec2.internal, executor 139): java.lang.NegativeArraySizeException
at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
at org.apache.spark.unsafe.types.UTF8String.clone(UTF8String.java:826)
at org.apache.spark.sql.execution.columnar.StringColumnStats.gatherStats(ColumnStats.scala:216)
at org.apache.spark.sql.execution.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:55)
at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.org$apache$spark$sql$execution$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:97)
at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78)
at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:97)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:122)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:97)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager
{noformat}

[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-03-02 Thread Paul Lysak (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15893877#comment-15893877 ]

Paul Lysak commented on SPARK-19371:


I'm observing similar behavior in Spark 2.1. Unfortunately, due to the complex workflow of our application, I haven't yet been able to identify after which operation exactly all partitions of the DataFrame end up on a single executor, so no matter how big the cluster is, only one executor picks up all the work.
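
A rough sketch of the kind of check I use to see where the partitions of a cached DataFrame actually land (df is a placeholder here; the Storage tab of the Spark UI shows a similar per-executor breakdown):

{code}
// For each partition, record the host that computed it and the row count,
// then summarize per host. Since tasks over a cached RDD normally run where
// the block lives, heavy skew towards one host shows up immediately.
val perPartition = df.rdd.mapPartitionsWithIndex { (idx, rows) =>
  Iterator((idx, java.net.InetAddress.getLocalHost.getHostName, rows.size))
}.collect()

perPartition.groupBy(_._2).foreach { case (host, parts) =>
  println(s"$host -> ${parts.length} partitions, ${parts.map(_._3).sum} rows")
}
{code}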

> Cannot spread cached partitions evenly across executors
> ---
>
> Key: SPARK-19371
> URL: https://issues.apache.org/jira/browse/SPARK-19371
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.1
>Reporter: Thunder Stumpges
>
> Before running an intensive iterative job (in this case a distributed topic 
> model training), we need to load a dataset and persist it across executors. 
> After loading from HDFS and persisting, the partitions are spread unevenly 
> across executors (based on the initial scheduling of the reads, which are not 
> data-locality sensitive). The partition sizes are even, just not their 
> distribution over executors. We currently have no way to force the partitions 
> to spread evenly, and as the iterative algorithm begins, tasks are 
> distributed to executors based on this initial load, forcing some very 
> unbalanced work.
> This has been mentioned a [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059] of [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html] in [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html] user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here 
> are examples of things I have tried. All resulted in partitions in memory 
> that were NOT evenly distributed to executors, causing future tasks to be 
> imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
> val numPartitions = 48*16
> val df = sqlContext.read.
> parquet("/data/folder_to_load").
> repartition(numPartitions).
> persist
> df.count
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to 
> desired partitions*
> {code}
> val numPartitions = 48*16
> val df2 = sqlContext.read.
> parquet("/data/folder_to_load").
> repartition(numPartitions*2)
> val df = df2.repartition(numPartitions).
> persist
> df.count
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request 
> that those partitions be stored evenly across executors in preparation for 
> future tasks. 
> I'm not sure if this is a more general issue (i.e. not just involving 
> persisting RDDs), but for the persisted in-memory case, it can make a HUGE 
> difference in the overall running time of the remaining work.


