spark git commit: [SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy

2018-01-03 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f51c8fde8 -> 1860a43e9


[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy

## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy
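
For context, a minimal sketch of how the three additions fit together on the SparkR side. The JSON source, schema, paths, and column names below are illustrative assumptions, not part of this change:

```r
library(SparkR)
sparkR.session()

# Illustrative streaming source: a directory of JSON files with a known schema.
schema <- structType(structField("eventTime", "timestamp"),
                     structField("deviceId", "string"),
                     structField("reading", "double"))
events <- read.stream("json", path = "/tmp/events", schema = schema)

# New: withWatermark() bounds how late event-time data may arrive.
events <- withWatermark(events, "eventTime", "10 minutes")
counts <- count(group_by(events, window(events$eventTime, "5 minutes"), events$deviceId))

# New: partitionBy and trigger.* arguments on write.stream().
q <- write.stream(counts, "parquet", path = "/tmp/out", checkpointLocation = "/tmp/cp",
                  outputMode = "append", partitionBy = "deviceId",
                  trigger.processingTime = "30 seconds")
```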

## How was this patch tested?

manual, unit tests

Author: Felix Cheung 

Closes #20129 from felixcheung/rwater.

(cherry picked from commit df95a908baf78800556636a76d58bba9b3dd943f)
Signed-off-by: Felix Cheung 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1860a43e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1860a43e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1860a43e

Branch: refs/heads/branch-2.3
Commit: 1860a43e9affb7619be0a5a1c786e264d09bc446
Parents: f51c8fd
Author: Felix Cheung 
Authored: Wed Jan 3 21:43:14 2018 -0800
Committer: Felix Cheung 
Committed: Wed Jan 3 21:43:33 2018 -0800

--
 R/pkg/NAMESPACE                            |   1 +
 R/pkg/R/DataFrame.R                        |  96 -
 R/pkg/R/SQLContext.R                       |   4 +-
 R/pkg/R/generics.R                         |   6 ++
 R/pkg/tests/fulltests/test_streaming.R     | 107 +++
 python/pyspark/sql/streaming.py            |   4 +
 .../sql/execution/streaming/Triggers.scala |   2 +-
 7 files changed, 214 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1860a43e/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 3219c6f..c51eb0f 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -179,6 +179,7 @@ exportMethods("arrange",
   "with",
   "withColumn",
   "withColumnRenamed",
+  "withWatermark",
   "write.df",
   "write.jdbc",
   "write.json",

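The newly exported withWatermark generic takes the name of an event-time column and a delay threshold string. A small hedged sketch of the call shape (df, the column name, and the threshold below are placeholders):

```r
# df is assumed to be a streaming SparkDataFrame with an event-time column "eventTime".
# The watermark tells the engine to wait up to 10 minutes for late rows before
# finalizing event-time aggregates; the result is again a streaming SparkDataFrame.
df <- withWatermark(df, "eventTime", "10 minutes")
isStreaming(df)  # still TRUE
```
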
http://git-wip-us.apache.org/repos/asf/spark/blob/1860a43e/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index fe238f6..9956f7e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3661,7 +3661,8 @@ setMethod("getNumPartitions",
 #' isStreaming
 #'
 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
-#' as it arrives.
+#' as it arrives. A dataset that reads data from a streaming source must be executed as a
+#' \code{StreamingQuery} using \code{write.stream}.
 #'
 #' @param x A SparkDataFrame
 #' @return TRUE if this SparkDataFrame is from a streaming source
@@ -3707,7 +3708,17 @@ setMethod("isStreaming",
 #' @param df a streaming SparkDataFrame.
 #' @param source a name for external data source.
 #' @param outputMode one of 'append', 'complete', 'update'.
-#' @param ... additional argument(s) passed to the method.
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'        system. If specified, the output is laid out on the file system similar to Hive's
+#'        partitioning scheme.
+#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
+#'        '1 minute'. This is a trigger that runs a query periodically based on the processing
+#'        time. If value is '0 seconds', the query will run as fast as possible, this is the
+#'        default. Only one trigger can be set.
+#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
+#'        one batch of data in a streaming query then terminates the query. Only one trigger can be
+#'        set.
+#' @param ... additional external data source specific named options.
 #'
 #' @family SparkDataFrame functions
 #' @seealso \link{read.stream}
@@ -3725,7 +3736,8 @@ setMethod("isStreaming",
 #' # console
 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
 #' # text stream
-#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
+#'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
 #' # memory stream
 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
 #' head(sql("SELECT * from outs"))
@@ -3737,7 +3749,8 @@ setMethod("isStreaming",
 #' @note experimental
 setMethod("write.stream",
   signature(df = "SparkDataFrame"),
-  function(df, source = NULL, outputMode = NULL, ...) {
+  function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
+           trigger.processingTime = NULL, trigger.once = NULL, ...) {
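
A hedged sketch of the two trigger forms accepted by the new write.stream signature (df is assumed to be a streaming SparkDataFrame; sinks and paths are illustrative). Per the parameter docs above, only one trigger may be set per query:

```r
# Periodic trigger: run a micro-batch roughly every 30 seconds.
q1 <- write.stream(df, "parquet", path = "/tmp/out", checkpointLocation = "/tmp/cp",
                   trigger.processingTime = "30 seconds")

# One-shot trigger: process the data available now in a single batch, then stop the query.
q2 <- write.stream(df, "parquet", path = "/tmp/out2", checkpointLocation = "/tmp/cp2",
                   trigger.once = TRUE)

# Passing both trigger.processingTime and trigger.once is expected to raise an error.
```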

spark git commit: [SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy

2018-01-03 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master 7d045c5f0 -> df95a908b


[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy

## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung 

Closes #20129 from felixcheung/rwater.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df95a908
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df95a908
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df95a908

Branch: refs/heads/master
Commit: df95a908baf78800556636a76d58bba9b3dd943f
Parents: 7d045c5
Author: Felix Cheung 
Authored: Wed Jan 3 21:43:14 2018 -0800
Committer: Felix Cheung 
Committed: Wed Jan 3 21:43:14 2018 -0800

--
 R/pkg/NAMESPACE                            |   1 +
 R/pkg/R/DataFrame.R                        |  96 -
 R/pkg/R/SQLContext.R                       |   4 +-
 R/pkg/R/generics.R                         |   6 ++
 R/pkg/tests/fulltests/test_streaming.R     | 107 +++
 python/pyspark/sql/streaming.py            |   4 +
 .../sql/execution/streaming/Triggers.scala |   2 +-
 7 files changed, 214 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/df95a908/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 3219c6f..c51eb0f 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -179,6 +179,7 @@ exportMethods("arrange",
   "with",
   "withColumn",
   "withColumnRenamed",
+  "withWatermark",
   "write.df",
   "write.jdbc",
   "write.json",

http://git-wip-us.apache.org/repos/asf/spark/blob/df95a908/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index fe238f6..9956f7e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3661,7 +3661,8 @@ setMethod("getNumPartitions",
 #' isStreaming
 #'
 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
-#' as it arrives.
+#' as it arrives. A dataset that reads data from a streaming source must be executed as a
+#' \code{StreamingQuery} using \code{write.stream}.
 #'
 #' @param x A SparkDataFrame
 #' @return TRUE if this SparkDataFrame is from a streaming source
@@ -3707,7 +3708,17 @@ setMethod("isStreaming",
 #' @param df a streaming SparkDataFrame.
 #' @param source a name for external data source.
 #' @param outputMode one of 'append', 'complete', 'update'.
-#' @param ... additional argument(s) passed to the method.
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'        system. If specified, the output is laid out on the file system similar to Hive's
+#'        partitioning scheme.
+#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
+#'        '1 minute'. This is a trigger that runs a query periodically based on the processing
+#'        time. If value is '0 seconds', the query will run as fast as possible, this is the
+#'        default. Only one trigger can be set.
+#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
+#'        one batch of data in a streaming query then terminates the query. Only one trigger can be
+#'        set.
+#' @param ... additional external data source specific named options.
 #'
 #' @family SparkDataFrame functions
 #' @seealso \link{read.stream}
@@ -3725,7 +3736,8 @@ setMethod("isStreaming",
 #' # console
 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
 #' # text stream
-#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
+#'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
 #' # memory stream
 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
 #' head(sql("SELECT * from outs"))
@@ -3737,7 +3749,8 @@ setMethod("isStreaming",
 #' @note experimental
 setMethod("write.stream",
   signature(df = "SparkDataFrame"),
-  function(df, source = NULL, outputMode = NULL, ...) {
+  function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
+           trigger.processingTime = NULL, trigger.once = NULL, ...) {
 if