Thomas Powell created SPARK-17634:
-------------------------------------

             Summary: Spark job hangs when using dapply
                 Key: SPARK-17634
                 URL: https://issues.apache.org/jira/browse/SPARK-17634
             Project: Spark
          Issue Type: Bug
          Components: SparkR
    Affects Versions: 2.0.0
            Reporter: Thomas Powell
            Priority: Critical


I'm running into an issue when using dapply on yarn. I have a data frame backed 
by around 200 parquet files totalling roughly 2GB. When I load this in with the 
new partition coalescing it ends up with around 20 partitions, so each one is 
roughly 100MB. The data frame itself has 4 columns of integers and doubles. If 
I run a count over this, things work fine.

However, if I add a {{dapply}} between the read and the {{count}} that just 
uses an identity function, the tasks hang and make no progress. Both the R and 
Java processes are running on the Spark nodes and are listening on the 
{{SPARKR_WORKER_PORT}}.

{{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
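
For context, the full pipeline is roughly the sketch below (the parquet path 
and session setup are placeholders, not the exact job):

{code}
library(SparkR)
sparkR.session(master = "yarn")

# ~200 parquet files, ~2GB total; coalesced into ~20 partitions on read
df <- read.parquet("/path/to/parquet")

# A plain count over the data frame works fine
count(df)

# Inserting an identity dapply between the read and the count hangs
result <- dapply(df, function(x) { x }, SparkR::schema(df))
count(result)
{code}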

I took a jstack of the Java process and it is just blocked reading from the 
socket, never making any progress. It is harder to tell what the R process is 
doing.
{code}
Thread 112823: (state = IN_NATIVE)
 - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], int, int, int) @bci=0 (Interpreted frame)
 - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, int, int) @bci=8, line=116 (Interpreted frame)
 - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 (Interpreted frame)
 - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 (Interpreted frame)
 - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
 - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
 - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
 - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() @bci=4, line=212 (Interpreted frame)
 - org.apache.spark.api.r.RRunner$$anon$1.<init>(org.apache.spark.api.r.RRunner) @bci=25, line=96 (Interpreted frame)
 - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) @bci=109, line=87 (Interpreted frame)
 - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) @bci=322, line=59 (Interpreted frame)
 - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) @bci=5, line=29 (Interpreted frame)
 - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) @bci=59, line=178 (Interpreted frame)
 - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) @bci=5, line=175 (Interpreted frame)
 - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
 - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
 - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
 - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
 - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
 - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
 - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
 - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
 - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=168, line=79 (Interpreted frame)
 - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=47 (Interpreted frame)
 - org.apache.spark.scheduler.Task.run(long, int, org.apache.spark.metrics.MetricsSystem) @bci=82, line=85 (Interpreted frame)
 - org.apache.spark.executor.Executor$TaskRunner.run() @bci=374, line=274 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1142 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
{code}

Any recommendations on how best to debug? Nothing appears in the logs since the 
processes don't actually fail. The executors themselves have 4GB of memory, 
which should be more than enough.

My feeling is that this could be something around serialization?


