[jira] [Comment Edited] (SPARK-17403) Fatal Error: Scan cached strings
[ https://issues.apache.org/jira/browse/SPARK-17403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982595#comment-15982595 ]

Paul Lysak edited comment on SPARK-17403 at 4/26/17 3:23 PM:

Looks like we have the same issue with Spark 2.1 on YARN (Amazon EMR release emr-5.4.0).
The workaround that solves the issue for us (at the cost of some performance) is to use df.persist(StorageLevel.DISK_ONLY) instead of df.cache().

Depending on the node types, memory settings, storage level and some other factors I couldn't clearly identify, it may appear as

{noformat}
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 158 in stage 504.0 failed 4 times, most recent failure: Lost task 158.3 in stage 504.0 (TID 427365, ip-10-35-162-171.ec2.internal, executor 83): java.lang.NegativeArraySizeException
    at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
    at org.apache.spark.unsafe.types.UTF8String.clone(UTF8String.java:826)
    at org.apache.spark.sql.execution.columnar.StringColumnStats.gatherStats(ColumnStats.scala:217)
    at org.apache.spark.sql.execution.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:55)
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.org$apache$spark$sql$execution$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:97)
    at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78)
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:97)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:122)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:97)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
{noformat}

or as

{noformat}
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 27 in stage 61.0 failed 4 times, most recent failure: Lost task 27.3 in stage 61.0 (TID 36167, ip-10-35-162-149.ec2.internal, executor 1): java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_38$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:107)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:97)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
{noformat}

or as

{noformat}
2017-04-24 19:02:45,951 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_37$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:217)
[jira] [Commented] (SPARK-17403) Fatal Error: Scan cached strings
[ https://issues.apache.org/jira/browse/SPARK-17403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15984987#comment-15984987 ]

Paul Lysak commented on SPARK-17403:

Hope this helps: I finally managed to reproduce it without using production data:

{code}
// Assumes a spark-shell session where `spark` is the SparkSession.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Left side: 1000 * 1000 * 1000 rows of (k, j, l), spread over 3000 partitions.
val leftRows = spark.sparkContext.parallelize(numSlices = 3000, seq = for (k <- 1 to 1000; j <- 1 to 1000) yield Row(k, j))
  .flatMap(r => (1 to 1000).map(l => Row.fromSeq(r.toSeq :+ l)))

val leftDf = spark.createDataFrame(leftRows, StructType(Seq(
    StructField("k", IntegerType),
    StructField("j", IntegerType),
    StructField("l", IntegerType)
  )))
  .withColumn("combinedKey", expr("k*100 + j*1000 + l"))
  .withColumn("fixedCol", lit("sampleVal"))
  .withColumn("combKeyStr", format_number(col("combinedKey"), 0))
  .withColumn("k100", expr("k*100"))
  .withColumn("j100", expr("j*100"))
  .withColumn("l100", expr("l*100"))
  .withColumn("k_200", expr("k+200"))
  .withColumn("j_200", expr("j+200"))
  .withColumn("l_200", expr("l+200"))
  // Nine wide string columns: a long literal repeated 5 times plus a formatted number.
  .withColumn("strCol1_1", concat(lit("value of sample column number one with which column k will be concatenated:" * 5), format_number(col("k"), 0)))
  .withColumn("strCol1_2", concat(lit("value of sample column two one with which column j will be concatenated:" * 5), format_number(col("j"), 0)))
  .withColumn("strCol1_3", concat(lit("value of sample column three one with which column r will be concatenated:" * 5), format_number(col("l"), 0)))
  .withColumn("strCol2_1", concat(lit("value of sample column number one with which column k will be concatenated:" * 5), format_number(col("k"), 0)))
  .withColumn("strCol2_2", concat(lit("value of sample column two one with which column j will be concatenated:" * 5), format_number(col("j"), 0)))
  .withColumn("strCol2_3", concat(lit("value of sample column three one with which column r will be concatenated:" * 5), format_number(col("l"), 0)))
  .withColumn("strCol3_1", concat(lit("value of sample column number one with which column k will be concatenated:" * 5), format_number(col("k"), 0)))
  .withColumn("strCol3_2", concat(lit("value of sample column two one with which column j will be concatenated:" * 5), format_number(col("j"), 0)))
  .withColumn("strCol3_3", concat(lit("value of sample column three one with which column r will be concatenated:" * 5), format_number(col("l"), 0)))
//if further columns commented out - error disappears

leftDf.cache()
println("= leftDf count:" + leftDf.count())
leftDf.show(10)

// Right side: 800 small rows, broadcast in the join below.
val rightRows = spark.sparkContext.parallelize((1 to 800).map(i => Row(i, "k_" + i, "sampleVal")))
val rightDf = spark.createDataFrame(rightRows, StructType(Seq(
    StructField("k", IntegerType),
    StructField("kStr", StringType),
    StructField("sampleCol", StringType)
  )))

rightDf.cache()
println("= rightDf count:" + rightDf.count())
rightDf.show(10)

// Left join on k with the right side broadcast, then cache the joined result.
val joinedDf = leftDf.join(broadcast(rightDf), usingColumns = Seq("k"), joinType = "left")

joinedDf.cache()
println("= joinedDf count:" + joinedDf.count())
joinedDf.show(10)
{code}

The ApplicationMaster fails with the following exception:

{noformat}
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 949 in stage 8.0 failed 4 times, most recent failure: Lost task 949.3 in stage 8.0 (TID 4922, ip-10-35-162-219.ec2.internal, executor 139): java.lang.NegativeArraySizeException
    at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
    at org.apache.spark.unsafe.types.UTF8String.clone(UTF8String.java:826)
    at org.apache.spark.sql.execution.columnar.StringColumnStats.gatherStats(ColumnStats.scala:216)
    at org.apache.spark.sql.execution.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:55)
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.org$apache$spark$sql$execution$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:97)
    at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78)
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:97)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:122)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:97)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager
[jira] [Commented] (SPARK-17403) Fatal Error: Scan cached strings
[ https://issues.apache.org/jira/browse/SPARK-17403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982595#comment-15982595 ]

Paul Lysak commented on SPARK-17403:

Looks like we have the same issue with Spark 2.1 on YARN (Amazon EMR release emr-5.4.0).
The workaround that solves the issue for us (at the cost of some performance) is to use df.persist(StorageLevel.DISK_ONLY) instead of df.cache().

Depending on the node types, memory settings, storage level and some other factors I couldn't clearly identify, it may appear as

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 158 in stage 504.0 failed 4 times, most recent failure: Lost task 158.3 in stage 504.0 (TID 427365, ip-10-35-162-171.ec2.internal, executor 83): java.lang.NegativeArraySizeException
    at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
    at org.apache.spark.unsafe.types.UTF8String.clone(UTF8String.java:826)
    at org.apache.spark.sql.execution.columnar.StringColumnStats.gatherStats(ColumnStats.scala:217)
    at org.apache.spark.sql.execution.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:55)
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.org$apache$spark$sql$execution$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:97)
    at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78)
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:97)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:122)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:97)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)

or as

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 27 in stage 61.0 failed 4 times, most recent failure: Lost task 27.3 in stage 61.0 (TID 36167, ip-10-35-162-149.ec2.internal, executor 1): java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_38$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:107)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:97)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)

or as

2017-04-24 19:02:45,951 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_37$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:217)
    at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:215)
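
The workaround described in the comment above (persisting to disk instead of calling df.cache()) could look roughly like the sketch below. The DataFrame contents, the object name and the application name are placeholders invented for illustration and are not taken from the report; only the persist(StorageLevel.DISK_ONLY) call reflects what the comment describes.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

// Hypothetical standalone job; names and data are illustrative only.
object DiskOnlyCacheSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("disk-only-cache-sketch").getOrCreate()

    // Placeholder for the real DataFrame with wide string columns.
    val df: DataFrame = spark.range(0, 1000)
      .selectExpr("id", "concat('value_', cast(id as string)) AS strCol")

    // On a Dataset, cache() uses the default storage level MEMORY_AND_DISK.
    // The workaround from the comment keeps the persisted blocks on disk only.
    df.persist(StorageLevel.DISK_ONLY)

    // The first action materializes the persisted data.
    println("count: " + df.count())

    df.unpersist()
    spark.stop()
  }
}
```

As the comment notes, this trades some performance for stability, since every later read of the persisted data goes to disk.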
[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors
[ https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15893877#comment-15893877 ]

Paul Lysak commented on SPARK-19371:

I'm observing similar behavior in Spark 2.1. Unfortunately, due to the complex workflow of our application, I haven't yet been able to identify exactly which operation causes all the partitions of a DataFrame to end up on a single executor, so no matter how big the cluster is, only one executor picks up all the work.

> Cannot spread cached partitions evenly across executors
> ---
>
> Key: SPARK-19371
> URL: https://issues.apache.org/jira/browse/SPARK-19371
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.6.1
> Reporter: Thunder Stumpges
>
> Before running an intensive iterative job (in this case a distributed topic
> model training), we need to load a dataset and persist it across executors.
> After loading from HDFS and persisting, the partitions are spread unevenly
> across executors (based on the initial scheduling of the reads which are not
> data-locality sensitive). The partition sizes are even, just not their
> distribution over executors. We currently have no way to force the partitions
> to spread evenly, and as the iterative algorithm begins, tasks are
> distributed to executors based on this initial load, forcing some very
> unbalanced work.
> This has been mentioned a
> [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059]
> of
> [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html]
> in
> [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html]
> user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here
> are examples of things I have tried. All resulted in partitions in memory
> that were NOT evenly distributed to executors, causing future tasks to be
> imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
> val numPartitions = 48*16
> val df = sqlContext.read.
>   parquet("/data/folder_to_load").
>   repartition(numPartitions).
>   persist
> df.count
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to
> desired partitions*
> {code}
> val numPartitions = 48*16
> val df2 = sqlContext.read.
>   parquet("/data/folder_to_load").
>   repartition(numPartitions*2)
> val df = df2.repartition(numPartitions).
>   persist
> df.count
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request
> that those partitions be stored evenly across executors in preparation for
> future tasks.
> I'm not sure if this is a more general issue (i.e. not just involving
> persisting RDDs), but for the persisted in-memory case, it can make a HUGE
> difference in the overall running time of the remaining work.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
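
One possible way to narrow down which step concentrates the partitions on one executor is to count, after each stage of the workflow, how many partitions were processed on each executor. The sketch below is only an illustration under stated assumptions: the object name, the helper partitionsPerExecutor and the sample DataFrame are hypothetical, and the result shows where the tasks of this particular job ran, which usually (but not necessarily) matches where the persisted blocks are stored.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical diagnostic helper; not part of Spark or of the original report.
object PartitionPlacementSketch {

  // Runs one task per partition and records the id of the executor that ran it,
  // then counts partitions per executor id.
  def partitionsPerExecutor(df: DataFrame): Map[String, Int] =
    df.rdd
      .mapPartitions(_ => Iterator(SparkEnv.get.executorId))
      .collect()
      .groupBy(identity)
      .map { case (executorId, hits) => executorId -> hits.length }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("partition-placement-sketch").getOrCreate()

    // Placeholder data; in practice this would be the DataFrame under investigation.
    val df = spark.range(0, 100000).toDF("id").repartition(48).persist()
    df.count() // materialize the persisted partitions

    partitionsPerExecutor(df).toSeq.sortBy(_._1).foreach {
      case (executorId, n) => println(s"executor $executorId: $n partitions")
    }

    df.unpersist()
    spark.stop()
  }
}
```

Calling the helper after each transformation in a larger workflow may show the point at which the counts collapse onto a single executor id. In local mode every partition is reported under the id "driver".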