[ 
https://issues.apache.org/jira/browse/SPARK-19243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995489#comment-15995489
 ] 

Harish edited comment on SPARK-19243 at 5/3/17 7:52 PM:
--------------------------------------------------------

i am getting the same error in spark 2.1.0. 
I have 10 node cluster with 109GB each.
My data set is just 30K rows with 60 columns. I see total 72 partitions after 
loading the orc file to DF. then re-partitioned to 2001. No luck.

[~srowen]  did any one raised the similar issue?

Regards,
 Harish


was (Author: harishk15):
i am getting the same error in spark 2.1.0. 
I have 10 node cluster with 109GB each.
My data set is just 30K rows with 60 columns. I see total 72 partitions after 
loading the orc file to DF. then re-partitioned to 2001. No luck.

Regards,
 Harish

> Error when selecting from DataFrame containing parsed data from files larger 
> than 1MB
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-19243
>                 URL: https://issues.apache.org/jira/browse/SPARK-19243
>             Project: Spark
>          Issue Type: Bug
>            Reporter: Ben
>
> I hope I can describe the problem clearly. This error happens with Spark 
> 2.0.1. However, I tried with Spark 2.1.0 on my test PC and it worked there, 
> none of the issues below, but I can't try it on the test cluster because 
> Spark needs to be upgraded there. I'm opening this ticket because if it's a 
> bug, maybe something is still partially present in Spark 2.1.0.
> Initially I though it was my script's problem so I tried to debug, until I 
> found why this is happening.
> Step by step, I load XML files through spark-xml into a DataFrame. In my 
> case, the rowTag is the root tag, so each XML file creates a row. The XML 
> structure is fairly complex, which are converted to nested columns or arrays 
> inside the DF. Since I need to flatten the whole table, and since the output 
> is not fixed but I dynamically select what I want as output, in case I need 
> to output columns that have been parsed as arrays, then I explode them with 
> explode() only when needed.
> Normally I can select various columns that don't have many entries without a 
> problem. 
> I select a column that has a lot of entries into a new DF, e.g. simply through
> {noformat}
> df2 = df.select(...)
> {noformat}
> and then if I try to do a count() or first() or anything, Spark behaves two 
> ways:
> 1. If the source file was smaller than 1MB, it works.
> 2. If the source file was larger than 1MB, the following error occurs:
> {noformat}
> Traceback (most recent call last):
>   File \"/myCode.py\", line 71, in main
>     df.count()
>   File 
> \"/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/dataframe.py\",
>  line 299, in count
>     return int(self._jdf.count())
>   File 
> \"/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\",
>  line 1133, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File 
> \"/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/utils.py\",
>  line 63, in deco
>     return f(*a, **kw)
>   File 
> \"/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py\",
>  line 319, in get_return_value
>     format(target_id, \".\", name), value)
> Py4JJavaError: An error occurred while calling o180.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 
> (TID 6, compname): java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>   at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
>   at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at scala.Option.foreach(Option.scala:257)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
>   at 
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>   at 
> org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2526)
>   at 
> org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
>   at 
> org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
>   at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2523)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at py4j.Gateway.invoke(Gateway.java:280)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:214)
>   at java.lang.Thread.run(Thread.java:745)
>   
> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>   at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
>   at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more
> {noformat}
> It doesn't happen only when calling count(), but anything.
> As mentioned, this only happens when there are many entries (This only 
> happens in my case in only one configuration, when the column with the most 
> entries is selected, a big difference from the others, and these entries were 
> in an array before, and have been exploded), otherwise it works fine 
> independently of the file size.
> The same thing happens if the source is an archive containing the XML files. 
> If the archive itself is larger than 1MB, error; smaller, works.
> The Spark log shows that the error is when calling e.g. count(), although in 
> my script's log I get this error after starting the select(...) command, but 
> maybe that's because of the parallel processing. Consequently, I'm not sure 
> whether the error happens during the explode(), select(), or for some reason 
> after the DF has been prepared and I call e.g. count().
> I have allocated more than enough memory. On another system I got a `Java 
> heap space` error, I guess on the way to getting the actual error.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to