[jira] [Updated] (SPARK-48360) Simplify conditionals with predicate branches

2024-05-20 Thread Thomas Powell (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Powell updated SPARK-48360:
--
Summary: Simplify conditionals with predicate branches  (was: Simplify 
conditionals containing predicates)

> Simplify conditionals with predicate branches
> -
>
> Key: SPARK-48360
> URL: https://issues.apache.org/jira/browse/SPARK-48360
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.3
>Reporter: Thomas Powell
>Priority: Major
>
> The catalyst optimizer has many optimizations for {{CaseWhen}} and {{If}} 
> expressions that can eliminate branches entirely or replace them with 
> Boolean logic. There are additional "always false" conditionals that could be 
> eliminated entirely. It would also be possible to replace conditionals with 
> Boolean logic where the {{if-branch}} and {{else-branch}} are themselves 
> predicates. The primary motivation would be to push more filters to the 
> datasource.
> For example:
> {code:java}
> Filter(If(GreaterThan(a, 2), false, LessThanOrEqual(b, 4))){code}
> is equivalent to
> {code:java}
> # a not nullable
> Filter(And(LessThanOrEqual(a, 2), LessThanOrEqual(b, 4)))
> # a nullable
> Filter(And(Not(EqualNullSafe(GreaterThan(a, 2), true)), LessThanOrEqual(b, 
> 4))){code}
> Within a filter the nullability handling is admittedly less important since 
> the expression evaluating to null would be semantically equivalent to false, 
> but the original conditional may have been intentionally written to not 
> return null when {{a}} may be null.






[jira] [Created] (SPARK-48360) Simplify conditionals containing predicates

2024-05-20 Thread Thomas Powell (Jira)
Thomas Powell created SPARK-48360:
-

 Summary: Simplify conditionals containing predicates
 Key: SPARK-48360
 URL: https://issues.apache.org/jira/browse/SPARK-48360
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.4.3
Reporter: Thomas Powell


The catalyst optimizer has many optimizations for {{CaseWhen}} and {{If}} 
expressions that can eliminate branches entirely or replace them with Boolean 
logic. There are additional "always false" conditionals that could be 
eliminated entirely. It would also be possible to replace conditionals with 
Boolean logic where the {{if-branch}} and {{else-branch}} are themselves 
predicates. The primary motivation would be to push more filters to the 
datasource.

For example:
{code:java}
Filter(If(GreaterThan(a, 2), false, LessThanOrEqual(b, 4))){code}
is equivalent to
{code:java}
# a not nullable
Filter(And(LessThanOrEqual(a, 2), LessThanOrEqual(b, 4)))

# a nullable
Filter(And(Not(EqualNullSafe(GreaterThan(a, 2), true)), LessThanOrEqual(b, 4)))
{code}
Within a filter the nullability handling is admittedly less important since the 
expression evaluating to null would be semantically equivalent to false, but 
the original conditional may have been intentionally written to not return null 
when {{a}} may be null.
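
As an illustration only, a rewrite along these lines could be expressed as a 
Catalyst rule roughly as follows (the rule name {{SimplifyPredicateBranches}} 
and the exact patterns are hypothetical, not Spark's actual optimizer code):
{code:scala}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.BooleanType

// Sketch of the proposed rewrite: turn If expressions whose branches are the
// literal false into Boolean logic, so both conjuncts become plain predicates.
object SimplifyPredicateBranches extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(condition, _) =>
      f.copy(condition = condition transform {
        // If(p, false, q)  =>  NOT (p <=> true) AND q
        // The null-safe comparison preserves the semantics when p is nullable:
        // a null p sends the original If down the else-branch, i.e. to q.
        // For a non-nullable p this could simplify further to And(Not(p), q).
        case If(p, Literal(false, BooleanType), q) =>
          And(Not(EqualNullSafe(p, Literal(true, BooleanType))), q)
        // If(p, q, false)  =>  (p <=> true) AND q
        case If(p, q, Literal(false, BooleanType)) =>
          And(EqualNullSafe(p, Literal(true, BooleanType)), q)
      })
  }
}
{code}
Once the conditional is flattened into a conjunction like this, both predicates 
are ordinary filter conditions and become candidates for pushdown to the 
datasource.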






[jira] [Commented] (SPARK-17608) Long type has incorrect serialization/deserialization

2016-11-30 Thread Thomas Powell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15708082#comment-15708082
 ] 

Thomas Powell commented on SPARK-17608:
---

Yes, the confusing thing at the moment is the round-tripping, so this sounds 
like a good solution.

> Long type has incorrect serialization/deserialization
> -
>
> Key: SPARK-17608
> URL: https://issues.apache.org/jira/browse/SPARK-17608
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.0.0
>Reporter: Thomas Powell
>
> I am hitting issues when using {{dapply}} on a data frame that contains a 
> {{bigint}} in its schema. When this is converted to a SparkR data frame, a 
> "bigint" gets converted to an R {{numeric}} type: 
> https://github.com/apache/spark/blob/master/R/pkg/R/types.R#L25.
> However, the R {{numeric}} type gets converted to 
> {{org.apache.spark.sql.types.DoubleType}}: 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L97.
> The two directions therefore aren't compatible. If I use the same schema when 
> using dapply (and just an identity function), I will get type collisions 
> because the output type is a double but the schema expects a bigint. 






[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply

2016-09-29 Thread Thomas Powell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533152#comment-15533152
 ] 

Thomas Powell commented on SPARK-17634:
---

This is the time within a single partition (and all partitions are actually 
evenly distributed). I've made the following change, and it results in a 
constant time to read 1 rows. I can submit a PR back, since this turns 
the deserialization into O(n), unlike the current implementation.
{code}
readMultipleObjectsWithDebug <- function(inputCon) {
# readMultipleObjects will read multiple continuous objects from
# a DataOutputStream. There is no preceding field telling the count
# of the objects, so the number of objects varies, we try to read
# all objects in a loop until the end of the stream.
cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
data <- vector('list', 10)
cap <- 10
len <- 0
secs <- elapsedSecs()
while (TRUE) {
type <- SparkR:::readType(inputCon)
if (type == "") {
break
}
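# Grow the buffer geometrically (doubling cap) so that appends are amortised
# O(1), which keeps the whole read O(n) rather than slowing down per block.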
if(len == cap) {
data <- c(data, vector('list', cap))
cap <- cap*2
}
len <- len + 1
data[[len]] <- SparkR:::readTypedObject(inputCon, type)
if (len %% 1 == 0) {
duration <- elapsedSecs() - secs
cat(paste(len, ": ", duration, "s", sep=""), file=debugFile, 
append=TRUE, sep="\n")
secs <- elapsedSecs()
}
}
data[1:len]
}
{code}
For the example I posted above, all reads of 1 rows take around 2 seconds 
(the time to read the first 1 rows). 

I still think this is terribly slow. It's a huge cost (e.g. if your partitions 
have 1,000,000 rows) to pay even before you've actually run any computations. 
To me this makes {{dapply}} basically unusable: even if I do {{SparkR::head}} 
to get a preview, I still pay the cost of running {{dapply}} on one partition, 
which can take a considerable amount of time. In the example I posted above my 
columns just contained integers, but as soon as I expand to strings it takes 
significantly longer to read 1 row blocks due to the increased volume of 
data.

> Spark job hangs when using dapply
> -
>
> Key: SPARK-17634
> URL: https://issues.apache.org/jira/browse/SPARK-17634
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.0.0
>Reporter: Thomas Powell
>Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame 
> backed by files in parquet with around 200 files that is around 2GB. When I 
> load this in with the new partition coalescing it ends up having around 20 
> partitions so each one roughly 100MB. The data frame itself has 4 columns of 
> integers and doubles. If I run a count over this things work fine.
> However, if I add a {{dapply}} in between the read and the {{count}} that 
> just uses an identity function the tasks hang and make no progress. Both the 
> R and Java processes are running on the Spark nodes and are listening on the 
> {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just listening on the 
> socket but never seems to make any progress. The R process is harder to debug 
> what it is doing.
> {code}
> Thread 112823: (state = IN_NATIVE)
>  - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], 
> int, int, int) @bci=0 (Interpreted frame)
>  - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, 
> int, int) @bci=8, line=116 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 
> (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 
> (Interpreted frame)
>  - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
>  - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
>  - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
>  - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() 
> @bci=4, line=212 (Interpreted frame)
>  - 
> org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) 
> @bci=25, line=96 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) 
> @bci=109, line=87 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator)
>  @bci=322, line=59 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object)
>  @bci=5, line=29 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator)
>  @bci=59, line=178 (Interpreted 

[jira] [Comment Edited] (SPARK-17634) Spark job hangs when using dapply

2016-09-29 Thread Thomas Powell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533152#comment-15533152
 ] 

Thomas Powell edited comment on SPARK-17634 at 9/29/16 3:56 PM:


This is the time within a single partition (and all partitions are actually 
evenly distributed). I've made the following change, and it results in a 
constant time to read 1 rows. I can submit a PR back, since this turns 
the deserialization into a linear algorithm, which isn't the case in the 
current implementation.
{code}
readMultipleObjectsWithDebug <- function(inputCon) {
# readMultipleObjects will read multiple continuous objects from
# a DataOutputStream. There is no preceding field telling the count
# of the objects, so the number of objects varies, we try to read
# all objects in a loop until the end of the stream.
cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
data <- vector('list', 10)
cap <- 10
len <- 0
secs <- elapsedSecs()
while (TRUE) {
type <- SparkR:::readType(inputCon)
if (type == "") {
break
}
if(len == cap) {
data <- c(data, vector('list', cap))
cap <- cap*2
}
len <- len + 1
data[[len]] <- SparkR:::readTypedObject(inputCon, type)
if (len %% 1 == 0) {
duration <- elapsedSecs() - secs
cat(paste(len, ": ", duration, "s", sep=""), file=debugFile, 
append=TRUE, sep="\n")
secs <- elapsedSecs()
}
}
data[1:len]
}
{code}
For the example I posted above, all reads of 1 rows take around 2 seconds 
(the time to read the first 1 rows). 

I still think this is terribly slow. It's a huge cost (e.g. if your partitions 
have 1,000,000 rows) to pay even before you've actually run any computations. 
To me this makes {{dapply}} basically unusable: even if I do {{SparkR::head}} 
to get a preview, I still pay the cost of running {{dapply}} on one partition, 
which can take a considerable amount of time. In the example I posted above my 
columns just contained integers, but as soon as I expand to strings it takes 
significantly longer to read 1 row blocks due to the increased volume of 
data.


was (Author: iamthomaspowell):
This is the time within a single partition (and all partitions are actually 
evenly distributed). I've made the following change and this results in a 
constant time to read 1 rows. I'll can submit a PR back since this turns 
the deserialization into O(n) rather than the current implementation.
{code}
readMultipleObjectsWithDebug <- function(inputCon) {
# readMultipleObjects will read multiple continuous objects from
# a DataOutputStream. There is no preceding field telling the count
# of the objects, so the number of objects varies, we try to read
# all objects in a loop until the end of the stream.
cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
data <- vector('list', 10)
cap <- 10
len <- 0
secs <- elapsedSecs()
while (TRUE) {
type <- SparkR:::readType(inputCon)
if (type == "") {
break
}
if(len == cap) {
data <- c(data, vector('list', cap))
cap <- cap*2
}
len <- len + 1
data[[len]] <- SparkR:::readTypedObject(inputCon, type)
if (len %% 1 == 0) {
duration <- elapsedSecs() - secs
cat(paste(len, ": ", duration, "s", sep=""), file=debugFile, 
append=TRUE, sep="\n")
secs <- elapsedSecs()
}
}
data[1:len]
}
{code}
For the example I posted above all reads of 1 rows take around 2 seconds - 
the time to read the first 1 rows. 

I still think this is terribly slow. It's a huge cost (e.g. if your partitions 
have 1,000,000 rows) to pay even before you've actually run any computations. 
To me this makes {{dapply}} basically unusable - even if I do {{SparkR::head}} 
to get a preview I still pay the cost of running {{dapply}} on one partition 
which can take a considerably long time. In the example I posted above my 
columns just contained integers but as soon as I expand to strings it takes 
significantly longer to read 1 row blocks due to the increased volume of 
data.

> Spark job hangs when using dapply
> -
>
> Key: SPARK-17634
> URL: https://issues.apache.org/jira/browse/SPARK-17634
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.0.0
>Reporter: Thomas Powell
>Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame 
> backed by files in parquet with around 200 files that is around 2GB. When I 
> load this in with the new partition coalescing it ends up having around 

[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply

2016-09-23 Thread Thomas Powell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15516975#comment-15516975
 ] 

Thomas Powell commented on SPARK-17634:
---

Several hours. I modified {{worker.R}} so that it logs the time it takes to 
read each 1 rows. I saw a terrible slowdown.
{code}
Starting to read the data
1: 2.663s
2: 4.223s
3: 6.005s
4: 7.799s
5: 9.998s
...
54: 116.293s
55: 122.248s
56: 122.798s
57: 126.57s
58: 135.371s
{code}

This isn't the total time to read rows; it is the time to read each 1 rows. I 
modified the {{readMultipleObjects}} function to be:
{code}
readMultipleObjectsWithDebug <- function(inputCon) {
# readMultipleObjects will read multiple continuous objects from
# a DataOutputStream. There is no preceding field telling the count
# of the objects, so the number of objects varies, we try to read
# all objects in a loop until the end of the stream.
cat("Starting to read the data", file=debugFile, append=TRUE, sep="\n")
data <- list()
secs <- elapsedSecs()
while (TRUE) {
# If reaching the end of the stream, type returned should be "".
type <- SparkR:::readType(inputCon)
if (type == "") {
break
}
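# Note: growing the list by a single element per row is what appears to make
# each successive block slower (and the total read time quadratic).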
data[[length(data) + 1L]] <- SparkR:::readTypedObject(inputCon, type)
if (length(data) %% 1 == 0) {
duration <- elapsedSecs() - secs
cat(paste(length(data), ": ", duration, "s", sep=""), 
file=debugFile, append=TRUE, sep="\n")
secs <- elapsedSecs()
}
}
data # this is a list of named lists now
}
{code}

> Spark job hangs when using dapply
> -
>
> Key: SPARK-17634
> URL: https://issues.apache.org/jira/browse/SPARK-17634
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.0.0
>Reporter: Thomas Powell
>Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame 
> backed by files in parquet with around 200 files that is around 2GB. When I 
> load this in with the new partition coalescing it ends up having around 20 
> partitions so each one roughly 100MB. The data frame itself has 4 columns of 
> integers and doubles. If I run a count over this things work fine.
> However, if I add a {{dapply}} in between the read and the {{count}} that 
> just uses an identity function the tasks hang and make no progress. Both the 
> R and Java processes are running on the Spark nodes and are listening on the 
> {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just listening on the 
> socket but never seems to make any progress. The R process is harder to debug 
> what it is doing.
> {code}
> Thread 112823: (state = IN_NATIVE)
>  - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], 
> int, int, int) @bci=0 (Interpreted frame)
>  - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, 
> int, int) @bci=8, line=116 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 
> (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 
> (Interpreted frame)
>  - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
>  - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
>  - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
>  - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() 
> @bci=4, line=212 (Interpreted frame)
>  - 
> org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) 
> @bci=25, line=96 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) 
> @bci=109, line=87 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator)
>  @bci=322, line=59 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object)
>  @bci=5, line=29 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator)
>  @bci=59, line=178 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object)
>  @bci=5, line=175 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext,
>  int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object,
>  java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
>  - 

[jira] [Comment Edited] (SPARK-17634) Spark job hangs when using dapply

2016-09-22 Thread Thomas Powell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15513673#comment-15513673
 ] 

Thomas Powell edited comment on SPARK-17634 at 9/22/16 4:35 PM:


I also ran this on a row-limited version of this dataset. The first 22 tasks 
completed immediately (since there was no data to process). The final task 
recorded 13MB of shuffle read (out of ~300MB of shuffle write in the previous 
stage), at which point it stalled.


was (Author: iamthomaspowell):
I also ran this on a row limited version of this dataset. The first 22 tasks 
completed immediately (since there was no data to process). The final task 
recorded 32MB of shuffle read, (out of ~300MB of shuffle write in the previous 
stage), at which point it then stalled.

> Spark job hangs when using dapply
> -
>
> Key: SPARK-17634
> URL: https://issues.apache.org/jira/browse/SPARK-17634
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.0.0
>Reporter: Thomas Powell
>Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame 
> backed by files in parquet with around 200 files that is around 2GB. When I 
> load this in with the new partition coalescing it ends up having around 20 
> partitions so each one roughly 100MB. The data frame itself has 4 columns of 
> integers and doubles. If I run a count over this things work fine.
> However, if I add a {{dapply}} in between the read and the {{count}} that 
> just uses an identity function the tasks hang and make no progress. Both the 
> R and Java processes are running on the Spark nodes and are listening on the 
> {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just listening on the 
> socket but never seems to make any progress. The R process is harder to debug 
> what it is doing.
> {code}
> Thread 112823: (state = IN_NATIVE)
>  - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], 
> int, int, int) @bci=0 (Interpreted frame)
>  - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, 
> int, int) @bci=8, line=116 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 
> (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 
> (Interpreted frame)
>  - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
>  - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
>  - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
>  - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() 
> @bci=4, line=212 (Interpreted frame)
>  - 
> org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) 
> @bci=25, line=96 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) 
> @bci=109, line=87 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator)
>  @bci=322, line=59 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object)
>  @bci=5, line=29 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator)
>  @bci=59, line=178 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object)
>  @bci=5, line=175 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext,
>  int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object,
>  java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted 

[jira] [Commented] (SPARK-17634) Spark job hangs when using dapply

2016-09-22 Thread Thomas Powell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15513673#comment-15513673
 ] 

Thomas Powell commented on SPARK-17634:
---

I also ran this on a row-limited version of this dataset. The first 22 tasks 
completed immediately (since there was no data to process). The final task 
recorded 32MB of shuffle read (out of ~300MB of shuffle write in the previous 
stage), at which point it stalled.
stage), at which point it then stalled.

> Spark job hangs when using dapply
> -
>
> Key: SPARK-17634
> URL: https://issues.apache.org/jira/browse/SPARK-17634
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.0.0
>Reporter: Thomas Powell
>Priority: Critical
>
> I'm running into an issue when using dapply on yarn. I have a data frame 
> backed by files in parquet with around 200 files that is around 2GB. When I 
> load this in with the new partition coalescing it ends up having around 20 
> partitions so each one roughly 100MB. The data frame itself has 4 columns of 
> integers and doubles. If I run a count over this things work fine.
> However, if I add a {{dapply}} in between the read and the {{count}} that 
> just uses an identity function the tasks hang and make no progress. Both the 
> R and Java processes are running on the Spark nodes and are listening on the 
> {{SPARKR_WORKER_PORT}}.
> {{result <- dapply(df, function(x){x}, SparkR::schema(df))}}
> I took a jstack of the Java process and see that it is just listening on the 
> socket but never seems to make any progress. The R process is harder to debug 
> what it is doing.
> {code}
> Thread 112823: (state = IN_NATIVE)
>  - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], 
> int, int, int) @bci=0 (Interpreted frame)
>  - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, 
> int, int) @bci=8, line=116 (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 
> (Interpreted frame)
>  - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 
> (Interpreted frame)
>  - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
>  - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
>  - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
>  - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() 
> @bci=4, line=212 (Interpreted frame)
>  - 
> org.apache.spark.api.r.RRunner$$anon$1.(org.apache.spark.api.r.RRunner) 
> @bci=25, line=96 (Interpreted frame)
>  - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) 
> @bci=109, line=87 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator)
>  @bci=322, line=59 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object)
>  @bci=5, line=29 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator)
>  @bci=59, line=178 (Interpreted frame)
>  - 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object)
>  @bci=5, line=175 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext,
>  int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object,
>  java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
>  - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
>  - 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, 
> org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
>  - 

[jira] [Created] (SPARK-17634) Spark job hangs when using dapply

2016-09-22 Thread Thomas Powell (JIRA)
Thomas Powell created SPARK-17634:
-

 Summary: Spark job hangs when using dapply
 Key: SPARK-17634
 URL: https://issues.apache.org/jira/browse/SPARK-17634
 Project: Spark
  Issue Type: Bug
  Components: SparkR
Affects Versions: 2.0.0
Reporter: Thomas Powell
Priority: Critical


I'm running into an issue when using dapply on yarn. I have a data frame backed 
by around 200 parquet files, totalling around 2GB. When I load this in with the 
new partition coalescing it ends up having around 20 partitions, so each one is 
roughly 100MB. The data frame itself has 4 columns of integers and doubles. If 
I run a count over this, things work fine.

However, if I add a {{dapply}} in between the read and the {{count}} that just 
uses an identity function, the tasks hang and make no progress. Both the R and 
Java processes are running on the Spark nodes and are listening on the 
{{SPARKR_WORKER_PORT}}.

{{result <- dapply(df, function(x){x}, SparkR::schema(df))}}

I took a jstack of the Java process and see that it is just listening on the 
socket but never seems to make any progress. It is harder to tell what the R 
process is doing.
{code}
Thread 112823: (state = IN_NATIVE)
 - java.net.SocketInputStream.socketRead0(java.io.FileDescriptor, byte[], int, 
int, int) @bci=0 (Interpreted frame)
 - java.net.SocketInputStream.socketRead(java.io.FileDescriptor, byte[], int, 
int, int) @bci=8, line=116 (Interpreted frame)
 - java.net.SocketInputStream.read(byte[], int, int, int) @bci=79, line=170 
(Interpreted frame)
 - java.net.SocketInputStream.read(byte[], int, int) @bci=11, line=141 
(Interpreted frame)
 - java.io.BufferedInputStream.fill() @bci=214, line=246 (Interpreted frame)
 - java.io.BufferedInputStream.read() @bci=12, line=265 (Compiled frame)
 - java.io.DataInputStream.readInt() @bci=4, line=387 (Compiled frame)
 - org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read() 
@bci=4, line=212 (Interpreted frame)
 - 
org.apache.spark.api.r.RRunner$$anon$1.<init>(org.apache.spark.api.r.RRunner) 
@bci=25, line=96 (Interpreted frame)
 - org.apache.spark.api.r.RRunner.compute(scala.collection.Iterator, int) 
@bci=109, line=87 (Interpreted frame)
 - 
org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(scala.collection.Iterator)
 @bci=322, line=59 (Interpreted frame)
 - 
org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(java.lang.Object) 
@bci=5, line=29 (Interpreted frame)
 - 
org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(scala.collection.Iterator)
 @bci=59, line=178 (Interpreted frame)
 - 
org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(java.lang.Object)
 @bci=5, line=175 (Interpreted frame)
 - 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(org.apache.spark.TaskContext,
 int, scala.collection.Iterator) @bci=8, line=784 (Interpreted frame)
 - 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(java.lang.Object,
 java.lang.Object, java.lang.Object) @bci=13, line=784 (Interpreted frame)
 - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=27, line=38 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
 - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
 - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
 - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
 - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=24, line=38 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=26, line=319 (Interpreted frame)
 - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=33, line=283 (Interpreted frame)
 - 
org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) 
@bci=168, line=79 (Interpreted frame)
 - 
org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) 
@bci=2, line=47 (Interpreted frame)
 - org.apache.spark.scheduler.Task.run(long, int, 
org.apache.spark.metrics.MetricsSystem) @bci=82, line=85 (Interpreted frame)
 - org.apache.spark.executor.Executor$TaskRunner.run() @bci=374, line=274 
(Interpreted frame)
 - 

[jira] [Created] (SPARK-17608) Long type has incorrect serialization/deserialization

2016-09-20 Thread Thomas Powell (JIRA)
Thomas Powell created SPARK-17608:
-

 Summary: Long type has incorrect serialization/deserialization
 Key: SPARK-17608
 URL: https://issues.apache.org/jira/browse/SPARK-17608
 Project: Spark
  Issue Type: Bug
  Components: SparkR
Affects Versions: 2.0.0
Reporter: Thomas Powell


I am hitting issues when using {{dapply}} on a data frame that contains a 
{{bigint}} in its schema. When this is converted to a SparkR data frame, a 
"bigint" gets converted to an R {{numeric}} type: 
https://github.com/apache/spark/blob/master/R/pkg/R/types.R#L25.

However, the R {{numeric}} type gets converted to 
{{org.apache.spark.sql.types.DoubleType}}: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L97.

The two directions therefore aren't compatible. If I use the same schema when 
using dapply (and just an identity function), I will get type collisions 
because the output type is a double but the schema expects a bigint. 
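
A minimal sketch of the asymmetry (the {{RTypeRoundTrip}} object and the 
literal maps below are illustrative only, paraphrasing the two mapping tables 
linked above):
{code:scala}
import org.apache.spark.sql.types._

// Illustration of the round trip described above; the real mappings live in
// R/pkg/R/types.R (JVM -> R) and SQLUtils.scala (R -> JVM).
object RTypeRoundTrip {
  // JVM -> R: the R class SparkR deserializes each Catalyst type into
  val toR: Map[DataType, String] = Map(
    IntegerType -> "integer",
    LongType    -> "numeric",  // bigint becomes a plain R double
    DoubleType  -> "numeric"
  )

  // R -> JVM: the Catalyst type inferred from each R class on the way back
  val fromR: Map[String, DataType] = Map(
    "integer" -> IntegerType,
    "numeric" -> DoubleType    // numeric always comes back as double
  )

  def main(args: Array[String]): Unit = {
    // LongType -> "numeric" -> DoubleType: the round trip is not the identity,
    // so a dapply() schema declaring bigint collides with the double output.
    println(s"LongType round-trips to ${fromR(toR(LongType))}")  // DoubleType
  }
}
{code}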



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org