[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2019-02-14 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768054#comment-16768054
 ] 

Hyukjin Kwon commented on SPARK-18924:
--

Oops, sorry guys. I just found this. I made a PR via SPARK-26762. Let me 
resolve this one as a duplicate of that.

> Improve collect/createDataFrame performance in SparkR
> -
>
> Key: SPARK-18924
> URL: https://issues.apache.org/jira/browse/SPARK-18924
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Reporter: Xiangrui Meng
>Priority: Critical
>
> SparkR has its own SerDe for data serialization between JVM and R.
> The SerDe on the JVM side is implemented in:
> * 
> [SerDe.scala|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/r/SerDe.scala]
> * 
> [SQLUtils.scala|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala]
> The SerDe on the R side is implemented in:
> * 
> [deserialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/deserialize.R]
> * 
> [serialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/serialize.R]
> The serialization between JVM and R suffers from huge storage and computation 
> overhead. For example, a short round trip of 1 million doubles surprisingly 
> took 3 minutes on my laptop:
> {code}
> > system.time(collect(createDataFrame(data.frame(x=runif(1000000)))))
>    user  system elapsed
>  14.224   0.582 189.135
> {code}
> Collecting a medium-sized DataFrame to local and continuing with a local R 
> workflow is a use case we should pay attention to. SparkR will never be able 
> to cover all existing features from CRAN packages. It is also unnecessary for 
> Spark to do so because not all features need scalability. 
> Several factors contribute to the serialization overhead:
> 1. The SerDe on the R side is implemented using high-level R methods.
> 2. DataFrame columns are not efficiently serialized, primitive type columns 
> in particular.
> 3. Some overhead in the serialization protocol/impl.
> 1) might have been discussed before, because R packages like rJava existed 
> before SparkR. I'm not sure whether we have a license issue in depending on 
> those libraries. Another option is to switch to R's low-level C interface or 
> Rcpp, which again might have license issues. I'm not an expert here. If we 
> have to implement our own, there is still much room for improvement, 
> discussed below.
> 2) is a huge gap. The current collect is implemented by `SQLUtils.dfToCols`, 
> which collects rows to local and then constructs columns. However,
> * it ignores column types, which results in boxing/unboxing overhead
> * it collects all objects to the driver, which results in high GC pressure
> A relatively simple change is to implement a specialized column builder based 
> on column types, primitive types in particular. We need to handle null/NA 
> values properly. A simple data structure we can use is
> {code}
> val size: Int
> val nullIndexes: Array[Int]
> val notNullValues: Array[T] // specialized for primitive types
> {code}
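The proposed structure can be illustrated with a minimal Scala sketch for double columns. The names `DoubleColumn` and `DoubleColumnBuilder` are hypothetical and are not part of Spark; the sketch only assumes the three fields listed above and grows primitive arrays by hand so that no value is boxed.

```scala
// Hypothetical sketch, not Spark code: a column builder specialized for doubles.
// Null positions are stored separately, so the values stay in a primitive array.
final case class DoubleColumn(
    size: Int,                    // total number of rows, including nulls
    nullIndexes: Array[Int],      // row positions that hold null/NA
    notNullValues: Array[Double]) // non-null values, in row order

final class DoubleColumnBuilder(initialCapacity: Int = 16) {
  private var values = new Array[Double](math.max(initialCapacity, 1))
  private var numValues = 0
  private var nulls = new Array[Int](4)
  private var numNulls = 0
  private var size = 0

  // Append a non-null value; the backing array doubles when it is full.
  def append(v: Double): Unit = {
    if (numValues == values.length) {
      values = java.util.Arrays.copyOf(values, values.length * 2)
    }
    values(numValues) = v
    numValues += 1
    size += 1
  }

  // Record the current row position as null without storing a value.
  def appendNull(): Unit = {
    if (numNulls == nulls.length) {
      nulls = java.util.Arrays.copyOf(nulls, nulls.length * 2)
    }
    nulls(numNulls) = size
    numNulls += 1
    size += 1
  }

  // Trim both arrays to their used length.
  def build(): DoubleColumn =
    DoubleColumn(
      size,
      java.util.Arrays.copyOf(nulls, numNulls),
      java.util.Arrays.copyOf(values, numValues))
}
```

A caller would invoke `append` or `appendNull` once per row and then `build()`; the receiving side can rebuild the full vector by placing NA at each position in `nullIndexes`.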
> On the R side, we can use `readBin` and `writeBin` to read or write an entire 
> vector in a single method call. The speed seems reasonable (on the order of 
> GB/s):
> {code}
> > x <- runif(10000000) # 1e7, not 1e6
> > system.time(r <- writeBin(x, raw(0)))
>    user  system elapsed
>   0.036   0.021   0.059
> > system.time(y <- readBin(r, double(), 10000000))
>    user  system elapsed
>   0.015   0.007   0.024
> {code}
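For the JVM counterpart of a bulk `readBin`, one possible sketch is to write the whole primitive array into a single byte block with an explicit byte order. The object `DoubleBlock` and its method names are hypothetical, not Spark APIs. The sketch assumes the R side reads with a matching `endian` argument, for example `readBin(con, double(), n, endian = "little")`, and it does not address how NA values are encoded.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical sketch, not Spark code: encode a primitive double array as one
// contiguous block of 8-byte values that can be read in a single call.
object DoubleBlock {
  def encode(
      values: Array[Double],
      order: ByteOrder = ByteOrder.LITTLE_ENDIAN): Array[Byte] = {
    // Note: values.length * 8 overflows Int for very large arrays; a real
    // implementation would need to write in chunks.
    val buf = ByteBuffer.allocate(values.length * java.lang.Double.BYTES).order(order)
    buf.asDoubleBuffer().put(values) // bulk copy, no per-element boxing
    buf.array()
  }

  def decode(
      bytes: Array[Byte],
      order: ByteOrder = ByteOrder.LITTLE_ENDIAN): Array[Double] = {
    val doubles = ByteBuffer.wrap(bytes).order(order).asDoubleBuffer()
    val out = new Array[Double](doubles.remaining())
    doubles.get(out)
    out
  }
}
```

The byte order is set explicitly because `ByteBuffer` defaults to big-endian, while R's `readBin` defaults to the platform's native order.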
> This is just a proposal that needs to be discussed and formalized. But in 
> general, it should be feasible to obtain a performance gain of 20x or more.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-05-14 Thread Wes McKinney (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009818#comment-16009818
 ] 

Wes McKinney commented on SPARK-18924:
--

[~deanchen] once R bindings for Arrow are in ship shape, then adding native 
R-Parquet support comes basically for free. We're handling the Parquet reads 
for Python completely in C++, so you would do the same for R in the Rcpp 
binding layer. 

> Improve collect/createDataFrame performance in SparkR
> -
>
> Key: SPARK-18924
> URL: https://issues.apache.org/jira/browse/SPARK-18924
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Critical
>



[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-05-14 Thread Dean Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009786#comment-16009786
 ] 

Dean Chen commented on SPARK-18924:
---

Great, we've been following the effort on 
https://github.com/apache/spark/pull/15821 and were hoping someone would tack 
on the R integration. 

Can the ArrowConverters logic that converts Spark InternalRows to an 
ArrowPayload be reused for the R interface as a quick start? [~wesmckinn], we 
would love to be able to replace our Parquet with Arrow so there is the option 
to read data out of the data warehouse natively from R and Python without 
having to go through Spark SQL. That sounds like a longer-term effort, right? 

We would be happy to contribute on the Spark/Scala and R side to help speed up 
both efforts (we don't have much internal bandwidth on the C++ side).




[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-05-14 Thread Wes McKinney (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009783#comment-16009783
 ] 

Wes McKinney commented on SPARK-18924:
--

I can also help with this, particularly on the C++ side. I was talking with 
[~hadley] as well about this some time back. By using common Scala code for 
serialization in Spark and C++ code for R and Python, this will enable us to 
unify efforts on IO performance optimization. From Hadley's C++ implementation 
it should be possible to bootstrap (without too much effort) a reader for the 
Arrow file and streaming formats.




[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-05-14 Thread Uwe L. Korn (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009771#comment-16009771
 ] 

Uwe L. Korn commented on SPARK-18924:
-

Note that there is already work on the Scala<->Python bridge over at 
https://github.com/apache/spark/pull/15821 / 
https://issues.apache.org/jira/browse/SPARK-13534 using Apache Arrow that 
solves the same problem. They have seen over 40x speedup. While there are no 
official Apache Arrow bindings for R yet, as part of 
https://github.com/wesm/feather a first version of them was created which 
should be a really good starting point.

cc [~wesmckinn]




[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-05-14 Thread Dean Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009760#comment-16009760
 ] 

Dean Chen commented on SPARK-18924:
---

[~mengxr] The collect performance in SparkR is a huge pain point for us. Is 
there anything we can do to help with this effort? 




[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-02-08 Thread Xiangrui Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858963#comment-15858963
 ] 

Xiangrui Meng commented on SPARK-18924:
---

I'm going to work on this one, so I removed myself as "shepherd".




[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-01-17 Thread Joseph K. Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15826996#comment-15826996
 ] 

Joseph K. Bradley commented on SPARK-18924:
---

Per the 2.2 roadmap process, I'm going to add [~mengxr] as the shepherd, but 
others here are free to take that role instead.

> Improve collect/createDataFrame performance in SparkR
> -
>
> Key: SPARK-18924
> URL: https://issues.apache.org/jira/browse/SPARK-18924
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Reporter: Xiangrui Meng
>Priority: Critical
>
> SparkR has its own SerDe for data serialization between JVM and R.
> The SerDe on the JVM side is implemented in:
> * 
> [SerDe.scala|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/r/SerDe.scala]
> * 
> [SQLUtils.scala|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala]
> The SerDe on the R side is implemented in:
> * 
> [deserialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/deserialize.R]
> * 
> [serialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/serialize.R]
> The serialization between JVM and R suffers from huge storage and computation 
> overhead. For example, a short round trip of 1 million doubles surprisingly 
> took 3 minutes on my laptop:
> {code}
> > system.time(collect(createDataFrame(data.frame(x=runif(100)
>user  system elapsed
>  14.224   0.582 189.135
> {code}
> Collecting a medium-sized DataFrame to local and continuing with a local R 
> workflow is a use case we should pay attention to. SparkR will never be able 
> to cover all existing features from CRAN packages. It is also unnecessary for 
> Spark to do so because not all features need scalability. 
> Several factors contribute to the serialization overhead:
> 1. The SerDe in R side is implemented using high-level R methods.
> 2. DataFrame columns are not efficiently serialized, primitive type columns 
> in particular.
> 3. Some overhead in the serialization protocol/impl.
> 1) might have been discussed before, because R packages like rJava existed 
> before SparkR. I'm not sure whether we have a license issue in depending on 
> those libraries. Another option is to switch to R's low-level C interface or 
> Rcpp, which again might have a license issue. I'm not an expert here. If we 
> have to implement our own, there is still much room for improvement, 
> discussed below.
> 2) is a huge gap. The current collect is implemented by `SQLUtils.dfToCols`, 
> which collects rows to local and then constructs columns. However,
> * it ignores column types, which results in boxing/unboxing overhead
> * it collects all objects to the driver, which results in high GC pressure
> A relatively simple change is to implement a specialized column builder based 
> on column types, primitive types in particular. We need to handle null/NA 
> values properly. A simple data structure we can use is
> {code}
> val size: Int
> val nullIndexes: Array[Int]
> val notNullValues: Array[T] // specialized for primitive types
> {code}
> On the R side, we can use `readBin` and `writeBin` to read or write the 
> entire vector in a single method call. The speed seems reasonable (on the 
> order of GB/s):
> {code}
> > x <- runif(10000000) # 1e7, not 1e6
> > system.time(r <- writeBin(x, raw(0)))
>    user  system elapsed
>   0.036   0.021   0.059
> > system.time(y <- readBin(r, double(), 10000000))
>    user  system elapsed
>   0.015   0.007   0.024
> This is just a proposal that needs to be discussed and formalized. But in 
> general, it should be feasible to obtain 20x or more performance gain.
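
The column layout proposed in the quoted description (`size`, `nullIndexes`, `notNullValues`) could look roughly like the sketch below for a column of doubles. This is only an illustration under stated assumptions: `DoubleColumn`, `DoubleColumnBuilder`, `append`, `appendNull` and `result` are hypothetical names that do not exist in Spark, and a real implementation would need one such builder per primitive type.

```scala
import java.util.Arrays

// Hypothetical result type mirroring the three fields named in the proposal.
final case class DoubleColumn(
    size: Int,                    // total number of rows, nulls included
    nullIndexes: Array[Int],      // row positions that hold null/NA
    notNullValues: Array[Double]) // primitive storage, no boxing

// Hypothetical builder: grows primitive arrays by doubling, so values are
// never wrapped in java.lang.Double while the column is assembled.
final class DoubleColumnBuilder(initialCapacity: Int = 16) {
  private var values = new Array[Double](math.max(initialCapacity, 1))
  private var numValues = 0
  private var nulls = new Array[Int](4)
  private var numNulls = 0
  private var count = 0

  // Append a non-null value at the next row position.
  def append(v: Double): Unit = {
    if (numValues == values.length) values = Arrays.copyOf(values, values.length * 2)
    values(numValues) = v
    numValues += 1
    count += 1
  }

  // Record the next row position as null/NA without storing a value.
  def appendNull(): Unit = {
    if (numNulls == nulls.length) nulls = Arrays.copyOf(nulls, nulls.length * 2)
    nulls(numNulls) = count
    numNulls += 1
    count += 1
  }

  // Trim the backing arrays to their used length and return the column.
  def result(): DoubleColumn =
    DoubleColumn(count, Arrays.copyOf(nulls, numNulls), Arrays.copyOf(values, numValues))
}
```

Keeping the null positions in a separate array leaves `notNullValues` as a dense primitive array, which is the form that can be written out and read back in bulk.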



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2016-12-22 Thread Shivaram Venkataraman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15771602#comment-15771602
 ] 

Shivaram Venkataraman commented on SPARK-18924:
---

Yeah, I think we could convert this JIRA into a few sub-tasks. The first one 
could be profiling some of the existing code to get a breakdown of how much 
time is spent where. The next one could be the JVM-side changes like boxing / 
unboxing improvements etc. 
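
As a rough illustration of what such a profiling sub-task might measure on the JVM side, the sketch below times a sum over boxed values against the same sum over a primitive array. It is an assumption-laden toy, not Spark code: `BoxingProbe`, `timed`, `sumBoxed` and `sumPrimitive` are hypothetical names, and a single `System.nanoTime` measurement is noisy (JIT warm-up, GC), so real profiling would need repeated runs or a proper benchmark harness.

```scala
object BoxingProbe {
  // Runs `body` once and returns its result together with elapsed nanoseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, System.nanoTime() - start)
  }

  // Each element is a boxed java.lang.Double that must be unboxed on access,
  // similar to reading values out of generic rows.
  def sumBoxed(values: Array[Any]): Double = {
    var total = 0.0
    var i = 0
    while (i < values.length) {
      total += values(i).asInstanceOf[Double]
      i += 1
    }
    total
  }

  // Same computation over a primitive double[] with no boxing involved.
  def sumPrimitive(values: Array[Double]): Double = {
    var total = 0.0
    var i = 0
    while (i < values.length) {
      total += values(i)
      i += 1
    }
    total
  }
}
```

Calling `BoxingProbe.timed(BoxingProbe.sumBoxed(boxed))` and `BoxingProbe.timed(BoxingProbe.sumPrimitive(prim))` on the same data gives two elapsed times to compare.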




[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2016-12-19 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15763165#comment-15763165
 ] 

Felix Cheung commented on SPARK-18924:
--

Thank you for bringing this up. JVM<->R performance has been reported a few 
times and is definitely something I have been tracking but haven't gotten 
around to.

I don't think rJava would work since it is GPLv2 licensed. Rcpp is also 
GPLv2/v3.

Strategically placed calls to C might be a way to go (cross-platform 
complications aside)? That seems to be the approach for a lot of R packages. I 
recall we have a JIRA on performance tests; do we have a more detailed 
breakdown of the time spent?





[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2016-12-19 Thread Hossein Falaki (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15762687#comment-15762687
 ] 

Hossein Falaki commented on SPARK-18924:


Would be good to think about this along with the efforts to have zero-copy data 
sharing between JVM and R. I think if we do that, a lot of the Ser/De problems 
in the data plane will go away.




[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2016-12-19 Thread Shivaram Venkataraman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15762548#comment-15762548
 ] 

Shivaram Venkataraman commented on SPARK-18924:
---

This is a good thing to investigate. Just to provide some historical context, 
the functions in serialize.R and deserialize.R were primarily designed to 
enable function calls between the JVM and R, and the serialization performance 
is less critical there as it's mostly just function names, arguments etc. 

For the data path we were originally using R's own serializer and 
deserializer, but that doesn't work if we want to parse the data in the JVM. 
So the whole dfToCols was a retrofit to make things work.

In terms of design options:
- I think removing the boxing / unboxing overheads and making `readIntArray` or 
`readStringArray` in R more efficient would be a good starting point.
- In terms of using other packages, there are licensing questions and also 
usability questions. So far users mostly don't require any extra R package to 
use SparkR and hence we are compatible across a bunch of R versions etc. So I 
think we should first look at how we can make our existing architecture 
better.
- If the bottleneck is due to R function call overheads after the above 
changes, we can explore writing a C module (similar to our old hashCode 
implementation). While this has fewer complications in terms of licensing, 
version matching etc., there is still some complexity in how we build and 
distribute this in a binary package.
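
To make the first option a bit more concrete, here is a minimal sketch of what writing a whole primitive array in one bulk operation could look like on the JVM side, instead of one call per element. The layout (a 4-byte length followed by 8 bytes per value, big-endian) and the names `BulkDoubles`, `encode` and `decode` are assumptions for illustration only; this is not the existing SerDe wire format.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object BulkDoubles {
  // Encodes the array as: 4-byte element count, then the raw doubles.
  // The whole payload is copied with a single bulk put, no per-element boxing.
  def encode(values: Array[Double]): Array[Byte] = {
    val buf = ByteBuffer.allocate(4 + 8 * values.length).order(ByteOrder.BIG_ENDIAN)
    buf.putInt(values.length)
    // The double view starts at the current position, i.e. after the count.
    buf.asDoubleBuffer().put(values)
    buf.array()
  }

  // Decodes the layout produced by `encode` with a single bulk get.
  def decode(bytes: Array[Byte]): Array[Double] = {
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN)
    val n = buf.getInt()
    val out = new Array[Double](n)
    buf.asDoubleBuffer().get(out)
    out
  }
}
```

A payload in this shape is what a single R call such as `readBin(con, double(), n, endian = "big")` could consume after reading the count, which is the kind of vectorized read that would replace an element-by-element loop.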


[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2016-12-18 Thread Xiangrui Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15760439#comment-15760439
 ] 

Xiangrui Meng commented on SPARK-18924:
---

cc: [~shivaram] [~felixcheung] [~falaki] [~yanboliang] for discussion.
