[GitHub] spark issue #21952: [SPARK-24993] [SQL] Make Avro Fast Again
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21952 do we have the same regression for parquet? wondering if the regression comes from the `FileFormat` framework.
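For anyone who wants to probe cloud-fan's question, a rough timing sketch (the paths and data sizes are made up, and `spark.read.format("avro")` assumes the Avro data source is on the classpath; this is a crude wall-clock probe, not the benchmark used in the PR):

```scala
// If the slowdown lives in the shared FileFormat path, a Parquet scan over
// comparable data should show a similar regression to the Avro scan.
def timeMs[T](body: => T): Long = {
  val start = System.nanoTime()
  body
  (System.nanoTime() - start) / 1000000
}

val avroMs = timeMs(spark.read.format("avro").load("/tmp/bench.avro").count())
val parquetMs = timeMs(spark.read.parquet("/tmp/bench.parquet").count())
println(s"avro: $avroMs ms, parquet: $parquetMs ms")
```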
[GitHub] spark issue #22022: [SPARK-24948][SHS][BACKPORT-2.2] Delegate check access p...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/22022 Merged to branch 2.2, please close this PR @mgaido91
[GitHub] spark issue #21027: [SPARK-23943][MESOS][DEPLOY] Improve observability of Me...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21027 Can one of the admins verify this patch?
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208462120

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

> No. Seq[Any] takes literal values (objects); Seq[Column] takes Column expressions.

I mean:

Before:
```
scala> val df = spark.range(10).selectExpr("struct(id) as a")
df: org.apache.spark.sql.DataFrame = [a: struct<id: bigint>]

scala> df.groupBy().pivot("a", Seq(struct(lit(1)))).count().show()
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Column named_struct(col1, 1)
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
  at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
  at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:419)
  at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:338)
  ... 51 elided
```

After:
```
scala> val df = spark.range(10).selectExpr("struct(id) as a")
df: org.apache.spark.sql.DataFrame = [a: struct<id: bigint>]

scala> df.groupBy().pivot("a", Seq(struct(lit(1)))).count().show()
+---+
|[1]|
+---+
|  1|
+---+
```
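To ground the discussion, a minimal end-to-end sketch of the multi-column pivot that the new `pivot(Column, Seq[Column])` overload enables. The sample data is invented here; run it in spark-shell so `spark.implicits._` provides `$` and `toDF`:

```scala
import org.apache.spark.sql.functions.{lit, struct, sum}
import spark.implicits._

val df = Seq(
  (2012, "dotNET", "Experts", 10000L),
  (2012, "Java", "Experts", 20000L),
  (2013, "Java", "Dummies", 30000L)
).toDF("year", "course", "training", "earnings")

// Pivot on two columns at once by combining them into a struct; the pivot
// values must be struct literals of the same shape.
df.groupBy($"year")
  .pivot(struct($"course", $"training"), Seq(struct(lit("Java"), lit("Experts"))))
  .agg(sum($"earnings"))
  .show()
```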
[GitHub] spark pull request #22033: [SPARK-23935][SQL][followup] mapEntry throws org....
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22033
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889

> just for clarification, so now there are no outstanding bugs, some tests are ignored per #21320 (comment), and the remaining comments were mostly addressed. Did I understand correctly?

The ignored tests, and the scenarios they are intended to test, will fail with a runtime exception if this feature is enabled. I put forward a fix in `ParquetReadSupport.scala`, but @gatorsmile didn't want to address that in this PR. Otherwise, there are no known bugs with this patch.
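For anyone trying the feature, a hedged sketch of how it is toggled. The config key below is the one the SPARK-4502 work ended up using, but treat the exact name as an assumption at this stage of review:

```scala
// Enable nested-schema pruning (off by default while the feature stabilizes).
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

// With pruning on, selecting one nested field should shrink the Parquet
// ReadSchema to just that field, e.g. struct<name:struct<middle:string>>.
spark.read.parquet("/tmp/contacts").selectExpr("name.middle").explain()
```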
[GitHub] spark issue #22033: [SPARK-23935][SQL][followup] mapEntry throws org.codehau...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/22033 Thanks! merging to master.
[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21939 @shaneknapp I think we would be better off just upping the minimum version of Arrow to 0.10.0 here, since it's pretty involved to get a test matrix up and running and the project is still in a fair amount of flux until a stable 1.0 is released. What are your thoughts on this @HyukjinKwon @cloud-fan @holdenk?
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21889 **[Test build #94408 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94408/testReport)** for PR 21889 at commit [`23d03fb`](https://github.com/apache/spark/commit/23d03fb9f865053dc1e1da77532271177d8002b6).
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208460288

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

> This is what @MaxGekk added in #21699.

Oops, I mean `pivot(String, Seq[Any])`. typo
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208460101

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

> The previous interface pivot(Column, Seq[Any]) has existed for multiple years. Is this based on actual feedback from users or your speculation?

This is what @MaxGekk added in https://github.com/apache/spark/pull/21699.

> This assumption of yours is not true. See my reply to your comment below.

No. Seq[Any] takes literal values (objects); Seq[Column] takes `Column` expressions.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21889 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94406/
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21889 Merged build finished. Test FAILed.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21889 retest this please
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21889 **[Test build #94406 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94406/testReport)** for PR 21889 at commit [`23d03fb`](https://github.com/apache/spark/commit/23d03fb9f865053dc1e1da77532271177d8002b6).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208459585

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

> This assumption of yours is not true. See my reply to your comment below.

By this PR, it's true, right?
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208459448

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

> we would end up having too many versions of pivot, and that would be something confusing, I'm afraid.

The previous interface `pivot(Column, Seq[Any])` has existed for multiple years. Is this based on actual feedback from users or your speculation?
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208459011

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

> pivot(String, Seq[Any]) takes values and columns too (#22030 (comment), I guess). How about we have pivot(Column, Seq[Any]) take values and columns too?

This assumption of yours is not true. See my reply to your comment below.
[GitHub] spark issue #22022: [SPARK-24948][SHS][BACKPORT-2.2] Delegate check access p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22022 Merged build finished. Test PASSed.
[GitHub] spark issue #22022: [SPARK-24948][SHS][BACKPORT-2.2] Delegate check access p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22022 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94401/
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208458861

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -335,7 +337,7 @@ class RelationalGroupedDataset protected[sql](
   * @since 1.6.0
   */
  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
-    pivot(Column(pivotColumn), values)
+    pivot(Column(pivotColumn), values.map(lit))
--- End diff --

Yes, with Seq[Any] we only allow literal values, not `Column`s.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208458789

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

The downside is that if we have both pivot(Column, Seq[Any]) and pivot(Column, Seq[Column]), we would end up having too many versions of pivot, and that would be something confusing, I'm afraid.
[GitHub] spark issue #22022: [SPARK-24948][SHS][BACKPORT-2.2] Delegate check access p...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22022 **[Test build #94401 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94401/testReport)** for PR 22022 at commit [`16233d1`](https://github.com/apache/spark/commit/16233d181b0a61d6cd45a7dc42d49a8905c964ea).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22033: [SPARK-23935][SQL][followup] mapEntry throws org.codehau...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22033 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94399/
[GitHub] spark issue #22033: [SPARK-23935][SQL][followup] mapEntry throws org.codehau...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22033 Merged build finished. Test PASSed.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21889 just for clarification, so now there are no outstanding bugs, some tests are ignored per https://github.com/apache/spark/pull/21320#issuecomment-406353694, and the remaining comments were mostly addressed. Did I understand correctly?
[GitHub] spark issue #22033: [SPARK-23935][SQL][followup] mapEntry throws org.codehau...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22033 **[Test build #94399 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94399/testReport)** for PR 22033 at commit [`d248773`](https://github.com/apache/spark/commit/d248773f6a4fd8cf6bf7d1118d1c6c1bd5d35b7b).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208457801

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

There's nothing to argue with the analyzer or something. The first interface exposed is `pivot(String, Seq[Any])`. We'd better keep it similar to the original version if there isn't a big issue. What's the downside of allowing both by `pivot(Column, Seq[Any])`?
[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21939 **[Test build #94407 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94407/testReport)** for PR 21939 at commit [`0652617`](https://github.com/apache/spark/commit/0652617d2960bedbf11a643d67bb6d65cb467ebc).
[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21939 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1935/
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208456902

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -335,7 +337,7 @@ class RelationalGroupedDataset protected[sql](
   * @since 1.6.0
   */
  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
-    pivot(Column(pivotColumn), values)
+    pivot(Column(pivotColumn), values.map(lit))
--- End diff --

So, we did allow only literals but not generic columns before, right?
[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21939 Merged build finished. Test PASSed.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208456164

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

However, we happened to have `pivot(String, Seq[Any])` first, which I guess most users are already used to. `pivot(String, Seq[Any])` takes values and columns too (https://github.com/apache/spark/pull/22030#discussion_r208439423, I guess). How about we have `pivot(Column, Seq[Any])` take values and columns too?
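A small sketch of the unification HyukjinKwon is proposing. Note that `functions.lit` already passes a `Column` argument through unchanged, so a `Seq[Any]` overload could accept both plain values and `Column`s with the existing `values.map(lit)` mapping (a sketch, not the PR's code):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lit, struct}

// Mixed values: a raw literal and a Column expression side by side.
val values: Seq[Any] = Seq("dotNET", struct(lit("Java"), lit("Experts")))

// lit(v) returns v unchanged when v is already a Column, so this normalizes both.
val asColumns: Seq[Column] = values.map(lit)
```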
[GitHub] spark issue #22034: [SPARK-25054][CORE] Enable MetricsServlet sink for Execu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22034 Can one of the admins verify this patch?
[GitHub] spark pull request #22034: [SPARK-25054][CORE] Enable MetricsServlet sink fo...
GitHub user LantaoJin opened a pull request: https://github.com/apache/spark/pull/22034

[SPARK-25054][CORE] Enable MetricsServlet sink for Executor

## What changes were proposed in this pull request?

The MetricsServlet sink is added by default as a sink in the master, but there is no way to query the executor metrics via the servlet. This ticket offers a way to enable the MetricsServlet sink on the executor side when spark.executor.ui.enabled is set to true.

## How was this patch tested?

Unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/LantaoJin/spark SPARK-25054

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22034.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22034

commit f446fccfcd803f5081e1f994401ce86a474b9fff
Author: LantaoJin
Date: 2018-08-08T04:07:36Z

    [SPARK-25054][CORE] Enable MetricsServlet sink for Executor
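A hedged sketch of how the proposed flag would be set. `spark.executor.ui.enabled` is the name given in the PR description; everything else here is standard SparkConf usage:

```scala
import org.apache.spark.SparkConf

// Opt in to the executor-side MetricsServlet sink proposed by this PR.
val conf = new SparkConf()
  .setAppName("metrics-servlet-demo")
  .set("spark.executor.ui.enabled", "true")
```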
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208453178

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql](
   *
   * {{{
   *   // Compute the sum of earnings for each year by course with each course as a separate column
-  *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+  *   df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")
+  * }}}
+  *
+  * For pivoting by multiple columns, use the `struct` function to combine the columns and values:
+  *
+  * {{{
+  *   df
+  *     .groupBy($"year")
+  *     .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts"))))
+  *     .agg(sum($"earnings"))
   * }}}
   *
   * @param pivotColumn the column to pivot.
   * @param values List of values that will be translated to columns in the output DataFrame.
   * @since 2.4.0
   */
-  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+  def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = {
--- End diff --

The very fundamental interface we should have is `pivot(Column, Seq[Column])`, which allows any form and any type of pivot column, and the same with pivot values. This is close to what we support in SQL (SQL pivot support will actually be a subset of DataFrame pivot support after we have this interface), and verifying that the pivot values are constant is taken care of in the Analyzer. That said, we still need to keep the old `pivot(String, Seq[Any])` for simple usages and for backward compatibility, but I don't think we need to expand its capability. It is pretty clear to me that pivot(String ...) takes a column name and simple objects, while with pivot(Column ...) you could make any sophisticated use of pivot you would like to.
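To illustrate the contrast maryannxue draws, a sketch of the two tiers, reusing the sample `df` from the earlier pivot sketch. The analysis-time rejection noted in the last comment is the expected behavior, not verbatim output:

```scala
import org.apache.spark.sql.functions.{lit, lower, sum}

// Simple tier: column by name, plain literal values.
df.groupBy($"year").pivot("course", Seq("dotNET", "Java")).sum("earnings")

// General tier: arbitrary expressions for the pivot column, Column values.
df.groupBy($"year")
  .pivot(lower($"course"), Seq(lit("dotnet"), lit("java")))
  .agg(sum($"earnings"))

// Non-constant pivot values should be rejected during analysis, since the
// analyzer requires pivot values to be foldable:
// df.groupBy($"year").pivot($"course", Seq($"training"))  // expected: AnalysisException
```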
[GitHub] spark issue #21845: [SPARK-24886][INFRA] Fix the testing script to increase ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21845 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94396/
[GitHub] spark issue #21845: [SPARK-24886][INFRA] Fix the testing script to increase ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21845 Merged build finished. Test PASSed.
[GitHub] spark issue #21845: [SPARK-24886][INFRA] Fix the testing script to increase ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21845 **[Test build #94396 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94396/testReport)** for PR 21845 at commit [`08b4ebe`](https://github.com/apache/spark/commit/08b4ebe6a278f4e12eff95a9109803ed88a2c25b).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r208449418

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala ---
@@ -137,13 +135,12 @@ case class AggregateInPandasExec(
     val columnarBatchIter = new ArrowPythonRunner(
       pyFuncs,
-      bufferSize,
-      reuseWorker,
       PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
       argOffsets,
       aggInputSchema,
       sessionLocalTimeZone,
-      pythonRunnerConf).compute(projectedRowIter, context.partitionId(), context)
+      pythonRunnerConf,
+      sparkContext.conf).compute(projectedRowIter, context.partitionId(), context)
--- End diff --

Seems like this is on the executor side, but can we get `sparkContext`?
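For context on ueshin's question, a hedged sketch of the executor-side alternative: there is no `SparkContext` on executors, but the conf is reachable through `SparkEnv`, assuming `SparkEnv` is initialized, which it is inside running tasks. The config key shown is the one this PR's title introduces:

```scala
import org.apache.spark.SparkEnv

// Inside task execution, SparkEnv carries the SparkConf without a SparkContext.
val conf = SparkEnv.get.conf
val pysparkMem: Option[String] = conf.getOption("spark.executor.pyspark.memory")
```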
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208451663

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -335,7 +337,7 @@ class RelationalGroupedDataset protected[sql](
   * @since 1.6.0
   */
  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
-    pivot(Column(pivotColumn), values)
+    pivot(Column(pivotColumn), values.map(lit))
--- End diff --

Yes, you did. This "old" interface only takes in a single named column (say, "a", but not "a+1") by its name, but we turn it into a `Column` just to reuse the same implementation.
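Concretely, after the one-line change in this hunk, the String overload is just sugar over the Column one (a sketch; `col` comes from `functions`):

```scala
import org.apache.spark.sql.functions.{col, lit}

// These two calls become equivalent: the name is wrapped in a Column and each
// plain value is wrapped as a literal Column.
df.groupBy("year").pivot("course", Seq("dotNET", "Java"))
df.groupBy("year").pivot(col("course"), Seq(lit("dotNET"), lit("Java")))
```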
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 See https://github.com/apache/spark/pull/21320#issuecomment-406353694 for @gatorsmile's request to move the changes to `ParquetReadSupport.scala` to another PR. There was another, unrelated bug reported by @jainaks and addressed in https://github.com/apache/spark/pull/21320#issuecomment-408588685. AFAIK, there's nothing outstanding blocking this PR from being merged, as I stated in https://github.com/apache/spark/pull/21889#issuecomment-410557228.
[GitHub] spark issue #21608: [SPARK-24626] [SQL] Improve location size calculation in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21608 Merged build finished. Test PASSed.
[GitHub] spark issue #21608: [SPARK-24626] [SQL] Improve location size calculation in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21608 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94397/
[GitHub] spark issue #21608: [SPARK-24626] [SQL] Improve location size calculation in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21608 **[Test build #94397 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94397/testReport)** for PR 21608 at commit [`70eddc8`](https://github.com/apache/spark/commit/70eddc8179532964ccb29bb22c939662cfb27e88).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21520: [SPARK-24505][SQL] Forbidding string interpolation in Co...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21520 @HyukjinKwon Thanks for looking into this. It is based on the comment and discussion here: https://github.com/apache/spark/pull/21193#discussion_r186627099.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21889 That comment is before https://github.com/apache/spark/pull/21889#issuecomment-408330791. I am okay in general but want to be clear if I'm ignoring his decision or not.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user ajacques commented on the issue: https://github.com/apache/spark/pull/21889

>> but @gatorsmile wants to review it in a follow-on PR.
> Where did he say it after the comment above?

It was my interpretation of this comment: https://github.com/apache/spark/pull/21320#issuecomment-406353694

@gatorsmile, @HyukjinKwon Do we wish to block this PR to fix the issue with it enabled? It's not clear what your expectations are for this PR.
1. Are you okay with it not 100% working if it's disabled by default?
2. Do you want this issue to be fixed at the cost of bringing more changes into this PR?
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21889

> but @gatorsmile wants to review it in a follow-on PR.

I need a confirmation from @gatorsmile. I don't want to ignore his decision here in:

> Just FYI, we are unable to merge it if it has a correctness bug.

@ajacques, thanks. I overlooked the recent changes made. Will take another look soon, but don't block on this since most of them look addressed from a cursory look.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21889 **[Test build #94406 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94406/testReport)** for PR 21889 at commit [`23d03fb`](https://github.com/apache/spark/commit/23d03fb9f865053dc1e1da77532271177d8002b6).
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user ajacques commented on the issue: https://github.com/apache/spark/pull/21889 @HyukjinKwon Looks like most of your comments have already been addressed, but I've gone ahead and made a few more tweaks to help this get merged. Please let me know if any blocking comments have been missed. As mentioned: this feature is not known to have any regressions in the default, disabled state.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21889

> but @gatorsmile wants to review it in a follow-on PR.

Where did he say it after the comment above? Also, why don't you address my comments if you're going to push more changes then?
[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22029 Is there a clear definition for the expected behavior? I tried Postgres before; it returns null for things like `(x, y) = (a, null)`, but throws an analysis error for things like `(x, (y, z)) = (a, (null, b))`
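A sketch of the behavior in question, expressed against Spark. The expected result reflects the semantics proposed in SPARK-24395, not necessarily current behavior:

```scala
// Struct equality with a null field: the proposal is to return NULL (unknown)
// rather than false when only the non-null fields match.
spark.sql(
  "SELECT named_struct('x', 1, 'y', 2) IN (named_struct('x', 1, 'y', CAST(NULL AS INT)))"
).show()
// Expected under the proposed semantics: NULL, because (1, 2) = (1, NULL) is
// unknown rather than false.
```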
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889

> Assuming from #21889 (comment), we shouldn't have any identified bug here. What kind of bugs are left to be fixed?

That bug was addressed by b50ddb4. We still need to fix the bug underlying the failing (ignored) test case. I have a tentative fix for that, but @gatorsmile wants to review it in a follow-on PR.
[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21889#discussion_r208446828

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+    extends QueryTest
+    with ParquetTest
+    with FileSchemaPruningTest
+    with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  val contacts =
+    Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+      relatives = Map("brother" -> johnDoe)) ::
+    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  val briefContacts =
+    BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+    BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map(),
+    p: Int)
+
+  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+    briefContacts.map { case BriefContact(id, name, address) =>
+      BriefContactWithDataPartitionColumn(id, name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+    val query = sql("select name.middle from contacts order by id")
+    checkScanSchemata(query, "struct<name:struct<middle:string>>")
+    checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and its parent struct") {
+    val query = sql("select name.middle, name from contacts order by id")
+    checkScanSchemata(query, "struct<name:struct<first:string,middle:string,last:string>>")
+    checkAnswer(query,
+      Row("X.", Row("Jane", "X.", "Doe")) ::
+      Row("Y.", Row("John", "Y.", "Doe")) ::
+      Row(null, Row("Janet", null, "Jones")) ::
+      Row(null, Row("Jim", null, "Jones")) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and its parent struct array") {
+    val query = sql("select friends.middle, friends from contacts where p=1 order by id")
+    checkScanSchemata(query,
+      "struct<friends:array<struct<first:string,middle:string,last:string>>>")
+    checkAnswer(query,
+      Row(Array("Z."), Array(Row("Susan", "Z.", "Smith"))) ::
+      Row(Array.empty[String], Array.empty[Row]) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field from a map entry and its parent map entry") {
+    val query =
+      sql("select relatives[\"brother\"].middle, relatives[\"brother\"] from contacts where p=1 " +
+        "order by id")
+    checkScanSchemata(query,
+      "struct<relatives:map<string,struct<first:string,middle:string,last:string>>>")
+    checkAnswer(query,
[GitHub] spark issue #22013: [SPARK-23939][SQL] Add transform_keys function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22013 **[Test build #94405 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94405/testReport)** for PR 22013 at commit [`150a6a5`](https://github.com/apache/spark/commit/150a6a5c405c78e7a5f7dd9b3f3c72f95290ec71).
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user codeatri commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r20844

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala ---
@@ -181,4 +187,46 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper
         (acc, array) => coalesce(aggregate(array, acc, (acc, elem) => acc + elem), acc)),
       15)
   }
+
+  test("TransformKeys") {
+    val ai0 = Literal.create(
+      Map(1 -> 1, 2 -> 2, 3 -> 3),
--- End diff --

Thanks for catching this! Included test cases, both here and in DataFrameFunctionsSuite.
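For reference, a usage sketch of the `transform_keys` function this PR adds, with the signature per the SPARK-23939 work (a map plus a two-argument lambda producing the new key):

```scala
// Shift every key up by one; values are untouched.
spark.sql("SELECT transform_keys(map(1, 1, 2, 2, 3, 3), (k, v) -> k + 1)").show(false)
// Expected: [2 -> 1, 3 -> 2, 4 -> 3]
```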
[GitHub] spark issue #22032: [SPARK-25047][ML] Can't assign SerializedLambda to scala...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22032 **[Test build #94400 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94400/testReport)** for PR 22032 at commit [`e76cd81`](https://github.com/apache/spark/commit/e76cd810e57a0c7dcee53cc0ed5daa2a5308c59f).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22032: [SPARK-25047][ML] Can't assign SerializedLambda to scala...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22032 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94400/
[GitHub] spark issue #22032: [SPARK-25047][ML] Can't assign SerializedLambda to scala...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22032 Merged build finished. Test PASSed.
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208444793

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(

   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x));
+       array(('a', 1), ('b', 3), ('c', 5))
+      > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+       array(4, 6)
+      > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y));
+       array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+    left: Expression,
+    right: Expression,
+    function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
--- End diff --

We don't need to define this?
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208444629

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -2117,6 +2117,65 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(ex4.getMessage.contains("data type mismatch: argument 3 requires int type"))
   }

+  test("zip_with function - arrays for primitive type not containing null") {
+    val df1 = Seq(
+      (Seq(9001, 9002, 9003), Seq(4, 5, 6)),
+      (Seq(1, 2), Seq(3, 4)),
+      (Seq.empty[Int], Seq.empty[Int]),
+      (null, null)
+    ).toDF("val1", "val2")
+    val df2 = Seq(
+      (Seq(1, 2, 3), Seq("a", "b", "c")),
+      (Seq(1, 2, 3), Seq("a", "b"))
+    ).toDF("val1", "val2")
+
+    def testArrayOfPrimitiveTypeNotContainsNull(): Unit = {
+      val expectedValue1 = Seq(
+        Row(Seq(9005, 9007, 9009)),
+        Row(Seq(4, 6)),
+        Row(Seq.empty),
+        Row(null))
+      checkAnswer(df1.selectExpr("zip_with(val1, val2, (x, y) -> x + y)"), expectedValue1)
+
+      val expectedValue2 = Seq(
+        Row(Seq(Row("a", 1), Row("b", 2), Row("c", 3))),
+        Row(Seq(Row("a", 1), Row("b", 2), Row(null, 3))))
+      checkAnswer(df2.selectExpr("zip_with(val1, val2, (x, y) -> (y, x))"), expectedValue2)
+    }
+
+    // Test with local relation, the Project will be evaluated without codegen
+    testArrayOfPrimitiveTypeNotContainsNull()
+    // Test with cached relation, the Project will be evaluated with codegen
+    df1.cache()
+    df2.cache()
+    testArrayOfPrimitiveTypeNotContainsNull()
+  }
+
+  test("zip_with function - arrays for primitive type containing null") {
+    val df1 = Seq[(Seq[Integer], Seq[Integer])](
+      (Seq(9001, null, 9003), Seq(4, 5, 6)),
+      (Seq(1, null, 2, 4), Seq(3, 4)),
+      (Seq.empty, Seq.empty),
+      (null, null)
+    ).toDF("val1", "val2")
+
+    def testArrayOfPrimitiveTypeContainsNull(): Unit = {
+      val expectedValue1 = Seq(
+        Row(Seq(9005, null, 9009)),
+        Row(Seq(4, null, null, null)),
+        Row(Seq.empty),
+        Row(null))
+      checkAnswer(df1.selectExpr("zip_with(val1, val2, (x, y) -> x + y)"), expectedValue1)
+    }
+
+    // Test with local relation, the Project will be evaluated without codegen
+    testArrayOfPrimitiveTypeContainsNull()
+    // Test with cached relation, the Project will be evaluated with codegen
+    df1.cache()
+    testArrayOfPrimitiveTypeContainsNull()
+  }
--- End diff --

Can you add a test for invalid cases?
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208445048

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(

   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x));
+       array(('a', 1), ('b', 3), ('c', 5))
+      > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+       array(4, 6)
+      > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y));
+       array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+    left: Expression,
+    right: Expression,
+    function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = {
+    val (leftElementType, leftContainsNull) = left.dataType match {
+      case ArrayType(elementType, containsNull) => (elementType, containsNull)
+      case _ =>
+        val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
+        (elementType, containsNull)
+    }
+    val (rightElementType, rightContainsNull) = right.dataType match {
+      case ArrayType(elementType, containsNull) => (elementType, containsNull)
+      case _ =>
+        val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
+        (elementType, containsNull)
+    }
+    copy(function = f(function,
+      (leftElementType, leftContainsNull) :: (rightElementType, rightContainsNull) :: Nil))
+  }
+
+  @transient lazy val (arr1Var, arr2Var) = {
+    val LambdaFunction(_,
+      (arr1Var: NamedLambdaVariable) :: (arr2Var: NamedLambdaVariable) :: Nil, _) = function
+    (arr1Var, arr2Var)
+  }
--- End diff --

nit: the following should work:
```scala
@transient lazy val LambdaFunction(_,
  Seq(leftElemVar: NamedLambdaVariable, rightElemVar: NamedLambdaVariable), _) = function
```
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208445494
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -2117,6 +2117,65 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(ex4.getMessage.contains("data type mismatch: argument 3 requires int type"))
   }
 
+  test("zip_with function - arrays for primitive type not containing null") {
+    val df1 = Seq(
+      (Seq(9001, 9002, 9003), Seq(4, 5, 6)),
+      (Seq(1, 2), Seq(3, 4)),
+      (Seq.empty[Int], Seq.empty[Int]),
+      (null, null)
+    ).toDF("val1", "val2")
+    val df2 = Seq(
+      (Seq(1, 2, 3), Seq("a", "b", "c")),
+      (Seq(1, 2, 3), Seq("a", "b"))
+    ).toDF("val1", "val2")
+
+    def testArrayOfPrimitiveTypeNotContainsNull(): Unit = {
+      val expectedValue1 = Seq(
+        Row(Seq(9005, 9007, 9009)),
+        Row(Seq(4, 6)),
+        Row(Seq.empty),
+        Row(null))
+      checkAnswer(df1.selectExpr("zip_with(val1, val2, (x, y) -> x + y)"), expectedValue1)
+
+      val expectedValue2 = Seq(
+        Row(Seq(Row("a", 1), Row("b", 2), Row("c", 3))),
+        Row(Seq(Row("a", 1), Row("b", 2), Row(null, 3))))
+      checkAnswer(df2.selectExpr("zip_with(val1, val2, (x, y) -> (y, x))"), expectedValue2)
+    }
+
+    // Test with local relation, the Project will be evaluated without codegen
+    testArrayOfPrimitiveTypeNotContainsNull()
+    // Test with cached relation, the Project will be evaluated with codegen
+    df1.cache()
+    df2.cache()
+    testArrayOfPrimitiveTypeNotContainsNull()
+  }
+
+  test("zip_with function - arrays for primitive type containing null") {
+    val df1 = Seq[(Seq[Integer], Seq[Integer])](
+      (Seq(9001, null, 9003), Seq(4, 5, 6)),
+      (Seq(1, null, 2, 4), Seq(3, 4)),
+      (Seq.empty, Seq.empty),
+      (null, null)
+    ).toDF("val1", "val2")
+
+    def testArrayOfPrimitiveTypeContainsNull(): Unit = {
+      val expectedValue1 = Seq(
+        Row(Seq(9005, null, 9009)),
+        Row(Seq(4, null, null, null)),
+        Row(Seq.empty),
+        Row(null))
+      checkAnswer(df1.selectExpr("zip_with(val1, val2, (x, y) -> x + y)"), expectedValue1)
+    }
+
+    // Test with local relation, the Project will be evaluated without codegen
+    testArrayOfPrimitiveTypeContainsNull()
+    // Test with cached relation, the Project will be evaluated with codegen
+    df1.cache()
+    testArrayOfPrimitiveTypeContainsNull()
+  }
+
--- End diff --

Also can you add tests to `HigherOrderFunctionsSuite` to check more explicit patterns?
[GitHub] spark pull request #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22027
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22027 thanks, merging to master!
[GitHub] spark issue #22013: [SPARK-23939][SQL] Add transform_keys function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22013 **[Test build #94404 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94404/testReport)** for PR 22013 at commit [`5806ac4`](https://github.com/apache/spark/commit/5806ac46707772fd1e4befa445157ed0f9c75084).
[GitHub] spark issue #21520: [SPARK-24505][SQL] Forbidding string interpolation in Co...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21520 @viirya ~ I was just trying to read the PRs. Would you mind if I ask where the "Based on previous discussion" is?
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21977 Does this work by setting some container configs? Maybe we can apply this to k8s later, cc @liyinan926
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208441135
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmark aims to measure the performance of
+ * [SPARK-24900][SQL] speed up sort when the dataset is small
+ */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("speed up sort when the dataset is small")
+    .config(conf)
+    .getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+    val factor = 1000
+    val key = rowsNum / 2
+    val benchmark = new Benchmark("speed up sort when the dataset is small", rowsNum * factor)
+    withTempPath { path =>
+      // scalastyle:off println
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on println
+
+      val list = (0 to factor).toList
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .flatMap(num => {
+          list.map(x => (num, x))
+        })
+        .toDF("key", "value")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      val dataset = spark.read.json(path.getAbsolutePath)
+
+      dataset.createOrReplaceTempView("src")
+
+      benchmark.addCase("sort with optimization", 10) { _ =>
+        spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "true")
+        val result = spark
+          .sql(s"select * from src where key = $key order by value").collectAsList().size()
+      }
+
+      benchmark.addCase("sort without optimization", 10) { _ =>
+        spark.conf.set("spark.sql.execution.rangeExchange.sampleCache.enabled", "false")
+        val result = spark
+          .sql(s"select * from src where key = $key order by value").collectAsList().size()
+      }
+
+      benchmark.run()
+    }
--- End diff --

I have removed them.
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user sddyljsx commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r208441067
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmark aims to measure the performance of
+ * [SPARK-24900][SQL] speed up sort when the dataset is small
+ */
+object SmallDataSortBenchmark {
+
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("speed up sort when the dataset is small")
+    .config(conf)
+    .getOrCreate()
+
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def run(rowsNum: Int): Unit = {
+    val factor = 1000
+    val key = rowsNum / 2
+    val benchmark = new Benchmark("speed up sort when the dataset is small", rowsNum * factor)
+    withTempPath { path =>
+      // scalastyle:off println
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on println
+
+      val list = (0 to factor).toList
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .flatMap(num => {
+          list.map(x => (num, x))
+        })
+        .toDF("key", "value")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      benchmark.addCase("sort", 10) { _ =>
+        val dataset = spark.read.json(path.getAbsolutePath)
+        dataset.createOrReplaceTempView("src")
+        val result = spark
          .sql(s"select * from src where key = $key order by value").collectAsList().size()
+      }
+
+      benchmark.run()
--- End diff --

Yes, I forgot it. I have added it.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208440273
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -93,21 +81,17 @@ case class DataSourceV2ScanExec(
         sparkContext,
         sqlContext.conf.continuousStreamingExecutorQueueSize,
         sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
-        partitions).asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]]
+        partitions,
+        schema,
+        partitionReaderFactory.asInstanceOf[ContinuousPartitionReaderFactory])
--- End diff --

`DataSourceV2ScanExec` is shared between batch and streaming, so the `partitionReaderFactory` here is a general type instead of the concrete `ContinuousPartitionReaderFactory`. I think we can avoid this cast in a future refactoring, when we have a dedicated scan plan for continuous streaming.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21889 Judging from https://github.com/apache/spark/pull/21889#issuecomment-408330791, we shouldn't have any identified bugs here. What kinds of bugs are left to be fixed?
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21889 Can we address the comments I left on that PR too? Looks like that's the only way to get through this. FWIW, since https://github.com/apache/spark/commit/51bee7aca13451167fa3e701fcd60f023eae5e61 is merged, we can now credit all the people involved here.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208439973
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -39,52 +36,43 @@ case class DataSourceV2ScanExec(
     @transient source: DataSourceV2,
     @transient options: Map[String, String],
     @transient pushedFilters: Seq[Expression],
-    @transient reader: DataSourceReader)
+    @transient readSupport: ReadSupport,
+    @transient scanConfig: ScanConfig)
   extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
 
   override def simpleString: String = "ScanV2 " + metadataString
 
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
   override def equals(other: Any): Boolean = other match {
     case other: DataSourceV2ScanExec =>
-      output == other.output && reader.getClass == other.reader.getClass && options == other.options
+      output == other.output && readSupport.getClass == other.readSupport.getClass &&
+        options == other.options
     case _ => false
   }
 
   override def hashCode(): Int = {
     Seq(output, source, options).hashCode()
   }
 
-  override def outputPartitioning: physical.Partitioning = reader match {
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 =>
-      SinglePartition
-
-    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 =>
-      SinglePartition
-
-    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 =>
+  override def outputPartitioning: physical.Partitioning = readSupport match {
+    case _ if partitions.length == 1 =>
       SinglePartition
 
     case s: SupportsReportPartitioning =>
       new DataSourcePartitioning(
-        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
+        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))
 
     case _ => super.outputPartitioning
   }
 
-  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
-    reader.planInputPartitions().asScala
-  }
+  private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
 
-  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      assert(!reader.isInstanceOf[ContinuousReader],
-        "continuous stream reader does not support columnar read yet.")
-      r.planBatchInputPartitions().asScala
-  }
+  private lazy val partitionReaderFactory = readSupport.createReaderFactory(scanConfig)
 
-  private lazy val inputRDD: RDD[InternalRow] = reader match {
-    case _: ContinuousReader =>
+  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
+    case _: ContinuousReadSupport =>
+      assert(!partitionReaderFactory.supportColumnarReads(),
--- End diff --

maybe we should rename `supportColumnarReads` to `doColumnarReads`? A source can support both, and it should tell Spark which mode it wants to use.
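To make the proposed semantics concrete, here is a minimal sketch with toy stand-ins (the trait and method names are assumptions, not the real Spark interfaces): a factory that supports both row-based and columnar reads, where a single flag tells the engine which mode to use for a given scan.

```scala
// Toy stand-ins for the interfaces under discussion.
trait PartitionReader[T] {
  def next(): Boolean
  def get(): T
}

trait ReaderFactory {
  def createRowReader(partition: Int): PartitionReader[String]

  def createColumnarReader(partition: Int): PartitionReader[Array[String]] =
    throw new UnsupportedOperationException("Cannot create columnar reader.")

  // Proposed semantics: not "can this source do columnar reads?" but
  // "does it choose columnar reads for this scan?".
  def doColumnarReads: Boolean = false
}

// A source that supports both modes and picks one per scan.
class DualModeFactory(preferColumnar: Boolean) extends ReaderFactory {
  override def doColumnarReads: Boolean = preferColumnar

  override def createRowReader(partition: Int): PartitionReader[String] =
    new PartitionReader[String] {
      private var i = 0
      def next(): Boolean = { i += 1; i <= 3 }
      def get(): String = s"row-$partition-$i"
    }

  override def createColumnarReader(partition: Int): PartitionReader[Array[String]] =
    new PartitionReader[Array[String]] {
      private var done = false
      def next(): Boolean = { val more = !done; done = true; more }
      def get(): Array[String] = Array(s"row-$partition-1", s"row-$partition-2")
    }
}
```

Under this reading, the flag is a per-scan decision rather than a static capability, which matches the comment above.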
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208439720
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
@@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
         valuePrepared
       }
 
-      override def next(): T = {
+      override def next(): Any = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
         valuePrepared = false
         reader.get()
       }
     }
-    new InterruptibleIterator(context, iter)
+    // TODO: get rid of this type hack.
+    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
+    split.asInstanceOf[DataSourceRDDPartition].inputPartition.preferredLocations()
--- End diff --

It's a common pattern in RDDs that we cast the `split` to the concrete `Partition` class defined by the RDD. The partitions are created in `RDD#getPartitions`, so if we see other splits here, it's a bug.
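A minimal, self-contained sketch of that pattern (a toy RDD, not Spark's `DataSourceRDD`): `compute` only ever receives partitions produced by this RDD's own `getPartitions`, so the downcast is safe by construction.

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// The concrete Partition class owned by this RDD.
class SeqPartition(override val index: Int, val data: Seq[Int]) extends Partition

class SeqRDD(sc: SparkContext, chunks: Seq[Seq[Int]]) extends RDD[Int](sc, Nil) {

  // The only place partitions are created: always SeqPartition.
  override protected def getPartitions: Array[Partition] =
    chunks.zipWithIndex.map { case (d, i) => new SeqPartition(i, d) }.toArray

  // Receiving any other Partition subtype here would indicate a scheduler bug,
  // so the cast is the idiomatic way to recover the concrete type.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    split.asInstanceOf[SeqPartition].data.iterator
}
```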
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208439490
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
@@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
         valuePrepared
       }
 
-      override def next(): T = {
+      override def next(): Any = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
         valuePrepared = false
         reader.get()
       }
     }
-    new InterruptibleIterator(context, iter)
+    // TODO: get rid of this type hack.
+    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
--- End diff --

The problem is that we don't really have a batch API in Spark SQL. We rely on type erasure and a codegen hack to implement columnar scan. It's hardcoded in the engine: `SparkPlan#execute` returns `RDD[InternalRow]`. If we had an RDD iterate over the rows in the batch, then whole-stage codegen would break, as it iterates the input RDD and casts each record to `ColumnarBatch`.
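The erasure trick is easy to demonstrate outside Spark. The sketch below is plain Scala with a made-up `Batch` type; it shows why the cast in the diff succeeds at runtime and why the consuming side must cast back to the real element type before touching elements.

```scala
object ErasureDemo extends App {
  case class Batch(rows: Seq[String]) // stand-in for ColumnarBatch

  val batches: Iterator[Batch] = Iterator(Batch(Seq("r1", "r2")))

  // Succeeds: the element type of a generic container is erased at runtime,
  // so a "container of batches" can pose as a "container of rows" for free.
  val disguised: Iterator[String] = batches.asInstanceOf[Iterator[String]]

  // The consumer that knows the truth casts back, also for free. Touching an
  // element *as a String* would throw ClassCastException instead.
  disguised.asInstanceOf[Iterator[Batch]].foreach(b => println(b.rows.mkString(",")))
}
```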
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208439423
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -335,7 +337,7 @@ class RelationalGroupedDataset protected[sql](
    * @since 1.6.0
    */
   def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
-    pivot(Column(pivotColumn), values)
+    pivot(Column(pivotColumn), values.map(lit))
--- End diff --

This is going to allow `pivot(String, Seq[Any])` to also take `Column`s. Did I misread the code?
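For illustration, a hedged sketch of the behavior being pointed out, assuming the PR's `values.map(lit)` change is applied (the `df` below is a made-up example): because `lit` passes a `Column` through unchanged, both calls would resolve, whereas the second one fails on current master.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[1]").appName("pivot-demo").getOrCreate()
import spark.implicits._

val df = Seq((2012, "dotNET", 10000), (2012, "Java", 20000))
  .toDF("year", "course", "earnings")

// Literal values, as documented for pivot(String, Seq[Any]).
df.groupBy($"year").pivot("course", Seq("dotNET", "Java")).sum("earnings").show()

// With the change under review, Columns slip through values.map(lit) too,
// since lit returns a Column argument as-is.
df.groupBy($"year").pivot("course", Seq(lit("dotNET"), lit("Java"))).sum("earnings").show()
```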
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 Also added javadoc as well. Most of the content is from `StateStore`, but I didn't copy the implementation note for the state store since it would be duplicated. Please let me know if we want to add content for the target state store parameter as well.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208439150
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private and is invisible to data
+ * sources. Data sources should implement concrete streaming read support interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading from. Note that the
+   * streaming data source should not assume that it will start reading from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+   */
+  Offset deserializeOffset(String json);
+
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  void commit(Offset end);
--- End diff --

For `commit`, the only thing it's interested in is the end offset. Even if we pass in a `ScanConfig`, I think the implementation would just get the end offset from the `ScanConfig` and commit.
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21733 **[Test build #94403 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94403/testReport)** for PR 21733 at commit [`e0ee04a`](https://github.com/apache/spark/commit/e0ee04af4f325db4813b8bf574c0de4cfbbbaed6).
[GitHub] spark issue #22022: [SPARK-24948][SHS][BACKPORT-2.2] Delegate check access p...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22022 **[Test build #94401 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94401/testReport)** for PR 22022 at commit [`16233d1`](https://github.com/apache/spark/commit/16233d181b0a61d6cd45a7dc42d49a8905c964ea).
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22014 **[Test build #94402 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94402/testReport)** for PR 22014 at commit [`b9c11d5`](https://github.com/apache/spark/commit/b9c11d5de01f13258e2c41b78396dd21a269c0d2).
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22014 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1934/ Test PASSed.
[GitHub] spark issue #22022: [SPARK-24948][SHS][BACKPORT-2.2] Delegate check access p...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/22022 Jenkins, retest this please.
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22014 Merged build finished. Test PASSed.
[GitHub] spark issue #22022: [SPARK-24948][SHS][BACKPORT-2.2] Delegate check access p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22022 Merged build finished. Test PASSed.
[GitHub] spark issue #22022: [SPARK-24948][SHS][BACKPORT-2.2] Delegate check access p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22022 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1933/ Test PASSed.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208438912
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for streaming processing with
+ * micro-batch mode.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
+
+  /**
+   * Returns the most recent offset available.
+   */
+  Offset latestOffset(Offset start);
--- End diff --

A concrete example is the Kafka data source: it needs to know the start offset to decide the latest offset, for purposes like rate control.
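A hedged sketch of that rate-control case: a Kafka-like source that caps how far each micro-batch may read. The `LongOffset` and `maxRecordsPerBatch` names are assumptions for illustration, not the real API.

```scala
// Toy offset type; real sources define their own Offset implementations.
case class LongOffset(value: Long)

class RateLimitedSource(latestAvailable: Long, maxRecordsPerBatch: Long) {
  // Knowing where the batch starts is what makes a per-batch cap possible.
  def latestOffset(start: LongOffset): LongOffset =
    LongOffset(math.min(latestAvailable, start.value + maxRecordsPerBatch))
}
```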
[GitHub] spark issue #22022: [SPARK-24948][SHS][BACKPORT-2.2] Delegate check access p...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/22022 Sorry, let me test again to see if everything is OK.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208438810
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for streaming processing with
+ * continuous mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   */
+  @Override
+  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
--- End diff --

I did it in many places, to allow a data source to implement both batch and streaming without conflicts. But here it is a little different: `ContinuousPartitionReaderFactory` is a child of `PartitionReaderFactory`, which means a data source can return a `ContinuousPartitionReaderFactory` for both batch and streaming.
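The compatibility argument rests on covariant return types. A minimal sketch with toy traits (not the real interfaces): the continuous factory narrows the return type of `createReader`, so it remains usable anywhere a plain factory is expected.

```scala
trait PartitionReader { def next(): Boolean }
trait ContinuousPartitionReader extends PartitionReader { def getOffset: Long }

trait PartitionReaderFactory {
  def createReader(partition: Int): PartitionReader
}

trait ContinuousPartitionReaderFactory extends PartitionReaderFactory {
  // Covariant return type: a legal override. Batch code that only knows
  // PartitionReaderFactory can still call createReader, while continuous
  // code holding the narrower factory type gets the narrower reader for free.
  override def createReader(partition: Int): ContinuousPartitionReader
}
```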
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22027 Merged build finished. Test PASSed.
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22027 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94395/ Test PASSed.
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22027 **[Test build #94395 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94395/testReport)** for PR 22027 at commit [`ddbcc04`](https://github.com/apache/spark/commit/ddbcc04bd6850b388f25faceb2cc4e1943a0f660).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user ajacques commented on the issue: https://github.com/apache/spark/pull/21889 Is there anything I can do to help with this PR?
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208437853
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A factory of {@link PartitionReader}s. Implementations can do either row-based scan or columnar
+ * scan, by switching the {@link #supportColumnarReads()} flag.
+ */
+@InterfaceStability.Evolving
+public interface PartitionReaderFactory extends Serializable {
+
+  /**
+   * Returns a row-based partition reader to read data from the given {@link InputPartition}.
+   *
+   * Implementations probably need to cast the input partition to the concrete
+   * {@link InputPartition} class defined for the data source.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  PartitionReader<InternalRow> createReader(InputPartition partition);
+
+  /**
+   * Returns a columnar partition reader to read data from the given {@link InputPartition}.
+   *
+   * Implementations probably need to cast the input partition to the concrete
+   * {@link InputPartition} class defined for the data source.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  default PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
+    throw new UnsupportedOperationException("Cannot create columnar reader.");
+  }
+
+  /**
+   * If this method returns true, Spark will call {@link #createColumnarReader(InputPartition)} to
+   * create the {@link PartitionReader} and scan the data in a columnar way. This means,
+   * implementations must also implement {@link #createColumnarReader(InputPartition)} when true
+   * is returned here.
+   */
+  default boolean supportColumnarReads() {
--- End diff --

good idea!
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208437780
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java ---
@@ -29,24 +28,24 @@
  * provide data writing ability for structured streaming.
  */
 @InterfaceStability.Evolving
-public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
+public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
 
-  /**
-   * Creates an optional {@link StreamWriter} to save the data to this data source. Data
-   * sources can return None if there is no writing needed to be done.
-   *
-   * @param queryId A unique string for the writing query. It's possible that there are many
-   *                writing queries running at the same time, and the returned
-   *                {@link DataSourceWriter} can use this id to distinguish itself from others.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive epoch output means to this
-   *             sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  StreamWriter createStreamWriter(
-      String queryId,
-      StructType schema,
-      OutputMode mode,
-      DataSourceOptions options);
+  /**
+   * Creates an optional {@link StreamingWriteSupport} to save the data to this data source. Data
+   * sources can return None if there is no writing needed to be done.
+   *
+   * @param queryId A unique string for the writing query. It's possible that there are many
+   *                writing queries running at the same time, and the returned
+   *                {@link StreamingWriteSupport} can use this id to distinguish itself from others.
+   * @param schema the schema of the data to be written.
+   * @param mode the output mode which determines what successive epoch output means to this
+   *             sink, please refer to {@link OutputMode} for more details.
+   * @param options the options for the returned data source writer, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  StreamingWriteSupport createStreamingWriteSupport(
+      String queryId,
--- End diff --

For the batch API, I think we can remove the job id and ask the data source to generate a UUID themselves. But for streaming, I'm not sure. Maybe we need it for failure recovery or streaming restart, cc @jose-torres
[GitHub] spark issue #22021: [SPARK-24948][SHS][BACKPORT-2.3] Delegate check access p...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/22021 @mgaido91 already merged to branch 2.3, please close this PR.