http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/01936c1d/sparkr-interpreter/src/main/resources/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/generics.R b/sparkr-interpreter/src/main/resources/R/pkg/R/generics.R deleted file mode 100644 index 43dd8d2..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/R/generics.R +++ /dev/null @@ -1,985 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -############ RDD Actions and Transformations ############ - -# @rdname aggregateRDD -# @seealso reduce -# @export -setGeneric("aggregateRDD", - function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") }) - -# @rdname cache-methods -# @export -setGeneric("cache", function(x) { standardGeneric("cache") }) - -# @rdname coalesce -# @seealso repartition -# @export -setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") }) - -# @rdname checkpoint-methods -# @export -setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") }) - -# @rdname collect-methods -# @export -setGeneric("collect", function(x, ...) { standardGeneric("collect") }) - -# @rdname collect-methods -# @export -setGeneric("collectAsMap", function(x) { standardGeneric("collectAsMap") }) - -# @rdname collect-methods -# @export -setGeneric("collectPartition", - function(x, partitionId) { - standardGeneric("collectPartition") - }) - -# @rdname count -# @export -setGeneric("count", function(x) { standardGeneric("count") }) - -# @rdname countByValue -# @export -setGeneric("countByValue", function(x) { standardGeneric("countByValue") }) - -# @rdname statfunctions -# @export -setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") }) - -# @rdname distinct -# @export -setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") }) - -# @rdname filterRDD -# @export -setGeneric("filterRDD", function(x, f) { standardGeneric("filterRDD") }) - -# @rdname first -# @export -setGeneric("first", function(x) { standardGeneric("first") }) - -# @rdname flatMap -# @export -setGeneric("flatMap", function(X, FUN) { standardGeneric("flatMap") }) - -# @rdname fold -# @seealso reduce -# @export -setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") }) - -# @rdname foreach -# @export -setGeneric("foreach", function(x, func) { standardGeneric("foreach") }) - -# @rdname foreach -# @export -setGeneric("foreachPartition", function(x, func) { standardGeneric("foreachPartition") }) - -# The jrdd accessor function. -setGeneric("getJRDD", function(rdd, ...) 
{ standardGeneric("getJRDD") }) - -# @rdname glom -# @export -setGeneric("glom", function(x) { standardGeneric("glom") }) - -# @rdname keyBy -# @export -setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") }) - -# @rdname lapplyPartition -# @export -setGeneric("lapplyPartition", function(X, FUN) { standardGeneric("lapplyPartition") }) - -# @rdname lapplyPartitionsWithIndex -# @export -setGeneric("lapplyPartitionsWithIndex", - function(X, FUN) { - standardGeneric("lapplyPartitionsWithIndex") - }) - -# @rdname lapply -# @export -setGeneric("map", function(X, FUN) { standardGeneric("map") }) - -# @rdname lapplyPartition -# @export -setGeneric("mapPartitions", function(X, FUN) { standardGeneric("mapPartitions") }) - -# @rdname lapplyPartitionsWithIndex -# @export -setGeneric("mapPartitionsWithIndex", - function(X, FUN) { standardGeneric("mapPartitionsWithIndex") }) - -# @rdname maximum -# @export -setGeneric("maximum", function(x) { standardGeneric("maximum") }) - -# @rdname minimum -# @export -setGeneric("minimum", function(x) { standardGeneric("minimum") }) - -# @rdname sumRDD -# @export -setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") }) - -# @rdname name -# @export -setGeneric("name", function(x) { standardGeneric("name") }) - -# @rdname numPartitions -# @export -setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") }) - -# @rdname persist -# @export -setGeneric("persist", function(x, newLevel) { standardGeneric("persist") }) - -# @rdname pipeRDD -# @export -setGeneric("pipeRDD", function(x, command, env = list()) { standardGeneric("pipeRDD")}) - -# @rdname reduce -# @export -setGeneric("reduce", function(x, func) { standardGeneric("reduce") }) - -# @rdname repartition -# @seealso coalesce -# @export -setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") }) - -# @rdname sampleRDD -# @export -setGeneric("sampleRDD", - function(x, withReplacement, fraction, seed) { - standardGeneric("sampleRDD") - }) - -# @rdname saveAsObjectFile -# @seealso objectFile -# @export -setGeneric("saveAsObjectFile", function(x, path) { standardGeneric("saveAsObjectFile") }) - -# @rdname saveAsTextFile -# @export -setGeneric("saveAsTextFile", function(x, path) { standardGeneric("saveAsTextFile") }) - -# @rdname setName -# @export -setGeneric("setName", function(x, name) { standardGeneric("setName") }) - -# @rdname sortBy -# @export -setGeneric("sortBy", - function(x, func, ascending = TRUE, numPartitions = 1) { - standardGeneric("sortBy") - }) - -# @rdname take -# @export -setGeneric("take", function(x, num) { standardGeneric("take") }) - -# @rdname takeOrdered -# @export -setGeneric("takeOrdered", function(x, num) { standardGeneric("takeOrdered") }) - -# @rdname takeSample -# @export -setGeneric("takeSample", - function(x, withReplacement, num, seed) { - standardGeneric("takeSample") - }) - -# @rdname top -# @export -setGeneric("top", function(x, num) { standardGeneric("top") }) - -# @rdname unionRDD -# @export -setGeneric("unionRDD", function(x, y) { standardGeneric("unionRDD") }) - -# @rdname unpersist-methods -# @export -setGeneric("unpersist", function(x, ...) 
{ standardGeneric("unpersist") }) - -# @rdname zipRDD -# @export -setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") }) - -# @rdname zipRDD -# @export -setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") }, - signature = "...") - -# @rdname zipWithIndex -# @seealso zipWithUniqueId -# @export -setGeneric("zipWithIndex", function(x) { standardGeneric("zipWithIndex") }) - -# @rdname zipWithUniqueId -# @seealso zipWithIndex -# @export -setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") }) - - -############ Binary Functions ############# - -# @rdname cartesian -# @export -setGeneric("cartesian", function(x, other) { standardGeneric("cartesian") }) - -# @rdname countByKey -# @export -setGeneric("countByKey", function(x) { standardGeneric("countByKey") }) - -# @rdname flatMapValues -# @export -setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") }) - -# @rdname intersection -# @export -setGeneric("intersection", - function(x, other, numPartitions = 1) { - standardGeneric("intersection") - }) - -# @rdname keys -# @export -setGeneric("keys", function(x) { standardGeneric("keys") }) - -# @rdname lookup -# @export -setGeneric("lookup", function(x, key) { standardGeneric("lookup") }) - -# @rdname mapValues -# @export -setGeneric("mapValues", function(X, FUN) { standardGeneric("mapValues") }) - -# @rdname sampleByKey -# @export -setGeneric("sampleByKey", - function(x, withReplacement, fractions, seed) { - standardGeneric("sampleByKey") - }) - -# @rdname values -# @export -setGeneric("values", function(x) { standardGeneric("values") }) - - -############ Shuffle Functions ############ - -# @rdname aggregateByKey -# @seealso foldByKey, combineByKey -# @export -setGeneric("aggregateByKey", - function(x, zeroValue, seqOp, combOp, numPartitions) { - standardGeneric("aggregateByKey") - }) - -# @rdname cogroup -# @export -setGeneric("cogroup", - function(..., numPartitions) { - standardGeneric("cogroup") - }, - signature = "...") - -# @rdname combineByKey -# @seealso groupByKey, reduceByKey -# @export -setGeneric("combineByKey", - function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) { - standardGeneric("combineByKey") - }) - -# @rdname foldByKey -# @seealso aggregateByKey, combineByKey -# @export -setGeneric("foldByKey", - function(x, zeroValue, func, numPartitions) { - standardGeneric("foldByKey") - }) - -# @rdname join-methods -# @export -setGeneric("fullOuterJoin", function(x, y, numPartitions) { standardGeneric("fullOuterJoin") }) - -# @rdname groupByKey -# @seealso reduceByKey -# @export -setGeneric("groupByKey", function(x, numPartitions) { standardGeneric("groupByKey") }) - -# @rdname join-methods -# @export -setGeneric("join", function(x, y, ...) { standardGeneric("join") }) - -# @rdname join-methods -# @export -setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") }) - -# @rdname partitionBy -# @export -setGeneric("partitionBy", function(x, numPartitions, ...) 
{ standardGeneric("partitionBy") }) - -# @rdname reduceByKey -# @seealso groupByKey -# @export -setGeneric("reduceByKey", function(x, combineFunc, numPartitions) { standardGeneric("reduceByKey")}) - -# @rdname reduceByKeyLocally -# @seealso reduceByKey -# @export -setGeneric("reduceByKeyLocally", - function(x, combineFunc) { - standardGeneric("reduceByKeyLocally") - }) - -# @rdname join-methods -# @export -setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("rightOuterJoin") }) - -# @rdname sortByKey -# @export -setGeneric("sortByKey", - function(x, ascending = TRUE, numPartitions = 1) { - standardGeneric("sortByKey") - }) - -# @rdname subtract -# @export -setGeneric("subtract", - function(x, other, numPartitions = 1) { - standardGeneric("subtract") - }) - -# @rdname subtractByKey -# @export -setGeneric("subtractByKey", - function(x, other, numPartitions = 1) { - standardGeneric("subtractByKey") - }) - - -################### Broadcast Variable Methods ################# - -# @rdname broadcast -# @export -setGeneric("value", function(bcast) { standardGeneric("value") }) - - - -#################### DataFrame Methods ######################## - -#' @rdname agg -#' @export -setGeneric("agg", function (x, ...) { standardGeneric("agg") }) - -#' @rdname arrange -#' @export -setGeneric("arrange", function(x, col, ...) { standardGeneric("arrange") }) - -#' @rdname schema -#' @export -setGeneric("columns", function(x) {standardGeneric("columns") }) - -#' @rdname describe -#' @export -setGeneric("describe", function(x, col, ...) { standardGeneric("describe") }) - -#' @rdname nafunctions -#' @export -setGeneric("dropna", - function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) { - standardGeneric("dropna") - }) - -#' @rdname nafunctions -#' @export -setGeneric("na.omit", - function(object, ...) { - standardGeneric("na.omit") - }) - -#' @rdname schema -#' @export -setGeneric("dtypes", function(x) { standardGeneric("dtypes") }) - -#' @rdname explain -#' @export -setGeneric("explain", function(x, ...) { standardGeneric("explain") }) - -#' @rdname except -#' @export -setGeneric("except", function(x, y) { standardGeneric("except") }) - -#' @rdname nafunctions -#' @export -setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") }) - -#' @rdname filter -#' @export -setGeneric("filter", function(x, condition) { standardGeneric("filter") }) - -#' @rdname groupBy -#' @export -setGeneric("group_by", function(x, ...) { standardGeneric("group_by") }) - -#' @rdname groupBy -#' @export -setGeneric("groupBy", function(x, ...) { standardGeneric("groupBy") }) - -#' @rdname insertInto -#' @export -setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertInto") }) - -#' @rdname intersect -#' @export -setGeneric("intersect", function(x, y) { standardGeneric("intersect") }) - -#' @rdname isLocal -#' @export -setGeneric("isLocal", function(x) { standardGeneric("isLocal") }) - -#' @rdname limit -#' @export -setGeneric("limit", function(x, num) {standardGeneric("limit") }) - -#' rdname merge -#' @export -setGeneric("merge") - -#' @rdname withColumn -#' @export -setGeneric("mutate", function(.data, ...) {standardGeneric("mutate") }) - -#' @rdname arrange -#' @export -setGeneric("orderBy", function(x, col) { standardGeneric("orderBy") }) - -#' @rdname schema -#' @export -setGeneric("printSchema", function(x) { standardGeneric("printSchema") }) - -#' @rdname withColumnRenamed -#' @export -setGeneric("rename", function(x, ...) 
{ standardGeneric("rename") }) - -#' @rdname registerTempTable -#' @export -setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") }) - -#' @rdname sample -#' @export -setGeneric("sample", - function(x, withReplacement, fraction, seed) { - standardGeneric("sample") - }) - -#' @rdname sample -#' @export -setGeneric("sample_frac", - function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") }) - -#' @rdname saveAsParquetFile -#' @export -setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") }) - -#' @rdname saveAsTable -#' @export -setGeneric("saveAsTable", function(df, tableName, source, mode, ...) { - standardGeneric("saveAsTable") -}) - -#' @rdname withColumn -#' @export -setGeneric("transform", function(`_data`, ...) {standardGeneric("transform") }) - -#' @rdname write.df -#' @export -setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") }) - -#' @rdname write.df -#' @export -setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") }) - -#' @rdname schema -#' @export -setGeneric("schema", function(x) { standardGeneric("schema") }) - -#' @rdname select -#' @export -setGeneric("select", function(x, col, ...) { standardGeneric("select") } ) - -#' @rdname select -#' @export -setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr") }) - -#' @rdname showDF -#' @export -setGeneric("showDF", function(x,...) { standardGeneric("showDF") }) - -# @rdname subset -# @export -setGeneric("subset", function(x, subset, select, ...) { standardGeneric("subset") }) - -#' @rdname agg -#' @export -setGeneric("summarize", function(x,...) { standardGeneric("summarize") }) - -#' @rdname summary -#' @export -setGeneric("summary", function(x, ...) { standardGeneric("summary") }) - -# @rdname tojson -# @export -setGeneric("toJSON", function(x) { standardGeneric("toJSON") }) - -#' @rdname DataFrame -#' @export -setGeneric("toRDD", function(x) { standardGeneric("toRDD") }) - -#' @rdname unionAll -#' @export -setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") }) - -#' @rdname filter -#' @export -setGeneric("where", function(x, condition) { standardGeneric("where") }) - -#' @rdname withColumn -#' @export -setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn") }) - -#' @rdname withColumnRenamed -#' @export -setGeneric("withColumnRenamed", - function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") }) - - -###################### Column Methods ########################## - -#' @rdname column -#' @export -setGeneric("asc", function(x) { standardGeneric("asc") }) - -#' @rdname column -#' @export -setGeneric("between", function(x, bounds) { standardGeneric("between") }) - -#' @rdname column -#' @export -setGeneric("cast", function(x, dataType) { standardGeneric("cast") }) - -#' @rdname column -#' @export -setGeneric("contains", function(x, ...) { standardGeneric("contains") }) - -#' @rdname column -#' @export -setGeneric("desc", function(x) { standardGeneric("desc") }) - -#' @rdname column -#' @export -setGeneric("endsWith", function(x, ...) { standardGeneric("endsWith") }) - -#' @rdname column -#' @export -setGeneric("getField", function(x, ...) { standardGeneric("getField") }) - -#' @rdname column -#' @export -setGeneric("getItem", function(x, ...) 
{ standardGeneric("getItem") }) - -#' @rdname column -#' @export -setGeneric("isNull", function(x) { standardGeneric("isNull") }) - -#' @rdname column -#' @export -setGeneric("isNotNull", function(x) { standardGeneric("isNotNull") }) - -#' @rdname column -#' @export -setGeneric("like", function(x, ...) { standardGeneric("like") }) - -#' @rdname column -#' @export -setGeneric("rlike", function(x, ...) { standardGeneric("rlike") }) - -#' @rdname column -#' @export -setGeneric("startsWith", function(x, ...) { standardGeneric("startsWith") }) - -#' @rdname column -#' @export -setGeneric("when", function(condition, value) { standardGeneric("when") }) - -#' @rdname column -#' @export -setGeneric("otherwise", function(x, value) { standardGeneric("otherwise") }) - - -###################### Expression Function Methods ########################## - -#' @rdname add_months -#' @export -setGeneric("add_months", function(y, x) { standardGeneric("add_months") }) - -#' @rdname approxCountDistinct -#' @export -setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCountDistinct") }) - -#' @rdname ascii -#' @export -setGeneric("ascii", function(x) { standardGeneric("ascii") }) - -#' @rdname avg -#' @export -setGeneric("avg", function(x, ...) { standardGeneric("avg") }) - -#' @rdname base64 -#' @export -setGeneric("base64", function(x) { standardGeneric("base64") }) - -#' @rdname bin -#' @export -setGeneric("bin", function(x) { standardGeneric("bin") }) - -#' @rdname bitwiseNOT -#' @export -setGeneric("bitwiseNOT", function(x) { standardGeneric("bitwiseNOT") }) - -#' @rdname cbrt -#' @export -setGeneric("cbrt", function(x) { standardGeneric("cbrt") }) - -#' @rdname ceil -#' @export -setGeneric("ceil", function(x) { standardGeneric("ceil") }) - -#' @rdname concat -#' @export -setGeneric("concat", function(x, ...) { standardGeneric("concat") }) - -#' @rdname concat_ws -#' @export -setGeneric("concat_ws", function(sep, x, ...) { standardGeneric("concat_ws") }) - -#' @rdname conv -#' @export -setGeneric("conv", function(x, fromBase, toBase) { standardGeneric("conv") }) - -#' @rdname countDistinct -#' @export -setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct") }) - -#' @rdname crc32 -#' @export -setGeneric("crc32", function(x) { standardGeneric("crc32") }) - -#' @rdname datediff -#' @export -setGeneric("datediff", function(y, x) { standardGeneric("datediff") }) - -#' @rdname date_add -#' @export -setGeneric("date_add", function(y, x) { standardGeneric("date_add") }) - -#' @rdname date_format -#' @export -setGeneric("date_format", function(y, x) { standardGeneric("date_format") }) - -#' @rdname date_sub -#' @export -setGeneric("date_sub", function(y, x) { standardGeneric("date_sub") }) - -#' @rdname dayofmonth -#' @export -setGeneric("dayofmonth", function(x) { standardGeneric("dayofmonth") }) - -#' @rdname dayofyear -#' @export -setGeneric("dayofyear", function(x) { standardGeneric("dayofyear") }) - -#' @rdname explode -#' @export -setGeneric("explode", function(x) { standardGeneric("explode") }) - -#' @rdname expr -#' @export -setGeneric("expr", function(x) { standardGeneric("expr") }) - -#' @rdname from_utc_timestamp -#' @export -setGeneric("from_utc_timestamp", function(y, x) { standardGeneric("from_utc_timestamp") }) - -#' @rdname format_number -#' @export -setGeneric("format_number", function(y, x) { standardGeneric("format_number") }) - -#' @rdname format_string -#' @export -setGeneric("format_string", function(format, x, ...) 
{ standardGeneric("format_string") }) - -#' @rdname from_unixtime -#' @export -setGeneric("from_unixtime", function(x, ...) { standardGeneric("from_unixtime") }) - -#' @rdname greatest -#' @export -setGeneric("greatest", function(x, ...) { standardGeneric("greatest") }) - -#' @rdname hex -#' @export -setGeneric("hex", function(x) { standardGeneric("hex") }) - -#' @rdname hour -#' @export -setGeneric("hour", function(x) { standardGeneric("hour") }) - -#' @rdname hypot -#' @export -setGeneric("hypot", function(y, x) { standardGeneric("hypot") }) - -#' @rdname initcap -#' @export -setGeneric("initcap", function(x) { standardGeneric("initcap") }) - -#' @rdname instr -#' @export -setGeneric("instr", function(y, x) { standardGeneric("instr") }) - -#' @rdname isNaN -#' @export -setGeneric("isNaN", function(x) { standardGeneric("isNaN") }) - -#' @rdname last -#' @export -setGeneric("last", function(x) { standardGeneric("last") }) - -#' @rdname last_day -#' @export -setGeneric("last_day", function(x) { standardGeneric("last_day") }) - -#' @rdname least -#' @export -setGeneric("least", function(x, ...) { standardGeneric("least") }) - -#' @rdname levenshtein -#' @export -setGeneric("levenshtein", function(y, x) { standardGeneric("levenshtein") }) - -#' @rdname lit -#' @export -setGeneric("lit", function(x) { standardGeneric("lit") }) - -#' @rdname locate -#' @export -setGeneric("locate", function(substr, str, ...) { standardGeneric("locate") }) - -#' @rdname lower -#' @export -setGeneric("lower", function(x) { standardGeneric("lower") }) - -#' @rdname lpad -#' @export -setGeneric("lpad", function(x, len, pad) { standardGeneric("lpad") }) - -#' @rdname ltrim -#' @export -setGeneric("ltrim", function(x) { standardGeneric("ltrim") }) - -#' @rdname md5 -#' @export -setGeneric("md5", function(x) { standardGeneric("md5") }) - -#' @rdname minute -#' @export -setGeneric("minute", function(x) { standardGeneric("minute") }) - -#' @rdname month -#' @export -setGeneric("month", function(x) { standardGeneric("month") }) - -#' @rdname months_between -#' @export -setGeneric("months_between", function(y, x) { standardGeneric("months_between") }) - -#' @rdname count -#' @export -setGeneric("n", function(x) { standardGeneric("n") }) - -#' @rdname nanvl -#' @export -setGeneric("nanvl", function(y, x) { standardGeneric("nanvl") }) - -#' @rdname negate -#' @export -setGeneric("negate", function(x) { standardGeneric("negate") }) - -#' @rdname next_day -#' @export -setGeneric("next_day", function(y, x) { standardGeneric("next_day") }) - -#' @rdname countDistinct -#' @export -setGeneric("n_distinct", function(x, ...) { standardGeneric("n_distinct") }) - -#' @rdname pmod -#' @export -setGeneric("pmod", function(y, x) { standardGeneric("pmod") }) - -#' @rdname quarter -#' @export -setGeneric("quarter", function(x) { standardGeneric("quarter") }) - -#' @rdname rand -#' @export -setGeneric("rand", function(seed) { standardGeneric("rand") }) - -#' @rdname randn -#' @export -setGeneric("randn", function(seed) { standardGeneric("randn") }) - -#' @rdname regexp_extract -#' @export -setGeneric("regexp_extract", function(x, pattern, idx) { standardGeneric("regexp_extract") }) - -#' @rdname regexp_replace -#' @export -setGeneric("regexp_replace", - function(x, pattern, replacement) { standardGeneric("regexp_replace") }) - -#' @rdname reverse -#' @export -setGeneric("reverse", function(x) { standardGeneric("reverse") }) - -#' @rdname rint -#' @export -setGeneric("rint", function(x, ...) 
{ standardGeneric("rint") }) - -#' @rdname rpad -#' @export -setGeneric("rpad", function(x, len, pad) { standardGeneric("rpad") }) - -#' @rdname rtrim -#' @export -setGeneric("rtrim", function(x) { standardGeneric("rtrim") }) - -#' @rdname second -#' @export -setGeneric("second", function(x) { standardGeneric("second") }) - -#' @rdname sha1 -#' @export -setGeneric("sha1", function(x) { standardGeneric("sha1") }) - -#' @rdname sha2 -#' @export -setGeneric("sha2", function(y, x) { standardGeneric("sha2") }) - -#' @rdname shiftLeft -#' @export -setGeneric("shiftLeft", function(y, x) { standardGeneric("shiftLeft") }) - -#' @rdname shiftRight -#' @export -setGeneric("shiftRight", function(y, x) { standardGeneric("shiftRight") }) - -#' @rdname shiftRightUnsigned -#' @export -setGeneric("shiftRightUnsigned", function(y, x) { standardGeneric("shiftRightUnsigned") }) - -#' @rdname signum -#' @export -setGeneric("signum", function(x) { standardGeneric("signum") }) - -#' @rdname size -#' @export -setGeneric("size", function(x) { standardGeneric("size") }) - -#' @rdname soundex -#' @export -setGeneric("soundex", function(x) { standardGeneric("soundex") }) - -#' @rdname substring_index -#' @export -setGeneric("substring_index", function(x, delim, count) { standardGeneric("substring_index") }) - -#' @rdname sumDistinct -#' @export -setGeneric("sumDistinct", function(x) { standardGeneric("sumDistinct") }) - -#' @rdname toDegrees -#' @export -setGeneric("toDegrees", function(x) { standardGeneric("toDegrees") }) - -#' @rdname toRadians -#' @export -setGeneric("toRadians", function(x) { standardGeneric("toRadians") }) - -#' @rdname to_date -#' @export -setGeneric("to_date", function(x) { standardGeneric("to_date") }) - -#' @rdname to_utc_timestamp -#' @export -setGeneric("to_utc_timestamp", function(y, x) { standardGeneric("to_utc_timestamp") }) - -#' @rdname translate -#' @export -setGeneric("translate", function(x, matchingString, replaceString) { standardGeneric("translate") }) - -#' @rdname trim -#' @export -setGeneric("trim", function(x) { standardGeneric("trim") }) - -#' @rdname unbase64 -#' @export -setGeneric("unbase64", function(x) { standardGeneric("unbase64") }) - -#' @rdname unhex -#' @export -setGeneric("unhex", function(x) { standardGeneric("unhex") }) - -#' @rdname unix_timestamp -#' @export -setGeneric("unix_timestamp", function(x, format) { standardGeneric("unix_timestamp") }) - -#' @rdname upper -#' @export -setGeneric("upper", function(x) { standardGeneric("upper") }) - -#' @rdname weekofyear -#' @export -setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") }) - -#' @rdname year -#' @export -setGeneric("year", function(x) { standardGeneric("year") }) - - -#' @rdname glm -#' @export -setGeneric("glm") - -#' @rdname rbind -#' @export -setGeneric("rbind", signature = "...")
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/01936c1d/sparkr-interpreter/src/main/resources/R/pkg/R/group.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/group.R b/sparkr-interpreter/src/main/resources/R/pkg/R/group.R
deleted file mode 100644
index 576ac72..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/R/group.R
+++ /dev/null
@@ -1,138 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# group.R - GroupedData class and methods implemented in S4 OO classes
-
-#' @include generics.R jobj.R schema.R column.R
-NULL
-
-setOldClass("jobj")
-
-#' @title S4 class that represents a GroupedData
-#' @description A GroupedData can be created using groupBy() on a DataFrame
-#' @rdname GroupedData
-#' @seealso groupBy
-#'
-#' @param sgd A Java object reference to the backing Scala GroupedData
-#' @export
setClass("GroupedData",
-         slots = list(sgd = "jobj"))
-
-setMethod("initialize", "GroupedData", function(.Object, sgd) {
-  .Object@sgd <- sgd
-  .Object
-})
-
-#' @rdname DataFrame
-groupedData <- function(sgd) {
-  new("GroupedData", sgd)
-}
-
-
-#' @rdname show
-setMethod("show", "GroupedData",
-          function(object) {
-            cat("GroupedData\n")
-          })
-
-#' Count
-#'
-#' Count the number of rows for each group.
-#' The resulting DataFrame will also contain the grouping columns.
-#'
-#' @param x a GroupedData
-#' @return a DataFrame
-#' @rdname agg
-#' @export
-#' @examples
-#' \dontrun{
-#'   count(groupBy(df, "name"))
-#' }
-setMethod("count",
-          signature(x = "GroupedData"),
-          function(x) {
-            dataFrame(callJMethod(x@sgd, "count"))
-          })
-
-#' Agg
-#'
-#' Computes aggregates for each group of a GroupedData.
-#' The resulting DataFrame will also contain the grouping columns.
-#'
-#' df2 <- agg(df, <column> = <aggFunction>)
-#' df2 <- agg(df, newColName = aggFunction(column))
-#'
-#' @param x a GroupedData
-#' @return a DataFrame
-#' @rdname agg
-#' @examples
-#' \dontrun{
-#'  df2 <- agg(df, age = "sum")  # new column name will be created as 'SUM(age#0)'
-#'  df2 <- agg(df, ageSum = sum(df$age)) # Creates a new column named ageSum
-#' }
-setMethod("agg",
-          signature(x = "GroupedData"),
-          function(x, ...) {
-            cols <- list(...)
-            stopifnot(length(cols) > 0)
-            if (is.character(cols[[1]])) {
-              cols <- varargsToEnv(...)
-              sdf <- callJMethod(x@sgd, "agg", cols)
-            } else if (class(cols[[1]]) == "Column") {
-              ns <- names(cols)
-              if (!is.null(ns)) {
-                for (n in ns) {
-                  if (n != "") {
-                    cols[[n]] <- alias(cols[[n]], n)
-                  }
-                }
-              }
-              jcols <- lapply(cols, function(c) { c@jc })
-              sdf <- callJMethod(x@sgd, "agg", jcols[[1]], listToSeq(jcols[-1]))
-            } else {
-              stop("agg can only support Column or character")
-            }
-            dataFrame(sdf)
-          })
-
-#' @rdname agg
-#' @aliases agg
-setMethod("summarize",
-          signature(x = "GroupedData"),
-          function(x, ...) {
-            agg(x, ...)
-          })
-
-# sum/mean/avg/min/max
-methods <- c("sum", "mean", "avg", "min", "max")
-
-createMethod <- function(name) {
-  setMethod(name,
-            signature(x = "GroupedData"),
-            function(x, ...) {
-              sdf <- callJMethod(x@sgd, name, toSeq(...))
-              dataFrame(sdf)
-            })
-}
-
-createMethods <- function() {
-  for (name in methods) {
-    createMethod(name)
-  }
-}
-
-createMethods()

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/01936c1d/sparkr-interpreter/src/main/resources/R/pkg/R/jobj.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/jobj.R b/sparkr-interpreter/src/main/resources/R/pkg/R/jobj.R
deleted file mode 100644
index 0838a7b..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/R/jobj.R
+++ /dev/null
@@ -1,104 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# References to objects that exist on the JVM backend
-# are maintained using the jobj.
-
-#' @include generics.R
-NULL
-
-# Maintain a reference count of Java object references
-# This allows us to GC the Java object when it is safe
-.validJobjs <- new.env(parent = emptyenv())
-
-# List of object ids to be removed
-.toRemoveJobjs <- new.env(parent = emptyenv())
-
-# Check if jobj was created with the current SparkContext
-isValidJobj <- function(jobj) {
-  if (exists(".scStartTime", envir = .sparkREnv)) {
-    jobj$appId == get(".scStartTime", envir = .sparkREnv)
-  } else {
-    FALSE
-  }
-}
-
-getJobj <- function(objId) {
-  newObj <- jobj(objId)
-  if (exists(objId, .validJobjs)) {
-    .validJobjs[[objId]] <- .validJobjs[[objId]] + 1
-  } else {
-    .validJobjs[[objId]] <- 1
-  }
-  newObj
-}
-
-# Handler for a Java object that exists on the backend.
-jobj <- function(objId) {
-  if (!is.character(objId)) {
-    stop("object id must be a character")
-  }
-  # NOTE: We need a new env for a jobj as we can only register
-  # finalizers for environments or external reference pointers.
-  obj <- structure(new.env(parent = emptyenv()), class = "jobj")
-  obj$id <- objId
-  obj$appId <- get(".scStartTime", envir = .sparkREnv)
-
-  # Register a finalizer to remove the Java object when this reference
-  # is garbage collected in R
-  reg.finalizer(obj, cleanup.jobj)
-  obj
-}
-
-#' Print a JVM object reference.
-#' -#' This function prints the type and id for an object stored -#' in the SparkR JVM backend. -#' -#' @param x The JVM object reference -#' @param ... further arguments passed to or from other methods -print.jobj <- function(x, ...) { - cls <- callJMethod(x, "getClass") - name <- callJMethod(cls, "getName") - cat("Java ref type", name, "id", x$id, "\n", sep = " ") -} - -cleanup.jobj <- function(jobj) { - if (isValidJobj(jobj)) { - objId <- jobj$id - # If we don't know anything about this jobj, ignore it - if (exists(objId, envir = .validJobjs)) { - .validJobjs[[objId]] <- .validJobjs[[objId]] - 1 - - if (.validJobjs[[objId]] == 0) { - rm(list = objId, envir = .validJobjs) - # NOTE: We cannot call removeJObject here as the finalizer may be run - # in the middle of another RPC. Thus we queue up this object Id to be removed - # and then run all the removeJObject when the next RPC is called. - .toRemoveJobjs[[objId]] <- 1 - } - } - } -} - -clearJobjs <- function() { - valid <- ls(.validJobjs) - rm(list = valid, envir = .validJobjs) - - removeList <- ls(.toRemoveJobjs) - rm(list = removeList, envir = .toRemoveJobjs) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/01936c1d/sparkr-interpreter/src/main/resources/R/pkg/R/mllib.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/mllib.R b/sparkr-interpreter/src/main/resources/R/pkg/R/mllib.R deleted file mode 100644 index cea3d76..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/R/mllib.R +++ /dev/null @@ -1,99 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# mllib.R: Provides methods for MLlib integration - -#' @title S4 class that represents a PipelineModel -#' @param model A Java object reference to the backing Scala PipelineModel -#' @export -setClass("PipelineModel", representation(model = "jobj")) - -#' Fits a generalized linear model -#' -#' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package. -#' -#' @param formula A symbolic description of the model to be fitted. Currently only a few formula -#' operators are supported, including '~', '+', '-', and '.'. -#' @param data DataFrame for training -#' @param family Error distribution. "gaussian" -> linear regression, "binomial" -> logistic reg. 
-#' @param lambda Regularization parameter -#' @param alpha Elastic-net mixing parameter (see glmnet's documentation for details) -#' @return a fitted MLlib model -#' @rdname glm -#' @export -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' sqlContext <- sparkRSQL.init(sc) -#' data(iris) -#' df <- createDataFrame(sqlContext, iris) -#' model <- glm(Sepal_Length ~ Sepal_Width, df) -#'} -setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFrame"), - function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0) { - family <- match.arg(family) - model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "fitRModelFormula", deparse(formula), data@sdf, family, lambda, - alpha) - return(new("PipelineModel", model = model)) - }) - -#' Make predictions from a model -#' -#' Makes predictions from a model produced by glm(), similarly to R's predict(). -#' -#' @param object A fitted MLlib model -#' @param newData DataFrame for testing -#' @return DataFrame containing predicted values -#' @rdname predict -#' @export -#' @examples -#'\dontrun{ -#' model <- glm(y ~ x, trainingData) -#' predicted <- predict(model, testData) -#' showDF(predicted) -#'} -setMethod("predict", signature(object = "PipelineModel"), - function(object, newData) { - return(dataFrame(callJMethod(object@model, "transform", newData@sdf))) - }) - -#' Get the summary of a model -#' -#' Returns the summary of a model produced by glm(), similarly to R's summary(). -#' -#' @param x A fitted MLlib model -#' @return a list with a 'coefficient' component, which is the matrix of coefficients. See -#' summary.glm for more information. -#' @rdname summary -#' @export -#' @examples -#'\dontrun{ -#' model <- glm(y ~ x, trainingData) -#' summary(model) -#'} -setMethod("summary", signature(x = "PipelineModel"), - function(x, ...) { - features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelFeatures", x@model) - weights <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelWeights", x@model) - coefficients <- as.matrix(unlist(weights)) - colnames(coefficients) <- c("Estimate") - rownames(coefficients) <- unlist(features) - return(list(coefficients = coefficients)) - }) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/01936c1d/sparkr-interpreter/src/main/resources/R/pkg/R/pairRDD.R ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/pairRDD.R b/sparkr-interpreter/src/main/resources/R/pkg/R/pairRDD.R deleted file mode 100644 index 3680adc..0000000 --- a/sparkr-interpreter/src/main/resources/R/pkg/R/pairRDD.R +++ /dev/null @@ -1,908 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-#
-# Operations supported on RDDs containing pairs (i.e. key, value)
-#' @include generics.R jobj.R RDD.R
-NULL
-
-############ Actions and Transformations ############
-
-# Look up elements of a key in an RDD
-#
-# @description
-# \code{lookup} returns a list of values in this RDD for the given key.
-#
-# @param x The RDD to search
-# @param key The key to look up
-# @return a list of values in this RDD for the given key
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(c(1, 1), c(2, 2), c(1, 3))
-# rdd <- parallelize(sc, pairs)
-# lookup(rdd, 1) # list(1, 3)
-#}
-# @rdname lookup
-# @aliases lookup,RDD-method
-setMethod("lookup",
-          signature(x = "RDD", key = "ANY"),
-          function(x, key) {
-            partitionFunc <- function(part) {
-              filtered <- part[unlist(lapply(part, function(i) { identical(key, i[[1]]) }))]
-              lapply(filtered, function(i) { i[[2]] })
-            }
-            valsRDD <- lapplyPartition(x, partitionFunc)
-            collect(valsRDD)
-          })
-
-# Count the number of elements for each key, and return the result to the
-# master as lists of (key, count) pairs.
-#
-# Same as countByKey in Spark.
-#
-# @param x The RDD to count keys.
-# @return list of (key, count) pairs, where count is the number of occurrences of each key in the RDD.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
-# countByKey(rdd) # ("a", 2L), ("b", 1L)
-#}
-# @rdname countByKey
-# @aliases countByKey,RDD-method
setMethod("countByKey",
-          signature(x = "RDD"),
-          function(x) {
-            keys <- lapply(x, function(item) { item[[1]] })
-            countByValue(keys)
-          })
-
-# Return an RDD with the keys of each tuple.
-#
-# @param x The RDD from which the keys of each tuple are returned.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
-# collect(keys(rdd)) # list(1, 3)
-#}
-# @rdname keys
-# @aliases keys,RDD
-setMethod("keys",
-          signature(x = "RDD"),
-          function(x) {
-            func <- function(k) {
-              k[[1]]
-            }
-            lapply(x, func)
-          })
-
-# Return an RDD with the values of each tuple.
-#
-# @param x The RDD from which the values of each tuple are returned.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
-# collect(values(rdd)) # list(2, 4)
-#}
-# @rdname values
-# @aliases values,RDD
-setMethod("values",
-          signature(x = "RDD"),
-          function(x) {
-            func <- function(v) {
-              v[[2]]
-            }
-            lapply(x, func)
-          })
-
-# Applies a function to all values of the elements, without modifying the keys.
-#
-# The same as `mapValues()' in Spark.
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on the value of each element.
-# @return a new RDD created by the transformation.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# makePairs <- lapply(rdd, function(x) { list(x, x) })
-# collect(mapValues(makePairs, function(x) { x * 2 }))
-# Output: list(list(1,2), list(2,4), list(3,6), ...)
-#}
-# @rdname mapValues
-# @aliases mapValues,RDD,function-method
-setMethod("mapValues",
-          signature(X = "RDD", FUN = "function"),
-          function(X, FUN) {
-            func <- function(x) {
-              list(x[[1]], FUN(x[[2]]))
-            }
-            lapply(X, func)
-          })
-
-# Pass each value in the key-value pair RDD through a flatMap function without
-# changing the keys; this also retains the original RDD's partitioning.
-#
-# The same as 'flatMapValues()' in Spark.
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on the value of each element.
-# @return a new RDD created by the transformation.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
-# collect(flatMapValues(rdd, function(x) { x }))
-# Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
-#}
-# @rdname flatMapValues
-# @aliases flatMapValues,RDD,function-method
-setMethod("flatMapValues",
-          signature(X = "RDD", FUN = "function"),
-          function(X, FUN) {
-            flatMapFunc <- function(x) {
-              lapply(FUN(x[[2]]), function(v) { list(x[[1]], v) })
-            }
-            flatMap(X, flatMapFunc)
-          })
-
-############ Shuffle Functions ############
-
-# Partition an RDD by key
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-# For each element of this RDD, the partitioner is used to compute a hash
-# and the RDD is partitioned using this hash value.
-#
-# @param x The RDD to partition. Should be an RDD where each element is
-#          list(K, V) or c(K, V).
-# @param numPartitions Number of partitions to create.
-# @param ... Other optional arguments to partitionBy.
-#
-# @param partitionFunc The partition function to use. Uses a default hashCode
-#                      function if not provided
-# @return An RDD partitioned using the specified partitioner.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# parts <- partitionBy(rdd, 2L)
-# collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
-#}
-# @rdname partitionBy
-# @aliases partitionBy,RDD,integer-method
-setMethod("partitionBy",
-          signature(x = "RDD", numPartitions = "numeric"),
-          function(x, numPartitions, partitionFunc = hashCode) {
-
-            #if (missing(partitionFunc)) {
-            #  partitionFunc <- hashCode
-            #}
-
-            partitionFunc <- cleanClosure(partitionFunc)
-            serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)
-
-            packageNamesArr <- serialize(.sparkREnv$.packages,
-                                         connection = NULL)
-            broadcastArr <- lapply(ls(.broadcastNames),
-                                   function(name) { get(name, .broadcastNames) })
-            jrdd <- getJRDD(x)
-
-            # We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
-            # where the key is the target partition number, the value is
-            # the content (key-val pairs).
-            pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
-                                       callJMethod(jrdd, "rdd"),
-                                       numToInt(numPartitions),
-                                       serializedHashFuncBytes,
-                                       getSerializedMode(x),
-                                       packageNamesArr,
-                                       broadcastArr,
-                                       callJMethod(jrdd, "classTag"))
-
-            # Create a corresponding partitioner.
-            rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
-                                       numToInt(numPartitions))
-
-            # Call partitionBy on the obtained PairwiseRDD.
-            javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
-            javaPairRDD <- callJMethod(javaPairRDD, "partitionBy", rPartitioner)
-
-            # Call .values() on the result to get back the final result, the
-            # shuffled actual content key-val pairs.
-            r <- callJMethod(javaPairRDD, "values")
-
-            RDD(r, serializedMode = "byte")
-          })
-
-# Group values by key
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
-# and groups the values for each key in the RDD into a single sequence.
-#
-# @param x The RDD to group. Should be an RDD where each element is
-#          list(K, V) or c(K, V).
-# @param numPartitions Number of partitions to create.
-# @return An RDD where each element is list(K, list(V)) -# @seealso reduceByKey -# @examples -#\dontrun{ -# sc <- sparkR.init() -# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) -# rdd <- parallelize(sc, pairs) -# parts <- groupByKey(rdd, 2L) -# grouped <- collect(parts) -# grouped[[1]] # Should be a list(1, list(2, 4)) -#} -# @rdname groupByKey -# @aliases groupByKey,RDD,integer-method -setMethod("groupByKey", - signature(x = "RDD", numPartitions = "numeric"), - function(x, numPartitions) { - shuffled <- partitionBy(x, numPartitions) - groupVals <- function(part) { - vals <- new.env() - keys <- new.env() - pred <- function(item) exists(item$hash, keys) - appendList <- function(acc, i) { - addItemToAccumulator(acc, i) - acc - } - makeList <- function(i) { - acc <- initAccumulator() - addItemToAccumulator(acc, i) - acc - } - # Each item in the partition is list of (K, V) - lapply(part, - function(item) { - item$hash <- as.character(hashCode(item[[1]])) - updateOrCreatePair(item, keys, vals, pred, - appendList, makeList) - }) - # extract out data field - vals <- eapply(vals, - function(i) { - length(i$data) <- i$counter - i$data - }) - # Every key in the environment contains a list - # Convert that to list(K, Seq[V]) - convertEnvsToList(keys, vals) - } - lapplyPartition(shuffled, groupVals) - }) - -# Merge values by key -# -# This function operates on RDDs where every element is of the form list(K, V) or c(K, V). -# and merges the values for each key using an associative reduce function. -# -# @param x The RDD to reduce by key. Should be an RDD where each element is -# list(K, V) or c(K, V). -# @param combineFunc The associative reduce function to use. -# @param numPartitions Number of partitions to create. -# @return An RDD where each element is list(K, V') where V' is the merged -# value -# @examples -#\dontrun{ -# sc <- sparkR.init() -# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) -# rdd <- parallelize(sc, pairs) -# parts <- reduceByKey(rdd, "+", 2L) -# reduced <- collect(parts) -# reduced[[1]] # Should be a list(1, 6) -#} -# @rdname reduceByKey -# @aliases reduceByKey,RDD,integer-method -setMethod("reduceByKey", - signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"), - function(x, combineFunc, numPartitions) { - reduceVals <- function(part) { - vals <- new.env() - keys <- new.env() - pred <- function(item) exists(item$hash, keys) - lapply(part, - function(item) { - item$hash <- as.character(hashCode(item[[1]])) - updateOrCreatePair(item, keys, vals, pred, combineFunc, identity) - }) - convertEnvsToList(keys, vals) - } - locallyReduced <- lapplyPartition(x, reduceVals) - shuffled <- partitionBy(locallyReduced, numToInt(numPartitions)) - lapplyPartition(shuffled, reduceVals) - }) - -# Merge values by key locally -# -# This function operates on RDDs where every element is of the form list(K, V) or c(K, V). -# and merges the values for each key using an associative reduce function, but return the -# results immediately to the driver as an R list. -# -# @param x The RDD to reduce by key. Should be an RDD where each element is -# list(K, V) or c(K, V). -# @param combineFunc The associative reduce function to use. 
-# @return A list of elements of type list(K, V') where V' is the merged value for each key -# @seealso reduceByKey -# @examples -#\dontrun{ -# sc <- sparkR.init() -# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) -# rdd <- parallelize(sc, pairs) -# reduced <- reduceByKeyLocally(rdd, "+") -# reduced # list(list(1, 6), list(1.1, 3)) -#} -# @rdname reduceByKeyLocally -# @aliases reduceByKeyLocally,RDD,integer-method -setMethod("reduceByKeyLocally", - signature(x = "RDD", combineFunc = "ANY"), - function(x, combineFunc) { - reducePart <- function(part) { - vals <- new.env() - keys <- new.env() - pred <- function(item) exists(item$hash, keys) - lapply(part, - function(item) { - item$hash <- as.character(hashCode(item[[1]])) - updateOrCreatePair(item, keys, vals, pred, combineFunc, identity) - }) - list(list(keys, vals)) # return hash to avoid re-compute in merge - } - mergeParts <- function(accum, x) { - pred <- function(item) { - exists(item$hash, accum[[1]]) - } - lapply(ls(x[[1]]), - function(name) { - item <- list(x[[1]][[name]], x[[2]][[name]]) - item$hash <- name - updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, identity) - }) - accum - } - reduced <- mapPartitions(x, reducePart) - merged <- reduce(reduced, mergeParts) - convertEnvsToList(merged[[1]], merged[[2]]) - }) - -# Combine values by key -# -# Generic function to combine the elements for each key using a custom set of -# aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], -# for a "combined type" C. Note that V and C can be different -- for example, one -# might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). - -# Users provide three functions: -# \itemize{ -# \item createCombiner, which turns a V into a C (e.g., creates a one-element list) -# \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list) - -# \item mergeCombiners, to combine two C's into a single one (e.g., concatenates -# two lists). -# } -# -# @param x The RDD to combine. Should be an RDD where each element is -# list(K, V) or c(K, V). -# @param createCombiner Create a combiner (C) given a value (V) -# @param mergeValue Merge the given value (V) with an existing combiner (C) -# @param mergeCombiners Merge two combiners and return a new combiner -# @param numPartitions Number of partitions to create. 
-# @return An RDD where each element is list(K, C) where C is the combined type -# -# @seealso groupByKey, reduceByKey -# @examples -#\dontrun{ -# sc <- sparkR.init() -# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) -# rdd <- parallelize(sc, pairs) -# parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L) -# combined <- collect(parts) -# combined[[1]] # Should be a list(1, 6) -#} -# @rdname combineByKey -# @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method -setMethod("combineByKey", - signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY", - mergeCombiners = "ANY", numPartitions = "numeric"), - function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) { - combineLocally <- function(part) { - combiners <- new.env() - keys <- new.env() - pred <- function(item) exists(item$hash, keys) - lapply(part, - function(item) { - item$hash <- as.character(hashCode(item[[1]])) - updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner) - }) - convertEnvsToList(keys, combiners) - } - locallyCombined <- lapplyPartition(x, combineLocally) - shuffled <- partitionBy(locallyCombined, numToInt(numPartitions)) - mergeAfterShuffle <- function(part) { - combiners <- new.env() - keys <- new.env() - pred <- function(item) exists(item$hash, keys) - lapply(part, - function(item) { - item$hash <- as.character(hashCode(item[[1]])) - updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity) - }) - convertEnvsToList(keys, combiners) - } - lapplyPartition(shuffled, mergeAfterShuffle) - }) - -# Aggregate a pair RDD by each key. -# -# Aggregate the values of each key in an RDD, using given combine functions -# and a neutral "zero value". This function can return a different result type, -# U, than the type of the values in this RDD, V. Thus, we need one operation -# for merging a V into a U and one operation for merging two U's, The former -# operation is used for merging values within a partition, and the latter is -# used for merging values between partitions. To avoid memory allocation, both -# of these functions are allowed to modify and return their first argument -# instead of creating a new U. -# -# @param x An RDD. -# @param zeroValue A neutral "zero value". -# @param seqOp A function to aggregate the values of each key. It may return -# a different result type from the type of the values. -# @param combOp A function to aggregate results of seqOp. -# @return An RDD containing the aggregation result. -# @seealso foldByKey, combineByKey -# @examples -#\dontrun{ -# sc <- sparkR.init() -# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) -# zeroValue <- list(0, 0) -# seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } -# combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } -# aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L) -# # list(list(1, list(3, 2)), list(2, list(7, 2))) -#} -# @rdname aggregateByKey -# @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method -setMethod("aggregateByKey", - signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", - combOp = "ANY", numPartitions = "numeric"), - function(x, zeroValue, seqOp, combOp, numPartitions) { - createCombiner <- function(v) { - do.call(seqOp, list(zeroValue, v)) - } - - combineByKey(x, createCombiner, seqOp, combOp, numPartitions) - }) - -# Fold a pair RDD by each key. 
-# -# Aggregate the values of each key in an RDD, using an associative function "func" -# and a neutral "zero value" which may be added to the result an arbitrary -# number of times, and must not change the result (e.g., 0 for addition, or -# 1 for multiplication.). -# -# @param x An RDD. -# @param zeroValue A neutral "zero value". -# @param func An associative function for folding values of each key. -# @return An RDD containing the aggregation result. -# @seealso aggregateByKey, combineByKey -# @examples -#\dontrun{ -# sc <- sparkR.init() -# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) -# foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7)) -#} -# @rdname foldByKey -# @aliases foldByKey,RDD,ANY,ANY,integer-method -setMethod("foldByKey", - signature(x = "RDD", zeroValue = "ANY", - func = "ANY", numPartitions = "numeric"), - function(x, zeroValue, func, numPartitions) { - aggregateByKey(x, zeroValue, func, func, numPartitions) - }) - -############ Binary Functions ############# - -# Join two RDDs -# -# @description -# \code{join} This function joins two RDDs where every element is of the form list(K, V). -# The key types of the two RDDs should be the same. -# -# @param x An RDD to be joined. Should be an RDD where each element is -# list(K, V). -# @param y An RDD to be joined. Should be an RDD where each element is -# list(K, V). -# @param numPartitions Number of partitions to create. -# @return a new RDD containing all pairs of elements with matching keys in -# two input RDDs. -# @examples -#\dontrun{ -# sc <- sparkR.init() -# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) -# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) -# join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)) -#} -# @rdname join-methods -# @aliases join,RDD,RDD-method -setMethod("join", - signature(x = "RDD", y = "RDD"), - function(x, y, numPartitions) { - xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) - yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) - - doJoin <- function(v) { - joinTaggedList(v, list(FALSE, FALSE)) - } - - joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), - doJoin) - }) - -# Left outer join two RDDs -# -# @description -# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of -# the form list(K, V). The key types of the two RDDs should be the same. -# -# @param x An RDD to be joined. Should be an RDD where each element is -# list(K, V). -# @param y An RDD to be joined. Should be an RDD where each element is -# list(K, V). -# @param numPartitions Number of partitions to create. -# @return For each element (k, v) in x, the resulting RDD will either contain -# all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL)) -# if no elements in rdd2 have key k. 
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# leftOuterJoin(rdd1, rdd2, 2L)
-# # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
-#}
-# @rdname join-methods
-# @aliases leftOuterJoin,RDD,RDD-method
-setMethod("leftOuterJoin",
-          signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
-          function(x, y, numPartitions) {
-            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
-            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
-
-            doJoin <- function(v) {
-              joinTaggedList(v, list(FALSE, TRUE))
-            }
-
-            joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
-          })
-
-# Right outer join two RDDs
-#
-# @description
-# \code{rightOuterJoin} This function right-outer-joins two RDDs where every element is of
-# the form list(K, V). The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-#          list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-#          list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return For each element (k, w) in y, the resulting RDD will either contain
-#         all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
-#         if no elements in x have key k.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rightOuterJoin(rdd1, rdd2, 2L)
-# # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
-#}
-# @rdname join-methods
-# @aliases rightOuterJoin,RDD,RDD-method
-setMethod("rightOuterJoin",
-          signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
-          function(x, y, numPartitions) {
-            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
-            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
-
-            doJoin <- function(v) {
-              joinTaggedList(v, list(TRUE, FALSE))
-            }
-
-            joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
-          })
-
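For reference, a sketch contrasting the two removed outer joins on the same
hypothetical inputs (SparkR context assumed as above; output ordering may vary):

  x <- parallelize(sc, list(list(1, "x1"), list(2, "x2")))
  y <- parallelize(sc, list(list(1, "y1")))
  collect(leftOuterJoin(x, y, 2L))
  # e.g. list(list(1, list("x1", "y1")), list(2, list("x2", NULL)))
  collect(rightOuterJoin(x, y, 2L))
  # e.g. list(list(1, list("x1", "y1")))

-# Full outer join two RDDs
-#
-# @description
-# \code{fullOuterJoin} This function full-outer-joins two RDDs where every element is of
-# the form list(K, V). The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-#          list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-#          list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return For each element (k, v) in x and (k, w) in y, the resulting RDD
-#         will contain all pairs (k, (v, w)) for both (k, v) in x and
-#         (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
-#         in x/y have key k.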
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
-# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
-#                               #      list(1, list(3, 1)),
-#                               #      list(2, list(NULL, 4)),
-#                               #      list(3, list(3, NULL)))
-#}
-# @rdname join-methods
-# @aliases fullOuterJoin,RDD,RDD-method
-setMethod("fullOuterJoin",
-          signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
-          function(x, y, numPartitions) {
-            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
-            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
-
-            doJoin <- function(v) {
-              joinTaggedList(v, list(TRUE, TRUE))
-            }
-
-            joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
-          })
-
-# For each key k in several RDDs, return a resulting RDD whose
-# values are a list of the values for that key in all of the RDDs.
-#
-# @param ... Several RDDs.
-# @param numPartitions Number of partitions to create.
-# @return A new RDD where each element is list(K, W), with W being a list
-#         whose i-th element holds the grouped values for K from the i-th RDD.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# cogroup(rdd1, rdd2, numPartitions = 2L)
-# # list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list())))
-#}
-# @rdname cogroup
-# @aliases cogroup,RDD-method
-setMethod("cogroup",
-          "RDD",
-          function(..., numPartitions) {
-            rdds <- list(...)
-            rddsLen <- length(rdds)
-            for (i in 1:rddsLen) {
-              rdds[[i]] <- lapply(rdds[[i]],
-                                  function(x) { list(x[[1]], list(i, x[[2]])) })
-            }
-            union.rdd <- Reduce(unionRDD, rdds)
-            group.func <- function(vlist) {
-              res <- list()
-              length(res) <- rddsLen
-              for (x in vlist) {
-                i <- x[[1]]
-                acc <- res[[i]]
-                # Create an accumulator.
-                if (is.null(acc)) {
-                  acc <- initAccumulator()
-                }
-                addItemToAccumulator(acc, x[[2]])
-                res[[i]] <- acc
-              }
-              lapply(res, function(acc) {
-                if (is.null(acc)) {
-                  list()
-                } else {
-                  acc$data
-                }
-              })
-            }
-            cogroup.rdd <- mapValues(groupByKey(union.rdd, numPartitions),
-                                     group.func)
-          })
-
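For reference, a minimal cogroup sketch (hypothetical data; note that each key
maps to one value list per input RDD):

  r1 <- parallelize(sc, list(list("k", 1), list("k", 2)))
  r2 <- parallelize(sc, list(list("k", 3)))
  collect(cogroup(r1, r2, numPartitions = 2L))
  # list(list("k", list(list(1, 2), list(3))))

-# Sort a (k, v) pair RDD by k.
-#
-# @param x A (k, v) pair RDD to be sorted.
-# @param ascending A flag to indicate whether the sorting is ascending or descending.
-# @param numPartitions Number of partitions to create.
-# @return An RDD where all (k, v) pair elements are sorted.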
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
-# collect(sortByKey(rdd)) # list(list(1, 3), list(2, 2), list(3, 1))
-#}
-# @rdname sortByKey
-# @aliases sortByKey,RDD-method
-setMethod("sortByKey",
-          signature(x = "RDD"),
-          function(x, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
-            rangeBounds <- list()
-
-            if (numPartitions > 1) {
-              rddSize <- count(x)
-              # constant from Spark's RangePartitioner
-              maxSampleSize <- numPartitions * 20
-              fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)
-
-              samples <- collect(keys(sampleRDD(x, FALSE, fraction, 1L)))
-
-              # Note: the built-in R sort() function only works on atomic vectors
-              samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending)
-
-              if (length(samples) > 0) {
-                rangeBounds <- lapply(seq_len(numPartitions - 1),
-                                      function(i) {
-                                        j <- ceiling(length(samples) * i / numPartitions)
-                                        samples[j]
-                                      })
-              }
-            }
-
-            rangePartitionFunc <- function(key) {
-              partition <- 0
-
-              # TODO: Use binary search instead of linear search, similar to Spark
-              while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
-                partition <- partition + 1
-              }
-
-              if (ascending) {
-                partition
-              } else {
-                numPartitions - partition - 1
-              }
-            }
-
-            partitionFunc <- function(part) {
-              sortKeyValueList(part, decreasing = !ascending)
-            }
-
-            newRDD <- partitionBy(x, numPartitions, rangePartitionFunc)
-            lapplyPartition(newRDD, partitionFunc)
-          })
-
-# Subtract one pair RDD from another, by key.
-#
-# Return an RDD with the pairs from x whose keys are not in other.
-#
-# @param x An RDD.
-# @param other An RDD.
-# @param numPartitions Number of the partitions in the result RDD.
-# @return An RDD with the pairs from x whose keys are not in other.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
-#                              list("b", 5), list("a", 2)))
-# rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
-# collect(subtractByKey(rdd1, rdd2))
-# # list(list("b", 4), list("b", 5))
-#}
-# @rdname subtractByKey
-# @aliases subtractByKey,RDD,RDD-method
-setMethod("subtractByKey",
-          signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::numPartitions(x)) {
-            filterFunction <- function(elem) {
-              iters <- elem[[2]]
-              (length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
-            }
-
-            flatMapValues(filterRDD(cogroup(x,
-                                            other,
-                                            numPartitions = numPartitions),
-                                    filterFunction),
-                          function(v) { v[[1]] })
-          })
-
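For reference, a sketch of the removed sortByKey in descending order
(hypothetical data; SparkR context assumed as above):

  rdd <- parallelize(sc, list(list(3, "c"), list(1, "a"), list(2, "b")))
  collect(sortByKey(rdd, ascending = FALSE))
  # list(list(3, "c"), list(2, "b"), list(1, "a"))

-# Return a subset of this RDD sampled by key.
-#
-# @description
-# \code{sampleByKey} Create a sample of this RDD using variable sampling rates
-# for different keys as specified by fractions, a key to sampling rate map.
-#
-# @param x The RDD to sample elements by key, where each element is
-#          list(K, V) or c(K, V).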
-# @param withReplacement Sampling with replacement or not
-# @param fractions The (rough) sampling rate for each key, as a named list
-#                  mapping keys to fractions
-# @param seed Randomness seed value
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:3000)
-# pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
-#                                    else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
-# fractions <- list(a = 0.2, b = 0.1, c = 0.3)
-# sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
-# 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
-# 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
-# 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
-# lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
-# lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 3000 # TRUE
-# lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
-# lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 3000 # TRUE
-# lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
-# lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 3000 # TRUE
-# fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
-# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
-# fractions <- list(a = 0.2, b = 0.1)
-# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
-#}
-# @rdname sampleByKey
-# @aliases sampleByKey,RDD-method
-setMethod("sampleByKey",
-          signature(x = "RDD", withReplacement = "logical",
-                    fractions = "vector", seed = "integer"),
-          function(x, withReplacement, fractions, seed) {
-
-            for (elem in fractions) {
-              if (elem < 0.0) {
-                stop(paste("Negative fraction value ", fractions[which(fractions == elem)]))
-              }
-            }
-
-            # The sampler: takes a partition and returns its sampled version.
-            samplingFunc <- function(partIndex, part) {
-              set.seed(bitwXor(seed, partIndex))
-              res <- vector("list", length(part))
-              len <- 0
-
-              # mixing because the initial seeds are close to each other
-              runif(10)
-
-              for (elem in part) {
-                if (elem[[1]] %in% names(fractions)) {
-                  frac <- as.numeric(fractions[which(elem[[1]] == names(fractions))])
-                  if (withReplacement) {
-                    count <- rpois(1, frac)
-                    if (count > 0) {
-                      res[(len + 1):(len + count)] <- rep(list(elem), count)
-                      len <- len + count
-                    }
-                  } else {
-                    if (runif(1) < frac) {
-                      len <- len + 1
-                      res[[len]] <- elem
-                    }
-                  }
-                } else {
-                  stop("KeyError: \"", elem[[1]], "\"")
-                }
-              }
-
-              # TODO(zongheng): look into the performance of the current
-              # implementation. Look into some iterator package? Note that
-              # Scala avoids many calls to creating an empty list and PySpark
-              # similarly achieves this using `yield'. (duplicated from sampleRDD)
-              if (len > 0) {
-                res[1:len]
-              } else {
-                list()
-              }
-            }
-
-            lapplyPartitionsWithIndex(x, samplingFunc)
-          })
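For reference, a compact sampleByKey sketch (hypothetical data and rates;
counts are approximate since sampling is randomized):

  rdd <- parallelize(sc, 1:1000)
  pairs <- lapply(rdd, function(x) { list(if (x %% 2 == 0) "even" else "odd", x) })
  fractions <- list(even = 0.1, odd = 0.5)
  sampled <- sampleByKey(pairs, FALSE, fractions, 42L)
  # roughly 10% of "even" pairs and 50% of "odd" pairs are kept

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/01936c1d/sparkr-interpreter/src/main/resources/R/pkg/R/schema.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/schema.R b/sparkr-interpreter/src/main/resources/R/pkg/R/schema.R
deleted file mode 100644
index 79c744e..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/R/schema.R
+++ /dev/null
@@ -1,166 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.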
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# A set of S3 classes and methods that support the SparkSQL `StructType` and
-# `StructField` datatypes. These are used to create and interact with DataFrame schemas.
-
-#' structType
-#'
-#' Create a structType object that contains the metadata for a DataFrame. Intended for
-#' use with createDataFrame and toDF.
-#'
-#' @param x a structField object (created with the structField() function)
-#' @param ... additional structField objects
-#' @return a structType object
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
-#' schema <- structType(structField("a", "integer"), structField("b", "string"))
-#' df <- createDataFrame(sqlCtx, rdd, schema)
-#' }
-structType <- function(x, ...) {
-  UseMethod("structType", x)
-}
-
-structType.jobj <- function(x) {
-  obj <- structure(list(), class = "structType")
-  obj$jobj <- x
-  obj$fields <- function() { lapply(callJMethod(obj$jobj, "fields"), structField) }
-  obj
-}
-
-structType.structField <- function(x, ...) {
-  fields <- list(x, ...)
-  if (!all(sapply(fields, inherits, "structField"))) {
-    stop("All arguments must be structField objects.")
-  }
-  sfObjList <- lapply(fields, function(field) {
-    field$jobj
-  })
-  stObj <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
-                       "createStructType",
-                       listToSeq(sfObjList))
-  structType(stObj)
-}
-
-#' Print a Spark StructType.
-#'
-#' This function prints the contents of a StructType returned from the
-#' SparkR JVM backend.
-#'
-#' @param x A StructType object
-#' @param ... further arguments passed to or from other methods
-print.structType <- function(x, ...) {
-  cat("StructType\n",
-      sapply(x$fields(),
-             function(field) {
-               paste("|-", "name = \"", field$name(),
-                     "\", type = \"", field$dataType.toString(),
-                     "\", nullable = ", field$nullable(), "\n",
-                     sep = "")
-             }),
-      sep = "")
}
-
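For reference, a sketch of building and printing a schema with these helpers
(requires a running SparkR JVM backend; the exact type strings in the printed
output come from Spark and are assumed here):

  schema <- structType(structField("age", "integer"), structField("name", "string"))
  schema
  # StructType
  # |-name = "age", type = "IntegerType", nullable = TRUE
  # |-name = "name", type = "StringType", nullable = TRUE

-#' structField
-#'
-#' Create a structField object that contains the metadata for a single field in a schema.
-#'
-#' @param x The name of the field
-#' @param type The data type of the field
-#' @param nullable A logical value indicating whether or not the field is nullable
-#' @return a structField object
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
-#' field1 <- structField("a", "integer", TRUE)
-#' field2 <- structField("b", "string", TRUE)
-#' schema <- structType(field1, field2)
-#' df <- createDataFrame(sqlCtx, rdd, schema)
-#' }
-structField <- function(x, ...) {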
-  UseMethod("structField", x)
-}
-
-structField.jobj <- function(x) {
-  obj <- structure(list(), class = "structField")
-  obj$jobj <- x
-  obj$name <- function() { callJMethod(x, "name") }
-  obj$dataType <- function() { callJMethod(x, "dataType") }
-  obj$dataType.toString <- function() { callJMethod(obj$dataType(), "toString") }
-  obj$dataType.simpleString <- function() { callJMethod(obj$dataType(), "simpleString") }
-  obj$nullable <- function() { callJMethod(x, "nullable") }
-  obj
-}
-
-structField.character <- function(x, type, nullable = TRUE) {
-  if (class(x) != "character") {
-    stop("Field name must be a string.")
-  }
-  if (class(type) != "character") {
-    stop("Field type must be a string.")
-  }
-  if (class(nullable) != "logical") {
-    stop("nullable must be either TRUE or FALSE")
-  }
-  options <- c("byte",
-               "integer",
-               "float",
-               "double",
-               "numeric",
-               "character",
-               "string",
-               "binary",
-               "raw",
-               "logical",
-               "boolean",
-               "timestamp",
-               "date")
-  dataType <- if (type %in% options) {
-    type
-  } else {
-    stop(paste("Unsupported type for DataFrame:", type))
-  }
-  sfObj <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
-                       "createStructField",
-                       x,
-                       dataType,
-                       nullable)
-  structField(sfObj)
-}
-
-#' Print a Spark StructField.
-#'
-#' This function prints the contents of a StructField returned from the
-#' SparkR JVM backend.
-#'
-#' @param x A StructField object
-#' @param ... further arguments passed to or from other methods
-print.structField <- function(x, ...) {
-  cat("StructField(name = \"", x$name(),
-      "\", type = \"", x$dataType.toString(),
-      "\", nullable = ", x$nullable(),
-      ")",
-      sep = "")
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/01936c1d/sparkr-interpreter/src/main/resources/R/pkg/R/serialize.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/serialize.R b/sparkr-interpreter/src/main/resources/R/pkg/R/serialize.R
deleted file mode 100644
index 311021e..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/R/serialize.R
+++ /dev/null
@@ -1,208 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Utility functions to serialize R objects so they can be read in Java.
-
-# Type mapping from R to Java
-#
-# NULL -> Void
-# integer -> Int
-# character -> String
-# logical -> Boolean
-# double, numeric -> Double
-# raw -> Array[Byte]
-# Date -> Date
-# POSIXct,POSIXlt -> Time
-#
-# list[T] -> Array[T], where T is one of the above-mentioned types
-# environment -> Map[String, T], where T is a native type
-# jobj -> Object, where jobj is an object created in the backend
-
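For reference, a sketch of the wire format produced by writeObject below
(assuming these helpers are loaded; the byte values follow from the writeType
and writeInt definitions):

  con <- rawConnection(raw(0), "wb")
  writeObject(con, 1L)
  rawConnectionValue(con)  # 69 00 00 00 01 -- the "i" type tag, then the int, big-endian
  close(con)

-writeObject <- function(con, object, writeType = TRUE) {
-  # NOTE: In R, vectors have the same type as objects. So we don't support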
-  # passing in vectors as arrays and instead require arrays to be passed
-  # as lists.
-  type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
-  # Checking types is needed here, since `is.na` only handles atomic vectors,
-  # lists and pairlists
-  if (type %in% c("integer", "character", "logical", "double", "numeric")) {
-    if (is.na(object)) {
-      object <- NULL
-      type <- "NULL"
-    }
-  }
-  if (writeType) {
-    writeType(con, type)
-  }
-  switch(type,
-         NULL = writeVoid(con),
-         integer = writeInt(con, object),
-         character = writeString(con, object),
-         logical = writeBoolean(con, object),
-         double = writeDouble(con, object),
-         numeric = writeDouble(con, object),
-         raw = writeRaw(con, object),
-         list = writeList(con, object),
-         jobj = writeJobj(con, object),
-         environment = writeEnv(con, object),
-         Date = writeDate(con, object),
-         POSIXlt = writeTime(con, object),
-         POSIXct = writeTime(con, object),
-         stop(paste("Unsupported type for serialization", type)))
-}
-
-writeVoid <- function(con) {
-  # no value for NULL
-}
-
-writeJobj <- function(con, value) {
-  if (!isValidJobj(value)) {
-    stop("invalid jobj ", value$id)
-  }
-  writeString(con, value$id)
-}
-
-writeString <- function(con, value) {
-  utfVal <- enc2utf8(value)
-  writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1))
-  writeBin(utfVal, con, endian = "big")
-}
-
-writeInt <- function(con, value) {
-  writeBin(as.integer(value), con, endian = "big")
-}
-
-writeDouble <- function(con, value) {
-  writeBin(value, con, endian = "big")
-}
-
-writeBoolean <- function(con, value) {
-  # TRUE becomes 1, FALSE becomes 0
-  writeInt(con, as.integer(value))
-}
-
-writeRawSerialize <- function(outputCon, batch) {
-  outputSer <- serialize(batch, ascii = FALSE, connection = NULL)
-  writeRaw(outputCon, outputSer)
-}
-
-writeRowSerialize <- function(outputCon, rows) {
-  invisible(lapply(rows, function(r) {
-    bytes <- serializeRow(r)
-    writeRaw(outputCon, bytes)
-  }))
-}
-
-serializeRow <- function(row) {
-  rawObj <- rawConnection(raw(0), "wb")
-  on.exit(close(rawObj))
-  writeRow(rawObj, row)
-  rawConnectionValue(rawObj)
-}
-
-writeRow <- function(con, row) {
-  numCols <- length(row)
-  writeInt(con, numCols)
-  for (i in 1:numCols) {
-    writeObject(con, row[[i]])
-  }
-}
-
-writeRaw <- function(con, batch) {
-  writeInt(con, length(batch))
-  writeBin(batch, con, endian = "big")
-}
-
-writeType <- function(con, class) {
-  type <- switch(class,
-                 NULL = "n",
-                 integer = "i",
-                 character = "c",
-                 logical = "b",
-                 double = "d",
-                 numeric = "d",
-                 raw = "r",
-                 list = "l",
-                 jobj = "j",
-                 environment = "e",
-                 Date = "D",
-                 POSIXlt = "t",
-                 POSIXct = "t",
-                 stop(paste("Unsupported type for serialization", class)))
-  writeBin(charToRaw(type), con)
-}
-
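For reference, a sketch of the framing serializeRow produces (assuming the
helpers above are loaded; the sizes follow from writeInt and writeString):

  bytes <- serializeRow(list(1L, "ab"))
  # 4 bytes for the column count, then <type tag><payload> per column:
  # 1 + 4 for the integer, and 1 + 4 + 3 for the NUL-terminated string "ab"
  length(bytes)  # 17

-# Used to pass arrays where all the elements are of the same type
-writeList <- function(con, arr) {
-  # All elements should be of same type
-  elemType <- unique(sapply(arr, function(elem) { class(elem) }))
-  stopifnot(length(elemType) <= 1)
-
-  # TODO: Empty lists are given type "character" right now.
-  # This may not work if the Java side expects array of any other type.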
-  if (length(elemType) == 0) {
-    elemType <- class("somestring")
-  }
-
-  writeType(con, elemType)
-  writeInt(con, length(arr))
-
-  if (length(arr) > 0) {
-    for (a in arr) {
-      writeObject(con, a, FALSE)
-    }
-  }
-}
-
-# Used to pass arrays where the elements can be of different types
-writeGenericList <- function(con, list) {
-  writeInt(con, length(list))
-  for (elem in list) {
-    writeObject(con, elem)
-  }
-}
-
-# Used to pass in hash maps required on the Java side.
-writeEnv <- function(con, env) {
-  len <- length(env)
-
-  writeInt(con, len)
-  if (len > 0) {
-    writeList(con, as.list(ls(env)))
-    vals <- lapply(ls(env), function(x) { env[[x]] })
-    writeGenericList(con, as.list(vals))
-  }
-}
-
-writeDate <- function(con, date) {
-  writeString(con, as.character(date))
-}
-
-writeTime <- function(con, time) {
-  writeDouble(con, as.double(time))
-}
-
-# Used to serialize a list of objects where each
-# object can be of a different type. The serialization format is
-# <object type> <object> for each object.
-writeArgs <- function(con, args) {
-  if (length(args) > 0) {
-    for (a in args) {
-      writeObject(con, a)
-    }
-  }
-}
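For reference, a closing sketch of writeEnv and writeArgs (assuming the
helpers above are loaded; hypothetical values):

  env <- new.env()
  env[["a"]] <- 1L
  con <- rawConnection(raw(0), "wb")
  writeEnv(con, env)                   # length, then the keys as a string array, then the values
  writeArgs(con, list(1L, "x", TRUE))  # one <type tag><payload> pair per argument
  rawConnectionValue(con)
  close(con)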