[jira] [Updated] (SPARK-48360) Simplify conditionals with predicate branches
[ https://issues.apache.org/jira/browse/SPARK-48360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Powell updated SPARK-48360:
----------------------------------
    Summary: Simplify conditionals with predicate branches  (was: Simplify conditionals containing predicates)

> Simplify conditionals with predicate branches
> ---------------------------------------------
>
>                 Key: SPARK-48360
>                 URL: https://issues.apache.org/jira/browse/SPARK-48360
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.4.3
>            Reporter: Thomas Powell
>            Priority: Major
>
> The catalyst optimizer has many optimizations for {{CaseWhen}} and {{If}} expressions that can eliminate branches entirely or replace them with Boolean logic. There are additional "always false" conditionals that could be eliminated entirely. It would also be possible to replace conditionals with Boolean logic where the {{if-branch}} and {{else-branch}} are themselves predicates. The primary motivation would be to push more filters down to the datasource.
> For example:
> {code:java}
> Filter(If(GreaterThan(a, 2), false, LessThanOrEqual(b, 4))){code}
> is equivalent to
> {code:java}
> # a not nullable
> Filter(And(LessThanOrEqual(a, 2), LessThanOrEqual(b, 4)))
> # a nullable
> Filter(And(Not(EqualNullSafe(GreaterThan(a, 2), true)), LessThanOrEqual(b, 4))){code}
> Within a filter the nullability handling is admittedly less important, since an expression evaluating to null is semantically equivalent to false, but the original conditional may have been intentionally written not to return null when {{a}} may be null.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48360) Simplify conditionals containing predicates
Thomas Powell created SPARK-48360:
-------------------------------------

             Summary: Simplify conditionals containing predicates
                 Key: SPARK-48360
                 URL: https://issues.apache.org/jira/browse/SPARK-48360
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.4.3
            Reporter: Thomas Powell

The catalyst optimizer has many optimizations for {{CaseWhen}} and {{If}} expressions that can eliminate branches entirely or replace them with Boolean logic. There are additional "always false" conditionals that could be eliminated entirely. It would also be possible to replace conditionals with Boolean logic where the {{if-branch}} and {{else-branch}} are themselves predicates. The primary motivation would be to push more filters down to the datasource.

For example:
{code:java}
Filter(If(GreaterThan(a, 2), false, LessThanOrEqual(b, 4))){code}
is equivalent to
{code:java}
# a not nullable
Filter(And(LessThanOrEqual(a, 2), LessThanOrEqual(b, 4)))
# a nullable
Filter(And(Not(EqualNullSafe(GreaterThan(a, 2), true)), LessThanOrEqual(b, 4))){code}
Within a filter the nullability handling is admittedly less important, since an expression evaluating to null is semantically equivalent to false, but the original conditional may have been intentionally written not to return null when {{a}} may be null.
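The nullable-branch rewrite above can be sanity-checked by enumerating SQL's three-valued logic. The sketch below is standalone Python, not Spark code: `sql_if`, `sql_and`, `sql_not`, and `equal_null_safe` are hypothetical helpers modelling the semantics of {{If}}, {{And}}, {{Not}}, and Spark's null-safe equality ({{<=>}}), with `None` standing in for SQL null.

```python
# Illustrative model (not Spark code) of SQL three-valued logic,
# used to check:  If(p, false, q)  ==  And(Not(p <=> true), q)

def sql_not(x):
    # NOT null is null; otherwise ordinary negation.
    return None if x is None else (not x)

def sql_and(x, y):
    # Kleene AND: false dominates, then null is contagious.
    if x is False or y is False:
        return False
    if x is None or y is None:
        return None
    return x and y

def sql_if(cond, then_val, else_val):
    # A null condition falls through to the else branch.
    return then_val if cond is True else else_val

def equal_null_safe(x, y):
    # <=> never returns null: null <=> null is true, null <=> value is false.
    if x is None or y is None:
        return x is y
    return x == y

# p stands for GreaterThan(a, 2), q for LessThanOrEqual(b, 4);
# enumerate every combination of true/false/null.
for p in (True, False, None):
    for q in (True, False, None):
        original = sql_if(p, False, q)
        rewritten = sql_and(sql_not(equal_null_safe(p, True)), q)
        assert original == rewritten, (p, q, original, rewritten)
```

When {{a}} is not nullable, {{GreaterThan(a, 2)}} can never be null, so the null-safe comparison is unnecessary and {{Not(GreaterThan(a, 2))}} simplifies further to {{LessThanOrEqual(a, 2)}}, matching the first rewrite.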
[jira] [Commented] (SPARK-17608) Long type has incorrect serialization/deserialization
[ https://issues.apache.org/jira/browse/SPARK-17608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15708082#comment-15708082 ]

Thomas Powell commented on SPARK-17608:
---------------------------------------

Yes, the confusing thing at the moment is the round-tripping, so this sounds like a good solution.

> Long type has incorrect serialization/deserialization
> -----------------------------------------------------
>
>                 Key: SPARK-17608
>                 URL: https://issues.apache.org/jira/browse/SPARK-17608
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.0.0
>            Reporter: Thomas Powell
>
> I'm hitting issues when using {{dapply}} on a data frame that contains a {{bigint}} in its schema. When this is converted to a SparkR data frame, a "bigint" gets converted to an R {{numeric}} type: https://github.com/apache/spark/blob/master/R/pkg/R/types.R#L25.
> However, the R {{numeric}} type gets converted to {{org.apache.spark.sql.types.DoubleType}}: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L97.
> The two directions therefore aren't compatible. If I use the same schema when using dapply (and just an identity function) I will get type collisions because the output type is a double but the schema expects a bigint.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533152#comment-15533152 ]

Thomas Powell commented on SPARK-17634:
---------------------------------------

This is the time within a single partition (and all partitions are actually evenly distributed). I've made the following change and this results in a constant time to read 1 rows. I can submit a PR, since this makes the deserialization O(n) rather than the effectively quadratic behaviour of the current implementation.

{code}
readMultipleObjectsWithDebug <- function(inputCon) {
  # readMultipleObjects will read multiple continuous objects from
  # a DataOutputStream. There is no preceding field telling the count
  # of the objects, so the number of objects varies; we try to read
  # all objects in a loop until the end of the stream.
  cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
  data <- vector('list', 10)
  cap <- 10
  len <- 0
  secs <- elapsedSecs()
  while (TRUE) {
    type <- SparkR:::readType(inputCon)
    if (type == "") {
      break
    }
    if (len == cap) {
      # Double the capacity instead of growing the list by one element.
      data <- c(data, vector('list', cap))
      cap <- cap * 2
    }
    len <- len + 1
    data[[len]] <- SparkR:::readTypedObject(inputCon, type)
    if (len %% 1 == 0) {
      duration <- elapsedSecs() - secs
      cat(paste(len, ": ", duration, "s", sep=""), file=debugFile, append=TRUE, sep="\n")
      secs <- elapsedSecs()
    }
  }
  data[1:len]
}
{code}

For the example I posted above all reads of 1 rows take around 2 seconds - the time to read the first 1 rows. I still think this is terribly slow. It's a huge cost (e.g. if your partitions have 1,000,000 rows) to pay even before you've actually run any computations. To me this makes {{dapply}} basically unusable - even if I do {{SparkR::head}} to get a preview I still pay the cost of running {{dapply}} on one partition, which can take a considerably long time.

In the example I posted above my columns just contained integers, but as soon as I expand to strings it takes significantly longer to read 1 row blocks due to the increased volume of data.
> Spark job hangs when using dapply > - > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Thomas Powell >Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around 20 > partitions so each one roughly 100MB. The data frame itself has 4 columns of > integers and doubles. If I run a count over this things work fine. > However, if I add a {{dapply}} in between the read and the {{count}} that > just uses an identity function the tasks hang and make no progress. Both the > R and Java processes are running on the Spark nodes and are listening on the > {{SPARKR_WORKER_PORT}}. > {{result <- dapply(df, function(x){x}, SparkR::schema(df))}} > I took a jstack of the Java process and see that it is just listening on the > socket but never seems to make any progress. The R process is harder to debug > what it is doing. 
> {code} > Thread 112823: (state = IN_NATIVE) > - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], > int, int, int) @bci=0 (Interpreted frame) > - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, > int, int) @bci=8, line=116 (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 > (Interpreted frame) > - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 > (Interpreted frame) > - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame) > - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame) > - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame) > - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() > @bci=4, line=212 (Interpreted frame) > - > org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) > @bci=25, line=96 (Interpreted frame) > - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) > @bci=109, line=87 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) > @bci=322, line=59 (Interpreted frame) > - > org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) > @bci=5, line=29 (Interpreted frame) > - > org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) > @bci=59, line=178 (Interpreted
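The fix in the comment above works by growing the result list geometrically instead of one slot per element. A minimal Python sketch of that growth strategy (illustrative only; the helper names are hypothetical and this is not the SparkR implementation):

```python
# Contrast two buffer-growth strategies for reading a stream of unknown
# length: growing by one slot per element copies the whole buffer each
# time (quadratic overall), while doubling the capacity copies each
# element O(1) times on average (amortized linear).

def double_capacity(data):
    # Double the buffer when full: amortized O(1) per element.
    return data + [None] * len(data)

def grow_by_one(data):
    # Add a single slot when full: O(n) copy per element, O(n^2) total.
    return data + [None]

def read_all(stream, grow):
    # Read values into a pre-sized buffer, growing with the given strategy,
    # and trim to the number actually read (as the R fix does with data[1:len]).
    data, length = [None] * 10, 0
    for value in stream:
        if length == len(data):
            data = grow(data)
        data[length] = value
        length += 1
    return data[:length]

# Both strategies produce the same result; only the copying cost differs.
assert read_all(range(100), double_capacity) == list(range(100))
assert read_all(range(100), grow_by_one) == list(range(100))
```

With doubling, reading n rows costs O(n) in total; the one-slot-at-a-time growth is what produces the steadily increasing per-block read times shown in the earlier timing log.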
[jira] [Comment Edited] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533152#comment-15533152 ]

Thomas Powell edited comment on SPARK-17634 at 9/29/16 3:56 PM:
---------------------------------------------------------------

This is the time within a single partition (and all partitions are actually evenly distributed). I've made the following change and this results in a constant time to read 1 rows. I can submit a PR, since this makes the deserialization linear, which the current implementation is not.

{code}
readMultipleObjectsWithDebug <- function(inputCon) {
  # readMultipleObjects will read multiple continuous objects from
  # a DataOutputStream. There is no preceding field telling the count
  # of the objects, so the number of objects varies; we try to read
  # all objects in a loop until the end of the stream.
  cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
  data <- vector('list', 10)
  cap <- 10
  len <- 0
  secs <- elapsedSecs()
  while (TRUE) {
    type <- SparkR:::readType(inputCon)
    if (type == "") {
      break
    }
    if (len == cap) {
      data <- c(data, vector('list', cap))
      cap <- cap * 2
    }
    len <- len + 1
    data[[len]] <- SparkR:::readTypedObject(inputCon, type)
    if (len %% 1 == 0) {
      duration <- elapsedSecs() - secs
      cat(paste(len, ": ", duration, "s", sep=""), file=debugFile, append=TRUE, sep="\n")
      secs <- elapsedSecs()
    }
  }
  data[1:len]
}
{code}

For the example I posted above all reads of 1 rows take around 2 seconds - the time to read the first 1 rows. I still think this is terribly slow. It's a huge cost (e.g. if your partitions have 1,000,000 rows) to pay even before you've actually run any computations. To me this makes {{dapply}} basically unusable - even if I do {{SparkR::head}} to get a preview I still pay the cost of running {{dapply}} on one partition, which can take a considerably long time.
In the example I posted above my columns just contained integers, but as soon as I expand to strings it takes significantly longer to read 1 row blocks due to the increased volume of data.

was (Author: iamthomaspowell):
This is the time within a single partition (and all partitions are actually evenly distributed). I've made the following change and this results in a constant time to read 1 rows. I can submit a PR, since this turns the deserialization into O(n), unlike the current implementation.

{code}
readMultipleObjectsWithDebug <- function(inputCon) {
  # readMultipleObjects will read multiple continuous objects from
  # a DataOutputStream. There is no preceding field telling the count
  # of the objects, so the number of objects varies; we try to read
  # all objects in a loop until the end of the stream.
  cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
  data <- vector('list', 10)
  cap <- 10
  len <- 0
  secs <- elapsedSecs()
  while (TRUE) {
    type <- SparkR:::readType(inputCon)
    if (type == "") {
      break
    }
    if (len == cap) {
      data <- c(data, vector('list', cap))
      cap <- cap * 2
    }
    len <- len + 1
    data[[len]] <- SparkR:::readTypedObject(inputCon, type)
    if (len %% 1 == 0) {
      duration <- elapsedSecs() - secs
      cat(paste(len, ": ", duration, "s", sep=""), file=debugFile, append=TRUE, sep="\n")
      secs <- elapsedSecs()
    }
  }
  data[1:len]
}
{code}

For the example I posted above all reads of 1 rows take around 2 seconds - the time to read the first 1 rows. I still think this is terribly slow. It's a huge cost (e.g. if your partitions have 1,000,000 rows) to pay even before you've actually run any computations. To me this makes {{dapply}} basically unusable - even if I do {{SparkR::head}} to get a preview I still pay the cost of running {{dapply}} on one partition, which can take a considerably long time.
In the example I posted above my columns just contained integers but as soon as I expand to strings it takes significantly longer to read 1 row blocks due to the increased volume of data. > Spark job hangs when using dapply > - > > Key: SPARK-17634 > URL: https://issues.apache.org/jira/browse/SPARK-17634 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Thomas Powell >Priority: Critical > > I'm running into an issue when using dapply on yarn. I have a data frame > backed by files in parquet with around 200 files that is around 2GB. When I > load this in with the new partition coalescing it ends up having around
[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15516975#comment-15516975 ]

Thomas Powell commented on SPARK-17634:
---------------------------------------

Several hours. I modified {{worker.R}} so that it logs the time it takes to read each 1 rows. I saw a terrible slowdown.

{code}
Starting to read the data
1: 2.663s
2: 4.223s
3: 6.005s
4: 7.799s
5: 9.998s
...
54: 116.293s
55: 122.248s
56: 122.798s
57: 126.57s
58: 135.371s
{code}

This isn't the total time to read rows. It is the time to read each 1 rows. I modified the {{readMultipleObjects}} function to be:

{code}
readMultipleObjectsWithDebug <- function(inputCon) {
  # readMultipleObjects will read multiple continuous objects from
  # a DataOutputStream. There is no preceding field telling the count
  # of the objects, so the number of objects varies; we try to read
  # all objects in a loop until the end of the stream.
  cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
  data <- list()
  secs <- elapsedSecs()
  while (TRUE) {
    # If reaching the end of the stream, type returned should be "".
    type <- SparkR:::readType(inputCon)
    if (type == "") {
      break
    }
    data[[length(data) + 1L]] <- SparkR:::readTypedObject(inputCon, type)
    if (length(data) %% 1 == 0) {
      duration <- elapsedSecs() - secs
      cat(paste(length(data), ": ", duration, "s", sep=""), file=debugFile, append=TRUE, sep="\n")
      secs <- elapsedSecs()
    }
  }
  data  # this is a list of named lists now
}
{code}

> Spark job hangs when using dapply
> ---------------------------------
>
>                 Key: SPARK-17634
>                 URL: https://issues.apache.org/jira/browse/SPARK-17634
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.0.0
>            Reporter: Thomas Powell
>            Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame backed by files in parquet with around 200 files that is around 2GB. When I load this in with the new partition coalescing it ends up having around 20 partitions so each one roughly 100MB.
> The data frame itself has 4 columns of integers and doubles. If I run a count over this things work fine.
> However, if I add a {{dapply}} in between the read and the {{count}} that just uses an identity function the tasks hang and make no progress. Both the R and Java processes are running on the Spark nodes and are listening on the {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just listening on the socket but never seems to make any progress. The R process is harder to debug what it is doing.
[jira] [Comment Edited] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15513673#comment-15513673 ]

Thomas Powell edited comment on SPARK-17634 at 9/22/16 4:35 PM:
---------------------------------------------------------------

I also ran this on a row-limited version of this dataset. The first 22 tasks completed immediately (since there was no data to process). The final task recorded 13MB of shuffle read (out of ~300MB of shuffle write in the previous stage), at which point it stalled.

was (Author: iamthomaspowell):
I also ran this on a row-limited version of this dataset. The first 22 tasks completed immediately (since there was no data to process). The final task recorded 32MB of shuffle read (out of ~300MB of shuffle write in the previous stage), at which point it stalled.

> Spark job hangs when using dapply
> ---------------------------------
>
>                 Key: SPARK-17634
>                 URL: https://issues.apache.org/jira/browse/SPARK-17634
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.0.0
>            Reporter: Thomas Powell
>            Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame backed by files in parquet with around 200 files that is around 2GB. When I load this in with the new partition coalescing it ends up having around 20 partitions so each one roughly 100MB. The data frame itself has 4 columns of integers and doubles. If I run a count over this things work fine.
> However, if I add a {{dapply}} in between the read and the {{count}} that just uses an identity function the tasks hang and make no progress. Both the R and Java processes are running on the Spark nodes and are listening on the {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just listening on the socket but never seems to make any progress. The R process is harder to debug what it is doing.
[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply
[ https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15513673#comment-15513673 ]

Thomas Powell commented on SPARK-17634:
---------------------------------------

I also ran this on a row-limited version of this dataset. The first 22 tasks completed immediately (since there was no data to process). The final task recorded 32MB of shuffle read (out of ~300MB of shuffle write in the previous stage), at which point it stalled.

> Spark job hangs when using dapply
> ---------------------------------
>
>                 Key: SPARK-17634
>                 URL: https://issues.apache.org/jira/browse/SPARK-17634
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.0.0
>            Reporter: Thomas Powell
>            Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame backed by files in parquet with around 200 files that is around 2GB. When I load this in with the new partition coalescing it ends up having around 20 partitions so each one roughly 100MB. The data frame itself has 4 columns of integers and doubles. If I run a count over this things work fine.
> However, if I add a {{dapply}} in between the read and the {{count}} that just uses an identity function the tasks hang and make no progress. Both the R and Java processes are running on the Spark nodes and are listening on the {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just listening on the socket but never seems to make any progress. The R process is harder to debug what it is doing.
[jira] [Created] (SPARK-17634) Spark job hangs when using dapply
Thomas Powell created SPARK-17634:
-------------------------------------

             Summary: Spark job hangs when using dapply
                 Key: SPARK-17634
                 URL: https://issues.apache.org/jira/browse/SPARK-17634
             Project: Spark
          Issue Type: Bug
          Components: SparkR
    Affects Versions: 2.0.0
            Reporter: Thomas Powell
            Priority: Critical

I'm running into an issue when using dapply on yarn. I have a data frame backed by around 200 parquet files totalling around 2GB. When I load this in with the new partition coalescing it ends up having around 20 partitions, so each one is roughly 100MB. The data frame itself has 4 columns of integers and doubles. If I run a count over this, things work fine.

However, if I add a {{dapply}} in between the read and the {{count}} that just uses an identity function, the tasks hang and make no progress. Both the R and Java processes are running on the Spark nodes and are listening on the {{SPARKR_WORKER_PORT}}.

{{result <- dapply(df, function(x){x}, SparkR::schema(df))}}

I took a jstack of the Java process and see that it is just listening on the socket but never seems to make any progress. What the R process is doing is harder to debug.
{code}
Thread 112823: (state = IN_NATIVE)
 - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], int, int, int) @bci=0 (Interpreted frame)
 - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, int, int) @bci=8, line=116 (Interpreted frame)
 - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 (Interpreted frame)
 - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 (Interpreted frame)
 - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
 - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
 - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
 - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() @bci=4, line=212 (Interpreted frame)
 - org.apache.spark.api.r.RRunner$$anon$1.<init>(org.apache.spark.api.r.RRunner) @bci=25, line=96 (Interpreted frame)
 - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) @bci=109, line=87 (Interpreted frame)
 - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator) @bci=322, line=59 (Interpreted frame)
 - org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) @bci=5, line=29 (Interpreted frame)
 - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator) @bci=59, line=178 (Interpreted frame)
 - org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object) @bci=5, line=175 (Interpreted frame)
 - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext, int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
 - org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object, java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
 - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
 - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
 - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
 - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
 - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
 - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
 - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=168, line=79 (Interpreted frame)
 - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=47 (Interpreted frame)
 - org.apache.spark.scheduler.Task.run(long, int, org.apache.spark.metrics.MetricsSystem) @bci=82, line=85 (Interpreted frame)
 - org.apache.spark.executor.Executor$TaskRunner.run() @bci=374, line=274 (Interpreted frame)
[jira] [Created] (SPARK-17608) Long type has incorrect serialization/deserialization
Thomas Powell created SPARK-17608:
-------------------------------------

             Summary: Long type has incorrect serialization/deserialization
                 Key: SPARK-17608
                 URL: https://issues.apache.org/jira/browse/SPARK-17608
             Project: Spark
          Issue Type: Bug
          Components: SparkR
    Affects Versions: 2.0.0
            Reporter: Thomas Powell

I'm hitting issues when using {{dapply}} on a data frame that contains a {{bigint}} in its schema. When this is converted to a SparkR data frame, a "bigint" gets converted to an R {{numeric}} type: https://github.com/apache/spark/blob/master/R/pkg/R/types.R#L25.

However, the R {{numeric}} type gets converted to {{org.apache.spark.sql.types.DoubleType}}: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L97.

The two directions therefore aren't compatible. If I use the same schema when using dapply (and just an identity function) I will get type collisions because the output type is a double but the schema expects a bigint.
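The incompatibility is an asymmetry between the two mapping tables linked above. A small Python sketch makes the broken round trip concrete; the dicts below are trimmed, hand-written reconstructions of just the relevant entries, not the actual contents of types.R or SQLUtils.scala.

```python
# Illustrative model of the type-mapping asymmetry described in this issue.

# SparkR direction: Spark SQL type -> R type (cf. R/pkg/R/types.R)
SPARK_TO_R = {
    "double": "numeric",
    "bigint": "numeric",   # long is widened to R's numeric (a double)
    "integer": "integer",
}

# JVM direction: R type -> Spark SQL type (cf. SQLUtils.scala)
R_TO_SPARK = {
    "numeric": "double",   # numeric always comes back as DoubleType
    "integer": "integer",
}

def roundtrip(spark_type):
    # Follow a column type Spark -> R -> Spark, as dapply effectively does.
    return R_TO_SPARK[SPARK_TO_R[spark_type]]

assert roundtrip("double") == "double"     # stable
assert roundtrip("integer") == "integer"   # stable
assert roundtrip("bigint") != "bigint"     # bigint -> numeric -> double
```

Because R's {{numeric}} is a double, the bigint → numeric → double round trip loses the original type, which is exactly the collision the identity-function {{dapply}} hits.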