[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16009783#comment-16009783 ]
Wes McKinney edited comment on SPARK-18924 at 5/14/17 4:00 PM:
---------------------------------------------------------------

I can also help with this, particularly on the C++ side. I was talking with [~hadley] as well about this some time back. By using common Scala code for serialization in Spark and C++ code for R and Python, this will enable us to unify efforts on IO performance optimization. From Hadley's C++ implementation it should be possible to bootstrap (without too much effort) a reader for the Arrow file and streaming formats.

> Improve collect/createDataFrame performance in SparkR
> -----------------------------------------------------
>
>                 Key: SPARK-18924
>                 URL: https://issues.apache.org/jira/browse/SPARK-18924
>             Project: Spark
>          Issue Type: Improvement
>          Components: SparkR
>            Reporter: Xiangrui Meng
>            Assignee: Xiangrui Meng
>            Priority: Critical
>
> SparkR has its own SerDe for data serialization between the JVM and R.
> The SerDe on the JVM side is implemented in:
> * [SerDe.scala|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/r/SerDe.scala]
> * [SQLUtils.scala|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala]
> The SerDe on the R side is implemented in:
> * [deserialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/deserialize.R]
> * [serialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/serialize.R]
> The serialization between the JVM and R suffers from huge storage and computation overhead. For example, a short round trip of 1 million doubles surprisingly took 3 minutes on my laptop:
> {code}
> > system.time(collect(createDataFrame(data.frame(x=runif(1000000)))))
>    user  system elapsed
>  14.224   0.582 189.135
> {code}
> Collecting a medium-sized DataFrame locally and continuing with a local R workflow is a use case we should pay attention to. SparkR will never be able to cover all existing features from CRAN packages, and it is unnecessary for Spark to do so because not all features need scalability.
> Several factors contribute to the serialization overhead:
> 1. The SerDe on the R side is implemented using high-level R methods.
> 2. DataFrame columns are not efficiently serialized, primitive-type columns in particular.
> 3. Some overhead in the serialization protocol/implementation.
> 1) might have been discussed before, because R packages like rJava existed before SparkR. I'm not sure whether we have a license issue in depending on those libraries. Another option is to switch to the low-level R C interface or Rcpp, which again might have license issues. I'm not an expert here. If we have to implement our own, there is still much room for improvement, as discussed below.
> 2) is a huge gap. The current collect is implemented by `SQLUtils.dfToCols`, which collects rows to the driver and then constructs columns.
> However,
> * it ignores column types and results in boxing/unboxing overhead
> * it collects all objects to the driver and results in high GC pressure
> A relatively simple change is to implement a specialized column builder based on column types, primitive types in particular. We need to handle null/NA values properly. A simple data structure we can use is
> {code}
> val size: Int
> val nullIndexes: Array[Int]
> val notNullValues: Array[T] // specialized for primitive types
> {code}
> On the R side, we can use `readBin` and `writeBin` to read the entire vector in a single method call. The speed seems reasonable (on the order of GB/s):
> {code}
> > x <- runif(10000000) # 1e7, not 1e6
> > system.time(r <- writeBin(x, raw(0)))
>    user  system elapsed
>   0.036   0.021   0.059
> >
> > system.time(y <- readBin(r, double(), 10000000))
>    user  system elapsed
>   0.015   0.007   0.024
> {code}
> This is just a proposal that needs to be discussed and formalized. But in general, it should be feasible to obtain a 20x or greater performance gain.
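
As a rough illustration of the specialized column builder proposed above, here is a minimal Scala sketch of the (size, nullIndexes, notNullValues) layout. The names (`DoubleColumnBuilder`, `writeTo`) are hypothetical and not part of Spark's API:
{code}
import java.io.{ByteArrayOutputStream, DataOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical specialized builder for a double column, tracking null
// positions separately so the non-null values stay in a primitive array.
class DoubleColumnBuilder {
  private var size = 0
  private val nullIndexes = ArrayBuffer.empty[Int]
  private val notNullValues = ArrayBuffer.empty[Double]

  def append(value: Double): Unit = {
    notNullValues += value
    size += 1
  }

  def appendNull(): Unit = {
    nullIndexes += size
    size += 1
  }

  // Frame the column as: size, null count, null indexes, raw values,
  // so each section can be consumed with a single readBin call in R.
  def writeTo(out: DataOutputStream): Unit = {
    out.writeInt(size)
    out.writeInt(nullIndexes.length)
    nullIndexes.foreach(out.writeInt)
    notNullValues.foreach(out.writeDouble)
  }
}

object DoubleColumnBuilderExample {
  def main(args: Array[String]): Unit = {
    val builder = new DoubleColumnBuilder
    builder.append(1.5)
    builder.appendNull()
    builder.append(2.5)

    val bytes = new ByteArrayOutputStream()
    builder.writeTo(new DataOutputStream(bytes))
    // 4 (size) + 4 (null count) + 4 (one null index) + 16 (two doubles)
    println(s"serialized ${bytes.size} bytes") // serialized 28 bytes
  }
}
{code}
Since `DataOutputStream` writes big-endian, the R side would pass endian = "big" to `readBin` when reading each section.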
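
And a matching reader sketch, mirroring what the R deserializer would do with one `readBin` call per section; again purely illustrative, with nulls restored as NaN where the real R side would produce NA:
{code}
import java.io.DataInputStream

// Hypothetical counterpart to DoubleColumnBuilder.writeTo: parse the frame
// back into a dense array, re-inserting a placeholder at each null index.
object DoubleColumnReader {
  def read(in: DataInputStream): Array[Double] = {
    val size = in.readInt()
    val nullCount = in.readInt()
    val nullIndexes = Array.fill(nullCount)(in.readInt()).toSet
    val values = Array.fill(size - nullCount)(in.readDouble())

    val result = new Array[Double](size)
    var vi = 0
    for (i <- 0 until size) {
      if (nullIndexes(i)) result(i) = Double.NaN // stand-in for R's NA
      else { result(i) = values(vi); vi += 1 }
    }
    result
  }
}
{code}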