Repository: spark
Updated Branches:
  refs/heads/master c1ffb3c10 -> 162326c0e


[SPARK-25117][R] Add EXCEPT ALL and INTERSECT ALL support in R

## What changes were proposed in this pull request?
[SPARK-21274](https://issues.apache.org/jira/browse/SPARK-21274) added support 
for EXCEPT ALL and INTERSECT ALL. This PR adds the support in R.

## How was this patch tested?
Added test in test_sparkSQL.R

Author: Dilip Biswal <dbis...@us.ibm.com>

Closes #22107 from dilipbiswal/SPARK-25117.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/162326c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/162326c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/162326c0

Branch: refs/heads/master
Commit: 162326c0ee8419083ebd1669796abd234773e9b6
Parents: c1ffb3c
Author: Dilip Biswal <dbis...@us.ibm.com>
Authored: Fri Aug 17 00:04:04 2018 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Fri Aug 17 00:04:04 2018 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                       |  2 +
 R/pkg/R/DataFrame.R                   | 59 +++++++++++++++++++++++++++++-
 R/pkg/R/generics.R                    |  6 +++
 R/pkg/tests/fulltests/test_sparkSQL.R | 19 ++++++++++
 4 files changed, 85 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/162326c0/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index adfd387..0fd0848 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -117,6 +117,7 @@ exportMethods("arrange",
               "dropna",
               "dtypes",
               "except",
+              "exceptAll",
               "explain",
               "fillna",
               "filter",
@@ -131,6 +132,7 @@ exportMethods("arrange",
               "hint",
               "insertInto",
               "intersect",
+              "intersectAll",
               "isLocal",
               "isStreaming",
               "join",

http://git-wip-us.apache.org/repos/asf/spark/blob/162326c0/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 471ada1..4f2d4c7 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2848,6 +2848,35 @@ setMethod("intersect",
             dataFrame(intersected)
           })
 
+#' intersectAll
+#'
+#' Return a new SparkDataFrame containing rows in both this SparkDataFrame
+#' and another SparkDataFrame while preserving the duplicates.
+#' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
+#' SQL, this function resolves columns by position (not by name).
+#'
+#' @param x a SparkDataFrame.
+#' @param y a SparkDataFrame.
+#' @return A SparkDataFrame containing the result of the intersect all operation.
+#' @family SparkDataFrame functions
+#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
+#' @rdname intersectAll
+#' @name intersectAll
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' intersectAllDF <- intersectAll(df1, df2)
+#' }
+#' @note intersectAll since 2.4.0
+setMethod("intersectAll",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            intersected <- callJMethod(x@sdf, "intersectAll", y@sdf)
+            dataFrame(intersected)
+          })
+
 #' except
 #'
 #' Return a new SparkDataFrame containing rows in this SparkDataFrame
@@ -2867,7 +2896,6 @@ setMethod("intersect",
 #' df2 <- read.json(path2)
 #' exceptDF <- except(df, df2)
 #' }
-#' @rdname except
 #' @note except since 1.4.0
 setMethod("except",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
@@ -2876,6 +2904,35 @@ setMethod("except",
             dataFrame(excepted)
           })
 
+#' exceptAll
+#'
+#' Return a new SparkDataFrame containing rows in this SparkDataFrame
+#' but not in another SparkDataFrame while preserving the duplicates.
+#' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in
+#' SQL, this function resolves columns by position (not by name).
+#'
+#' @param x a SparkDataFrame.
+#' @param y a SparkDataFrame.
+#' @return A SparkDataFrame containing the result of the except all operation.
+#' @family SparkDataFrame functions
+#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
+#' @rdname exceptAll
+#' @name exceptAll
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' exceptAllDF <- exceptAll(df1, df2)
+#' }
+#' @note exceptAll since 2.4.0
+setMethod("exceptAll",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            excepted <- callJMethod(x@sdf, "exceptAll", y@sdf)
+            dataFrame(excepted)
+          })
+
 #' Save the contents of SparkDataFrame to a data source.
 #'
 #' The data source is specified by the \code{source} and a set of options (...).

http://git-wip-us.apache.org/repos/asf/spark/blob/162326c0/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 4a7210b..f6f1849 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -471,6 +471,9 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
 #' @rdname except
 setGeneric("except", function(x, y) { standardGeneric("except") })
 
+#' @rdname exceptAll
+setGeneric("exceptAll", function(x, y) { standardGeneric("exceptAll") })
+
 #' @rdname nafunctions
 setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })
 
@@ -495,6 +498,9 @@ setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertIn
 #' @rdname intersect
 setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
 
+#' @rdname intersectAll
+setGeneric("intersectAll", function(x, y) { standardGeneric("intersectAll") })
+
 #' @rdname isLocal
 setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/162326c0/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index adcbbff..bff6e35 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2482,6 +2482,25 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
   unlink(jsonPath2)
 })
 
+test_that("intersectAll() and exceptAll()", {
+  df1 <- createDataFrame(list(list("a", 1), list("a", 1), list("a", 1),
+                              list("a", 1), list("b", 3), list("c", 4)),
+                         schema = c("a", "b"))
+  df2 <- createDataFrame(list(list("a", 1), list("a", 1), list("b", 3)), schema = c("a", "b"))
+  intersectAllExpected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
+                                       stringsAsFactors = FALSE)
+  exceptAllExpected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
+                                    stringsAsFactors = FALSE)
+  intersectAllDf <- arrange(intersectAll(df1, df2), df1$a)
+  expect_is(intersectAllDf, "SparkDataFrame")
+  exceptAllDf <- arrange(exceptAll(df1, df2), df1$a)
+  expect_is(exceptAllDf, "SparkDataFrame")
+  intersectAllActual <- collect(intersectAllDf)
+  expect_identical(intersectAllActual, intersectAllExpected)
+  exceptAllActual <- collect(exceptAllDf)
+  expect_identical(exceptAllActual, exceptAllExpected)
+})
+
 test_that("withColumn() and withColumnRenamed()", {
   df <- read.json(jsonPath)
   newDF <- withColumn(df, "newAge", df$age + 2)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to