spark git commit: [SPARK-16046][DOCS] Aggregations in the Spark SQL programming guide
Repository: spark Updated Branches: refs/heads/branch-2.1 c13378796 -> e2f773923 [SPARK-16046][DOCS] Aggregations in the Spark SQL programming guide ## What changes were proposed in this pull request? - A separate subsection for Aggregations under “Getting Started” in the Spark SQL programming guide. It mentions which aggregate functions are predefined and how users can create their own. - Examples of using the `UserDefinedAggregateFunction` abstract class for untyped aggregations in Java and Scala. - Examples of using the `Aggregator` abstract class for type-safe aggregations in Java and Scala. - Python is not covered. - The PR might not resolve the ticket since I do not know what exactly was planned by the author. In total, there are four new standalone examples that can be executed via `spark-submit` or `run-example`. The updated Spark SQL programming guide references these examples and does not contain hard-coded snippets. ## How was this patch tested? The patch was tested locally by building the docs. The examples were run as well. ![image](https://cloud.githubusercontent.com/assets/6235869/21292915/04d9d084-c515-11e6-811a-999d598dffba.png) Author: aokolnychyi Closes #16329 from aokolnychyi/SPARK-16046. (cherry picked from commit 3fdce814348fae34df379a6ab9655dbbb2c3427c) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2f77392 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2f77392 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2f77392 Branch: refs/heads/branch-2.1 Commit: e2f773923d3c61a620255e1f792c97e8999fa157 Parents: c133787 Author: aokolnychyi Authored: Tue Jan 24 22:13:17 2017 -0800 Committer: gatorsmile Committed: Tue Jan 24 22:13:35 2017 -0800 -- docs/sql-programming-guide.md | 46 ++ .../sql/JavaUserDefinedTypedAggregation.java| 160 +++ .../sql/JavaUserDefinedUntypedAggregation.java | 132 +++ examples/src/main/resources/employees.json | 4 + .../sql/UserDefinedTypedAggregation.scala | 91 +++ .../sql/UserDefinedUntypedAggregation.scala | 100 6 files changed, 533 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e2f77392/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index fb3c6a7..ffe0f39 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -382,6 +382,52 @@ For example: +## Aggregations + +The [built-in DataFrames functions](api/scala/index.html#org.apache.spark.sql.functions$) provide common +aggregations such as `count()`, `countDistinct()`, `avg()`, `max()`, `min()`, etc. +While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in +[Scala](api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$) and +[Java](api/java/org/apache/spark/sql/expressions/javalang/typed.html) to work with strongly typed Datasets. +Moreover, users are not limited to the predefined aggregate functions and can create their own. + +### Untyped User-Defined Aggregate Functions + + + + + +Users have to extend the [UserDefinedAggregateFunction](api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction) +abstract class to implement a custom untyped aggregate function.
For example, a user-defined average +can look like: + +{% include_example untyped_custom_aggregation scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala%} + + + + +{% include_example untyped_custom_aggregation java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java%} + + + + +### Type-Safe User-Defined Aggregate Functions + +User-defined aggregations for strongly typed Datasets revolve around the [Aggregator](api/scala/index.html#org.apache.spark.sql.expressions.Aggregator) abstract class. +For example, a type-safe user-defined average can look like: + + + + +{% include_example typed_custom_aggregation scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala%} + + + + +{% include_example typed_custom_aggregation java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java%} + + + # Data Sources http://git-wip-us.apache.org/repos/asf/spark/blob/e2f77392/examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java
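For readers following along, here is a minimal sketch of the untyped half of the new section: a user-defined average built on `UserDefinedAggregateFunction`. It assumes a Spark 2.x `SparkSession` named `spark` and an `employees` view; the committed `UserDefinedUntypedAggregation.scala` example referenced above is the authoritative version.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// A user-defined average over a long column, kept as (sum, count) in the aggregation buffer.
object MyAverage extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  def bufferSchema: StructType =
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L; buffer(1) = 0L }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Usage sketch (assumes a SparkSession `spark` and an `employees` table or temp view):
// spark.udf.register("myAverage", MyAverage)
// spark.sql("SELECT myAverage(salary) AS average_salary FROM employees").show()
```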
spark git commit: [SPARK-16046][DOCS] Aggregations in the Spark SQL programming guide
Repository: spark Updated Branches: refs/heads/master 40a4cfc7c -> 3fdce8143 [SPARK-16046][DOCS] Aggregations in the Spark SQL programming guide ## What changes were proposed in this pull request? - A separate subsection for Aggregations under “Getting Started” in the Spark SQL programming guide. It mentions which aggregate functions are predefined and how users can create their own. - Examples of using the `UserDefinedAggregateFunction` abstract class for untyped aggregations in Java and Scala. - Examples of using the `Aggregator` abstract class for type-safe aggregations in Java and Scala. - Python is not covered. - The PR might not resolve the ticket since I do not know what exactly was planned by the author. In total, there are four new standalone examples that can be executed via `spark-submit` or `run-example`. The updated Spark SQL programming guide references these examples and does not contain hard-coded snippets. ## How was this patch tested? The patch was tested locally by building the docs. The examples were run as well. ![image](https://cloud.githubusercontent.com/assets/6235869/21292915/04d9d084-c515-11e6-811a-999d598dffba.png) Author: aokolnychyi Closes #16329 from aokolnychyi/SPARK-16046. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fdce814 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fdce814 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fdce814 Branch: refs/heads/master Commit: 3fdce814348fae34df379a6ab9655dbbb2c3427c Parents: 40a4cfc Author: aokolnychyi Authored: Tue Jan 24 22:13:17 2017 -0800 Committer: gatorsmile Committed: Tue Jan 24 22:13:17 2017 -0800 -- docs/sql-programming-guide.md | 46 ++ .../sql/JavaUserDefinedTypedAggregation.java| 160 +++ .../sql/JavaUserDefinedUntypedAggregation.java | 132 +++ examples/src/main/resources/employees.json | 4 + .../sql/UserDefinedTypedAggregation.scala | 91 +++ .../sql/UserDefinedUntypedAggregation.scala | 100 6 files changed, 533 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3fdce814/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index f4c89e5..c60088d 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -382,6 +382,52 @@ For example: +## Aggregations + +The [built-in DataFrames functions](api/scala/index.html#org.apache.spark.sql.functions$) provide common +aggregations such as `count()`, `countDistinct()`, `avg()`, `max()`, `min()`, etc. +While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in +[Scala](api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$) and +[Java](api/java/org/apache/spark/sql/expressions/javalang/typed.html) to work with strongly typed Datasets. +Moreover, users are not limited to the predefined aggregate functions and can create their own. + +### Untyped User-Defined Aggregate Functions + + + + + +Users have to extend the [UserDefinedAggregateFunction](api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction) +abstract class to implement a custom untyped aggregate function.
For example, a user-defined average +can look like: + +{% include_example untyped_custom_aggregation scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala%} + + + + +{% include_example untyped_custom_aggregation java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java%} + + + + +### Type-Safe User-Defined Aggregate Functions + +User-defined aggregations for strongly typed Datasets revolve around the [Aggregator](api/scala/index.html#org.apache.spark.sql.expressions.Aggregator) abstract class. +For example, a type-safe user-defined average can look like: + + + + +{% include_example typed_custom_aggregation scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala%} + + + + +{% include_example typed_custom_aggregation java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java%} + + + # Data Sources http://git-wip-us.apache.org/repos/asf/spark/blob/3fdce814/examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java new file mode 100644 index 000..78e9011 ---
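And a matching sketch of the type-safe half: an `Aggregator`-based average over a strongly typed `Dataset[Employee]`. Again a hedged illustration rather than the committed example; `UserDefinedTypedAggregation.scala` above is authoritative, and the usage lines assume `import spark.implicits._`.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

// A type-safe user-defined average: IN = Employee, BUF = Average, OUT = Double.
object MyAverage extends Aggregator[Employee, Average, Double] {
  def zero: Average = Average(0L, 0L)
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Usage sketch: turn the Aggregator into a typed Column and apply it to a Dataset[Employee]
// (assumes a SparkSession `spark` and `import spark.implicits._`):
// val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
// val averageSalary = MyAverage.toColumn.name("average_salary")
// ds.select(averageSalary).show()
```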
spark git commit: [SPARK-19330][DSTREAMS] Also show tooltip for successful batches
Repository: spark Updated Branches: refs/heads/branch-2.1 b94fb284b -> c13378796 [SPARK-19330][DSTREAMS] Also show tooltip for successful batches ## What changes were proposed in this pull request? ### Before ![_streaming_before](https://cloud.githubusercontent.com/assets/15843379/22181462/1e45c20c-e0c8-11e6-831c-8bf69722a4ee.png) ### After ![_streaming_after](https://cloud.githubusercontent.com/assets/15843379/22181464/23f38a40-e0c8-11e6-9a87-e27b1ffb1935.png) ## How was this patch tested? Manually Author: Liwei LinCloses #16673 from lw-lin/streaming. (cherry picked from commit 40a4cfc7c7911107d1cf7a2663469031dcf1f576) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1337879 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1337879 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1337879 Branch: refs/heads/branch-2.1 Commit: c133787965e65e19c0aab636c941b5673e6a68e5 Parents: b94fb28 Author: Liwei Lin Authored: Tue Jan 24 16:36:17 2017 -0800 Committer: Shixiong Zhu Committed: Tue Jan 24 16:36:24 2017 -0800 -- .../org/apache/spark/streaming/ui/static/streaming-page.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c1337879/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js -- diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js index f82323a..d004f34 100644 --- a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js +++ b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js @@ -169,7 +169,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) { .style("cursor", "pointer") .attr("cx", function(d) { return x(d.x); }) .attr("cy", function(d) { return y(d.y); }) -.attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";}) +.attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "3";}) .on('mouseover', function(d) { var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x]; showBootstrapTooltip(d3.select(this).node(), tip); @@ -187,7 +187,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) { .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "white";}) .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "white";}) .attr("opacity", function(d) { return isFailedBatch(d.x) ? "1" : "0";}) -.attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";}); +.attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "3";}); }) .on("click", function(d) { if (lastTimeout != null) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19330][DSTREAMS] Also show tooltip for successful batches
Repository: spark Updated Branches: refs/heads/master 15ef3740d -> 40a4cfc7c [SPARK-19330][DSTREAMS] Also show tooltip for successful batches ## What changes were proposed in this pull request? ### Before ![_streaming_before](https://cloud.githubusercontent.com/assets/15843379/22181462/1e45c20c-e0c8-11e6-831c-8bf69722a4ee.png) ### After ![_streaming_after](https://cloud.githubusercontent.com/assets/15843379/22181464/23f38a40-e0c8-11e6-9a87-e27b1ffb1935.png) ## How was this patch tested? Manually Author: Liwei LinCloses #16673 from lw-lin/streaming. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40a4cfc7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40a4cfc7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40a4cfc7 Branch: refs/heads/master Commit: 40a4cfc7c7911107d1cf7a2663469031dcf1f576 Parents: 15ef374 Author: Liwei Lin Authored: Tue Jan 24 16:36:17 2017 -0800 Committer: Shixiong Zhu Committed: Tue Jan 24 16:36:17 2017 -0800 -- .../org/apache/spark/streaming/ui/static/streaming-page.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40a4cfc7/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js -- diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js index f82323a..d004f34 100644 --- a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js +++ b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js @@ -169,7 +169,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) { .style("cursor", "pointer") .attr("cx", function(d) { return x(d.x); }) .attr("cy", function(d) { return y(d.y); }) -.attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";}) +.attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "3";}) .on('mouseover', function(d) { var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x]; showBootstrapTooltip(d3.select(this).node(), tip); @@ -187,7 +187,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) { .attr("stroke", function(d) { return isFailedBatch(d.x) ? "red" : "white";}) .attr("fill", function(d) { return isFailedBatch(d.x) ? "red" : "white";}) .attr("opacity", function(d) { return isFailedBatch(d.x) ? "1" : "0";}) -.attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "0";}); +.attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "3";}); }) .on("click", function(d) { if (lastTimeout != null) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19334][SQL] Fix the code injection vulnerability related to Generator functions.
Repository: spark Updated Branches: refs/heads/master cdb691eb4 -> 15ef3740d [SPARK-19334][SQL] Fix the code injection vulnerability related to Generator functions. ## What changes were proposed in this pull request? Similar to SPARK-15165, codegen is in danger of arbitrary code injection. The root cause is how variable names are created by codegen. In GenerateExec#codeGenAccessor, a variable name is created like as follows. ``` val value = ctx.freshName(name) ``` The variable `value` is named based on the value of the variable `name` and the value of `name` is from schema given by users so an attacker can attack with queries like as follows. ``` SELECT inline(array(cast(struct(1) AS struct<`=new Object() { {f();} public void f() {throw new RuntimeException("This exception is injected.");} public int x;}.x`:int>))) ``` In the example above, a RuntimeException is thrown but an attacker can replace it with arbitrary code. ## How was this patch tested? Added a new test case. Author: Kousuke SarutaCloses #16681 from sarutak/SPARK-19334. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15ef3740 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15ef3740 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15ef3740 Branch: refs/heads/master Commit: 15ef3740dea3d82f64a030d4523ad542485e1453 Parents: cdb691e Author: Kousuke Saruta Authored: Tue Jan 24 23:35:23 2017 +0100 Committer: Herman van Hovell Committed: Tue Jan 24 23:35:23 2017 +0100 -- .../apache/spark/sql/execution/GenerateExec.scala | 11 +-- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 16 2 files changed, 25 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/15ef3740/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index b52f5c4..69be709 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -181,7 +181,14 @@ case class GenerateExec( val row = codeGenAccessor(ctx, data.value, "col", index, st, nullable, checks) val fieldChecks = checks ++ optionalCode(nullable, row.isNull) val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) => - codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, f.nullable, fieldChecks) + codeGenAccessor( +ctx, +row.value, +s"st_col${i}", +i.toString, +f.dataType, +f.nullable, +fieldChecks) } ("", row.code, columns) @@ -247,7 +254,7 @@ case class GenerateExec( val values = e.dataType match { case ArrayType(st: StructType, nullable) => st.fields.toSeq.zipWithIndex.map { case (f, i) => - codeGenAccessor(ctx, current, f.name, s"$i", f.dataType, f.nullable, checks) + codeGenAccessor(ctx, current, s"st_col${i}", s"$i", f.dataType, f.nullable, checks) } } http://git-wip-us.apache.org/repos/asf/spark/blob/15ef3740/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 07b787a..a77f920 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2548,4 +2548,20 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer( 
sql("SELECT * FROM `_tbl`"), Row(1) :: Row(2) :: Row(3) :: Nil) } } + + test("SPARK-19334: check code injection is prevented") { +// The end of comment (*/) should be escaped. +val badQuery = + """|SELECT inline(array(cast(struct(1) AS + | struct<`= + |new Object() { + | {f();} + | public void f() {throw new RuntimeException("This exception is injected.");} + | public int x; + |}.x + | `:int>)))""".stripMargin.replaceAll("\n", "") + +checkAnswer(sql(badQuery), Row(1) :: Nil) + } + } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19017][SQL] NOT IN subquery with more than one column may return incorrect results
Repository: spark Updated Branches: refs/heads/master 8f3f73abc -> cdb691eb4 [SPARK-19017][SQL] NOT IN subquery with more than one column may return incorrect results ## What changes were proposed in this pull request? This PR fixes the code in Optimizer phase where the NULL-aware expression of a NOT IN query is expanded in Rule `RewritePredicateSubquery`. Example: The query select a1,b1 from t1 where (a1,b1) not in (select a2,b2 from t2); has the (a1, b1) = (a2, b2) rewritten from (before this fix): Join LeftAnti, ((isnull((_1#2 = a2#16)) || isnull((_2#3 = b2#17))) || ((_1#2 = a2#16) && (_2#3 = b2#17))) to (after this fix): Join LeftAnti, (((_1#2 = a2#16) || isnull((_1#2 = a2#16))) && ((_2#3 = b2#17) || isnull((_2#3 = b2#17 ## How was this patch tested? sql/test, catalyst/test and new test cases in SQLQueryTestSuite. Author: Nattavut SutyanyongCloses #16467 from nsyca/19017. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdb691eb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdb691eb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdb691eb Branch: refs/heads/master Commit: cdb691eb4da5dbf52dccf1da0ae57a9b1874f010 Parents: 8f3f73a Author: Nattavut Sutyanyong Authored: Tue Jan 24 23:31:06 2017 +0100 Committer: Herman van Hovell Committed: Tue Jan 24 23:31:06 2017 +0100 -- .../spark/sql/catalyst/optimizer/subquery.scala | 10 +++- .../in-subquery/not-in-multiple-columns.sql | 55 ++ .../in-subquery/not-in-multiple-columns.sql.out | 59 .../apache/spark/sql/SQLQueryTestSuite.scala| 7 ++- .../org/apache/spark/sql/SubquerySuite.scala| 6 +- 5 files changed, 131 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cdb691eb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index f14aaab..4d62cce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -68,8 +68,14 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Note that will almost certainly be planned as a Broadcast Nested Loop join. // Use EXISTS if performance matters to you. val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - val anyNull = splitConjunctivePredicates(joinCond.get).map(IsNull).reduceLeft(Or) - Join(outerPlan, sub, LeftAnti, Option(Or(anyNull, joinCond.get))) + // Expand the NOT IN expression with the NULL-aware semantic + // to its full form. That is from: + // (a1,b1,...) = (a2,b2,...) + // to + // (a1=a2 OR isnull(a1=a2)) AND (b1=b2 OR isnull(b1=b2)) AND ... 
+ val joinConds = splitConjunctivePredicates(joinCond.get) + val pairs = joinConds.map(c => Or(c, IsNull(c))).reduceLeft(And) + Join(outerPlan, sub, LeftAnti, Option(pairs)) case (p, predicate) => val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p) Project(p.output, Filter(newCond.get, inputPlan)) http://git-wip-us.apache.org/repos/asf/spark/blob/cdb691eb/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql new file mode 100644 index 000..db66850 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql @@ -0,0 +1,55 @@ +-- This file contains test cases for NOT IN subquery with multiple columns. + +-- The data sets are populated as follows: +-- 1) When T1.A1 = T2.A2 +--1.1) T1.B1 = T2.B2 +--1.2) T1.B1 = T2.B2 returns false +--1.3) T1.B1 is null +--1.4) T2.B2 is null +-- 2) When T1.A1 = T2.A2 returns false +-- 3) When T1.A1 is null +-- 4) When T1.A2 is null + +-- T1.A1 T1.B1 T2.A2 T2.B2 +-- - - - - +-- 1 1 1 1(1.1) +-- 1 3 (1.2) +-- 1 null 1 null(1.3 & 1.4) +-- +-- 2 1 1 1(2) +-- null 1
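To see why the rewrite matters, both anti-join conditions can be evaluated under SQL's three-valued logic for an outer row `(1, 3)` against an inner row `(2, NULL)`. The sketch below is my own illustration with `Option[Boolean]` standing in for nullable booleans, not Spark code; the anti join drops the outer row whenever the condition evaluates to true.

```scala
object NotInRewriteSketch {
  type TV = Option[Boolean] // Some(true/false); None models SQL NULL (unknown)

  def and(x: TV, y: TV): TV = (x, y) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }
  def or(x: TV, y: TV): TV = (x, y) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }
  def isNull(x: TV): TV = Some(x.isEmpty)
  def sqlEq(a: Option[Int], b: Option[Int]): TV =
    for (x <- a; y <- b) yield x == y // NULL if either operand is NULL

  def main(args: Array[String]): Unit = {
    val eqA = sqlEq(Some(1), Some(2)) // a1 = a2  ->  false
    val eqB = sqlEq(Some(3), None)    // b1 = b2  ->  NULL

    // Old rewrite: (isnull(a1=a2) OR isnull(b1=b2)) OR ((a1=a2) AND (b1=b2))
    val oldCond = or(or(isNull(eqA), isNull(eqB)), and(eqA, eqB))
    // New rewrite: ((a1=a2) OR isnull(a1=a2)) AND ((b1=b2) OR isnull(b1=b2))
    val newCond = and(or(eqA, isNull(eqA)), or(eqB, isNull(eqB)))

    // Old: Some(true)  -> the outer row is wrongly anti-joined away.
    // New: Some(false) -> the outer row is kept, matching NOT IN semantics.
    println(s"old = $oldCond, new = $newCond")
  }
}
```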
spark git commit: [SPARK-19017][SQL] NOT IN subquery with more than one column may return incorrect results
Repository: spark Updated Branches: refs/heads/branch-2.1 d128b6a39 -> b94fb284b [SPARK-19017][SQL] NOT IN subquery with more than one column may return incorrect results ## What changes were proposed in this pull request? This PR fixes the code in Optimizer phase where the NULL-aware expression of a NOT IN query is expanded in Rule `RewritePredicateSubquery`. Example: The query select a1,b1 from t1 where (a1,b1) not in (select a2,b2 from t2); has the (a1, b1) = (a2, b2) rewritten from (before this fix): Join LeftAnti, ((isnull((_1#2 = a2#16)) || isnull((_2#3 = b2#17))) || ((_1#2 = a2#16) && (_2#3 = b2#17))) to (after this fix): Join LeftAnti, (((_1#2 = a2#16) || isnull((_1#2 = a2#16))) && ((_2#3 = b2#17) || isnull((_2#3 = b2#17 ## How was this patch tested? sql/test, catalyst/test and new test cases in SQLQueryTestSuite. Author: Nattavut SutyanyongCloses #16467 from nsyca/19017. (cherry picked from commit cdb691eb4da5dbf52dccf1da0ae57a9b1874f010) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b94fb284 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b94fb284 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b94fb284 Branch: refs/heads/branch-2.1 Commit: b94fb284b93c763cf6e604705509a4e970d6ce6e Parents: d128b6a Author: Nattavut Sutyanyong Authored: Tue Jan 24 23:31:06 2017 +0100 Committer: Herman van Hovell Committed: Tue Jan 24 23:31:19 2017 +0100 -- .../spark/sql/catalyst/optimizer/subquery.scala | 10 +++- .../in-subquery/not-in-multiple-columns.sql | 55 ++ .../in-subquery/not-in-multiple-columns.sql.out | 59 .../apache/spark/sql/SQLQueryTestSuite.scala| 7 ++- .../org/apache/spark/sql/SubquerySuite.scala| 6 +- 5 files changed, 131 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b94fb284/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index f14aaab..4d62cce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -68,8 +68,14 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Note that will almost certainly be planned as a Broadcast Nested Loop join. // Use EXISTS if performance matters to you. val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - val anyNull = splitConjunctivePredicates(joinCond.get).map(IsNull).reduceLeft(Or) - Join(outerPlan, sub, LeftAnti, Option(Or(anyNull, joinCond.get))) + // Expand the NOT IN expression with the NULL-aware semantic + // to its full form. That is from: + // (a1,b1,...) = (a2,b2,...) + // to + // (a1=a2 OR isnull(a1=a2)) AND (b1=b2 OR isnull(b1=b2)) AND ... 
+ val joinConds = splitConjunctivePredicates(joinCond.get) + val pairs = joinConds.map(c => Or(c, IsNull(c))).reduceLeft(And) + Join(outerPlan, sub, LeftAnti, Option(pairs)) case (p, predicate) => val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p) Project(p.output, Filter(newCond.get, inputPlan)) http://git-wip-us.apache.org/repos/asf/spark/blob/b94fb284/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql new file mode 100644 index 000..db66850 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql @@ -0,0 +1,55 @@ +-- This file contains test cases for NOT IN subquery with multiple columns. + +-- The data sets are populated as follows: +-- 1) When T1.A1 = T2.A2 +--1.1) T1.B1 = T2.B2 +--1.2) T1.B1 = T2.B2 returns false +--1.3) T1.B1 is null +--1.4) T2.B2 is null +-- 2) When T1.A1 = T2.A2 returns false +-- 3) When T1.A1 is null +-- 4) When T1.A2 is null + +-- T1.A1 T1.B1 T2.A2 T2.B2 +-- - - - - +-- 1 1 1 1(1.1) +-- 1 3
[2/2] spark git commit: [SPARK-19139][CORE] New auth mechanism for transport library.
[SPARK-19139][CORE] New auth mechanism for transport library. This change introduces a new auth mechanism to the transport library, to be used when users enable strong encryption. This auth mechanism has better security than the currently used DIGEST-MD5. The new protocol uses symmetric key encryption to mutually authenticate the endpoints, and is very loosely based on ISO/IEC 9798. The new protocol falls back to SASL when it thinks the remote end is old. Because SASL does not support asking the server for multiple auth protocols, which would mean we could re-use the existing SASL code by just adding a new SASL provider, the protocol is implemented outside of the SASL API to avoid the boilerplate of adding a new provider. Details of the auth protocol are discussed in the included README.md file. This change partly undos the changes added in SPARK-13331; AES encryption is now decoupled from SASL authentication. The encryption code itself, though, has been re-used as part of this change. ## How was this patch tested? - Unit tests - Tested Spark 2.2 against Spark 1.6 shuffle service with SASL enabled - Tested Spark 2.2 against Spark 2.2 shuffle service with SASL fallback disabled Author: Marcelo VanzinCloses #16521 from vanzin/SPARK-19139. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f3f73ab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f3f73ab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f3f73ab Branch: refs/heads/master Commit: 8f3f73abc1fe62496722476460c174af0250e3fe Parents: d978338 Author: Marcelo Vanzin Authored: Tue Jan 24 10:44:04 2017 -0800 Committer: Shixiong Zhu Committed: Tue Jan 24 10:44:04 2017 -0800 -- .../network/crypto/AuthClientBootstrap.java | 128 + .../apache/spark/network/crypto/AuthEngine.java | 284 +++ .../spark/network/crypto/AuthRpcHandler.java| 170 +++ .../network/crypto/AuthServerBootstrap.java | 55 .../spark/network/crypto/ClientChallenge.java | 101 +++ .../org/apache/spark/network/crypto/README.md | 158 +++ .../spark/network/crypto/ServerResponse.java| 85 ++ .../spark/network/crypto/TransportCipher.java | 257 + .../spark/network/sasl/SaslClientBootstrap.java | 36 +-- .../spark/network/sasl/SaslRpcHandler.java | 41 +-- .../spark/network/sasl/aes/AesCipher.java | 281 -- .../network/sasl/aes/AesConfigMessage.java | 101 --- .../spark/network/util/TransportConf.java | 92 -- .../spark/network/crypto/AuthEngineSuite.java | 109 +++ .../network/crypto/AuthIntegrationSuite.java| 213 ++ .../spark/network/crypto/AuthMessagesSuite.java | 80 ++ .../spark/network/sasl/SparkSaslSuite.java | 97 +-- .../network/shuffle/ExternalShuffleClient.java | 19 +- .../mesos/MesosExternalShuffleClient.java | 5 +- .../ExternalShuffleIntegrationSuite.java| 4 +- .../shuffle/ExternalShuffleSecuritySuite.java | 9 +- .../spark/network/yarn/YarnShuffleService.java | 4 +- .../org/apache/spark/SecurityManager.scala | 11 +- .../main/scala/org/apache/spark/SparkConf.scala | 5 + .../main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../spark/deploy/ExternalShuffleService.scala | 10 +- .../apache/spark/internal/config/package.scala | 16 ++ .../netty/NettyBlockTransferService.scala | 7 +- .../apache/spark/rpc/netty/NettyRpcEnv.scala| 8 +- .../org/apache/spark/storage/BlockManager.scala | 3 +- .../scala/org/apache/spark/SparkConfSuite.scala | 19 ++ .../netty/NettyBlockTransferSecuritySuite.scala | 14 + .../org/apache/spark/rpc/RpcEnvSuite.scala | 54 +++- docs/configuration.md | 50 ++-- 
.../MesosCoarseGrainedSchedulerBackend.scala| 3 +- 35 files changed, 1909 insertions(+), 622 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java -- diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java new file mode 100644 index 000..980525d --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
[1/2] spark git commit: [SPARK-19139][CORE] New auth mechanism for transport library.
Repository: spark Updated Branches: refs/heads/master d9783380f -> 8f3f73abc http://git-wip-us.apache.org/repos/asf/spark/blob/8f3f73ab/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java -- diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java new file mode 100644 index 000..21609d5 --- /dev/null +++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.crypto; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.netty.channel.Channel; +import org.junit.After; +import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.apache.spark.network.TestUtils; +import org.apache.spark.network.TransportContext; +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.client.TransportClientBootstrap; +import org.apache.spark.network.sasl.SaslRpcHandler; +import org.apache.spark.network.sasl.SaslServerBootstrap; +import org.apache.spark.network.sasl.SecretKeyHolder; +import org.apache.spark.network.server.RpcHandler; +import org.apache.spark.network.server.StreamManager; +import org.apache.spark.network.server.TransportServer; +import org.apache.spark.network.server.TransportServerBootstrap; +import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.MapConfigProvider; +import org.apache.spark.network.util.TransportConf; + +public class AuthIntegrationSuite { + + private AuthTestCtx ctx; + + @After + public void cleanUp() throws Exception { +if (ctx != null) { + ctx.close(); +} +ctx = null; + } + + @Test + public void testNewAuth() throws Exception { +ctx = new AuthTestCtx(); +ctx.createServer("secret"); +ctx.createClient("secret"); + +ByteBuffer reply = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000); +assertEquals("Pong", JavaUtils.bytesToString(reply)); +assertTrue(ctx.authRpcHandler.doDelegate); +assertFalse(ctx.authRpcHandler.delegate instanceof SaslRpcHandler); + } + + @Test + public void testAuthFailure() throws Exception { +ctx = new AuthTestCtx(); +ctx.createServer("server"); + +try { + ctx.createClient("client"); + fail("Should have failed to create client."); +} catch (Exception e) { + assertFalse(ctx.authRpcHandler.doDelegate); + assertFalse(ctx.serverChannel.isActive()); +} + } + + @Test + public void 
testSaslServerFallback() throws Exception { +ctx = new AuthTestCtx(); +ctx.createServer("secret", true); +ctx.createClient("secret", false); + +ByteBuffer reply = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000); +assertEquals("Pong", JavaUtils.bytesToString(reply)); + } + + @Test + public void testSaslClientFallback() throws Exception { +ctx = new AuthTestCtx(); +ctx.createServer("secret", false); +ctx.createClient("secret", true); + +ByteBuffer reply = ctx.client.sendRpcSync(JavaUtils.stringToBytes("Ping"), 5000); +assertEquals("Pong", JavaUtils.bytesToString(reply)); + } + + @Test + public void testAuthReplay() throws Exception { +// This test covers the case where an attacker replays a challenge message sniffed from the +// network, but doesn't know the actual secret. The server should close the connection as +// soon as a message is sent after authentication is performed. This is emulated by removing +// the client encryption handler after authentication. +ctx = new AuthTestCtx(); +ctx.createServer("secret"); +ctx.createClient("secret"); + +assertNotNull(ctx.client.getChannel().pipeline() +
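From a user's point of view the new mechanism is driven by configuration. Below is a hedged sketch of enabling it; the `spark.network.crypto.*` key names are my reading of this change and should be checked against the updated `docs/configuration.md` before relying on them.

```scala
import org.apache.spark.SparkConf

// Hedged sketch: shared-secret authentication plus the new AES-based transport auth/encryption.
// Key names and defaults are assumptions taken from this change -- verify in docs/configuration.md.
val conf = new SparkConf()
  .set("spark.authenticate", "true")                 // mutual authentication with a shared secret
  .set("spark.network.crypto.enabled", "true")       // new auth protocol instead of SASL/DIGEST-MD5
  .set("spark.network.crypto.saslFallback", "true")  // still fall back to SASL for old shuffle services
```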
spark git commit: [SPARK-18036][ML][MLLIB] Fixing decision trees handling edge cases
Repository: spark Updated Branches: refs/heads/master 59c184e02 -> d9783380f [SPARK-18036][ML][MLLIB] Fixing decision trees handling edge cases ## What changes were proposed in this pull request? Decision trees/GBT/RF do not handle edge cases such as constant features or empty features. In the case of constant features we choose any arbitrary split instead of failing with a cryptic error message. In the case of empty features we fail with a better error message stating: DecisionTree requires number of features > 0, but was given an empty features vector Instead of the cryptic error message: java.lang.UnsupportedOperationException: empty.max ## How was this patch tested? Unit tests are added in the patch for: DecisionTreeRegressor GBTRegressor Random Forest Regressor Author: Ilya MatiachCloses #16377 from imatiach-msft/ilmat/fix-decision-tree. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9783380 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9783380 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9783380 Branch: refs/heads/master Commit: d9783380ff0a6440117348dee3205826d0f9687e Parents: 59c184e Author: Ilya Matiach Authored: Tue Jan 24 10:25:12 2017 -0800 Committer: Joseph K. Bradley Committed: Tue Jan 24 10:25:12 2017 -0800 -- .../ml/tree/impl/DecisionTreeMetadata.scala | 2 ++ .../spark/ml/tree/impl/RandomForest.scala | 22 +++-- .../spark/ml/tree/impl/RandomForestSuite.scala | 33 +--- 3 files changed, 51 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d9783380/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala index bc3c86a..8a9dcb4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala @@ -113,6 +113,8 @@ private[spark] object DecisionTreeMetadata extends Logging { throw new IllegalArgumentException(s"DecisionTree requires size of input RDD > 0, " + s"but was given by empty one.") } +require(numFeatures > 0, s"DecisionTree requires number of features > 0, " + + s"but was given an empty features vector") val numExamples = input.count() val numClasses = strategy.algo match { case Classification => strategy.numClasses http://git-wip-us.apache.org/repos/asf/spark/blob/d9783380/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index a61ea37..008dd19 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -714,7 +714,7 @@ private[spark] object RandomForest extends Logging { } // For each (feature, split), calculate the gain, and select the best (feature, split). 
-val (bestSplit, bestSplitStats) = +val splitsAndImpurityInfo = validFeatureSplits.map { case (featureIndexIdx, featureIndex) => val numSplits = binAggregates.metadata.numSplits(featureIndex) if (binAggregates.metadata.isContinuous(featureIndex)) { @@ -828,8 +828,26 @@ private[spark] object RandomForest extends Logging { new CategoricalSplit(featureIndex, categoriesForSplit.toArray, numCategories) (bestFeatureSplit, bestFeatureGainStats) } - }.maxBy(_._2.gain) + } +val (bestSplit, bestSplitStats) = + if (splitsAndImpurityInfo.isEmpty) { +// If no valid splits for features, then this split is invalid, +// return invalid information gain stats. Take any split and continue. +// Splits is empty, so arbitrarily choose to split on any threshold +val dummyFeatureIndex = featuresForNode.map(_.head).getOrElse(0) +val parentImpurityCalculator = binAggregates.getParentImpurityCalculator() +if (binAggregates.metadata.isContinuous(dummyFeatureIndex)) { + (new ContinuousSplit(dummyFeatureIndex, 0), +ImpurityStats.getInvalidImpurityStats(parentImpurityCalculator)) +} else { + val numCategories = binAggregates.metadata.featureArity(dummyFeatureIndex) + (new CategoricalSplit(dummyFeatureIndex, Array(),
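A sketch of the constant-feature edge case this change addresses, assuming a `SparkSession` named `spark`; whether a given input hit the old `empty.max` failure is my reading of the description above, so treat this as illustrative only.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.DecisionTreeRegressor

// Every row has the same (constant) feature value, so no informative split exists.
val constantFeatures = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(1.0)),
  (1.0, Vectors.dense(1.0)),
  (2.0, Vectors.dense(1.0)),
  (3.0, Vectors.dense(1.0))
)).toDF("label", "features")

val model = new DecisionTreeRegressor().fit(constantFeatures)
println(model.toDebugString) // a (near-)trivial tree rather than a cryptic exception

// An empty features vector, by contrast, now fails fast with the clearer message:
// "DecisionTree requires number of features > 0, but was given an empty features vector"
```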
spark git commit: [SPARK-14049][CORE] Add functionality in spark history sever API to query applications by end time
Repository: spark Updated Branches: refs/heads/master 752502be0 -> 0ff67a1cf [SPARK-14049][CORE] Add functionality in spark history sever API to query applications by end time ## What changes were proposed in this pull request? Currently, spark history server REST API provides functionality to query applications by application start time range based on minDate and maxDate query parameters, but it lacks support to query applications by their end time. In this pull request we are proposing optional minEndDate and maxEndDate query parameters and filtering capability based on these parameters to spark history server REST API. This functionality can be used for following queries, 1. Applications finished in last 'x' minutes 2. Applications finished before 'y' time 3. Applications finished between 'x' time to 'y' time 4. Applications started from 'x' time and finished before 'y' time. For backward compatibility, we can keep existing minDate and maxDate query parameters as they are and they can continue support filtering based on start time range. ## How was this patch tested? Existing unit tests and 4 new unit tests. Author: Parag ChaudhariCloses #11867 from paragpc/master-SHS-query-by-endtime_2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ff67a1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ff67a1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ff67a1c Branch: refs/heads/master Commit: 0ff67a1cf91ce4a36657c789c0fe676f4f89282f Parents: 752502b Author: Parag Chaudhari Authored: Tue Jan 24 08:41:46 2017 -0600 Committer: Imran Rashid Committed: Tue Jan 24 08:41:46 2017 -0600 -- .../status/api/v1/ApplicationListResource.scala | 22 - .../maxEndDate_app_list_json_expectation.json | 95 ...nd_maxEndDate_app_list_json_expectation.json | 53 +++ ...nd_maxEndDate_app_list_json_expectation.json | 53 +++ .../minEndDate_app_list_json_expectation.json | 70 +++ .../deploy/history/HistoryServerSuite.scala | 6 ++ docs/monitoring.md | 17 +++- 7 files changed, 311 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0ff67a1c/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 7677929..a023926 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -30,6 +30,8 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) { @QueryParam("status") status: JList[ApplicationStatus], @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: SimpleDateParam, @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam, + @DefaultValue("2010-01-01") @QueryParam("minEndDate") minEndDate: SimpleDateParam, + @DefaultValue("3000-01-01") @QueryParam("maxEndDate") maxEndDate: SimpleDateParam, @QueryParam("limit") limit: Integer) : Iterator[ApplicationInfo] = { @@ -43,11 +45,27 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) { // keep the app if *any* attempts fall in the right time window ((!anyRunning && includeCompleted) || (anyRunning && includeRunning)) && app.attempts.exists { attempt => -val start = attempt.startTime.getTime -start >= minDate.timestamp && start <= maxDate.timestamp +isAttemptInRange(attempt, minDate, maxDate, 
minEndDate, maxEndDate, anyRunning) } }.take(numApps) } + + private def isAttemptInRange( + attempt: ApplicationAttemptInfo, + minStartDate: SimpleDateParam, + maxStartDate: SimpleDateParam, + minEndDate: SimpleDateParam, + maxEndDate: SimpleDateParam, + anyRunning: Boolean): Boolean = { +val startTimeOk = attempt.startTime.getTime >= minStartDate.timestamp && + attempt.startTime.getTime <= maxStartDate.timestamp +// If the maxEndDate is in the past, exclude all running apps. +val endTimeOkForRunning = anyRunning && (maxEndDate.timestamp > System.currentTimeMillis()) +val endTimeOkForCompleted = !anyRunning && (attempt.endTime.getTime >= minEndDate.timestamp && + attempt.endTime.getTime <= maxEndDate.timestamp) +val endTimeOk = endTimeOkForRunning || endTimeOkForCompleted +startTimeOk && endTimeOk + } } private[spark] object ApplicationsListResource {
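A usage sketch of the new query parameters against a running history server; the host, port, and dates are placeholders, and the accepted date formats are documented in the updated `docs/monitoring.md`.

```scala
import scala.io.Source

// Completed applications that finished between Jan 1 and Jan 24, 2017.
// minDate/maxDate still filter on start time; the new minEndDate/maxEndDate filter on end time.
val url = "http://localhost:18080/api/v1/applications" +
  "?status=completed&minEndDate=2017-01-01&maxEndDate=2017-01-24"

val json = Source.fromURL(url).mkString
println(json) // JSON array of application records matching the end-time window
```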
spark git commit: [SPARK-19246][SQL] CataLogTable's partitionSchema order and exist check
Repository: spark Updated Branches: refs/heads/master 3c86fdddf -> 752502be0 [SPARK-19246][SQL] CataLogTable's partitionSchema order and exist check ## What changes were proposed in this pull request? CataLogTable's partitionSchema should check if each column name in partitionColumnNames must match one and only one field in schema, if not we should throw an exception and CataLogTable's partitionSchema should keep order with partitionColumnNames ## How was this patch tested? N/A Author: windpigerCloses #16606 from windpiger/checkPartionColNameWithSchema. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/752502be Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/752502be Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/752502be Branch: refs/heads/master Commit: 752502be053c66a95b04204b4ae0e9574394bc58 Parents: 3c86fdd Author: windpiger Authored: Tue Jan 24 20:49:23 2017 +0800 Committer: Wenchen Fan Committed: Tue Jan 24 20:49:23 2017 +0800 -- .../apache/spark/sql/catalyst/catalog/interface.scala | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/752502be/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 7bbaf6e..b8dc5f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -182,10 +182,15 @@ case class CatalogTable( import CatalogTable._ - /** schema of this table's partition columns */ - def partitionSchema: StructType = StructType(schema.filter { -c => partitionColumnNames.contains(c.name) - }) + /** + * schema of this table's partition columns + */ + def partitionSchema: StructType = { +val partitionFields = schema.takeRight(partitionColumnNames.length) +assert(partitionFields.map(_.name) == partitionColumnNames) + +StructType(partitionFields) + } /** Return the database this table was specified to belong to, assuming it exists. */ def database: String = identifier.database.getOrElse { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
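The user-visible invariant behind this check is that partition columns sit last in a table's schema, in `PARTITIONED BY` order. A small, hedged sketch of observing that through the public catalog API (assumes a recent Spark version and a `SparkSession` named `spark`; table and column names are placeholders):

```scala
// Create a partitioned data source table; Spark keeps partition columns at the end of the schema.
spark.sql("CREATE TABLE events (id INT, payload STRING, dt STRING) USING parquet PARTITIONED BY (dt)")

spark.table("events").printSchema()        // id, payload, dt -- partition column last
spark.catalog.listColumns("events").show() // the dt row is flagged with isPartition = true
```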
spark git commit: [SPARK-19152][SQL] DataFrameWriter.saveAsTable support hive append
Repository: spark Updated Branches: refs/heads/master cca868004 -> 3c86fdddf [SPARK-19152][SQL] DataFrameWriter.saveAsTable support hive append ## What changes were proposed in this pull request? After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we now can treat hive as a data source and create hive tables with DataFrameWriter and Catalog. However, the support is not completed, there are still some cases we do not support. This PR implement: DataFrameWriter.saveAsTable work with hive format with append mode ## How was this patch tested? unit test added Author: windpigerCloses #16552 from windpiger/saveAsTableWithHiveAppend. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c86fddd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c86fddd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c86fddd Branch: refs/heads/master Commit: 3c86fdddf4bb1eac985654f80c3c716b7ae7540b Parents: cca8680 Author: windpiger Authored: Tue Jan 24 20:40:27 2017 +0800 Committer: Wenchen Fan Committed: Tue Jan 24 20:40:27 2017 +0800 -- .../spark/sql/execution/datasources/ddl.scala | 7 +++- .../spark/sql/execution/datasources/rules.scala | 7 +--- apache.spark.sql.sources.DataSourceRegister | 1 + .../apache/spark/sql/hive/HiveStrategies.scala | 9 + .../CreateHiveTableAsSelectCommand.scala| 22 +++- .../sql/hive/execution/HiveFileFormat.scala | 9 - .../sql/hive/MetastoreDataSourcesSuite.scala| 13 +++ .../spark/sql/hive/execution/HiveDDLSuite.scala | 36 +--- .../sql/hive/execution/SQLQuerySuite.scala | 17 + 9 files changed, 83 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c86fddd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 695ba12..d10fa2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} import org.apache.spark.sql.types._ case class CreateTable( @@ -65,6 +65,11 @@ case class CreateTempViewUsing( } def run(sparkSession: SparkSession): Seq[Row] = { +if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) { + throw new AnalysisException("Hive data source can only be used with tables, " + +"you can't use it with CREATE TEMP VIEW USING") +} + val dataSource = DataSource( sparkSession, userSpecifiedSchema = userSpecifiedSchema, http://git-wip-us.apache.org/repos/asf/spark/blob/3c86fddd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 6888dec..d553d44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -49,7 +49,7 @@ class 
ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { // will catch it and return the original plan, so that the analyzer can report table not // found later. val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass) -if (!isFileFormat) { +if (!isFileFormat || dataSource.className.toLowerCase == DDLUtils.HIVE_PROVIDER) { throw new AnalysisException("Unsupported data source type for direct query on files: " + s"${u.tableIdentifier.database.get}") } @@ -110,11 +110,6 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl throw new AnalysisException("Saving data into a view is not allowed.") } - if (DDLUtils.isHiveTable(existingTable)) { -throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " + - "not
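A sketch of the newly supported append path, assuming Spark built with Hive and a `SparkSession` created with `enableHiveSupport()`; the table name is a placeholder.

```scala
import org.apache.spark.sql.SaveMode

// The first write creates a Hive-serde table; the second, appending to it, is what this change enables.
val df = spark.range(0, 10).toDF("id")
df.write.format("hive").saveAsTable("hive_ids")
df.write.format("hive").mode(SaveMode.Append).saveAsTable("hive_ids")

println(spark.table("hive_ids").count()) // 20
```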
spark git commit: delete useless var “j”
Repository: spark Updated Branches: refs/heads/master 7c61c2a1c -> cca868004 delete useless var “j” The var “j” defined in "var j = 0" is useless for “def compress” Author: Souljoy Zhuo Closes #16676 from xiaoyesoso/patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cca86800 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cca86800 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cca86800 Branch: refs/heads/master Commit: cca8680047bb2ec312ffc296a561abd5cbc8323c Parents: 7c61c2a Author: Souljoy Zhuo Authored: Tue Jan 24 11:33:17 2017 + Committer: Sean Owen Committed: Tue Jan 24 11:33:17 2017 + -- mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala | 2 -- 1 file changed, 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cca86800/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 995780b..97c8655 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -1038,14 +1038,12 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { uniqueSrcIdsBuilder += preSrcId var curCount = 1 var i = 1 - var j = 0 while (i < sz) { val srcId = srcIds(i) if (srcId != preSrcId) { uniqueSrcIdsBuilder += srcId dstCountsBuilder += curCount preSrcId = srcId - j += 1 curCount = 0 } curCount += 1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [DOCS] Fix typo in docs
Repository: spark
Updated Branches:
  refs/heads/master f27e02476 -> 7c61c2a1c

[DOCS] Fix typo in docs

## What changes were proposed in this pull request?

Fix typo in docs

## How was this patch tested?

Author: uncleGen

Closes #16658 from uncleGen/typo-issue.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c61c2a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c61c2a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c61c2a1

Branch: refs/heads/master
Commit: 7c61c2a1c40629311b84dff8d91b257efb345d07
Parents: f27e024
Author: uncleGen
Authored: Tue Jan 24 11:32:11 2017 +
Committer: Sean Owen
Committed: Tue Jan 24 11:32:11 2017 +
--
 docs/configuration.md                    | 2 +-
 docs/index.md                            | 2 +-
 docs/programming-guide.md                | 6 +++---
 docs/streaming-kafka-0-10-integration.md | 2 +-
 docs/submitting-applications.md          | 2 +-
 5 files changed, 7 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7c61c2a1/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index a6b1f15..b7f10e6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -435,7 +435,7 @@ Apart from these, the following properties are also available, and may be useful
 spark.jars.packages
-Comma-separated list of maven coordinates of jars to include on the driver and executor
+Comma-separated list of Maven coordinates of jars to include on the driver and executor
 classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings is given artifacts will be resolved according to the configuration in the file, otherwise artifacts will be searched for in the local maven repo, then maven central and finally any additional remote

http://git-wip-us.apache.org/repos/asf/spark/blob/7c61c2a1/docs/index.md
--
diff --git a/docs/index.md b/docs/index.md
index 57b9fa8..023e06a 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -15,7 +15,7 @@ It also supports a rich set of higher-level tools including [Spark SQL](sql-prog
 Get Spark from the [downloads page](http://spark.apache.org/downloads.html) of the project website. This documentation is for Spark version {{site.SPARK_VERSION}}. Spark uses Hadoop's client libraries for HDFS and YARN. Downloads are pre-packaged for a handful of popular Hadoop versions. Users can also download a "Hadoop free" binary and run Spark with any Hadoop version [by augmenting Spark's classpath](hadoop-provided.html).
-Scala and Java users can include Spark in their projects using its maven cooridnates and in the future Python users can also install Spark from PyPI.
+Scala and Java users can include Spark in their projects using its Maven coordinates and in the future Python users can also install Spark from PyPI.
 If you'd like to build Spark from

http://git-wip-us.apache.org/repos/asf/spark/blob/7c61c2a1/docs/programming-guide.md
--
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index a4017b5..db8b048 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -185,7 +185,7 @@ In the Spark shell, a special interpreter-aware SparkContext is already created variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `--master` argument, and you can add JARs to the classpath by passing a comma-separated list to the `--jars` argument. You can also add dependencies
-(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
+(e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates
 to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the `--repositories` argument. For example, to run `bin/spark-shell` on exactly four cores, use:
@@ -200,7 +200,7 @@ Or, to also add `code.jar` to its classpath, use:
 $ ./bin/spark-shell --master local[4] --jars code.jar
 {% endhighlight %}
-To include a dependency using maven coordinates:
+To include a dependency using Maven coordinates:
 {% highlight bash %}
 $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
@@ -217,7 +217,7 @@ In the PySpark shell, a special interpreter-aware SparkContext is already create variable called `sc`. Making your own SparkContext will not work. You can set which master the context
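As a quick reminder of the coordinate syntax the hunks above refer to: a Maven coordinate is the `groupId:artifactId:version` triple. A minimal sketch, assuming an sbt-based project, of how the same placeholder coordinate used in the guide maps onto a build dependency:

```scala
// build.sbt fragment (assumption: an sbt project; org.example:example:0.1 is the
// placeholder coordinate from the guide, not a real artifact). The value passed to
// spark-shell as --packages "org.example:example:0.1" is the same
// groupId:artifactId:version triple written as an sbt dependency:
libraryDependencies += "org.example" % "example" % "0.1"
```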
spark git commit: [SPARK-16473][MLLIB] Fix BisectingKMeans Algorithm failing in edge case
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 9c04e427d -> d128b6a39

[SPARK-16473][MLLIB] Fix BisectingKMeans Algorithm failing in edge case where no children exist in updateAssignments

## What changes were proposed in this pull request?

Fix a bug in which BisectingKMeans fails with the error:

java.util.NoSuchElementException: key not found: 166
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:58)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:58)
  at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply$mcDJ$sp(BisectingKMeans.scala:338)
  at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337)
  at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337)
  at scala.collection.TraversableOnce$$anonfun$minBy$1.apply(TraversableOnce.scala:231)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:125)
  at scala.collection.immutable.List.reduceLeft(List.scala:84)
  at scala.collection.TraversableOnce$class.minBy(TraversableOnce.scala:231)
  at scala.collection.AbstractTraversable.minBy(Traversable.scala:105)
  at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:337)
  at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:334)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)

## How was this patch tested?

The failing dataset was run against the code change to verify that the fix works; unit tests covering the edge case are added in the test suites shown in the diff below.

Author: Ilya Matiach

Closes #16355 from imatiach-msft/ilmat/fix-kmeans.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d128b6a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d128b6a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d128b6a3

Branch: refs/heads/branch-2.1
Commit: d128b6a39ebafd56041e1fb44d71c61033ae6f8e
Parents: 9c04e42
Author: Ilya Matiach
Authored: Mon Jan 23 13:34:27 2017 -0800
Committer: Sean Owen
Committed: Tue Jan 24 11:27:38 2017 +
--
 .../spark/mllib/clustering/BisectingKMeans.scala | 19 ---
 .../ml/clustering/BisectingKMeansSuite.scala     | 19 +++
 .../apache/spark/ml/clustering/KMeansSuite.scala | 13 +
 3 files changed, 44 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d128b6a3/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index 336f2fc..ae98e24 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -339,10 +339,15 @@ private object BisectingKMeans extends Serializable {
 assignments.map { case (index, v) =>
   if (divisibleIndices.contains(index)) {
 val children = Seq(leftChildIndex(index), rightChildIndex(index))
-val selected = children.minBy { child =>
-  KMeans.fastSquaredDistance(newClusterCenters(child), v)
+val newClusterChildren = children.filter(newClusterCenters.contains(_))
+if (newClusterChildren.nonEmpty) {
+  val selected = newClusterChildren.minBy { child =>
+KMeans.fastSquaredDistance(newClusterCenters(child), v)
+  }
+
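To make the shape of the fix easier to see outside the full file, here is a minimal, self-contained sketch of the pattern the patch applies, using illustrative names rather than the actual Spark internals: candidate child indices are filtered to those whose centers actually exist before `minBy` runs, and a point keeps its current assignment when neither child has a center.

```scala
// Minimal sketch (illustrative names, not the Spark source): avoid the
// "key not found" failure by only ranking children that have a center.
object SafeChildSelectionSketch {

  // Hypothetical distance function standing in for KMeans.fastSquaredDistance.
  private def squaredDistance(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  /** Pick the closer child cluster if any child center exists; otherwise keep `index`. */
  def assign(index: Long, point: Array[Double], centers: Map[Long, Array[Double]]): Long = {
    // Same 2i / 2i+1 child-indexing convention used by the bisecting tree.
    val children = Seq(2 * index, 2 * index + 1)
    // The crux of the fix: drop children whose centers were never computed,
    // so centers(child) inside minBy can no longer throw NoSuchElementException.
    val existing = children.filter(centers.contains)
    if (existing.nonEmpty) existing.minBy(child => squaredDistance(centers(child), point))
    else index
  }

  def main(args: Array[String]): Unit = {
    val centers = Map(2L -> Array(0.0, 0.0))      // only the left child has a center
    println(assign(1L, Array(1.0, 1.0), centers)) // 2: the closest existing child
    println(assign(3L, Array(1.0, 1.0), centers)) // 3: no child centers, keep the assignment
  }
}
```

In the patched code, the same filter-before-minBy guard is what prevents `newClusterCenters(child)` from being asked for a key that was never computed.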
spark git commit: [SPARK-18823][SPARKR] add support for assigning to column
Repository: spark
Updated Branches:
  refs/heads/master ec9493b44 -> f27e02476

[SPARK-18823][SPARKR] add support for assigning to column

## What changes were proposed in this pull request?

Support for

```
df[[myname]] <- 1
df[[2]] <- df$eruptions
```

## How was this patch tested?

manual tests, unit tests

Author: Felix Cheung

Closes #16663 from felixcheung/rcolset.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f27e0247
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f27e0247
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f27e0247

Branch: refs/heads/master
Commit: f27e024768e328b96704a9ef35b77381da480328
Parents: ec9493b
Author: Felix Cheung
Authored: Tue Jan 24 00:23:23 2017 -0800
Committer: Felix Cheung
Committed: Tue Jan 24 00:23:23 2017 -0800
--
 R/pkg/R/DataFrame.R                       | 48 +++---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 20 +++
 2 files changed, 55 insertions(+), 13 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f27e0247/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 3d912c9..0a10122 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1717,6 +1717,23 @@ getColumn <- function(x, c) {
 column(callJMethod(x@sdf, "col", c))
 }
+setColumn <- function(x, c, value) {
+ if (class(value) != "Column" && !is.null(value)) {
+if (isAtomicLengthOne(value)) {
+ value <- lit(value)
+} else {
+ stop("value must be a Column, literal value as atomic in length of 1, or NULL")
+}
+ }
+
+ if (is.null(value)) {
+nx <- drop(x, c)
+ } else {
+nx <- withColumn(x, c, value)
+ }
+ nx
+}
+
 #' @param name name of a Column (without being wrapped by \code{""}).
 #' @rdname select
 #' @name $
@@ -1735,19 +1752,7 @@ setMethod("$", signature(x = "SparkDataFrame"),
 #' @note $<- since 1.4.0
 setMethod("$<-", signature(x = "SparkDataFrame"),
 function(x, name, value) {
-if (class(value) != "Column" && !is.null(value)) {
- if (isAtomicLengthOne(value)) {
-value <- lit(value)
- } else {
-stop("value must be a Column, literal value as atomic in length of 1, or NULL")
- }
-}
-
-if (is.null(value)) {
- nx <- drop(x, name)
-} else {
- nx <- withColumn(x, name, value)
-}
+nx <- setColumn(x, name, value)
 x@sdf <- nx@sdf
 x
 })
@@ -1768,6 +1773,21 @@ setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
 })
 #' @rdname subset
+#' @name [[<-
+#' @aliases [[<-,SparkDataFrame,numericOrcharacter-method
+#' @note [[<- since 2.1.1
+setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
+ function(x, i, value) {
+if (is.numeric(i)) {
+ cols <- columns(x)
+ i <- cols[[i]]
+}
+nx <- setColumn(x, i, value)
+x@sdf <- nx@sdf
+x
+ })
+
+#' @rdname subset
 #' @name [
 #' @aliases [,SparkDataFrame-method
 #' @note [ since 1.4.0
@@ -1814,6 +1834,8 @@ setMethod("[", signature(x = "SparkDataFrame"),
 #' @param j,select expression for the single Column or a list of columns to select from the SparkDataFrame.
 #' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
 #' Otherwise, a SparkDataFrame will always be returned.
+#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
+#' If \code{NULL}, the specified Column is dropped.
 #' @param ... currently not used.
 #' @return A new SparkDataFrame containing only the rows that meet the condition with selected columns.
 #' @export

http://git-wip-us.apache.org/repos/asf/spark/blob/f27e0247/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 2601742..aaa8fb4 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1021,6 +1021,9 @@ test_that("select operators", {
 df$age2 <- df$age * 2
 expect_equal(columns(df), c("name", "age", "age2"))
 expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
+ df$age2 <- df[["age"]] * 3
+ expect_equal(columns(df), c("name", "age", "age2"))
+ expect_equal(count(where(df, df$age2 == df$age * 3)), 2)
 df$age2 <- 21
 expect_equal(columns(df),
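For readers more familiar with the Scala API, the new R assignment forms ultimately delegate to `withColumn` and `drop` on the wrapped Java DataFrame. A minimal sketch of the equivalent Scala-side operations follows; the local SparkSession and the `people.json` sample file are assumptions of this illustration, not part of the patch.

```scala
// Editor's sketch (assumptions: a local SparkSession and the people.json sample
// under examples/src/main/resources with "name" and "age" columns). It mirrors
// what the new SparkR assignment forms map onto:
//   df$age2 <- 21              -> withColumn with a length-one literal
//   df[["age2"]] <- df$age * 3 -> withColumn with a Column expression
//   df$age2 <- NULL            -> drop
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

object ColumnAssignmentSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("column-assignment-sketch")
      .getOrCreate()
    val df = spark.read.json("examples/src/main/resources/people.json")

    val withLiteral = df.withColumn("age2", lit(21))       // set a column from a literal
    val withExpr    = df.withColumn("age2", df("age") * 3) // set a column from another column
    val dropped     = withExpr.drop("age2")                // NULL on the R side removes the column

    withLiteral.show()
    withExpr.show()
    dropped.show()

    spark.stop()
  }
}
```

As on the R side, assigning a length-one literal corresponds to wrapping the value with `lit`, and assigning NULL corresponds to dropping the column.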