[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15546871#comment-15546871 ] Shivaram Venkataraman commented on SPARK-17634: --- [~iamthomaspowell] Thanks for the performance profiling and it would be great to have a PR with the code change you have above. I think there are a couple of issues that I would like to understand better: (a) The base cost of 2 seconds for any num rows < 1: Does this come from launching the R process or is there some other bottleneck here ? Given that calls to readTypedObject and readType are called linear number of times, I think this either comes down to memory allocation or something like that ? (b) Slowing down of reads as num rows increases: Does this problem get addressed by your change ? Other than the memory allocation bit I think this could also happen if the OS is out of memory and the R process is thrashing or something like that. > Spark job hangs when using dapply > - > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Thomas Powell >Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around 20 > partitions so each one roughly 100MB. The data frame itself has 4 columns of > integers and doubles. If I run a count over this things work fine. > However, if I add a {{dapply}} in between the read and the {{count}} that > just uses an identity function the tasks hang and make no progress. Both the > R and Java processes are running on the Spark nodes and are listening on the > {{SPARKR_WORKER_PORT}}. > {{result <- dapply(df, function(x){x}, SparkR::schema(df))}} > I took a jstack of the Java process and see that it is just listening on the > socket but never seems to make any progress. The R process is harder to debug > what it is doing. > {code} > Thread 112823: (state = IN_NATIVE) > - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], > int, int, int) @bci=0 (Interpreted frame) > - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, > int, int) @bci=8, line=116 (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 > (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 > (Interpreted frame) > - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame) > - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame) > - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame) > - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() > @bci=4, line=212 (Interpreted frame) > - > org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) > @bci=25, line=96 (Interpreted frame) > - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) > @bci=109, line=87 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) > @bci=322, line=59 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) > @bci=5, line=29 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) > @bci=59, line=178 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) > @bci=5, line=175 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, > int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, > java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=
[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533152#comment-15533152 ] Thomas Powell commented on SPARK-17634: --- This is the time within a single partition (and all partitions are actually evenly distributed). I've made the following change and this results in a constant time to read 1 rows. I'll can submit a PR back since this turns the deserialization into O(n) rather than the current implementation. {code} readMultipleObjectsWithDebug <- function(inputCon) { # readMultipleObjects will read multiple continuous objects from # a DataOutputStream. There is no preceding field telling the count # of the objects, so the number of objects varies, we try to read # all objects in a loop until the end of the stream. cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n") data <- vector('list', 10) cap <- 10 len <- 0 secs <- elapsedSecs() while (TRUE) { type <- SparkR:::readType(inputCon) if (type == "") { break } if(len == cap) { data <- c(data, vector('list', cap)) cap <- cap*2 } len <- len + 1 data[[len]] <- SparkR:::readTypedObject(inputCon, type) if (len %% 1 == 0) { duration <- elapsedSecs() - secs cat(paste(len, ": ", duration, "s", sep=""), file=debugFile, append=TRUE, sep="\n") secs <- elapsedSecs() } } data[1:len] } {code} For the example I posted above all reads of 1 rows take around 2 seconds - the time to read the first 1 rows. I still think this is terribly slow. It's a huge cost (e.g. if your partitions have 1,000,000 rows) to pay even before you've actually run any computations. To me this makes {{dapply}} basically unusable - even if I do {{SparkR::head}} to get a preview I still pay the cost of running {{dapply}} on one partition which can take a considerably long time. In the example I posted above my columns just contained integers but as soon as I expand to strings it takes significantly longer to read 1 row blocks due to the increased volume of data. > Spark job hangs when using dapply > - > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Thomas Powell >Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around 20 > partitions so each one roughly 100MB. The data frame itself has 4 columns of > integers and doubles. If I run a count over this things work fine. > However, if I add a {{dapply}} in between the read and the {{count}} that > just uses an identity function the tasks hang and make no progress. Both the > R and Java processes are running on the Spark nodes and are listening on the > {{SPARKR_WORKER_PORT}}. > {{result <- dapply(df, function(x){x}, SparkR::schema(df))}} > I took a jstack of the Java process and see that it is just listening on the > socket but never seems to make any progress. The R process is harder to debug > what it is doing. > {code} > Thread 112823: (state = IN_NATIVE) > - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], > int, int, int) @bci=0 (Interpreted frame) > - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, > int, int) @bci=8, line=116 (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 > (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 > (Interpreted frame) > - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame) > - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame) > - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame) > - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() > @bci=4, line=212 (Interpreted frame) > - > org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) > @bci=25, line=96 (Interpreted frame) > - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) > @bci=109, line=87 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) > @bci=322, line=59 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) > @bci=5, line=29 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) > @bci=59, line=178 (
[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15517839#comment-15517839 ] Felix Cheung commented on SPARK-17634: -- Also it would be great if you have a shareable example to reproduce this. If not, could you share more of your code leading to this? > Spark job hangs when using dapply > - > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Thomas Powell >Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around 20 > partitions so each one roughly 100MB. The data frame itself has 4 columns of > integers and doubles. If I run a count over this things work fine. > However, if I add a {{dapply}} in between the read and the {{count}} that > just uses an identity function the tasks hang and make no progress. Both the > R and Java processes are running on the Spark nodes and are listening on the > {{SPARKR_WORKER_PORT}}. > {{result <- dapply(df, function(x){x}, SparkR::schema(df))}} > I took a jstack of the Java process and see that it is just listening on the > socket but never seems to make any progress. The R process is harder to debug > what it is doing. > {code} > Thread 112823: (state = IN_NATIVE) > - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], > int, int, int) @bci=0 (Interpreted frame) > - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, > int, int) @bci=8, line=116 (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 > (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 > (Interpreted frame) > - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame) > - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame) > - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame) > - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() > @bci=4, line=212 (Interpreted frame) > - > org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) > @bci=25, line=96 (Interpreted frame) > - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) > @bci=109, line=87 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) > @bci=322, line=59 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) > @bci=5, line=29 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) > @bci=59, line=178 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) > @bci=5, line=175 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, > int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, > java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - > org.apache.spark.scheduler.S
[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15517837#comment-15517837 ] Felix Cheung commented on SPARK-17634: -- I see. Do you know if the partitions are evenly distributed and not skewed? I think you can also see if which of the executor is running in the Spark UI. I have seen the R process taking up a significant larger amount of memory when processing data but it's not 100% obvious that it is the case here. > Spark job hangs when using dapply > - > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Thomas Powell >Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around 20 > partitions so each one roughly 100MB. The data frame itself has 4 columns of > integers and doubles. If I run a count over this things work fine. > However, if I add a {{dapply}} in between the read and the {{count}} that > just uses an identity function the tasks hang and make no progress. Both the > R and Java processes are running on the Spark nodes and are listening on the > {{SPARKR_WORKER_PORT}}. > {{result <- dapply(df, function(x){x}, SparkR::schema(df))}} > I took a jstack of the Java process and see that it is just listening on the > socket but never seems to make any progress. The R process is harder to debug > what it is doing. > {code} > Thread 112823: (state = IN_NATIVE) > - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], > int, int, int) @bci=0 (Interpreted frame) > - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, > int, int) @bci=8, line=116 (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 > (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 > (Interpreted frame) > - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame) > - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame) > - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame) > - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() > @bci=4, line=212 (Interpreted frame) > - > org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) > @bci=25, line=96 (Interpreted frame) > - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) > @bci=109, line=87 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) > @bci=322, line=59 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) > @bci=5, line=29 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) > @bci=59, line=178 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) > @bci=5, line=175 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, > int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, > java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > -
[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516975#comment-15516975 ] Thomas Powell commented on SPARK-17634: --- Several hours. I modified {{worker.R}} so that it logs the time it takes to read each 1 rows. I saw terrible slow down. {code} Starting to read the data 1: 2.663s 2: 4.223s 3: 6.005s 4: 7.799s 5: 9.998s ... 54: 116.293s 55: 122.248s 56: 122.798s 57: 126.57s 58: 135.371s {code} This isn't total time to read rows. It is the time to read each 1 rows. I modified the {{readMultipleObjects}} function to be: {code} readMultipleObjectsWithDebug <- function(inputCon) { # readMultipleObjects will read multiple continuous objects from # a DataOutputStream. There is no preceding field telling the count # of the objects, so the number of objects varies, we try to read # all objects in a loop until the end of the stream. cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n") data <- list() secs <- elapsedSecs() while (TRUE) { # If reaching the end of the stream, type returned should be "". type <- SparkR:::readType(inputCon) if (type == "") { break } data[[length(data) + 1L]] <- SparkR:::readTypedObject(inputCon, type) if (length(data) %% 1 == 0) { duration <- elapsedSecs() - secs cat(paste(length(data), ": ", duration, "s", sep=""), file=debugFile, append=TRUE, sep="\n") secs <- elapsedSecs() } } data # this is a list of named lists now } {code} > Spark job hangs when using dapply > - > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Thomas Powell >Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around 20 > partitions so each one roughly 100MB. The data frame itself has 4 columns of > integers and doubles. If I run a count over this things work fine. > However, if I add a {{dapply}} in between the read and the {{count}} that > just uses an identity function the tasks hang and make no progress. Both the > R and Java processes are running on the Spark nodes and are listening on the > {{SPARKR_WORKER_PORT}}. > {{result <- dapply(df, function(x){x}, SparkR::schema(df))}} > I took a jstack of the Java process and see that it is just listening on the > socket but never seems to make any progress. The R process is harder to debug > what it is doing. > {code} > Thread 112823: (state = IN_NATIVE) > - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], > int, int, int) @bci=0 (Interpreted frame) > - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, > int, int) @bci=8, line=116 (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 > (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 > (Interpreted frame) > - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame) > - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame) > - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame) > - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() > @bci=4, line=212 (Interpreted frame) > - > org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) > @bci=25, line=96 (Interpreted frame) > - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) > @bci=109, line=87 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) > @bci=322, line=59 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) > @bci=5, line=29 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) > @bci=59, line=178 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) > @bci=5, line=175 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, > int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, > java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame) > - org.apache.spark.rdd.M
[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515176#comment-15515176 ] Felix Cheung commented on SPARK-17634: -- How long have you let it run? > Spark job hangs when using dapply > - > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Thomas Powell >Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around 20 > partitions so each one roughly 100MB. The data frame itself has 4 columns of > integers and doubles. If I run a count over this things work fine. > However, if I add a {{dapply}} in between the read and the {{count}} that > just uses an identity function the tasks hang and make no progress. Both the > R and Java processes are running on the Spark nodes and are listening on the > {{SPARKR_WORKER_PORT}}. > {{result <- dapply(df, function(x){x}, SparkR::schema(df))}} > I took a jstack of the Java process and see that it is just listening on the > socket but never seems to make any progress. The R process is harder to debug > what it is doing. > {code} > Thread 112823: (state = IN_NATIVE) > - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], > int, int, int) @bci=0 (Interpreted frame) > - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, > int, int) @bci=8, line=116 (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 > (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 > (Interpreted frame) > - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame) > - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame) > - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame) > - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() > @bci=4, line=212 (Interpreted frame) > - > org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) > @bci=25, line=96 (Interpreted frame) > - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) > @bci=109, line=87 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) > @bci=322, line=59 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) > @bci=5, line=29 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) > @bci=59, line=178 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) > @bci=5, line=175 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, > int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, > java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - > org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) > @bci=168, line=79 (Interpreted frame) > - > org
[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513673#comment-15513673 ] Thomas Powell commented on SPARK-17634: --- I also ran this on a row limited version of this dataset. The first 22 tasks completed immediately (since there was no data to process). The final task recorded 32MB of shuffle read, (out of ~300MB of shuffle write in the previous stage), at which point it then stalled. > Spark job hangs when using dapply > - > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Thomas Powell >Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around 20 > partitions so each one roughly 100MB. The data frame itself has 4 columns of > integers and doubles. If I run a count over this things work fine. > However, if I add a {{dapply}} in between the read and the {{count}} that > just uses an identity function the tasks hang and make no progress. Both the > R and Java processes are running on the Spark nodes and are listening on the > {{SPARKR_WORKER_PORT}}. > {{result <- dapply(df, function(x){x}, SparkR::schema(df))}} > I took a jstack of the Java process and see that it is just listening on the > socket but never seems to make any progress. The R process is harder to debug > what it is doing. > {code} > Thread 112823: (state = IN_NATIVE) > - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], > int, int, int) @bci=0 (Interpreted frame) > - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, > int, int) @bci=8, line=116 (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 > (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 > (Interpreted frame) > - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame) > - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame) > - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame) > - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() > @bci=4, line=212 (Interpreted frame) > - > org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) > @bci=25, line=96 (Interpreted frame) > - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) > @bci=109, line=87 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) > @bci=322, line=59 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) > @bci=5, line=29 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) > @bci=59, line=178 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) > @bci=5, line=175 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, > int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame) > - > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, > java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame) > - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame) > - > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, > org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame) > - org.apache.spark.rdd.RDD.it