spark git commit: [SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f51c8fde8 -> 1860a43e9

[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy

## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung

Closes #20129 from felixcheung/rwater.

(cherry picked from commit df95a908baf78800556636a76d58bba9b3dd943f)
Signed-off-by: Felix Cheung

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1860a43e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1860a43e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1860a43e

Branch: refs/heads/branch-2.3
Commit: 1860a43e9affb7619be0a5a1c786e264d09bc446
Parents: f51c8fd
Author: Felix Cheung
Authored: Wed Jan 3 21:43:14 2018 -0800
Committer: Felix Cheung
Committed: Wed Jan 3 21:43:33 2018 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                            |   1 +
 R/pkg/R/DataFrame.R                        |  96 +-
 R/pkg/R/SQLContext.R                       |   4 +-
 R/pkg/R/generics.R                         |   6 ++
 R/pkg/tests/fulltests/test_streaming.R     | 107 +++
 python/pyspark/sql/streaming.py            |   4 +
 .../sql/execution/streaming/Triggers.scala |   2 +-
 7 files changed, 214 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/1860a43e/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 3219c6f..c51eb0f 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -179,6 +179,7 @@ exportMethods("arrange",
               "with",
               "withColumn",
               "withColumnRenamed",
+              "withWatermark",
               "write.df",
               "write.jdbc",
               "write.json",

http://git-wip-us.apache.org/repos/asf/spark/blob/1860a43e/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index fe238f6..9956f7e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3661,7 +3661,8 @@ setMethod("getNumPartitions",
 #' isStreaming
 #'
 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
-#' as it arrives.
+#' as it arrives. A dataset that reads data from a streaming source must be executed as a
+#' \code{StreamingQuery} using \code{write.stream}.
 #'
 #' @param x A SparkDataFrame
 #' @return TRUE if this SparkDataFrame is from a streaming source
@@ -3707,7 +3708,17 @@ setMethod("isStreaming",
 #' @param df a streaming SparkDataFrame.
 #' @param source a name for external data source.
 #' @param outputMode one of 'append', 'complete', 'update'.
-#' @param ... additional argument(s) passed to the method.
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'        system. If specified, the output is laid out on the file system similar to Hive's
+#'        partitioning scheme.
+#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
+#'        '1 minute'. This is a trigger that runs a query periodically based on the processing
+#'        time. If value is '0 seconds', the query will run as fast as possible, this is the
+#'        default. Only one trigger can be set.
+#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
+#'        one batch of data in a streaming query then terminates the query. Only one trigger can be
+#'        set.
+#' @param ... additional external data source specific named options.
 #'
 #' @family SparkDataFrame functions
 #' @seealso \link{read.stream}
@@ -3725,7 +3736,8 @@ setMethod("isStreaming",
 #' # console
 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
 #' # text stream
-#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp"
+#'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
 #' # memory stream
 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
 #' head(sql("SELECT * from outs"))
@@ -3737,7 +3749,8 @@ setMethod("isStreaming",
 #' @note experimental
 setMethod("write.stream",
           signature(df = "SparkDataFrame"),
-          function(df, source = NULL, outputMode = NULL, ...) {
+          function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
+                   trigger.processingTime = NULL, trigger.once = NULL, ...) {
             if
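For readers following along, here is a short end-to-end sketch of the write.stream arguments this commit adds. It is not part of the commit itself: the paths, schema, and partition columns are hypothetical, and read.stream, structType, and stopQuery are the existing SparkR streaming entry points.

# Sketch only: illustrates the partitionBy / trigger.processingTime / trigger.once
# arguments introduced by this commit; all paths and column names are hypothetical.
library(SparkR)
sparkR.session()
schema <- structType(structField("year", "integer"),
                     structField("month", "integer"),
                     structField("value", "double"))
df <- read.stream("json", path = "/tmp/events-in", schema = schema)
# Lay the output out Hive-style under year=/month= directories, running one
# micro-batch every 30 seconds.
q <- write.stream(df, "parquet",
                  path = "/tmp/events-out",
                  checkpointLocation = "/tmp/events-cp",
                  partitionBy = c("year", "month"),
                  trigger.processingTime = "30 seconds")
# Or process a single batch and terminate the query:
# q <- write.stream(df, "parquet", path = "/tmp/events-out",
#                   checkpointLocation = "/tmp/events-cp", trigger.once = TRUE)
stopQuery(q)

Per the parameter docs above, partitionBy applies to file-system output layout, and only one of trigger.processingTime and trigger.once may be set on a query.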
spark git commit: [SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy
Repository: spark
Updated Branches:
  refs/heads/master 7d045c5f0 -> df95a908b

[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy

## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung

Closes #20129 from felixcheung/rwater.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df95a908
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df95a908
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df95a908

Branch: refs/heads/master
Commit: df95a908baf78800556636a76d58bba9b3dd943f
Parents: 7d045c5
Author: Felix Cheung
Authored: Wed Jan 3 21:43:14 2018 -0800
Committer: Felix Cheung
Committed: Wed Jan 3 21:43:14 2018 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                            |   1 +
 R/pkg/R/DataFrame.R                        |  96 +-
 R/pkg/R/SQLContext.R                       |   4 +-
 R/pkg/R/generics.R                         |   6 ++
 R/pkg/tests/fulltests/test_streaming.R     | 107 +++
 python/pyspark/sql/streaming.py            |   4 +
 .../sql/execution/streaming/Triggers.scala |   2 +-
 7 files changed, 214 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

This is the original master commit from which the branch-2.3 change above was cherry-picked; its diff is the same as the one quoted in full above.
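The NAMESPACE hunk in the diff also exports withWatermark, whose DataFrame.R implementation falls outside the quoted excerpt. The sketch below assumes the SparkR signature mirrors the Scala Dataset.withWatermark(eventTime, delayThreshold) API; the schema, paths, and column names are made up for illustration.

# Sketch only: assumed signature withWatermark(x, eventTime, delayThreshold);
# the 'eventTime' column, schema, and input path are hypothetical.
schema <- structType(structField("eventTime", "timestamp"),
                     structField("value", "double"))
events <- read.stream("json", path = "/tmp/events-in", schema = schema)
# Tolerate events arriving up to 10 minutes late, then count per 5-minute window.
watermarked <- withWatermark(events, "eventTime", "10 minutes")
counts <- count(groupBy(watermarked, window(watermarked$eventTime, "5 minutes")))
# Emit updated counts once a minute using the new trigger support.
q <- write.stream(counts, "console", outputMode = "update",
                  trigger.processingTime = "1 minute")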