spark git commit: [SPARK-20549] java.io.CharConversionException: Invalid UTF-32' in JsonToStructs
Repository: spark Updated Branches: refs/heads/master afb21bf22 -> 86174ea89 [SPARK-20549] java.io.CharConversionException: Invalid UTF-32' in JsonToStructs ## What changes were proposed in this pull request? A fix for the same problem was made in #17693 but ignored `JsonToStructs`. This PR uses the same fix for `JsonToStructs`. ## How was this patch tested? Regression test Author: Burak Yavuz Closes #17826 from brkyvz/SPARK-20549. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86174ea8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86174ea8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86174ea8 Branch: refs/heads/master Commit: 86174ea89b39a300caaba6baffac70f3dc702788 Parents: afb21bf Author: Burak Yavuz Authored: Tue May 2 14:08:16 2017 +0800 Committer: Wenchen Fan Committed: Tue May 2 14:08:16 2017 +0800 -- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 8 +++- .../apache/spark/sql/catalyst/json/CreateJacksonParser.scala | 7 +-- .../sql/catalyst/expressions/JsonExpressionsSuite.scala | 7 +++ 3 files changed, 15 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/86174ea8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 9fb0ea6..6b90354 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -151,8 +151,7 @@ case class GetJsonObject(json: Expression, path: Expression) try { /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson detect character encoding which could fail for some malformed strings */ -Utils.tryWithResource(jsonFactory.createParser(new InputStreamReader( -new ByteArrayInputStream(jsonStr.getBytes), "UTF-8"))) { parser => +Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, jsonStr)) { parser => val output = new ByteArrayOutputStream() val matched = Utils.tryWithResource( jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator => @@ -398,9 +397,8 @@ case class JsonTuple(children: Seq[Expression]) try { /* We know the bytes are UTF-8 encoded. 
Pass a Reader to avoid having Jackson detect character encoding which could fail for some malformed strings */ - Utils.tryWithResource(jsonFactory.createParser(new InputStreamReader( - new ByteArrayInputStream(json.getBytes), "UTF-8"))) { -parser => parseRow(parser, input) + Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => +parseRow(parser, input) } } catch { case _: JsonProcessingException => http://git-wip-us.apache.org/repos/asf/spark/blob/86174ea8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala index e0ed03a..025a388 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.json -import java.io.InputStream +import java.io.{ByteArrayInputStream, InputStream, InputStreamReader} import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.hadoop.io.Text @@ -33,7 +33,10 @@ private[sql] object CreateJacksonParser extends Serializable { val bb = record.getByteBuffer assert(bb.hasArray) -jsonFactory.createParser(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()) +val bain = new ByteArrayInputStream( + bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()) + +jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } def text(jsonFactory: JsonFactory, record: Text): JsonParser = { http://git-wip-us.apache.org/repos/asf/spark/blob/86174ea8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/ap
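The heart of the patch is visible in the hunks above: instead of handing Jackson raw bytes (which makes it auto-detect the character encoding and can fail with `CharConversionException: Invalid UTF-32 character` on malformed input), the parser is now built through `CreateJacksonParser.utf8String`, which wraps the known-UTF-8 bytes in a `Reader` with an explicit charset. A minimal standalone sketch of that pattern, assuming only jackson-core on the classpath (the sample JSON string is illustrative):

```scala
import java.io.{ByteArrayInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.core.JsonFactory

object ReaderBackedParser {
  def main(args: Array[String]): Unit = {
    val factory = new JsonFactory()
    val bytes = """{"a": 1}""".getBytes(StandardCharsets.UTF_8)

    // Wrapping the known-UTF-8 bytes in a Reader with a fixed charset skips
    // Jackson's encoding auto-detection entirely.
    val parser = factory.createParser(
      new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8))
    try {
      while (parser.nextToken() != null) {} // walk the token stream
    } finally {
      parser.close()
    }
  }
}
```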
spark git commit: [SPARK-20549] java.io.CharConversionException: Invalid UTF-32' in JsonToStructs
Repository: spark Updated Branches: refs/heads/branch-2.2 b146481ff -> ef5e2a050 [SPARK-20549] java.io.CharConversionException: Invalid UTF-32' in JsonToStructs ## What changes were proposed in this pull request? A fix for the same problem was made in #17693 but ignored `JsonToStructs`. This PR uses the same fix for `JsonToStructs`. ## How was this patch tested? Regression test Author: Burak Yavuz Closes #17826 from brkyvz/SPARK-20549. (cherry picked from commit 86174ea89b39a300caaba6baffac70f3dc702788) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef5e2a05 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef5e2a05 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef5e2a05 Branch: refs/heads/branch-2.2 Commit: ef5e2a0509801f6afced3bc80f8d700acf84e0dd Parents: b146481 Author: Burak Yavuz Authored: Tue May 2 14:08:16 2017 +0800 Committer: Wenchen Fan Committed: Tue May 2 14:08:31 2017 +0800 -- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 8 +++- .../apache/spark/sql/catalyst/json/CreateJacksonParser.scala | 7 +-- .../sql/catalyst/expressions/JsonExpressionsSuite.scala | 7 +++ 3 files changed, 15 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef5e2a05/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 9fb0ea6..6b90354 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -151,8 +151,7 @@ case class GetJsonObject(json: Expression, path: Expression) try { /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson detect character encoding which could fail for some malformed strings */ -Utils.tryWithResource(jsonFactory.createParser(new InputStreamReader( -new ByteArrayInputStream(jsonStr.getBytes), "UTF-8"))) { parser => +Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, jsonStr)) { parser => val output = new ByteArrayOutputStream() val matched = Utils.tryWithResource( jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator => @@ -398,9 +397,8 @@ case class JsonTuple(children: Seq[Expression]) try { /* We know the bytes are UTF-8 encoded. 
Pass a Reader to avoid having Jackson detect character encoding which could fail for some malformed strings */ - Utils.tryWithResource(jsonFactory.createParser(new InputStreamReader( - new ByteArrayInputStream(json.getBytes), "UTF-8"))) { -parser => parseRow(parser, input) + Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => +parseRow(parser, input) } } catch { case _: JsonProcessingException => http://git-wip-us.apache.org/repos/asf/spark/blob/ef5e2a05/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala index e0ed03a..025a388 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.json -import java.io.InputStream +import java.io.{ByteArrayInputStream, InputStream, InputStreamReader} import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.hadoop.io.Text @@ -33,7 +33,10 @@ private[sql] object CreateJacksonParser extends Serializable { val bb = record.getByteBuffer assert(bb.hasArray) -jsonFactory.createParser(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()) +val bain = new ByteArrayInputStream( + bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()) + +jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } def text(jsonFactory: JsonFactory, record: Text): JsonParser = { http://git-wip-us.apache.org/repos/asf/spark/blob/ef5e2a05/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala -
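`JsonToStructs` is the expression behind the `from_json` function, so the user-visible effect of this patch is that malformed input run through `from_json` degrades gracefully instead of raising a `CharConversionException`. A hedged usage sketch; the local session, column name, and sample strings are assumptions for illustration, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}

object FromJsonDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("from_json-demo").getOrCreate()
    import spark.implicits._

    val schema = new StructType().add("a", IntegerType)

    // The second value stands in for input that previously tripped Jackson's
    // charset detection; with the fix it simply parses to null.
    Seq("""{"a": 1}""", "\u0000not json\u0000")
      .toDF("json")
      .select(from_json($"json", schema).as("parsed"))
      .show(false)

    spark.stop()
  }
}
```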
spark git commit: [SPARK-20537][CORE] Fixing OffHeapColumnVector reallocation
Repository: spark Updated Branches: refs/heads/branch-2.2 b7c1c2f97 -> b146481ff [SPARK-20537][CORE] Fixing OffHeapColumnVector reallocation ## What changes were proposed in this pull request? As #17773 revealed `OnHeapColumnVector` may copy a part of the original storage. `OffHeapColumnVector` reallocation also copies to the new storage data up to 'elementsAppended'. This variable is only updated when using the `ColumnVector.appendX` API, while `ColumnVector.putX` is more commonly used. This PR copies the new storage data up to the previously-allocated size in`OffHeapColumnVector`. ## How was this patch tested? Existing test suites Author: Kazuaki Ishizaki Closes #17811 from kiszk/SPARK-20537. (cherry picked from commit afb21bf22a59c9416c04637412fb69d1442e6826) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b146481f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b146481f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b146481f Branch: refs/heads/branch-2.2 Commit: b146481fff1ce529245f9c03b35c73ea604712d0 Parents: b7c1c2f Author: Kazuaki Ishizaki Authored: Tue May 2 13:56:41 2017 +0800 Committer: Wenchen Fan Committed: Tue May 2 13:57:08 2017 +0800 -- .../execution/vectorized/OffHeapColumnVector.java | 17 + 1 file changed, 9 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b146481f/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index e988c07..a7d3744 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -436,28 +436,29 @@ public final class OffHeapColumnVector extends ColumnVector { // Split out the slow path. @Override protected void reserveInternal(int newCapacity) { +int oldCapacity = (this.data == 0L) ? 
0 : capacity; if (this.resultArray != null) { this.lengthData = - Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4); + Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4); this.offsetData = - Platform.reallocateMemory(offsetData, elementsAppended * 4, newCapacity * 4); + Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4); } else if (type instanceof ByteType || type instanceof BooleanType) { - this.data = Platform.reallocateMemory(data, elementsAppended, newCapacity); + this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity); } else if (type instanceof ShortType) { - this.data = Platform.reallocateMemory(data, elementsAppended * 2, newCapacity * 2); + this.data = Platform.reallocateMemory(data, oldCapacity * 2, newCapacity * 2); } else if (type instanceof IntegerType || type instanceof FloatType || type instanceof DateType || DecimalType.is32BitDecimalType(type)) { - this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4); + this.data = Platform.reallocateMemory(data, oldCapacity * 4, newCapacity * 4); } else if (type instanceof LongType || type instanceof DoubleType || DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) { - this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8); + this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8); } else if (resultStruct != null) { // Nothing to store. } else { throw new RuntimeException("Unhandled " + type); } -this.nulls = Platform.reallocateMemory(nulls, elementsAppended, newCapacity); -Platform.setMemory(nulls + elementsAppended, (byte)0, newCapacity - elementsAppended); +this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity); +Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - oldCapacity); capacity = newCapacity; } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20537][CORE] Fixing OffHeapColumnVector reallocation
Repository: spark Updated Branches: refs/heads/master 90d77e971 -> afb21bf22 [SPARK-20537][CORE] Fixing OffHeapColumnVector reallocation ## What changes were proposed in this pull request? As #17773 revealed `OnHeapColumnVector` may copy a part of the original storage. `OffHeapColumnVector` reallocation also copies to the new storage data up to 'elementsAppended'. This variable is only updated when using the `ColumnVector.appendX` API, while `ColumnVector.putX` is more commonly used. This PR copies the new storage data up to the previously-allocated size in`OffHeapColumnVector`. ## How was this patch tested? Existing test suites Author: Kazuaki Ishizaki Closes #17811 from kiszk/SPARK-20537. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afb21bf2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afb21bf2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afb21bf2 Branch: refs/heads/master Commit: afb21bf22a59c9416c04637412fb69d1442e6826 Parents: 90d77e9 Author: Kazuaki Ishizaki Authored: Tue May 2 13:56:41 2017 +0800 Committer: Wenchen Fan Committed: Tue May 2 13:56:41 2017 +0800 -- .../execution/vectorized/OffHeapColumnVector.java | 17 + 1 file changed, 9 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/afb21bf2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index e988c07..a7d3744 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -436,28 +436,29 @@ public final class OffHeapColumnVector extends ColumnVector { // Split out the slow path. @Override protected void reserveInternal(int newCapacity) { +int oldCapacity = (this.data == 0L) ? 
0 : capacity; if (this.resultArray != null) { this.lengthData = - Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4); + Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4); this.offsetData = - Platform.reallocateMemory(offsetData, elementsAppended * 4, newCapacity * 4); + Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4); } else if (type instanceof ByteType || type instanceof BooleanType) { - this.data = Platform.reallocateMemory(data, elementsAppended, newCapacity); + this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity); } else if (type instanceof ShortType) { - this.data = Platform.reallocateMemory(data, elementsAppended * 2, newCapacity * 2); + this.data = Platform.reallocateMemory(data, oldCapacity * 2, newCapacity * 2); } else if (type instanceof IntegerType || type instanceof FloatType || type instanceof DateType || DecimalType.is32BitDecimalType(type)) { - this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4); + this.data = Platform.reallocateMemory(data, oldCapacity * 4, newCapacity * 4); } else if (type instanceof LongType || type instanceof DoubleType || DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) { - this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8); + this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8); } else if (resultStruct != null) { // Nothing to store. } else { throw new RuntimeException("Unhandled " + type); } -this.nulls = Platform.reallocateMemory(nulls, elementsAppended, newCapacity); -Platform.setMemory(nulls + elementsAppended, (byte)0, newCapacity - elementsAppended); +this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity); +Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - oldCapacity); capacity = newCapacity; } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
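Both entries above (the branch-2.2 cherry-pick and the master commit) carry the same patch. The subtlety is that `elementsAppended` only advances through the `appendX` API; writes made with `putX` land at arbitrary row positions without touching it, so a reallocation that preserves only `elementsAppended` elements silently drops data. A tiny on-heap analogy of the bug and the fix, using plain Scala arrays in place of the off-heap buffers (not Spark code):

```scala
object ReallocSketch {
  def main(args: Array[String]): Unit = {
    val capacity = 4
    val elementsAppended = 0          // only appendX bumps this counter
    val data = new Array[Int](capacity)

    data(2) = 42                      // putInt-style write: row 2, counter untouched

    // Buggy grow: keep only `elementsAppended` (= 0) elements, then extend -> the 42 is lost.
    val buggy = java.util.Arrays.copyOf(data, elementsAppended) ++ new Array[Int](8 - elementsAppended)

    // Fixed grow: keep the previously allocated capacity, then extend.
    val fixed = java.util.Arrays.copyOf(data, 8)

    assert(buggy(2) == 0)
    assert(fixed(2) == 42)
  }
}
```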
spark git commit: [SPARK-20532][SPARKR] Implement grouping and grouping_id
Repository: spark Updated Branches: refs/heads/master d20a976e8 -> 90d77e971 [SPARK-20532][SPARKR] Implement grouping and grouping_id ## What changes were proposed in this pull request? Adds R wrappers for: - `o.a.s.sql.functions.grouping` as `o.a.s.sql.functions.is_grouping` (to avoid shading `base::grouping` - `o.a.s.sql.functions.grouping_id` ## How was this patch tested? Existing unit tests, additional unit tests. `check-cran.sh`. Author: zero323 Closes #17807 from zero323/SPARK-20532. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d77e97 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d77e97 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d77e97 Branch: refs/heads/master Commit: 90d77e971f6b3fa268e411279f34bc1db4321991 Parents: d20a976 Author: zero323 Authored: Mon May 1 21:39:17 2017 -0700 Committer: Felix Cheung Committed: Mon May 1 21:39:17 2017 -0700 -- R/pkg/NAMESPACE | 2 + R/pkg/R/functions.R | 84 ++ R/pkg/R/generics.R| 8 +++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 56 - 4 files changed, 148 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90d77e97/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index e8de34d..7ecd168 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -249,6 +249,8 @@ exportMethods("%<=>%", "getField", "getItem", "greatest", + "grouping_bit", + "grouping_id", "hex", "histogram", "hour", http://git-wip-us.apache.org/repos/asf/spark/blob/90d77e97/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index f9687d6..38384a8 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3890,3 +3890,87 @@ setMethod("not", jc <- callJStatic("org.apache.spark.sql.functions", "not", x@jc) column(jc) }) + +#' grouping_bit +#' +#' Indicates whether a specified column in a GROUP BY list is aggregated or not, +#' returns 1 for aggregated or 0 for not aggregated in the result set. +#' +#' Same as \code{GROUPING} in SQL and \code{grouping} function in Scala. +#' +#' @param x Column to compute on +#' +#' @rdname grouping_bit +#' @name grouping_bit +#' @family agg_funcs +#' @aliases grouping_bit,Column-method +#' @export +#' @examples \dontrun{ +#' df <- createDataFrame(mtcars) +#' +#' # With cube +#' agg( +#' cube(df, "cyl", "gear", "am"), +#' mean(df$mpg), +#' grouping_bit(df$cyl), grouping_bit(df$gear), grouping_bit(df$am) +#' ) +#' +#' # With rollup +#' agg( +#' rollup(df, "cyl", "gear", "am"), +#' mean(df$mpg), +#' grouping_bit(df$cyl), grouping_bit(df$gear), grouping_bit(df$am) +#' ) +#' } +#' @note grouping_bit since 2.3.0 +setMethod("grouping_bit", + signature(x = "Column"), + function(x) { +jc <- callJStatic("org.apache.spark.sql.functions", "grouping", x@jc) +column(jc) + }) + +#' grouping_id +#' +#' Returns the level of grouping. +#' +#' Equals to \code{ +#' grouping_bit(c1) * 2^(n - 1) + grouping_bit(c2) * 2^(n - 2) + ... + grouping_bit(cn) +#' } +#' +#' @param x Column to compute on +#' @param ... additional Column(s) (optional). 
+#' +#' @rdname grouping_id +#' @name grouping_id +#' @family agg_funcs +#' @aliases grouping_id,Column-method +#' @export +#' @examples \dontrun{ +#' df <- createDataFrame(mtcars) +#' +#' # With cube +#' agg( +#' cube(df, "cyl", "gear", "am"), +#' mean(df$mpg), +#' grouping_id(df$cyl, df$gear, df$am) +#' ) +#' +#' # With rollup +#' agg( +#' rollup(df, "cyl", "gear", "am"), +#' mean(df$mpg), +#' grouping_id(df$cyl, df$gear, df$am) +#' ) +#' } +#' @note grouping_id since 2.3.0 +setMethod("grouping_id", + signature(x = "Column"), + function(x, ...) { +jcols <- lapply(list(x, ...), function (x) { + stopifnot(class(x) == "Column") + x@jc +}) +jc <- callJStatic("org.apache.spark.sql.functions", "grouping_id", jcols) +column(jc) + }) http://git-wip-us.apache.org/repos/asf/spark/blob/90d77e97/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index ef36765..e02d464 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1052,6 +1052,14 @@ setGeneric("from_unixtime", function(x, ...) { standardGeneric("from_unixtime") #' @export setGeneric("greatest", function
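The new R wrappers delegate, via `callJStatic`, to the existing `grouping` and `grouping_id` functions in `org.apache.spark.sql.functions`. For reference, a hedged Scala sketch of the same aggregation shown in the roxygen examples; the local session and the small stand-in for `mtcars` are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, grouping, grouping_id}

object GroupingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("grouping-demo").getOrCreate()
    import spark.implicits._

    val df = Seq((4, 3, 1.0), (4, 4, 0.0), (6, 4, 1.0)).toDF("cyl", "gear", "mpg")

    // grouping(col) is 1 when the column is rolled up (aggregated away) at that
    // grouping level; grouping_id(cols...) packs those bits into one integer.
    df.cube("cyl", "gear")
      .agg(avg($"mpg"), grouping($"cyl"), grouping($"gear"), grouping_id($"cyl", $"gear"))
      .show(false)

    spark.stop()
  }
}
```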
spark git commit: [SPARK-20192][SPARKR][DOC] SparkR migration guide to 2.2.0
Repository: spark Updated Branches: refs/heads/branch-2.2 5a0a8b039 -> b7c1c2f97 [SPARK-20192][SPARKR][DOC] SparkR migration guide to 2.2.0 ## What changes were proposed in this pull request? Updating R Programming Guide ## How was this patch tested? manually Author: Felix Cheung Closes #17816 from felixcheung/r22relnote. (cherry picked from commit d20a976e8918ca8d607af452301e8014fe14e64a) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7c1c2f9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7c1c2f9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7c1c2f9 Branch: refs/heads/branch-2.2 Commit: b7c1c2f973635a2ec05aedd89456765d830dfdce Parents: 5a0a8b0 Author: Felix Cheung Authored: Mon May 1 21:03:48 2017 -0700 Committer: Felix Cheung Committed: Mon May 1 21:04:04 2017 -0700 -- docs/sparkr.md | 8 1 file changed, 8 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b7c1c2f9/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index e7f6021..0e97213 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -610,3 +610,11 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma ## Upgrading to SparkR 2.1.0 - `join` no longer performs Cartesian Product by default, use `crossJoin` instead. + +## Upgrading to SparkR 2.2.0 + + - A `numPartitions` parameter has been added to `createDataFrame` and `as.DataFrame`. When splitting the data, the partition position calculation has been made to match the one in Scala. + - The method `createExternalTable` has been deprecated to be replaced by `createTable`. Either methods can be called to create external or managed table. Additional catalog methods have also been added. + - By default, derby.log is now saved to `tempdir()`. This will be created when instantiating the SparkSession with `enableHiveSupport` set to `TRUE`. + - `spark.lda` was not setting the optimizer correctly. It has been corrected. + - Several model summary outputs are updated to have `coefficients` as `matrix`. This includes `spark.logit`, `spark.kmeans`, `spark.glm`. Model summary outputs for `spark.gaussianMixture` have added log-likelihood as `loglik`. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20192][SPARKR][DOC] SparkR migration guide to 2.2.0
Repository: spark Updated Branches: refs/heads/master 943a684b9 -> d20a976e8 [SPARK-20192][SPARKR][DOC] SparkR migration guide to 2.2.0 ## What changes were proposed in this pull request? Updating R Programming Guide ## How was this patch tested? manually Author: Felix Cheung Closes #17816 from felixcheung/r22relnote. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d20a976e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d20a976e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d20a976e Branch: refs/heads/master Commit: d20a976e8918ca8d607af452301e8014fe14e64a Parents: 943a684 Author: Felix Cheung Authored: Mon May 1 21:03:48 2017 -0700 Committer: Felix Cheung Committed: Mon May 1 21:03:48 2017 -0700 -- docs/sparkr.md | 8 1 file changed, 8 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d20a976e/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index 16b1ef6..6dbd02a 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -644,3 +644,11 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma ## Upgrading to SparkR 2.1.0 - `join` no longer performs Cartesian Product by default, use `crossJoin` instead. + +## Upgrading to SparkR 2.2.0 + + - A `numPartitions` parameter has been added to `createDataFrame` and `as.DataFrame`. When splitting the data, the partition position calculation has been made to match the one in Scala. + - The method `createExternalTable` has been deprecated to be replaced by `createTable`. Either methods can be called to create external or managed table. Additional catalog methods have also been added. + - By default, derby.log is now saved to `tempdir()`. This will be created when instantiating the SparkSession with `enableHiveSupport` set to `TRUE`. + - `spark.lda` was not setting the optimizer correctly. It has been corrected. + - Several model summary outputs are updated to have `coefficients` as `matrix`. This includes `spark.logit`, `spark.kmeans`, `spark.glm`. Model summary outputs for `spark.gaussianMixture` have added log-likelihood as `loglik`. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.1.1-rc3 [deleted] 2ed19cff2
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.1.1-rc2 [deleted] 02b165dcc
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.1.1-rc4 [deleted] 267aca5bd
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.1.1-rc1 [deleted] 30abb95c9
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.1.1 [created] 267aca5bd
svn commit: r19436 - /dev/spark/spark-2.1.1-rc4/
Author: marmbrus Date: Tue May 2 01:05:29 2017 New Revision: 19436 Log: Add spark-2.1.1-rc4 Added: dev/spark/spark-2.1.1-rc4/ dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz (with props) dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.asc dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.md5 dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.sha dev/spark/spark-2.1.1-rc4/pyspark-2.1.1+hadoop2.7.tar.gz (with props) dev/spark/spark-2.1.1-rc4/pyspark-2.1.1+hadoop2.7.tar.gz.asc dev/spark/spark-2.1.1-rc4/pyspark-2.1.1+hadoop2.7.tar.gz.md5 dev/spark/spark-2.1.1-rc4/pyspark-2.1.1+hadoop2.7.tar.gz.sha dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.3.tgz (with props) dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.3.tgz.asc dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.3.tgz.md5 dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.3.tgz.sha dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.4.tgz (with props) dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.4.tgz.asc dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.4.tgz.md5 dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.4.tgz.sha dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.6.tgz (with props) dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.6.tgz.asc dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.6.tgz.md5 dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.6.tgz.sha dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.7.tgz (with props) dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.7.tgz.asc dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.7.tgz.md5 dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-hadoop2.7.tgz.sha dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-without-hadoop.tgz (with props) dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-without-hadoop.tgz.asc dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-without-hadoop.tgz.md5 dev/spark/spark-2.1.1-rc4/spark-2.1.1-bin-without-hadoop.tgz.sha dev/spark/spark-2.1.1-rc4/spark-2.1.1.tgz (with props) dev/spark/spark-2.1.1-rc4/spark-2.1.1.tgz.asc dev/spark/spark-2.1.1-rc4/spark-2.1.1.tgz.md5 dev/spark/spark-2.1.1-rc4/spark-2.1.1.tgz.sha Added: dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz == Binary file - no diff available. 
Propchange: dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz -- svn:mime-type = application/octet-stream Added: dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.asc == --- dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.asc (added) +++ dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.asc Tue May 2 01:05:29 2017 @@ -0,0 +1,11 @@ +-BEGIN PGP SIGNATURE- +Version: GnuPG v2.0.14 (GNU/Linux) + +iQEcBAABAgAGBQJY/+V1AAoJEHxsEF/8jtCJPdIH/jx4g2JGnUxw2vh294HuGwTM +NNO33FDF+0Ghn/UvgThKicoLKDEgu/h4RfMi1orGd+dqWFJmHjhADfO2RsbC5Wa6 +Sitl6xQCSq3Riy0IAMiwhdwDQ2UwtUjEsLBslwDb8HFiNGVZHBIFaAolWQvmFfFi +aTR7Ba44YQaXXL8SKSwA+NYuxuDIRU9fZ9uPfGyLElRUMNNReMktfapU/k0AR3yl +A1lm0Y9eWSfXmwCUjAFreGwr/NN8ZmkpTj0gmp9PLx1A0Q08AUEE/xyxR6SJarHv +uHbiHY6qHStYNcRUvj8MhItZ5F86u7kCF8uUkgechYYkhoM7MNek8AdomU0Pm1c= +=21ZN +-END PGP SIGNATURE- Added: dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.md5 == --- dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.md5 (added) +++ dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.md5 Tue May 2 01:05:29 2017 @@ -0,0 +1 @@ +SparkR_2.1.1.tar.gz: 00 AC DA A4 F8 9A 0B E9 A2 0E 19 60 96 0D EF C1 Added: dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.sha == --- dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.sha (added) +++ dev/spark/spark-2.1.1-rc4/SparkR_2.1.1.tar.gz.sha Tue May 2 01:05:29 2017 @@ -0,0 +1,3 @@ +SparkR_2.1.1.tar.gz: 873FCDB8 845D3B9B 552280EC D9B79AA5 E874DFB0 846D88E3 + A5268D90 C0E0CCE1 2C65B686 915BC105 E646CA55 82BE7959 + 1FC148A4 C5610BEE 759C3A48 371D6A6F Added: dev/spark/spark-2.1.1-rc4/pyspark-2.1.1+hadoop2.7.tar.gz == Binary file - no diff available. Propchange: dev/spark/spark-2.1.1-rc4/pyspark-2.1.1+hadoop2.7.tar.gz -- svn:mime-type = application/octet-stream Added: dev/spark/spark-2.1.1-rc4/pyspark-2.1.1+hadoop2.7.tar.gz.asc == --- dev/spark/spark-2.1.1-rc4/pyspark-2.1.1+hadoop2.7.tar.gz.asc (added) +++ dev/spark/spark-2.1.1-rc4/pyspark-2.1.1+hadoop2.7.tar.gz.asc Tue May 2 01:05:29 2017 @@ -0,0 +1,11 @@ +-BEGIN PGP SIGNATURE- +Version: GnuPG v2.0.14 (GNU/Linux) + +iQEcBAABAgAGBQJY/+Ou
svn commit: r19437 - /dev/spark/spark-2.1.1-rc4/ /release/spark/spark-2.1.1/
Author: marmbrus Date: Tue May 2 01:06:55 2017 New Revision: 19437 Log: Release Spark 2.1.1 Added: release/spark/spark-2.1.1/ - copied from r19436, dev/spark/spark-2.1.1-rc4/ Removed: dev/spark/spark-2.1.1-rc4/
spark git commit: [SPARK-20548] Disable ReplSuite.newProductSeqEncoder with REPL defined class
Repository: spark Updated Branches: refs/heads/master 259860d23 -> 943a684b9 [SPARK-20548] Disable ReplSuite.newProductSeqEncoder with REPL defined class ## What changes were proposed in this pull request? `newProductSeqEncoder with REPL defined class` in `ReplSuite` has been failing in-deterministically : https://spark-tests.appspot.com/failed-tests over the last few days. Disabling the test until a fix is in place. https://spark.test.databricks.com/job/spark-master-test-sbt-hadoop-2.7/176/testReport/junit/org.apache.spark.repl/ReplSuite/newProductSeqEncoder_with_REPL_defined_class/history/ ## How was this patch tested? N/A Author: Sameer Agarwal Closes #17823 from sameeragarwal/disable-test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/943a684b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/943a684b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/943a684b Branch: refs/heads/master Commit: 943a684b9827ca294ed06a46431507538d40a134 Parents: 259860d Author: Sameer Agarwal Authored: Mon May 1 17:42:53 2017 -0700 Committer: Herman van Hovell Committed: Mon May 1 17:42:53 2017 -0700 -- .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/943a684b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 121a02a..8fe2708 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -474,7 +474,8 @@ class ReplSuite extends SparkFunSuite { assertDoesNotContain("Exception", output) } - test("newProductSeqEncoder with REPL defined class") { + // TODO: [SPARK-20548] Fix and re-enable + ignore("newProductSeqEncoder with REPL defined class") { val output = runInterpreterInPasteMode("local-cluster[1,4,4096]", """ |case class Click(id: Int) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
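For context, the change simply flips ScalaTest's `test` to `ignore`, which keeps the body compiling but skips it at run time. A minimal sketch of the pattern with a hypothetical suite:

```scala
import org.scalatest.FunSuite

class FlakyExampleSuite extends FunSuite {
  // Mirrors the "[SPARK-20548] Fix and re-enable" marker left in ReplSuite:
  // the body still compiles, but the case is reported as ignored.
  ignore("non-deterministic case, re-enable once the underlying issue is fixed") {
    assert(1 + 1 == 2)
  }
}
```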
spark git commit: [SPARK-20463] Add support for IS [NOT] DISTINCT FROM.
Repository: spark Updated Branches: refs/heads/master af726cd61 -> 259860d23 [SPARK-20463] Add support for IS [NOT] DISTINCT FROM. ## What changes were proposed in this pull request? Add support for the SQL standard distinct predicate to SPARK SQL. ``` IS [NOT] DISTINCT FROM ``` ## How was this patch tested? Tested using unit tests, integration tests, manual tests. Author: ptkool Closes #17764 from ptkool/is_not_distinct_from. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/259860d2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/259860d2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/259860d2 Branch: refs/heads/master Commit: 259860d23d1740954b739b639c5bdc3ede65ed25 Parents: af726cd Author: ptkool Authored: Mon May 1 17:05:35 2017 -0700 Committer: Xiao Li Committed: Mon May 1 17:05:35 2017 -0700 -- .../main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 1 + .../scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 5 + .../spark/sql/catalyst/parser/ExpressionParserSuite.scala | 5 + 3 files changed, 11 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/259860d2/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 -- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 1ecb3d1..14c511f 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -534,6 +534,7 @@ predicate | NOT? kind=IN '(' query ')' | NOT? kind=(RLIKE | LIKE) pattern=valueExpression | IS NOT? kind=NULL +| IS NOT? kind=DISTINCT FROM right=valueExpression ; valueExpression http://git-wip-us.apache.org/repos/asf/spark/blob/259860d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a48a693..d2a9b4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -935,6 +935,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * - (NOT) LIKE * - (NOT) RLIKE * - IS (NOT) NULL. + * - IS (NOT) DISTINCT FROM */ private def withPredicate(e: Expression, ctx: PredicateContext): Expression = withOrigin(ctx) { // Invert a predicate if it has a valid NOT clause. 
@@ -962,6 +963,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { IsNotNull(e) case SqlBaseParser.NULL => IsNull(e) + case SqlBaseParser.DISTINCT if ctx.NOT != null => +EqualNullSafe(e, expression(ctx.right)) + case SqlBaseParser.DISTINCT => +Not(EqualNullSafe(e, expression(ctx.right))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/259860d2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala index e7f3b64..eb68eb9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala @@ -167,6 +167,11 @@ class ExpressionParserSuite extends PlanTest { assertEqual("a = b is not null", ('a === 'b).isNotNull) } + test("is distinct expressions") { +assertEqual("a is distinct from b", !('a <=> 'b)) +assertEqual("a is not distinct from b", 'a <=> 'b) + } + test("binary arithmetic expressions") { // Simple operations assertEqual("a * b", 'a * 'b) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
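As the new `AstBuilder` cases show, `IS NOT DISTINCT FROM` parses to `EqualNullSafe` (the `<=>` operator) and `IS DISTINCT FROM` to its negation, so the predicate treats two NULLs as equal. A hedged end-to-end sketch; the local session and sample data are assumptions:

```scala
import org.apache.spark.sql.SparkSession

object DistinctFromDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("distinct-from-demo").getOrCreate()
    import spark.implicits._

    Seq((Some(1), Some(1)), (Some(1), None), (Option.empty[Int], Option.empty[Int]))
      .toDF("a", "b")
      .createOrReplaceTempView("t")

    // NULL IS NOT DISTINCT FROM NULL is true, unlike NULL = NULL which is NULL.
    spark.sql(
      """SELECT a, b,
        |       a IS DISTINCT FROM b     AS is_distinct,
        |       a IS NOT DISTINCT FROM b AS is_not_distinct
        |FROM t""".stripMargin).show()

    spark.stop()
  }
}
```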
spark git commit: [SPARK-20459][SQL] JdbcUtils throws IllegalStateException: Cause already initialized after getting SQLException
Repository: spark Updated Branches: refs/heads/master 2b2dd08e9 -> af726cd61 [SPARK-20459][SQL] JdbcUtils throws IllegalStateException: Cause already initialized after getting SQLException ## What changes were proposed in this pull request? Avoid failing to initCause on JDBC exception with cause initialized to null ## How was this patch tested? Existing tests Author: Sean Owen Closes #17800 from srowen/SPARK-20459. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af726cd6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af726cd6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af726cd6 Branch: refs/heads/master Commit: af726cd6117de05c6e3b9616b8699d884a53651b Parents: 2b2dd08 Author: Sean Owen Authored: Mon May 1 17:01:05 2017 -0700 Committer: Xiao Li Committed: Mon May 1 17:01:05 2017 -0700 -- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 11 ++- 1 file changed, 10 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/af726cd6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 5fc3c27..0183805 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -652,8 +652,17 @@ object JdbcUtils extends Logging { case e: SQLException => val cause = e.getNextException if (cause != null && e.getCause != cause) { + // If there is no cause already, set 'next exception' as cause. If cause is null, + // it *may* be because no cause was set yet if (e.getCause == null) { -e.initCause(cause) +try { + e.initCause(cause) +} catch { + // Or it may be null because the cause *was* explicitly initialized, to *null*, + // in which case this fails. There is no other way to detect it. + // addSuppressed in this case as well. + case _: IllegalStateException => e.addSuppressed(cause) +} } else { e.addSuppressed(cause) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
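The awkward part of this fix is that `Throwable.initCause` throws `IllegalStateException` not only when a cause was already set, but also when the exception was constructed with an explicit `null` cause, and the two cases are indistinguishable through `getCause`. A hedged sketch of the pattern the patch adopts (the exception construction is illustrative; in `JdbcUtils` the exception comes from the JDBC driver):

```scala
import java.sql.SQLException

object InitCauseSketch {
  def main(args: Array[String]): Unit = {
    // Constructed with an explicit null cause: getCause is null, yet the cause
    // counts as "initialized", so initCause below throws IllegalStateException.
    val e = new SQLException("batch failed", null: Throwable)
    val next = new SQLException("next exception reported by the driver")

    if (e.getCause == null) {
      try {
        e.initCause(next)              // works when no cause was ever set
      } catch {
        case _: IllegalStateException => e.addSuppressed(next) // fall back, as in the patch
      }
    } else {
      e.addSuppressed(next)
    }

    assert(e.getCause == next || e.getSuppressed.contains(next))
  }
}
```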
spark git commit: [SPARK-20459][SQL] JdbcUtils throws IllegalStateException: Cause already initialized after getting SQLException
Repository: spark Updated Branches: refs/heads/branch-2.2 cfa6bcbe8 -> 5a0a8b039 [SPARK-20459][SQL] JdbcUtils throws IllegalStateException: Cause already initialized after getting SQLException ## What changes were proposed in this pull request? Avoid failing to initCause on JDBC exception with cause initialized to null ## How was this patch tested? Existing tests Author: Sean Owen Closes #17800 from srowen/SPARK-20459. (cherry picked from commit af726cd6117de05c6e3b9616b8699d884a53651b) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a0a8b03 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a0a8b03 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a0a8b03 Branch: refs/heads/branch-2.2 Commit: 5a0a8b0396df2feadb8333876cc08edf219fa177 Parents: cfa6bcb Author: Sean Owen Authored: Mon May 1 17:01:05 2017 -0700 Committer: Xiao Li Committed: Mon May 1 17:01:13 2017 -0700 -- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 11 ++- 1 file changed, 10 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5a0a8b03/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 5fc3c27..0183805 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -652,8 +652,17 @@ object JdbcUtils extends Logging { case e: SQLException => val cause = e.getNextException if (cause != null && e.getCause != cause) { + // If there is no cause already, set 'next exception' as cause. If cause is null, + // it *may* be because no cause was set yet if (e.getCause == null) { -e.initCause(cause) +try { + e.initCause(cause) +} catch { + // Or it may be null because the cause *was* explicitly initialized, to *null*, + // in which case this fails. There is no other way to detect it. + // addSuppressed in this case as well. + case _: IllegalStateException => e.addSuppressed(cause) +} } else { e.addSuppressed(cause) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20540][CORE] Fix unstable executor requests.
Repository: spark Updated Branches: refs/heads/branch-2.1 868b4a1aa -> 5915588a9 [SPARK-20540][CORE] Fix unstable executor requests. There are two problems fixed in this commit. First, the ExecutorAllocationManager sets a timeout to avoid requesting executors too often. However, the timeout is always updated based on its value and a timeout, not the current time. If the call is delayed by locking for more than the ongoing scheduler timeout, the manager will request more executors on every run. This seems to be the main cause of SPARK-20540. The second problem is that the total number of requested executors is not tracked by the CoarseGrainedSchedulerBackend. Instead, it calculates the value based on the current status of 3 variables: the number of known executors, the number of executors that have been killed, and the number of pending executors. But, the number of pending executors is never less than 0, even though there may be more known than requested. When executors are killed and not replaced, this can cause the request sent to YARN to be incorrect because there were too many executors due to the scheduler's state being slightly out of date. This is fixed by tracking the currently requested size explicitly. ## How was this patch tested? Existing tests. Author: Ryan Blue Closes #17813 from rdblue/SPARK-20540-fix-dynamic-allocation. (cherry picked from commit 2b2dd08e975dd7fbf261436aa877f1d7497ed31f) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5915588a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5915588a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5915588a Branch: refs/heads/branch-2.1 Commit: 5915588a92e6da955cd42621c3701031547ebf9e Parents: 868b4a1 Author: Ryan Blue Authored: Mon May 1 14:48:02 2017 -0700 Committer: Marcelo Vanzin Committed: Mon May 1 14:48:24 2017 -0700 -- .../spark/ExecutorAllocationManager.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 32 +--- .../StandaloneDynamicAllocationSuite.scala | 6 ++-- 3 files changed, 33 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5915588a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 1366251..f054a78 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -331,7 +331,7 @@ private[spark] class ExecutorAllocationManager( val delta = addExecutors(maxNeeded) logDebug(s"Starting timer to add more executors (to " + s"expire in $sustainedSchedulerBacklogTimeoutS seconds)") - addTime += sustainedSchedulerBacklogTimeoutS * 1000 + addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000) delta } else { 0 http://git-wip-us.apache.org/repos/asf/spark/blob/5915588a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 3452487..6fa239e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -69,6 +69,10 
@@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // `CoarseGrainedSchedulerBackend.this`. private val executorDataMap = new HashMap[String, ExecutorData] + // Number of executors requested by the cluster manager, [[ExecutorAllocationManager]] + @GuardedBy("CoarseGrainedSchedulerBackend.this") + private var requestedTotalExecutors = 0 + // Number of executors requested from the cluster manager that have not registered yet @GuardedBy("CoarseGrainedSchedulerBackend.this") private var numPendingExecutors = 0 @@ -390,6 +394,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * */ protected def reset(): Unit = { val executors = synchronized { + requestedTotalExecutors = 0 numPendingExecutors = 0 executorsPendingToRemove.clear() Set() ++ executorDataMap.keys @@ -463,12 +468,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logInfo(s"Requesting $numAdditionalExecutors additional
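A toy model of the first problem described above: if the deadline is advanced from its own previous value (`addTime += timeout`), one delayed invocation leaves it far in the past and every subsequent scheduling round requests more executors; rebasing it on the current clock (`addTime = now + timeout`) restores the intended back-off. The numbers below are illustrative, not Spark code:

```scala
object BacklogTimerSketch {
  val sustainedSchedulerBacklogTimeoutMs = 1000L

  // Old behaviour: deadline drifts from its own stale value.
  def nextDeadlineBuggy(previousDeadline: Long): Long =
    previousDeadline + sustainedSchedulerBacklogTimeoutMs

  // Fixed behaviour: deadline is rebased on the current time.
  def nextDeadlineFixed(now: Long): Long =
    now + sustainedSchedulerBacklogTimeoutMs

  def main(args: Array[String]): Unit = {
    // Suppose the allocation manager was blocked for 10s past a deadline of t = 0.
    val now = 10000L
    assert(nextDeadlineBuggy(0L) == 1000L)    // still in the past -> fires again immediately
    assert(nextDeadlineFixed(now) == 11000L)  // a genuine 1s back-off from "now"
  }
}
```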
spark git commit: [SPARK-20540][CORE] Fix unstable executor requests.
Repository: spark Updated Branches: refs/heads/branch-2.2 6f0d29672 -> cfa6bcbe8 [SPARK-20540][CORE] Fix unstable executor requests. There are two problems fixed in this commit. First, the ExecutorAllocationManager sets a timeout to avoid requesting executors too often. However, the timeout is always updated based on its value and a timeout, not the current time. If the call is delayed by locking for more than the ongoing scheduler timeout, the manager will request more executors on every run. This seems to be the main cause of SPARK-20540. The second problem is that the total number of requested executors is not tracked by the CoarseGrainedSchedulerBackend. Instead, it calculates the value based on the current status of 3 variables: the number of known executors, the number of executors that have been killed, and the number of pending executors. But, the number of pending executors is never less than 0, even though there may be more known than requested. When executors are killed and not replaced, this can cause the request sent to YARN to be incorrect because there were too many executors due to the scheduler's state being slightly out of date. This is fixed by tracking the currently requested size explicitly. ## How was this patch tested? Existing tests. Author: Ryan Blue Closes #17813 from rdblue/SPARK-20540-fix-dynamic-allocation. (cherry picked from commit 2b2dd08e975dd7fbf261436aa877f1d7497ed31f) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfa6bcbe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfa6bcbe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfa6bcbe Branch: refs/heads/branch-2.2 Commit: cfa6bcbe83b9a4b9607e23ac889963b6aa02f0d9 Parents: 6f0d296 Author: Ryan Blue Authored: Mon May 1 14:48:02 2017 -0700 Committer: Marcelo Vanzin Committed: Mon May 1 14:48:11 2017 -0700 -- .../spark/ExecutorAllocationManager.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 32 +--- .../StandaloneDynamicAllocationSuite.scala | 6 ++-- 3 files changed, 33 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cfa6bcbe/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 261b332..fcc72ff 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -331,7 +331,7 @@ private[spark] class ExecutorAllocationManager( val delta = addExecutors(maxNeeded) logDebug(s"Starting timer to add more executors (to " + s"expire in $sustainedSchedulerBacklogTimeoutS seconds)") - addTime += sustainedSchedulerBacklogTimeoutS * 1000 + addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000) delta } else { 0 http://git-wip-us.apache.org/repos/asf/spark/blob/cfa6bcbe/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4eedaae..dc82bb7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -69,6 +69,10 
@@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // `CoarseGrainedSchedulerBackend.this`. private val executorDataMap = new HashMap[String, ExecutorData] + // Number of executors requested by the cluster manager, [[ExecutorAllocationManager]] + @GuardedBy("CoarseGrainedSchedulerBackend.this") + private var requestedTotalExecutors = 0 + // Number of executors requested from the cluster manager that have not registered yet @GuardedBy("CoarseGrainedSchedulerBackend.this") private var numPendingExecutors = 0 @@ -413,6 +417,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * */ protected def reset(): Unit = { val executors = synchronized { + requestedTotalExecutors = 0 numPendingExecutors = 0 executorsPendingToRemove.clear() Set() ++ executorDataMap.keys @@ -487,12 +492,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logInfo(s"Requesting $numAdditionalExecutors additional
spark git commit: [SPARK-20540][CORE] Fix unstable executor requests.
Repository: spark Updated Branches: refs/heads/master 6fc6cf88d -> 2b2dd08e9 [SPARK-20540][CORE] Fix unstable executor requests. There are two problems fixed in this commit. First, the ExecutorAllocationManager sets a timeout to avoid requesting executors too often. However, the timeout is always updated based on its value and a timeout, not the current time. If the call is delayed by locking for more than the ongoing scheduler timeout, the manager will request more executors on every run. This seems to be the main cause of SPARK-20540. The second problem is that the total number of requested executors is not tracked by the CoarseGrainedSchedulerBackend. Instead, it calculates the value based on the current status of 3 variables: the number of known executors, the number of executors that have been killed, and the number of pending executors. But, the number of pending executors is never less than 0, even though there may be more known than requested. When executors are killed and not replaced, this can cause the request sent to YARN to be incorrect because there were too many executors due to the scheduler's state being slightly out of date. This is fixed by tracking the currently requested size explicitly. ## How was this patch tested? Existing tests. Author: Ryan Blue Closes #17813 from rdblue/SPARK-20540-fix-dynamic-allocation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b2dd08e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b2dd08e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b2dd08e Branch: refs/heads/master Commit: 2b2dd08e975dd7fbf261436aa877f1d7497ed31f Parents: 6fc6cf8 Author: Ryan Blue Authored: Mon May 1 14:48:02 2017 -0700 Committer: Marcelo Vanzin Committed: Mon May 1 14:48:02 2017 -0700 -- .../spark/ExecutorAllocationManager.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 32 +--- .../StandaloneDynamicAllocationSuite.scala | 6 ++-- 3 files changed, 33 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2b2dd08e/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 261b332..fcc72ff 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -331,7 +331,7 @@ private[spark] class ExecutorAllocationManager( val delta = addExecutors(maxNeeded) logDebug(s"Starting timer to add more executors (to " + s"expire in $sustainedSchedulerBacklogTimeoutS seconds)") - addTime += sustainedSchedulerBacklogTimeoutS * 1000 + addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000) delta } else { 0 http://git-wip-us.apache.org/repos/asf/spark/blob/2b2dd08e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4eedaae..dc82bb7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -69,6 +69,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // 
`CoarseGrainedSchedulerBackend.this`. private val executorDataMap = new HashMap[String, ExecutorData] + // Number of executors requested by the cluster manager, [[ExecutorAllocationManager]] + @GuardedBy("CoarseGrainedSchedulerBackend.this") + private var requestedTotalExecutors = 0 + // Number of executors requested from the cluster manager that have not registered yet @GuardedBy("CoarseGrainedSchedulerBackend.this") private var numPendingExecutors = 0 @@ -413,6 +417,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * */ protected def reset(): Unit = { val executors = synchronized { + requestedTotalExecutors = 0 numPendingExecutors = 0 executorsPendingToRemove.clear() Set() ++ executorDataMap.keys @@ -487,12 +492,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") val response = synchronized { + requestedTotalExecutors +
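The second problem is easiest to see with a small model: deriving the target from the scheduler's current bookkeeping can never drop below the number of live executors, so after executors are killed without replacement the value sent to the cluster manager is too high. Tracking the requested total explicitly, as the patch does with `requestedTotalExecutors`, avoids that. A hedged sketch; the field names below are illustrative, not the backend's actual state:

```scala
final case class BackendState(
    knownExecutors: Int,
    pendingExecutors: Int,
    pendingToRemove: Int,
    requestedTotalExecutors: Int)

object TargetSketch {
  // Old: target reconstructed from the three counters on every request.
  def derivedTarget(s: BackendState): Int =
    s.knownExecutors + s.pendingExecutors - s.pendingToRemove

  // New: the last explicitly requested total is what gets sent to YARN.
  def trackedTarget(s: BackendState): Int = s.requestedTotalExecutors

  def main(args: Array[String]): Unit = {
    // 10 executors are still registered, but only 6 are actually wanted.
    val s = BackendState(knownExecutors = 10, pendingExecutors = 0,
      pendingToRemove = 0, requestedTotalExecutors = 6)
    assert(derivedTarget(s) == 10) // stale: over-asks the cluster manager
    assert(trackedTarget(s) == 6)
  }
}
```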
spark git commit: [SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group
Repository: spark Updated Branches: refs/heads/branch-2.2 38edb9256 -> 6f0d29672 [SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group ## What changes were proposed in this pull request? Job group: adding a job group is required to properly cancel running jobs related to a query. Description: the new description makes it easier to group the batches of a query by sorting by name in the Spark Jobs UI. ## How was this patch tested? - Unit tests - UI screenshot - Order by job id: ![screen shot 2017-04-27 at 5 10 09 pm](https://cloud.githubusercontent.com/assets/7865120/25509468/15452274-2b6e-11e7-87ba-d929816688cf.png) - Order by description: ![screen shot 2017-04-27 at 5 10 22 pm](https://cloud.githubusercontent.com/assets/7865120/25509474/1c298512-2b6e-11e7-99b8-fef1ef7665c1.png) - Order by job id (no query name): ![screen shot 2017-04-27 at 5 21 33 pm](https://cloud.githubusercontent.com/assets/7865120/25509482/28c96dc8-2b6e-11e7-8df0-9d3cdbb05e36.png) - Order by description (no query name): ![screen shot 2017-04-27 at 5 21 44 pm](https://cloud.githubusercontent.com/assets/7865120/25509489/37674742-2b6e-11e7-9357-b5c38ec16ac4.png) Author: Kunal Khamar Closes #17765 from kunalkhamar/sc-6696. (cherry picked from commit 6fc6cf88d871f5b05b0ad1a504e0d6213cf9d331) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f0d2967 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f0d2967 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f0d2967 Branch: refs/heads/branch-2.2 Commit: 6f0d29672512bcb720fb82bc92071207dfae5eb1 Parents: 38edb92 Author: Kunal Khamar Authored: Mon May 1 11:37:30 2017 -0700 Committer: Shixiong Zhu Committed: Mon May 1 11:37:44 2017 -0700 -- .../scala/org/apache/spark/ui/UIUtils.scala | 2 +- .../execution/streaming/StreamExecution.scala | 12 .../spark/sql/streaming/StreamSuite.scala | 66 3 files changed, 79 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6f0d2967/core/src/main/scala/org/apache/spark/ui/UIUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index e53d690..79b0d81 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging { val xml = XML.loadString(s"""$desc""") // Verify that this has only anchors and span (we are wrapping in span) - val allowedNodeLabels = Set("a", "span") + val allowedNodeLabels = Set("a", "span", "br") val illegalNodes = xml \\ "_" filterNot { case node: Node => allowedNodeLabels.contains(node.label) } http://git-wip-us.apache.org/repos/asf/spark/blob/6f0d2967/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index bcf0d97..affc201 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -252,6 +252,8 @@ class StreamExecution( */ private def runBatches(): Unit = { try { + sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString, 
+interruptOnCancel = true) if (sparkSession.sessionState.conf.streamingMetricsEnabled) { sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } @@ -289,6 +291,7 @@ class StreamExecution( if (currentBatchId < 0) { // We'll do this initialization only once populateStartOffsets(sparkSessionToRunBatches) + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) logDebug(s"Stream running from $committedOffsets to $availableOffsets") } else { constructNextBatch() @@ -308,6 +311,7 @@ class StreamExecution( logDebug(s"batch ${currentBatchId} committed") // We'll increase currentBatchId after we complete processing current batch's data currentBatchId += 1 + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) } else { currentSta
spark git commit: [SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group
Repository: spark Updated Branches: refs/heads/master ab30590f4 -> 6fc6cf88d [SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group ## What changes were proposed in this pull request? Job group: adding a job group is required to properly cancel running jobs related to a query. Description: the new description makes it easier to group the batches of a query by sorting by name in the Spark Jobs UI. ## How was this patch tested? - Unit tests - UI screenshot - Order by job id: ![screen shot 2017-04-27 at 5 10 09 pm](https://cloud.githubusercontent.com/assets/7865120/25509468/15452274-2b6e-11e7-87ba-d929816688cf.png) - Order by description: ![screen shot 2017-04-27 at 5 10 22 pm](https://cloud.githubusercontent.com/assets/7865120/25509474/1c298512-2b6e-11e7-99b8-fef1ef7665c1.png) - Order by job id (no query name): ![screen shot 2017-04-27 at 5 21 33 pm](https://cloud.githubusercontent.com/assets/7865120/25509482/28c96dc8-2b6e-11e7-8df0-9d3cdbb05e36.png) - Order by description (no query name): ![screen shot 2017-04-27 at 5 21 44 pm](https://cloud.githubusercontent.com/assets/7865120/25509489/37674742-2b6e-11e7-9357-b5c38ec16ac4.png) Author: Kunal Khamar Closes #17765 from kunalkhamar/sc-6696. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6fc6cf88 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6fc6cf88 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6fc6cf88 Branch: refs/heads/master Commit: 6fc6cf88d871f5b05b0ad1a504e0d6213cf9d331 Parents: ab30590 Author: Kunal Khamar Authored: Mon May 1 11:37:30 2017 -0700 Committer: Shixiong Zhu Committed: Mon May 1 11:37:30 2017 -0700 -- .../scala/org/apache/spark/ui/UIUtils.scala | 2 +- .../execution/streaming/StreamExecution.scala | 12 .../spark/sql/streaming/StreamSuite.scala | 66 3 files changed, 79 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6fc6cf88/core/src/main/scala/org/apache/spark/ui/UIUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index e53d690..79b0d81 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging { val xml = XML.loadString(s"""$desc""") // Verify that this has only anchors and span (we are wrapping in span) - val allowedNodeLabels = Set("a", "span") + val allowedNodeLabels = Set("a", "span", "br") val illegalNodes = xml \\ "_" filterNot { case node: Node => allowedNodeLabels.contains(node.label) } http://git-wip-us.apache.org/repos/asf/spark/blob/6fc6cf88/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index bcf0d97..affc201 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -252,6 +252,8 @@ class StreamExecution( */ private def runBatches(): Unit = { try { + sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString, +interruptOnCancel = true) if (sparkSession.sessionState.conf.streamingMetricsEnabled) { 
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } @@ -289,6 +291,7 @@ class StreamExecution( if (currentBatchId < 0) { // We'll do this initialization only once populateStartOffsets(sparkSessionToRunBatches) + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) logDebug(s"Stream running from $committedOffsets to $availableOffsets") } else { constructNextBatch() @@ -308,6 +311,7 @@ class StreamExecution( logDebug(s"batch ${currentBatchId} committed") // We'll increase currentBatchId after we complete processing current batch's data currentBatchId += 1 + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) } else { currentStatus = currentStatus.copy(isDataAvailable = false) updateStatusMessage("Waiting for data to a
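The patch builds on SparkContext's job-group API: every job started from the stream execution thread inherits the group set with `setJobGroup`, so cancelling the group stops all of a query's outstanding jobs. A rough spark-shell style sketch of how those calls fit together (the group id and descriptions below are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("job-group-demo").getOrCreate()
val sc = spark.sparkContext

// All jobs submitted from this thread are tagged with the group id;
// interruptOnCancel asks Spark to interrupt running task threads on cancel.
sc.setJobGroup("demo-run-id", "myQuery id = demo-run-id batch = 0", interruptOnCancel = true)

// The per-batch description can be refreshed without changing the group.
sc.setJobDescription("myQuery id = demo-run-id batch = 1")

sc.parallelize(1 to 1000).count()

// From any thread: cancel every running job that belongs to the group.
sc.cancelJobGroup("demo-run-id")

spark.stop()
```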
spark git commit: [SPARK-20517][UI] Fix broken history UI download link
Repository: spark Updated Branches: refs/heads/branch-2.1 5131b0a96 -> 868b4a1aa [SPARK-20517][UI] Fix broken history UI download link The download link in the history server UI is concatenated as: ``` Download ``` Here the `num` field represents the number of attempts, which does not match what the REST API expects. In the REST API, if the attempt id does not exist the URL should be `api/v1/applications//logs`; otherwise the URL should be `api/v1/applications///logs`. Using `` to represent `` will lead to a "no such app" error. Manual verification. CC ajbozarth, can you please review this change, since you added this feature before? Thanks! Author: jerryshao Closes #17795 from jerryshao/SPARK-20517. (cherry picked from commit ab30590f448d05fc1864c54a59b6815bdeef8fc7) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/868b4a1a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/868b4a1a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/868b4a1a Branch: refs/heads/branch-2.1 Commit: 868b4a1aa954d03d9ba29b9d7743eeefeece682c Parents: 5131b0a Author: jerryshao Authored: Mon May 1 10:25:29 2017 -0700 Committer: Marcelo Vanzin Committed: Mon May 1 10:26:33 2017 -0700 -- .../org/apache/spark/ui/static/historypage-template.html | 2 +- core/src/main/resources/org/apache/spark/ui/static/historypage.js | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/868b4a1a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html index 42e2d9a..6ba3b09 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html @@ -77,7 +77,7 @@ {{duration}} {{sparkUser}} {{lastUpdated}} - Download + Download {{/attempts}} {{/applications}} http://git-wip-us.apache.org/repos/asf/spark/blob/868b4a1a/core/src/main/resources/org/apache/spark/ui/static/historypage.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 8fd9186..d095a2c 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -114,6 +114,9 @@ $(document).ready(function() { attempt["startTime"] = formatDate(attempt["startTime"]); attempt["endTime"] = formatDate(attempt["endTime"]); attempt["lastUpdated"] = formatDate(attempt["lastUpdated"]); + attempt["log"] = uiRoot + "/api/v1/applications/" + id + "/" + +(attempt.hasOwnProperty("attemptId") ? attempt["attemptId"] + "/" : "") + "logs"; + var app_clone = {"id" : id, "name" : name, "num" : num, "attempts" : [attempt]}; array.push(app_clone); }
spark git commit: [SPARK-20517][UI] Fix broken history UI download link
Repository: spark Updated Branches: refs/heads/master 6b44c4d63 -> ab30590f4 [SPARK-20517][UI] Fix broken history UI download link The download link in the history server UI is concatenated as: ``` Download ``` Here the `num` field represents the number of attempts, which does not match what the REST API expects. In the REST API, if the attempt id does not exist the URL should be `api/v1/applications//logs`; otherwise the URL should be `api/v1/applications///logs`. Using `` to represent `` will lead to a "no such app" error. Manual verification. CC ajbozarth, can you please review this change, since you added this feature before? Thanks! Author: jerryshao Closes #17795 from jerryshao/SPARK-20517. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab30590f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab30590f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab30590f Branch: refs/heads/master Commit: ab30590f448d05fc1864c54a59b6815bdeef8fc7 Parents: 6b44c4d Author: jerryshao Authored: Mon May 1 10:25:29 2017 -0700 Committer: Marcelo Vanzin Committed: Mon May 1 10:26:08 2017 -0700 -- .../org/apache/spark/ui/static/historypage-template.html | 2 +- core/src/main/resources/org/apache/spark/ui/static/historypage.js | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ab30590f/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html index 42e2d9a..6ba3b09 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html @@ -77,7 +77,7 @@ {{duration}} {{sparkUser}} {{lastUpdated}} - Download + Download {{/attempts}} {{/applications}} http://git-wip-us.apache.org/repos/asf/spark/blob/ab30590f/core/src/main/resources/org/apache/spark/ui/static/historypage.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 54810ed..1f89306 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -120,6 +120,9 @@ $(document).ready(function() { attempt["startTime"] = formatDate(attempt["startTime"]); attempt["endTime"] = formatDate(attempt["endTime"]); attempt["lastUpdated"] = formatDate(attempt["lastUpdated"]); + attempt["log"] = uiRoot + "/api/v1/applications/" + id + "/" + +(attempt.hasOwnProperty("attemptId") ? attempt["attemptId"] + "/" : "") + "logs"; + var app_clone = {"id" : id, "name" : name, "num" : num, "attempts" : [attempt]}; array.push(app_clone); }
spark git commit: [SPARK-20517][UI] Fix broken history UI download link
Repository: spark Updated Branches: refs/heads/branch-2.2 813abd2db -> 38edb9256 [SPARK-20517][UI] Fix broken history UI download link The download link in the history server UI is concatenated as: ``` Download ``` Here the `num` field represents the number of attempts, which does not match what the REST API expects. In the REST API, if the attempt id does not exist the URL should be `api/v1/applications//logs`; otherwise the URL should be `api/v1/applications///logs`. Using `` to represent `` will lead to a "no such app" error. Manual verification. CC ajbozarth, can you please review this change, since you added this feature before? Thanks! Author: jerryshao Closes #17795 from jerryshao/SPARK-20517. (cherry picked from commit ab30590f448d05fc1864c54a59b6815bdeef8fc7) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38edb925 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38edb925 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38edb925 Branch: refs/heads/branch-2.2 Commit: 38edb9256d426799901017561b486912e61369d2 Parents: 813abd2 Author: jerryshao Authored: Mon May 1 10:25:29 2017 -0700 Committer: Marcelo Vanzin Committed: Mon May 1 10:26:21 2017 -0700 -- .../org/apache/spark/ui/static/historypage-template.html | 2 +- core/src/main/resources/org/apache/spark/ui/static/historypage.js | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38edb925/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html index 42e2d9a..6ba3b09 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html @@ -77,7 +77,7 @@ {{duration}} {{sparkUser}} {{lastUpdated}} - Download + Download {{/attempts}} {{/applications}} http://git-wip-us.apache.org/repos/asf/spark/blob/38edb925/core/src/main/resources/org/apache/spark/ui/static/historypage.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 54810ed..1f89306 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -120,6 +120,9 @@ $(document).ready(function() { attempt["startTime"] = formatDate(attempt["startTime"]); attempt["endTime"] = formatDate(attempt["endTime"]); attempt["lastUpdated"] = formatDate(attempt["lastUpdated"]); + attempt["log"] = uiRoot + "/api/v1/applications/" + id + "/" + +(attempt.hasOwnProperty("attemptId") ? attempt["attemptId"] + "/" : "") + "logs"; + var app_clone = {"id" : id, "name" : name, "num" : num, "attempts" : [attempt]}; array.push(app_clone); }
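Put differently, the attempt segment belongs in the logs URL only when the attempt actually has an id. A small Scala sketch of that rule (the real fix is the JavaScript shown in the diffs above; `logsUrl` is a hypothetical helper):

```scala
// Build the history server logs URL for one attempt.
// attemptId is None for applications whose attempts carry no id.
def logsUrl(uiRoot: String, appId: String, attemptId: Option[String]): String = {
  val attemptPart = attemptId.map(_ + "/").getOrElse("")
  s"$uiRoot/api/v1/applications/$appId/${attemptPart}logs"
}

// logsUrl("", "app-20170501", None)      => "/api/v1/applications/app-20170501/logs"
// logsUrl("", "app-20170501", Some("1")) => "/api/v1/applications/app-20170501/1/logs"
```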
spark git commit: [SPARK-20534][SQL] Make outer generate exec return empty rows
Repository: spark Updated Branches: refs/heads/branch-2.2 c890e938c -> 813abd2db [SPARK-20534][SQL] Make outer generate exec return empty rows ## What changes were proposed in this pull request? Generate exec does not produce `null` values if the generator for the input row is empty and the generate operates in outer mode without join. This is caused by the fact that the `join=false` code path is different from the `join=true` code path, and that the `join=false` code path did not deal with outer properly. This PR addresses this issue. ## How was this patch tested? Updated `outer*` tests in `GeneratorFunctionSuite`. Author: Herman van Hovell Closes #17810 from hvanhovell/SPARK-20534. (cherry picked from commit 6b44c4d63ab14162e338c5f1ac77333956870a90) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/813abd2d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/813abd2d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/813abd2d Branch: refs/heads/branch-2.2 Commit: 813abd2db6140c4a294cdbeca2303dbfb7903107 Parents: c890e93 Author: Herman van Hovell Authored: Mon May 1 09:46:35 2017 -0700 Committer: gatorsmile Committed: Mon May 1 09:46:44 2017 -0700 -- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../plans/logical/basicLogicalOperators.scala | 2 +- .../spark/sql/execution/GenerateExec.scala | 33 +++- .../spark/sql/GeneratorFunctionSuite.scala | 12 +++ 4 files changed, 26 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index dd768d1..f2b9764 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -441,8 +441,7 @@ object ColumnPruning extends Rule[LogicalPlan] { g.copy(child = prunedChild(g.child, g.references)) // Turn off `join` for Generate if no column from it's child is used -case p @ Project(_, g: Generate) -if g.join && !g.outer && p.references.subsetOf(g.generatedSet) => +case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) // Eliminate unneeded attributes from right side of a Left Existence Join. http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 3ad757e..f663d7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -83,7 +83,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend * @param join when true, each output row is implicitly joined with the input tuple that produced * it. * @param outer when true, each input row will be output at least once, even if the output of the - * given `generator` is empty. `outer` has no effect when `join` is false.
+ * given `generator` is empty. * @param qualifier Qualifier for the attributes of generator(UDTF) * @param generatorOutput The output schema of the Generator. * @param child Children logical plan node http://git-wip-us.apache.org/repos/asf/spark/blob/813abd2d/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index f87d058..1812a11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow]) extends Iterator[InternalRow] { - l
spark git commit: [SPARK-20534][SQL] Make outer generate exec return empty rows
Repository: spark Updated Branches: refs/heads/master f0169a1c6 -> 6b44c4d63 [SPARK-20534][SQL] Make outer generate exec return empty rows ## What changes were proposed in this pull request? Generate exec does not produce `null` values if the generator for the input row is empty and the generate operates in outer mode without join. This is caused by the fact that the `join=false` code path is different from the `join=true` code path, and that the `join=false` code path did not deal with outer properly. This PR addresses this issue. ## How was this patch tested? Updated `outer*` tests in `GeneratorFunctionSuite`. Author: Herman van Hovell Closes #17810 from hvanhovell/SPARK-20534. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b44c4d6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b44c4d6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b44c4d6 Branch: refs/heads/master Commit: 6b44c4d63ab14162e338c5f1ac77333956870a90 Parents: f0169a1 Author: Herman van Hovell Authored: Mon May 1 09:46:35 2017 -0700 Committer: gatorsmile Committed: Mon May 1 09:46:35 2017 -0700 -- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../plans/logical/basicLogicalOperators.scala | 2 +- .../spark/sql/execution/GenerateExec.scala | 33 +++- .../spark/sql/GeneratorFunctionSuite.scala | 12 +++ 4 files changed, 26 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6b44c4d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index dd768d1..f2b9764 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -441,8 +441,7 @@ object ColumnPruning extends Rule[LogicalPlan] { g.copy(child = prunedChild(g.child, g.references)) // Turn off `join` for Generate if no column from it's child is used -case p @ Project(_, g: Generate) -if g.join && !g.outer && p.references.subsetOf(g.generatedSet) => +case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) => p.copy(child = g.copy(join = false)) // Eliminate unneeded attributes from right side of a Left Existence Join. http://git-wip-us.apache.org/repos/asf/spark/blob/6b44c4d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 3ad757e..f663d7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -83,7 +83,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend * @param join when true, each output row is implicitly joined with the input tuple that produced * it. * @param outer when true, each input row will be output at least once, even if the output of the - * given `generator` is empty. `outer` has no effect when `join` is false. + * given `generator` is empty.
* @param qualifier Qualifier for the attributes of generator(UDTF) * @param generatorOutput The output schema of the Generator. * @param child Children logical plan node http://git-wip-us.apache.org/repos/asf/spark/blob/6b44c4d6/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index f87d058..1812a11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow]) extends Iterator[InternalRow] { - lazy val results = func().toIterator + lazy val results: Iterator[InternalRow] = func().toIterator ove
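The effect of the change is easiest to see with an outer explode over a column that may be empty: the row with no elements should still produce an output row with a null generated value, even when the optimizer drops the implicit join. A minimal sketch, assuming a local SparkSession and Spark 2.2's `explode_outer`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode_outer

val spark = SparkSession.builder().master("local[*]").appName("outer-generate-demo").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, Seq("a", "b")),
  (2, Seq.empty[String])   // the generator produces nothing for this row
).toDF("id", "letters")

// Selecting only the generated column lets ColumnPruning turn off the implicit
// join; with this fix the outer semantics are preserved and the empty row still
// yields an output row with a null value.
df.select(explode_outer($"letters").as("letter")).show()
// Expected after the fix: rows "a", "b", and one null row.
```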
spark git commit: [SPARK-20290][MINOR][PYTHON][SQL] Add PySpark wrapper for eqNullSafe
Repository: spark Updated Branches: refs/heads/master a355b667a -> f0169a1c6 [SPARK-20290][MINOR][PYTHON][SQL] Add PySpark wrapper for eqNullSafe ## What changes were proposed in this pull request? Adds Python bindings for `Column.eqNullSafe` ## How was this patch tested? Manual tests, existing unit tests, doc build. Author: zero323 Closes #17605 from zero323/SPARK-20290. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0169a1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0169a1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0169a1c Branch: refs/heads/master Commit: f0169a1c6a1ac06045d57f8aaa2c841bb39e23ac Parents: a355b66 Author: zero323 Authored: Mon May 1 09:43:32 2017 -0700 Committer: gatorsmile Committed: Mon May 1 09:43:32 2017 -0700 -- python/pyspark/sql/column.py | 55 +++ python/pyspark/sql/tests.py | 2 +- 2 files changed, 56 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f0169a1c/python/pyspark/sql/column.py -- diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index b8df37f..e753ed4 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -171,6 +171,61 @@ class Column(object): __ge__ = _bin_op("geq") __gt__ = _bin_op("gt") +_eqNullSafe_doc = """ +Equality test that is safe for null values. + +:param other: a value or :class:`Column` + +>>> from pyspark.sql import Row +>>> df1 = spark.createDataFrame([ +... Row(id=1, value='foo'), +... Row(id=2, value=None) +... ]) +>>> df1.select( +... df1['value'] == 'foo', +... df1['value'].eqNullSafe('foo'), +... df1['value'].eqNullSafe(None) +... ).show() ++-+---++ +|(value = foo)|(value <=> foo)|(value <=> NULL)| ++-+---++ +| true| true| false| +| null| false|true| ++-+---++ +>>> df2 = spark.createDataFrame([ +... Row(value = 'bar'), +... Row(value = None) +... ]) +>>> df1.join(df2, df1["value"] == df2["value"]).count() +0 +>>> df1.join(df2, df1["value"].eqNullSafe(df2["value"])).count() +1 +>>> df2 = spark.createDataFrame([ +... Row(id=1, value=float('NaN')), +... Row(id=2, value=42.0), +... Row(id=3, value=None) +... ]) +>>> df2.select( +... df2['value'].eqNullSafe(None), +... df2['value'].eqNullSafe(float('NaN')), +... df2['value'].eqNullSafe(42.0) +... ).show() +++---++ +|(value <=> NULL)|(value <=> NaN)|(value <=> 42.0)| +++---++ +| false| true| false| +| false| false|true| +|true| false| false| +++---++ + +.. note:: Unlike Pandas, PySpark doesn't consider NaN values to be NULL. + See the `NaN Semantics`_ for details. +.. _NaN Semantics: + https://spark.apache.org/docs/latest/sql-programming-guide.html#nan-semantics +.. 
versionadded:: 2.3.0 +""" +eqNullSafe = _bin_op("eqNullSafe", _eqNullSafe_doc) + # `and`, `or`, `not` cannot be overloaded in Python, # so use bitwise operators as boolean operators __and__ = _bin_op('and') http://git-wip-us.apache.org/repos/asf/spark/blob/f0169a1c/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index cd92148..ce4abf8 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -982,7 +982,7 @@ class SQLTests(ReusedPySparkTestCase): cbool = (ci & ci), (ci | ci), (~ci) self.assertTrue(all(isinstance(c, Column) for c in cbool)) css = cs.contains('a'), cs.like('a'), cs.rlike('a'), cs.asc(), cs.desc(),\ -cs.startswith('a'), cs.endswith('a') +cs.startswith('a'), cs.endswith('a'), ci.eqNullSafe(cs) self.assertTrue(all(isinstance(c, Column) for c in css)) self.assertTrue(isinstance(ci.cast(LongType()), Column)) self.assertRaisesRegexp(ValueError, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
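The new Python binding mirrors the null-safe equality that the Scala `Column` API already exposes as `<=>` (alias `eqNullSafe`). A brief Scala counterpart of the doctest above, assuming a local SparkSession:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").appName("eq-null-safe-demo").getOrCreate()
import spark.implicits._

val df = Seq((1, Some("foo")), (2, Option.empty[String])).toDF("id", "value")

df.select(
  $"value" === "foo",      // yields null for the row where value is null
  $"value" <=> "foo",      // null-safe: always true or false
  $"value" <=> lit(null)   // true only for the row where value is null
).show()
```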