[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768054#comment-16768054 ]

Hyukjin Kwon commented on SPARK-18924:
--------------------------------------

Oops, sorry guys. I just found this. I made a PR via SPARK-26762. Let me resolve this one as a duplicate of that.

> Improve collect/createDataFrame performance in SparkR
> -----------------------------------------------------
>
>                 Key: SPARK-18924
>                 URL: https://issues.apache.org/jira/browse/SPARK-18924
>             Project: Spark
>          Issue Type: Improvement
>          Components: SparkR
>            Reporter: Xiangrui Meng
>            Priority: Critical
>
> SparkR has its own SerDe for data serialization between JVM and R.
> The SerDe on the JVM side is implemented in:
> * [SerDe.scala|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/r/SerDe.scala]
> * [SQLUtils.scala|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala]
> The SerDe on the R side is implemented in:
> * [deserialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/deserialize.R]
> * [serialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/serialize.R]
> The serialization between JVM and R suffers from huge storage and computation
> overhead. For example, a short round trip of 1 million doubles surprisingly
> took 3 minutes on my laptop:
> {code}
> > system.time(collect(createDataFrame(data.frame(x=runif(1000000)))))
>    user  system elapsed
>  14.224   0.582 189.135
> {code}
> Collecting a medium-sized DataFrame to local and continuing with a local R
> workflow is a use case we should pay attention to. SparkR will never be able
> to cover all existing features from CRAN packages. It is also unnecessary for
> Spark to do so because not all features need scalability.
> Several factors contribute to the serialization overhead:
> 1. The SerDe on the R side is implemented using high-level R methods.
> 2. DataFrame columns are not efficiently serialized, primitive type columns
> in particular.
> 3. Some overhead in the serialization protocol/impl.
> 1) might have been discussed before, because R packages like rJava existed
> before SparkR. I'm not sure whether we have a license issue in depending on
> those libraries. Another option is to switch to the low-level R C interface
> or Rcpp, which again might have a license issue. I'm not an expert here. If
> we have to implement our own, there is still much room for improvement,
> discussed below.
> 2) is a huge gap. The current collect is implemented by `SQLUtils.dfToCols`,
> which collects rows to local and then constructs columns. However,
> * it ignores column types, which results in boxing/unboxing overhead
> * it collects all objects to the driver, which results in high GC pressure
> A relatively simple change is to implement a specialized column builder based
> on column types, primitive types in particular. We need to handle null/NA
> values properly. A simple data structure we can use is
> {code}
> val size: Int
> val nullIndexes: Array[Int]
> val notNullValues: Array[T] // specialized for primitive types
> {code}
> On the R side, we can use `readBin` and `writeBin` to read or write the
> entire vector in a single call. The speed seems reasonable (on the order of
> GB/s):
> {code}
> > x <- runif(10000000) # 1e7, not 1e6
> > system.time(r <- writeBin(x, raw(0)))
>    user  system elapsed
>   0.036   0.021   0.059
> > system.time(y <- readBin(r, double(), 10000000))
>    user  system elapsed
>   0.015   0.007   0.024
> {code}
> This is just a proposal that needs to be discussed and formalized. But in
> general, it should be feasible to obtain a 20x or greater performance gain.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
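The column layout proposed in the issue description (size, nullIndexes, notNullValues) can be illustrated with a small, self-contained sketch. It is written in Python only so that it runs without a Spark or R installation; it is not the SparkR implementation. The function names and the wire format (two little-endian int32 counts, then the null indexes, then the raw doubles) are assumptions made for this example.

```python
import struct
from array import array
from typing import List, Optional, Tuple


def build_double_column(values: List[Optional[float]]) -> Tuple[int, array, array]:
    """Split a nullable column into (size, null_indexes, not_null_values)."""
    null_indexes = array("i")     # positions of null/NA entries
    not_null_values = array("d")  # unboxed 8-byte doubles, no per-value objects
    for i, v in enumerate(values):
        if v is None:
            null_indexes.append(i)
        else:
            not_null_values.append(v)
    return len(values), null_indexes, not_null_values


def encode_column(size: int, null_indexes: array, not_null_values: array) -> bytes:
    """Hypothetical wire format: int32 size, int32 null count,
    int32 null indexes, then float64 values, all little-endian."""
    header = struct.pack("<ii", size, len(null_indexes))
    nulls = struct.pack(f"<{len(null_indexes)}i", *null_indexes)
    values = struct.pack(f"<{len(not_null_values)}d", *not_null_values)
    return header + nulls + values


def decode_column(buf: bytes) -> List[Optional[float]]:
    """Inverse of encode_column: rebuild the nullable column."""
    size, n_null = struct.unpack_from("<ii", buf, 0)
    offset = 8
    null_indexes = set(struct.unpack_from(f"<{n_null}i", buf, offset))
    offset += 4 * n_null
    values = iter(struct.unpack_from(f"<{size - n_null}d", buf, offset))
    # Walk positions in order, pulling the next stored value for non-null slots.
    return [None if i in null_indexes else next(values) for i in range(size)]


# Round trip of a small column with one missing value.
column = [1.5, None, 2.25]
payload = encode_column(*build_double_column(column))
assert decode_column(payload) == column
```

The point of the layout is that the non-null values form one contiguous block of fixed-width numbers, so a reader can consume them in a single bulk call. On the R side that would presumably be one `readBin` call with `endian = "little"` for the values, followed by assigning `NA` at the null positions.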
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009818#comment-16009818 ]

Wes McKinney commented on SPARK-18924:
--------------------------------------

[~deanchen] once R bindings for Arrow are in ship shape, then adding native R-Parquet support comes basically for free. We're handling the Parquet reads for Python completely in C++, so you would do the same for R in the Rcpp binding layer.
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009786#comment-16009786 ]

Dean Chen commented on SPARK-18924:
-----------------------------------

Great, we've been following the effort on https://github.com/apache/spark/pull/15821 and were hoping someone would tack on the R integration. Can the ArrowConverters logic that converts Spark InternalRows to an ArrowPayload be reused for the R interface as a quick start?

[~wesmckinn] we would love to be able to replace our Parquet with Arrow so there is the option to read data out of the data warehouse natively from R and Python without having to go through Spark SQL. That sounds like a longer-term effort, right?

We would be happy to contribute on the Spark/Scala and R side to help speed up both efforts (we don't have much internal bandwidth on the C++ side).
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009783#comment-16009783 ]

Wes McKinney commented on SPARK-18924:
--------------------------------------

I can also help with this, particularly on the C++ side. I was talking with [~hadley] as well about this some time back. By using common Scala code for serialization in Spark and C++ code for R and Python, this will enable us to unify efforts on IO performance optimization. From Hadley's C++ implementation it should be possible to bootstrap (without too much effort) a reader for the Arrow file and streaming formats.
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009771#comment-16009771 ]

Uwe L. Korn commented on SPARK-18924:
-------------------------------------

Note that there is already work on the Scala<->Python bridge over at https://github.com/apache/spark/pull/15821 / https://issues.apache.org/jira/browse/SPARK-13534 using Apache Arrow that solves the same problem. They have seen over 40x speedup. While there are no official Apache Arrow bindings for R yet, as part of https://github.com/wesm/feather a first version of them was created which should be a really good starting point.

cc [~wesmckinn]
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009760#comment-16009760 ]

Dean Chen commented on SPARK-18924:
-----------------------------------

[~mengxr] The collect performance in SparkR is a huge pain point for us. Is there anything we can do to help with this effort?
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858963#comment-15858963 ]

Xiangrui Meng commented on SPARK-18924:
---------------------------------------

I'm going to work on this one, so I removed myself from "shepherd".
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15826996#comment-15826996 ]

Joseph K. Bradley commented on SPARK-18924:
-------------------------------------------

Per the 2.2 roadmap process, I'm going to add [~mengxr] as the shepherd, but others here are free to take that role instead.
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15771602#comment-15771602 ]

Shivaram Venkataraman commented on SPARK-18924:
-----------------------------------------------

Yeah, I think we could convert this JIRA into a few sub-tasks. The first one could be profiling some of the existing code to get a breakdown of how much time is spent where. The next one could be the JVM-side changes, like boxing/unboxing improvements.
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15763165#comment-15763165 ]

Felix Cheung commented on SPARK-18924:
--------------------------------------

Thank you for bringing this up. JVM<->R performance has been reported a few times, and it is definitely something I have been tracking, but I haven't gotten around to it.

I don't think rJava would work since it is GPLv2 licensed. Rcpp is also GPLv2/v3.

Strategically placed calls to C might be a way to go (cross-platform complications aside)? That seems to be the approach for a lot of R packages. I recall we have a JIRA on performance tests; do we have more of a breakdown of the time spent?
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15762687#comment-15762687 ]

Hossein Falaki commented on SPARK-18924:
----------------------------------------

Would be good to think about this along with the efforts to have zero-copy data sharing between JVM and R. I think if we do that, a lot of the Ser/De problems in the data plane will go away.
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15762548#comment-15762548 ]

Shivaram Venkataraman commented on SPARK-18924:
-----------------------------------------------

This is a good thing to investigate. Just to provide some historical context, the functions in serialize.R and deserialize.R were primarily designed to enable function calls between the JVM and R, and serialization performance is less critical there as it's mostly just function names, arguments, etc.

For the data path we were originally using R's own serializer and deserializer, but that doesn't work if we want to parse the data in the JVM. So the whole dfToCols was a retrofit to make things work. In terms of design options:

- I think removing the boxing/unboxing overheads and making `readIntArray` or `readStringArray` in R more efficient would be a good starting point.
- In terms of using other packages, there are licensing questions and also usability questions. So far users mostly don't require any extra R package to use SparkR, and hence we are compatible across a bunch of R versions. So I think we should first look at how we can make our existing architecture better.
- If the bottleneck is due to R function call overheads after the above changes, we can explore writing a C module (similar to our old hashCode implementation). While this has fewer complications in terms of licensing, version matches, etc., there is still some complexity in how we build and distribute this in a binary package.
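One way to read the first design option in the comment above (no boxing on the JVM side, cheaper array reads in R) is for the JVM to emit a primitive array as a single contiguous byte block that R could decode with one `readBin` call. The sketch below is an assumption-laden illustration, not SparkR's actual wire format: `BulkEncoding`, `encodeDoubles`, and `decodeDoubles` are hypothetical names, and the byte order is a parameter because the R side would have to pass the matching `endian` argument to `readBin`.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helpers, not part of Spark's SerDe.
object BulkEncoding {

  // Encode a primitive double array as one contiguous block of 8-byte values.
  // No per-element objects are allocated, so there is no boxing overhead.
  // Note: the Int byte count limits this sketch to arrays under 2^28 elements.
  def encodeDoubles(xs: Array[Double],
                    order: ByteOrder = ByteOrder.BIG_ENDIAN): Array[Byte] = {
    val buf = ByteBuffer.allocate(xs.length * java.lang.Double.BYTES).order(order)
    buf.asDoubleBuffer().put(xs) // bulk copy through a double view of the buffer
    buf.array()
  }

  // Inverse of encodeDoubles; stands in for what a single bulk read does on the R side.
  def decodeDoubles(bytes: Array[Byte],
                    order: ByteOrder = ByteOrder.BIG_ENDIAN): Array[Double] = {
    val view = ByteBuffer.wrap(bytes).order(order).asDoubleBuffer()
    val out = new Array[Double](view.remaining())
    view.get(out)
    out
  }
}
```

The design point is that the cost per element becomes a memory copy rather than an R-level function call, which is the overhead the comment suggests measuring before considering a C module.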
[jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15760439#comment-15760439 ]

Xiangrui Meng commented on SPARK-18924:
---------------------------------------

cc: [~shivaram] [~felixcheung] [~falaki] [~yanboliang] for discussion.