Repository: spark Updated Branches: refs/heads/master 4efdc764e -> e21e1c946
[SPARK-18013][SPARKR] add crossJoin API ## What changes were proposed in this pull request? Add crossJoin and do not default to cross join if joinExpr is left out ## How was this patch tested? unit test Author: Felix Cheung <felixcheun...@hotmail.com> Closes #15559 from felixcheung/rcrossjoin. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e21e1c94 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e21e1c94 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e21e1c94 Branch: refs/heads/master Commit: e21e1c946c4b7448fb150cfa2d9419864ae6f9b5 Parents: 4efdc76 Author: Felix Cheung <felixcheun...@hotmail.com> Authored: Fri Oct 21 12:35:37 2016 -0700 Committer: Felix Cheung <felixche...@apache.org> Committed: Fri Oct 21 12:35:37 2016 -0700 ---------------------------------------------------------------------- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 59 ++++++++++++++++++++------ R/pkg/R/generics.R | 4 ++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 ++++- docs/sparkr.md | 4 ++ 5 files changed, 64 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/NAMESPACE ---------------------------------------------------------------------- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 5960c62..8718185 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -71,6 +71,7 @@ exportMethods("arrange", "covar_samp", "covar_pop", "createOrReplaceTempView", + "crossJoin", "crosstab", "dapply", "dapplyCollect", http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/R/DataFrame.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 801d2ed..8910a4b 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2271,12 +2271,13 @@ setMethod("dropDuplicates", #' Join #' -#' Join two SparkDataFrames based on 
the given join expression. +#' Joins two SparkDataFrames based on the given join expression. #' #' @param x A SparkDataFrame #' @param y A SparkDataFrame #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a -#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join +#' Column expression. If joinExpr is omitted, the default inner join is attempted and an error is +#' thrown if it would be a Cartesian Product. For a Cartesian join, use crossJoin instead. #' @param joinType The type of join to perform. The following join types are available: #' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left', #' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner". @@ -2285,23 +2286,24 @@ setMethod("dropDuplicates", #' @aliases join,SparkDataFrame,SparkDataFrame-method #' @rdname join #' @name join -#' @seealso \link{merge} +#' @seealso \link{merge} \link{crossJoin} #' @export #' @examples #'\dontrun{ #' sparkR.session() #' df1 <- read.json(path) #' df2 <- read.json(path2) -#' join(df1, df2) # Performs a Cartesian #' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression #' join(df1, df2, df1$col1 == df2$col2, "right_outer") +#' join(df1, df2) # Attempts an inner join #' } #' @note join since 1.4.0 setMethod("join", signature(x = "SparkDataFrame", y = "SparkDataFrame"), function(x, y, joinExpr = NULL, joinType = NULL) { if (is.null(joinExpr)) { - sdf <- callJMethod(x@sdf, "crossJoin", y@sdf) + # this may not fail until the planner checks for Cartesian join later on. + sdf <- callJMethod(x@sdf, "join", y@sdf) } else { if (class(joinExpr) != "Column") stop("joinExpr must be a Column") if (is.null(joinType)) { @@ -2322,22 +2324,52 @@ setMethod("join", dataFrame(sdf) }) +#' CrossJoin +#' +#' Returns the Cartesian product of two SparkDataFrames.
+#' +#' @param x A SparkDataFrame +#' @param y A SparkDataFrame +#' @return A SparkDataFrame containing the result of the join operation. +#' @family SparkDataFrame functions +#' @aliases crossJoin,SparkDataFrame,SparkDataFrame-method +#' @rdname crossJoin +#' @name crossJoin +#' @seealso \link{merge} \link{join} +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df1 <- read.json(path) +#' df2 <- read.json(path2) +#' crossJoin(df1, df2) # Performs a Cartesian join +#' } +#' @note crossJoin since 2.1.0 +setMethod("crossJoin", + signature(x = "SparkDataFrame", y = "SparkDataFrame"), + function(x, y) { + sdf <- callJMethod(x@sdf, "crossJoin", y@sdf) + dataFrame(sdf) + }) + #' Merges two data frames #' #' @name merge -#' @param x the first data frame to be joined -#' @param y the second data frame to be joined +#' @param x the first data frame to be joined. +#' @param y the second data frame to be joined. #' @param by a character vector specifying the join columns. If by is not #' specified, the common column names in \code{x} and \code{y} will be used. +#' If by or both by.x and by.y are explicitly set to NULL or of length 0, the Cartesian +#' Product of x and y will be returned. #' @param by.x a character vector specifying the joining columns for x. #' @param by.y a character vector specifying the joining columns for y. #' @param all a boolean value setting \code{all.x} and \code{all.y} #' if any of them are unset. #' @param all.x a boolean value indicating whether all the rows in x should -#' be including in the join +#' be included in the join. #' @param all.y a boolean value indicating whether all the rows in y should -#' be including in the join -#' @param sort a logical argument indicating whether the resulting columns should be sorted +#' be included in the join. +#' @param sort a logical argument indicating whether the resulting columns should be sorted.
#' @param suffixes a string vector of length 2 used to make colnames of #' \code{x} and \code{y} unique. #' The first element is appended to each colname of \code{x}. @@ -2351,20 +2383,21 @@ setMethod("join", #' @family SparkDataFrame functions #' @aliases merge,SparkDataFrame,SparkDataFrame-method #' @rdname merge -#' @seealso \link{join} +#' @seealso \link{join} \link{crossJoin} #' @export #' @examples #'\dontrun{ #' sparkR.session() #' df1 <- read.json(path) #' df2 <- read.json(path2) -#' merge(df1, df2) # Performs a Cartesian +#' merge(df1, df2) # Performs an inner join by common columns #' merge(df1, df2, by = "col1") # Performs an inner join based on expression #' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE) #' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE) #' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE) #' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE) #' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y")) +#' merge(df1, df2, by = NULL) # Performs a Cartesian join #' } #' @note merge since 1.5.0 setMethod("merge", @@ -2401,7 +2434,7 @@ setMethod("merge", joinY <- by } else { # if by or both by.x and by.y have length 0, use Cartesian Product - joinRes <- join(x, y) + joinRes <- crossJoin(x, y) return (joinRes) } http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 810aea9..5549cd7 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -468,6 +468,10 @@ setGeneric("createOrReplaceTempView", standardGeneric("createOrReplaceTempView") }) +# @rdname crossJoin +# @export +setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") }) + #' @rdname dapply #' @export setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") }) 
http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/inst/tests/testthat/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 1c80686..3a987cd 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1572,7 +1572,7 @@ test_that("filter() on a DataFrame", { #expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint }) -test_that("join() and merge() on a DataFrame", { +test_that("join(), crossJoin() and merge() on a DataFrame", { df <- read.json(jsonPath) mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}", @@ -1583,7 +1583,14 @@ test_that("join() and merge() on a DataFrame", { writeLines(mockLines2, jsonPath2) df2 <- read.json(jsonPath2) - joined <- join(df, df2) + # inner join, not cartesian join + expect_equal(count(where(join(df, df2), df$name == df2$name)), 3) + # cartesian join + expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }), + paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for", + " INNER join between logical plans).*")) + + joined <- crossJoin(df, df2) expect_equal(names(joined), c("age", "name", "name", "test")) expect_equal(count(joined), 12) expect_equal(names(collect(joined)), c("age", "name", "name", "test")) http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/docs/sparkr.md ---------------------------------------------------------------------- diff --git a/docs/sparkr.md b/docs/sparkr.md index 340e7f7..c1829ef 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -591,3 +591,7 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma - The method `registerTempTable` has been deprecated to be replaced by `createOrReplaceTempView`. - The method `dropTempTable` has been deprecated to be replaced by `dropTempView`. 
- The `sc` SparkContext parameter is no longer required for these functions: `setJobGroup`, `clearJobGroup`, `cancelJobGroup` + +## Upgrading to SparkR 2.1.0 + + - `join` no longer performs a Cartesian product by default; use `crossJoin` instead. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org